95 lines
3.2 KiB
TypeScript
95 lines
3.2 KiB
TypeScript
/**
|
|
* mcp/events.ts — In-memory event queue for MCP long-polling
|
|
*
|
|
* Each MCP API key gets its own queue. Tools can push events,
|
|
* and dev_subscribe can wait (long-poll) for the next event.
|
|
*/
|
|
|
|
/**
 * A single event delivered to an MCP client through the per-key queue.
 */
export interface McpEvent {
  /** Event discriminator chosen by the producer. */
  type: string;

  /** ISO-8601 timestamp string, stamped when the event is pushed. */
  timestamp: string;

  /** Producer-defined payload; its shape is not constrained by this module. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- payload shape is owned by producers
  data: any;
}
|
|
|
|
/**
 * Bookkeeping for one parked long-poll request.
 */
interface Waiter {
  /** Settles the parked promise: with an event on delivery, or `null` otherwise. */
  resolve: (event: McpEvent | null) => void;

  /** Handle of the timeout timer, kept so it can be cancelled on delivery. */
  timer: ReturnType<typeof setTimeout>;
}
|
|
|
|
/** Undelivered events per MCP key, stored oldest-first. */
const queues: Map<string, McpEvent[]> = new Map();

/** The parked long-poll for each MCP key; at most one entry per key. */
const waiters: Map<string, Waiter> = new Map();

/** MCP keys that are targeted when an event is broadcast to all keys. */
const knownKeys: Set<string> = new Set();

/** Maximum number of events retained per key; the oldest are dropped beyond this. */
const MAX_QUEUE = 100;
|
|
|
|
/**
 * Deliver an event to one MCP key.
 *
 * The event is stamped with the current time. If a long-poll is parked for
 * the key, it is woken with the event and nothing is queued; otherwise the
 * event is appended to the key's queue, which is trimmed to `MAX_QUEUE`
 * entries by discarding the oldest.
 *
 * @param mcpKey - MCP API key whose client should receive the event.
 * @param event - Event without a timestamp; a `timestamp` is added here.
 */
export function pushEvent(mcpKey: string, event: Omit<McpEvent, 'timestamp'>): void {
  const stamped: McpEvent = { ...event, timestamp: new Date().toISOString() };

  // A parked long-poll takes the event directly.
  const parked = waiters.get(mcpKey);
  if (parked !== undefined) {
    clearTimeout(parked.timer);
    waiters.delete(mcpKey);
    parked.resolve(stamped);
    return;
  }

  // Nobody is waiting: buffer the event for the next poll.
  let backlog = queues.get(mcpKey);
  if (backlog === undefined) {
    backlog = [];
    queues.set(mcpKey, backlog);
  }
  backlog.push(stamped);

  // Drop the oldest entries once the cap is exceeded.
  const overflow = backlog.length - MAX_QUEUE;
  if (overflow > 0) {
    backlog.splice(0, overflow);
  }
}
|
|
|
|
/**
 * Wait for the next event for an MCP key (long-poll).
 *
 * Calling this registers the key as a broadcast target. If the key already
 * has queued events, the oldest one is removed and returned immediately.
 * Otherwise the call parks until an event is pushed or the timeout elapses.
 * Only one call can be parked per key: a newer call resolves the previously
 * parked one with `null` and takes its place.
 *
 * @param mcpKey - MCP API key to wait on.
 * @param timeoutMs - Maximum wait in milliseconds. Capped at 55000; negative
 *   or NaN values are treated as 0 (time out on the next timer tick).
 * @returns The next event, or `null` on timeout or when superseded.
 */
export function waitForEvent(mcpKey: string, timeoutMs: number = 30000): Promise<McpEvent | null> {
  // Upper bound on a single poll.
  // NOTE(review): presumably chosen to stay under a ~60 s proxy/client timeout — confirm.
  const MAX_WAIT_MS = 55000;

  knownKeys.add(mcpKey);

  // Fast path: hand over the oldest buffered event without parking.
  const queued = queues.get(mcpKey)?.shift();
  if (queued !== undefined) {
    return Promise.resolve(queued);
  }

  // Clamp to [0, MAX_WAIT_MS]. The `> 0` test is false for NaN as well as for
  // negative values, so neither reaches setTimeout.
  const capped = Math.min(timeoutMs, MAX_WAIT_MS);
  const delayMs = capped > 0 ? capped : 0;

  return new Promise((resolve) => {
    // Only one parked poll per key: release the older one with `null`.
    const previous = waiters.get(mcpKey);
    if (previous !== undefined) {
      clearTimeout(previous.timer);
      previous.resolve(null);
    }

    const timer = setTimeout(() => {
      waiters.delete(mcpKey);
      resolve(null);
    }, delayMs);

    waiters.set(mcpKey, { resolve, timer });
  });
}
|
|
|
|
/**
 * Drain queued events for an MCP key without blocking (for catch-up).
 *
 * Removes up to `count` events from the front of the key's queue — i.e. the
 * oldest ones — and returns them in queue order. Returned events are no
 * longer in the queue.
 *
 * @param mcpKey - MCP API key whose queue is drained.
 * @param count - Maximum number of events to take.
 * @returns The removed events; an empty array when the key has no queue.
 */
export function getEvents(mcpKey: string, count: number = 10): McpEvent[] {
  const backlog = queues.get(mcpKey);
  if (backlog === undefined) {
    return [];
  }
  return backlog.splice(0, count);
}
|
|
|
|
/**
 * Broadcast an event to every key in `knownKeys`.
 *
 * The event is stamped once, so every recipient receives the same object
 * with the same timestamp. For each key, a parked long-poll is woken with
 * the event; otherwise the event is appended to that key's queue, which is
 * trimmed to `MAX_QUEUE` entries by discarding the oldest.
 *
 * @param event - Event without a timestamp; a `timestamp` is added here.
 */
export function pushEventAll(event: Omit<McpEvent, 'timestamp'>): void {
  const stamped: McpEvent = { ...event, timestamp: new Date().toISOString() };

  knownKeys.forEach((key) => {
    const parked = waiters.get(key);

    if (parked === undefined) {
      // Nobody is waiting on this key: buffer the event.
      let backlog = queues.get(key);
      if (backlog === undefined) {
        backlog = [];
        queues.set(key, backlog);
      }
      backlog.push(stamped);

      const overflow = backlog.length - MAX_QUEUE;
      if (overflow > 0) {
        backlog.splice(0, overflow);
      }
      return;
    }

    // A long-poll is parked on this key: wake it with the event.
    clearTimeout(parked.timer);
    waiters.delete(key);
    parked.resolve(stamped);
  });
}
|
|
|
|
// ── Active MCP key for the current request ──

/**
 * MCP key associated with the request being handled, or `null` when unset.
 *
 * NOTE(review): this is a single module-level variable, not per-request
 * storage, so overlapping async requests would observe each other's value —
 * confirm callers only rely on it within synchronous sections.
 */
let _activeMcpKey: string | null = null;

/** Record (or clear, with `null`) the active MCP key. */
export function setActiveMcpKey(key: string | null): void {
  _activeMcpKey = key;
}

/** Return the MCP key most recently stored by `setActiveMcpKey`, or `null`. */
export function getActiveMcpKey(): string | null {
  return _activeMcpKey;
}
|
|
|
|
// ── Broadcast to browsers (set by server.ts to avoid circular imports) ──

/** Callback installed via `setBroadcastFn`; `null` until one is installed. */
let _broadcastFn: ((data: Record<string, any>) => void) | null = null;

/** Install the function that forwards data to connected browsers. */
export function setBroadcastFn(fn: (data: Record<string, any>) => void): void {
  _broadcastFn = fn;
}

/** Forward `data` to the installed broadcast function; a no-op when none is installed. */
export function broadcastToBrowsers(data: Record<string, any>): void {
  if (_broadcastFn == null) {
    return;
  }
  _broadcastFn(data);
}
|