mirror of
https://github.com/tiddly-gittly/TidGi-Desktop.git
synced 2026-03-20 22:01:01 -07:00
refactor: use async generator
This commit is contained in:
parent
a00466e414
commit
f554e7229b
8 changed files with 293 additions and 221 deletions
108
src/services/agent/defaultAgents/echo.ts
Normal file
108
src/services/agent/defaultAgents/echo.ts
Normal file
|
|
@ -0,0 +1,108 @@
|
|||
import { container } from '@services/container';
|
||||
import { IExternalAPIService } from '@services/externalAPI/interface';
|
||||
import serviceIdentifier from '@services/serviceIdentifier';
|
||||
import { TaskContext, TaskYieldUpdate } from '../server';
|
||||
import { TextPart } from '../server/schema';
|
||||
|
||||
export async function* echoHandler(context: TaskContext) {
|
||||
// Send working status first
|
||||
yield {
|
||||
state: 'working',
|
||||
message: {
|
||||
role: 'agent',
|
||||
parts: [{ text: 'Processing your message...' }],
|
||||
},
|
||||
} as TaskYieldUpdate;
|
||||
|
||||
// Get ai api service
|
||||
const externalAPIService = container.get<IExternalAPIService>(serviceIdentifier.ExternalAPI);
|
||||
|
||||
// Check if cancelled
|
||||
if (context.isCancelled()) {
|
||||
yield { state: 'canceled' } as TaskYieldUpdate;
|
||||
return;
|
||||
}
|
||||
|
||||
// Get user message text
|
||||
const userText = (context.userMessage.parts as TextPart[])
|
||||
.filter((part) => part.text)
|
||||
.map((part) => part.text)
|
||||
.join(' ');
|
||||
|
||||
// Echo user message
|
||||
yield {
|
||||
state: 'working',
|
||||
message: {
|
||||
role: 'agent',
|
||||
parts: [{ text: `You said: ${userText}\n\nGetting AI response...` }],
|
||||
},
|
||||
} as TaskYieldUpdate;
|
||||
|
||||
// Get AI configuration
|
||||
const aiConfig = await externalAPIService.getAIConfig();
|
||||
|
||||
// Use generateFromAI instead of streamFromAI with observable-to-async-generator
|
||||
let currentRequestId: string | null = null;
|
||||
|
||||
try {
|
||||
// Directly use the async generator
|
||||
for await (const response of externalAPIService.generateFromAI(
|
||||
[{ role: 'user', content: userText }],
|
||||
aiConfig
|
||||
)) {
|
||||
// Store requestId for cancellation
|
||||
if (!currentRequestId && response.requestId) {
|
||||
currentRequestId = response.requestId;
|
||||
}
|
||||
|
||||
// Check for cancellation
|
||||
if (context.isCancelled()) {
|
||||
// Cancel the current request if we have a requestId
|
||||
if (currentRequestId) {
|
||||
await externalAPIService.cancelAIRequest(currentRequestId);
|
||||
}
|
||||
yield { state: 'canceled' } as TaskYieldUpdate;
|
||||
return;
|
||||
}
|
||||
|
||||
// Handle different response states
|
||||
if (response.status === 'update' || response.status === 'done') {
|
||||
// Update UI
|
||||
yield {
|
||||
state: response.status === 'done' ? 'completed' : 'working',
|
||||
message: {
|
||||
role: 'agent',
|
||||
parts: [{ text: `You said: ${userText}\n\nAI response: ${response.content}` }],
|
||||
},
|
||||
} as TaskYieldUpdate;
|
||||
} else if (response.status === 'error') {
|
||||
// Handle error case
|
||||
yield {
|
||||
state: 'completed',
|
||||
message: {
|
||||
role: 'agent',
|
||||
parts: [{ text: `You said: ${userText}\n\nError getting AI response: ${response.content}` }],
|
||||
},
|
||||
} as TaskYieldUpdate;
|
||||
return;
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
// Handle any unexpected errors
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
console.error('Error in echoHandler:', errorMessage);
|
||||
|
||||
yield {
|
||||
state: 'completed',
|
||||
message: {
|
||||
role: 'agent',
|
||||
parts: [{ text: `You said: ${userText}\n\nError processing AI response: ${errorMessage}` }],
|
||||
},
|
||||
} as TaskYieldUpdate;
|
||||
} finally {
|
||||
// Ensure request is cancelled if needed
|
||||
if (context.isCancelled() && currentRequestId) {
|
||||
await externalAPIService.cancelAIRequest(currentRequestId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,18 +1,16 @@
|
|||
/* eslint-disable @typescript-eslint/require-await */
|
||||
import { injectable } from 'inversify';
|
||||
import { nanoid } from 'nanoid';
|
||||
import { BehaviorSubject, Observable } from 'rxjs';
|
||||
|
||||
import { lazyInject } from '@services/container';
|
||||
import { IDatabaseService } from '@services/database/interface';
|
||||
import { AIMessage, AISessionConfig } from '@services/externalAPI/interface'; // 添加AISessionConfig导入
|
||||
import { AISessionConfig } from '@services/externalAPI/interface'; // 添加AISessionConfig导入
|
||||
import { IExternalAPIService } from '@services/externalAPI/interface';
|
||||
import { logger } from '@services/libs/log';
|
||||
import serviceIdentifier from '@services/serviceIdentifier';
|
||||
import { IWikiService } from '@services/wiki/interface';
|
||||
import { AgentEntity, TaskEntity } from '@services/database/schema/agent'; // 添加数据库实体导入
|
||||
import { echoHandler } from './defaultAgents/echo';
|
||||
import type { Agent, AgentServiceConfig, AgentTask, IAgentService } from './interface';
|
||||
import { TaskYieldUpdate } from './server';
|
||||
import { AgentHttpServer } from './server/http-server';
|
||||
import * as schema from './server/schema';
|
||||
import { A2AServer } from './server/server';
|
||||
|
|
@ -41,16 +39,15 @@ export class AgentService implements IAgentService {
|
|||
// 重命名流式更新订阅
|
||||
public taskUpdates$ = new BehaviorSubject<Record<string, AgentTask>>({});
|
||||
|
||||
|
||||
// Database initialization flag
|
||||
private databaseInitialized = false;
|
||||
|
||||
|
||||
/**
|
||||
* Ensure database is initialized
|
||||
*/
|
||||
private async ensureDatabaseInitialized(): Promise<void> {
|
||||
if (this.databaseInitialized) return;
|
||||
|
||||
|
||||
try {
|
||||
// Initialize the database
|
||||
await this.databaseService.initializeDatabase('agent-default');
|
||||
|
|
@ -64,13 +61,13 @@ export class AgentService implements IAgentService {
|
|||
|
||||
// Agent registration flag
|
||||
private agentsRegistered = false;
|
||||
|
||||
|
||||
/**
|
||||
* Ensure agents are registered
|
||||
*/
|
||||
private async ensureAgentsRegistered(): Promise<void> {
|
||||
if (this.agentsRegistered) return;
|
||||
|
||||
|
||||
try {
|
||||
// Register default agents
|
||||
await this.registerDefaultAgents();
|
||||
|
|
@ -88,10 +85,10 @@ export class AgentService implements IAgentService {
|
|||
try {
|
||||
// Ensure database is initialized
|
||||
await this.ensureDatabaseInitialized();
|
||||
|
||||
|
||||
// Get database connection for agent operations
|
||||
const dataSource = await this.databaseService.getDatabase('agent-default');
|
||||
|
||||
|
||||
// Register agent logic
|
||||
console.log('Registering default agents');
|
||||
|
||||
|
|
@ -101,7 +98,7 @@ export class AgentService implements IAgentService {
|
|||
name: 'Echo Agent',
|
||||
description: 'Simple echo agent that returns user messages',
|
||||
avatarUrl: 'https://example.com/echo-agent.png',
|
||||
handler: this.createEchoHandler(),
|
||||
handler: echoHandler,
|
||||
card: {
|
||||
name: 'Echo Agent',
|
||||
description: 'Simple echo agent',
|
||||
|
|
@ -122,13 +119,13 @@ export class AgentService implements IAgentService {
|
|||
|
||||
// Store agent in memory
|
||||
this.agents.set(echoAgent.id, echoAgent);
|
||||
|
||||
|
||||
// Store agent in database to satisfy foreign key constraints
|
||||
const agentRepository = dataSource.getRepository('agents');
|
||||
try {
|
||||
// Check if agent already exists in database
|
||||
const existingAgent = await agentRepository.findOne({ where: { id: echoAgent.id } });
|
||||
|
||||
|
||||
if (!existingAgent) {
|
||||
// Insert agent record into database
|
||||
await agentRepository.save({
|
||||
|
|
@ -136,17 +133,17 @@ export class AgentService implements IAgentService {
|
|||
name: echoAgent.name,
|
||||
description: echoAgent.description || null,
|
||||
avatarUrl: echoAgent.avatarUrl || null,
|
||||
card: echoAgent.card ? JSON.stringify(echoAgent.card) : null
|
||||
card: echoAgent.card ? JSON.stringify(echoAgent.card) : null,
|
||||
});
|
||||
console.log(`Inserted agent record into database: ${echoAgent.id}`);
|
||||
} else {
|
||||
console.log(`Agent record already exists in database: ${echoAgent.id}`);
|
||||
}
|
||||
} catch (dbError) {
|
||||
console.error(`Failed to store agent in database: ${echoAgent.id}`, dbError);
|
||||
} catch (databaseError) {
|
||||
console.error(`Failed to store agent in database: ${echoAgent.id}`, databaseError);
|
||||
// Continue anyway - the agent is in memory
|
||||
}
|
||||
|
||||
|
||||
// Create server instance
|
||||
await this.createAgentServer(echoAgent);
|
||||
|
||||
|
|
@ -157,52 +154,6 @@ export class AgentService implements IAgentService {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create echo handler
|
||||
*/
|
||||
private createEchoHandler() {
|
||||
return async function* echoHandler(context: any) {
|
||||
// DEBUG: console
|
||||
console.log(`echoHandler`);
|
||||
// Send working status first
|
||||
yield {
|
||||
state: 'working',
|
||||
message: {
|
||||
role: 'agent',
|
||||
parts: [{ text: 'Processing your message...' }],
|
||||
},
|
||||
} as TaskYieldUpdate;
|
||||
|
||||
// Wait a while to simulate processing
|
||||
await new Promise(resolve => setTimeout(resolve, 1000));
|
||||
// DEBUG: console context
|
||||
console.log(`context`, context);
|
||||
// Check if cancelled
|
||||
if (context.isCancelled()) {
|
||||
yield { state: 'canceled' } as TaskYieldUpdate;
|
||||
return;
|
||||
}
|
||||
// DEBUG: console context.userMessage.parts
|
||||
console.log(`context.userMessage.parts`, context.userMessage.parts);
|
||||
// Get user message text
|
||||
const userText = context.userMessage.parts
|
||||
.filter((part: any) => part.text)
|
||||
.map((part: any) => part.text)
|
||||
.join(' ');
|
||||
// DEBUG: console userText
|
||||
console.log(`userText`, userText);
|
||||
|
||||
// Echo user message
|
||||
yield {
|
||||
state: 'completed',
|
||||
message: {
|
||||
role: 'agent',
|
||||
parts: [{ text: `You said: ${userText}` }],
|
||||
},
|
||||
} as TaskYieldUpdate;
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a server instance for an agent
|
||||
*/
|
||||
|
|
@ -238,7 +189,7 @@ export class AgentService implements IAgentService {
|
|||
}
|
||||
|
||||
/**
|
||||
* Notify task update
|
||||
* Notify task update
|
||||
*/
|
||||
private notifyTaskUpdate(agentId: string, taskId: string, task: AgentTask | null): void {
|
||||
this.taskUpdates$.next({ [taskId]: task });
|
||||
|
|
@ -256,7 +207,7 @@ export class AgentService implements IAgentService {
|
|||
id: task.id,
|
||||
agentId,
|
||||
messages: history || [],
|
||||
status: task.status,
|
||||
status: task.status,
|
||||
metadata: task.metadata,
|
||||
createdAt: timestamp,
|
||||
updatedAt: timestamp,
|
||||
|
|
@ -316,7 +267,7 @@ export class AgentService implements IAgentService {
|
|||
|
||||
async getAgents(): Promise<Omit<Agent, 'handler'>[]> {
|
||||
await this.ensureAgentsRegistered();
|
||||
|
||||
|
||||
// Return list of Agent objects without handler, ensuring they can be transferred via IPC
|
||||
return Array.from(this.agents.values()).map(agent => ({
|
||||
id: agent.id,
|
||||
|
|
@ -330,7 +281,7 @@ export class AgentService implements IAgentService {
|
|||
|
||||
async getAgent(id: string): Promise<Agent | undefined> {
|
||||
await this.ensureAgentsRegistered();
|
||||
|
||||
|
||||
return this.agents.get(id);
|
||||
}
|
||||
|
||||
|
|
@ -339,7 +290,7 @@ export class AgentService implements IAgentService {
|
|||
*/
|
||||
async createTask(agentId: string): Promise<AgentTask> {
|
||||
await this.ensureAgentsRegistered();
|
||||
|
||||
|
||||
if (!this.agents.has(agentId)) {
|
||||
throw new Error(`Agent with ID ${agentId} not found`);
|
||||
}
|
||||
|
|
@ -372,7 +323,7 @@ export class AgentService implements IAgentService {
|
|||
*/
|
||||
async sendMessage(agentId: string, taskId: string, messageText: string): Promise<schema.JSONRPCResponse> {
|
||||
await this.ensureAgentsRegistered();
|
||||
|
||||
|
||||
// Get server instance
|
||||
const server = await this.getOrCreateAgentServer(agentId);
|
||||
|
||||
|
|
@ -487,7 +438,7 @@ export class AgentService implements IAgentService {
|
|||
*/
|
||||
private async getOrCreateAgentServer(agentId: string): Promise<A2AServer> {
|
||||
await this.ensureAgentsRegistered();
|
||||
|
||||
|
||||
if (!this.agents.has(agentId)) {
|
||||
throw new Error(`Agent with ID ${agentId} not found`);
|
||||
}
|
||||
|
|
@ -504,7 +455,7 @@ export class AgentService implements IAgentService {
|
|||
*/
|
||||
async getTask(taskId: string): Promise<AgentTask | undefined> {
|
||||
await this.ensureAgentsRegistered();
|
||||
|
||||
|
||||
// Traverse all agent servers to find matching task
|
||||
for (const [agentId, server] of this.agentServers.entries()) {
|
||||
try {
|
||||
|
|
@ -525,7 +476,7 @@ export class AgentService implements IAgentService {
|
|||
*/
|
||||
async getAgentTasks(agentId: string): Promise<AgentTask[]> {
|
||||
await this.ensureAgentsRegistered();
|
||||
|
||||
|
||||
try {
|
||||
const server = await this.getOrCreateAgentServer(agentId);
|
||||
|
||||
|
|
@ -555,7 +506,7 @@ export class AgentService implements IAgentService {
|
|||
*/
|
||||
async deleteTask(agentId: string, taskId: string): Promise<void> {
|
||||
await this.ensureAgentsRegistered();
|
||||
|
||||
|
||||
try {
|
||||
// Get server instance
|
||||
const server = await this.getOrCreateAgentServer(agentId);
|
||||
|
|
@ -584,7 +535,7 @@ export class AgentService implements IAgentService {
|
|||
|
||||
async startHttpServer(config: AgentServiceConfig): Promise<void> {
|
||||
await this.ensureAgentsRegistered();
|
||||
|
||||
|
||||
if (this.httpServer) {
|
||||
await this.stopHttpServer();
|
||||
}
|
||||
|
|
@ -610,7 +561,7 @@ export class AgentService implements IAgentService {
|
|||
*/
|
||||
public async getDefaultAgentId(): Promise<string | undefined> {
|
||||
const dataSource = await this.databaseService.getDatabase('agent-default');
|
||||
|
||||
|
||||
// Get agent with most recent activity
|
||||
const result = await dataSource.getRepository(TaskEntity)
|
||||
.createQueryBuilder('task')
|
||||
|
|
@ -618,18 +569,18 @@ export class AgentService implements IAgentService {
|
|||
.orderBy('task.updatedAt', 'DESC')
|
||||
.take(1)
|
||||
.getOne();
|
||||
|
||||
|
||||
if (result) {
|
||||
return result.agentId;
|
||||
}
|
||||
|
||||
|
||||
// If no tasks, get any available agent
|
||||
const agent = await dataSource.getRepository(AgentEntity)
|
||||
.createQueryBuilder('agent')
|
||||
.orderBy('agent.createdAt', 'DESC')
|
||||
.take(1)
|
||||
.getOne();
|
||||
|
||||
|
||||
return agent?.id;
|
||||
}
|
||||
|
||||
|
|
@ -640,14 +591,14 @@ export class AgentService implements IAgentService {
|
|||
try {
|
||||
const dataSource = await this.databaseService.getDatabase('agent-default');
|
||||
const repository = dataSource.getRepository(AgentEntity);
|
||||
|
||||
|
||||
const agent = await repository.findOne({ where: { id: agentId } });
|
||||
|
||||
|
||||
if (!agent || !agent.aiConfig) {
|
||||
// If no specific config exists, return undefined to use global defaults
|
||||
return undefined;
|
||||
}
|
||||
|
||||
|
||||
// Parse stored JSON config
|
||||
return JSON.parse(agent.aiConfig) as AISessionConfig;
|
||||
} catch (error) {
|
||||
|
|
@ -663,21 +614,21 @@ export class AgentService implements IAgentService {
|
|||
try {
|
||||
const dataSource = await this.databaseService.getDatabase('agent-default');
|
||||
const repository = dataSource.getRepository(AgentEntity);
|
||||
|
||||
|
||||
const agent = await repository.findOne({ where: { id: agentId } });
|
||||
|
||||
|
||||
if (!agent) {
|
||||
throw new Error(`Agent with ID ${agentId} not found`);
|
||||
}
|
||||
|
||||
|
||||
// Merge with existing config if it exists
|
||||
let currentConfig: AISessionConfig = agent.aiConfig ?
|
||||
JSON.parse(agent.aiConfig) :
|
||||
{ provider: '', model: '' };
|
||||
|
||||
const currentConfig: AISessionConfig = agent.aiConfig
|
||||
? JSON.parse(agent.aiConfig)
|
||||
: { provider: '', model: '' };
|
||||
|
||||
// Get defaults for any missing fields from externalAPIService
|
||||
const defaults = await this.externalAPIService.getAIConfig();
|
||||
|
||||
|
||||
// Merge in this order: defaults -> current -> new config
|
||||
const mergedConfig = {
|
||||
...defaults,
|
||||
|
|
@ -687,15 +638,15 @@ export class AgentService implements IAgentService {
|
|||
modelParameters: {
|
||||
...(defaults.modelParameters || {}),
|
||||
...(currentConfig.modelParameters || {}),
|
||||
...(config.modelParameters || {})
|
||||
}
|
||||
...(config.modelParameters || {}),
|
||||
},
|
||||
};
|
||||
|
||||
|
||||
// Store the updated config
|
||||
agent.aiConfig = JSON.stringify(mergedConfig);
|
||||
|
||||
|
||||
await repository.save(agent);
|
||||
|
||||
|
||||
logger.info(`Updated AI config for agent ${agentId}`);
|
||||
} catch (error) {
|
||||
logger.error(`Failed to update AI config for agent ${agentId}:`, error);
|
||||
|
|
@ -710,26 +661,26 @@ export class AgentService implements IAgentService {
|
|||
try {
|
||||
const dataSource = await this.databaseService.getDatabase('agent-default');
|
||||
const repository = dataSource.getRepository(TaskEntity);
|
||||
|
||||
const task = await repository.findOne({
|
||||
|
||||
const task = await repository.findOne({
|
||||
where: { id: taskId },
|
||||
relations: ['agent'] // Load the related agent
|
||||
relations: ['agent'], // Load the related agent
|
||||
});
|
||||
|
||||
|
||||
if (!task) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
|
||||
// First try task-specific config
|
||||
if (task.aiConfig) {
|
||||
return JSON.parse(task.aiConfig) as AISessionConfig;
|
||||
}
|
||||
|
||||
|
||||
// Then try agent-level config
|
||||
if (task.agent && task.agent.aiConfig) {
|
||||
return JSON.parse(task.agent.aiConfig) as AISessionConfig;
|
||||
}
|
||||
|
||||
|
||||
// Fall back to global defaults
|
||||
return undefined;
|
||||
} catch (error) {
|
||||
|
|
@ -745,19 +696,19 @@ export class AgentService implements IAgentService {
|
|||
try {
|
||||
const dataSource = await this.databaseService.getDatabase('agent-default');
|
||||
const repository = dataSource.getRepository(TaskEntity);
|
||||
|
||||
const task = await repository.findOne({
|
||||
|
||||
const task = await repository.findOne({
|
||||
where: { id: taskId },
|
||||
relations: ['agent'] // Load the related agent
|
||||
relations: ['agent'], // Load the related agent
|
||||
});
|
||||
|
||||
|
||||
if (!task) {
|
||||
throw new Error(`Task with ID ${taskId} not found`);
|
||||
}
|
||||
|
||||
|
||||
// Start with agent config or empty config
|
||||
let baseConfig: AISessionConfig = { provider: '', model: '' };
|
||||
|
||||
|
||||
// Try to use agent config if available
|
||||
if (task.agent && task.agent.aiConfig) {
|
||||
baseConfig = JSON.parse(task.agent.aiConfig);
|
||||
|
|
@ -765,13 +716,13 @@ export class AgentService implements IAgentService {
|
|||
// Otherwise get global defaults
|
||||
baseConfig = await this.externalAPIService.getAIConfig();
|
||||
}
|
||||
|
||||
|
||||
// Get existing task config if any
|
||||
let currentConfig: Partial<AISessionConfig> = {};
|
||||
if (task.aiConfig) {
|
||||
currentConfig = JSON.parse(task.aiConfig);
|
||||
}
|
||||
|
||||
|
||||
// Merge in this order: base (agent or global) -> current task -> new config
|
||||
const mergedConfig = {
|
||||
...baseConfig,
|
||||
|
|
@ -781,14 +732,14 @@ export class AgentService implements IAgentService {
|
|||
modelParameters: {
|
||||
...(baseConfig.modelParameters || {}),
|
||||
...(currentConfig.modelParameters || {}),
|
||||
...(config.modelParameters || {})
|
||||
}
|
||||
...(config.modelParameters || {}),
|
||||
},
|
||||
};
|
||||
|
||||
|
||||
task.aiConfig = JSON.stringify(mergedConfig);
|
||||
|
||||
|
||||
await repository.save(task);
|
||||
|
||||
|
||||
logger.info(`Updated AI config for task ${taskId}`);
|
||||
} catch (error) {
|
||||
logger.error(`Failed to update AI config for task ${taskId}:`, error);
|
||||
|
|
|
|||
|
|
@ -49,7 +49,7 @@ export interface AgentTask extends Omit<schema.Task, 'artifacts'> {
|
|||
/** Last update time (converted from ISO string) */
|
||||
updatedAt: Date;
|
||||
/** Optional artifacts */
|
||||
artifacts?: schema.Artifact[];
|
||||
artifacts?: schema.Artifact[] | null;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -4,21 +4,21 @@
|
|||
*/
|
||||
|
||||
// Export the main server class and its options
|
||||
export { A2AServer } from "./server";
|
||||
export type { A2AServerOptions } from "./server";
|
||||
export { A2AServer } from './server';
|
||||
export type { A2AServerOptions } from './server';
|
||||
|
||||
// Export handler-related types
|
||||
export type { TaskHandler, TaskContext, TaskYieldUpdate } from "./handler";
|
||||
export type { TaskContext, TaskHandler, TaskYieldUpdate } from './handler';
|
||||
|
||||
// Export store-related types and implementations
|
||||
export type { TaskStore } from "./store";
|
||||
export { InMemoryTaskStore, FileStore } from "./store";
|
||||
export type { TaskStore } from './store';
|
||||
export { InMemoryTaskStore } from './store';
|
||||
|
||||
// Export the custom error class
|
||||
export { A2AError } from "./error";
|
||||
export { A2AError } from './error';
|
||||
|
||||
// Re-export all schema types for convenience
|
||||
export * as schema from "./schema";
|
||||
export * as schema from './schema';
|
||||
|
||||
// Example basic usage (for documentation or testing)
|
||||
/*
|
||||
|
|
|
|||
|
|
@ -47,9 +47,9 @@ export class A2AServer {
|
|||
this.taskHandler = handler;
|
||||
this.taskStore = options.taskStore ?? new InMemoryTaskStore();
|
||||
if (options.card) this.card = options.card;
|
||||
|
||||
|
||||
this.agentId = options.agentId || 'echo-agent';
|
||||
|
||||
|
||||
console.log(`A2AServer initialized with agentId: ${this.agentId}`);
|
||||
}
|
||||
|
||||
|
|
@ -113,7 +113,7 @@ export class A2AServer {
|
|||
const existingArtifact = newTask.artifacts[existingIndex];
|
||||
if (update.append) {
|
||||
// Create a deep copy for modification to avoid mutating original
|
||||
const appendedArtifact = JSON.parse(JSON.stringify(existingArtifact));
|
||||
const appendedArtifact = JSON.parse(JSON.stringify(existingArtifact)) as schema.Artifact;
|
||||
appendedArtifact.parts.push(...update.parts);
|
||||
if (update.metadata) {
|
||||
appendedArtifact.metadata = {
|
||||
|
|
@ -535,7 +535,7 @@ export class A2AServer {
|
|||
private async loadOrCreateTaskAndHistory(
|
||||
taskId: string,
|
||||
initialMessage: schema.Message,
|
||||
_sessionIdParam?: string | null, // 忽略sessionId参数,不再使用
|
||||
_sessionIdParameter?: string | null, // 忽略sessionId参数,不再使用
|
||||
metadata?: Record<string, unknown> | null, // 允许为null
|
||||
): Promise<TaskAndHistory> {
|
||||
let data = await this.taskStore.load(taskId);
|
||||
|
|
|
|||
|
|
@ -56,7 +56,7 @@ export function streamFromProvider(
|
|||
try {
|
||||
if (!providerConfig?.apiKey && providerConfig?.providerClass !== 'ollama') {
|
||||
// Ollama doesn't require API key
|
||||
throw new Error(`API key for ${provider} not found`);
|
||||
throw new Error(`API key for ${provider} not found!!!`);
|
||||
}
|
||||
|
||||
const client = createProviderClient(
|
||||
|
|
|
|||
|
|
@ -2,7 +2,8 @@
|
|||
import { injectable } from 'inversify';
|
||||
import { cloneDeep, mergeWith } from 'lodash';
|
||||
import { nanoid } from 'nanoid';
|
||||
import { Observable } from 'rxjs';
|
||||
import { defer, from, Observable } from 'rxjs';
|
||||
import { filter, finalize, startWith } from 'rxjs/operators';
|
||||
|
||||
import { lazyInject } from '@services/container';
|
||||
import { IDatabaseService } from '@services/database/interface';
|
||||
|
|
@ -25,6 +26,14 @@ const defaultProvidersConfig = {
|
|||
})),
|
||||
};
|
||||
|
||||
/**
|
||||
* Simplified request context
|
||||
*/
|
||||
interface AIRequestContext {
|
||||
requestId: string;
|
||||
controller: AbortController;
|
||||
}
|
||||
|
||||
@injectable()
|
||||
export class ExternalAPIService implements IExternalAPIService {
|
||||
@lazyInject(serviceIdentifier.Database)
|
||||
|
|
@ -185,104 +194,99 @@ export class ExternalAPIService implements IExternalAPIService {
|
|||
this.saveSettingsToDatabase();
|
||||
}
|
||||
|
||||
streamFromAI(messages: Array<CoreMessage> | Array<Omit<Message, 'id'>>, config: AISessionConfig): Observable<AIStreamResponse> {
|
||||
return new Observable<AIStreamResponse>(observer => {
|
||||
// Generate unique request ID internally
|
||||
const requestId = nanoid();
|
||||
/**
|
||||
* Prepare a new AI request with minimal initialization
|
||||
*/
|
||||
private prepareAIRequest(): AIRequestContext {
|
||||
const requestId = nanoid();
|
||||
const controller = new AbortController();
|
||||
|
||||
// Cancel existing request with same ID (shouldn't happen but as precaution)
|
||||
if (this.activeRequests.has(requestId)) {
|
||||
this.cancelAIRequest(requestId)
|
||||
.catch(error => logger.error(`Error canceling previous request ${requestId}:`, error));
|
||||
this.activeRequests.set(requestId, controller);
|
||||
|
||||
return { requestId, controller };
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean up resources for an AI request
|
||||
*/
|
||||
private cleanupAIRequest(requestId: string): void {
|
||||
this.activeRequests.delete(requestId);
|
||||
}
|
||||
|
||||
streamFromAI(messages: Array<CoreMessage> | Array<Omit<Message, 'id'>>, config: AISessionConfig): Observable<AIStreamResponse> {
|
||||
// Use defer to create a new observable stream for each subscription
|
||||
return defer(() => {
|
||||
// Prepare request context
|
||||
const { requestId, controller } = this.prepareAIRequest();
|
||||
|
||||
// Get AsyncGenerator from generateFromAI and convert to Observable
|
||||
return from(this.generateFromAI(messages, config)).pipe(
|
||||
// Skip the first 'start' event since we'll emit our own
|
||||
// to ensure it happens immediately (AsyncGenerator might delay it)
|
||||
filter((response, index) => !(index === 0 && response.status === 'start')),
|
||||
// Ensure we emit a start event immediately
|
||||
startWith({ requestId, content: '', status: 'start' as const }),
|
||||
// Ensure cleanup happens on completion, error, or unsubscribe
|
||||
finalize(() => {
|
||||
if (this.activeRequests.has(requestId)) {
|
||||
controller.abort();
|
||||
this.cleanupAIRequest(requestId);
|
||||
logger.debug(`[${requestId}] Cleaned up in streamFromAI finalize`);
|
||||
}
|
||||
}),
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
async *generateFromAI(
|
||||
messages: Array<CoreMessage> | Array<Omit<Message, 'id'>>,
|
||||
config: AISessionConfig,
|
||||
): AsyncGenerator<AIStreamResponse, void, unknown> {
|
||||
// Prepare request with minimal context
|
||||
const { requestId, controller } = this.prepareAIRequest();
|
||||
|
||||
try {
|
||||
// Send start event
|
||||
yield { requestId, content: '', status: 'start' };
|
||||
|
||||
// Get provider configuration
|
||||
const providerConfig = await this.getProviderConfig(config.provider);
|
||||
if (!providerConfig) {
|
||||
yield {
|
||||
requestId,
|
||||
content: `Provider ${config.provider} not found or not configured`,
|
||||
status: 'error',
|
||||
};
|
||||
return;
|
||||
}
|
||||
|
||||
// Create controller for this request
|
||||
const controller = new AbortController();
|
||||
this.activeRequests.set(requestId, controller);
|
||||
// Create the stream
|
||||
const result = streamFromProvider(
|
||||
config,
|
||||
messages,
|
||||
controller.signal,
|
||||
providerConfig,
|
||||
);
|
||||
|
||||
// Helper function to emit events via the observer
|
||||
const emitEvent = (content: string, status: 'start' | 'update' | 'done' | 'error' | 'cancel') => {
|
||||
if (status === 'done' && (!content || content.trim() === '')) {
|
||||
content = '(No response, please check API settings and network connection)';
|
||||
}
|
||||
observer.next({ requestId, content, status });
|
||||
};
|
||||
// Process the stream
|
||||
let fullResponse = '';
|
||||
|
||||
// Get provider configuration asynchronously
|
||||
this.getProviderConfig(config.provider)
|
||||
.then(providerConfig => {
|
||||
// Emit start event
|
||||
emitEvent('', 'start');
|
||||
// Iterate through stream chunks
|
||||
for await (const chunk of result.textStream) {
|
||||
// Accumulate response and yield updates
|
||||
fullResponse += chunk;
|
||||
yield { requestId, content: fullResponse, status: 'update' };
|
||||
}
|
||||
|
||||
const result = streamFromProvider(
|
||||
config,
|
||||
messages,
|
||||
controller.signal,
|
||||
providerConfig,
|
||||
);
|
||||
|
||||
let fullResponse = '';
|
||||
let firstChunkReceived = false;
|
||||
let timeoutId: NodeJS.Timeout | undefined;
|
||||
|
||||
// Set timeout for initial response
|
||||
const responseTimeout = new Promise<never>((_, reject) => {
|
||||
timeoutId = setTimeout(() => {
|
||||
reject(new Error(`${config.provider} model ${config.model} response timeout`));
|
||||
}, 30000);
|
||||
});
|
||||
|
||||
// Process stream
|
||||
const processStream = async () => {
|
||||
try {
|
||||
await Promise.race([
|
||||
(async () => {
|
||||
for await (const chunk of result.textStream) {
|
||||
if (!firstChunkReceived) {
|
||||
if (timeoutId) clearTimeout(timeoutId);
|
||||
firstChunkReceived = true;
|
||||
}
|
||||
|
||||
fullResponse += chunk;
|
||||
emitEvent(fullResponse, 'update');
|
||||
}
|
||||
|
||||
// Complete
|
||||
emitEvent(fullResponse, 'done');
|
||||
observer.complete();
|
||||
})(),
|
||||
responseTimeout,
|
||||
]);
|
||||
} catch (streamError) {
|
||||
if (timeoutId) clearTimeout(timeoutId);
|
||||
throw streamError;
|
||||
}
|
||||
};
|
||||
|
||||
// Start processing
|
||||
processStream().catch(error => {
|
||||
const errorMessage = `Error: ${error instanceof Error ? error.message : String(error)}`;
|
||||
emitEvent(errorMessage, 'error');
|
||||
observer.error(error);
|
||||
}).finally(() => {
|
||||
this.activeRequests.delete(requestId);
|
||||
});
|
||||
})
|
||||
.catch(error => {
|
||||
const errorMessage = `Error: ${error instanceof Error ? error.message : String(error)}`;
|
||||
emitEvent(errorMessage, 'error');
|
||||
observer.error(error);
|
||||
this.activeRequests.delete(requestId);
|
||||
});
|
||||
|
||||
// Return cleanup function
|
||||
return () => {
|
||||
if (this.activeRequests.has(requestId)) {
|
||||
this.activeRequests.get(requestId)?.abort();
|
||||
this.activeRequests.delete(requestId);
|
||||
}
|
||||
};
|
||||
});
|
||||
// Stream completed
|
||||
yield { requestId, content: fullResponse, status: 'done' };
|
||||
} catch (error) {
|
||||
// Basic error handling
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
yield { requestId, content: `Error: ${errorMessage}`, status: 'error' };
|
||||
} finally {
|
||||
this.cleanupAIRequest(requestId);
|
||||
}
|
||||
}
|
||||
|
||||
async cancelAIRequest(requestId: string): Promise<void> {
|
||||
|
|
|
|||
|
|
@ -91,6 +91,13 @@ export interface IExternalAPIService {
|
|||
*/
|
||||
streamFromAI(messages: Array<CoreMessage> | Array<Omit<Message, 'id'>>, config: AISessionConfig): Observable<AIStreamResponse>;
|
||||
|
||||
/**
|
||||
* Send messages to AI provider and get streaming response as an AsyncGenerator
|
||||
* This is a more direct approach than Observable for certain use cases
|
||||
* requestId will be automatically generated and returned in the AIStreamResponse
|
||||
*/
|
||||
generateFromAI(messages: Array<CoreMessage> | Array<Omit<Message, 'id'>>, config: AISessionConfig): AsyncGenerator<AIStreamResponse, void, unknown>;
|
||||
|
||||
/**
|
||||
* Cancel an ongoing AI request
|
||||
*/
|
||||
|
|
@ -132,5 +139,7 @@ export const ExternalAPIServiceIPCDescriptor = {
|
|||
getAIConfig: ProxyPropertyType.Function,
|
||||
updateProvider: ProxyPropertyType.Function,
|
||||
updateDefaultAIConfig: ProxyPropertyType.Function,
|
||||
// generateFromAI is intentionally not exposed via IPC as AsyncGenerators
|
||||
// aren't directly supported by electron-ipc-cat
|
||||
},
|
||||
};
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue