mirror of
https://github.com/tiddly-gittly/TidGi-Desktop.git
synced 2026-01-09 10:41:49 -08:00
Replaces string-based logger messages with structured logging throughout the codebase, providing function names, error messages, and relevant context as objects. This improves log readability, enables better filtering and searching, and standardizes error and debug reporting across services.
924 lines
35 KiB
TypeScript
924 lines
35 KiB
TypeScript
import { inject, injectable } from 'inversify';
|
|
import { debounce, pick } from 'lodash';
|
|
import { nanoid } from 'nanoid';
|
|
import { BehaviorSubject, Observable } from 'rxjs';
|
|
import { DataSource, Like, Repository } from 'typeorm';
|
|
|
|
import type { IAgentDefinitionService } from '@services/agentDefinition/interface';
|
|
import { basicPromptConcatHandler } from '@services/agentInstance/buildInAgentHandlers/basicPromptConcatHandler';
|
|
import type { AgentHandler, AgentHandlerContext } from '@services/agentInstance/buildInAgentHandlers/type';
|
|
import { createHooksWithPlugins, initializePluginSystem } from '@services/agentInstance/plugins';
|
|
import { promptConcatStream, PromptConcatStreamState } from '@services/agentInstance/promptConcat/promptConcat';
|
|
import type { AgentPromptDescription } from '@services/agentInstance/promptConcat/promptConcatSchema';
|
|
import { getPromptConcatHandlerConfigJsonSchema } from '@services/agentInstance/promptConcat/promptConcatSchema/jsonSchema';
|
|
import type { IDatabaseService } from '@services/database/interface';
|
|
import { AgentInstanceEntity, AgentInstanceMessageEntity } from '@services/database/schema/agent';
|
|
import { logger } from '@services/libs/log';
|
|
import serviceIdentifier from '@services/serviceIdentifier';
|
|
|
|
import type { AgentInstance, AgentInstanceLatestStatus, AgentInstanceMessage, IAgentInstanceService } from './interface';
|
|
import { AGENT_INSTANCE_FIELDS, createAgentInstanceData, createAgentMessage, MESSAGE_FIELDS, toDatabaseCompatibleInstance, toDatabaseCompatibleMessage } from './utilities';
|
|
|
|
@injectable()
|
|
export class AgentInstanceService implements IAgentInstanceService {
|
|
@inject(serviceIdentifier.Database)
|
|
private readonly databaseService!: IDatabaseService;
|
|
|
|
@inject(serviceIdentifier.AgentDefinition)
|
|
private readonly agentDefinitionService!: IAgentDefinitionService;
|
|
|
|
private dataSource: DataSource | null = null;
|
|
private agentInstanceRepository: Repository<AgentInstanceEntity> | null = null;
|
|
private agentMessageRepository: Repository<AgentInstanceMessageEntity> | null = null;
|
|
|
|
private agentInstanceSubjects: Map<string, BehaviorSubject<AgentInstance | undefined>> = new Map();
|
|
private statusSubjects: Map<string, BehaviorSubject<AgentInstanceLatestStatus | undefined>> = new Map();
|
|
|
|
private agentHandlers: Map<string, AgentHandler> = new Map();
|
|
private handlerSchemas: Map<string, Record<string, unknown>> = new Map();
|
|
private cancelTokenMap: Map<string, { value: boolean }> = new Map();
|
|
private debouncedUpdateFunctions: Map<string, (message: AgentInstanceLatestStatus['message'] & { id: string }, agentId?: string) => void> = new Map();
|
|
|
|
/**
 * Bring the service up: repositories first, then handler registration.
 *
 * @throws Re-throws whatever `initializeDatabase` or `initializeHandlers` throws, after logging it.
 */
public async initialize(): Promise<void> {
  try {
    await this.initializeDatabase();
    await this.initializeHandlers();
  } catch (error) {
    let reason: string;
    if (error instanceof Error) {
      reason = error.message;
    } else {
      reason = String(error);
    }
    logger.error(`Failed to initialize agent instance service: ${reason}`);
    throw error;
  }
}
|
|
|
|
/**
 * Fetch the 'agent' data source from the database service and cache the
 * instance and message repositories on this service.
 *
 * @throws Re-throws any failure from the database service after logging it.
 */
private async initializeDatabase(): Promise<void> {
  try {
    // Database is already initialized in the agent definition service
    const agentDataSource = await this.databaseService.getDatabase('agent');
    this.dataSource = agentDataSource;
    this.agentInstanceRepository = agentDataSource.getRepository(AgentInstanceEntity);
    this.agentMessageRepository = agentDataSource.getRepository(AgentInstanceMessageEntity);
    logger.debug('AgentInstance repositories initialized');
  } catch (error) {
    let reason: string;
    if (error instanceof Error) {
      reason = error.message;
    } else {
      reason = String(error);
    }
    logger.error(`Failed to initialize agent instance database: ${reason}`);
    throw error;
  }
}
|
|
|
|
/**
 * Initialize the plugin system, then register the built-in agent handlers.
 *
 * @throws Re-throws any failure after logging it.
 */
public async initializeHandlers(): Promise<void> {
  try {
    // Plugins go into the global registry once, during initialization
    await initializePluginSystem();
    logger.debug('AgentInstance Plugin system initialized and plugins registered to global registry');

    // Built-in handlers come after the plugin system is ready
    this.registerBuiltinHandlers();
    logger.debug('AgentInstance handlers registered');
  } catch (error) {
    let reason: string;
    if (error instanceof Error) {
      reason = error.message;
    } else {
      reason = String(error);
    }
    logger.error(`Failed to initialize agent instance handlers: ${reason}`);
    throw error;
  }
}
|
|
|
|
/**
 * Register the handlers that ship with the service.
 * Only handlers are registered here; plugin registration happens in `initializeHandlers`.
 */
public registerBuiltinHandlers(): void {
  // Basic prompt concatenation handler, registered together with its config schema
  const promptConcatSchema = getPromptConcatHandlerConfigJsonSchema();
  this.registerHandler('basicPromptConcatHandler', basicPromptConcatHandler, promptConcatSchema);
}
|
|
|
|
/**
 * Store a handler under the given ID and, when supplied, remember its config schema.
 * @param handlerId ID the handler is looked up by
 * @param handler The handler function
 * @param schema Optional JSON schema describing the handler configuration
 */
private registerHandler(handlerId: string, handler: AgentHandler, schema?: Record<string, unknown>): void {
  this.agentHandlers.set(handlerId, handler);
  if (!schema) {
    return;
  }
  this.handlerSchemas.set(handlerId, schema);
}
|
|
|
|
/**
 * Guard used by the database-backed methods.
 * @throws Error when either the instance or the message repository has not been set yet
 */
private ensureRepositories(): void {
  const repositoriesReady = Boolean(this.agentInstanceRepository) && Boolean(this.agentMessageRepository);
  if (repositoriesReady) {
    return;
  }
  throw new Error('Agent instance repositories not initialized');
}
|
|
|
|
/**
 * Release every subscription subject that belongs to one agent.
 *
 * Each subject is completed before it is removed from its map, so that existing
 * subscribers receive a completion signal instead of waiting forever on a subject
 * that can no longer be reached through the service.
 *
 * @param agentId ID of the agent whose instance-level and message-level subjects are released
 */
private cleanupAgentSubscriptions(agentId: string): void {
  const instanceSubject = this.agentInstanceSubjects.get(agentId);
  if (instanceSubject) {
    instanceSubject.complete();
    this.agentInstanceSubjects.delete(agentId);
  }

  // Status subjects are keyed as `${agentId}:${messageId}`
  const statusKeyPrefix = `${agentId}:`;
  for (const [statusKey, statusSubject] of this.statusSubjects.entries()) {
    if (!statusKey.startsWith(statusKeyPrefix)) {
      continue;
    }
    statusSubject.complete();
    this.statusSubjects.delete(statusKey);
  }
}
|
|
|
|
/**
 * Create and persist a new agent instance from an agent definition.
 *
 * @param agentDefinitionID ID passed to the agent definition service to look up the definition
 * @param options.preview When truthy, the instance data is flagged `volatile` before it is saved
 * @returns The instance data with `created` and `modified` set to the creation timestamp
 * @throws Error when the definition is missing or has no `name`; persistence errors are logged and re-thrown
 */
public async createAgent(agentDefinitionID?: string, options?: { preview?: boolean }): Promise<AgentInstance> {
  this.ensureRepositories();

  try {
    const agentDefinition = await this.agentDefinitionService.getAgentDef(agentDefinitionID);
    if (!agentDefinition) {
      throw new Error(`Agent definition not found: ${agentDefinitionID}`);
    }

    // The utility below requires a name, so reject definitions without one up front
    if (!agentDefinition.name) {
      throw new Error(`Agent definition missing required field 'name': ${agentDefinitionID}`);
    }

    const { instanceData, instanceId, now } = createAgentInstanceData(agentDefinition as Required<Pick<typeof agentDefinition, 'name'>> & typeof agentDefinition);

    const isPreview = !!options?.preview;
    if (isPreview) {
      // Preview instances are flagged so they can be told apart from regular ones
      instanceData.volatile = true;
    }

    const databaseInstance = toDatabaseCompatibleInstance(instanceData);
    const instanceEntity = this.agentInstanceRepository!.create(databaseInstance);
    await this.agentInstanceRepository!.save(instanceEntity);
    logger.info('Created agent instance', {
      function: 'createAgent',
      instanceId,
      preview: isPreview,
    });

    const createdInstance = {
      ...instanceData,
      created: now,
      modified: now,
    };
    return createdInstance;
  } catch (error) {
    let reason: string;
    if (error instanceof Error) {
      reason = error.message;
    } else {
      reason = String(error);
    }
    logger.error(`Failed to create agent instance: ${reason}`);
    throw error;
  }
}
|
|
|
|
/**
 * Load one agent instance together with its messages.
 *
 * The query orders messages by `modified`, and the result is then re-sorted in memory
 * by `created` (falling back to `modified`, then 0), oldest first.
 *
 * @param agentId ID of the agent instance
 * @returns The instance with sorted messages, or `undefined` when no row matches
 * @throws Re-throws repository errors after logging them
 */
public async getAgent(agentId: string): Promise<AgentInstance | undefined> {
  this.ensureRepositories();
  try {
    const instanceEntity = await this.agentInstanceRepository!.findOne({
      where: { id: agentId },
      relations: ['messages'],
      order: {
        messages: {
          modified: 'ASC', // Ensure messages are sorted in ascending order by creation time, otherwise streaming will update it and cause wrong order
        },
      },
    });
    if (!instanceEntity) {
      return undefined;
    }

    // Copy before sorting so the entity's own array is left untouched
    const messages = [...(instanceEntity.messages || [])].sort((left, right) => {
      let leftTime = 0;
      if (left.created) {
        leftTime = new Date(left.created).getTime();
      } else if (left.modified) {
        leftTime = new Date(left.modified).getTime();
      }

      let rightTime = 0;
      if (right.created) {
        rightTime = new Date(right.created).getTime();
      } else if (right.modified) {
        rightTime = new Date(right.modified).getTime();
      }

      return leftTime - rightTime;
    });

    return {
      ...pick(instanceEntity, AGENT_INSTANCE_FIELDS),
      messages,
    };
  } catch (error) {
    let reason: string;
    if (error instanceof Error) {
      reason = error.message;
    } else {
      reason = String(error);
    }
    logger.error(`Failed to get agent instance: ${reason}`);
    throw error;
  }
}
|
|
|
|
/**
 * Update an agent instance and, optionally, upsert some of its messages.
 *
 * Only `name`, `status`, `avatarUrl`, `aiApiConfig`, `closed` and `handlerConfig` are copied
 * from `data` onto the entity. Each entry of `data.messages` updates the message with the same
 * ID if one is already loaded, otherwise a new message row is created.
 *
 * The returned (and notified) messages are sorted by `created` (falling back to `modified`,
 * then 0), the same ordering `getAgent` produces, so subscribers see a consistent order
 * whether the data came from a fetch or from an update.
 *
 * @param agentId ID of the agent instance to update
 * @param data Partial instance data; fields outside the list above are ignored
 * @returns The updated instance, built from the in-memory entity without another query
 * @throws Error when the instance does not exist; repository errors are logged and re-thrown
 */
public async updateAgent(agentId: string, data: Partial<AgentInstance>): Promise<AgentInstance> {
  this.ensureRepositories();

  try {
    // Get existing instance with messages
    const instanceEntity = await this.agentInstanceRepository!.findOne({
      where: { id: agentId },
      relations: ['messages'],
      order: {
        messages: {
          modified: 'ASC',
        },
      },
    });

    if (!instanceEntity) {
      throw new Error(`Agent instance not found: ${agentId}`);
    }

    // Update fields using pick + Object.assign for consistency with updateAgentDef
    const pickedProperties = pick(data, ['name', 'status', 'avatarUrl', 'aiApiConfig', 'closed', 'handlerConfig']);
    Object.assign(instanceEntity, pickedProperties);

    // Save instance updates
    await this.agentInstanceRepository!.save(instanceEntity);

    // Handle message updates if provided
    if (data.messages && data.messages.length > 0) {
      for (const message of data.messages) {
        const existingMessage = instanceEntity.messages?.find(m => m.id === message.id);

        if (existingMessage) {
          // Update existing message; metadata and contentType are only overwritten when provided
          existingMessage.content = message.content;
          existingMessage.modified = message.modified || new Date();
          if (message.metadata) existingMessage.metadata = message.metadata;
          if (message.contentType) existingMessage.contentType = message.contentType;

          await this.agentMessageRepository!.save(existingMessage);
        } else {
          // Create new message
          const messageData = pick(message, MESSAGE_FIELDS) as AgentInstanceMessage;
          const messageEntity = this.agentMessageRepository!.create(toDatabaseCompatibleMessage(messageData));

          await this.agentMessageRepository!.save(messageEntity);

          // Add new message to the instance entity
          if (!instanceEntity.messages) {
            instanceEntity.messages = [];
          }
          instanceEntity.messages.push(messageEntity);
        }
      }
    }

    // The query orders by `modified`, which changes while a message is being updated.
    // Re-sort a copy by creation time so the result matches what getAgent returns.
    const messages = (instanceEntity.messages || []).slice().sort((a, b) => {
      const aTime = a.created ? new Date(a.created).getTime() : (a.modified ? new Date(a.modified).getTime() : 0);
      const bTime = b.created ? new Date(b.created).getTime() : (b.modified ? new Date(b.modified).getTime() : 0);
      return aTime - bTime;
    });

    // Construct the response object directly from the entity
    // This avoids an additional database query with getAgent()
    const updatedAgent: AgentInstance = {
      ...pick(instanceEntity, AGENT_INSTANCE_FIELDS),
      messages,
    };

    // Notify subscribers with the data already at hand (no extra query)
    this.notifyAgentUpdate(agentId, updatedAgent);

    return updatedAgent;
  } catch (error) {
    const errorMessage = error instanceof Error ? error.message : String(error);
    logger.error(`Failed to update agent instance: ${errorMessage}`);
    throw error;
  }
}
|
|
|
|
/**
 * Delete an agent instance, its messages, and its in-memory state.
 *
 * After the rows are removed, any operation still registered for the agent is flagged as
 * cancelled and its token is dropped, mirroring what `closeAgent` does. Without this a
 * handler that is still running would keep going for an agent that no longer exists.
 *
 * @param agentId ID of the agent instance to delete
 * @throws Re-throws repository errors after logging them
 */
public async deleteAgent(agentId: string): Promise<void> {
  this.ensureRepositories();

  try {
    // First delete all messages for this agent
    await this.agentMessageRepository!.delete({ agentId });

    // Then delete the agent instance
    await this.agentInstanceRepository!.delete(agentId);

    // Cancel any ongoing operation, consistent with closeAgent
    const runningToken = this.cancelTokenMap.get(agentId);
    if (runningToken) {
      runningToken.value = true;
      this.cancelTokenMap.delete(agentId);
    }

    // Clean up subscriptions related to this agent
    this.cleanupAgentSubscriptions(agentId);

    logger.info(`Deleted agent instance: ${agentId}`);
  } catch (error) {
    const errorMessage = error instanceof Error ? error.message : String(error);
    logger.error(`Failed to delete agent instance: ${errorMessage}`);
    throw error;
  }
}
|
|
|
|
/**
 * List agent instances (without messages), newest first, one page at a time.
 *
 * @param page 1-based page number; values below 1 are treated as the first page
 * @param pageSize Number of instances per page
 * @param options.closed When defined, only instances whose `closed` flag equals this value are returned
 * @param options.searchName When non-empty, only instances whose name contains this text are returned
 * @returns The instances on the requested page, reduced to AGENT_INSTANCE_FIELDS
 * @throws Re-throws repository errors after logging them
 */
public async getAgents(
  page: number,
  pageSize: number,
  options?: { closed?: boolean; searchName?: string },
): Promise<Omit<AgentInstance, 'messages'>[]> {
  this.ensureRepositories();

  try {
    // Clamp so that page <= 0 cannot produce a negative offset
    const skip = Math.max(0, (page - 1) * pageSize);
    const take = pageSize;

    // Build query conditions
    const whereCondition: Record<string, unknown> = {};

    // Always exclude preview instances from normal listing
    // NOTE(review): createAgent marks preview instances via `volatile`, while this filter
    // uses `preview` — confirm against AgentInstanceEntity which column is intended.
    whereCondition.preview = false;

    // Add closed filter if provided
    if (options && options.closed !== undefined) {
      whereCondition.closed = options.closed;
    }

    // Add name search filter if provided.
    // TypeORM find options need the Like() operator; a plain `{ like: ... }` object is not an operator.
    if (options && options.searchName) {
      whereCondition.name = Like(`%${options.searchName}%`);
    }

    const [instances] = await this.agentInstanceRepository!.findAndCount({
      where: whereCondition,
      skip,
      take,
      order: {
        // Sort by creation time descending
        created: 'DESC',
      },
    });

    return instances.map(entity => pick(entity, AGENT_INSTANCE_FIELDS));
  } catch (error) {
    const errorMessage = error instanceof Error ? error.message : String(error);
    logger.error(`Failed to get agent instances: ${errorMessage}`);
    throw error;
  }
}
|
|
|
|
/**
 * Send a user message to an agent and drive its handler to completion.
 *
 * Steps: load the instance and its definition, resolve the handler named by the definition's
 * `handlerID`, register a cancel token, run the `userMessageReceived` hook, then iterate the
 * handler's async generator, forwarding each result that carries message content to the
 * matching status subject and to the agent-level subscribers. When the generator finishes,
 * the last message's status subject receives a final 'completed' status and is completed,
 * and the `agentStatusChanged` hook is fired.
 *
 * The cancel token is released in a `finally` block, and only if the map still holds the
 * token this call registered. That covers failures that happen before the handler starts
 * and avoids removing a token registered by a newer call for the same agent.
 *
 * @param agentId ID of the agent instance that receives the message
 * @param content Message payload passed to the `userMessageReceived` hook
 * @throws Error when the instance, definition, handler ID or handler cannot be found;
 *         handler and hook errors are logged and re-thrown
 */
public async sendMsgToAgent(agentId: string, content: { text: string; file?: File }): Promise<void> {
  // Declared outside the try so the finally block can see which token (if any) this call registered
  let registeredToken: { value: boolean } | undefined;
  try {
    // Get agent instance
    const agentInstance = await this.getAgent(agentId);
    if (!agentInstance) {
      throw new Error(`Agent instance not found: ${agentId}`);
    }

    // ID and timestamp handed to the userMessageReceived hook
    const messageId = nanoid();
    const now = new Date();

    // Get agent configuration
    const agentDefinition = await this.agentDefinitionService.getAgentDef(agentInstance.agentDefId);
    if (!agentDefinition) {
      throw new Error(`Agent definition not found: ${agentInstance.agentDefId}`);
    }

    // Get appropriate handler
    const handlerId = agentDefinition.handlerID;
    if (!handlerId) {
      throw new Error(`Handler ID not found in agent definition: ${agentDefinition.id}`);
    }
    const handler = this.agentHandlers.get(handlerId);
    if (!handler) {
      throw new Error(`Handler not found: ${handlerId}`);
    }

    // Create handler context working on a copy of the message list
    const cancelToken = { value: false };
    registeredToken = cancelToken;
    this.cancelTokenMap.set(agentId, cancelToken);
    const handlerContext: AgentHandlerContext = {
      agent: {
        ...agentInstance,
        messages: [...agentInstance.messages],
        status: {
          state: 'working',
          modified: now,
        },
      },
      agentDef: agentDefinition,
      isCancelled: () => cancelToken.value,
    };

    // Create fresh hooks for this handler execution and register plugins based on handlerConfig
    const { hooks: handlerHooks } = await createHooksWithPlugins(agentDefinition.handlerConfig || {});

    // Trigger userMessageReceived hook with the configured plugins
    await handlerHooks.userMessageReceived.promise({
      handlerContext,
      content,
      messageId,
      timestamp: now,
    });

    // Notify agent update after user message is added
    this.notifyAgentUpdate(agentId, handlerContext.agent);

    try {
      // Create async generator
      const generator = handler(handlerContext);

      // Track the last result for completion handling
      let lastResult: AgentInstanceLatestStatus | undefined;

      for await (const result of generator) {
        if (result.message?.content) {
          // Ensure message has a modification timestamp
          if (!result.message.modified) {
            result.message.modified = new Date();
          }

          // Forward to the message-specific subject, if anyone subscribed to it
          this.statusSubjects.get(`${agentId}:${result.message.id}`)?.next(result);

          // Notify agent update with latest messages for real-time UI updates
          this.notifyAgentUpdate(agentId, handlerContext.agent);
        }

        lastResult = result;
      }

      // Handle stream completion
      if (lastResult?.message) {
        const statusKey = `${agentId}:${lastResult.message.id}`;
        const subject = this.statusSubjects.get(statusKey);
        if (subject) {
          // Send final update with completed state, then close and forget the subject
          subject.next({
            state: 'completed',
            message: lastResult.message,
            modified: new Date(),
          });
          subject.complete();
          this.statusSubjects.delete(statusKey);
        }

        // Trigger agentStatusChanged hook for completion
        await handlerHooks.agentStatusChanged.promise({
          handlerContext,
          status: {
            state: 'completed',
            modified: new Date(),
          },
        });
      }
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : String(error);
      logger.error(`Agent handler execution failed: ${errorMessage}`);

      // Trigger agentStatusChanged hook for failure
      await handlerHooks.agentStatusChanged.promise({
        handlerContext,
        status: {
          state: 'failed',
          modified: new Date(),
        },
      }).catch(() => {
        // Ignore hook errors during error handling so the original error is the one re-thrown
      });

      throw error;
    }
  } catch (error: unknown) {
    const errorMessage = error instanceof Error ? error.message : String(error);
    logger.error(`Failed to send message to agent: ${errorMessage}`);
    throw error;
  } finally {
    // Release the token on every exit path, but never one that belongs to another call
    if (registeredToken !== undefined && this.cancelTokenMap.get(agentId) === registeredToken) {
      this.cancelTokenMap.delete(agentId);
    }
  }
}
|
|
|
|
/**
 * Cancel the operation currently registered for an agent.
 *
 * Sets the cancel flag, stores a 'canceled' status through `updateAgent`, pushes a final
 * 'canceled' status to every message-level subject of the agent (then completes and removes
 * them), and finally drops the cancel token. When no token is registered only a warning is logged.
 *
 * @param agentId ID of the agent instance
 * @throws Re-throws errors from the status update after logging them; failures while
 *         propagating to message subscriptions are only logged as warnings
 */
public async cancelAgent(agentId: string): Promise<void> {
  const cancelToken = this.cancelTokenMap.get(agentId);

  if (!cancelToken) {
    logger.warn(`No active operation found for agent: ${agentId}`);
    return;
  }

  // Flip the flag first so the running handler can observe it as early as possible
  cancelToken.value = true;

  try {
    logger.debug(`cancelAgent called for ${agentId} - updating agent status to canceled`);
    const canceledStatus = {
      state: 'canceled' as const,
      modified: new Date(),
    };
    await this.updateAgent(agentId, { status: canceledStatus });
    logger.debug(`updateAgent returned for cancelAgent ${agentId}`);

    // Propagate canceled status to any message-specific subscriptions so UI can react
    try {
      logger.debug('propagating canceled status to message-specific subscriptions', { function: 'cancelAgent', agentId });
      const agent = await this.getAgent(agentId);
      if (agent && agent.messages) {
        const keyPrefix = `${agentId}:`;
        // Snapshot the keys because entries are removed while walking them
        const subscriptionKeys = Array.from(this.statusSubjects.keys()).filter(candidate => candidate.startsWith(keyPrefix));
        for (const subscriptionKey of subscriptionKeys) {
          const subject = this.statusSubjects.get(subscriptionKey);
          if (!subject) {
            continue;
          }
          const messageId = subscriptionKey.split(':')[1];
          const message = agent.messages.find(m => m.id === messageId);
          try {
            // Fall back to an empty object when the message is not among the stored messages
            const message_ = message || ({} as AgentInstanceMessage);
            logger.debug('propagate canceled to subscription', { function: 'cancelAgent', subscriptionKey });
            subject.next({
              state: 'canceled',
              message: message_,
              modified: new Date(),
            });
          } catch {
            // ignore
          }
          try {
            subject.complete();
          } catch {
            // ignore
          }
          this.statusSubjects.delete(subscriptionKey);
        }
      }
    } catch (error) {
      logger.warn('Failed to propagate cancel status to message subscriptions', { function: 'cancelAgent', error: String(error) });
    }

    // Remove cancel token from map
    this.cancelTokenMap.delete(agentId);

    logger.info('Canceled agent instance', {
      function: 'cancelAgent',
      agentId,
    });
  } catch (error: unknown) {
    let reason: string;
    if (error instanceof Error) {
      reason = error.message;
    } else {
      reason = String(error);
    }
    logger.error('Failed to cancel agent instance', {
      function: 'cancelAgent',
      error: reason,
    });
    throw error;
  }
}
|
|
|
|
/**
 * Mark an agent instance as closed, cancel its running operation (if any),
 * and drop its subscription subjects.
 *
 * @param agentId ID of the agent instance
 * @throws Error when the instance does not exist; repository errors are logged and re-thrown
 */
public async closeAgent(agentId: string): Promise<void> {
  this.ensureRepositories();

  try {
    const instanceEntity = await this.agentInstanceRepository!.findOne({
      where: { id: agentId },
    });

    if (!instanceEntity) {
      throw new Error(`Agent instance not found: ${agentId}`);
    }

    // Persist the closed flag
    instanceEntity.closed = true;
    await this.agentInstanceRepository!.save(instanceEntity);

    // Cancel any ongoing operation and forget its token
    const pendingToken = this.cancelTokenMap.get(agentId);
    if (pendingToken !== undefined) {
      pendingToken.value = true;
      this.cancelTokenMap.delete(agentId);
    }

    this.cleanupAgentSubscriptions(agentId);

    logger.info('Closed agent instance', {
      function: 'closeAgent',
      agentId,
    });
  } catch (error) {
    let reason: string;
    if (error instanceof Error) {
      reason = error.message;
    } else {
      reason = String(error);
    }
    logger.error('Failed to close agent instance', {
      function: 'closeAgent',
      error: reason,
    });
    throw error;
  }
}
|
|
|
|
/**
 * Subscribe to updates of a whole agent instance.
 */
public subscribeToAgentUpdates(agentId: string): Observable<AgentInstance | undefined>;
/**
 * Subscribe to agent instance message status updates
 */
public subscribeToAgentUpdates(agentId: string, messageId: string): Observable<AgentInstanceLatestStatus | undefined>;
/**
 * Shared implementation of both overloads.
 *
 * A subject is created lazily per agent (or per `${agentId}:${messageId}` key). It starts
 * with `undefined` and is seeded asynchronously from `getAgent`; later callers reuse the
 * existing subject without triggering another fetch.
 */
public subscribeToAgentUpdates(agentId: string, messageId?: string): Observable<AgentInstance | AgentInstanceLatestStatus | undefined> {
  // With a messageId, subscribe to that message's status updates
  if (messageId) {
    const statusKey = `${agentId}:${messageId}`;
    let statusSubject = this.statusSubjects.get(statusKey);
    if (!statusSubject) {
      statusSubject = new BehaviorSubject<AgentInstanceLatestStatus | undefined>(undefined);
      this.statusSubjects.set(statusKey, statusSubject);

      // Try to get initial status
      this.getAgent(agentId).then(agent => {
        if (!agent) {
          return;
        }
        const message = agent.messages.find(m => m.id === messageId);
        if (!message) {
          return;
        }
        // Build the status object; note that isComplete is no longer checked here
        const status: AgentInstanceLatestStatus = {
          state: agent.status.state,
          message,
          modified: message.modified,
        };

        // Look the subject up again: it may have been removed while the fetch was in flight
        this.statusSubjects.get(statusKey)?.next(status);
      }).catch((error: unknown) => {
        logger.error('Failed to get initial status for message', { function: 'subscribeToAgentUpdates', error: String(error) });
      });
    }

    return statusSubject.asObservable();
  }

  // Without a messageId, subscribe to updates of the entire agent instance
  let instanceSubject = this.agentInstanceSubjects.get(agentId);
  if (!instanceSubject) {
    instanceSubject = new BehaviorSubject<AgentInstance | undefined>(undefined);
    this.agentInstanceSubjects.set(agentId, instanceSubject);

    // Try to get initial data
    this.getAgent(agentId).then(agent => {
      this.agentInstanceSubjects.get(agentId)?.next(agent);
    }).catch((error: unknown) => {
      logger.error('Failed to get initial agent data', { function: 'subscribeToAgentUpdates', error: String(error) });
    });
  }

  return instanceSubject.asObservable();
}
|
|
|
|
/**
 * Push agent data to the agent-level subject, if one exists.
 * Errors raised while notifying are logged and not propagated.
 * @param agentId Agent ID
 * @param agentData Agent data to emit; used as-is, no database query is made
 */
private notifyAgentUpdate(agentId: string, agentData: AgentInstance): void {
  try {
    // A subject only exists once someone has subscribed to this agent
    const instanceSubject = this.agentInstanceSubjects.get(agentId);
    instanceSubject?.next(agentData);
  } catch (error) {
    let reason: string;
    if (error instanceof Error) {
      reason = error.message;
    } else {
      reason = String(error);
    }
    logger.error(`Failed to notify agent update: ${reason}`);
  }
}
|
|
|
|
/**
 * Persist a message through the message repository, with debug logs before and after the save.
 *
 * @param userMessage The message to store
 * @throws Re-throws repository errors after logging them with the message and agent IDs
 */
public async saveUserMessage(userMessage: AgentInstanceMessage): Promise<void> {
  this.ensureRepositories();
  try {
    const startedAt = new Date();
    // Compact description of the message shared by both debug entries
    const summary = {
      id: userMessage.id,
      role: userMessage.role,
      agentId: userMessage.agentId,
      isToolResult: !!userMessage.metadata?.isToolResult,
      isPersisted: !!userMessage.metadata?.isPersisted,
    };
    // First few stack lines, to help trace who triggered the save
    const callerStack = new Error().stack?.split('\n').slice(0, 4).join('\n');
    logger.debug('Saving user message to DB (start)', {
      when: startedAt.toISOString(),
      ...summary,
      source: 'saveUserMessage',
      stack: callerStack,
    });

    const messageRepository = this.agentMessageRepository!;
    const messageEntity = messageRepository.create(toDatabaseCompatibleMessage(userMessage));
    await messageRepository.save(messageEntity);

    logger.debug('User message saved to database', {
      when: new Date().toISOString(),
      ...summary,
      source: 'saveUserMessage',
    });
  } catch (error) {
    let reason: string;
    if (error instanceof Error) {
      reason = error.message;
    } else {
      reason = String(error);
    }
    logger.error(`Failed to save user message: ${reason}`, {
      messageId: userMessage.id,
      agentId: userMessage.agentId,
    });
    throw error;
  }
}
|
|
|
|
/**
 * Publish a message update to subscribers immediately and persist it with a debounce.
 *
 * One debounced writer is created per message ID on first use and reused afterwards, so
 * `debounceMs` only takes effect on the first call for a given message. The writer runs in
 * a transaction: an existing row is updated; otherwise, when an agent ID was supplied, a new
 * row is created, attached to the agent entity, and agent-level subscribers are notified.
 * Errors inside the writer are logged and not propagated.
 *
 * @param message The message carrying the latest content
 * @param agentId Agent the message belongs to; needed for status notification and for creating a missing row
 * @param debounceMs Debounce delay in milliseconds used when the writer is first created
 */
public debounceUpdateMessage(
  message: AgentInstanceMessage,
  agentId?: string,
  debounceMs = 300,
): void {
  const messageId = message.id;

  // Immediate (non-debounced) status notification for message-level subscribers
  if (agentId) {
    this.statusSubjects.get(`${agentId}:${messageId}`)?.next({
      state: 'working',
      message,
      modified: message.modified ?? new Date(),
    });
  }

  // Get the existing debounced writer, or lazily create one for this message ID
  let debouncedWriter = this.debouncedUpdateFunctions.get(messageId);
  if (!debouncedWriter) {
    const persistMessage = async (messageData_: AgentInstanceMessage, aid?: string) => {
      try {
        this.ensureRepositories();
        // dataSource is assigned before the repositories, so it is available once they are
        await this.dataSource!.transaction(async transaction => {
          const messageRepo = transaction.getRepository(AgentInstanceMessageEntity);
          const messageEntity = await messageRepo.findOne({
            where: { id: messageId },
          });

          if (messageEntity) {
            // Existing row: overwrite content, and the optional fields only when provided
            messageEntity.content = messageData_.content;
            if (messageData_.contentType) messageEntity.contentType = messageData_.contentType;
            if (messageData_.metadata) messageEntity.metadata = messageData_.metadata;
            if (messageData_.duration !== undefined) messageEntity.duration = messageData_.duration ?? undefined;
            // Keep the stored `modified` unless it is missing or the incoming timestamp is earlier,
            // so that late writes do not move the message forward in time
            const incomingModified = messageData_.modified;
            if (
              incomingModified instanceof Date &&
              (!messageEntity.modified || incomingModified.getTime() < new Date(messageEntity.modified).getTime())
            ) {
              messageEntity.modified = incomingModified;
            }

            const startSave = new Date();
            logger.debug('Updating existing message (start save)', {
              when: startSave.toISOString(),
              messageId,
              agentId: aid,
              source: 'debounceUpdateMessage:update',
              stack: new Error().stack?.split('\n').slice(0, 4).join('\n'),
            });
            await messageRepo.save(messageEntity);
            logger.debug('Updating existing message (saved)', {
              when: new Date().toISOString(),
              messageId,
              agentId: aid,
              source: 'debounceUpdateMessage:update',
            });
            return;
          }

          if (!aid) {
            logger.warn(`Cannot create message: missing agent ID for message ID: ${messageId}`);
            return;
          }

          // No row yet and an agent ID is known: create the message
          const messageData = createAgentMessage(messageId, aid, {
            role: messageData_.role,
            content: messageData_.content,
            contentType: messageData_.contentType,
            metadata: messageData_.metadata,
            duration: messageData_.duration,
          });
          const newMessage = messageRepo.create(toDatabaseCompatibleMessage(messageData));

          const startSaveNew = new Date();
          logger.debug('Creating new message (start save)', {
            when: startSaveNew.toISOString(),
            messageId,
            agentId: aid,
            source: 'debounceUpdateMessage:create',
            stack: new Error().stack?.split('\n').slice(0, 4).join('\n'),
          });
          await messageRepo.save(newMessage);
          logger.debug('Creating new message (saved)', {
            when: new Date().toISOString(),
            messageId,
            agentId: aid,
            source: 'debounceUpdateMessage:create',
          });

          // Load the owning agent inside the same transaction
          const agentRepo = transaction.getRepository(AgentInstanceEntity);
          const agentEntity = await agentRepo.findOne({
            where: { id: aid },
            relations: ['messages'],
          });

          if (!agentEntity) {
            logger.warn(`Agent instance not found for message: ${messageId}`);
            return;
          }

          // Attach the new message to the agent and save the agent
          if (!agentEntity.messages) {
            agentEntity.messages = [];
          }
          agentEntity.messages.push(newMessage);
          await agentRepo.save(agentEntity);

          // Build the notification payload from the entity, without another query
          const updatedAgent: AgentInstance = {
            ...pick(agentEntity, AGENT_INSTANCE_FIELDS),
            messages: agentEntity.messages,
          };

          const agentSubject = this.agentInstanceSubjects.get(aid);
          if (agentSubject) {
            agentSubject.next(updatedAgent);
            logger.debug(`Notified agent subscribers of new message: ${messageId}`, {
              method: 'debounceUpdateMessage',
              agentId: aid,
            });
          }
        });
      } catch (error) {
        let reason: string;
        if (error instanceof Error) {
          reason = error.message;
        } else {
          reason = String(error);
        }
        logger.error(`Failed to update/create message content: ${reason}`);
      }
    };

    debouncedWriter = debounce(persistMessage, debounceMs);
    this.debouncedUpdateFunctions.set(messageId, debouncedWriter);
  }

  // Schedule (or reschedule) the write with the latest message data
  debouncedWriter(message, agentId);
}
|
|
|
|
/**
 * Stream the prompt-concatenation states for a handler config and message list.
 *
 * Each subscription starts its own `promptConcatStream` run against a throwaway handler
 * context (IDs 'temp'). The Observable emits every state produced by the stream and
 * completes when a state reports `isComplete` or when the stream ends, whichever comes
 * first. Unsubscribing stops the iteration at the next state.
 *
 * @param promptDescription Object carrying the `handlerConfig` to concatenate with; a missing
 *        config is tolerated by the entry log
 * @param messages Messages passed to the concatenation stream
 * @returns Observable of concatenation states; errors from the stream are logged and forwarded to the subscriber
 */
public concatPrompt(promptDescription: Pick<AgentPromptDescription, 'handlerConfig'>, messages: AgentInstanceMessage[]): Observable<PromptConcatStreamState> {
  logger.debug('AgentInstanceService.concatPrompt called', {
    hasPromptConfig: !!promptDescription.handlerConfig,
    // Guard against a missing config: Object.keys(undefined) would throw from a mere log call
    promptConfigKeys: Object.keys(promptDescription.handlerConfig ?? {}),
    messagesCount: messages.length,
  });

  return new Observable<PromptConcatStreamState>((observer) => {
    // Set by the teardown below so the loop can stop once nobody is listening
    let unsubscribed = false;

    const processStream = async () => {
      try {
        // Create a minimal handler context for prompt concatenation
        const handlerContext = {
          agent: {
            id: 'temp',
            messages,
            agentDefId: 'temp',
            status: { state: 'working' as const, modified: new Date() },
            created: new Date(),
            handlerConfig: {},
          },
          agentDef: { id: 'temp', name: 'temp', handlerConfig: promptDescription.handlerConfig },
          isCancelled: () => false,
        };

        const streamGenerator = promptConcatStream(promptDescription as AgentPromptDescription, messages, handlerContext);
        for await (const state of streamGenerator) {
          if (unsubscribed) {
            // Leaving the loop also closes the generator
            return;
          }
          observer.next(state);
          if (state.isComplete) {
            break;
          }
        }
        // Complete even if the stream ended without an isComplete state, so subscribers never hang
        observer.complete();
      } catch (error) {
        logger.error('Error in AgentInstanceService.concatPrompt', {
          error: error instanceof Error ? error.message : String(error),
          promptDescriptionId: (promptDescription as AgentPromptDescription).id,
          messagesCount: messages.length,
        });
        observer.error(error);
      }
    };
    void processStream();

    return () => {
      unsubscribed = true;
    };
  });
}
|
|
|
|
/**
 * Look up the JSON schema registered for a handler's configuration.
 *
 * @param handlerId ID the handler was registered under
 * @returns The registered schema, or an empty object schema when none was registered
 * @throws Re-throws any unexpected error after logging it
 */
public getHandlerConfigSchema(handlerId: string): Record<string, unknown> {
  try {
    logger.debug('AgentInstanceService.getHandlerConfigSchema called', { handlerId });
    const registeredSchema = this.handlerSchemas.get(handlerId);
    if (!registeredSchema) {
      // Unknown handler: fall back to a schema that describes an object with no properties
      logger.warn(`No schema found for handler: ${handlerId}`);
      return { type: 'object', properties: {} };
    }
    return registeredSchema;
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    logger.error('Error in AgentInstanceService.getHandlerConfigSchema', {
      error: reason,
      handlerId,
    });
    throw error;
  }
}
|
|
}
|