refactor: rename a2a server's "task" to session for clearity and save to sqlite

This commit is contained in:
lin onetwo 2025-04-15 22:33:06 +08:00
parent af440f3612
commit f1ad206b66
10 changed files with 785 additions and 593 deletions

View file

@ -124,7 +124,7 @@ export const useAgentStore = create<AgentViewModelStoreState>((set, get) => ({
});
},
// 创建新会话 - 确保智能体已选择
// 创建新会话 - 确保使用正确的智能体ID
createNewSession: async () => {
let { selectedAgentId, availableAgents } = get();
// 如果没有选择的智能体,尝试加载可用智能体
@ -150,6 +150,7 @@ export const useAgentStore = create<AgentViewModelStoreState>((set, get) => ({
set(state => ({ creatingSession: true }));
// 调用服务器创建会话并等待完成
// 确保使用id属性而不是name属性确保与数据库中的记录匹配
const createdSession = await window.service.agent.createSession(selectedAgentId);
// 更新前端状态

View file

@ -35,17 +35,62 @@ export class AgentService implements IAgentService {
// Session updates stream - only sends updated sessions
public sessionUpdates$ = new BehaviorSubject<Record<string, AgentSession>>({});
// Remove database initialization and agent registration from constructor
constructor() {
// Initialize agents
this.registerDefaultAgents();
// Do not initialize in constructor
}
// Database initialization flag
private databaseInitialized = false;
/**
* Ensure database is initialized
*/
private async ensureDatabaseInitialized(): Promise<void> {
if (this.databaseInitialized) return;
try {
// Initialize the database
await this.databaseService.initializeDatabase('agent-default');
logger.info('Agent database initialized');
this.databaseInitialized = true;
} catch (error) {
logger.error('Failed to initialize agent database:', error);
throw error;
}
}
// Agent registration flag
private agentsRegistered = false;
/**
* Ensure agents are registered
*/
private async ensureAgentsRegistered(): Promise<void> {
if (this.agentsRegistered) return;
try {
// Register default agents
await this.registerDefaultAgents();
this.agentsRegistered = true;
} catch (error) {
logger.error('Failed to register default agents:', error);
throw error;
}
}
/**
* Register default agents
*/
private registerDefaultAgents(): void {
private async registerDefaultAgents(): Promise<void> {
try {
// Print registration info
// Ensure database is initialized
await this.ensureDatabaseInitialized();
// Get database connection for agent operations
const dataSource = await this.databaseService.getDatabase('agent-default');
// Register agent logic
console.log('Registering default agents');
// Example: Register a simple echo agent
@ -73,12 +118,40 @@ export class AgentService implements IAgentService {
},
};
// Store agent in memory
this.agents.set(echoAgent.id, echoAgent);
void this.createAgentServer(echoAgent);
// Store agent in database to satisfy foreign key constraints
const agentRepository = dataSource.getRepository('agents');
try {
// Check if agent already exists in database
const existingAgent = await agentRepository.findOne({ where: { id: echoAgent.id } });
if (!existingAgent) {
// Insert agent record into database
await agentRepository.save({
id: echoAgent.id,
name: echoAgent.name,
description: echoAgent.description || null,
avatarUrl: echoAgent.avatarUrl || null,
card: echoAgent.card ? JSON.stringify(echoAgent.card) : null
});
console.log(`Inserted agent record into database: ${echoAgent.id}`);
} else {
console.log(`Agent record already exists in database: ${echoAgent.id}`);
}
} catch (dbError) {
console.error(`Failed to store agent in database: ${echoAgent.id}`, dbError);
// Continue anyway - the agent is in memory
}
// Create server instance
await this.createAgentServer(echoAgent);
console.log('Registered default agent:', echoAgent.id);
} catch (error) {
console.error('Error registering default agents:', error);
throw error;
}
}
@ -89,7 +162,7 @@ export class AgentService implements IAgentService {
return async function* echoHandler(context: any) {
// DEBUG: console
console.log(`echoHandler`);
// 先发送工作中状态
// Send working status first
yield {
state: 'working',
message: {
@ -98,18 +171,18 @@ export class AgentService implements IAgentService {
},
} as TaskYieldUpdate;
// 等待一会儿模拟处理
// Wait a while to simulate processing
await new Promise(resolve => setTimeout(resolve, 1000));
// DEBUG: console context
console.log(`context`, context);
// 检查是否取消
// Check if cancelled
if (context.isCancelled()) {
yield { state: 'canceled' } as TaskYieldUpdate;
return;
}
// DEBUG: console context.userMessage.parts
console.log(`context.userMessage.parts`, context.userMessage.parts);
// 获取用户消息文本
// Get user message text
const userText = context.userMessage.parts
.filter((part: any) => part.text)
.map((part: any) => part.text)
@ -117,7 +190,7 @@ export class AgentService implements IAgentService {
// DEBUG: console userText
console.log(`userText`, userText);
// 回显用户消息
// Echo user message
yield {
state: 'completed',
message: {
@ -148,6 +221,7 @@ export class AgentService implements IAgentService {
const server = new A2AServer(agent.handler, {
taskStore,
card: agent.card,
agentId: agent.id, // 添加这一行传递正确的agent.id而不是使用card.name
});
// Store server instance
@ -166,18 +240,18 @@ export class AgentService implements IAgentService {
*/
private notifySessionUpdate(agentId: string, sessionId: string, session: AgentSession | null): void {
if (session === null) {
// 发送删除会话通知
// Send delete session notification
this.sessionUpdates$.next({ [sessionId]: null as any });
} else {
// 发送会话更新通知
// Send session update notification
this.sessionUpdates$.next({ [sessionId]: session });
}
}
/**
* Convert A2A task to session object
* Convert A2A session to UI session object
*/
private convertTaskToSession(agentId: string, task: schema.Task, history: schema.Message[]): AgentSession {
private convertSessionToAgentSession(agentId: string, task: schema.Task, history: schema.Message[]): AgentSession {
const timestamp = task.status.timestamp
? new Date(task.status.timestamp)
: new Date();
@ -186,28 +260,28 @@ export class AgentService implements IAgentService {
id: task.id,
agentId,
messages: history || [],
currentTaskId: task.id,
currentSessionId: task.id, // Renamed from currentTaskId
createdAt: timestamp,
updatedAt: timestamp,
};
}
/**
* Get task and its history
* Get session and its history
*/
private async getTaskWithHistory(agentId: string, taskId: string): Promise<{ task: schema.Task; history: schema.Message[] } | null> {
private async getSessionWithHistory(agentId: string, sessionId: string): Promise<{ task: schema.Task; history: schema.Message[] } | null> {
try {
const server = await this.getOrCreateAgentServer(agentId);
// 请求获取任务
const getTaskRequest: schema.GetTaskRequest = {
// Request to get session
const getSessionRequest: schema.GetTaskRequest = {
jsonrpc: '2.0',
id: nanoid(),
method: 'tasks/get',
params: { id: taskId },
params: { id: sessionId },
};
const response = await server.handleRequest(getTaskRequest);
const response = await server.handleRequest(getSessionRequest);
if (response.error || !response.result) {
return null;
@ -215,20 +289,20 @@ export class AgentService implements IAgentService {
const task = response.result as schema.Task;
// 获取历史记录 - 使用服务器的方法获取完整历史
// Get history - use server method to get full history
let history: schema.Message[] = [];
try {
// 直接从A2A服务器获取
history = await server.getTaskHistory(taskId);
// Get directly from A2A server
history = await server.getSessionHistory(sessionId);
// 如果历史为空但有当前消息,确保至少包含当前消息
// If history is empty but there is a current message, ensure it is included
if (history.length === 0 && task.status.message) {
history.push(task.status.message);
}
} catch (historyError) {
console.error(`Failed to get history for task ${taskId}:`, historyError);
// 回退方案:至少捕获当前状态消息
console.error(`Failed to get history for session ${sessionId}:`, historyError);
// Fallback: at least capture current status message
if (task.status.message) {
history.push(task.status.message);
}
@ -236,7 +310,7 @@ export class AgentService implements IAgentService {
return { task, history };
} catch (error) {
console.error(`Failed to get task ${taskId} for agent ${agentId}:`, error);
console.error(`Failed to get session ${sessionId} for agent ${agentId}:`, error);
return null;
}
}
@ -244,18 +318,22 @@ export class AgentService implements IAgentService {
// Implement IAgentService interface methods
async getAgents(): Promise<Omit<Agent, 'handler'>[]> {
// 返回不含handler的Agent对象列表确保可以通过IPC传输
await this.ensureAgentsRegistered();
// Return list of Agent objects without handler, ensuring they can be transferred via IPC
return Array.from(this.agents.values()).map(agent => ({
id: agent.id,
name: agent.name,
description: agent.description,
avatarUrl: agent.avatarUrl,
card: agent.card,
// 不包含handler属性因为函数不能通过IPC传输
// Do not include handler property, as functions cannot be transferred via IPC
}));
}
async getAgent(id: string): Promise<Agent | undefined> {
await this.ensureAgentsRegistered();
return this.agents.get(id);
}
@ -263,14 +341,16 @@ export class AgentService implements IAgentService {
* Create new session
*/
async createSession(agentId: string): Promise<AgentSession> {
await this.ensureAgentsRegistered();
if (!this.agents.has(agentId)) {
throw new Error(`Agent with ID ${agentId} not found`);
}
// 生成会话ID同时也是任务ID
// Generate session ID (also task ID)
const sessionId = nanoid();
// 创建一个空会话对象
// Create an empty session object
const now = new Date();
const session: AgentSession = {
id: sessionId,
@ -281,7 +361,7 @@ export class AgentService implements IAgentService {
updatedAt: now,
};
// 更新缓存并通知
// Update cache and notify
this.notifySessionUpdate(agentId, sessionId, session);
return session;
@ -291,34 +371,36 @@ export class AgentService implements IAgentService {
* Send message to session
*/
async sendMessage(agentId: string, sessionId: string, messageText: string): Promise<schema.JSONRPCResponse> {
await this.ensureAgentsRegistered();
// Get server instance
const server = await this.getOrCreateAgentServer(agentId);
// 创建消息对象
// Create message object
const message: schema.Message = {
role: 'user',
parts: [{ text: messageText }],
};
// 构建A2A请求
// Build A2A request
const request: schema.SendTaskRequest = {
jsonrpc: '2.0',
id: nanoid(),
method: 'tasks/send',
params: {
id: sessionId, // 会话ID即任务ID
id: sessionId, // Session ID is task ID
message,
},
};
// 发送请求
// Send request
const response = await server.handleRequest(request);
// 请求成功时,获取最新会话状态并更新
// On successful request, get latest session status and update
if (response.result && !response.error) {
const taskData = await this.getTaskWithHistory(agentId, sessionId);
const taskData = await this.getSessionWithHistory(agentId, sessionId);
if (taskData) {
const session = this.convertTaskToSession(agentId, taskData.task, taskData.history);
const session = this.convertSessionToAgentSession(agentId, taskData.task, taskData.history);
this.notifySessionUpdate(agentId, sessionId, session);
}
}
@ -335,50 +417,50 @@ export class AgentService implements IAgentService {
// Get server instance asynchronously
this.getOrCreateAgentServer(agentId)
.then(server => {
// 创建消息对象
// Create message object
const message: schema.Message = {
role: 'user',
parts: [{ text: messageText }],
};
// 构建A2A流式请求
// Build A2A streaming request
const request: schema.SendTaskStreamingRequest = {
jsonrpc: '2.0',
id: nanoid(),
method: 'tasks/sendSubscribe',
params: {
id: sessionId, // 会话ID即任务ID
id: sessionId, // Session ID is task ID
message,
},
};
// 调用服务器的流式处理方法
// Call server's streaming handling method
const eventEmitter = server.handleStreamingRequest(request);
// 订阅事件流
// Subscribe to event stream
eventEmitter.on('update', async (event: schema.TaskStatusUpdateEvent | schema.TaskArtifactUpdateEvent) => {
console.log(`[Agent Service] Received event:`, event); // 添加日志
console.log(`[Agent Service] Received event:`, event); // Add log
subscriber.next(event);
// 如果更新包含消息,刷新会话
// If update contains message, refresh session
if ('status' in event && event.status.message) {
console.log(`[Agent Service] Event contains message:`, event.status.message); // 添加日志
const taskData = await this.getTaskWithHistory(agentId, sessionId);
if (taskData) {
const session = this.convertTaskToSession(agentId, taskData.task, taskData.history);
console.log(`[Agent Service] Updated session:`, session); // 添加日志
console.log(`[Agent Service] Event contains message:`, event.status.message); // Add log
const sessionData = await this.getSessionWithHistory(agentId, sessionId);
if (sessionData) {
const session = this.convertSessionToAgentSession(agentId, sessionData.task, sessionData.history);
console.log(`[Agent Service] Updated session:`, session); // Add log
this.notifySessionUpdate(agentId, sessionId, session);
}
}
// 如果是最终事件,完成流
// If final event, complete stream
if (event.final) {
console.log(`[Agent Service] Final event received`); // 添加日志
// 一次性获取完整会话状态
const taskData = await this.getTaskWithHistory(agentId, sessionId);
if (taskData) {
console.log(`[Agent Service] Final session history:`, taskData.history); // 添加日志
const session = this.convertTaskToSession(agentId, taskData.task, taskData.history);
console.log(`[Agent Service] Final event received`); // Add log
// Get complete session status once
const sessionData = await this.getSessionWithHistory(agentId, sessionId);
if (sessionData) {
console.log(`[Agent Service] Final session history:`, sessionData.history); // Add log
const session = this.convertSessionToAgentSession(agentId, sessionData.task, sessionData.history);
this.notifySessionUpdate(agentId, sessionId, session);
}
subscriber.complete();
@ -404,6 +486,8 @@ export class AgentService implements IAgentService {
* Get or create agent server instance
*/
private async getOrCreateAgentServer(agentId: string): Promise<A2AServer> {
await this.ensureAgentsRegistered();
if (!this.agents.has(agentId)) {
throw new Error(`Agent with ID ${agentId} not found`);
}
@ -419,16 +503,18 @@ export class AgentService implements IAgentService {
* Get session by ID
*/
async getSession(sessionId: string): Promise<AgentSession | undefined> {
// 遍历所有智能体服务器,查找匹配的会话
await this.ensureAgentsRegistered();
// Traverse all agent servers to find matching session
for (const [agentId, server] of this.agentServers.entries()) {
try {
const taskData = await this.getTaskWithHistory(agentId, sessionId);
if (taskData) {
// 找到匹配的会话,转换并返回
return this.convertTaskToSession(agentId, taskData.task, taskData.history);
const sessionData = await this.getSessionWithHistory(agentId, sessionId);
if (sessionData) {
// Found matching session, convert and return
return this.convertSessionToAgentSession(agentId, sessionData.task, sessionData.history);
}
} catch (error) {
// 继续查找下一个智能体
// Continue to next agent
}
}
return undefined;
@ -438,20 +524,22 @@ export class AgentService implements IAgentService {
* Get all sessions for an agent
*/
async getAgentSessions(agentId: string): Promise<AgentSession[]> {
await this.ensureAgentsRegistered();
try {
const server = await this.getOrCreateAgentServer(agentId);
// 获取所有任务(每个任务即一个会话)
// Get all tasks (each task is a session)
const tasks = await server.getAllTasks();
// 转换为会话对象
// Convert to session objects
const sessions: AgentSession[] = [];
for (const task of tasks) {
const taskData = await this.getTaskWithHistory(agentId, task.id);
const taskData = await this.getSessionWithHistory(agentId, task.id);
if (!taskData) continue;
const session = this.convertTaskToSession(agentId, taskData.task, taskData.history);
const session = this.convertSessionToAgentSession(agentId, taskData.task, taskData.history);
sessions.push(session);
}
@ -466,34 +554,38 @@ export class AgentService implements IAgentService {
* Delete session
*/
async deleteSession(agentId: string, sessionId: string): Promise<void> {
await this.ensureAgentsRegistered();
try {
// Get server instance
const server = await this.getOrCreateAgentServer(agentId);
// 构建请求
// Build request
const request: schema.CancelTaskRequest = {
jsonrpc: '2.0',
id: nanoid(),
method: 'tasks/cancel',
params: {
id: sessionId, // 会话ID即任务ID
id: sessionId, // Session ID is task ID
},
};
// 尝试取消任务(如果正在运行)
// Attempt to cancel task (if running)
await server.handleRequest(request);
// 通知删除
// Notify deletion
this.notifySessionUpdate(agentId, sessionId, null);
// 注意A2A协议本身不支持删除任务这里只是通知前端删除
// 真正从存储中删除任务可能需要扩展A2A服务器实现
// Note: A2A protocol itself does not support task deletion, this just notifies frontend
// Actual task deletion from storage may require extending A2A server implementation
} catch (error) {
logger.error(`Failed to delete session ${sessionId}:`, error);
}
}
async startHttpServer(config: AgentServiceConfig): Promise<void> {
await this.ensureAgentsRegistered();
if (this.httpServer) {
await this.stopHttpServer();
}

View file

@ -6,55 +6,55 @@ import { TaskHandler } from './server/handler';
import * as schema from './server/schema';
/**
*
* Agent definition, including basic information and processing logic
*/
export interface Agent {
/** 智能体唯一标识符 */
/** Unique identifier for the agent */
id: string;
/** 智能体名称 */
/** Agent name */
name: string;
/** 智能体描述 */
/** Agent description */
description?: string;
/** 智能体图标或头像URL */
/** Agent icon or avatar URL */
avatarUrl?: string;
/** 智能体处理器函数 */
/** Agent handler function */
handler: TaskHandler;
/** 智能体特性卡片 */
/** Agent feature card */
card?: schema.AgentCard;
}
/**
*
* Agent service configuration
*/
export interface AgentServiceConfig {
/** 是否启用HTTP服务器 */
/** Whether to enable HTTP server */
enableHttpServer?: boolean;
/** HTTP服务器端口 */
/** HTTP server port */
httpServerPort?: number;
/** HTTP服务基础路径 */
/** HTTP server base path */
httpServerBasePath?: string;
}
/**
*
* Agent session information
*/
export interface AgentSession {
/** 会话ID */
/** Session ID */
id: string;
/** 智能体ID */
/** Agent ID */
agentId: string;
/** 消息历史 */
/** Message history */
messages: schema.Message[];
/** 当前任务ID */
currentTaskId?: string;
/** 会话创建时间 */
/** Current session ID - used as reference to task in A2A protocol */
currentSessionId: string;
/** Session creation time */
createdAt: Date;
/** 上次更新时间 */
/** Last update time */
updatedAt: Date;
}
/**
*
* Agent request result
*/
export interface AgentRequestResult<T = any> {
data?: T;
@ -66,69 +66,69 @@ export interface AgentRequestResult<T = any> {
*/
export interface IAgentService {
/**
* handler
* Get all available agents (simplified, without handler)
*/
getAgents(): Promise<Omit<Agent, 'handler'>[]>;
/**
*
* @param id ID
* Get a specific agent
* @param id Agent ID
*/
getAgent(id: string): Promise<Agent | undefined>;
/**
*
* @param agentId ID
* Create a new session
* @param agentId Agent ID
*/
createSession(agentId: string): Promise<AgentSession>;
/**
*
* @param agentId ID
* @param sessionId ID
* @param messageText
* Send a message to a session
* @param agentId Agent ID
* @param sessionId Session ID
* @param messageText Message text
*/
sendMessage(agentId: string, sessionId: string, messageText: string): Promise<schema.JSONRPCResponse>;
/**
*
* @param agentId ID
* @param sessionId ID
* @param messageText
* Stream a message to a session and subscribe to results
* @param agentId Agent ID
* @param sessionId Session ID
* @param messageText Message text
*/
handleStreamingRequest(agentId: string, sessionId: string, messageText: string): Observable<schema.TaskStatusUpdateEvent | schema.TaskArtifactUpdateEvent>;
/**
*
* @param sessionId ID
* Get a specific session
* @param sessionId Session ID
*/
getSession(sessionId: string): Promise<AgentSession | undefined>;
/**
*
* @param agentId ID
* Get all sessions for an agent
* @param agentId Agent ID
*/
getAgentSessions(agentId: string): Promise<AgentSession[]>;
/**
*
* @param agentId ID
* @param sessionId ID
* Delete a session
* @param agentId Agent ID
* @param sessionId Session ID
*/
deleteSession(agentId: string, sessionId: string): Promise<void>;
/**
* HTTP服务器
* @param config
* Start HTTP server
* @param config Server configuration
*/
startHttpServer(config: AgentServiceConfig): Promise<void>;
/**
* HTTP服务器
* Stop HTTP server
*/
stopHttpServer(): Promise<void>;
/** 会话更新流 - 只包含变更的会话 */
/** Session updates stream - only includes changed sessions */
sessionUpdates$: BehaviorSubject<Record<string, AgentSession>>;
}

View file

@ -8,15 +8,15 @@ This directory contains a TypeScript server implementation for the Agent-to-Agen
import {
A2AServer,
InMemoryTaskStore,
TaskContext,
TaskYieldUpdate,
SessionContext, // Renamed from TaskContext
SessionYieldUpdate, // Renamed from TaskYieldUpdate
} from "./index"; // Assuming imports from the server package
// 1. Define your agent's logic as a TaskHandler
// 1. Define your agent's logic as a SessionHandler
async function* myAgentLogic(
context: TaskContext
): AsyncGenerator<TaskYieldUpdate> {
console.log(`Handling task: ${context.task.id}`);
context: SessionContext
): AsyncGenerator<SessionYieldUpdate> {
console.log(`Handling session: ${context.task.id}`);
yield {
state: "working",
message: { role: "agent", parts: [{ text: "Processing..." }] },
@ -26,7 +26,7 @@ async function* myAgentLogic(
await new Promise((resolve) => setTimeout(resolve, 1000));
if (context.isCancelled()) {
console.log("Task cancelled!");
console.log("Session cancelled!");
yield { state: "canceled" };
return;
}
@ -35,7 +35,7 @@ async function* myAgentLogic(
yield {
name: "result.txt",
mimeType: "text/plain",
parts: [{ text: `Task ${context.task.id} completed.` }],
parts: [{ text: `Session ${context.task.id} completed.` }],
};
// Yield final status

View file

@ -1,67 +1,56 @@
import * as schema from "./schema";
import { TaskStore } from "./store"; // Import TaskStore
import * as schema from './schema';
import { TaskStore } from './store';
/**
* Context object provided to the TaskHandler.
* Update yielded by a session handler.
*/
export interface TaskContext {
export type SessionYieldUpdate =
| Omit<schema.TaskStatus, 'timestamp'>
| schema.Artifact;
// 保持与 A2A 协议兼容
export type TaskYieldUpdate = SessionYieldUpdate;
/**
* Context provided to session handlers for processing messages
*/
export interface SessionContext {
/**
* The current state of the task when the handler is invoked or resumed.
* Note: This is a snapshot. For the absolute latest state during async operations,
* the handler might need to reload the task via the store.
* The current session object (kept as 'task' for handler API compatibility)
*/
task: schema.Task;
/**
* The specific user message that triggered this handler invocation or resumption.
* The most recent message from the user
*/
userMessage: schema.Message;
/**
* Function to check if cancellation has been requested for this task.
* Handlers should ideally check this periodically during long-running operations.
* @returns {boolean} True if cancellation has been requested, false otherwise.
* Full history of the conversation
*/
isCancelled(): boolean;
history: schema.Message[];
/**
* The message history associated with the task up to the point the handler is invoked.
* Optional, as history might not always be available or relevant.
* Check if the session has been cancelled
*/
history?: schema.Message[];
isCancelled: () => boolean;
// taskStore is removed as the server now handles loading/saving directly.
// If a handler specifically needs history, it would need to be passed differently
// or the handler pattern might need adjustment based on use case.
// Potential future additions:
// - logger instance
// - AbortSignal linked to cancellation
/**
* Optional session store for persistence (rarely needed by handlers)
*/
taskStore?: TaskStore;
}
/**
* Represents the possible types of updates a TaskHandler can yield.
* It's either a partial TaskStatus (without the server-managed timestamp)
* or a complete Artifact object.
*/
export type TaskYieldUpdate =
| Omit<schema.TaskStatus, "timestamp">
| schema.Artifact;
// Keep TaskContext type for backward compatibility
export type TaskContext = SessionContext;
/**
* Defines the signature for a task handler function.
*
* Handlers are implemented as async generators. They receive context about the
* task and the triggering message. They can perform work and `yield` status
* or artifact updates (`TaskYieldUpdate`). The server consumes these yields,
* updates the task state in the store, and streams events if applicable.
*
* @param context - The TaskContext object containing task details, cancellation status, and store access.
* @yields {TaskYieldUpdate} - Updates to the task's status or artifacts.
* @returns {Promise<schema.Task | void>} - Optionally returns the final complete Task object
* (needed for non-streaming 'tasks/send'). If void is returned, the server uses the
* last known state from the store after processing all yields.
* Session handler function type
* Takes a session context and returns an async generator of updates
*/
export type TaskHandler = (
context: TaskContext
) => AsyncGenerator<TaskYieldUpdate, schema.Task | void, unknown>;
export type SessionHandler = (
context: SessionContext
) => AsyncGenerator<SessionYieldUpdate>;
// Keep TaskHandler type for backward compatibility with A2A protocol
export type TaskHandler = SessionHandler;

View file

@ -11,6 +11,7 @@ interface AgentHttpServerOptions {
/**
* Node.js原生HTTP模块的A2A HTTP服务器
*
*/
export class AgentHttpServer {
private server: http.Server | null = null;
@ -155,7 +156,7 @@ export class AgentHttpServer {
}
/**
* A2A API请求
* A2A API请求 - A2A协议命名便
*/
private async handleA2ARequest(request: http.IncomingMessage, res: http.ServerResponse, agentId: string): Promise<void> {
// 检查智能体是否存在
@ -180,18 +181,21 @@ export class AgentHttpServer {
request.on('end', async () => {
try {
// 解析JSON-RPC请求
const request = JSON.parse(body) as schema.JSONRPCRequest;
const jsonRpcRequest = JSON.parse(body) as schema.JSONRPCRequest;
// 检查是否是流式请求
if (request.method === 'tasks/sendSubscribe') {
await this.handleStreamingRequest(
if (jsonRpcRequest.method === 'tasks/sendSubscribe') {
await this.handleStreamingSession(
res,
agentId,
request as schema.SendTaskStreamingRequest,
jsonRpcRequest as schema.SendTaskStreamingRequest,
);
} else {
// 处理常规请求
const response = await this.options.agentService.handleRequest(agentId, request);
// 在这里我们将会话概念转换为A2A协议中的任务概念
const response = await this.convertSessionRequestToTaskRequest(
agentId,
jsonRpcRequest
);
res.statusCode = 200;
res.setHeader('Content-Type', 'application/json');
@ -219,9 +223,100 @@ export class AgentHttpServer {
}
/**
* SSE
* A2A协议中的任务请求
* http-server.ts中
*/
private async handleStreamingRequest(
private async convertSessionRequestToTaskRequest(
agentId: string,
request: schema.JSONRPCRequest
): Promise<schema.JSONRPCResponse> {
// 处理不同类型的请求
switch (request.method) {
case 'tasks/send':
return this.options.agentService.sendMessage(
agentId,
(request as schema.SendTaskRequest).params.id,
(request as schema.SendTaskRequest).params.message.parts[0].text
);
case 'tasks/get':
const session = await this.options.agentService.getSession(
(request as schema.GetTaskRequest).params.id
);
if (!session) {
return {
jsonrpc: '2.0',
id: request.id || null,
error: {
code: -32001,
message: `Session not found: ${(request as schema.GetTaskRequest).params.id}`
}
};
}
// 将会话转换为任务格式
return {
jsonrpc: '2.0',
id: request.id || null,
result: this.convertAgentSessionToA2ATask(session)
};
case 'tasks/cancel':
await this.options.agentService.deleteSession(
agentId,
(request as schema.CancelTaskRequest).params.id
);
return {
jsonrpc: '2.0',
id: request.id || null,
result: {
id: (request as schema.CancelTaskRequest).params.id,
status: {
state: 'canceled',
timestamp: new Date().toISOString()
}
}
};
default:
return {
jsonrpc: '2.0',
id: request.id || null,
error: {
code: -32601,
message: `Method not found: ${request.method}`
}
};
}
}
/**
* AgentSession转换为A2A协议中的Task
* http-server.ts中
*/
private convertAgentSessionToA2ATask(session: any): schema.Task {
const lastAgentMessage = session.messages
.filter((m: any) => m.role === 'agent')
.pop();
return {
id: session.id,
sessionId: session.id,
status: {
state: lastAgentMessage ? 'completed' : 'submitted',
timestamp: session.updatedAt.toISOString(),
message: lastAgentMessage
},
artifacts: [],
metadata: {}
};
}
/**
*
*/
private async handleStreamingSession(
res: http.ServerResponse,
agentId: string,
request: schema.SendTaskStreamingRequest,
@ -234,17 +329,22 @@ export class AgentHttpServer {
try {
// 获取流式响应
const stream = this.options.agentService.handleStreamingRequest(agentId, request);
const stream = this.options.agentService.handleStreamingRequest(
agentId,
request.params.id,
request.params.message.parts[0].text
);
// 订阅流
// 创建订阅
const subscription = stream.subscribe({
next: (event) => {
const response = {
// 将事件转换为SSE格式并发送
const eventData = JSON.stringify({
jsonrpc: '2.0',
id: request.id,
result: event,
};
res.write(`data: ${JSON.stringify(response)}\n\n`);
result: event
});
res.write(`data: ${eventData}\n\n`);
// 如果是最终事件,结束连接
if (event.final) {
@ -253,38 +353,39 @@ export class AgentHttpServer {
},
error: (error) => {
console.error('Error in stream:', error);
const errorResponse = {
// 发送错误事件
const errorData = JSON.stringify({
jsonrpc: '2.0',
id: request.id,
error: {
code: -32603,
message: error instanceof Error ? error.message : 'Stream error',
},
};
res.write(`data: ${JSON.stringify(errorResponse)}\n\n`);
message: error instanceof Error ? error.message : 'Stream error'
}
});
res.write(`data: ${errorData}\n\n`);
res.end();
},
complete: () => {
// 流完成
res.end();
},
}
});
// 当连接关闭时取消订阅
req.on('close', () => {
// 当请求关闭时取消订阅
request.on('close', () => {
subscription.unsubscribe();
});
} catch (error) {
console.error('Error setting up stream:', error);
const errorResponse = {
const errorData = JSON.stringify({
jsonrpc: '2.0',
id: request.id,
error: {
code: -32603,
message: error instanceof Error ? error.message : 'Stream error',
},
};
res.write(`data: ${JSON.stringify(errorResponse)}\n\n`);
message: error instanceof Error ? error.message : 'Stream error'
}
});
res.write(`data: ${errorData}\n\n`);
res.end();
}
}

View file

@ -1,45 +1,61 @@
import { EventEmitter } from 'events';
import { A2AError } from './error';
import { TaskContext as OldTaskContext, TaskHandler } from './handler';
import { SessionContext as OldSessionContext, TaskHandler } from './handler';
import * as schema from './schema';
import { InMemoryTaskStore, TaskAndHistory, TaskStore } from './store';
import { InMemoryTaskStore, SessionAndHistory, TaskAndHistory, TaskStore } from './store';
import { getCurrentTimestamp, isArtifactUpdate, isTaskStatusUpdate } from './utils';
/**
* Options for configuring the A2AServer.
*/
export interface A2AServerOptions {
/** Task storage implementation. Defaults to InMemoryTaskStore. */
/** Session storage implementation. Defaults to InMemoryTaskStore. */
taskStore?: TaskStore;
/** Agent Card for the agent being served. */
card?: schema.AgentCard;
/** Database ID of the agent this server instance belongs to */
agentId?: string;
}
// Define new TaskContext without the store, based on the original from handler.ts
export interface TaskContext extends Omit<OldTaskContext, 'taskStore'> {}
// 定义新的SessionContext接口不包含store
export interface SessionContext extends Omit<OldSessionContext, 'taskStore'> {}
/**
* Implements an A2A specification compliant server.
*/
export class A2AServer {
private taskHandler: TaskHandler;
private taskStore: TaskStore;
private sessionStore: TaskStore;
// Track active cancellations
private activeCancellations: Set<string> = new Set();
card: schema.AgentCard;
// Agent ID - this should be the database ID
private agentId: string;
constructor(handler: TaskHandler, options: A2AServerOptions = {}) {
this.taskHandler = handler;
this.sessionStore = options.taskStore ?? new InMemoryTaskStore();
if (options.card) this.card = options.card;
// 优先使用传入的agentId如果没有则使用默认值
// 确保不使用card.name因为它是显示名称而不是数据库ID
this.agentId = options.agentId || 'echo-agent';
console.log(`A2AServer initialized with agentId: ${this.agentId}`);
}
// Helper to apply updates (status or artifact) immutably
private applyUpdateToTaskAndHistory(
current: TaskAndHistory,
private applyUpdateToSessionAndHistory(
current: SessionAndHistory,
update: Omit<schema.TaskStatus, 'timestamp'> | schema.Artifact,
): TaskAndHistory {
const newTask = { ...current.task }; // Shallow copy task
): SessionAndHistory {
const newSession = { ...current.session }; // Shallow copy session
const newHistory = [...current.history]; // Shallow copy history
if (isTaskStatusUpdate(update)) {
// Merge status update
newTask.status = {
...newTask.status, // Keep existing properties if not overwritten
newSession.status = {
...newSession.status, // Keep existing properties if not overwritten
...update, // Apply updates
timestamp: getCurrentTimestamp(), // Always update timestamp
};
@ -74,18 +90,18 @@ export class A2AServer {
}
} else if (isArtifactUpdate(update)) {
// Handle artifact update
if (!newTask.artifacts) {
newTask.artifacts = [];
if (!newSession.artifacts) {
newSession.artifacts = [];
} else {
// Ensure we're working with a copy of the artifacts array
newTask.artifacts = [...newTask.artifacts];
newSession.artifacts = [...newSession.artifacts];
}
const existingIndex = update.index ?? -1; // Use index if provided
let replaced = false;
if (existingIndex >= 0 && existingIndex < newTask.artifacts.length) {
const existingArtifact = newTask.artifacts[existingIndex];
if (existingIndex >= 0 && existingIndex < newSession.artifacts.length) {
const existingArtifact = newSession.artifacts[existingIndex];
if (update.append) {
// Create a deep copy for modification to avoid mutating original
const appendedArtifact = JSON.parse(JSON.stringify(existingArtifact));
@ -102,39 +118,33 @@ export class A2AServer {
if (update.description) {
appendedArtifact.description = update.description;
}
newTask.artifacts[existingIndex] = appendedArtifact; // Replace with appended version
newSession.artifacts[existingIndex] = appendedArtifact; // Replace with appended version
replaced = true;
} else {
// Overwrite artifact at index (with a copy of the update)
newTask.artifacts[existingIndex] = { ...update };
newSession.artifacts[existingIndex] = { ...update };
replaced = true;
}
} else if (update.name) {
const namedIndex = newTask.artifacts.findIndex(
const namedIndex = newSession.artifacts.findIndex(
(a) => a.name === update.name,
);
if (namedIndex >= 0) {
newTask.artifacts[namedIndex] = { ...update }; // Replace by name (with copy)
newSession.artifacts[namedIndex] = { ...update }; // Replace by name (with copy)
replaced = true;
}
}
if (!replaced) {
newTask.artifacts.push({ ...update }); // Add as a new artifact (copy)
newSession.artifacts.push({ ...update }); // Add as a new artifact (copy)
// Sort if indices are present
if (newTask.artifacts.some((a) => a.index !== undefined)) {
newTask.artifacts.sort((a, b) => (a.index ?? 0) - (b.index ?? 0));
if (newSession.artifacts.some((a) => a.index !== undefined)) {
newSession.artifacts.sort((a, b) => (a.index ?? 0) - (b.index ?? 0));
}
}
}
return { task: newTask, history: newHistory };
}
constructor(handler: TaskHandler, options: A2AServerOptions = {}) {
this.taskHandler = handler;
this.taskStore = options.taskStore ?? new InMemoryTaskStore();
if (options.card) this.card = options.card;
return { session: newSession, history: newHistory };
}
/**
@ -147,23 +157,23 @@ export class A2AServer {
throw A2AError.invalidRequest('Invalid JSON-RPC request structure.');
}
const taskId: string | undefined = (request.params as any)?.id;
const sessionId: string | undefined = (request.params as any)?.id;
let result: any;
// 2. 基于方法处理请求
// 2. 基于方法处理请求 - 保持A2A协议命名约定
switch (request.method) {
case 'tasks/send':
result = await this.processTaskSend(request as schema.SendTaskRequest);
result = await this.processSessionSend(request as schema.SendTaskRequest);
break;
case 'tasks/get':
result = await this.processTaskGet(request as schema.GetTaskRequest);
result = await this.processSessionGet(request as schema.GetTaskRequest);
break;
case 'tasks/cancel':
result = await this.processTaskCancel(request as schema.CancelTaskRequest);
result = await this.processSessionCancel(request as schema.CancelTaskRequest);
break;
case 'tasks/list':
// 新增:获取所有任务列表(不按会话分组,每个任务就是一个会话)
result = await this.processListTasks();
// 获取所有会话列表
result = await this.processListSessions();
break;
default:
throw A2AError.methodNotFound(request.method);
@ -191,11 +201,11 @@ export class A2AServer {
return eventEmitter;
}
// 获取任务ID
const taskId = (request.params).id;
// 获取会话ID
const sessionId = (request.params).id;
// 异步处理流请求
this.processStreamingRequest(request, eventEmitter).catch((error) => {
this.processStreamingSession(request, eventEmitter).catch((error) => {
eventEmitter.emit('error', error);
});
@ -205,24 +215,24 @@ export class A2AServer {
/**
*
*/
private async processStreamingRequest(
private async processStreamingSession(
request: schema.SendTaskStreamingRequest,
emitter: EventEmitter,
): Promise<void> {
const { id: taskId, message, sessionId, metadata } = request.params;
const { id: sessionId, message, sessionId: requestSessionId, metadata } = request.params;
try {
// 加载或创建任务
let currentData = await this.loadOrCreateTaskAndHistory(
taskId,
message,
// 加载或创建会话
let currentData = await this.loadOrCreateSessionAndHistory(
sessionId,
message,
requestSessionId,
metadata,
);
// 创建任务上下文
const context = this.createTaskContext(
currentData.task,
// 创建会话上下文
const context = this.createSessionContext(
currentData.session,
message,
currentData.history,
);
@ -241,11 +251,11 @@ export class A2AServer {
}
// 应用更新
currentData = this.applyUpdateToTaskAndHistory(currentData, yieldValue);
currentData = this.applyUpdateToSessionAndHistory(currentData, yieldValue);
// 保存更新后的状态
await this.taskStore.save(currentData);
await this.sessionStore.save(currentData);
// 更新上下文
context.task = currentData.task;
context.task = currentData.session;
let event;
let isFinal = false;
@ -258,19 +268,19 @@ export class A2AServer {
'canceled',
'input-required',
];
isFinal = terminalStates.includes(currentData.task.status.state);
isFinal = terminalStates.includes(currentData.session.status.state);
event = this.createTaskStatusEvent(
taskId,
currentData.task.status,
sessionId,
currentData.session.status,
isFinal,
);
} else if (isArtifactUpdate(yieldValue)) {
const updatedArtifact = currentData.task.artifacts?.find(
const updatedArtifact = currentData.session.artifacts?.find(
(a) =>
(a.index !== undefined && a.index === yieldValue.index) ||
(a.name && a.name === yieldValue.name),
) ?? yieldValue;
event = this.createTaskArtifactEvent(taskId, updatedArtifact, false);
event = this.createTaskArtifactEvent(sessionId, updatedArtifact, false);
} else {
console.warn('[Stream] Handler yielded unknown value:', yieldValue);
continue;
@ -286,70 +296,68 @@ export class A2AServer {
// 处理结束,确保有最终事件
if (!lastEventWasFinal) {
// 确保任务处于最终状态
// 确保会话处于最终状态
const finalStates: schema.TaskState[] = [
'completed',
'failed',
'canceled',
'input-required',
];
if (!finalStates.includes(currentData.task.status.state)) {
currentData = this.applyUpdateToTaskAndHistory(currentData, {
if (!finalStates.includes(currentData.session.status.state)) {
currentData = this.applyUpdateToSessionAndHistory(currentData, {
state: 'completed',
});
await this.taskStore.save(currentData);
await this.sessionStore.save(currentData);
}
// 发送最终状态事件
const finalEvent = this.createTaskStatusEvent(
taskId,
currentData.task.status,
sessionId,
currentData.session.status,
true,
);
emitter.emit('update', finalEvent);
}
} catch (error) {
console.error(`[Stream ${taskId}] Error:`, error);
console.error(`[Stream ${sessionId}] Error:`, error);
emitter.emit('error', error);
}
}
/**
*
*
* A2A API的一部分
* A2A任务就是一个完整的会话
*/
private async processListTasks(): Promise<schema.Task[]> {
// 获取所有任务ID
const taskIds = await this.getAllTaskIds();
const tasks: schema.Task[] = [];
private async processListSessions(): Promise<schema.Task[]> {
// 获取所有会话ID
const sessionIds = await this.getAllSessionIds();
const sessions: schema.Task[] = [];
// 加载每个任务的详细信息
for (const taskId of taskIds) {
const taskAndHistory = await this.taskStore.load(taskId);
if (taskAndHistory) {
tasks.push({ ...taskAndHistory.task });
// 加载每个会话的详细信息
for (const sessionId of sessionIds) {
const sessionAndHistory = await this.sessionStore.load(sessionId);
if (sessionAndHistory) {
sessions.push({ ...sessionAndHistory.session });
}
}
return tasks;
return sessions;
}
// 处理任务发送请求
private async processTaskSend(request: schema.SendTaskRequest): Promise<schema.Task> {
this.validateTaskSendParams(request.params);
const { id: taskId, message, metadata } = request.params;
// 不使用sessionId将每个task视为独立的会话
// 处理会话发送请求 (tasks/send)
private async processSessionSend(request: schema.SendTaskRequest): Promise<schema.Task> {
this.validateSessionSendParams(request.params);
const { id: sessionId, message, metadata } = request.params;
// 加载或创建任务
let currentData = await this.loadOrCreateTaskAndHistory(
taskId,
// 加载或创建会话
let currentData = await this.loadOrCreateSessionAndHistory(
sessionId,
message,
undefined, // 不使用sessionId
undefined, // 不使用传入的sessionId参数
metadata,
);
const context = this.createTaskContext(
currentData.task,
const context = this.createSessionContext(
currentData.session,
message,
currentData.history,
);
@ -358,9 +366,9 @@ export class A2AServer {
// 处理生成器产出
try {
for await (const yieldValue of generator) {
currentData = this.applyUpdateToTaskAndHistory(currentData, yieldValue);
await this.taskStore.save(currentData);
context.task = currentData.task;
currentData = this.applyUpdateToSessionAndHistory(currentData, yieldValue);
await this.sessionStore.save(currentData);
context.task = currentData.session;
}
} catch (handlerError) {
// 处理器错误
@ -379,59 +387,59 @@ export class A2AServer {
],
},
};
currentData = this.applyUpdateToTaskAndHistory(
currentData = this.applyUpdateToSessionAndHistory(
currentData,
failureStatusUpdate,
);
try {
await this.taskStore.save(currentData);
await this.sessionStore.save(currentData);
} catch (saveError) {
console.error(
`Failed to save task ${taskId} after handler error:`,
`Failed to save session ${sessionId} after handler error:`,
saveError,
);
}
throw this.normalizeError(handlerError, request.id, taskId);
throw this.normalizeError(handlerError, request.id, sessionId);
}
return currentData.task;
return currentData.session;
}
// 处理获取任务请求
private async processTaskGet(request: schema.GetTaskRequest): Promise<schema.Task> {
const { id: taskId } = request.params;
if (!taskId) throw A2AError.invalidParams('Missing task ID.');
// 处理获取会话请求 (tasks/get)
private async processSessionGet(request: schema.GetTaskRequest): Promise<schema.Task> {
const { id: sessionId } = request.params;
if (!sessionId) throw A2AError.invalidParams('Missing session ID.');
const data = await this.taskStore.load(taskId);
const data = await this.sessionStore.load(sessionId);
if (!data) {
throw A2AError.taskNotFound(taskId);
throw A2AError.taskNotFound(sessionId);
}
return data.task;
return data.session;
}
// 处理取消任务请求
private async processTaskCancel(request: schema.CancelTaskRequest): Promise<schema.Task> {
const { id: taskId } = request.params;
if (!taskId) throw A2AError.invalidParams('Missing task ID.');
// 处理取消会话请求 (tasks/cancel)
private async processSessionCancel(request: schema.CancelTaskRequest): Promise<schema.Task> {
const { id: sessionId } = request.params;
if (!sessionId) throw A2AError.invalidParams('Missing session ID.');
// 加载任务
let data = await this.taskStore.load(taskId);
// 加载会话
let data = await this.sessionStore.load(sessionId);
if (!data) {
throw A2AError.taskNotFound(taskId);
throw A2AError.taskNotFound(sessionId);
}
// 检查是否可取消
const finalStates: schema.TaskState[] = ['completed', 'failed', 'canceled'];
if (finalStates.includes(data.task.status.state)) {
if (finalStates.includes(data.session.status.state)) {
console.log(
`Task ${taskId} already in final state ${data.task.status.state}, cannot cancel.`,
`Session ${sessionId} already in final state ${data.session.status.state}, cannot cancel.`,
);
return data.task;
return data.session;
}
// 标记取消
this.activeCancellations.add(taskId);
this.activeCancellations.add(sessionId);
// 更新状态为已取消
const cancelUpdate: Omit<schema.TaskStatus, 'timestamp'> = {
@ -441,143 +449,145 @@ export class A2AServer {
parts: [{ text: 'Task cancelled by request.' }],
},
};
data = this.applyUpdateToTaskAndHistory(data, cancelUpdate);
data = this.applyUpdateToSessionAndHistory(data, cancelUpdate);
// 保存更新状态
await this.taskStore.save(data);
await this.sessionStore.save(data);
// 移除活跃取消标记
this.activeCancellations.delete(taskId);
this.activeCancellations.delete(sessionId);
return data.task;
return data.session;
}
// --- Helper Methods ---
/**
* ID列表
* ID列表
*/
async getAllTaskIds(): Promise<string[]> {
if ('getAllTaskIds' in this.taskStore) {
return (this.taskStore as any).getAllTaskIds();
async getAllSessionIds(): Promise<string[]> {
if ('getAllSessionIds' in this.sessionStore) {
return (this.sessionStore as any).getAllSessionIds();
}
return [];
}
/**
*
*
*/
async getAllTasks(): Promise<schema.Task[]> {
const taskIds = await this.getAllTaskIds();
const tasks: schema.Task[] = [];
const sessionIds = await this.getAllSessionIds();
const sessions: schema.Task[] = [];
for (const taskId of taskIds) {
const data = await this.taskStore.load(taskId);
for (const sessionId of sessionIds) {
const data = await this.sessionStore.load(sessionId);
if (data) {
tasks.push(data.task);
sessions.push(data.session);
}
}
return tasks;
return sessions;
}
// Renamed and updated to handle both task and history
private async loadOrCreateTaskAndHistory(
taskId: string,
// 加载或创建会话及其历史记录
private async loadOrCreateSessionAndHistory(
sessionId: string,
initialMessage: schema.Message,
_sessionId?: string | null, // 忽略sessionId参数不再使用
metadata?: Record<string, unknown> | null, // Allow null
): Promise<TaskAndHistory> {
let data = await this.taskStore.load(taskId);
_sessionIdParam?: string | null, // 忽略sessionId参数不再使用
metadata?: Record<string, unknown> | null, // 允许为null
): Promise<SessionAndHistory> {
let data = await this.sessionStore.load(sessionId);
let needsSave = false;
if (!data) {
// Create new task and history
const initialTask: schema.Task & { agentId?: string | null } = {
id: taskId,
// Add agentId for database compatibility
agentId: null,
// 不设置sessionId每个task就是一个独立的会话
// 创建新会话和历史记录
const initialSession: schema.Task & { agentId: string } = {
id: sessionId,
// 使用当前agentId
agentId: this.agentId,
status: {
state: 'submitted', // Start as submitted
state: 'submitted', // 初始状态为submitted
timestamp: getCurrentTimestamp(),
message: null, // Initial user message goes only to history for now
message: null, // 初始用户消息仅放入历史记录
},
artifacts: [],
metadata: metadata ?? undefined, // Store undefined if null
metadata: metadata ?? undefined, // null转为undefined
};
const initialHistory: schema.Message[] = [initialMessage]; // History starts with user message
data = { task: initialTask, history: initialHistory };
needsSave = true; // Mark for saving
console.log(`[Task ${taskId}] Created new task and history.`);
const initialHistory: schema.Message[] = [initialMessage]; // 历史记录以用户消息开始
data = { session: initialSession, history: initialHistory };
needsSave = true; // 标记需要保存
console.log(`[Session ${sessionId}] Created new session and history.`);
} else {
console.log(`[Task ${taskId}] Loaded existing task and history.`);
// Add current user message to history
// Make a copy before potentially modifying
data = { task: data.task, history: [...data.history, initialMessage] };
needsSave = true; // History updated, mark for saving
console.log(`[Session ${sessionId}] Loaded existing session and history.`);
// 将当前用户消息添加到历史记录
// 在可能修改前创建副本
data = { session: data.session, history: [...data.history, initialMessage] };
needsSave = true; // 历史记录已更新,标记需要保存
// Handle state transitions for existing tasks
// 如果加载的现有会话没有agentId设置为当前agentId
if (!data.session.agentId) {
data.session.agentId = this.agentId;
needsSave = true;
}
// 处理现有会话的状态转换
const finalStates: schema.TaskState[] = [
'completed',
'failed',
'canceled',
];
if (finalStates.includes(data.task.status.state)) {
if (finalStates.includes(data.session.status.state)) {
console.warn(
`[Task ${taskId}] Received message for task already in final state ${data.task.status.state}. Handling as new submission (keeping history).`,
`[Session ${sessionId}] Received message for session already in final state ${data.session.status.state}. Handling as new submission (keeping history).`,
);
// Option 1: Reset state to 'submitted' (keeps history, effectively restarts)
// 选项1重置状态为'submitted'(保留历史记录,实际上是重新启动)
const resetUpdate: Omit<schema.TaskStatus, 'timestamp'> = {
state: 'submitted',
message: null, // Clear old agent message
message: null, // 清除旧的智能体消息
};
data = this.applyUpdateToTaskAndHistory(data, resetUpdate);
// needsSave is already true
// Option 2: Throw error (stricter)
// throw A2AError.invalidRequest(`Task ${taskId} is already in a final state.`);
} else if (data.task.status.state === 'input-required') {
data = this.applyUpdateToSessionAndHistory(data, resetUpdate);
// needsSave已为true
} else if (data.session.status.state === 'input-required') {
console.log(
`[Task ${taskId}] Received message while 'input-required', changing state to 'working'.`,
`[Session ${sessionId}] Received message while 'input-required', changing state to 'working'.`,
);
// If it was waiting for input, update state to 'working'
// 如果它在等待输入,将状态更新为'working'
const workingUpdate: Omit<schema.TaskStatus, 'timestamp'> = {
state: 'working',
};
data = this.applyUpdateToTaskAndHistory(data, workingUpdate);
// needsSave is already true
} else if (data.task.status.state === 'working') {
// If already working, maybe warn but allow? Or force back to submitted?
data = this.applyUpdateToSessionAndHistory(data, workingUpdate);
// needsSave已为true
} else if (data.session.status.state === 'working') {
// 如果已经在工作,可能发出警告但允许继续
console.warn(
`[Task ${taskId}] Received message while already 'working'. Proceeding.`,
`[Session ${sessionId}] Received message while already 'working'. Proceeding.`,
);
// No state change needed, but history was updated, so needsSave is true.
// 不需要状态更改但历史记录已更新所以needsSave为true
}
// If 'submitted', receiving another message might be odd, but proceed.
// 如果状态为'submitted',收到另一条消息可能很奇怪,但继续处理
}
// Save if created or modified before returning
// 如果创建或修改过,保存再返回
if (needsSave) {
await this.taskStore.save(data);
await this.sessionStore.save(data);
}
// Return copies to prevent mutation by caller before handler runs
return { task: { ...data.task }, history: [...data.history] };
// 返回副本以防止处理程序运行前被调用者修改
return { session: { ...data.session }, history: [...data.history] };
}
// Update context creator to accept and include history
private createTaskContext(
task: schema.Task,
// 更新创建上下文的方法名和实现
private createSessionContext(
session: schema.Task,
userMessage: schema.Message,
history: schema.Message[], // Add history parameter
): TaskContext {
history: schema.Message[], // 添加历史记录参数
): SessionContext {
return {
task: { ...task }, // Pass a copy
task: { ...session }, // 保持字段名为task以兼容handler
userMessage: userMessage,
history: [...history], // Pass a copy of the history
isCancelled: () => this.activeCancellations.has(task.id),
// taskStore is removed
history: [...history], // 传递历史记录副本
isCancelled: () => this.activeCancellations.has(session.id),
// taskStore 已删除
};
}
@ -589,14 +599,14 @@ export class A2AServer {
typeof body.method === 'string' &&
(body.id === null ||
typeof body.id === 'string' ||
typeof body.id === 'number') && // ID is required for requests needing response
typeof body.id === 'number') && // 需要响应的请求需要ID
(body.params === undefined ||
typeof body.params === 'object' || // Allows null, array, or object
typeof body.params === 'object' || // 允许null、数组或对象
Array.isArray(body.params))
);
}
private validateTaskSendParams(
private validateSessionSendParams(
parameters: any,
): asserts parameters is schema.TaskSendParams {
if (!parameters || typeof parameters !== 'object') {
@ -614,7 +624,7 @@ export class A2AServer {
'Invalid or missing message object (params.message).',
);
}
// Add more checks for message structure, sessionID, metadata, etc. if needed
// 可以添加更多有关消息结构、sessionID、metadata等的检查
}
// --- Response Formatting ---
@ -624,7 +634,7 @@ export class A2AServer {
result: T,
): schema.JSONRPCResponse<T> {
if (id === null) {
// This shouldn't happen for methods that expect a response, but safeguard
// 这对于期望响应的方法不应该发生,但作为保障
throw A2AError.internalError(
'Cannot create success response for null ID.',
);
@ -640,10 +650,10 @@ export class A2AServer {
id: number | string | null,
error: schema.JSONRPCError,
): schema.JSONRPCResponse<null> {
// For errors, ID should be the same as request ID, or null if that couldn't be determined
// 对于错误ID应与请求ID相同如果无法确定则为null
return {
jsonrpc: '2.0',
id: id, // Can be null if request ID was invalid/missing
id: id, // 如果请求ID无效/缺失可以为null
error: error,
};
}
@ -669,6 +679,20 @@ export class A2AServer {
return this.createErrorResponse(requestId, a2aError.toJSONRPCError());
}
private normalizeError(error: any, requestId: any, sessionId: string): A2AError {
if (error instanceof A2AError) {
return error;
}
console.error(
`Handler error for request ${requestId ?? 'N/A'}, session ${sessionId}:`,
error,
);
return A2AError.internalError(
error instanceof Error ? error.message : String(error),
{ originalError: error },
);
}
private createTaskStatusEvent(
taskId: string,
status: schema.TaskStatus,
@ -676,7 +700,7 @@ export class A2AServer {
): schema.TaskStatusUpdateEvent {
return {
id: taskId,
status: status, // Assumes status already has timestamp from applyUpdate
status: status, // 假设状态已经从applyUpdate获取了时间戳
final: final,
};
}
@ -689,24 +713,29 @@ export class A2AServer {
return {
id: taskId,
artifact: artifact,
final: final, // Usually false unless it's the very last thing
final: final, // 通常为false除非是最后一件事
};
}
/**
*
*
* A2A协议的标准部分
*/
async getTaskHistory(taskId: string): Promise<schema.Message[]> {
async getSessionHistory(sessionId: string): Promise<schema.Message[]> {
try {
const data = await this.taskStore.load(taskId);
const data = await this.sessionStore.load(sessionId);
if (!data) {
throw A2AError.taskNotFound(taskId);
throw A2AError.taskNotFound(sessionId);
}
return [...data.history]; // 返回副本以防止修改
} catch (error) {
console.error(`Failed to get history for task ${taskId}:`, error);
console.error(`Failed to get history for session ${sessionId}:`, error);
return [];
}
}
// A2A协议兼容方法 - 只保留必要的其他应该移到http-server.ts
async getTaskHistory(taskId: string): Promise<schema.Message[]> {
return this.getSessionHistory(taskId);
}
}

View file

@ -1,32 +1,15 @@
import { DataSource } from 'typeorm';
import { DataSource, Repository } from 'typeorm';
import * as schema from './schema';
import { SessionEntity, SessionMessageEntity } from '@services/database/schema/agent';
// Helper type for the simplified store
export interface TaskAndHistory {
task: schema.Task & { agentId?: string | null };
export interface SessionAndHistory {
session: schema.Task & { agentId: string };
history: schema.Message[];
}
// Database row types
interface AgentTaskRow {
id: string;
agentId: string | null;
state: string;
status: string;
artifacts?: string | null;
metadata?: string | null;
createdAt?: string;
updatedAt?: string;
}
interface AgentTaskMessageRow {
id: number;
taskId: string;
role: string;
parts: string;
metadata?: string | null;
timestamp: string;
}
// Rename for compatibility
export type TaskAndHistory = SessionAndHistory;
// Typed JSON parsing helpers
function parseJSON<T>(json: string | null | undefined, defaultValue: T): T {
@ -46,215 +29,212 @@ interface TaskStatusData {
timestamp?: string;
}
// Interface for transaction
interface Transaction {
query<T = any>(query: string, parameters?: any[]): Promise<T[]>;
commitTransaction(): Promise<void>;
rollbackTransaction(): Promise<void>;
/**
* Session storage interface (renamed from TaskStore)
*/
export interface SessionStore {
/**
* Save session and its related history
*/
save(data: SessionAndHistory): Promise<void>;
/**
* Load session and its history
*/
load(sessionId: string): Promise<SessionAndHistory | null>;
/**
* Get all session IDs
*/
getAllSessionIds(): Promise<string[]>;
/**
* Get all sessions
*/
getAllSessions(): Promise<(schema.Task & { agentId: string })[]>;
/**
* Get history for a session
*/
getSessionHistory(sessionId: string): Promise<schema.Message[]>;
}
/**
* Task storage interface
*/
export interface TaskStore {
/**
* Save task and its related history
*/
save(data: TaskAndHistory): Promise<void>;
/**
* Load task and its history
*/
load(taskId: string): Promise<TaskAndHistory | null>;
/**
* Get all task IDs
*/
// For compatibility
export interface TaskStore extends SessionStore {
getAllTaskIds(): Promise<string[]>;
/**
* Get all tasks
*/
getAllTasks(): Promise<(schema.Task & { agentId?: string | null })[]>;
/**
* Get history for a task
*/
getAllTasks(): Promise<(schema.Task & { agentId: string })[]>;
getTaskHistory(taskId: string): Promise<schema.Message[]>;
}
/**
* SQLite implementation of task store
* SQLite implementation of session store using TypeORM
*/
export class SQLiteTaskStore implements TaskStore {
export class SQLiteSessionStore implements SessionStore, TaskStore {
private dataSource: DataSource;
private sessionRepository: Repository<SessionEntity>;
private messageRepository: Repository<SessionMessageEntity>;
constructor(dataSource: DataSource) {
this.dataSource = dataSource;
this.sessionRepository = dataSource.getRepository(SessionEntity);
this.messageRepository = dataSource.getRepository(SessionMessageEntity);
}
async load(taskId: string): Promise<TaskAndHistory | null> {
async load(sessionId: string): Promise<SessionAndHistory | null> {
try {
// Query task
const taskRows = await this.dataSource.query<AgentTaskRow[]>(
`SELECT * FROM agent_tasks WHERE id = ?`,
[taskId],
);
const task = taskRows[0];
if (!task) {
// Find session by ID
const sessionEntity = await this.sessionRepository.findOne({
where: { id: sessionId },
relations: ['agent'] // 加载关联的agent实体
});
if (!sessionEntity) {
return null;
}
// Query messages
const messages = await this.dataSource.query<AgentTaskMessageRow[]>(
`SELECT * FROM agent_task_messages WHERE taskId = ? ORDER BY timestamp ASC`,
[taskId],
);
// Find associated messages
const messageEntities = await this.messageRepository.find({
where: { sessionId: sessionId },
order: { timestamp: 'ASC' }
});
// Parse JSON with proper typing
const status = parseJSON<TaskStatusData>(task.status, { state: 'unknown' });
const artifacts = parseJSON<schema.Artifact[] | null>(task.artifacts, null);
const metadata = parseJSON<Record<string, unknown> | null>(task.metadata, null);
const taskData: schema.Task & { agentId?: string | null } = {
id: task.id,
agentId: task.agentId,
status: status,
artifacts: artifacts,
metadata: metadata,
// Convert entity to schema object
const sessionData: schema.Task & { agentId: string } = {
id: sessionEntity.id,
agentId: sessionEntity.agentId,
status: parseJSON<TaskStatusData>(sessionEntity.status, { state: 'unknown' }),
artifacts: parseJSON<schema.Artifact[] | null>(sessionEntity.artifacts || null, null),
metadata: parseJSON<Record<string, unknown> | null>(sessionEntity.metadata || null, null),
};
const history: schema.Message[] = messages.map((message) => ({
// Convert message entities to schema messages
const history: schema.Message[] = messageEntities.map(message => ({
role: message.role as 'user' | 'agent',
parts: parseJSON<schema.Part[]>(message.parts, []),
metadata: message.metadata ? parseJSON<Record<string, unknown>>(message.metadata, {}) : undefined,
}));
return { task: taskData, history };
return { session: sessionData, history };
} catch (error) {
console.error(`Failed to load task ${taskId} from SQLite:`, error);
console.error(`Failed to load session ${sessionId} from database:`, error);
return null;
}
}
async save(data: TaskAndHistory): Promise<void> {
async save(data: SessionAndHistory): Promise<void> {
const { session, history } = data;
try {
const { task, history } = data;
await this.dataSource.transaction(async transactionalEntityManager => {
// 检查指定的agentId是否存在
const agentExists = await transactionalEntityManager.findOne('agents', {
where: { id: session.agentId }
});
// Start a transaction
await this.dataSource.transaction(async (entityManager) => {
const transaction = entityManager as unknown as Transaction;
// Check if task exists
const existsResult = await transaction.query<{ exists: number }[]>(
'SELECT COUNT(*) as exists FROM agent_tasks WHERE id = ?',
[task.id],
);
const exists = existsResult[0]?.exists > 0;
if (exists) {
// Update existing task
await transaction.query(
`UPDATE agent_tasks SET
agentId = ?,
state = ?,
status = ?,
artifacts = ?,
metadata = ?,
updatedAt = CURRENT_TIMESTAMP
WHERE id = ?`,
[
task.agentId || null,
task.status.state,
JSON.stringify(task.status),
task.artifacts ? JSON.stringify(task.artifacts) : null,
task.metadata ? JSON.stringify(task.metadata) : null,
task.id,
],
);
} else {
// Insert new task
await transaction.query(
`INSERT INTO agent_tasks (id, agentId, state, status, artifacts, metadata)
VALUES (?, ?, ?, ?, ?, ?)`,
[
task.id,
task.agentId || null,
task.status.state,
JSON.stringify(task.status),
task.artifacts ? JSON.stringify(task.artifacts) : null,
task.metadata ? JSON.stringify(task.metadata) : null,
],
);
if (!agentExists) {
console.error(`Agent with ID "${session.agentId}" does not exist in the database`);
throw new Error(`Foreign key constraint failed: Agent with ID "${session.agentId}" does not exist`);
}
// Delete existing messages for this task to avoid duplicates
await transaction.query('DELETE FROM agent_task_messages WHERE taskId = ?', [task.id]);
// Check if session exists
const existingSession = await transactionalEntityManager.findOne(SessionEntity, {
where: { id: session.id }
});
// Insert all messages
// Create session entity
const sessionEntity = existingSession || new SessionEntity();
sessionEntity.id = session.id;
sessionEntity.agentId = session.agentId;
sessionEntity.state = session.status.state;
sessionEntity.status = JSON.stringify(session.status);
sessionEntity.artifacts = session.artifacts ? JSON.stringify(session.artifacts) : null;
sessionEntity.metadata = session.metadata ? JSON.stringify(session.metadata) : null;
// Save session
await transactionalEntityManager.save(sessionEntity);
// Delete existing messages
if (existingSession) {
await transactionalEntityManager.delete(SessionMessageEntity, { sessionId: session.id });
}
// Save messages
for (const message of history) {
await transaction.query(
`INSERT INTO agent_task_messages (taskId, role, parts, metadata)
VALUES (?, ?, ?, ?)`,
[
task.id,
message.role,
JSON.stringify(message.parts),
message.metadata ? JSON.stringify(message.metadata) : null,
],
);
const messageEntity = new SessionMessageEntity();
messageEntity.sessionId = session.id; // 修改: 使用sessionId替代taskId
messageEntity.role = message.role;
messageEntity.parts = JSON.stringify(message.parts);
messageEntity.metadata = message.metadata ? JSON.stringify(message.metadata) : null;
await transactionalEntityManager.save(messageEntity);
}
});
} catch (error) {
console.error(`Failed to save task ${data.task.id} to SQLite:`, error);
throw new Error(`Failed to save task: ${(error as Error).message}`);
console.error(`Failed to save session ${session.id} to database:`, error);
throw new Error(`Failed to save session: ${(error as Error).message}`);
}
}
async getAllTaskIds(): Promise<string[]> {
async getAllSessionIds(): Promise<string[]> {
try {
const rows = await this.dataSource.query<Pick<AgentTaskRow, 'id'>[]>(
'SELECT id FROM agent_tasks',
);
return rows.map((t) => t.id);
const sessions = await this.sessionRepository.find({
select: ['id']
});
return sessions.map(session => session.id);
} catch (error) {
console.error('Failed to get all task IDs from SQLite:', error);
console.error('Failed to get all session IDs from database:', error);
return [];
}
}
async getAllTasks(): Promise<(schema.Task & { agentId?: string | null })[]> {
async getAllSessions(): Promise<(schema.Task & { agentId: string })[]> {
try {
const rows = await this.dataSource.query<AgentTaskRow[]>(
'SELECT * FROM agent_tasks',
);
return rows.map((task) => ({
id: task.id,
agentId: task.agentId,
status: parseJSON<TaskStatusData>(task.status, { state: 'unknown' }),
artifacts: parseJSON<schema.Artifact[] | null>(task.artifacts, null),
metadata: parseJSON<Record<string, unknown> | null>(task.metadata, null),
const sessions = await this.sessionRepository.find();
return sessions.map(session => ({
id: session.id,
agentId: session.agentId,
status: parseJSON<TaskStatusData>(session.status, { state: 'unknown' }),
artifacts: parseJSON<schema.Artifact[] | null>(session.artifacts || null, null),
metadata: parseJSON<Record<string, unknown> | null>(session.metadata || null, null),
}));
} catch (error) {
console.error('Failed to get all tasks from SQLite:', error);
console.error('Failed to get all sessions from database:', error);
return [];
}
}
async getTaskHistory(taskId: string): Promise<schema.Message[]> {
async getSessionHistory(sessionId: string): Promise<schema.Message[]> {
try {
const messages = await this.dataSource.query<AgentTaskMessageRow[]>(
'SELECT * FROM agent_task_messages WHERE taskId = ? ORDER BY timestamp ASC',
[taskId],
);
return messages.map((message) => ({
const messages = await this.messageRepository.find({
where: { sessionId: sessionId }, // 修改: 使用sessionId替代taskId
order: { timestamp: 'ASC' }
});
return messages.map(message => ({
role: message.role as 'user' | 'agent',
parts: parseJSON<schema.Part[]>(message.parts, []),
metadata: message.metadata ? parseJSON<Record<string, unknown>>(message.metadata, {}) : undefined,
}));
} catch (error) {
console.error(`Failed to get history for task ${taskId} from SQLite:`, error);
console.error(`Failed to get history for session ${sessionId} from database:`, error);
return [];
}
}
// Compatibility methods
getAllTaskIds(): Promise<string[]> {
return this.getAllSessionIds();
}
getAllTasks(): Promise<(schema.Task & { agentId: string })[]> {
return this.getAllSessions();
}
getTaskHistory(taskId: string): Promise<schema.Message[]> {
return this.getSessionHistory(taskId);
}
}
// Replace the existing SQLiteTaskStore with our renamed store
export const SQLiteTaskStore = SQLiteSessionStore;

View file

@ -13,7 +13,7 @@ import { logger } from '@services/libs/log';
import { BaseDataSourceOptions } from 'typeorm/data-source/BaseDataSourceOptions';
import { ensureSettingFolderExist, fixSettingFileWhenError } from './configSetting';
import { IDatabaseService, ISettingFile } from './interface';
import { AgentEntity, TaskEntity, TaskMessageEntity } from './schema/agent';
import { AgentEntity, SessionEntity, SessionMessageEntity } from './schema/agent';
import { WikiTiddler } from './schema/wiki';
// Schema config interface
@ -74,8 +74,8 @@ export class DatabaseService implements IDatabaseService {
this.registerSchema('agent', {
entities: [
AgentEntity,
TaskEntity,
TaskMessageEntity,
SessionEntity,
SessionMessageEntity,
],
synchronize: true,
migrationsRun: false,

View file

@ -1,4 +1,4 @@
import { Column, CreateDateColumn, Entity, ManyToOne, OneToMany, PrimaryColumn, PrimaryGeneratedColumn, UpdateDateColumn } from 'typeorm';
import { Column, CreateDateColumn, Entity, JoinColumn, ManyToOne, OneToMany, PrimaryColumn, PrimaryGeneratedColumn, UpdateDateColumn } from 'typeorm';
/**
* Agent entity, represents a chat agent.
@ -18,29 +18,29 @@ export class AgentEntity {
avatarUrl?: string;
@Column({ type: 'simple-json', nullable: true })
card?: object;
card?: string;
@OneToMany(() => TaskEntity, task => task.agent)
tasks!: TaskEntity[];
@OneToMany(() => SessionEntity, session => session.agent)
sessions!: SessionEntity[];
}
/**
* Task entity, represents a task in the A2A protocol.
* Each task acts as a session in the conversation.
* Session entity, represents a session in the A2A protocol.
*/
@Entity('agent_tasks')
export class TaskEntity {
@Entity('agent_sessions')
export class SessionEntity {
@PrimaryColumn()
id!: string;
@Column()
agentId!: string;
@ManyToOne(() => AgentEntity, agent => agent.tasks)
@ManyToOne(() => AgentEntity, agent => agent.sessions)
@JoinColumn({ name: 'agentId' })
agent!: AgentEntity;
@Column('text')
state!: string; // TaskState
state!: string; // 等价于 TaskState (保持与A2A协议兼容)
@Column({ type: 'text' })
status!: string; // JSON string of TaskStatus
@ -57,24 +57,24 @@ export class TaskEntity {
@UpdateDateColumn()
updatedAt!: Date;
@OneToMany(() => TaskMessageEntity, message => message.task)
messages!: TaskMessageEntity[];
@OneToMany(() => SessionMessageEntity, message => message.session)
messages!: SessionMessageEntity[];
}
/**
* Task message entity, represents a message in a task's history.
* Each message is part of the conversation in a task.
* Session message entity, represents a message in a session's history.
*/
@Entity('agent_task_messages')
export class TaskMessageEntity {
@Entity('agent_session_messages')
export class SessionMessageEntity {
@PrimaryGeneratedColumn()
id!: number;
@Column()
taskId!: string;
sessionId!: string;
@ManyToOne(() => TaskEntity, task => task.messages)
task!: TaskEntity;
@ManyToOne(() => SessionEntity, session => session.messages)
@JoinColumn({ name: 'sessionId' })
session!: SessionEntity;
@Column('text')
role!: string; // 'user' | 'agent'