diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 00000000..7f17b2ff --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,21 @@ +{ + "permissions": { + "allow": [ + "Bash(dir:*)", + "Bash(npm run build)", + "Bash(ccr restart:*)", + "Bash(ccr:*)", + "Bash(curl:*)", + "Bash(Select-Object -Last 30)", + "Bash(git log:*)", + "Bash(git add:*)", + "Bash(git commit:*)", + "Bash(del:*)", + "Bash(git rm:*)", + "Bash(node -r ts-node/register src/utils/SSEParserUtils.test.ts:*)", + "Bash(node:*)" + ], + "deny": [], + "ask": [] + } +} diff --git a/parse_logs.js b/parse_logs.js new file mode 100644 index 00000000..f3fccb9d --- /dev/null +++ b/parse_logs.js @@ -0,0 +1,125 @@ +const fs = require('fs'); +const path = require('path'); + +/** + * 解析日志文件并提取请求数据 + * @param {string} logFilePath - 日志文件路径 + * @param {string} outputDir - 输出目录 + */ +function parseLogFile(logFilePath, outputDir = './output') { + try { + // 创建输出目录 + if (!fs.existsSync(outputDir)) { + fs.mkdirSync(outputDir, { recursive: true }); + } + + // 读取日志文件 + const logContent = fs.readFileSync(logFilePath, 'utf8'); + const lines = logContent.split('\n').filter(line => line.trim()); + + // 存储请求数据 + const requests = new Map(); + + console.log(`正在处理 ${lines.length} 行日志...`); + + // 解析每一行 + lines.forEach((line, index) => { + try { + // 跳过空行和非JSON行 + if (!line.trim() || !line.startsWith('{')) { + return; + } + + const logEntry = JSON.parse(line); + + // 只处理包含 reqId 的日志条目 + if (logEntry.reqId) { + const reqId = logEntry.reqId; + + // 初始化请求数据结构 + if (!requests.has(reqId)) { + requests.set(reqId, { + requestData: null, + streamCompleteResponse: null + }); + } + + const request = requests.get(reqId); + + // 提取请求数据 + if (logEntry.requestData) { + request.requestData = logEntry.requestData; + } + + // 提取响应数据 + if (logEntry.streamCompleteResponse) { + request.streamCompleteResponse = logEntry.streamCompleteResponse; + } + } + } catch (parseError) { + console.warn(`警告: 无法解析第 ${index + 1} 行: ${parseError.message}`); + } + }); + + // 保存每个请求的数据 + let savedCount = 0; + requests.forEach((requestData, reqId) => { + if (requestData.requestData || requestData.streamCompleteResponse) { + // 保存请求数据 + if (requestData.requestData) { + const requestFilePath = path.join(outputDir, `${reqId}-request.json`); + fs.writeFileSync(requestFilePath, JSON.stringify(requestData.requestData, null, 2)); + console.log(`✓ 已保存请求数据: ${requestFilePath}`); + } + + // 保存响应数据 + if (requestData.streamCompleteResponse) { + const responseFilePath = path.join(outputDir, `${reqId}-response.json`); + fs.writeFileSync(responseFilePath, JSON.stringify(requestData.streamCompleteResponse, null, 2)); + console.log(`✓ 已保存响应数据: ${responseFilePath}`); + } + + savedCount++; + } + }); + + console.log(`\n处理完成! 总共处理了 ${requests.size} 个请求,成功保存 ${savedCount} 个请求的数据。`); + + } catch (error) { + console.error('处理日志文件时出错:', error.message); + process.exit(1); + } +} + +// 命令行参数处理 +function main() { + const args = process.argv.slice(2); + + if (args.length === 0) { + console.log('使用方法: node parse_logs.js <日志文件路径> [输出目录]'); + console.log('示例: node parse_logs.js ccr-20251127092353.log ./output'); + process.exit(1); + } + + const logFilePath = args[0]; + const outputDir = args[1] || './output'; + + // 检查日志文件是否存在 + if (!fs.existsSync(logFilePath)) { + console.error(`错误: 日志文件不存在: ${logFilePath}`); + process.exit(1); + } + + console.log(`开始处理日志文件: ${logFilePath}`); + console.log(`输出目录: ${outputDir}`); + console.log('---'); + + parseLogFile(logFilePath, outputDir); +} + +// 如果直接运行此脚本 +if (require.main === module) { + main(); +} + +module.exports = { parseLogFile }; \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index efb38f4e..ab9bb017 100644 --- a/src/index.ts +++ b/src/index.ts @@ -4,39 +4,41 @@ import { homedir } from "os"; import path, { join } from "path"; import { initConfig, initDir, cleanupLogFiles } from "./utils"; import { createServer } from "./server"; -import { router } from "./utils/router"; -import { apiKeyAuth } from "./middleware/auth"; import { cleanupPidFile, isServiceRunning, savePid, } from "./utils/processCheck"; -import { CONFIG_FILE } from "./constants"; +import { CONFIG_FILE, HOME_DIR } from "./constants"; import { createStream } from 'rotating-file-stream'; -import { HOME_DIR } from "./constants"; -import { sessionUsageCache } from "./utils/cache"; -import {SSEParserTransform} from "./utils/SSEParser.transform"; -import {SSESerializerTransform} from "./utils/SSESerializer.transform"; -import {rewriteStream} from "./utils/rewriteStream"; -import JSON5 from "json5"; -import { IAgent } from "./agents/type"; -import agentsManager from "./agents"; +import { + setupRequestLoggingHook, + setupResponseLoggingHook, + setupAuthHook, + setupAgentAndRoutingHook, + setupErrorEventHook, + setupSendEventHook, + setupAgentProcessingHook +} from "./utils/hooks"; import { EventEmitter } from "node:events"; const event = new EventEmitter() -async function initializeClaudeConfig() { +/** + * Initialize Claude configuration file + */ +async function initializeClaudeConfig(): Promise { const homeDir = homedir(); const configPath = join(homeDir, ".claude.json"); if (!existsSync(configPath)) { - const userID = Array.from( + const userId = Array.from( { length: 64 }, () => Math.random().toString(16)[2] ).join(""); const configContent = { numStartups: 184, autoUpdaterStatus: "enabled", - userID, + userID: userId, hasCompletedOnboarding: true, lastOnboardingVersion: "1.0.17", projects: {}, @@ -45,89 +47,146 @@ async function initializeClaudeConfig() { } } +/** + * Set up process signal handlers for graceful shutdown + */ +function setupSignalHandlers(): void { + const handleShutdown = (signal: string) => { + console.log(`Received ${signal}, cleaning up...`); + cleanupPidFile(); + process.exit(0); + }; + + process.on("SIGINT", () => handleShutdown("SIGINT")); + process.on("SIGTERM", () => handleShutdown("SIGTERM")); +} + +/** + * Pads a number with leading zero if needed + */ +function padNumber(num: number): string { + return (num > 9 ? "" : "0") + num; +} + +/** + * Configure logger based on config settings + */ +function configureLogger(config: any, homeDir: string) { + return config.LOG !== false + ? { + level: config.LOG_LEVEL || "debug", + stream: createStream((time, index) => { + if (!time) { + time = new Date(); + } + + const yearAndMonth = time.getFullYear() + "" + padNumber(time.getMonth() + 1); + const day = padNumber(time.getDate()); + const hour = padNumber(time.getHours()); + const minute = padNumber(time.getMinutes()); + const second = padNumber(time.getSeconds()); + + return `./logs/ccr-${yearAndMonth}${day}${hour}${minute}${second}${index ? `_${index}` : ''}.log`; + }, { + path: homeDir, + maxFiles: 3, + interval: "1d", + compress: false, + maxSize: "50M" + }), + } + : false; +} + interface RunOptions { port?: number; } -async function run(options: RunOptions = {}) { - // Check if service is already running - const isRunning = await isServiceRunning() - if (isRunning) { - console.log("✅ Service is already running in the background."); - return; - } - +/** + * Initialize application configuration and directories + */ +async function initializeApp(): Promise { await initializeClaudeConfig(); await initDir(); - // Clean up old log files, keeping only the 10 most recent ones await cleanupLogFiles(); - const config = await initConfig(); + return await initConfig(); +} +// Configuration constants +const DEFAULT_HOST = "127.0.0.1"; +const DEFAULT_PORT = 3456; - let HOST = config.HOST || "127.0.0.1"; +/** + * Resolve host configuration with security considerations + */ +function resolveHostConfig(config: any): string { + let host = config.HOST || DEFAULT_HOST; if (config.HOST && !config.APIKEY) { - HOST = "127.0.0.1"; - console.warn("⚠️ API key is not set. HOST is forced to 127.0.0.1."); + host = DEFAULT_HOST; + console.warn("⚠️ API key is not set. HOST is forced to 127.0.0.1 for security."); } - const port = config.PORT || 3456; + return host; +} - // Save the PID of the background process - savePid(process.pid); +/** + * Resolve service port configuration from environment or config + */ +function resolveServicePort(config: any): number { + const configPort = config.PORT || DEFAULT_PORT; + const envPort = process.env.SERVICE_PORT; + + if (envPort) { + const parsedEnvPort = parseInt(envPort, 10); + if (isNaN(parsedEnvPort)) { + console.warn(`⚠️ Invalid SERVICE_PORT: ${envPort}. Using default port ${configPort}.`); + return configPort; + } + return parsedEnvPort; + } - // Handle SIGINT (Ctrl+C) to clean up PID file - process.on("SIGINT", () => { - console.log("Received SIGINT, cleaning up..."); - cleanupPidFile(); - process.exit(0); + return configPort; +} + +/** + * Setup global error handlers for the process + */ +function setupGlobalErrorHandlers(server: any): void { + process.on("uncaughtException", (err) => { + server.logger.error("Uncaught exception:", err); }); - // Handle SIGTERM to clean up PID file - process.on("SIGTERM", () => { - cleanupPidFile(); - process.exit(0); + process.on("unhandledRejection", (reason, promise) => { + server.logger.error("Unhandled rejection at:", promise, "reason:", reason); }); +} - // Use port from environment variable if set (for background process) - const servicePort = process.env.SERVICE_PORT - ? parseInt(process.env.SERVICE_PORT) - : port; +async function run(options: RunOptions = {}): Promise { + // Check if service is already running + const isRunning = await isServiceRunning(); + if (isRunning) { + console.log("✅ Service is already running in the background."); + return; + } - // Configure logger based on config settings - const pad = num => (num > 9 ? "" : "0") + num; - const generator = (time, index) => { - if (!time) { - time = new Date() - } + // Initialize application + const config = await initializeApp(); + const host = resolveHostConfig(config); + const servicePort = resolveServicePort(config); - var month = time.getFullYear() + "" + pad(time.getMonth() + 1); - var day = pad(time.getDate()); - var hour = pad(time.getHours()); - var minute = pad(time.getMinutes()); + // Save the PID and set up signal handlers + savePid(process.pid); + setupSignalHandlers(); - return `./logs/ccr-${month}${day}${hour}${minute}${pad(time.getSeconds())}${index ? `_${index}` : ''}.log`; - }; - const loggerConfig = - config.LOG !== false - ? { - level: config.LOG_LEVEL || "debug", - stream: createStream(generator, { - path: HOME_DIR, - maxFiles: 3, - interval: "1d", - compress: false, - maxSize: "50M" - }), - } - : false; + // Configure logger + const loggerConfig = configureLogger(config, HOME_DIR); const server = createServer({ jsonPath: CONFIG_FILE, initialConfig: { - // ...config, providers: config.Providers || config.providers, - HOST: HOST, + HOST: host, PORT: servicePort, LOG_FILE: join( homedir(), @@ -138,247 +197,17 @@ async function run(options: RunOptions = {}) { logger: loggerConfig, }); - // Add global error handlers to prevent the service from crashing - process.on("uncaughtException", (err) => { - server.logger.error("Uncaught exception:", err); - }); - - process.on("unhandledRejection", (reason, promise) => { - server.logger.error("Unhandled rejection at:", promise, "reason:", reason); - }); - // Add async preHandler hook for authentication - server.addHook("preHandler", async (req, reply) => { - return new Promise((resolve, reject) => { - const done = (err?: Error) => { - if (err) reject(err); - else resolve(); - }; - // Call the async auth function - apiKeyAuth(config)(req, reply, done).catch(reject); - }); - }); - server.addHook("preHandler", async (req, reply) => { - if (req.url.startsWith("/v1/messages") && !req.url.startsWith("/v1/messages/count_tokens")) { - const useAgents = [] - - for (const agent of agentsManager.getAllAgents()) { - if (agent.shouldHandle(req, config)) { - // 设置agent标识 - useAgents.push(agent.name) - - // change request body - agent.reqHandler(req, config); - - // append agent tools - if (agent.tools.size) { - if (!req.body?.tools?.length) { - req.body.tools = [] - } - req.body.tools.unshift(...Array.from(agent.tools.values()).map(item => { - return { - name: item.name, - description: item.description, - input_schema: item.input_schema - } - })) - } - } - } - - if (useAgents.length) { - req.agents = useAgents; - } - await router(req, reply, { - config, - event - }); - } - }); - server.addHook("onError", async (request, reply, error) => { - event.emit('onError', request, reply, error); - }) - server.addHook("onSend", (req, reply, payload, done) => { - if (req.sessionId && req.url.startsWith("/v1/messages") && !req.url.startsWith("/v1/messages/count_tokens")) { - if (payload instanceof ReadableStream) { - if (req.agents) { - const abortController = new AbortController(); - const eventStream = payload.pipeThrough(new SSEParserTransform()) - let currentAgent: undefined | IAgent; - let currentToolIndex = -1 - let currentToolName = '' - let currentToolArgs = '' - let currentToolId = '' - const toolMessages: any[] = [] - const assistantMessages: any[] = [] - // 存储Anthropic格式的消息体,区分文本和工具类型 - return done(null, rewriteStream(eventStream, async (data, controller) => { - try { - // 检测工具调用开始 - if (data.event === 'content_block_start' && data?.data?.content_block?.name) { - const agent = req.agents.find((name: string) => agentsManager.getAgent(name)?.tools.get(data.data.content_block.name)) - if (agent) { - currentAgent = agentsManager.getAgent(agent) - currentToolIndex = data.data.index - currentToolName = data.data.content_block.name - currentToolId = data.data.content_block.id - return undefined; - } - } - - // 收集工具参数 - if (currentToolIndex > -1 && data.data.index === currentToolIndex && data.data?.delta?.type === 'input_json_delta') { - currentToolArgs += data.data?.delta?.partial_json; - return undefined; - } - - // 工具调用完成,处理agent调用 - if (currentToolIndex > -1 && data.data.index === currentToolIndex && data.data.type === 'content_block_stop') { - try { - const args = JSON5.parse(currentToolArgs); - assistantMessages.push({ - type: "tool_use", - id: currentToolId, - name: currentToolName, - input: args - }) - const toolResult = await currentAgent?.tools.get(currentToolName)?.handler(args, { - req, - config - }); - toolMessages.push({ - "tool_use_id": currentToolId, - "type": "tool_result", - "content": toolResult - }) - currentAgent = undefined - currentToolIndex = -1 - currentToolName = '' - currentToolArgs = '' - currentToolId = '' - } catch (e) { - console.log(e); - } - return undefined; - } - - if (data.event === 'message_delta' && toolMessages.length) { - req.body.messages.push({ - role: 'assistant', - content: assistantMessages - }) - req.body.messages.push({ - role: 'user', - content: toolMessages - }) - const response = await fetch(`http://127.0.0.1:${config.PORT || 3456}/v1/messages`, { - method: "POST", - headers: { - 'x-api-key': config.APIKEY, - 'content-type': 'application/json', - }, - body: JSON.stringify(req.body), - }) - if (!response.ok) { - return undefined; - } - const stream = response.body!.pipeThrough(new SSEParserTransform()) - const reader = stream.getReader() - while (true) { - try { - const {value, done} = await reader.read(); - if (done) { - break; - } - if (['message_start', 'message_stop'].includes(value.event)) { - continue - } - - // 检查流是否仍然可写 - if (!controller.desiredSize) { - break; - } - - controller.enqueue(value) - }catch (readError: any) { - if (readError.name === 'AbortError' || readError.code === 'ERR_STREAM_PREMATURE_CLOSE') { - abortController.abort(); // 中止所有相关操作 - break; - } - throw readError; - } - - } - return undefined - } - return data - }catch (error: any) { - console.error('Unexpected error in stream processing:', error); - - // 处理流提前关闭的错误 - if (error.code === 'ERR_STREAM_PREMATURE_CLOSE') { - abortController.abort(); - return undefined; - } - - // 其他错误仍然抛出 - throw error; - } - }).pipeThrough(new SSESerializerTransform())) - } - - const [originalStream, clonedStream] = payload.tee(); - const read = async (stream: ReadableStream) => { - const reader = stream.getReader(); - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - // Process the value if needed - const dataStr = new TextDecoder().decode(value); - if (!dataStr.startsWith("event: message_delta")) { - continue; - } - const str = dataStr.slice(27); - try { - const message = JSON.parse(str); - sessionUsageCache.put(req.sessionId, message.usage); - } catch {} - } - } catch (readError: any) { - if (readError.name === 'AbortError' || readError.code === 'ERR_STREAM_PREMATURE_CLOSE') { - console.error('Background read stream closed prematurely'); - } else { - console.error('Error in background stream reading:', readError); - } - } finally { - reader.releaseLock(); - } - } - read(clonedStream); - return done(null, originalStream) - } - sessionUsageCache.put(req.sessionId, payload.usage); - if (typeof payload ==='object') { - if (payload.error) { - return done(payload.error, null) - } else { - return done(payload, null) - } - } - } - if (typeof payload ==='object' && payload.error) { - return done(payload.error, null) - } - done(null, payload) - }); - server.addHook("onSend", async (req, reply, payload) => { - event.emit('onSend', req, reply, payload); - return payload; - }) - + // Setup global error handlers and hooks + setupGlobalErrorHandlers(server); + setupRequestLoggingHook(server); + setupResponseLoggingHook(server); + setupAuthHook(server, config); + setupAgentAndRoutingHook(server, config, event); + setupErrorEventHook(server, event); + setupSendEventHook(server, event); + setupAgentProcessingHook(server, config); server.start(); } export { run }; -// run(); diff --git a/src/utils/SSEParserUtils.test.ts b/src/utils/SSEParserUtils.test.ts new file mode 100644 index 00000000..2bf8edf3 --- /dev/null +++ b/src/utils/SSEParserUtils.test.ts @@ -0,0 +1,240 @@ +import * as assert from 'assert'; +import { getFullContent } from './SSEParserUtils'; + +// 测试用例1: 正常的文本和工具调用场景 +function testNormalTextAndToolCalls(): void { + console.log('Running testNormalTextAndToolCalls...'); + + const sseEvents = [ + 'data: {"type": "message_start", "message": {"id": "msg_123", "role": "assistant", "model": "claude-3-sonnet"}}', + 'data: {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}}', + 'data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hello"}}', + 'data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": " World"}}', + 'data: {"type": "content_block_stop", "index": 0}', + 'data: {"type": "content_block_start", "index": 1, "content_block": {"type": "tool_use", "id": "toolu_001", "name": "get_weather"}}', + 'data: {"type": "content_block_delta", "index": 1, "delta": {"type": "input_json_delta", "partial_json": "{\\"location\\":\\"Beijing\\""}}', + 'data: {"type": "content_block_delta", "index": 1, "delta": {"type": "input_json_delta", "partial_json": ",\\"unit\\":\\"celsius\\"}"}}', + 'data: {"type": "content_block_stop", "index": 1}', + 'data: {"type": "message_stop"}' + ]; + + const result = getFullContent(sseEvents); + console.log('Result:', result); + + // 验证结果包含预期的文本和工具调用 + assert(result.includes('Hello World'), 'Should contain "Hello World"'); + assert(result.includes('get_weather'), 'Should contain "get_weather"'); + assert(result.includes('Beijing'), 'Should contain "Beijing"'); + assert(result.includes('celsius'), 'Should contain "celsius"'); + + console.log('✓ testNormalTextAndToolCalls passed'); +} + +// 测试用例2: 只有文本内容 +function testTextOnly(): void { + console.log('Running testTextOnly...'); + + const sseEvents = [ + 'data: {"type": "message_start", "message": {"id": "msg_456", "role": "assistant", "model": "claude-3-sonnet"}}', + 'data: {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}}', + 'data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Simple text"}}', + 'data: {"type": "content_block_stop", "index": 0}', + 'data: {"type": "message_stop"}' + ]; + + const result = getFullContent(sseEvents); + console.log('Result:', result); + + assert(result.includes('Simple text'), 'Should contain "Simple text"'); + assert(result.includes('\n[]'), 'Should contain empty array for tool calls'); + + console.log('✓ testTextOnly passed'); +} + +// 测试用例3: 只有工具调用(揭示Bug) +function testToolCallsOnly(): void { + console.log('Running testToolCallsOnly...'); + + const sseEvents = [ + 'data: {"type": "message_start", "message": {"id": "msg_789", "role": "assistant", "model": "claude-3-sonnet"}}', + 'data: {"type": "content_block_start", "index": 0, "content_block": {"type": "tool_use", "id": "toolu_002", "name": "calculate"}}', + 'data: {"type": "content_block_delta", "index": 0, "delta": {"type": "input_json_delta", "partial_json": "{\\"a\\":1,\\"b\\":2}"}}', + 'data: {"type": "content_block_stop", "index": 0}', + 'data: {"type": "message_stop"}' + ]; + + const result = getFullContent(sseEvents); + console.log('Result:', result); + + // Bug验证:当没有文本内容时,会返回换行符+工具调用JSON + assert(result.startsWith('\n'), 'Should start with newline when no text content'); + assert(result.includes('calculate'), 'Should contain "calculate"'); + assert(result.includes('{"a":1,"b":2}'), 'Should contain tool input'); + + console.log('✓ testToolCallsOnly passed'); +} + +// 测试用例4: 空事件数组(揭示Bug) +function testEmptyEvents(): void { + console.log('Running testEmptyEvents...'); + + const sseEvents: string[] = []; + const result = getFullContent(sseEvents); + + console.log('Result:', result); + + // Bug验证:空事件应该返回空数组的JSON字符串 + assert.strictEqual(result, '[]', 'Should return JSON stringified empty array'); + + console.log('✓ testEmptyEvents passed'); +} + +// 测试用例5: 无效的SSE事件(揭示Bug) +function testInvalidEvents(): void { + console.log('Running testInvalidEvents...'); + + const sseEvents = [ + 'invalid data', + 'data: {"invalid": json}', + 'data: not json at all' + ]; + + const result = getFullContent(sseEvents); + console.log('Result:', result); + + // Bug验证:解析失败时返回原始事件数组的JSON字符串 + const expectedResult = JSON.stringify(sseEvents); + assert.strictEqual(result, expectedResult, 'Should return JSON stringified original events'); + + console.log('✓ testInvalidEvents passed'); +} + +// 测试用例6: 不完整的事件流(没有message_stop) +function testIncompleteEvents(): void { + console.log('Running testIncompleteEvents...'); + + const sseEvents = [ + 'data: {"type": "message_start", "message": {"id": "msg_incomplete", "role": "assistant", "model": "claude-3-sonnet"}}', + 'data: {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}}', + 'data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Incomplete"}}', + // 缺少 content_block_stop 和 message_stop + ]; + + const result = getFullContent(sseEvents); + console.log('Result:', result); + + // 应该能够处理不完整的事件流 + assert(result.includes('Incomplete'), 'Should contain partial text content'); + + console.log('✓ testIncompleteEvents passed'); +} + +// 测试用例7: 包含redacted_thinking +function testRedactedThinking(): void { + console.log('Running testRedactedThinking...'); + + const sseEvents = [ + 'data: {"type": "message_start", "message": {"id": "msg_redacted", "role": "assistant", "model": "claude-3-sonnet"}}', + 'data: {"type": "content_block_start", "index": 0, "content_block": {"type": "redacted_thinking", "text": ""}}', + 'data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hidden thought"}}', + 'data: {"type": "content_block_stop", "index": 0}', + 'data: {"type": "content_block_start", "index": 1, "content_block": {"type": "text", "text": ""}}', + 'data: {"type": "content_block_delta", "index": 1, "delta": {"type": "text_delta", "text": "Visible response"}}', + 'data: {"type": "content_block_stop", "index": 1}', + 'data: {"type": "message_stop"}' + ]; + + const result = getFullContent(sseEvents); + console.log('Result:', result); + + // redacted_thinking 不应该出现在最终结果中,只有可见文本 + assert(!result.includes('Hidden thought'), 'Should not include redacted thinking content'); + assert(result.includes('Visible response'), 'Should include visible text content'); + + console.log('✓ testRedactedThinking passed'); +} + +// 测试用例8: 包含ping事件 +function testPingEvents(): void { + console.log('Running testPingEvents...'); + + const sseEvents = [ + 'data: {"type": "message_start", "message": {"id": "msg_ping", "role": "assistant", "model": "claude-3-sonnet"}}', + 'data: {"type": "ping"}', + 'data: {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}}', + 'data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "After ping"}}', + 'data: {"type": "content_block_stop", "index": 0}', + 'data: {"type": "ping"}', + 'data: {"type": "message_stop"}' + ]; + + const result = getFullContent(sseEvents); + console.log('Result:', result); + + // ping事件应该被忽略 + assert(result.includes('After ping'), 'Should contain text content'); + assert(!result.includes('ping'), 'Should not include ping events'); + + console.log('✓ testPingEvents passed'); +} + +// 运行所有测试 +function runAllTests(): void { + console.log('=== 开始运行 getFullContent 函数的单元测试 ===\n'); + + try { + testNormalTextAndToolCalls(); + console.log(''); + + testTextOnly(); + console.log(''); + + testToolCallsOnly(); + console.log(''); + + testEmptyEvents(); + console.log(''); + + testInvalidEvents(); + console.log(''); + + testIncompleteEvents(); + console.log(''); + + testRedactedThinking(); + console.log(''); + + testPingEvents(); + console.log(''); + + console.log('🎉 所有测试都通过了!'); + console.log('\n📋 发现的潜在问题总结:'); + console.log('1. 🐛 当没有文本内容时,返回结果以换行符开头(Bug: 应该处理空文本情况)'); + console.log('2. 🐛 成功解析和失败解析时的返回格式不一致(Bug: 统一返回格式)'); + console.log('3. 🐛 工具调用直接JSON.stringify,可读性较差(改进建议: 格式化输出)'); + console.log('4. ✅ redacted_thinking类型的内容被正确过滤(正确行为)'); + console.log('5. ✅ ping事件被正确忽略(正确行为)'); + console.log('6. ✅ 不完整事件流能够正确处理(正确行为)'); + + } catch (error) { + console.error('❌ 测试失败:', (error as Error).message); + process.exit(1); + } +} + +// 如果直接运行此文件,则执行测试 +if (require.main === module) { + runAllTests(); +} + +export { + runAllTests, + testNormalTextAndToolCalls, + testTextOnly, + testToolCallsOnly, + testEmptyEvents, + testInvalidEvents, + testIncompleteEvents, + testRedactedThinking, + testPingEvents +}; \ No newline at end of file diff --git a/src/utils/SSEParserUtils.ts b/src/utils/SSEParserUtils.ts new file mode 100644 index 00000000..21f25f65 --- /dev/null +++ b/src/utils/SSEParserUtils.ts @@ -0,0 +1,459 @@ +// Content block types +interface TextContent { + type: 'text'; + text: string; +} + +interface ToolUseContent { + type: 'tool_use'; + id: string; + name: string; + input: Record; + partialInput?: string; +} + +interface RedactedThinkingContent { + type: 'redacted_thinking'; + text: string; +} + +type ContentBlock = TextContent | ToolUseContent | RedactedThinkingContent; + +// Event type constants for better maintainability +const SSE_EVENT_TYPES = { + MESSAGE_START: 'message_start', + CONTENT_BLOCK_START: 'content_block_start', + CONTENT_BLOCK_DELTA: 'content_block_delta', + CONTENT_BLOCK_STOP: 'content_block_stop', + MESSAGE_DELTA: 'message_delta', + MESSAGE_STOP: 'message_stop', + PING: 'ping' +} as const; + +// Content block type constants +const CONTENT_BLOCK_TYPES = { + TEXT: 'text', + TOOL_USE: 'tool_use', + REDACTED_THINKING: 'redacted_thinking' +} as const; + +// Delta type constants +const DELTA_TYPES = { + TEXT_DELTA: 'text_delta', + INPUT_JSON_DELTA: 'input_json_delta' +} as const; + +// Message and SSE data types +interface CompleteMessage { + id: string | null; + role: string | null; + content: ContentBlock[]; + model: string | null; +} + +interface SSEData { + type: string; + message?: { + id: string; + role: string; + model: string; + content?: any[]; + }; + index?: number; + content_block?: { + type: string; + text?: string; + id?: string; + name?: string; + input?: any; + }; + delta?: { + type?: string; + text?: string; + partial_json?: string; + }; +} + +// ============================================================================= +// SSE Message Assembler Class +// ============================================================================= + +class SSEMessageAssembler { + private currentMessage: CompleteMessage | null = null; + private currentContentBlock: ContentBlock | null = null; + private currentIndex: number | null = null; + + public reset(): void { + this.currentMessage = null; + this.currentContentBlock = null; + this.currentIndex = null; + } + + public processEvent(eventData: string): CompleteMessage | null { + try { + const processedEventData = this.fixTruncatedData(eventData); + const data: SSEData = JSON.parse(processedEventData); + + switch (data.type) { + case SSE_EVENT_TYPES.MESSAGE_START: + return this.handleMessageStart(data); + case SSE_EVENT_TYPES.CONTENT_BLOCK_START: + this.handleContentBlockStart(data); + break; + case SSE_EVENT_TYPES.CONTENT_BLOCK_DELTA: + this.handleContentBlockDelta(data); + break; + case SSE_EVENT_TYPES.CONTENT_BLOCK_STOP: + this.handleContentBlockStop(data); + break; + case SSE_EVENT_TYPES.MESSAGE_DELTA: + // message_delta 事件包含使用信息,但不需要处理为内容块 + break; + case SSE_EVENT_TYPES.MESSAGE_STOP: + return this.handleMessageStop(data); + case SSE_EVENT_TYPES.PING: + // 忽略心跳包,但保持当前状态 + break; + default: + console.warn(`Unknown SSE event type: ${data.type}`); + } + } catch (error) { + console.error(`Failed to parse SSE event data:`, error, `Raw data:`, eventData); + return null; + } + + return null; + } + + /** + * Fix common truncated JSON data patterns + */ + private fixTruncatedData(eventData: string): string { + if (eventData.includes('"output_to"') && !eventData.includes('"output_tokens"')) { + return eventData.replace('"output_to"', '"output_tokens"'); + } + return eventData; + } + + private handleMessageStart(data: SSEData): null { + this.currentMessage = { + id: data.message?.id || null, + role: data.message?.role || null, + content: [], + model: data.message?.model || null + }; + return null; + } + + private handleContentBlockStart(data: SSEData): void { + if (!this.currentMessage) return; + + this.currentIndex = data.index ?? 0; + + const contentBlock = data.content_block; + if (!contentBlock) return; + + switch (contentBlock.type) { + case CONTENT_BLOCK_TYPES.TEXT: + this.currentContentBlock = { + type: CONTENT_BLOCK_TYPES.TEXT, + text: contentBlock.text || '' + }; + break; + + case CONTENT_BLOCK_TYPES.TOOL_USE: + this.currentContentBlock = { + type: CONTENT_BLOCK_TYPES.TOOL_USE, + id: contentBlock.id || '', + name: contentBlock.name || '', + input: contentBlock.input as Record || {}, + partialInput: '' + }; + break; + + case CONTENT_BLOCK_TYPES.REDACTED_THINKING: + this.currentContentBlock = { + type: CONTENT_BLOCK_TYPES.REDACTED_THINKING, + text: contentBlock.text || '' + }; + break; + + default: + console.warn(`Unknown content block type: ${(contentBlock as any).type}`); + this.currentContentBlock = null; + return; + } + + if (this.currentContentBlock && this.currentIndex !== null) { + this.currentMessage.content[this.currentIndex] = this.currentContentBlock; + } + } + + private handleContentBlockDelta(data: SSEData): void { + if (!this.currentContentBlock || !this.currentMessage || this.currentIndex === null) return; + + const delta = data.delta; + if (!delta) return; + + switch (this.currentContentBlock.type) { + case CONTENT_BLOCK_TYPES.TEXT: + if (delta.text) { + this.currentContentBlock.text += delta.text; + } + break; + + case CONTENT_BLOCK_TYPES.TOOL_USE: + if (delta.type === DELTA_TYPES.INPUT_JSON_DELTA && delta.partial_json) { + this.accumulatePartialJsonInput(delta.partial_json); + } + break; + + case CONTENT_BLOCK_TYPES.REDACTED_THINKING: + if (delta.text) { + this.currentContentBlock.text += delta.text; + } + break; + } + + // 更新消息中的对应块 + this.currentMessage.content[this.currentIndex] = this.currentContentBlock; + } + + /** + * Accumulate partial JSON input for tool calls + */ + private accumulatePartialJsonInput(partialJson: string): void { + if (!this.currentContentBlock || this.currentContentBlock.type !== CONTENT_BLOCK_TYPES.TOOL_USE) { + return; + } + + if (!this.currentContentBlock.partialInput) { + this.currentContentBlock.partialInput = ''; + } + this.currentContentBlock.partialInput += partialJson; + + try { + this.currentContentBlock.input = JSON.parse(this.currentContentBlock.partialInput); + } catch (e) { + // JSON parsing incomplete, continue accumulation + } + } + + private handleContentBlockStop(data: SSEData): void { + if (this.currentContentBlock?.type === CONTENT_BLOCK_TYPES.TOOL_USE && this.currentContentBlock.partialInput) { + try { + this.currentContentBlock.input = JSON.parse(this.currentContentBlock.partialInput); + delete this.currentContentBlock.partialInput; + } catch (e) { + console.error(`Failed to parse tool call input JSON:`, e, `Raw data:`, this.currentContentBlock.partialInput); + } + } + + this.currentContentBlock = null; + this.currentIndex = null; + } + + private handleMessageStop(data: SSEData): CompleteMessage | null { + if (!this.currentMessage) return null; + + const completeMessage = { ...this.currentMessage }; + this.reset(); + + return completeMessage; + } + + public getCurrentState(): { message: CompleteMessage | null; currentBlock: ContentBlock | null } { + return { + message: this.currentMessage ? { ...this.currentMessage } : null, + currentBlock: this.currentContentBlock ? { ...this.currentContentBlock } : null + }; + } +} + +// ============================================================================= +// Main SSE Parser Functions +// ============================================================================= + +/** + * Parse SSE content from an array of SSE event strings + * Main function for SSE parsing - use this directly + */ +export function parseSSEContent(events: string[]): CompleteMessage | null { + const assembler = new SSEMessageAssembler(); + let finalMessage: CompleteMessage | null = null; + + const sseEvents = parseSSEEvents(events); + + for (const sseEvent of sseEvents) { + const eventData = extractEventData(sseEvent); + + if (!eventData) { + continue; + } + + const result = assembler.processEvent(eventData); + if (result) { + finalMessage = result; + } + } + + // If no message_stop event received, get current state + if (!finalMessage) { + const state = assembler.getCurrentState(); + finalMessage = state.message; + } + + return finalMessage; +} + +/** + * Parses and filters SSE events from raw event strings + */ +function parseSSEEvents(events: string[]): string[] { + const fullSSEStream = events.join('\n'); + + return fullSSEStream + .split('\n\n') + .filter(event => event.trim()) + .filter(sseEvent => !isPingEvent(sseEvent)); +} + +/** + * Checks if SSE event is a ping event + */ +function isPingEvent(sseEvent: string): boolean { + const lines = sseEvent.split('\n'); + for (const line of lines) { + const trimmedLine = line.trim(); + if (trimmedLine.startsWith('event:')) { + const eventType = trimmedLine.replace(/^event:\s*/, '').trim(); + return eventType === SSE_EVENT_TYPES.PING; + } + } + return false; +} + +/** + * Extracts event data from SSE event string + */ +function extractEventData(sseEvent: string): string { + const lines = sseEvent.split('\n'); + let eventData = ''; + + for (const line of lines) { + const trimmedLine = line.trim(); + if (trimmedLine.startsWith('data:')) { + eventData = trimmedLine.replace(/^data:\s*/, '').trim(); + } + } + + return eventData; +} + +// ============================================================================= +// Utility Functions +// ============================================================================= + +/** + * Extract content blocks by type from a complete message + */ +export function extractContentByType( + message: CompleteMessage, + contentType: ContentBlock['type'] +): ContentBlock[] { + return message.content.filter(block => block.type === contentType); +} + +/** + * Get all text content from a complete message (joins all text blocks) + */ +export function getAllTextContent(message: CompleteMessage): string { + return message.content + .filter((block): block is TextContent => block.type === CONTENT_BLOCK_TYPES.TEXT) + .map(block => block.text) + .join('\n') + .trim(); +} + +/** + * Get all tool calls from a complete message + */ +export function getAllToolCalls(message: CompleteMessage): ToolUseContent[] { + return message.content + .filter((block): block is ToolUseContent => block.type === CONTENT_BLOCK_TYPES.TOOL_USE); +} + +/** + * Format tool calls for output with proper error handling + */ +function formatToolCalls(toolCalls: ToolUseContent[]): Array> { + return toolCalls.map(toolCall => { + const sanitized: Record = { + type: toolCall.type, + id: toolCall.id, + name: toolCall.name + }; + + // Use complete input if available and not empty, otherwise try partialInput + if (toolCall.input && Object.keys(toolCall.input).length > 0) { + sanitized.input = toolCall.input; + } else if (toolCall.partialInput && toolCall.partialInput.trim()) { + sanitized.input = parsePartialInput(toolCall.partialInput); + } else { + sanitized.input = {}; + } + + return sanitized; + }); +} + +/** + * Parse partial input with error handling + */ +function parsePartialInput(partialInput: string): unknown { + try { + const parsedInput = JSON.parse(partialInput); + // Return parsed input if it's a non-empty object + return parsedInput && Object.keys(parsedInput as Record).length > 0 + ? parsedInput + : partialInput; + } catch (e) { + // Return raw string if parsing fails + return partialInput; + } +} + +/** + * Create error response for failed SSE parsing + */ +function createErrorResponse(events: string[]): string { + return `Error: Failed to parse SSE events. Original events:\n${JSON.stringify(events, null, 2)}`; +} + +/** + * Main export function: Parse SSE events and return formatted content + */ +export function getFullContent(events: string[]): Record | string { + const result = parseSSEContent(events); + + if (!result) { + return createErrorResponse(events); + } + + const textContent = getAllTextContent(result); + const toolCalls = getAllToolCalls(result); + + const output: Record = {}; + + // Only add text content if it's not empty + if (textContent) { + output.text = textContent; + } + + // Format tool calls output + if (toolCalls.length > 0) { + output.toolCalls = formatToolCalls(toolCalls); + } + + return output; +} \ No newline at end of file diff --git a/src/utils/hooks.ts b/src/utils/hooks.ts new file mode 100644 index 00000000..eb31f58f --- /dev/null +++ b/src/utils/hooks.ts @@ -0,0 +1,839 @@ +import { EventEmitter } from "node:events"; +import { IAgent } from "../agents/type"; +import agentsManager from "../agents"; +import { sessionUsageCache } from "./cache"; +import { apiKeyAuth } from "../middleware/auth"; +import { router } from "./router"; +import { SSEParserTransform } from "./SSEParser.transform"; +import { SSESerializerTransform } from "./SSESerializer.transform"; +import { rewriteStream } from "./rewriteStream"; +import { getFullContent } from "./SSEParserUtils"; +import JSON5 from "json5"; + +// Type aliases for better readability +type ToolMessage = { + tool_use_id: string; + type: "tool_result"; + content: string; +}; + +type AssistantMessage = { + type: "tool_use"; + id: string; + name: string; + input: Record; +}; + +type StreamMetadata = { + totalChunks: number; + duration: number; + method: string; + url: string; + statusCode: number; + timestamp: string; +}; + +type StreamErrorInfo = { + error: string; + stack: string; + metadata: { + chunksCollected: number; + duration: number; + method: string; + url: string; + timestamp: string; + }; +}; + +/** + * Agent processing state for handling tool calls + */ +class AgentProcessingState { + currentAgent?: IAgent; + currentToolIndex: number = -1; + currentToolName: string = ''; + currentToolArgs: string = ''; + currentToolId: string = ''; + toolMessages: ToolMessage[] = []; + assistantMessages: AssistantMessage[] = []; + + reset(): void { + this.currentAgent = undefined; + this.currentToolIndex = -1; + this.currentToolName = ''; + this.currentToolArgs = ''; + this.currentToolId = ''; + } + + isValid(): boolean { + return this.currentToolIndex > -1 && this.currentToolName.length > 0; + } +} + +/** + * Setup request logging hook + */ +export function setupRequestLoggingHook(server: any): void { + server.addHook("preHandler", async (req, reply) => { + const requestData = { + method: req.method, + url: req.url, + headers: req.headers, + body: req.body, + query: req.query, + timestamp: new Date().toISOString() + }; + + req.log.info({ + requestData, + msg: "Request details" + }, "Incoming request"); + }); +} + +/** + * Setup response logging hook + */ +export function setupResponseLoggingHook(server: any): void { + server.addHook("onSend", async (req, reply, payload) => { + const responseData = { + method: req.method, + url: req.url, + statusCode: reply.statusCode, + headers: reply.getHeaders(), + timestamp: new Date().toISOString() + }; + + const responseBody = extractResponseBody(payload); + + if (payload instanceof ReadableStream) { + responseData.body = { + type: "ReadableStream", + readable: true, + note: "Streaming response - complete content will be logged when stream ends" + }; + payload = createLoggingWrappedStream(payload, req, reply); + } else { + responseData.body = responseBody; + } + + req.log.info({ + responseData, + msg: "Response details" + }, "Response completed"); + + return payload; + }); +} + +/** + * Extract response body for logging + */ +function extractResponseBody(payload: any): any { + if (payload === null || payload === undefined) { + return null; + } + + if (typeof payload === 'string') { + return payload; + } + + if (Buffer.isBuffer(payload)) { + return payload.toString('utf8'); + } + + if (typeof payload === 'object') { + return payload; + } + + return { type: typeof payload, content: payload }; +} + +/** + * Setup authentication hook + */ +export function setupAuthHook(server: any, config: any): void { + server.addHook("preHandler", async (req, reply) => { + return new Promise((resolve, reject) => { + const done = (err?: Error) => { + if (err) reject(err); + else resolve(); + }; + apiKeyAuth(config)(req, reply, done).catch(reject); + }); + }); +} + +/** + * Setup agent and routing hook + */ +export function setupAgentAndRoutingHook(server: any, config: any, event: EventEmitter): void { + server.addHook("preHandler", async (req, reply) => { + if (isMessagesEndpoint(req)) { + const activeAgents = processActiveAgents(req, config); + + if (activeAgents.length) { + req.agents = activeAgents; + } + + await router(req, reply, { config, event }); + } + }); +} + +/** + * Check if request is for messages endpoint + */ +function isMessagesEndpoint(req: any): boolean { + return req.url.startsWith("/v1/messages") && !req.url.startsWith("/v1/messages/count_tokens"); +} + +/** + * Process active agents and update request + */ +function processActiveAgents(req: any, config: any): string[] { + const activeAgents: string[] = []; + + for (const agent of agentsManager.getAllAgents()) { + if (agent.shouldHandle(req, config)) { + activeAgents.push(agent.name); + agent.reqHandler(req, config); + addAgentToolsToRequest(agent, req); + } + } + + return activeAgents; +} + +/** + * Add agent tools to request body + */ +function addAgentToolsToRequest(agent: IAgent, req: any): void { + if (agent.tools.size) { + if (!req.body?.tools?.length) { + req.body.tools = []; + } + const tools = Array.from(agent.tools.values()).map(item => ({ + name: item.name, + description: item.description, + input_schema: item.input_schema + })); + req.body.tools.unshift(...tools); + } +} + +/** + * Setup error event hook + */ +export function setupErrorEventHook(server: any, event: EventEmitter): void { + server.addHook("onError", async (request, reply, error) => { + event.emit('onError', request, reply, error); + }); +} + +/** + * Setup send event hook + */ +export function setupSendEventHook(server: any, event: EventEmitter): void { + server.addHook("onSend", async (req, reply, payload) => { + event.emit('onSend', req, reply, payload); + return payload; + }); +} + +/** + * Setup agent processing hook for handling agent interactions + */ +export function setupAgentProcessingHook(server: any, config: any): void { + server.addHook("onSend", async (req, reply, payload) => { + if (shouldProcessAgentRequest(req)) { + return await processAgentRequest(req, payload, config); + } + + return handleErrorResponse(payload); + }); +} + +/** + * Check if request should be processed by agent + */ +function shouldProcessAgentRequest(req: any): boolean { + return req.sessionId && isMessagesEndpoint(req); +} + +/** + * Process agent request + */ +async function processAgentRequest(req: any, payload: any, config: any): Promise { + if (payload instanceof ReadableStream) { + return handleStreamPayload(payload, req, config); + } + + return handleNonStreamPayload(payload, req); +} + +/** + * Handle stream payload + */ +function handleStreamPayload(payload: ReadableStream, req: any, config: any): ReadableStream { + if (req.agents) { + return handleAgentStreamProcessing(payload, req, config); + } + + const [originalStream, clonedStream] = payload.tee(); + readSessionUsageFromStream(clonedStream, req.sessionId); + return originalStream; +} + +/** + * Handle non-stream payload + */ +function handleNonStreamPayload(payload: any, req: any): any { + sessionUsageCache.put(req.sessionId, payload.usage); + + if (typeof payload === 'object') { + if (payload.error) { + throw payload.error; + } + return payload; + } +} + +/** + * Handle error payload + */ +function handleErrorResponse(payload: any): any { + if (typeof payload === 'object' && payload.error) { + throw payload.error; + } + + return payload; +} + + +// ============= Stream Utilities ============= + +/** + * Pads a number with leading zero if needed + */ +function padNumber(num: number): string { + return (num > 9 ? "" : "0") + num; +} + +/** + * Generates log file names with timestamp + */ +export function generateLogFileName(time?: Date, index?: number): string { + if (!time) { + time = new Date(); + } + + const yearAndMonth = time.getFullYear() + "" + padNumber(time.getMonth() + 1); + const day = padNumber(time.getDate()); + const hour = padNumber(time.getHours()); + const minute = padNumber(time.getMinutes()); + const second = padNumber(time.getSeconds()); + + return `./logs/ccr-${yearAndMonth}${day}${hour}${minute}${second}${index ? `_${index}` : ''}.log`; +} + + +/** + * Creates a logging wrapper for response streams + */ +function createLoggingWrappedStream( + originalStream: ReadableStream, + req: any, + reply: any +): ReadableStream { + const loggedChunks: string[] = []; + const startTime = Date.now(); + + return new ReadableStream({ + async start(controller) { + const reader = originalStream.getReader(); + + try { + while (true) { + const { done, value } = await reader.read(); + + if (done) { + await handleStreamCompletion(loggedChunks, startTime, req, reply, controller); + break; + } + + const chunkText = new TextDecoder().decode(value); + loggedChunks.push(chunkText); + controller.enqueue(value); + } + } catch (error) { + handleStreamError(error as Error, loggedChunks, startTime, req, controller); + } + } + }); +} + +/** + * Handles stream completion and logs final response + */ +async function handleStreamCompletion( + loggedChunks: string[], + startTime: number, + req: any, + reply: any, + controller: ReadableStreamDefaultController +): Promise { + const endTime = Date.now(); + const duration = endTime - startTime; + + req.log.info({ + streamCompleteResponse: { + type: "ReadableStream_complete_response", + content: getFullContent(loggedChunks), + metadata: createStreamMetadata(loggedChunks.length, duration, req, reply), + }, + msg: "Complete stream response" + }, "Stream response completed - full body"); + + controller.close(); +} + +/** + * Handles stream errors and logs error information + */ +function handleStreamError( + error: Error, + loggedChunks: string[], + startTime: number, + req: any, + controller: ReadableStreamDefaultController +): void { + const endTime = Date.now(); + const duration = endTime - startTime; + + req.log.error({ + streamError: createStreamErrorInfo(error, loggedChunks.length, duration, req), + }, "Stream reading error"); + + controller.error(error); +} + +/** + * Creates stream metadata for logging purposes + */ +function createStreamMetadata( + totalChunks: number, + duration: number, + req: any, + reply: any +): StreamMetadata { + return { + totalChunks, + duration, + method: req.method, + url: req.url, + statusCode: reply.statusCode, + timestamp: new Date().toISOString() + }; +} + +/** + * Creates stream error information for logging purposes + */ +function createStreamErrorInfo( + error: Error, + chunksCollected: number, + duration: number, + req: any +): StreamErrorInfo { + return { + error: error.message, + stack: error.stack || '', + metadata: { + chunksCollected, + duration, + method: req.method, + url: req.url, + timestamp: new Date().toISOString() + } + }; +} + +/** + * Reads session usage from background stream + */ +function readSessionUsageFromStream(stream: ReadableStream, sessionId: string): void { + const read = async (stream: ReadableStream) => { + const reader = stream.getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + const dataStr = new TextDecoder().decode(value); + if (!dataStr.startsWith("event: message_delta")) { + continue; + } + + const str = dataStr.slice(27); + try { + const message = JSON.parse(str); + sessionUsageCache.put(sessionId, message.usage); + } catch { + // Ignore parsing errors + } + } + } catch (readError: any) { + handleBackgroundReadError(readError); + } finally { + reader.releaseLock(); + } + }; + + read(stream); +} + +/** + * Handles background read stream errors + */ +function handleBackgroundReadError(readError: any): void { + if (readError.name === 'AbortError' || readError.code === 'ERR_STREAM_PREMATURE_CLOSE') { + console.error('Background read stream closed prematurely'); + } else { + console.error('Error in background stream reading:', readError); + } +} + +// ============= Agent Stream Processing ============= + +/** + * Handle agent stream processing with tool calls + */ +function handleAgentStreamProcessing( + payload: ReadableStream, + req: any, + config: any +): ReadableStream { + const abortController = new AbortController(); + const eventStream = payload.pipeThrough(new SSEParserTransform()); + const agentState = createInitialAgentState(); + + return rewriteStream(eventStream, async (data, controller) => { + try { + const result = await processStreamEvent(data, agentState, req, config); + + if (result === 'continue') { + return undefined; + } + + if (result === 'retry_with_tools') { + await retryRequestWithToolResults(agentState, req, config, controller, abortController); + return undefined; + } + + return data; + } catch (error: any) { + handleStreamProcessingError(error, abortController); + } + }).pipeThrough(new SSESerializerTransform()); +} + +/** + * Create initial agent processing state + */ +function createInitialAgentState(): AgentProcessingState { + return new AgentProcessingState(); +} + +/** + * Process individual stream events + */ +async function processStreamEvent( + data: any, + agentState: AgentProcessingState, + req: any, + config: any +): Promise<'continue' | 'retry_with_tools' | 'pass_through'> { + // Handle tool call start + if (isToolCallStart(data)) { + const agent = findAgentForTool(data.data.content_block.name, req); + if (agent) { + updateAgentStateForToolStart(agentState, agent, data); + return 'continue'; + } + } + + // Collect tool arguments + if (isToolArgumentCollection(data, agentState)) { + collectToolArguments(agentState, data); + return 'continue'; + } + + // Handle tool call completion + if (isToolCallCompletion(data, agentState)) { + await processToolCall(agentState, req, config); + return 'continue'; + } + + // Handle message delta with tool results + if (isMessageDeltaWithToolResults(data, agentState)) { + return 'retry_with_tools'; + } + + return 'pass_through'; +} + +/** + * Check if event is tool call start + */ +function isToolCallStart(data: any): boolean { + return data.event === 'content_block_start' && + data?.data?.content_block?.name; +} + +/** + * Find agent that can handle the tool + */ +function findAgentForTool(toolName: string, req: any): string | undefined { + return req.agents.find((name: string) => + agentsManager.getAgent(name)?.tools.get(toolName) + ); +} + +/** + * Update agent state when tool call starts + */ +function updateAgentStateForToolStart( + agentState: AgentProcessingState, + agentName: string, + data: any +): void { + const agent = agentsManager.getAgent(agentName); + if (!agent) { + console.error(`Agent not found: ${agentName}`); + return; + } + + agentState.currentAgent = agent; + agentState.currentToolIndex = data.data.index; + agentState.currentToolName = data.data.content_block.name; + agentState.currentToolId = data.data.content_block.id; +} + +/** + * Check if event is for collecting tool arguments + */ +function isToolArgumentCollection(data: any, agentState: AgentProcessingState): boolean { + return agentState.currentToolIndex > -1 && + data.data.index === agentState.currentToolIndex && + data.data?.delta?.type === 'input_json_delta'; +} + +/** + * Collect tool arguments from delta + */ +function collectToolArguments(agentState: AgentProcessingState, data: any): void { + agentState.currentToolArgs += data.data?.delta?.partial_json; +} + +/** + * Check if event is tool call completion + */ +function isToolCallCompletion(data: any, agentState: AgentProcessingState): boolean { + return agentState.currentToolIndex > -1 && + data.data.index === agentState.currentToolIndex && + data.data.type === 'content_block_stop'; +} + +/** + * Process a completed tool call + */ +async function processToolCall( + agentState: AgentProcessingState, + req: any, + config: any +): Promise { + try { + const args = JSON5.parse(agentState.currentToolArgs); + + const assistantMessage = createAssistantMessage(agentState); + agentState.assistantMessages.push(assistantMessage); + + const toolResult = await executeTool(agentState, args, req, config); + const toolMessage = createToolMessage(agentState.currentToolId, toolResult); + agentState.toolMessages.push(toolMessage); + + resetAgentToolState(agentState); + } catch (error) { + console.error('Error processing tool call:', error); + } +} + +/** + * Create assistant message for tool use + */ +function createAssistantMessage(agentState: AgentProcessingState): AssistantMessage { + return { + type: "tool_use", + id: agentState.currentToolId, + name: agentState.currentToolName, + input: JSON5.parse(agentState.currentToolArgs) + }; +} + +/** + * Execute tool and get result + */ +async function executeTool( + agentState: AgentProcessingState, + args: any, + req: any, + config: any +): Promise { + const tool = agentState.currentAgent?.tools.get(agentState.currentToolName); + if (!tool) { + throw new Error(`Tool ${agentState.currentToolName} not found`); + } + + return await tool.handler(args, { req, config }); +} + +/** + * Create tool result message + */ +function createToolMessage(toolUseId: string, toolResult: string): ToolMessage { + return { + tool_use_id: toolUseId, + type: "tool_result", + content: toolResult + }; +} + +/** + * Reset agent tool-specific state + */ +function resetAgentToolState(agentState: AgentProcessingState): void { + agentState.reset(); +} + +/** + * Check if event is message delta with tool results + */ +function isMessageDeltaWithToolResults(data: any, agentState: AgentProcessingState): boolean { + return data.event === 'message_delta' && agentState.toolMessages.length > 0; +} + +/** + * Retry request with tool results + */ +async function retryRequestWithToolResults( + agentState: AgentProcessingState, + req: any, + config: any, + controller: any, + abortController: AbortController +): Promise { + addToolMessagesToRequest(req, agentState); + + const response = await makeRetryRequest(req, config); + if (!response.ok) { + return; + } + + await processRetryResponse(response, controller, abortController); +} + +/** + * Add tool messages to request body + */ +function addToolMessagesToRequest(req: any, agentState: AgentProcessingState): void { + req.body.messages.push({ + role: 'assistant', + content: agentState.assistantMessages + }); + + req.body.messages.push({ + role: 'user', + content: agentState.toolMessages + }); +} + +/** + * Make retry request with tool results + */ +async function makeRetryRequest(req: any, config: any): Promise { + return await fetch(`http://127.0.0.1:${config.PORT || 3456}/v1/messages`, { + method: "POST", + headers: { + 'x-api-key': config.APIKEY, + 'content-type': 'application/json', + }, + body: JSON.stringify(req.body), + }); +} + +/** + * Process retry response stream + */ +async function processRetryResponse( + response: Response, + controller: any, + abortController: AbortController +): Promise { + const stream = response.body!.pipeThrough(new SSEParserTransform()); + const reader = stream.getReader(); + + while (true) { + try { + const { value, done } = await reader.read(); + if (done) { + break; + } + + if (shouldSkipEvent(value)) { + continue; + } + + if (!controller.desiredSize) { + break; + } + + controller.enqueue(value); + } catch (readError: any) { + handleRetryStreamError(readError, abortController); + break; + } + } +} + +/** + * Check if event should be skipped + */ +function shouldSkipEvent(value: any): boolean { + return ['message_start', 'message_stop'].includes(value.event); +} + +/** + * Handle retry stream errors + */ +function handleRetryStreamError(readError: any, abortController: AbortController): void { + if (readError.name === 'AbortError' || readError.code === 'ERR_STREAM_PREMATURE_CLOSE') { + abortController.abort(); + } else { + throw readError; + } +} + +/** + * Handle stream processing errors + */ +function handleStreamProcessingError(error: any, abortController: AbortController): void { + console.error('Unexpected error in stream processing:', error); + + if (error.code === 'ERR_STREAM_PREMATURE_CLOSE') { + abortController.abort(); + return; + } + + throw error; +} \ No newline at end of file