-
Notifications
You must be signed in to change notification settings - Fork 2
Add **Shared ScheduledExecutorService** for timeouts
#38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
3c405b7
On branch edburns/dd-2758695-virtual-threads Add **Shared `ScheduledE…
edburns a36d145
On branch edburns/dd-2758695-virtual-threads Add **Shared `ScheduledE…
edburns 9e102bc
Fix memory leak from cancelled timeout tasks in CopilotSession
edburns a1668c7
Fix scheduler memory leak and close() race condition in CopilotSession
edburns a01f0b5
Honor documented contract: timeoutMs <= 0 means no timeout in sendAnd…
edburns 9dc5933
Prevent CopilotSession leak on assertion failure in TimeoutEdgeCaseTest
edburns 075df31
spotless
edburns 446fe74
Update src/test/java/com/github/copilot/sdk/ZeroTimeoutContractTest.java
edburns 4c6c2ec
Update Javadoc in TimeoutEdgeCaseTest and SchedulerShutdownRaceTest t…
Copilot File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
62 changes: 62 additions & 0 deletions
62
src/test/java/com/github/copilot/sdk/SchedulerShutdownRaceTest.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,62 @@ | ||
| /*--------------------------------------------------------------------------------------------- | ||
| * Copyright (c) Microsoft Corporation. All rights reserved. | ||
| *--------------------------------------------------------------------------------------------*/ | ||
|
|
||
| package com.github.copilot.sdk; | ||
|
|
||
| import static org.junit.jupiter.api.Assertions.*; | ||
| import static org.mockito.ArgumentMatchers.*; | ||
| import static org.mockito.Mockito.*; | ||
|
|
||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.RejectedExecutionException; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| import org.junit.jupiter.api.Test; | ||
|
|
||
| import com.github.copilot.sdk.json.MessageOptions; | ||
|
|
||
| /** | ||
| * Regression coverage for the race between {@code sendAndWait()} and | ||
| * {@code close()}. | ||
| * <p> | ||
| * If {@code close()} shuts down the timeout scheduler after | ||
| * {@code ensureNotTerminated()} passes but before | ||
| * {@code timeoutScheduler.schedule()} executes, the schedule call throws | ||
| * {@link RejectedExecutionException}. This test asserts that | ||
| * {@code sendAndWait()} handles this race by returning a future that completes | ||
| * exceptionally (rather than propagating the exception to the caller or leaving | ||
| * the returned future incomplete). | ||
| */ | ||
| public class SchedulerShutdownRaceTest { | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| @Test | ||
| void sendAndWaitShouldReturnFailedFutureWhenSchedulerIsShutDown() throws Exception { | ||
| // Build a session via reflection (package-private constructor) | ||
| var ctor = CopilotSession.class.getDeclaredConstructor(String.class, JsonRpcClient.class, String.class); | ||
| ctor.setAccessible(true); | ||
|
|
||
| // Mock JsonRpcClient so send() returns a pending future instead of NPE | ||
| var mockRpc = mock(JsonRpcClient.class); | ||
| when(mockRpc.invoke(any(), any(), any())).thenReturn(new CompletableFuture<>()); | ||
|
|
||
| var session = ctor.newInstance("race-test", mockRpc, null); | ||
|
|
||
| // Shut down the scheduler without setting isTerminated, | ||
| // simulating the race window between ensureNotTerminated() and schedule() | ||
| var schedulerField = CopilotSession.class.getDeclaredField("timeoutScheduler"); | ||
| schedulerField.setAccessible(true); | ||
| var scheduler = (ScheduledExecutorService) schedulerField.get(session); | ||
| scheduler.shutdownNow(); | ||
|
|
||
| // sendAndWait must return a failed future rather than throwing directly. | ||
| CompletableFuture<?> result = session.sendAndWait(new MessageOptions().setPrompt("test"), 5000); | ||
|
|
||
| assertNotNull(result, "sendAndWait should return a future, not throw"); | ||
| var ex = assertThrows(ExecutionException.class, () -> result.get(1, TimeUnit.SECONDS)); | ||
| assertInstanceOf(RejectedExecutionException.class, ex.getCause()); | ||
| } | ||
| } | ||
142 changes: 142 additions & 0 deletions
142
src/test/java/com/github/copilot/sdk/TimeoutEdgeCaseTest.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,142 @@ | ||
| /*--------------------------------------------------------------------------------------------- | ||
| * Copyright (c) Microsoft Corporation. All rights reserved. | ||
| *--------------------------------------------------------------------------------------------*/ | ||
|
|
||
| package com.github.copilot.sdk; | ||
|
|
||
| import static org.junit.jupiter.api.Assertions.assertFalse; | ||
| import static org.junit.jupiter.api.Assertions.assertTrue; | ||
|
|
||
| import java.io.ByteArrayOutputStream; | ||
| import java.io.IOException; | ||
| import java.io.InputStream; | ||
| import java.net.Socket; | ||
| import java.util.concurrent.CompletableFuture; | ||
|
|
||
| import org.junit.jupiter.api.Test; | ||
|
|
||
| import com.github.copilot.sdk.events.AssistantMessageEvent; | ||
| import com.github.copilot.sdk.json.MessageOptions; | ||
|
|
||
| /** | ||
| * Regression tests for timeout edge cases in | ||
| * {@link CopilotSession#sendAndWait}. | ||
| * <p> | ||
| * These tests assert two behavioral contracts of the shared | ||
| * {@code ScheduledExecutorService} approach: | ||
| * <ol> | ||
| * <li>A pending timeout must NOT fire after {@code close()} and must NOT | ||
| * complete the returned future with a {@code TimeoutException}.</li> | ||
| * <li>Multiple {@code sendAndWait} calls must reuse a single shared scheduler | ||
| * thread rather than spawning a new OS thread per call.</li> | ||
| * </ol> | ||
edburns marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| */ | ||
| public class TimeoutEdgeCaseTest { | ||
|
|
||
| /** | ||
| * Creates a {@link JsonRpcClient} whose {@code invoke()} returns futures that | ||
| * never complete. The reader thread blocks forever on the input stream, and | ||
| * writes go to a no-op output stream. | ||
| */ | ||
| private JsonRpcClient createHangingRpcClient() throws Exception { | ||
| InputStream blockingInput = new InputStream() { | ||
| @Override | ||
| public int read() throws IOException { | ||
| try { | ||
| Thread.sleep(Long.MAX_VALUE); | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| return -1; | ||
| } | ||
| return -1; | ||
| } | ||
| }; | ||
| ByteArrayOutputStream sinkOutput = new ByteArrayOutputStream(); | ||
|
|
||
| var ctor = JsonRpcClient.class.getDeclaredConstructor(InputStream.class, java.io.OutputStream.class, | ||
| Socket.class, Process.class); | ||
| ctor.setAccessible(true); | ||
| return (JsonRpcClient) ctor.newInstance(blockingInput, sinkOutput, null, null); | ||
| } | ||
|
|
||
| /** | ||
| * After {@code close()}, the future returned by {@code sendAndWait} must NOT be | ||
| * completed by a stale timeout. | ||
| * <p> | ||
| * Contract: {@code close()} shuts down the timeout scheduler before the | ||
| * blocking {@code session.destroy} RPC call, so any pending timeout task is | ||
| * cancelled and the future remains incomplete (not exceptionally completed with | ||
| * {@code TimeoutException}). | ||
| */ | ||
| @Test | ||
| void testTimeoutDoesNotFireAfterSessionClose() throws Exception { | ||
| JsonRpcClient rpc = createHangingRpcClient(); | ||
| try { | ||
| try (CopilotSession session = new CopilotSession("test-timeout-id", rpc)) { | ||
|
|
||
| CompletableFuture<AssistantMessageEvent> result = session | ||
| .sendAndWait(new MessageOptions().setPrompt("hello"), 2000); | ||
|
|
||
| assertFalse(result.isDone(), "Future should be pending before timeout fires"); | ||
|
|
||
| // close() blocks up to 5s on session.destroy RPC. The 2s timeout | ||
| // fires during that window with the current per-call scheduler. | ||
| session.close(); | ||
|
|
||
| assertFalse(result.isDone(), "Future should not be completed by a timeout after session is closed. " | ||
| + "The per-call ScheduledExecutorService leaked a TimeoutException."); | ||
| } | ||
| } finally { | ||
| rpc.close(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * A shared scheduler must reuse a single thread across multiple | ||
| * {@code sendAndWait} calls, rather than spawning a new OS thread per call. | ||
| * <p> | ||
| * Contract: after two consecutive {@code sendAndWait} calls the number of live | ||
| * {@code sendAndWait-timeout} threads must not increase after the second call. | ||
| */ | ||
| @Test | ||
| void testSendAndWaitReusesTimeoutThread() throws Exception { | ||
| JsonRpcClient rpc = createHangingRpcClient(); | ||
| try { | ||
| try (CopilotSession session = new CopilotSession("test-thread-count-id", rpc)) { | ||
|
|
||
| long baselineCount = countTimeoutThreads(); | ||
|
|
||
| CompletableFuture<AssistantMessageEvent> result1 = session | ||
| .sendAndWait(new MessageOptions().setPrompt("hello1"), 30000); | ||
|
|
||
| Thread.sleep(100); | ||
| long afterFirst = countTimeoutThreads(); | ||
| assertTrue(afterFirst >= baselineCount + 1, | ||
| "Expected at least one new sendAndWait-timeout thread after first call. " + "Baseline: " | ||
| + baselineCount + ", after: " + afterFirst); | ||
|
|
||
| CompletableFuture<AssistantMessageEvent> result2 = session | ||
| .sendAndWait(new MessageOptions().setPrompt("hello2"), 30000); | ||
|
|
||
| Thread.sleep(100); | ||
| long afterSecond = countTimeoutThreads(); | ||
| assertTrue(afterSecond == afterFirst, | ||
| "Shared scheduler should reuse the same thread — no new threads after second call. " | ||
| + "After first: " + afterFirst + ", after second: " + afterSecond); | ||
|
|
||
| result1.cancel(true); | ||
| result2.cancel(true); | ||
| } | ||
| } finally { | ||
| rpc.close(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Counts the number of live threads whose name contains "sendAndWait-timeout". | ||
| */ | ||
| private long countTimeoutThreads() { | ||
| return Thread.getAllStackTraces().keySet().stream().filter(t -> t.getName().contains("sendAndWait-timeout")) | ||
| .filter(Thread::isAlive).count(); | ||
| } | ||
| } | ||
57 changes: 57 additions & 0 deletions
57
src/test/java/com/github/copilot/sdk/ZeroTimeoutContractTest.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| /*--------------------------------------------------------------------------------------------- | ||
| * Copyright (c) Microsoft Corporation. All rights reserved. | ||
| *--------------------------------------------------------------------------------------------*/ | ||
|
|
||
| package com.github.copilot.sdk; | ||
|
|
||
| import static org.junit.jupiter.api.Assertions.*; | ||
| import static org.mockito.ArgumentMatchers.*; | ||
| import static org.mockito.Mockito.*; | ||
|
|
||
| import java.util.concurrent.CompletableFuture; | ||
|
|
||
| import org.junit.jupiter.api.Test; | ||
|
|
||
| import com.github.copilot.sdk.events.AssistantMessageEvent; | ||
| import com.github.copilot.sdk.json.MessageOptions; | ||
|
|
||
| /** | ||
| * Verifies the documented contract that {@code timeoutMs <= 0} means "no | ||
| * timeout" in {@link CopilotSession#sendAndWait(MessageOptions, long)}. | ||
| */ | ||
| public class ZeroTimeoutContractTest { | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| @Test | ||
| void sendAndWaitWithZeroTimeoutShouldNotTimeOut() throws Exception { | ||
| // Build a session via reflection (package-private constructor) | ||
| var ctor = CopilotSession.class.getDeclaredConstructor(String.class, JsonRpcClient.class, String.class); | ||
| ctor.setAccessible(true); | ||
|
|
||
| var mockRpc = mock(JsonRpcClient.class); | ||
| when(mockRpc.invoke(any(), any(), any())).thenAnswer(invocation -> { | ||
| Object method = invocation.getArgument(0); | ||
| if ("session.destroy".equals(method)) { | ||
| // Make session.close() non-blocking by completing destroy immediately | ||
| return CompletableFuture.completedFuture(null); | ||
| } | ||
| // For other calls (e.g., message send), return an incomplete future so the | ||
| // sendAndWait result does not complete due to a mock response. | ||
| return new CompletableFuture<>(); | ||
| }); | ||
|
|
||
| try (var session = ctor.newInstance("zero-timeout-test", mockRpc, null)) { | ||
|
|
||
| // Per the Javadoc: timeoutMs of 0 means "no timeout". | ||
| // The future should NOT complete with TimeoutException. | ||
| CompletableFuture<AssistantMessageEvent> result = session | ||
| .sendAndWait(new MessageOptions().setPrompt("test"), 0); | ||
|
|
||
| // Give the scheduler a chance to fire if it was (incorrectly) scheduled | ||
| Thread.sleep(200); | ||
|
|
||
| // The future should still be pending — not timed out | ||
| assertFalse(result.isDone(), "Future should not be done; timeoutMs=0 means no timeout per Javadoc"); | ||
| } | ||
| } | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.