TidGi-Desktop/src/services/agentInstance/index.ts
lin onetwo 5372097d42 Refactor: logging for structured and consistent output
Replaces string-based logger messages with structured logging throughout the codebase, providing function names, error messages, and relevant context as objects. This improves log readability, enables better filtering and searching, and standardizes error and debug reporting across services.
2025-10-09 15:59:16 +08:00

924 lines
35 KiB
TypeScript

import { inject, injectable } from 'inversify';
import { debounce, pick } from 'lodash';
import { nanoid } from 'nanoid';
import { BehaviorSubject, Observable } from 'rxjs';
import { DataSource, Repository } from 'typeorm';
import type { IAgentDefinitionService } from '@services/agentDefinition/interface';
import { basicPromptConcatHandler } from '@services/agentInstance/buildInAgentHandlers/basicPromptConcatHandler';
import type { AgentHandler, AgentHandlerContext } from '@services/agentInstance/buildInAgentHandlers/type';
import { createHooksWithPlugins, initializePluginSystem } from '@services/agentInstance/plugins';
import { promptConcatStream, PromptConcatStreamState } from '@services/agentInstance/promptConcat/promptConcat';
import type { AgentPromptDescription } from '@services/agentInstance/promptConcat/promptConcatSchema';
import { getPromptConcatHandlerConfigJsonSchema } from '@services/agentInstance/promptConcat/promptConcatSchema/jsonSchema';
import type { IDatabaseService } from '@services/database/interface';
import { AgentInstanceEntity, AgentInstanceMessageEntity } from '@services/database/schema/agent';
import { logger } from '@services/libs/log';
import serviceIdentifier from '@services/serviceIdentifier';
import type { AgentInstance, AgentInstanceLatestStatus, AgentInstanceMessage, IAgentInstanceService } from './interface';
import { AGENT_INSTANCE_FIELDS, createAgentInstanceData, createAgentMessage, MESSAGE_FIELDS, toDatabaseCompatibleInstance, toDatabaseCompatibleMessage } from './utilities';
@injectable()
export class AgentInstanceService implements IAgentInstanceService {
@inject(serviceIdentifier.Database)
private readonly databaseService!: IDatabaseService;
@inject(serviceIdentifier.AgentDefinition)
private readonly agentDefinitionService!: IAgentDefinitionService;
private dataSource: DataSource | null = null;
private agentInstanceRepository: Repository<AgentInstanceEntity> | null = null;
private agentMessageRepository: Repository<AgentInstanceMessageEntity> | null = null;
private agentInstanceSubjects: Map<string, BehaviorSubject<AgentInstance | undefined>> = new Map();
private statusSubjects: Map<string, BehaviorSubject<AgentInstanceLatestStatus | undefined>> = new Map();
private agentHandlers: Map<string, AgentHandler> = new Map();
private handlerSchemas: Map<string, Record<string, unknown>> = new Map();
private cancelTokenMap: Map<string, { value: boolean }> = new Map();
private debouncedUpdateFunctions: Map<string, (message: AgentInstanceLatestStatus['message'] & { id: string }, agentId?: string) => void> = new Map();
public async initialize(): Promise<void> {
try {
await this.initializeDatabase();
await this.initializeHandlers();
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.error(`Failed to initialize agent instance service: ${errorMessage}`);
throw error;
}
}
private async initializeDatabase(): Promise<void> {
try {
// Database is already initialized in the agent definition service
this.dataSource = await this.databaseService.getDatabase('agent');
this.agentInstanceRepository = this.dataSource.getRepository(AgentInstanceEntity);
this.agentMessageRepository = this.dataSource.getRepository(AgentInstanceMessageEntity);
logger.debug('AgentInstance repositories initialized');
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.error(`Failed to initialize agent instance database: ${errorMessage}`);
throw error;
}
}
public async initializeHandlers(): Promise<void> {
try {
// Register plugins to global registry once during initialization
await initializePluginSystem();
logger.debug('AgentInstance Plugin system initialized and plugins registered to global registry');
// Register built-in handlers
this.registerBuiltinHandlers();
logger.debug('AgentInstance handlers registered');
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.error(`Failed to initialize agent instance handlers: ${errorMessage}`);
throw error;
}
}
public registerBuiltinHandlers(): void {
// Plugins are already registered in initialize(), so we only register handlers here
// Register basic prompt concatenation handler with its schema
this.registerHandler('basicPromptConcatHandler', basicPromptConcatHandler, getPromptConcatHandlerConfigJsonSchema());
}
/**
* Register a handler with an optional schema
* @param handlerId ID for the handler
* @param handler The handler function
* @param schema Optional JSON schema for the handler configuration
*/
private registerHandler(handlerId: string, handler: AgentHandler, schema?: Record<string, unknown>): void {
this.agentHandlers.set(handlerId, handler);
if (schema) {
this.handlerSchemas.set(handlerId, schema);
}
}
/**
* Ensure repositories are initialized
*/
private ensureRepositories(): void {
if (!this.agentInstanceRepository || !this.agentMessageRepository) {
throw new Error('Agent instance repositories not initialized');
}
}
/**
* Clean up subscriptions for specific agent
*/
private cleanupAgentSubscriptions(agentId: string): void {
if (this.agentInstanceSubjects.has(agentId)) {
this.agentInstanceSubjects.delete(agentId);
}
// Clean up all status subscriptions related to this agent
for (const [key, _] of this.statusSubjects.entries()) {
if (key.startsWith(`${agentId}:`)) {
this.statusSubjects.delete(key);
}
}
}
public async createAgent(agentDefinitionID?: string, options?: { preview?: boolean }): Promise<AgentInstance> {
this.ensureRepositories();
try {
// Get agent definition
const agentDefinition = await this.agentDefinitionService.getAgentDef(agentDefinitionID);
if (!agentDefinition) {
throw new Error(`Agent definition not found: ${agentDefinitionID}`);
}
// Create new agent instance using utility function
// Ensure required fields exist before creating instance
if (!agentDefinition.name) {
throw new Error(`Agent definition missing required field 'name': ${agentDefinitionID}`);
}
const { instanceData, instanceId, now } = createAgentInstanceData(agentDefinition as Required<Pick<typeof agentDefinition, 'name'>> & typeof agentDefinition);
// Mark as preview if specified
if (options?.preview) {
instanceData.volatile = true;
}
// Create and save entity
const instanceEntity = this.agentInstanceRepository!.create(toDatabaseCompatibleInstance(instanceData));
await this.agentInstanceRepository!.save(instanceEntity);
logger.info('Created agent instance', {
function: 'createAgent',
instanceId,
preview: !!options?.preview,
});
// Return complete instance object
return {
...instanceData,
created: now,
modified: now,
};
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.error(`Failed to create agent instance: ${errorMessage}`);
throw error;
}
}
public async getAgent(agentId: string): Promise<AgentInstance | undefined> {
this.ensureRepositories();
try {
// Query agent instance with messages in chronological order (oldest first)
const instanceEntity = await this.agentInstanceRepository!.findOne({
where: { id: agentId },
relations: ['messages'],
order: {
messages: {
modified: 'ASC', // Ensure messages are sorted in ascending order by creation time, otherwise streaming will update it and cause wrong order
},
},
});
if (!instanceEntity) {
return undefined;
}
const messages = (instanceEntity.messages || []).slice().sort((a, b) => {
const aTime = a.created ? new Date(a.created).getTime() : (a.modified ? new Date(a.modified).getTime() : 0);
const bTime = b.created ? new Date(b.created).getTime() : (b.modified ? new Date(b.modified).getTime() : 0);
return aTime - bTime;
});
return {
...pick(instanceEntity, AGENT_INSTANCE_FIELDS),
messages,
};
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.error(`Failed to get agent instance: ${errorMessage}`);
throw error;
}
}
public async updateAgent(agentId: string, data: Partial<AgentInstance>): Promise<AgentInstance> {
this.ensureRepositories();
try {
// Get existing instance with messages
const instanceEntity = await this.agentInstanceRepository!.findOne({
where: { id: agentId },
relations: ['messages'],
order: {
messages: {
modified: 'ASC', // Ensure messages are sorted in ascending order by creation time, otherwise streaming will update it and cause wrong order
},
},
});
if (!instanceEntity) {
throw new Error(`Agent instance not found: ${agentId}`);
}
// Update fields using pick + Object.assign for consistency with updateAgentDef
const pickedProperties = pick(data, ['name', 'status', 'avatarUrl', 'aiApiConfig', 'closed', 'handlerConfig']);
Object.assign(instanceEntity, pickedProperties);
// Save instance updates
await this.agentInstanceRepository!.save(instanceEntity);
// Handle message updates if provided
if (data.messages && data.messages.length > 0) {
// Create entities for new messages and update existing ones
for (const message of data.messages) {
// Check if message already exists
const existingMessage = instanceEntity.messages?.find(m => m.id === message.id);
if (existingMessage) {
// Update existing message
existingMessage.content = message.content;
existingMessage.modified = message.modified || new Date();
if (message.metadata) existingMessage.metadata = message.metadata;
if (message.contentType) existingMessage.contentType = message.contentType;
await this.agentMessageRepository!.save(existingMessage);
} else {
// Create new message
const messageData = pick(message, MESSAGE_FIELDS) as AgentInstanceMessage;
const messageEntity = this.agentMessageRepository!.create(toDatabaseCompatibleMessage(messageData));
await this.agentMessageRepository!.save(messageEntity);
// Add new message to the instance entity
if (!instanceEntity.messages) {
instanceEntity.messages = [];
}
instanceEntity.messages.push(messageEntity);
}
}
}
// Construct the response object directly from the entity
// This avoids an additional database query with getAgent()
const updatedAgent: AgentInstance = {
...pick(instanceEntity, AGENT_INSTANCE_FIELDS),
messages: instanceEntity.messages || [],
};
// Notify subscribers about the updates with the already available data
// This avoids another database query within notifyAgentUpdate
this.notifyAgentUpdate(agentId, updatedAgent);
return updatedAgent;
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.error(`Failed to update agent instance: ${errorMessage}`);
throw error;
}
}
public async deleteAgent(agentId: string): Promise<void> {
this.ensureRepositories();
try {
// First delete all messages for this agent
await this.agentMessageRepository!.delete({ agentId });
// Then delete the agent instance
await this.agentInstanceRepository!.delete(agentId);
// Clean up subscriptions related to this agent
this.cleanupAgentSubscriptions(agentId);
logger.info(`Deleted agent instance: ${agentId}`);
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.error(`Failed to delete agent instance: ${errorMessage}`);
throw error;
}
}
public async getAgents(
page: number,
pageSize: number,
options?: { closed?: boolean; searchName?: string },
): Promise<Omit<AgentInstance, 'messages'>[]> {
this.ensureRepositories();
try {
const skip = (page - 1) * pageSize;
const take = pageSize;
// Build query conditions
const whereCondition: Record<string, unknown> = {};
// Always exclude preview instances from normal listing
whereCondition.preview = false;
// Add closed filter if provided
if (options && options.closed !== undefined) {
whereCondition.closed = options.closed;
}
// Add name search filter if provided
if (options && options.searchName) {
whereCondition.name = { like: `%${options.searchName}%` };
}
const [instances, _] = await this.agentInstanceRepository!.findAndCount({
where: Object.keys(whereCondition).length > 0 ? whereCondition : undefined,
skip,
take,
order: {
// Sort by creation time descending
created: 'DESC',
},
});
return instances.map(entity => pick(entity, AGENT_INSTANCE_FIELDS));
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.error(`Failed to get agent instances: ${errorMessage}`);
throw error;
}
}
public async sendMsgToAgent(agentId: string, content: { text: string; file?: File }): Promise<void> {
try {
// Get agent instance
const agentInstance = await this.getAgent(agentId);
if (!agentInstance) {
throw new Error(`Agent instance not found: ${agentId}`);
}
// Create user message
const messageId = nanoid();
const now = new Date();
// Get agent configuration
const agentDefinition = await this.agentDefinitionService.getAgentDef(agentInstance.agentDefId);
if (!agentDefinition) {
throw new Error(`Agent definition not found: ${agentInstance.agentDefId}`);
}
// Get appropriate handler
const handlerId = agentDefinition.handlerID;
if (!handlerId) {
throw new Error(`Handler ID not found in agent definition: ${agentDefinition.id}`);
}
const handler = this.agentHandlers.get(handlerId);
if (!handler) {
throw new Error(`Handler not found: ${handlerId}`);
}
// Create handler context with temporary message added for processing
const cancelToken = { value: false };
this.cancelTokenMap.set(agentId, cancelToken);
const handlerContext: AgentHandlerContext = {
agent: {
...agentInstance,
messages: [...agentInstance.messages],
status: {
state: 'working',
modified: now,
},
},
agentDef: agentDefinition,
isCancelled: () => cancelToken.value,
};
// Create fresh hooks for this handler execution and register plugins based on handlerConfig
const { hooks: handlerHooks } = await createHooksWithPlugins(agentDefinition.handlerConfig || {});
// Trigger userMessageReceived hook with the configured plugins
await handlerHooks.userMessageReceived.promise({
handlerContext,
content,
messageId,
timestamp: now,
});
// Notify agent update after user message is added
this.notifyAgentUpdate(agentId, handlerContext.agent);
try {
// Create async generator
const generator = handler(handlerContext);
// Track the last message for completion handling
let lastResult: AgentInstanceLatestStatus | undefined;
for await (const result of generator) {
// Update status subscribers for specific message
if (result.message?.content) {
// Ensure message has correct modification timestamp
if (!result.message.modified) {
result.message.modified = new Date();
}
// Update status subscribers directly
const statusKey = `${agentId}:${result.message.id}`;
if (this.statusSubjects.has(statusKey)) {
this.statusSubjects.get(statusKey)?.next(result);
}
// Notify agent update with latest messages for real-time UI updates
this.notifyAgentUpdate(agentId, handlerContext.agent);
}
// Store the last result for completion handling
lastResult = result;
}
// Handle stream completion
if (lastResult?.message) {
// Complete the message stream directly using the last message from the generator
const statusKey = `${agentId}:${lastResult.message.id}`;
if (this.statusSubjects.has(statusKey)) {
const subject = this.statusSubjects.get(statusKey);
if (subject) {
// Send final update with completed state
subject.next({
state: 'completed',
message: lastResult.message,
modified: new Date(),
});
// Complete the Observable and remove the subject
subject.complete();
this.statusSubjects.delete(statusKey);
}
}
// Trigger agentStatusChanged hook for completion
await handlerHooks.agentStatusChanged.promise({
handlerContext,
status: {
state: 'completed',
modified: new Date(),
},
});
}
// Remove cancel token after generator completes
this.cancelTokenMap.delete(agentId);
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.error(`Agent handler execution failed: ${errorMessage}`);
// Trigger agentStatusChanged hook for failure
await handlerHooks.agentStatusChanged.promise({
handlerContext,
status: {
state: 'failed',
modified: new Date(),
},
}).catch(() => {
// Ignore hook errors during error handling
});
// Remove cancel token
this.cancelTokenMap.delete(agentId);
throw error;
}
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.error(`Failed to send message to agent: ${errorMessage}`);
throw error;
}
}
public async cancelAgent(agentId: string): Promise<void> {
// Try to get cancel token
const cancelToken = this.cancelTokenMap.get(agentId);
if (cancelToken) {
// Set cancel flag
cancelToken.value = true;
try {
// Update agent status to canceled
logger.debug(`cancelAgent called for ${agentId} - updating agent status to canceled`);
await this.updateAgent(agentId, {
status: {
state: 'canceled',
modified: new Date(),
},
});
logger.debug(`updateAgent returned for cancelAgent ${agentId}`);
// Propagate canceled status to any message-specific subscriptions so UI can react
try {
logger.debug('propagating canceled status to message-specific subscriptions', { function: 'cancelAgent', agentId });
const agent = await this.getAgent(agentId);
if (agent && agent.messages) {
for (const key of Array.from(this.statusSubjects.keys())) {
if (key.startsWith(`${agentId}:`)) {
const parts = key.split(':');
const messageId = parts[1];
const subject = this.statusSubjects.get(key);
const message = agent.messages.find(m => m.id === messageId);
if (subject) {
try {
const message_ = message || ({} as AgentInstanceMessage);
logger.debug('propagate canceled to subscription', { function: 'cancelAgent', subscriptionKey: key });
subject.next({
state: 'canceled',
message: message_,
modified: new Date(),
});
} catch {
// ignore
}
try {
subject.complete();
} catch {
// ignore
}
this.statusSubjects.delete(key);
}
}
}
}
} catch (error) {
logger.warn('Failed to propagate cancel status to message subscriptions', { function: 'cancelAgent', error: String(error) });
}
// Remove cancel token from map
this.cancelTokenMap.delete(agentId);
logger.info('Canceled agent instance', {
function: 'cancelAgent',
agentId,
});
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.error('Failed to cancel agent instance', {
function: 'cancelAgent',
error: errorMessage,
});
throw error;
}
} else {
logger.warn(`No active operation found for agent: ${agentId}`);
}
}
public async closeAgent(agentId: string): Promise<void> {
this.ensureRepositories();
try {
// Get agent instance
const instanceEntity = await this.agentInstanceRepository!.findOne({
where: { id: agentId },
});
if (!instanceEntity) {
throw new Error(`Agent instance not found: ${agentId}`);
}
// Mark as closed
instanceEntity.closed = true;
await this.agentInstanceRepository!.save(instanceEntity);
// Cancel any ongoing operations
if (this.cancelTokenMap.has(agentId)) {
const token = this.cancelTokenMap.get(agentId);
if (token) {
token.value = true;
}
this.cancelTokenMap.delete(agentId);
}
// Clean up subscriptions
this.cleanupAgentSubscriptions(agentId);
logger.info('Closed agent instance', {
function: 'closeAgent',
agentId,
});
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.error('Failed to close agent instance', {
function: 'closeAgent',
error: errorMessage,
});
throw error;
}
}
public subscribeToAgentUpdates(agentId: string): Observable<AgentInstance | undefined>;
/**
* Subscribe to agent instance message status updates
*/
public subscribeToAgentUpdates(agentId: string, messageId: string): Observable<AgentInstanceLatestStatus | undefined>;
public subscribeToAgentUpdates(agentId: string, messageId?: string): Observable<AgentInstance | AgentInstanceLatestStatus | undefined> {
// If messageId provided, subscribe to specific message status updates
if (messageId) {
const statusKey = `${agentId}:${messageId}`;
if (!this.statusSubjects.has(statusKey)) {
this.statusSubjects.set(statusKey, new BehaviorSubject<AgentInstanceLatestStatus | undefined>(undefined));
// Try to get initial status
this.getAgent(agentId).then(agent => {
if (agent) {
const message = agent.messages.find(m => m.id === messageId);
if (message) {
// 创建状态对象,注意不再检查 isComplete
const status: AgentInstanceLatestStatus = {
state: agent.status.state,
message,
modified: message.modified,
};
this.statusSubjects.get(statusKey)?.next(status);
}
}
}).catch((error: unknown) => {
logger.error('Failed to get initial status for message', { function: 'subscribeToAgentUpdates', error: String(error) });
});
}
return this.statusSubjects.get(statusKey)!.asObservable();
}
// If no messageId provided, subscribe to entire agent instance updates
if (!this.agentInstanceSubjects.has(agentId)) {
this.agentInstanceSubjects.set(agentId, new BehaviorSubject<AgentInstance | undefined>(undefined));
// Try to get initial data
this.getAgent(agentId).then(agent => {
this.agentInstanceSubjects.get(agentId)?.next(agent);
}).catch((error: unknown) => {
logger.error('Failed to get initial agent data', { function: 'subscribeToAgentUpdates', error: String(error) });
});
}
return this.agentInstanceSubjects.get(agentId)!.asObservable();
}
/**
* Notify agent subscription of updates
* @param agentId Agent ID
* @param agentData Agent data to use for notification
*/
private notifyAgentUpdate(agentId: string, agentData: AgentInstance): void {
try {
// Only notify if there are active subscriptions
if (this.agentInstanceSubjects.has(agentId)) {
// Use the provided data for notification (no database query)
this.agentInstanceSubjects.get(agentId)?.next(agentData);
}
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.error(`Failed to notify agent update: ${errorMessage}`);
}
}
public async saveUserMessage(userMessage: AgentInstanceMessage): Promise<void> {
this.ensureRepositories();
try {
const now = new Date();
const summary = {
id: userMessage.id,
role: userMessage.role,
agentId: userMessage.agentId,
isToolResult: !!userMessage.metadata?.isToolResult,
isPersisted: !!userMessage.metadata?.isPersisted,
};
logger.debug('Saving user message to DB (start)', {
when: now.toISOString(),
...summary,
source: 'saveUserMessage',
stack: new Error().stack?.split('\n').slice(0, 4).join('\n'),
});
await this.agentMessageRepository!.save(this.agentMessageRepository!.create(toDatabaseCompatibleMessage(userMessage)));
logger.debug('User message saved to database', {
when: new Date().toISOString(),
...summary,
source: 'saveUserMessage',
});
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.error(`Failed to save user message: ${errorMessage}`, {
messageId: userMessage.id,
agentId: userMessage.agentId,
});
throw error;
}
}
public debounceUpdateMessage(
message: AgentInstanceMessage,
agentId?: string,
debounceMs = 300,
): void {
const messageId = message.id;
// Update status subscribers for specific message if available
if (agentId) {
const statusKey = `${agentId}:${messageId}`;
if (this.statusSubjects.has(statusKey)) {
this.statusSubjects.get(statusKey)?.next({
state: 'working',
message,
modified: message.modified ?? new Date(),
});
}
}
// Lazy load or get existing debounced function
if (!this.debouncedUpdateFunctions.has(messageId)) {
// Create debounced function for each message ID
const debouncedUpdate = debounce(
async (messageData_: AgentInstanceMessage, aid?: string) => {
try {
this.ensureRepositories();
// ensureRepositories guarantees dataSource is available
await this.dataSource!.transaction(async transaction => {
const messageRepo = transaction.getRepository(AgentInstanceMessageEntity);
const messageEntity = await messageRepo.findOne({
where: { id: messageId },
});
if (messageEntity) {
// Update message content
messageEntity.content = messageData_.content;
if (messageData_.contentType) messageEntity.contentType = messageData_.contentType;
if (messageData_.metadata) messageEntity.metadata = messageData_.metadata;
if (messageData_.duration !== undefined) messageEntity.duration = messageData_.duration ?? undefined; // Fix: Update duration field
// Preserve provided modified; if not provided, keep existing DB value to avoid late overwrites
// Only adjust modified if the incoming timestamp is earlier; otherwise leave DB value unchanged
if (messageData_.modified instanceof Date) {
if (!messageEntity.modified || messageData_.modified.getTime() < new Date(messageEntity.modified).getTime()) {
messageEntity.modified = messageData_.modified;
}
}
const startSave = new Date();
logger.debug('Updating existing message (start save)', {
when: startSave.toISOString(),
messageId,
agentId: aid,
source: 'debounceUpdateMessage:update',
stack: new Error().stack?.split('\n').slice(0, 4).join('\n'),
});
await messageRepo.save(messageEntity);
logger.debug('Updating existing message (saved)', {
when: new Date().toISOString(),
messageId,
agentId: aid,
source: 'debounceUpdateMessage:update',
});
} else if (aid) {
// Create new message if it doesn't exist and agentId provided
// Create message using utility function
const messageData = createAgentMessage(messageId, aid, {
role: messageData_.role,
content: messageData_.content,
contentType: messageData_.contentType,
metadata: messageData_.metadata,
duration: messageData_.duration, // Include duration for new messages
});
const newMessage = messageRepo.create(toDatabaseCompatibleMessage(messageData));
const startSaveNew = new Date();
logger.debug('Creating new message (start save)', {
when: startSaveNew.toISOString(),
messageId,
agentId: aid,
source: 'debounceUpdateMessage:create',
stack: new Error().stack?.split('\n').slice(0, 4).join('\n'),
});
await messageRepo.save(newMessage);
logger.debug('Creating new message (saved)', {
when: new Date().toISOString(),
messageId,
agentId: aid,
source: 'debounceUpdateMessage:create',
});
// Get agent instance repository for transaction
const agentRepo = transaction.getRepository(AgentInstanceEntity);
// Get agent instance within the current transaction
const agentEntity = await agentRepo.findOne({
where: { id: aid },
relations: ['messages'],
});
if (agentEntity) {
// Add the new message to the agent entity
if (!agentEntity.messages) {
agentEntity.messages = [];
}
agentEntity.messages.push(newMessage);
// Save the updated agent entity
await agentRepo.save(agentEntity);
// Construct agent data from entity directly without additional query
const updatedAgent: AgentInstance = {
...pick(agentEntity, AGENT_INSTANCE_FIELDS),
messages: agentEntity.messages,
};
// Notify subscribers directly without additional queries
if (this.agentInstanceSubjects.has(aid)) {
this.agentInstanceSubjects.get(aid)?.next(updatedAgent);
logger.debug(`Notified agent subscribers of new message: ${messageId}`, {
method: 'debounceUpdateMessage',
agentId: aid,
});
}
} else {
logger.warn(`Agent instance not found for message: ${messageId}`);
}
} else {
logger.warn(`Cannot create message: missing agent ID for message ID: ${messageId}`);
}
});
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.error(`Failed to update/create message content: ${errorMessage}`);
}
},
debounceMs,
);
this.debouncedUpdateFunctions.set(messageId, debouncedUpdate);
}
// Call debounced function
const debouncedFunction = this.debouncedUpdateFunctions.get(messageId);
if (debouncedFunction) {
debouncedFunction(message, agentId);
}
}
public concatPrompt(promptDescription: Pick<AgentPromptDescription, 'handlerConfig'>, messages: AgentInstanceMessage[]): Observable<PromptConcatStreamState> {
logger.debug('AgentInstanceService.concatPrompt called', {
hasPromptConfig: !!promptDescription.handlerConfig,
promptConfigKeys: Object.keys(promptDescription.handlerConfig),
messagesCount: messages.length,
});
return new Observable<PromptConcatStreamState>((observer) => {
const processStream = async () => {
try {
// Create a minimal handler context for prompt concatenation
const handlerContext = {
agent: {
id: 'temp',
messages,
agentDefId: 'temp',
status: { state: 'working' as const, modified: new Date() },
created: new Date(),
handlerConfig: {},
},
agentDef: { id: 'temp', name: 'temp', handlerConfig: promptDescription.handlerConfig },
isCancelled: () => false,
};
const streamGenerator = promptConcatStream(promptDescription as AgentPromptDescription, messages, handlerContext);
for await (const state of streamGenerator) {
observer.next(state);
if (state.isComplete) {
observer.complete();
break;
}
}
} catch (error) {
logger.error('Error in AgentInstanceService.concatPrompt', {
error: error instanceof Error ? error.message : String(error),
promptDescriptionId: (promptDescription as AgentPromptDescription).id,
messagesCount: messages.length,
});
observer.error(error);
}
};
void processStream();
});
}
public getHandlerConfigSchema(handlerId: string): Record<string, unknown> {
try {
logger.debug('AgentInstanceService.getHandlerConfigSchema called', { handlerId });
// Check if we have a schema for this handler
const schema = this.handlerSchemas.get(handlerId);
if (schema) {
return schema;
}
// If no schema found, return an empty schema
logger.warn(`No schema found for handler: ${handlerId}`);
return { type: 'object', properties: {} };
} catch (error) {
logger.error('Error in AgentInstanceService.getHandlerConfigSchema', {
error: error instanceof Error ? error.message : String(error),
handlerId,
});
throw error;
}
}
}