From f554e7229bcb14a67889568ff51c3a8f6b4d4e52 Mon Sep 17 00:00:00 2001 From: lin onetwo Date: Thu, 17 Apr 2025 17:08:39 +0800 Subject: [PATCH] refactor: use async generator --- src/services/agent/defaultAgents/echo.ts | 108 +++++++++++ src/services/agent/index.ts | 181 +++++++------------ src/services/agent/interface.ts | 2 +- src/services/agent/server/index.ts | 14 +- src/services/agent/server/server.ts | 8 +- src/services/externalAPI/callProviderAPI.ts | 2 +- src/services/externalAPI/index.ts | 190 ++++++++++---------- src/services/externalAPI/interface.ts | 9 + 8 files changed, 293 insertions(+), 221 deletions(-) create mode 100644 src/services/agent/defaultAgents/echo.ts diff --git a/src/services/agent/defaultAgents/echo.ts b/src/services/agent/defaultAgents/echo.ts new file mode 100644 index 00000000..48ba5221 --- /dev/null +++ b/src/services/agent/defaultAgents/echo.ts @@ -0,0 +1,108 @@ +import { container } from '@services/container'; +import { IExternalAPIService } from '@services/externalAPI/interface'; +import serviceIdentifier from '@services/serviceIdentifier'; +import { TaskContext, TaskYieldUpdate } from '../server'; +import { TextPart } from '../server/schema'; + +export async function* echoHandler(context: TaskContext) { + // Send working status first + yield { + state: 'working', + message: { + role: 'agent', + parts: [{ text: 'Processing your message...' }], + }, + } as TaskYieldUpdate; + + // Get ai api service + const externalAPIService = container.get(serviceIdentifier.ExternalAPI); + + // Check if cancelled + if (context.isCancelled()) { + yield { state: 'canceled' } as TaskYieldUpdate; + return; + } + + // Get user message text + const userText = (context.userMessage.parts as TextPart[]) + .filter((part) => part.text) + .map((part) => part.text) + .join(' '); + + // Echo user message + yield { + state: 'working', + message: { + role: 'agent', + parts: [{ text: `You said: ${userText}\n\nGetting AI response...` }], + }, + } as TaskYieldUpdate; + + // Get AI configuration + const aiConfig = await externalAPIService.getAIConfig(); + + // Use generateFromAI instead of streamFromAI with observable-to-async-generator + let currentRequestId: string | null = null; + + try { + // Directly use the async generator + for await (const response of externalAPIService.generateFromAI( + [{ role: 'user', content: userText }], + aiConfig + )) { + // Store requestId for cancellation + if (!currentRequestId && response.requestId) { + currentRequestId = response.requestId; + } + + // Check for cancellation + if (context.isCancelled()) { + // Cancel the current request if we have a requestId + if (currentRequestId) { + await externalAPIService.cancelAIRequest(currentRequestId); + } + yield { state: 'canceled' } as TaskYieldUpdate; + return; + } + + // Handle different response states + if (response.status === 'update' || response.status === 'done') { + // Update UI + yield { + state: response.status === 'done' ? 'completed' : 'working', + message: { + role: 'agent', + parts: [{ text: `You said: ${userText}\n\nAI response: ${response.content}` }], + }, + } as TaskYieldUpdate; + } else if (response.status === 'error') { + // Handle error case + yield { + state: 'completed', + message: { + role: 'agent', + parts: [{ text: `You said: ${userText}\n\nError getting AI response: ${response.content}` }], + }, + } as TaskYieldUpdate; + return; + } + } + } catch (error) { + // Handle any unexpected errors + const errorMessage = error instanceof Error ? error.message : String(error); + console.error('Error in echoHandler:', errorMessage); + + yield { + state: 'completed', + message: { + role: 'agent', + parts: [{ text: `You said: ${userText}\n\nError processing AI response: ${errorMessage}` }], + }, + } as TaskYieldUpdate; + } finally { + // Ensure request is cancelled if needed + if (context.isCancelled() && currentRequestId) { + await externalAPIService.cancelAIRequest(currentRequestId); + } + } +} diff --git a/src/services/agent/index.ts b/src/services/agent/index.ts index 15316138..16fd0bfc 100644 --- a/src/services/agent/index.ts +++ b/src/services/agent/index.ts @@ -1,18 +1,16 @@ -/* eslint-disable @typescript-eslint/require-await */ import { injectable } from 'inversify'; import { nanoid } from 'nanoid'; import { BehaviorSubject, Observable } from 'rxjs'; import { lazyInject } from '@services/container'; import { IDatabaseService } from '@services/database/interface'; -import { AIMessage, AISessionConfig } from '@services/externalAPI/interface'; // 添加AISessionConfig导入 +import { AISessionConfig } from '@services/externalAPI/interface'; // 添加AISessionConfig导入 import { IExternalAPIService } from '@services/externalAPI/interface'; import { logger } from '@services/libs/log'; import serviceIdentifier from '@services/serviceIdentifier'; import { IWikiService } from '@services/wiki/interface'; -import { AgentEntity, TaskEntity } from '@services/database/schema/agent'; // 添加数据库实体导入 +import { echoHandler } from './defaultAgents/echo'; import type { Agent, AgentServiceConfig, AgentTask, IAgentService } from './interface'; -import { TaskYieldUpdate } from './server'; import { AgentHttpServer } from './server/http-server'; import * as schema from './server/schema'; import { A2AServer } from './server/server'; @@ -41,16 +39,15 @@ export class AgentService implements IAgentService { // 重命名流式更新订阅 public taskUpdates$ = new BehaviorSubject>({}); - // Database initialization flag private databaseInitialized = false; - + /** * Ensure database is initialized */ private async ensureDatabaseInitialized(): Promise { if (this.databaseInitialized) return; - + try { // Initialize the database await this.databaseService.initializeDatabase('agent-default'); @@ -64,13 +61,13 @@ export class AgentService implements IAgentService { // Agent registration flag private agentsRegistered = false; - + /** * Ensure agents are registered */ private async ensureAgentsRegistered(): Promise { if (this.agentsRegistered) return; - + try { // Register default agents await this.registerDefaultAgents(); @@ -88,10 +85,10 @@ export class AgentService implements IAgentService { try { // Ensure database is initialized await this.ensureDatabaseInitialized(); - + // Get database connection for agent operations const dataSource = await this.databaseService.getDatabase('agent-default'); - + // Register agent logic console.log('Registering default agents'); @@ -101,7 +98,7 @@ export class AgentService implements IAgentService { name: 'Echo Agent', description: 'Simple echo agent that returns user messages', avatarUrl: 'https://example.com/echo-agent.png', - handler: this.createEchoHandler(), + handler: echoHandler, card: { name: 'Echo Agent', description: 'Simple echo agent', @@ -122,13 +119,13 @@ export class AgentService implements IAgentService { // Store agent in memory this.agents.set(echoAgent.id, echoAgent); - + // Store agent in database to satisfy foreign key constraints const agentRepository = dataSource.getRepository('agents'); try { // Check if agent already exists in database const existingAgent = await agentRepository.findOne({ where: { id: echoAgent.id } }); - + if (!existingAgent) { // Insert agent record into database await agentRepository.save({ @@ -136,17 +133,17 @@ export class AgentService implements IAgentService { name: echoAgent.name, description: echoAgent.description || null, avatarUrl: echoAgent.avatarUrl || null, - card: echoAgent.card ? JSON.stringify(echoAgent.card) : null + card: echoAgent.card ? JSON.stringify(echoAgent.card) : null, }); console.log(`Inserted agent record into database: ${echoAgent.id}`); } else { console.log(`Agent record already exists in database: ${echoAgent.id}`); } - } catch (dbError) { - console.error(`Failed to store agent in database: ${echoAgent.id}`, dbError); + } catch (databaseError) { + console.error(`Failed to store agent in database: ${echoAgent.id}`, databaseError); // Continue anyway - the agent is in memory } - + // Create server instance await this.createAgentServer(echoAgent); @@ -157,52 +154,6 @@ export class AgentService implements IAgentService { } } - /** - * Create echo handler - */ - private createEchoHandler() { - return async function* echoHandler(context: any) { - // DEBUG: console - console.log(`echoHandler`); - // Send working status first - yield { - state: 'working', - message: { - role: 'agent', - parts: [{ text: 'Processing your message...' }], - }, - } as TaskYieldUpdate; - - // Wait a while to simulate processing - await new Promise(resolve => setTimeout(resolve, 1000)); - // DEBUG: console context - console.log(`context`, context); - // Check if cancelled - if (context.isCancelled()) { - yield { state: 'canceled' } as TaskYieldUpdate; - return; - } - // DEBUG: console context.userMessage.parts - console.log(`context.userMessage.parts`, context.userMessage.parts); - // Get user message text - const userText = context.userMessage.parts - .filter((part: any) => part.text) - .map((part: any) => part.text) - .join(' '); - // DEBUG: console userText - console.log(`userText`, userText); - - // Echo user message - yield { - state: 'completed', - message: { - role: 'agent', - parts: [{ text: `You said: ${userText}` }], - }, - } as TaskYieldUpdate; - }; - } - /** * Create a server instance for an agent */ @@ -238,7 +189,7 @@ export class AgentService implements IAgentService { } /** - * Notify task update + * Notify task update */ private notifyTaskUpdate(agentId: string, taskId: string, task: AgentTask | null): void { this.taskUpdates$.next({ [taskId]: task }); @@ -256,7 +207,7 @@ export class AgentService implements IAgentService { id: task.id, agentId, messages: history || [], - status: task.status, + status: task.status, metadata: task.metadata, createdAt: timestamp, updatedAt: timestamp, @@ -316,7 +267,7 @@ export class AgentService implements IAgentService { async getAgents(): Promise[]> { await this.ensureAgentsRegistered(); - + // Return list of Agent objects without handler, ensuring they can be transferred via IPC return Array.from(this.agents.values()).map(agent => ({ id: agent.id, @@ -330,7 +281,7 @@ export class AgentService implements IAgentService { async getAgent(id: string): Promise { await this.ensureAgentsRegistered(); - + return this.agents.get(id); } @@ -339,7 +290,7 @@ export class AgentService implements IAgentService { */ async createTask(agentId: string): Promise { await this.ensureAgentsRegistered(); - + if (!this.agents.has(agentId)) { throw new Error(`Agent with ID ${agentId} not found`); } @@ -372,7 +323,7 @@ export class AgentService implements IAgentService { */ async sendMessage(agentId: string, taskId: string, messageText: string): Promise { await this.ensureAgentsRegistered(); - + // Get server instance const server = await this.getOrCreateAgentServer(agentId); @@ -487,7 +438,7 @@ export class AgentService implements IAgentService { */ private async getOrCreateAgentServer(agentId: string): Promise { await this.ensureAgentsRegistered(); - + if (!this.agents.has(agentId)) { throw new Error(`Agent with ID ${agentId} not found`); } @@ -504,7 +455,7 @@ export class AgentService implements IAgentService { */ async getTask(taskId: string): Promise { await this.ensureAgentsRegistered(); - + // Traverse all agent servers to find matching task for (const [agentId, server] of this.agentServers.entries()) { try { @@ -525,7 +476,7 @@ export class AgentService implements IAgentService { */ async getAgentTasks(agentId: string): Promise { await this.ensureAgentsRegistered(); - + try { const server = await this.getOrCreateAgentServer(agentId); @@ -555,7 +506,7 @@ export class AgentService implements IAgentService { */ async deleteTask(agentId: string, taskId: string): Promise { await this.ensureAgentsRegistered(); - + try { // Get server instance const server = await this.getOrCreateAgentServer(agentId); @@ -584,7 +535,7 @@ export class AgentService implements IAgentService { async startHttpServer(config: AgentServiceConfig): Promise { await this.ensureAgentsRegistered(); - + if (this.httpServer) { await this.stopHttpServer(); } @@ -610,7 +561,7 @@ export class AgentService implements IAgentService { */ public async getDefaultAgentId(): Promise { const dataSource = await this.databaseService.getDatabase('agent-default'); - + // Get agent with most recent activity const result = await dataSource.getRepository(TaskEntity) .createQueryBuilder('task') @@ -618,18 +569,18 @@ export class AgentService implements IAgentService { .orderBy('task.updatedAt', 'DESC') .take(1) .getOne(); - + if (result) { return result.agentId; } - + // If no tasks, get any available agent const agent = await dataSource.getRepository(AgentEntity) .createQueryBuilder('agent') .orderBy('agent.createdAt', 'DESC') .take(1) .getOne(); - + return agent?.id; } @@ -640,14 +591,14 @@ export class AgentService implements IAgentService { try { const dataSource = await this.databaseService.getDatabase('agent-default'); const repository = dataSource.getRepository(AgentEntity); - + const agent = await repository.findOne({ where: { id: agentId } }); - + if (!agent || !agent.aiConfig) { // If no specific config exists, return undefined to use global defaults return undefined; } - + // Parse stored JSON config return JSON.parse(agent.aiConfig) as AISessionConfig; } catch (error) { @@ -663,21 +614,21 @@ export class AgentService implements IAgentService { try { const dataSource = await this.databaseService.getDatabase('agent-default'); const repository = dataSource.getRepository(AgentEntity); - + const agent = await repository.findOne({ where: { id: agentId } }); - + if (!agent) { throw new Error(`Agent with ID ${agentId} not found`); } - + // Merge with existing config if it exists - let currentConfig: AISessionConfig = agent.aiConfig ? - JSON.parse(agent.aiConfig) : - { provider: '', model: '' }; - + const currentConfig: AISessionConfig = agent.aiConfig + ? JSON.parse(agent.aiConfig) + : { provider: '', model: '' }; + // Get defaults for any missing fields from externalAPIService const defaults = await this.externalAPIService.getAIConfig(); - + // Merge in this order: defaults -> current -> new config const mergedConfig = { ...defaults, @@ -687,15 +638,15 @@ export class AgentService implements IAgentService { modelParameters: { ...(defaults.modelParameters || {}), ...(currentConfig.modelParameters || {}), - ...(config.modelParameters || {}) - } + ...(config.modelParameters || {}), + }, }; - + // Store the updated config agent.aiConfig = JSON.stringify(mergedConfig); - + await repository.save(agent); - + logger.info(`Updated AI config for agent ${agentId}`); } catch (error) { logger.error(`Failed to update AI config for agent ${agentId}:`, error); @@ -710,26 +661,26 @@ export class AgentService implements IAgentService { try { const dataSource = await this.databaseService.getDatabase('agent-default'); const repository = dataSource.getRepository(TaskEntity); - - const task = await repository.findOne({ + + const task = await repository.findOne({ where: { id: taskId }, - relations: ['agent'] // Load the related agent + relations: ['agent'], // Load the related agent }); - + if (!task) { return undefined; } - + // First try task-specific config if (task.aiConfig) { return JSON.parse(task.aiConfig) as AISessionConfig; } - + // Then try agent-level config if (task.agent && task.agent.aiConfig) { return JSON.parse(task.agent.aiConfig) as AISessionConfig; } - + // Fall back to global defaults return undefined; } catch (error) { @@ -745,19 +696,19 @@ export class AgentService implements IAgentService { try { const dataSource = await this.databaseService.getDatabase('agent-default'); const repository = dataSource.getRepository(TaskEntity); - - const task = await repository.findOne({ + + const task = await repository.findOne({ where: { id: taskId }, - relations: ['agent'] // Load the related agent + relations: ['agent'], // Load the related agent }); - + if (!task) { throw new Error(`Task with ID ${taskId} not found`); } - + // Start with agent config or empty config let baseConfig: AISessionConfig = { provider: '', model: '' }; - + // Try to use agent config if available if (task.agent && task.agent.aiConfig) { baseConfig = JSON.parse(task.agent.aiConfig); @@ -765,13 +716,13 @@ export class AgentService implements IAgentService { // Otherwise get global defaults baseConfig = await this.externalAPIService.getAIConfig(); } - + // Get existing task config if any let currentConfig: Partial = {}; if (task.aiConfig) { currentConfig = JSON.parse(task.aiConfig); } - + // Merge in this order: base (agent or global) -> current task -> new config const mergedConfig = { ...baseConfig, @@ -781,14 +732,14 @@ export class AgentService implements IAgentService { modelParameters: { ...(baseConfig.modelParameters || {}), ...(currentConfig.modelParameters || {}), - ...(config.modelParameters || {}) - } + ...(config.modelParameters || {}), + }, }; - + task.aiConfig = JSON.stringify(mergedConfig); - + await repository.save(task); - + logger.info(`Updated AI config for task ${taskId}`); } catch (error) { logger.error(`Failed to update AI config for task ${taskId}:`, error); diff --git a/src/services/agent/interface.ts b/src/services/agent/interface.ts index db95b081..13cc5af3 100644 --- a/src/services/agent/interface.ts +++ b/src/services/agent/interface.ts @@ -49,7 +49,7 @@ export interface AgentTask extends Omit { /** Last update time (converted from ISO string) */ updatedAt: Date; /** Optional artifacts */ - artifacts?: schema.Artifact[]; + artifacts?: schema.Artifact[] | null; } /** diff --git a/src/services/agent/server/index.ts b/src/services/agent/server/index.ts index 68e5e4ea..3f74a80e 100644 --- a/src/services/agent/server/index.ts +++ b/src/services/agent/server/index.ts @@ -4,21 +4,21 @@ */ // Export the main server class and its options -export { A2AServer } from "./server"; -export type { A2AServerOptions } from "./server"; +export { A2AServer } from './server'; +export type { A2AServerOptions } from './server'; // Export handler-related types -export type { TaskHandler, TaskContext, TaskYieldUpdate } from "./handler"; +export type { TaskContext, TaskHandler, TaskYieldUpdate } from './handler'; // Export store-related types and implementations -export type { TaskStore } from "./store"; -export { InMemoryTaskStore, FileStore } from "./store"; +export type { TaskStore } from './store'; +export { InMemoryTaskStore } from './store'; // Export the custom error class -export { A2AError } from "./error"; +export { A2AError } from './error'; // Re-export all schema types for convenience -export * as schema from "./schema"; +export * as schema from './schema'; // Example basic usage (for documentation or testing) /* diff --git a/src/services/agent/server/server.ts b/src/services/agent/server/server.ts index 05d3d460..787eceb2 100644 --- a/src/services/agent/server/server.ts +++ b/src/services/agent/server/server.ts @@ -47,9 +47,9 @@ export class A2AServer { this.taskHandler = handler; this.taskStore = options.taskStore ?? new InMemoryTaskStore(); if (options.card) this.card = options.card; - + this.agentId = options.agentId || 'echo-agent'; - + console.log(`A2AServer initialized with agentId: ${this.agentId}`); } @@ -113,7 +113,7 @@ export class A2AServer { const existingArtifact = newTask.artifacts[existingIndex]; if (update.append) { // Create a deep copy for modification to avoid mutating original - const appendedArtifact = JSON.parse(JSON.stringify(existingArtifact)); + const appendedArtifact = JSON.parse(JSON.stringify(existingArtifact)) as schema.Artifact; appendedArtifact.parts.push(...update.parts); if (update.metadata) { appendedArtifact.metadata = { @@ -535,7 +535,7 @@ export class A2AServer { private async loadOrCreateTaskAndHistory( taskId: string, initialMessage: schema.Message, - _sessionIdParam?: string | null, // 忽略sessionId参数,不再使用 + _sessionIdParameter?: string | null, // 忽略sessionId参数,不再使用 metadata?: Record | null, // 允许为null ): Promise { let data = await this.taskStore.load(taskId); diff --git a/src/services/externalAPI/callProviderAPI.ts b/src/services/externalAPI/callProviderAPI.ts index a938f1c9..c3cc2aab 100644 --- a/src/services/externalAPI/callProviderAPI.ts +++ b/src/services/externalAPI/callProviderAPI.ts @@ -56,7 +56,7 @@ export function streamFromProvider( try { if (!providerConfig?.apiKey && providerConfig?.providerClass !== 'ollama') { // Ollama doesn't require API key - throw new Error(`API key for ${provider} not found`); + throw new Error(`API key for ${provider} not found!!!`); } const client = createProviderClient( diff --git a/src/services/externalAPI/index.ts b/src/services/externalAPI/index.ts index 82887435..9f81b12d 100644 --- a/src/services/externalAPI/index.ts +++ b/src/services/externalAPI/index.ts @@ -2,7 +2,8 @@ import { injectable } from 'inversify'; import { cloneDeep, mergeWith } from 'lodash'; import { nanoid } from 'nanoid'; -import { Observable } from 'rxjs'; +import { defer, from, Observable } from 'rxjs'; +import { filter, finalize, startWith } from 'rxjs/operators'; import { lazyInject } from '@services/container'; import { IDatabaseService } from '@services/database/interface'; @@ -25,6 +26,14 @@ const defaultProvidersConfig = { })), }; +/** + * Simplified request context + */ +interface AIRequestContext { + requestId: string; + controller: AbortController; +} + @injectable() export class ExternalAPIService implements IExternalAPIService { @lazyInject(serviceIdentifier.Database) @@ -185,104 +194,99 @@ export class ExternalAPIService implements IExternalAPIService { this.saveSettingsToDatabase(); } - streamFromAI(messages: Array | Array>, config: AISessionConfig): Observable { - return new Observable(observer => { - // Generate unique request ID internally - const requestId = nanoid(); + /** + * Prepare a new AI request with minimal initialization + */ + private prepareAIRequest(): AIRequestContext { + const requestId = nanoid(); + const controller = new AbortController(); - // Cancel existing request with same ID (shouldn't happen but as precaution) - if (this.activeRequests.has(requestId)) { - this.cancelAIRequest(requestId) - .catch(error => logger.error(`Error canceling previous request ${requestId}:`, error)); + this.activeRequests.set(requestId, controller); + + return { requestId, controller }; + } + + /** + * Clean up resources for an AI request + */ + private cleanupAIRequest(requestId: string): void { + this.activeRequests.delete(requestId); + } + + streamFromAI(messages: Array | Array>, config: AISessionConfig): Observable { + // Use defer to create a new observable stream for each subscription + return defer(() => { + // Prepare request context + const { requestId, controller } = this.prepareAIRequest(); + + // Get AsyncGenerator from generateFromAI and convert to Observable + return from(this.generateFromAI(messages, config)).pipe( + // Skip the first 'start' event since we'll emit our own + // to ensure it happens immediately (AsyncGenerator might delay it) + filter((response, index) => !(index === 0 && response.status === 'start')), + // Ensure we emit a start event immediately + startWith({ requestId, content: '', status: 'start' as const }), + // Ensure cleanup happens on completion, error, or unsubscribe + finalize(() => { + if (this.activeRequests.has(requestId)) { + controller.abort(); + this.cleanupAIRequest(requestId); + logger.debug(`[${requestId}] Cleaned up in streamFromAI finalize`); + } + }), + ); + }); + } + + async *generateFromAI( + messages: Array | Array>, + config: AISessionConfig, + ): AsyncGenerator { + // Prepare request with minimal context + const { requestId, controller } = this.prepareAIRequest(); + + try { + // Send start event + yield { requestId, content: '', status: 'start' }; + + // Get provider configuration + const providerConfig = await this.getProviderConfig(config.provider); + if (!providerConfig) { + yield { + requestId, + content: `Provider ${config.provider} not found or not configured`, + status: 'error', + }; + return; } - // Create controller for this request - const controller = new AbortController(); - this.activeRequests.set(requestId, controller); + // Create the stream + const result = streamFromProvider( + config, + messages, + controller.signal, + providerConfig, + ); - // Helper function to emit events via the observer - const emitEvent = (content: string, status: 'start' | 'update' | 'done' | 'error' | 'cancel') => { - if (status === 'done' && (!content || content.trim() === '')) { - content = '(No response, please check API settings and network connection)'; - } - observer.next({ requestId, content, status }); - }; + // Process the stream + let fullResponse = ''; - // Get provider configuration asynchronously - this.getProviderConfig(config.provider) - .then(providerConfig => { - // Emit start event - emitEvent('', 'start'); + // Iterate through stream chunks + for await (const chunk of result.textStream) { + // Accumulate response and yield updates + fullResponse += chunk; + yield { requestId, content: fullResponse, status: 'update' }; + } - const result = streamFromProvider( - config, - messages, - controller.signal, - providerConfig, - ); - - let fullResponse = ''; - let firstChunkReceived = false; - let timeoutId: NodeJS.Timeout | undefined; - - // Set timeout for initial response - const responseTimeout = new Promise((_, reject) => { - timeoutId = setTimeout(() => { - reject(new Error(`${config.provider} model ${config.model} response timeout`)); - }, 30000); - }); - - // Process stream - const processStream = async () => { - try { - await Promise.race([ - (async () => { - for await (const chunk of result.textStream) { - if (!firstChunkReceived) { - if (timeoutId) clearTimeout(timeoutId); - firstChunkReceived = true; - } - - fullResponse += chunk; - emitEvent(fullResponse, 'update'); - } - - // Complete - emitEvent(fullResponse, 'done'); - observer.complete(); - })(), - responseTimeout, - ]); - } catch (streamError) { - if (timeoutId) clearTimeout(timeoutId); - throw streamError; - } - }; - - // Start processing - processStream().catch(error => { - const errorMessage = `Error: ${error instanceof Error ? error.message : String(error)}`; - emitEvent(errorMessage, 'error'); - observer.error(error); - }).finally(() => { - this.activeRequests.delete(requestId); - }); - }) - .catch(error => { - const errorMessage = `Error: ${error instanceof Error ? error.message : String(error)}`; - emitEvent(errorMessage, 'error'); - observer.error(error); - this.activeRequests.delete(requestId); - }); - - // Return cleanup function - return () => { - if (this.activeRequests.has(requestId)) { - this.activeRequests.get(requestId)?.abort(); - this.activeRequests.delete(requestId); - } - }; - }); + // Stream completed + yield { requestId, content: fullResponse, status: 'done' }; + } catch (error) { + // Basic error handling + const errorMessage = error instanceof Error ? error.message : String(error); + yield { requestId, content: `Error: ${errorMessage}`, status: 'error' }; + } finally { + this.cleanupAIRequest(requestId); + } } async cancelAIRequest(requestId: string): Promise { diff --git a/src/services/externalAPI/interface.ts b/src/services/externalAPI/interface.ts index 950615fa..7f2ae889 100644 --- a/src/services/externalAPI/interface.ts +++ b/src/services/externalAPI/interface.ts @@ -91,6 +91,13 @@ export interface IExternalAPIService { */ streamFromAI(messages: Array | Array>, config: AISessionConfig): Observable; + /** + * Send messages to AI provider and get streaming response as an AsyncGenerator + * This is a more direct approach than Observable for certain use cases + * requestId will be automatically generated and returned in the AIStreamResponse + */ + generateFromAI(messages: Array | Array>, config: AISessionConfig): AsyncGenerator; + /** * Cancel an ongoing AI request */ @@ -132,5 +139,7 @@ export const ExternalAPIServiceIPCDescriptor = { getAIConfig: ProxyPropertyType.Function, updateProvider: ProxyPropertyType.Function, updateDefaultAIConfig: ProxyPropertyType.Function, + // generateFromAI is intentionally not exposed via IPC as AsyncGenerators + // aren't directly supported by electron-ipc-cat }, };