mirror of
https://github.com/tiddly-gittly/TidGi-Desktop.git
synced 2026-01-13 12:50:55 -08:00
915 lines
34 KiB
TypeScript
915 lines
34 KiB
TypeScript
import { inject, injectable } from 'inversify';
|
|
import { debounce, pick } from 'lodash';
|
|
import { nanoid } from 'nanoid';
|
|
import { BehaviorSubject, Observable } from 'rxjs';
|
|
import { DataSource, Repository } from 'typeorm';
|
|
|
|
import type { IAgentDefinitionService } from '@services/agentDefinition/interface';
|
|
import { basicPromptConcatHandler } from '@services/agentInstance/buildInAgentHandlers/basicPromptConcatHandler';
|
|
import type { AgentHandler, AgentHandlerContext } from '@services/agentInstance/buildInAgentHandlers/type';
|
|
import { createHooksWithPlugins, initializePluginSystem } from '@services/agentInstance/plugins';
|
|
import { promptConcatStream, PromptConcatStreamState } from '@services/agentInstance/promptConcat/promptConcat';
|
|
import type { AgentPromptDescription } from '@services/agentInstance/promptConcat/promptConcatSchema';
|
|
import { getPromptConcatHandlerConfigJsonSchema } from '@services/agentInstance/promptConcat/promptConcatSchema/jsonSchema';
|
|
import type { IDatabaseService } from '@services/database/interface';
|
|
import { AgentInstanceEntity, AgentInstanceMessageEntity } from '@services/database/schema/agent';
|
|
import { logger } from '@services/libs/log';
|
|
import serviceIdentifier from '@services/serviceIdentifier';
|
|
|
|
import type { AgentInstance, AgentInstanceLatestStatus, AgentInstanceMessage, IAgentInstanceService } from './interface';
|
|
import { AGENT_INSTANCE_FIELDS, createAgentInstanceData, createAgentMessage, MESSAGE_FIELDS, toDatabaseCompatibleInstance, toDatabaseCompatibleMessage } from './utilities';
|
|
|
|
@injectable()
|
|
export class AgentInstanceService implements IAgentInstanceService {
|
|
@inject(serviceIdentifier.Database)
|
|
private readonly databaseService!: IDatabaseService;
|
|
|
|
@inject(serviceIdentifier.AgentDefinition)
|
|
private readonly agentDefinitionService!: IAgentDefinitionService;
|
|
|
|
private dataSource: DataSource | null = null;
|
|
private agentInstanceRepository: Repository<AgentInstanceEntity> | null = null;
|
|
private agentMessageRepository: Repository<AgentInstanceMessageEntity> | null = null;
|
|
|
|
private agentInstanceSubjects: Map<string, BehaviorSubject<AgentInstance | undefined>> = new Map();
|
|
private statusSubjects: Map<string, BehaviorSubject<AgentInstanceLatestStatus | undefined>> = new Map();
|
|
|
|
private agentHandlers: Map<string, AgentHandler> = new Map();
|
|
private handlerSchemas: Map<string, Record<string, unknown>> = new Map();
|
|
private cancelTokenMap: Map<string, { value: boolean }> = new Map();
|
|
private debouncedUpdateFunctions: Map<string, (message: AgentInstanceLatestStatus['message'] & { id: string }, agentId?: string) => void> = new Map();
|
|
|
|
public async initialize(): Promise<void> {
|
|
try {
|
|
await this.initializeDatabase();
|
|
await this.initializeHandlers();
|
|
} catch (error) {
|
|
logger.error('Failed to initialize agent instance service', { error });
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
private async initializeDatabase(): Promise<void> {
|
|
try {
|
|
// Database is already initialized in the agent definition service
|
|
this.dataSource = await this.databaseService.getDatabase('agent');
|
|
this.agentInstanceRepository = this.dataSource.getRepository(AgentInstanceEntity);
|
|
this.agentMessageRepository = this.dataSource.getRepository(AgentInstanceMessageEntity);
|
|
logger.debug('AgentInstance repositories initialized');
|
|
} catch (error) {
|
|
logger.error('Failed to initialize agent instance database', { error });
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
public async initializeHandlers(): Promise<void> {
|
|
try {
|
|
// Register plugins to global registry once during initialization
|
|
await initializePluginSystem();
|
|
logger.debug('AgentInstance Plugin system initialized and plugins registered to global registry');
|
|
|
|
// Register built-in handlers
|
|
this.registerBuiltinHandlers();
|
|
logger.debug('AgentInstance handlers registered');
|
|
} catch (error) {
|
|
logger.error('Failed to initialize agent instance handlers', { error });
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
public registerBuiltinHandlers(): void {
|
|
// Plugins are already registered in initialize(), so we only register handlers here
|
|
// Register basic prompt concatenation handler with its schema
|
|
this.registerHandler('basicPromptConcatHandler', basicPromptConcatHandler, getPromptConcatHandlerConfigJsonSchema());
|
|
}
|
|
|
|
/**
|
|
* Register a handler with an optional schema
|
|
* @param handlerId ID for the handler
|
|
* @param handler The handler function
|
|
* @param schema Optional JSON schema for the handler configuration
|
|
*/
|
|
private registerHandler(handlerId: string, handler: AgentHandler, schema?: Record<string, unknown>): void {
|
|
this.agentHandlers.set(handlerId, handler);
|
|
if (schema) {
|
|
this.handlerSchemas.set(handlerId, schema);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Ensure repositories are initialized
|
|
*/
|
|
private ensureRepositories(): void {
|
|
if (!this.agentInstanceRepository || !this.agentMessageRepository) {
|
|
throw new Error('Agent instance repositories not initialized');
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Clean up subscriptions for specific agent
|
|
*/
|
|
private cleanupAgentSubscriptions(agentId: string): void {
|
|
if (this.agentInstanceSubjects.has(agentId)) {
|
|
this.agentInstanceSubjects.delete(agentId);
|
|
}
|
|
|
|
// Clean up all status subscriptions related to this agent
|
|
for (const [key, _] of this.statusSubjects.entries()) {
|
|
if (key.startsWith(`${agentId}:`)) {
|
|
this.statusSubjects.delete(key);
|
|
}
|
|
}
|
|
}
|
|
|
|
public async createAgent(agentDefinitionID?: string, options?: { preview?: boolean }): Promise<AgentInstance> {
|
|
this.ensureRepositories();
|
|
|
|
try {
|
|
// Get agent definition
|
|
const agentDefinition = await this.agentDefinitionService.getAgentDef(agentDefinitionID);
|
|
if (!agentDefinition) {
|
|
throw new Error(`Agent definition not found: ${agentDefinitionID}`);
|
|
}
|
|
|
|
// Create new agent instance using utility function
|
|
// Ensure required fields exist before creating instance
|
|
if (!agentDefinition.name) {
|
|
throw new Error(`Agent definition missing required field 'name': ${agentDefinitionID}`);
|
|
}
|
|
|
|
const { instanceData, instanceId, now } = createAgentInstanceData(agentDefinition as Required<Pick<typeof agentDefinition, 'name'>> & typeof agentDefinition);
|
|
|
|
// Mark as preview if specified
|
|
if (options?.preview) {
|
|
instanceData.volatile = true;
|
|
}
|
|
|
|
// Create and save entity
|
|
const instanceEntity = this.agentInstanceRepository!.create(toDatabaseCompatibleInstance(instanceData));
|
|
await this.agentInstanceRepository!.save(instanceEntity);
|
|
logger.info('Created agent instance', {
|
|
function: 'createAgent',
|
|
instanceId,
|
|
preview: !!options?.preview,
|
|
});
|
|
|
|
// Return complete instance object
|
|
return {
|
|
...instanceData,
|
|
created: now,
|
|
modified: now,
|
|
};
|
|
} catch (error) {
|
|
logger.error('Failed to create agent instance', { error });
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
public async getAgent(agentId: string): Promise<AgentInstance | undefined> {
|
|
this.ensureRepositories();
|
|
try {
|
|
// Query agent instance with messages in chronological order (oldest first)
|
|
const instanceEntity = await this.agentInstanceRepository!.findOne({
|
|
where: { id: agentId },
|
|
relations: ['messages'],
|
|
order: {
|
|
messages: {
|
|
modified: 'ASC', // Ensure messages are sorted in ascending order by creation time, otherwise streaming will update it and cause wrong order
|
|
},
|
|
},
|
|
});
|
|
if (!instanceEntity) {
|
|
return undefined;
|
|
}
|
|
const messages = (instanceEntity.messages || []).slice().sort((a, b) => {
|
|
const aTime = a.created ? new Date(a.created).getTime() : (a.modified ? new Date(a.modified).getTime() : 0);
|
|
const bTime = b.created ? new Date(b.created).getTime() : (b.modified ? new Date(b.modified).getTime() : 0);
|
|
return aTime - bTime;
|
|
});
|
|
return {
|
|
...pick(instanceEntity, AGENT_INSTANCE_FIELDS),
|
|
messages,
|
|
};
|
|
} catch (error) {
|
|
logger.error('Failed to get agent instance', { error });
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
public async updateAgent(agentId: string, data: Partial<AgentInstance>): Promise<AgentInstance> {
|
|
this.ensureRepositories();
|
|
|
|
try {
|
|
// Get existing instance with messages
|
|
const instanceEntity = await this.agentInstanceRepository!.findOne({
|
|
where: { id: agentId },
|
|
relations: ['messages'],
|
|
order: {
|
|
messages: {
|
|
modified: 'ASC', // Ensure messages are sorted in ascending order by creation time, otherwise streaming will update it and cause wrong order
|
|
},
|
|
},
|
|
});
|
|
|
|
if (!instanceEntity) {
|
|
throw new Error(`Agent instance not found: ${agentId}`);
|
|
}
|
|
|
|
// Update fields using pick + Object.assign for consistency with updateAgentDef
|
|
const pickedProperties = pick(data, ['name', 'status', 'avatarUrl', 'aiApiConfig', 'closed', 'handlerConfig']);
|
|
Object.assign(instanceEntity, pickedProperties);
|
|
|
|
// Save instance updates
|
|
await this.agentInstanceRepository!.save(instanceEntity);
|
|
|
|
// Handle message updates if provided
|
|
if (data.messages && data.messages.length > 0) {
|
|
// Create entities for new messages and update existing ones
|
|
for (const message of data.messages) {
|
|
// Check if message already exists
|
|
const existingMessage = instanceEntity.messages?.find(m => m.id === message.id);
|
|
|
|
if (existingMessage) {
|
|
// Update existing message
|
|
existingMessage.content = message.content;
|
|
existingMessage.modified = message.modified || new Date();
|
|
if (message.metadata) existingMessage.metadata = message.metadata;
|
|
if (message.contentType) existingMessage.contentType = message.contentType;
|
|
|
|
await this.agentMessageRepository!.save(existingMessage);
|
|
} else {
|
|
// Create new message
|
|
const messageData = pick(message, MESSAGE_FIELDS) as AgentInstanceMessage;
|
|
const messageEntity = this.agentMessageRepository!.create(toDatabaseCompatibleMessage(messageData));
|
|
|
|
await this.agentMessageRepository!.save(messageEntity);
|
|
|
|
// Add new message to the instance entity
|
|
if (!instanceEntity.messages) {
|
|
instanceEntity.messages = [];
|
|
}
|
|
instanceEntity.messages.push(messageEntity);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Construct the response object directly from the entity
|
|
// This avoids an additional database query with getAgent()
|
|
const updatedAgent: AgentInstance = {
|
|
...pick(instanceEntity, AGENT_INSTANCE_FIELDS),
|
|
messages: instanceEntity.messages || [],
|
|
};
|
|
|
|
// Notify subscribers about the updates with the already available data
|
|
// This avoids another database query within notifyAgentUpdate
|
|
this.notifyAgentUpdate(agentId, updatedAgent);
|
|
|
|
return updatedAgent;
|
|
} catch (error) {
|
|
logger.error('Failed to update agent instance', { error });
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
public async deleteAgent(agentId: string): Promise<void> {
|
|
this.ensureRepositories();
|
|
|
|
try {
|
|
// First delete all messages for this agent
|
|
await this.agentMessageRepository!.delete({ agentId });
|
|
|
|
// Then delete the agent instance
|
|
await this.agentInstanceRepository!.delete(agentId);
|
|
|
|
// Clean up subscriptions related to this agent
|
|
this.cleanupAgentSubscriptions(agentId);
|
|
|
|
logger.info(`Deleted agent instance: ${agentId}`);
|
|
} catch (error) {
|
|
logger.error('Failed to delete agent instance', { error });
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
public async getAgents(
|
|
page: number,
|
|
pageSize: number,
|
|
options?: { closed?: boolean; searchName?: string },
|
|
): Promise<Omit<AgentInstance, 'messages'>[]> {
|
|
this.ensureRepositories();
|
|
|
|
try {
|
|
const skip = (page - 1) * pageSize;
|
|
const take = pageSize;
|
|
|
|
// Build query conditions
|
|
const whereCondition: Record<string, unknown> = {};
|
|
|
|
// Always exclude preview instances from normal listing
|
|
whereCondition.preview = false;
|
|
|
|
// Add closed filter if provided
|
|
if (options && options.closed !== undefined) {
|
|
whereCondition.closed = options.closed;
|
|
}
|
|
|
|
// Add name search filter if provided
|
|
if (options && options.searchName) {
|
|
whereCondition.name = { like: `%${options.searchName}%` };
|
|
}
|
|
|
|
const [instances, _] = await this.agentInstanceRepository!.findAndCount({
|
|
where: Object.keys(whereCondition).length > 0 ? whereCondition : undefined,
|
|
skip,
|
|
take,
|
|
order: {
|
|
// Sort by creation time descending
|
|
created: 'DESC',
|
|
},
|
|
});
|
|
|
|
return instances.map(entity => pick(entity, AGENT_INSTANCE_FIELDS));
|
|
} catch (error) {
|
|
logger.error('Failed to get agent instances', { error });
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
public async sendMsgToAgent(agentId: string, content: { text: string; file?: File }): Promise<void> {
|
|
try {
|
|
// Get agent instance
|
|
const agentInstance = await this.getAgent(agentId);
|
|
if (!agentInstance) {
|
|
throw new Error(`Agent instance not found: ${agentId}`);
|
|
}
|
|
|
|
// Create user message
|
|
const messageId = nanoid();
|
|
const now = new Date();
|
|
|
|
// Get agent configuration
|
|
const agentDefinition = await this.agentDefinitionService.getAgentDef(agentInstance.agentDefId);
|
|
if (!agentDefinition) {
|
|
throw new Error(`Agent definition not found: ${agentInstance.agentDefId}`);
|
|
}
|
|
|
|
// Get appropriate handler
|
|
const handlerId = agentDefinition.handlerID;
|
|
if (!handlerId) {
|
|
throw new Error(`Handler ID not found in agent definition: ${agentDefinition.id}`);
|
|
}
|
|
const handler = this.agentHandlers.get(handlerId);
|
|
if (!handler) {
|
|
throw new Error(`Handler not found: ${handlerId}`);
|
|
}
|
|
|
|
// Create handler context with temporary message added for processing
|
|
const cancelToken = { value: false };
|
|
this.cancelTokenMap.set(agentId, cancelToken);
|
|
const handlerContext: AgentHandlerContext = {
|
|
agent: {
|
|
...agentInstance,
|
|
messages: [...agentInstance.messages],
|
|
status: {
|
|
state: 'working',
|
|
modified: now,
|
|
},
|
|
},
|
|
agentDef: agentDefinition,
|
|
isCancelled: () => cancelToken.value,
|
|
};
|
|
|
|
// Create fresh hooks for this handler execution and register plugins based on handlerConfig
|
|
const { hooks: handlerHooks } = await createHooksWithPlugins(agentDefinition.handlerConfig || {});
|
|
|
|
// Trigger userMessageReceived hook with the configured plugins
|
|
await handlerHooks.userMessageReceived.promise({
|
|
handlerContext,
|
|
content,
|
|
messageId,
|
|
timestamp: now,
|
|
});
|
|
|
|
// Notify agent update after user message is added
|
|
this.notifyAgentUpdate(agentId, handlerContext.agent);
|
|
|
|
try {
|
|
// Create async generator
|
|
const generator = handler(handlerContext);
|
|
|
|
// Track the last message for completion handling
|
|
let lastResult: AgentInstanceLatestStatus | undefined;
|
|
|
|
for await (const result of generator) {
|
|
// Update status subscribers for specific message
|
|
if (result.message?.content) {
|
|
// Ensure message has correct modification timestamp
|
|
if (!result.message.modified) {
|
|
result.message.modified = new Date();
|
|
}
|
|
|
|
// Update status subscribers directly
|
|
const statusKey = `${agentId}:${result.message.id}`;
|
|
if (this.statusSubjects.has(statusKey)) {
|
|
this.statusSubjects.get(statusKey)?.next(result);
|
|
}
|
|
|
|
// Notify agent update with latest messages for real-time UI updates
|
|
this.notifyAgentUpdate(agentId, handlerContext.agent);
|
|
}
|
|
|
|
// Store the last result for completion handling
|
|
lastResult = result;
|
|
}
|
|
|
|
// Handle stream completion
|
|
if (lastResult?.message) {
|
|
// Complete the message stream directly using the last message from the generator
|
|
const statusKey = `${agentId}:${lastResult.message.id}`;
|
|
if (this.statusSubjects.has(statusKey)) {
|
|
const subject = this.statusSubjects.get(statusKey);
|
|
if (subject) {
|
|
// Send final update with completed state
|
|
subject.next({
|
|
state: 'completed',
|
|
message: lastResult.message,
|
|
modified: new Date(),
|
|
});
|
|
// Complete the Observable and remove the subject
|
|
subject.complete();
|
|
this.statusSubjects.delete(statusKey);
|
|
}
|
|
}
|
|
|
|
// Trigger agentStatusChanged hook for completion
|
|
await handlerHooks.agentStatusChanged.promise({
|
|
handlerContext,
|
|
status: {
|
|
state: 'completed',
|
|
modified: new Date(),
|
|
},
|
|
});
|
|
}
|
|
|
|
// Remove cancel token after generator completes
|
|
this.cancelTokenMap.delete(agentId);
|
|
} catch (error) {
|
|
const errorMessage = error instanceof Error ? error.message : String(error);
|
|
logger.error(`Agent handler execution failed: ${errorMessage}`);
|
|
|
|
// Trigger agentStatusChanged hook for failure
|
|
await handlerHooks.agentStatusChanged.promise({
|
|
handlerContext,
|
|
status: {
|
|
state: 'failed',
|
|
modified: new Date(),
|
|
},
|
|
}).catch(() => {
|
|
// Ignore hook errors during error handling
|
|
});
|
|
|
|
// Remove cancel token
|
|
this.cancelTokenMap.delete(agentId);
|
|
throw error;
|
|
}
|
|
} catch (error: unknown) {
|
|
const errorMessage = error instanceof Error ? error.message : String(error);
|
|
logger.error(`Failed to send message to agent: ${errorMessage}`);
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
public async cancelAgent(agentId: string): Promise<void> {
|
|
// Try to get cancel token
|
|
const cancelToken = this.cancelTokenMap.get(agentId);
|
|
|
|
if (cancelToken) {
|
|
// Set cancel flag
|
|
cancelToken.value = true;
|
|
|
|
try {
|
|
// Update agent status to canceled
|
|
logger.debug(`cancelAgent called for ${agentId} - updating agent status to canceled`);
|
|
await this.updateAgent(agentId, {
|
|
status: {
|
|
state: 'canceled',
|
|
modified: new Date(),
|
|
},
|
|
});
|
|
logger.debug(`updateAgent returned for cancelAgent ${agentId}`);
|
|
|
|
// Propagate canceled status to any message-specific subscriptions so UI can react
|
|
try {
|
|
logger.debug('propagating canceled status to message-specific subscriptions', { function: 'cancelAgent', agentId });
|
|
const agent = await this.getAgent(agentId);
|
|
if (agent && agent.messages) {
|
|
for (const key of Array.from(this.statusSubjects.keys())) {
|
|
if (key.startsWith(`${agentId}:`)) {
|
|
const parts = key.split(':');
|
|
const messageId = parts[1];
|
|
const subject = this.statusSubjects.get(key);
|
|
const message = agent.messages.find(m => m.id === messageId);
|
|
if (subject) {
|
|
try {
|
|
const message_ = message || ({} as AgentInstanceMessage);
|
|
logger.debug('propagate canceled to subscription', { function: 'cancelAgent', subscriptionKey: key });
|
|
subject.next({
|
|
state: 'canceled',
|
|
message: message_,
|
|
modified: new Date(),
|
|
});
|
|
} catch {
|
|
// ignore
|
|
}
|
|
try {
|
|
subject.complete();
|
|
} catch {
|
|
// ignore
|
|
}
|
|
this.statusSubjects.delete(key);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} catch (error) {
|
|
logger.warn('Failed to propagate cancel status to message subscriptions', { function: 'cancelAgent', error });
|
|
}
|
|
|
|
// Remove cancel token from map
|
|
this.cancelTokenMap.delete(agentId);
|
|
|
|
logger.info('Canceled agent instance', {
|
|
function: 'cancelAgent',
|
|
agentId,
|
|
});
|
|
} catch (error: unknown) {
|
|
const errorMessage = error instanceof Error ? error.message : String(error);
|
|
logger.error('Failed to cancel agent instance', {
|
|
function: 'cancelAgent',
|
|
error: errorMessage,
|
|
});
|
|
throw error;
|
|
}
|
|
} else {
|
|
logger.warn(`No active operation found for agent: ${agentId}`);
|
|
}
|
|
}
|
|
|
|
public async closeAgent(agentId: string): Promise<void> {
|
|
this.ensureRepositories();
|
|
|
|
try {
|
|
// Get agent instance
|
|
const instanceEntity = await this.agentInstanceRepository!.findOne({
|
|
where: { id: agentId },
|
|
});
|
|
|
|
if (!instanceEntity) {
|
|
throw new Error(`Agent instance not found: ${agentId}`);
|
|
}
|
|
|
|
// Mark as closed
|
|
instanceEntity.closed = true;
|
|
await this.agentInstanceRepository!.save(instanceEntity);
|
|
|
|
// Cancel any ongoing operations
|
|
if (this.cancelTokenMap.has(agentId)) {
|
|
const token = this.cancelTokenMap.get(agentId);
|
|
if (token) {
|
|
token.value = true;
|
|
}
|
|
this.cancelTokenMap.delete(agentId);
|
|
}
|
|
|
|
// Clean up subscriptions
|
|
this.cleanupAgentSubscriptions(agentId);
|
|
|
|
logger.info('Closed agent instance', {
|
|
function: 'closeAgent',
|
|
agentId,
|
|
});
|
|
} catch (error) {
|
|
const errorMessage = error instanceof Error ? error.message : String(error);
|
|
logger.error('Failed to close agent instance', {
|
|
function: 'closeAgent',
|
|
error: errorMessage,
|
|
});
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
public subscribeToAgentUpdates(agentId: string): Observable<AgentInstance | undefined>;
|
|
/**
|
|
* Subscribe to agent instance message status updates
|
|
*/
|
|
public subscribeToAgentUpdates(agentId: string, messageId: string): Observable<AgentInstanceLatestStatus | undefined>;
|
|
public subscribeToAgentUpdates(agentId: string, messageId?: string): Observable<AgentInstance | AgentInstanceLatestStatus | undefined> {
|
|
// If messageId provided, subscribe to specific message status updates
|
|
if (messageId) {
|
|
const statusKey = `${agentId}:${messageId}`;
|
|
if (!this.statusSubjects.has(statusKey)) {
|
|
this.statusSubjects.set(statusKey, new BehaviorSubject<AgentInstanceLatestStatus | undefined>(undefined));
|
|
|
|
// Try to get initial status
|
|
this.getAgent(agentId).then(agent => {
|
|
if (agent) {
|
|
const message = agent.messages.find(m => m.id === messageId);
|
|
if (message) {
|
|
// 创建状态对象,注意不再检查 isComplete
|
|
const status: AgentInstanceLatestStatus = {
|
|
state: agent.status.state,
|
|
message,
|
|
modified: message.modified,
|
|
};
|
|
|
|
this.statusSubjects.get(statusKey)?.next(status);
|
|
}
|
|
}
|
|
}).catch((error: unknown) => {
|
|
logger.error('Failed to get initial status for message', { function: 'subscribeToAgentUpdates', error });
|
|
});
|
|
}
|
|
|
|
return this.statusSubjects.get(statusKey)!.asObservable();
|
|
}
|
|
|
|
// If no messageId provided, subscribe to entire agent instance updates
|
|
if (!this.agentInstanceSubjects.has(agentId)) {
|
|
this.agentInstanceSubjects.set(agentId, new BehaviorSubject<AgentInstance | undefined>(undefined));
|
|
|
|
// Try to get initial data
|
|
this.getAgent(agentId).then(agent => {
|
|
this.agentInstanceSubjects.get(agentId)?.next(agent);
|
|
}).catch((error: unknown) => {
|
|
logger.error('Failed to get initial agent data', { function: 'subscribeToAgentUpdates', error });
|
|
});
|
|
}
|
|
|
|
return this.agentInstanceSubjects.get(agentId)!.asObservable();
|
|
}
|
|
|
|
/**
|
|
* Notify agent subscription of updates
|
|
* @param agentId Agent ID
|
|
* @param agentData Agent data to use for notification
|
|
*/
|
|
private notifyAgentUpdate(agentId: string, agentData: AgentInstance): void {
|
|
try {
|
|
// Only notify if there are active subscriptions
|
|
if (this.agentInstanceSubjects.has(agentId)) {
|
|
// Use the provided data for notification (no database query)
|
|
this.agentInstanceSubjects.get(agentId)?.next(agentData);
|
|
}
|
|
} catch (error) {
|
|
const errorMessage = error instanceof Error ? error.message : String(error);
|
|
logger.error(`Failed to notify agent update: ${errorMessage}`);
|
|
}
|
|
}
|
|
|
|
public async saveUserMessage(userMessage: AgentInstanceMessage): Promise<void> {
|
|
this.ensureRepositories();
|
|
try {
|
|
const now = new Date();
|
|
const summary = {
|
|
id: userMessage.id,
|
|
role: userMessage.role,
|
|
agentId: userMessage.agentId,
|
|
isToolResult: !!userMessage.metadata?.isToolResult,
|
|
isPersisted: !!userMessage.metadata?.isPersisted,
|
|
};
|
|
logger.debug('Saving user message to DB (start)', {
|
|
when: now.toISOString(),
|
|
...summary,
|
|
source: 'saveUserMessage',
|
|
stack: new Error().stack?.split('\n').slice(0, 4).join('\n'),
|
|
});
|
|
|
|
await this.agentMessageRepository!.save(this.agentMessageRepository!.create(toDatabaseCompatibleMessage(userMessage)));
|
|
|
|
logger.debug('User message saved to database', {
|
|
when: new Date().toISOString(),
|
|
...summary,
|
|
source: 'saveUserMessage',
|
|
});
|
|
} catch (error) {
|
|
logger.error('Failed to save user message', {
|
|
error,
|
|
messageId: userMessage.id,
|
|
agentId: userMessage.agentId,
|
|
});
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
public debounceUpdateMessage(
|
|
message: AgentInstanceMessage,
|
|
agentId?: string,
|
|
debounceMs = 300,
|
|
): void {
|
|
const messageId = message.id;
|
|
|
|
// Update status subscribers for specific message if available
|
|
if (agentId) {
|
|
const statusKey = `${agentId}:${messageId}`;
|
|
if (this.statusSubjects.has(statusKey)) {
|
|
this.statusSubjects.get(statusKey)?.next({
|
|
state: 'working',
|
|
message,
|
|
modified: message.modified ?? new Date(),
|
|
});
|
|
}
|
|
}
|
|
|
|
// Lazy load or get existing debounced function
|
|
if (!this.debouncedUpdateFunctions.has(messageId)) {
|
|
// Create debounced function for each message ID
|
|
const debouncedUpdate = debounce(
|
|
async (messageData_: AgentInstanceMessage, aid?: string) => {
|
|
try {
|
|
this.ensureRepositories();
|
|
// ensureRepositories guarantees dataSource is available
|
|
await this.dataSource!.transaction(async transaction => {
|
|
const messageRepo = transaction.getRepository(AgentInstanceMessageEntity);
|
|
const messageEntity = await messageRepo.findOne({
|
|
where: { id: messageId },
|
|
});
|
|
|
|
if (messageEntity) {
|
|
// Update message content
|
|
messageEntity.content = messageData_.content;
|
|
if (messageData_.contentType) messageEntity.contentType = messageData_.contentType;
|
|
if (messageData_.metadata) messageEntity.metadata = messageData_.metadata;
|
|
if (messageData_.duration !== undefined) messageEntity.duration = messageData_.duration ?? undefined; // Fix: Update duration field
|
|
// Preserve provided modified; if not provided, keep existing DB value to avoid late overwrites
|
|
// Only adjust modified if the incoming timestamp is earlier; otherwise leave DB value unchanged
|
|
if (messageData_.modified instanceof Date) {
|
|
if (!messageEntity.modified || messageData_.modified.getTime() < new Date(messageEntity.modified).getTime()) {
|
|
messageEntity.modified = messageData_.modified;
|
|
}
|
|
}
|
|
|
|
const startSave = new Date();
|
|
logger.debug('Updating existing message (start save)', {
|
|
when: startSave.toISOString(),
|
|
messageId,
|
|
agentId: aid,
|
|
source: 'debounceUpdateMessage:update',
|
|
stack: new Error().stack?.split('\n').slice(0, 4).join('\n'),
|
|
});
|
|
await messageRepo.save(messageEntity);
|
|
logger.debug('Updating existing message (saved)', {
|
|
when: new Date().toISOString(),
|
|
messageId,
|
|
agentId: aid,
|
|
source: 'debounceUpdateMessage:update',
|
|
});
|
|
} else if (aid) {
|
|
// Create new message if it doesn't exist and agentId provided
|
|
// Create message using utility function
|
|
const messageData = createAgentMessage(messageId, aid, {
|
|
role: messageData_.role,
|
|
content: messageData_.content,
|
|
contentType: messageData_.contentType,
|
|
metadata: messageData_.metadata,
|
|
duration: messageData_.duration, // Include duration for new messages
|
|
});
|
|
const newMessage = messageRepo.create(toDatabaseCompatibleMessage(messageData));
|
|
|
|
const startSaveNew = new Date();
|
|
logger.debug('Creating new message (start save)', {
|
|
when: startSaveNew.toISOString(),
|
|
messageId,
|
|
agentId: aid,
|
|
source: 'debounceUpdateMessage:create',
|
|
stack: new Error().stack?.split('\n').slice(0, 4).join('\n'),
|
|
});
|
|
await messageRepo.save(newMessage);
|
|
logger.debug('Creating new message (saved)', {
|
|
when: new Date().toISOString(),
|
|
messageId,
|
|
agentId: aid,
|
|
source: 'debounceUpdateMessage:create',
|
|
});
|
|
|
|
// Get agent instance repository for transaction
|
|
const agentRepo = transaction.getRepository(AgentInstanceEntity);
|
|
|
|
// Get agent instance within the current transaction
|
|
const agentEntity = await agentRepo.findOne({
|
|
where: { id: aid },
|
|
relations: ['messages'],
|
|
});
|
|
|
|
if (agentEntity) {
|
|
// Add the new message to the agent entity
|
|
if (!agentEntity.messages) {
|
|
agentEntity.messages = [];
|
|
}
|
|
agentEntity.messages.push(newMessage);
|
|
|
|
// Save the updated agent entity
|
|
await agentRepo.save(agentEntity);
|
|
|
|
// Construct agent data from entity directly without additional query
|
|
const updatedAgent: AgentInstance = {
|
|
...pick(agentEntity, AGENT_INSTANCE_FIELDS),
|
|
messages: agentEntity.messages,
|
|
};
|
|
|
|
// Notify subscribers directly without additional queries
|
|
if (this.agentInstanceSubjects.has(aid)) {
|
|
this.agentInstanceSubjects.get(aid)?.next(updatedAgent);
|
|
logger.debug(`Notified agent subscribers of new message: ${messageId}`, {
|
|
method: 'debounceUpdateMessage',
|
|
agentId: aid,
|
|
});
|
|
}
|
|
} else {
|
|
logger.warn(`Agent instance not found for message: ${messageId}`);
|
|
}
|
|
} else {
|
|
logger.warn(`Cannot create message: missing agent ID for message ID: ${messageId}`);
|
|
}
|
|
});
|
|
} catch (error) {
|
|
logger.error('Failed to update/create message content', { error });
|
|
}
|
|
},
|
|
debounceMs,
|
|
);
|
|
|
|
this.debouncedUpdateFunctions.set(messageId, debouncedUpdate);
|
|
}
|
|
|
|
// Call debounced function
|
|
const debouncedFunction = this.debouncedUpdateFunctions.get(messageId);
|
|
if (debouncedFunction) {
|
|
debouncedFunction(message, agentId);
|
|
}
|
|
}
|
|
|
|
public concatPrompt(promptDescription: Pick<AgentPromptDescription, 'handlerConfig'>, messages: AgentInstanceMessage[]): Observable<PromptConcatStreamState> {
|
|
logger.debug('AgentInstanceService.concatPrompt called', {
|
|
hasPromptConfig: !!promptDescription.handlerConfig,
|
|
promptConfigKeys: Object.keys(promptDescription.handlerConfig),
|
|
messagesCount: messages.length,
|
|
});
|
|
|
|
return new Observable<PromptConcatStreamState>((observer) => {
|
|
const processStream = async () => {
|
|
try {
|
|
// Create a minimal handler context for prompt concatenation
|
|
const handlerContext = {
|
|
agent: {
|
|
id: 'temp',
|
|
messages,
|
|
agentDefId: 'temp',
|
|
status: { state: 'working' as const, modified: new Date() },
|
|
created: new Date(),
|
|
handlerConfig: {},
|
|
},
|
|
agentDef: { id: 'temp', name: 'temp', handlerConfig: promptDescription.handlerConfig },
|
|
isCancelled: () => false,
|
|
};
|
|
|
|
const streamGenerator = promptConcatStream(promptDescription as AgentPromptDescription, messages, handlerContext);
|
|
for await (const state of streamGenerator) {
|
|
observer.next(state);
|
|
if (state.isComplete) {
|
|
observer.complete();
|
|
break;
|
|
}
|
|
}
|
|
} catch (error) {
|
|
logger.error('Error in AgentInstanceService.concatPrompt', {
|
|
error,
|
|
promptDescriptionId: (promptDescription as AgentPromptDescription).id,
|
|
messagesCount: messages.length,
|
|
});
|
|
observer.error(error);
|
|
}
|
|
};
|
|
void processStream();
|
|
});
|
|
}
|
|
|
|
public getHandlerConfigSchema(handlerId: string): Record<string, unknown> {
|
|
try {
|
|
logger.debug('AgentInstanceService.getHandlerConfigSchema called', { handlerId });
|
|
// Check if we have a schema for this handler
|
|
const schema = this.handlerSchemas.get(handlerId);
|
|
if (schema) {
|
|
return schema;
|
|
}
|
|
// If no schema found, return an empty schema
|
|
logger.warn(`No schema found for handler: ${handlerId}`);
|
|
return { type: 'object', properties: {} };
|
|
} catch (error) {
|
|
logger.error('Error in AgentInstanceService.getHandlerConfigSchema', {
|
|
error,
|
|
handlerId,
|
|
});
|
|
throw error;
|
|
}
|
|
}
|
|
}
|