Skip to content
Merged
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,23 @@ implementation 'com.github:copilot-sdk-java:0.2.1-java.0'
import com.github.copilot.sdk.CopilotClient;
import com.github.copilot.sdk.events.AssistantMessageEvent;
import com.github.copilot.sdk.events.SessionUsageInfoEvent;
import com.github.copilot.sdk.json.CopilotClientOptions;
import com.github.copilot.sdk.json.MessageOptions;
import com.github.copilot.sdk.json.PermissionHandler;
import com.github.copilot.sdk.json.SessionConfig;

import java.util.concurrent.Executors;

public class CopilotSDK {
public static void main(String[] args) throws Exception {
var lastMessage = new String[]{null};

// Create and start client
try (var client = new CopilotClient()) {
try (var client = new CopilotClient()) { // JDK 25+: comment out this line
// JDK 25+: uncomment the following 3 lines for virtual thread support
// var options = new CopilotClientOptions()
// .setExecutor(Executors.newVirtualThreadPerTaskExecutor());
// try (var client = new CopilotClient(options)) {
client.start().get();

// Create a session
Expand Down
95 changes: 62 additions & 33 deletions src/main/java/com/github/copilot/sdk/CopilotClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -150,42 +152,51 @@ public CompletableFuture<Void> start() {
private CompletableFuture<Connection> startCore() {
LOG.fine("Starting Copilot client");

return CompletableFuture.supplyAsync(() -> {
try {
JsonRpcClient rpc;
Process process = null;

if (optionsHost != null && optionsPort != null) {
// External server (TCP)
rpc = serverManager.connectToServer(null, optionsHost, optionsPort);
} else {
// Child process (stdio or TCP)
CliServerManager.ProcessInfo processInfo = serverManager.startCliServer();
process = processInfo.process();
rpc = serverManager.connectToServer(process, processInfo.port() != null ? "localhost" : null,
processInfo.port());
}
Executor exec = options.getExecutor();
try {
return exec != null
? CompletableFuture.supplyAsync(this::startCoreBody, exec)
: CompletableFuture.supplyAsync(this::startCoreBody);
} catch (RejectedExecutionException e) {
return CompletableFuture.failedFuture(e);
}
}

Connection connection = new Connection(rpc, process);
private Connection startCoreBody() {
try {
JsonRpcClient rpc;
Process process = null;

if (optionsHost != null && optionsPort != null) {
// External server (TCP)
rpc = serverManager.connectToServer(null, optionsHost, optionsPort);
} else {
// Child process (stdio or TCP)
CliServerManager.ProcessInfo processInfo = serverManager.startCliServer();
process = processInfo.process();
rpc = serverManager.connectToServer(process, processInfo.port() != null ? "localhost" : null,
processInfo.port());
}

// Register handlers for server-to-client calls
RpcHandlerDispatcher dispatcher = new RpcHandlerDispatcher(sessions, lifecycleManager::dispatch);
dispatcher.registerHandlers(rpc);
Connection connection = new Connection(rpc, process);

// Verify protocol version
verifyProtocolVersion(connection);
// Register handlers for server-to-client calls
RpcHandlerDispatcher dispatcher = new RpcHandlerDispatcher(sessions, lifecycleManager::dispatch,
options.getExecutor());
dispatcher.registerHandlers(rpc);

LOG.info("Copilot client connected");
return connection;
} catch (Exception e) {
String stderr = serverManager.getStderrOutput();
if (!stderr.isEmpty()) {
throw new CompletionException(
new IOException("CLI process exited unexpectedly. stderr: " + stderr, e));
}
throw new CompletionException(e);
// Verify protocol version
verifyProtocolVersion(connection);

LOG.info("Copilot client connected");
return connection;
} catch (Exception e) {
String stderr = serverManager.getStderrOutput();
if (!stderr.isEmpty()) {
throw new CompletionException(new IOException("CLI process exited unexpectedly. stderr: " + stderr, e));
}
});
throw new CompletionException(e);
}
}

private static final int MIN_PROTOCOL_VERSION = 2;
Expand Down Expand Up @@ -228,15 +239,27 @@ private void verifyProtocolVersion(Connection connection) throws Exception {
*/
public CompletableFuture<Void> stop() {
var closeFutures = new ArrayList<CompletableFuture<Void>>();
Executor exec = options.getExecutor();

for (CopilotSession session : new ArrayList<>(sessions.values())) {
closeFutures.add(CompletableFuture.runAsync(() -> {
Runnable closeTask = () -> {
try {
session.close();
} catch (Exception e) {
LOG.log(Level.WARNING, "Error closing session " + session.getSessionId(), e);
}
}));
};
CompletableFuture<Void> future;
try {
future = exec != null
? CompletableFuture.runAsync(closeTask, exec)
: CompletableFuture.runAsync(closeTask);
} catch (RejectedExecutionException e) {
LOG.log(Level.WARNING, "Executor rejected session close task; closing inline", e);
closeTask.run();
future = CompletableFuture.completedFuture(null);
}
closeFutures.add(future);
}
sessions.clear();

Expand Down Expand Up @@ -329,6 +352,9 @@ public CompletableFuture<CopilotSession> createSession(SessionConfig config) {
: java.util.UUID.randomUUID().toString();

var session = new CopilotSession(sessionId, connection.rpc);
if (options.getExecutor() != null) {
session.setExecutor(options.getExecutor());
}
SessionRequestBuilder.configureSession(session, config);
sessions.put(sessionId, session);

Expand Down Expand Up @@ -399,6 +425,9 @@ public CompletableFuture<CopilotSession> resumeSession(String sessionId, ResumeS
return ensureConnected().thenCompose(connection -> {
// Register the session before the RPC call to avoid missing early events.
var session = new CopilotSession(sessionId, connection.rpc);
if (options.getExecutor() != null) {
session.setExecutor(options.getExecutor());
}
SessionRequestBuilder.configureSession(session, config);
sessions.put(sessionId, session);

Expand Down
38 changes: 34 additions & 4 deletions src/main/java/com/github/copilot/sdk/CopilotSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -125,6 +126,7 @@ public final class CopilotSession implements AutoCloseable {
private volatile EventErrorPolicy eventErrorPolicy = EventErrorPolicy.PROPAGATE_AND_LOG_ERRORS;
private volatile Map<String, java.util.function.Function<String, CompletableFuture<String>>> transformCallbacks;
private final ScheduledExecutorService timeoutScheduler;
private volatile Executor executor;

/** Tracks whether this session instance has been terminated via close(). */
private volatile boolean isTerminated = false;
Expand Down Expand Up @@ -170,6 +172,14 @@ public final class CopilotSession implements AutoCloseable {
this.timeoutScheduler = executor;
}

/**
* Sets the executor for internal async operations. Package-private; called by
* CopilotClient after construction.
*/
void setExecutor(Executor executor) {
this.executor = executor;
}

/**
* Gets the unique identifier for this session.
*
Expand Down Expand Up @@ -673,7 +683,7 @@ private void handleBroadcastEventAsync(AbstractSessionEvent event) {
*/
private void executeToolAndRespondAsync(String requestId, String toolName, String toolCallId, Object arguments,
ToolDefinition tool) {
CompletableFuture.runAsync(() -> {
Runnable task = () -> {
try {
JsonNode argumentsNode = arguments instanceof JsonNode jn
? jn
Expand Down Expand Up @@ -718,7 +728,17 @@ private void executeToolAndRespondAsync(String requestId, String toolName, Strin
LOG.log(Level.WARNING, "Error sending tool error for requestId=" + requestId, sendEx);
}
}
});
};
try {
if (executor != null) {
CompletableFuture.runAsync(task, executor);
} else {
CompletableFuture.runAsync(task);
}
} catch (RejectedExecutionException e) {
LOG.log(Level.WARNING, "Executor rejected tool task for requestId=" + requestId + "; running inline", e);
task.run();
}
}

/**
Expand All @@ -727,7 +747,7 @@ private void executeToolAndRespondAsync(String requestId, String toolName, Strin
*/
private void executePermissionAndRespondAsync(String requestId, PermissionRequest permissionRequest,
PermissionHandler handler) {
CompletableFuture.runAsync(() -> {
Runnable task = () -> {
try {
var invocation = new PermissionInvocation();
invocation.setSessionId(sessionId);
Expand Down Expand Up @@ -766,7 +786,17 @@ private void executePermissionAndRespondAsync(String requestId, PermissionReques
LOG.log(Level.WARNING, "Error sending permission denied for requestId=" + requestId, sendEx);
}
}
});
};
try {
if (executor != null) {
CompletableFuture.runAsync(task, executor);
} else {
CompletableFuture.runAsync(task);
}
} catch (RejectedExecutionException e) {
LOG.log(Level.WARNING, "Executor rejected perm task for requestId=" + requestId + "; running inline", e);
task.run();
}
}

/**
Expand Down
32 changes: 26 additions & 6 deletions src/main/java/com/github/copilot/sdk/RpcHandlerDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -45,6 +47,7 @@ final class RpcHandlerDispatcher {

private final Map<String, CopilotSession> sessions;
private final LifecycleEventDispatcher lifecycleDispatcher;
private final Executor executor;

/**
* Creates a dispatcher with session registry and lifecycle dispatcher.
Expand All @@ -53,10 +56,14 @@ final class RpcHandlerDispatcher {
* the session registry to look up sessions by ID
* @param lifecycleDispatcher
* callback for dispatching lifecycle events
* @param executor
* the executor for async dispatch, or {@code null} for default
*/
RpcHandlerDispatcher(Map<String, CopilotSession> sessions, LifecycleEventDispatcher lifecycleDispatcher) {
RpcHandlerDispatcher(Map<String, CopilotSession> sessions, LifecycleEventDispatcher lifecycleDispatcher,
Executor executor) {
this.sessions = sessions;
this.lifecycleDispatcher = lifecycleDispatcher;
this.executor = executor;
}

/**
Expand Down Expand Up @@ -118,7 +125,7 @@ private void handleLifecycleEvent(JsonNode params) {
}

private void handleToolCall(JsonRpcClient rpc, String requestId, JsonNode params) {
CompletableFuture.runAsync(() -> {
runAsync(() -> {
try {
String sessionId = params.get("sessionId").asText();
String toolCallId = params.get("toolCallId").asText();
Expand Down Expand Up @@ -178,7 +185,7 @@ private void handleToolCall(JsonRpcClient rpc, String requestId, JsonNode params
}

private void handlePermissionRequest(JsonRpcClient rpc, String requestId, JsonNode params) {
CompletableFuture.runAsync(() -> {
runAsync(() -> {
try {
String sessionId = params.get("sessionId").asText();
JsonNode permissionRequest = params.get("permissionRequest");
Expand Down Expand Up @@ -222,7 +229,7 @@ private void handlePermissionRequest(JsonRpcClient rpc, String requestId, JsonNo

private void handleUserInputRequest(JsonRpcClient rpc, String requestId, JsonNode params) {
LOG.fine("Received userInput.request: " + params);
CompletableFuture.runAsync(() -> {
runAsync(() -> {
try {
String sessionId = params.get("sessionId").asText();
String question = params.get("question").asText();
Expand Down Expand Up @@ -278,7 +285,7 @@ private void handleUserInputRequest(JsonRpcClient rpc, String requestId, JsonNod
}

private void handleHooksInvoke(JsonRpcClient rpc, String requestId, JsonNode params) {
CompletableFuture.runAsync(() -> {
runAsync(() -> {
try {
String sessionId = params.get("sessionId").asText();
String hookType = params.get("hookType").asText();
Expand Down Expand Up @@ -321,7 +328,7 @@ interface LifecycleEventDispatcher {
}

private void handleSystemMessageTransform(JsonRpcClient rpc, String requestId, JsonNode params) {
CompletableFuture.runAsync(() -> {
runAsync(() -> {
try {
final long requestIdLong;
try {
Expand Down Expand Up @@ -359,4 +366,17 @@ private void handleSystemMessageTransform(JsonRpcClient rpc, String requestId, J
}
});
}

private void runAsync(Runnable task) {
try {
if (executor != null) {
CompletableFuture.runAsync(task, executor);
} else {
CompletableFuture.runAsync(task);
}
} catch (RejectedExecutionException e) {
LOG.log(Level.WARNING, "Executor rejected handler task; running inline", e);
task.run();
}
}
}
Loading
Loading