diff --git a/packages/core/src/tracing/ai/utils.ts b/packages/core/src/tracing/ai/utils.ts
index d9628e3c75e2..a63f7f2e755d 100644
--- a/packages/core/src/tracing/ai/utils.ts
+++ b/packages/core/src/tracing/ai/utils.ts
@@ -6,7 +6,13 @@
 import { getClient } from '../../currentScopes';
-import type { Span } from '../../types-hoist/span';
+import type { Span, SpanAttributeValue } from '../../types-hoist/span';
 import { isThenable } from '../../utils/is';
 import {
+  GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE,
+  GEN_AI_RESPONSE_ID_ATTRIBUTE,
+  GEN_AI_RESPONSE_MODEL_ATTRIBUTE,
+  GEN_AI_RESPONSE_STREAMING_ATTRIBUTE,
+  GEN_AI_RESPONSE_TEXT_ATTRIBUTE,
+  GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE,
   GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE,
   GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE,
   GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE,
@@ -99,6 +106,68 @@
     );
   }
 }
 
+export interface StreamResponseState {
+  responseId?: string;
+  responseModel?: string;
+  finishReasons: string[];
+  responseTexts: string[];
+  toolCalls: unknown[];
+  promptTokens?: number;
+  completionTokens?: number;
+  totalTokens?: number;
+  cacheCreationInputTokens?: number;
+  cacheReadInputTokens?: number;
+}
+
+/**
+ * Ends a streaming span by setting all accumulated response attributes and ending the span.
+ * Shared across OpenAI, Anthropic, and Google GenAI streaming implementations.
+ */
+export function endStreamSpan(span: Span, state: StreamResponseState, recordOutputs: boolean): void {
+  if (!span.isRecording()) {
+    return;
+  }
+
+  const attrs: Record<string, SpanAttributeValue> = {
+    [GEN_AI_RESPONSE_STREAMING_ATTRIBUTE]: true,
+  };
+
+  if (state.responseId) attrs[GEN_AI_RESPONSE_ID_ATTRIBUTE] = state.responseId;
+  if (state.responseModel) attrs[GEN_AI_RESPONSE_MODEL_ATTRIBUTE] = state.responseModel;
+
+  if (state.promptTokens !== undefined) attrs[GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE] = state.promptTokens;
+  if (state.completionTokens !== undefined) attrs[GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE] = state.completionTokens;
+
+  // Use explicit total if provided (OpenAI, Google), otherwise compute from cache tokens (Anthropic)
+  if (state.totalTokens !== undefined) {
+    attrs[GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE] = state.totalTokens;
+  } else if (
+    state.promptTokens !== undefined ||
+    state.completionTokens !== undefined ||
+    state.cacheCreationInputTokens !== undefined ||
+    state.cacheReadInputTokens !== undefined
+  ) {
+    attrs[GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE] =
+      (state.promptTokens ?? 0) +
+      (state.completionTokens ?? 0) +
+      (state.cacheCreationInputTokens ?? 0) +
+      (state.cacheReadInputTokens ?? 0);
+  }
+
+  if (state.finishReasons.length) {
+    attrs[GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE] = JSON.stringify(state.finishReasons);
+  }
+  if (recordOutputs && state.responseTexts.length) {
+    attrs[GEN_AI_RESPONSE_TEXT_ATTRIBUTE] = state.responseTexts.join('');
+  }
+  if (recordOutputs && state.toolCalls.length) {
+    attrs[GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE] = JSON.stringify(state.toolCalls);
+  }
+
+  span.setAttributes(attrs);
+  span.end();
+}
+
 /**
  * Get the truncated JSON string for a string or array of strings.
  *
diff --git a/packages/core/src/tracing/anthropic-ai/streaming.ts b/packages/core/src/tracing/anthropic-ai/streaming.ts
index 940ec53e8030..6e649b32253a 100644
--- a/packages/core/src/tracing/anthropic-ai/streaming.ts
+++ b/packages/core/src/tracing/anthropic-ai/streaming.ts
@@ -1,15 +1,7 @@
 import { captureException } from '../../exports';
 import { SPAN_STATUS_ERROR } from '../../tracing';
 import type { Span } from '../../types-hoist/span';
-import {
-  GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE,
-  GEN_AI_RESPONSE_ID_ATTRIBUTE,
-  GEN_AI_RESPONSE_MODEL_ATTRIBUTE,
-  GEN_AI_RESPONSE_STREAMING_ATTRIBUTE,
-  GEN_AI_RESPONSE_TEXT_ATTRIBUTE,
-  GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE,
-} from '../ai/gen-ai-attributes';
-import { setTokenUsageAttributes } from '../ai/utils';
+import { endStreamSpan } from '../ai/utils';
 import type { AnthropicAiStreamingEvent } from './types';
 import { mapAnthropicErrorToStatusMessage } from './utils';
 
@@ -208,60 +200,6 @@
   handleContentBlockStop(event, state);
 }
 
-/**
- * Finalizes span attributes when stream processing completes
- */
-function finalizeStreamSpan(state: StreamingState, span: Span, recordOutputs: boolean): void {
-  if (!span.isRecording()) {
-    return;
-  }
-
-  // Set common response attributes if available
-  if (state.responseId) {
-    span.setAttributes({
-      [GEN_AI_RESPONSE_ID_ATTRIBUTE]: state.responseId,
-    });
-  }
-  if (state.responseModel) {
-    span.setAttributes({
-      [GEN_AI_RESPONSE_MODEL_ATTRIBUTE]: state.responseModel,
-    });
-  }
-
-  setTokenUsageAttributes(
-    span,
-    state.promptTokens,
-    state.completionTokens,
-    state.cacheCreationInputTokens,
-    state.cacheReadInputTokens,
-  );
-
-  span.setAttributes({
-    [GEN_AI_RESPONSE_STREAMING_ATTRIBUTE]: true,
-  });
-
-  if (state.finishReasons.length > 0) {
-    span.setAttributes({
-      [GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]: JSON.stringify(state.finishReasons),
-    });
-  }
-
-  if (recordOutputs && state.responseTexts.length > 0) {
-    span.setAttributes({
-      [GEN_AI_RESPONSE_TEXT_ATTRIBUTE]: state.responseTexts.join(''),
-    });
-  }
-
-  // Set tool calls if any were captured
-  if (recordOutputs && state.toolCalls.length > 0) {
-    span.setAttributes({
-      [GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE]: JSON.stringify(state.toolCalls),
-    });
-  }
-
-  span.end();
-}
-
 /**
  * Instruments an async iterable stream of Anthropic events, updates the span with
  * streaming attributes and (optionally) the aggregated output text, and yields
@@ -291,50 +229,7 @@ export async function* instrumentAsyncIterableStream(
       yield event;
     }
   } finally {
-    // Set common response attributes if available
-    if (state.responseId) {
-      span.setAttributes({
-        [GEN_AI_RESPONSE_ID_ATTRIBUTE]: state.responseId,
-      });
-    }
-    if (state.responseModel) {
-      span.setAttributes({
-        [GEN_AI_RESPONSE_MODEL_ATTRIBUTE]: state.responseModel,
-      });
-    }
-
-    setTokenUsageAttributes(
-      span,
-      state.promptTokens,
-      state.completionTokens,
-      state.cacheCreationInputTokens,
-      state.cacheReadInputTokens,
-    );
-
-    span.setAttributes({
-      [GEN_AI_RESPONSE_STREAMING_ATTRIBUTE]: true,
-    });
-
-    if (state.finishReasons.length > 0) {
-      span.setAttributes({
-        [GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]: JSON.stringify(state.finishReasons),
-      });
-    }
-
-    if (recordOutputs && state.responseTexts.length > 0) {
-      span.setAttributes({
-        [GEN_AI_RESPONSE_TEXT_ATTRIBUTE]: state.responseTexts.join(''),
-      });
-    }
-
-    // Set tool calls if any were captured
-    if (recordOutputs && state.toolCalls.length > 0) {
-      span.setAttributes({
-        [GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE]: JSON.stringify(state.toolCalls),
-      });
-    }
-
-    span.end();
+    endStreamSpan(span, state, recordOutputs);
   }
 }
 
@@ -366,7 +261,7 @@ export function instrumentMessageStream
     // The event fired when a message is done being streamed by the API. Corresponds to the message_stop SSE event.
     // @see https://github.com/anthropics/anthropic-sdk-typescript/blob/d3be31f5a4e6ebb4c0a2f65dbb8f381ae73a9166/helpers.md?plain=1#L42-L44
     stream.on('message', () => {
-      finalizeStreamSpan(state, span, recordOutputs);
+      endStreamSpan(span, state, recordOutputs);
     });
 
     stream.on('error', (error: unknown) => {
diff --git a/packages/core/src/tracing/google-genai/streaming.ts b/packages/core/src/tracing/google-genai/streaming.ts
index d3f6598b8fd7..4a308d922eb9 100644
--- a/packages/core/src/tracing/google-genai/streaming.ts
+++ b/packages/core/src/tracing/google-genai/streaming.ts
@@ -1,17 +1,7 @@
 import { captureException } from '../../exports';
 import { SPAN_STATUS_ERROR } from '../../tracing';
-import type { Span, SpanAttributeValue } from '../../types-hoist/span';
-import {
-  GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE,
-  GEN_AI_RESPONSE_ID_ATTRIBUTE,
-  GEN_AI_RESPONSE_MODEL_ATTRIBUTE,
-  GEN_AI_RESPONSE_STREAMING_ATTRIBUTE,
-  GEN_AI_RESPONSE_TEXT_ATTRIBUTE,
-  GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE,
-  GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE,
-  GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE,
-  GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE,
-} from '../ai/gen-ai-attributes';
+import type { Span } from '../../types-hoist/span';
+import { endStreamSpan } from '../ai/utils';
 import type { GoogleGenAIResponse } from './types';
 
 /**
@@ -137,27 +127,6 @@ export async function* instrumentStream(
       yield chunk;
     }
   } finally {
-    const attrs: Record<string, SpanAttributeValue> = {
-      [GEN_AI_RESPONSE_STREAMING_ATTRIBUTE]: true,
-    };
-
-    if (state.responseId) attrs[GEN_AI_RESPONSE_ID_ATTRIBUTE] = state.responseId;
-    if (state.responseModel) attrs[GEN_AI_RESPONSE_MODEL_ATTRIBUTE] = state.responseModel;
-    if (state.promptTokens !== undefined) attrs[GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE] = state.promptTokens;
-    if (state.completionTokens !== undefined) attrs[GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE] = state.completionTokens;
-    if (state.totalTokens !== undefined) attrs[GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE] = state.totalTokens;
-
-    if (state.finishReasons.length) {
-      attrs[GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE] = JSON.stringify(state.finishReasons);
-    }
-    if (recordOutputs && state.responseTexts.length) {
-      attrs[GEN_AI_RESPONSE_TEXT_ATTRIBUTE] = state.responseTexts.join('');
-    }
-    if (recordOutputs && state.toolCalls.length) {
-      attrs[GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE] = JSON.stringify(state.toolCalls);
-    }
-
-    span.setAttributes(attrs);
-    span.end();
+    endStreamSpan(span, state, recordOutputs);
   }
 }
diff --git a/packages/core/src/tracing/openai/streaming.ts b/packages/core/src/tracing/openai/streaming.ts
index dec3457269da..d51c47cad228 100644
--- a/packages/core/src/tracing/openai/streaming.ts
+++ b/packages/core/src/tracing/openai/streaming.ts
@@ -1,12 +1,7 @@
 import { captureException } from '../../exports';
 import { SPAN_STATUS_ERROR } from '../../tracing';
 import type { Span } from '../../types-hoist/span';
-import {
-  GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE,
-  GEN_AI_RESPONSE_STREAMING_ATTRIBUTE,
-  GEN_AI_RESPONSE_TEXT_ATTRIBUTE,
-  GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE,
-} from '../ai/gen-ai-attributes';
+import { endStreamSpan } from '../ai/utils';
 import { RESPONSE_EVENT_TYPES } from './constants';
 import type {
   ChatCompletionChunk,
@@ -15,12 +10,7 @@ import type {
   ResponseFunctionCall,
   ResponseStreamingEvent,
 } from './types';
-import {
-  isChatCompletionChunk,
-  isResponsesApiStreamEvent,
-  setCommonResponseAttributes,
-  setTokenUsageAttributes,
-} from './utils';
+import { isChatCompletionChunk, isResponsesApiStreamEvent } from './utils';
 
 /**
  * State object used to accumulate information from a stream of OpenAI events/chunks.
@@ -240,35 +230,7 @@ export async function* instrumentStream(
       yield event;
     }
   } finally {
-    setCommonResponseAttributes(span, state.responseId, state.responseModel);
-    setTokenUsageAttributes(span, state.promptTokens, state.completionTokens, state.totalTokens);
-
-    span.setAttributes({
-      [GEN_AI_RESPONSE_STREAMING_ATTRIBUTE]: true,
-    });
-
-    if (state.finishReasons.length) {
-      span.setAttributes({
-        [GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]: JSON.stringify(state.finishReasons),
-      });
-    }
-
-    if (recordOutputs && state.responseTexts.length) {
-      span.setAttributes({
-        [GEN_AI_RESPONSE_TEXT_ATTRIBUTE]: state.responseTexts.join(''),
-      });
-    }
-
-    // Set tool calls attribute if any were accumulated
-    const chatCompletionToolCallsArray = Object.values(state.chatCompletionToolCalls);
-    const allToolCalls = [...chatCompletionToolCallsArray, ...state.responsesApiToolCalls];
-
-    if (allToolCalls.length > 0) {
-      span.setAttributes({
-        [GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE]: JSON.stringify(allToolCalls),
-      });
-    }
-
-    span.end();
+    const allToolCalls = [...Object.values(state.chatCompletionToolCalls), ...state.responsesApiToolCalls];
+    endStreamSpan(span, { ...state, toolCalls: allToolCalls }, recordOutputs);
   }
 }