582 lines
26 KiB
TypeScript
582 lines
26 KiB
TypeScript
/**
|
|
* gateway.ts — OpenClaw gateway WebSocket connection (Bun)
|
|
*
|
|
* Manages the single upstream connection to openclaw gateway,
|
|
* request/response tracking, and event forwarding to browser sessions.
|
|
*/
|
|
|
|
import * as fs from 'fs';
|
|
import * as path from 'path';
|
|
import { sign, createPrivateKey } from 'crypto';
|
|
import { filterText, filterValue } from './message-filter';
|
|
import { hud, buildToolArgs, resolveToolResult, buildExecResult, buildGenericResult } from './hud-builder';
|
|
|
|
const GATEWAY_HOST = process.env.GATEWAY_HOST || '10.0.0.10';
|
|
const GATEWAY_PORT = process.env.GATEWAY_PORT || '18789';
|
|
const GATEWAY_TOKEN = process.env.GATEWAY_TOKEN || 'bd162cc05dac9371b92d613307350ae4bae3858f79240654';
|
|
|
|
// Device identity for gateway auth (required since openclaw 2026.3.13 for operator.write scope)
|
|
interface DeviceIdentity { deviceId: string; publicKeyPem: string; privateKeyPem: string; }
|
|
const DEVICE_IDENTITY_PATH = path.join(
|
|
process.env.HOME || '/home/openclaw', '.openclaw/identity/device.json'
|
|
);
|
|
const deviceIdentity: DeviceIdentity | null = (() => {
|
|
try { return JSON.parse(fs.readFileSync(DEVICE_IDENTITY_PATH, 'utf8')); }
|
|
catch { console.warn('[auth] device.json not found — connecting without device auth'); return null; }
|
|
})();
|
|
|
|
// Mirrors openclaw's buildDeviceAuthPayloadV3 — pipe-delimited string for Ed25519 signing
|
|
function buildV3Payload(p: {
|
|
deviceId: string; clientId: string; clientMode: string; role: string;
|
|
scopes: string[]; signedAtMs: number; token: string; nonce: string;
|
|
}): string {
|
|
return ['v3', p.deviceId, p.clientId, p.clientMode, p.role,
|
|
p.scopes.join(','), String(p.signedAtMs), p.token, p.nonce,
|
|
'linux', 'linux', // platform + deviceFamily (normalised to lowercase)
|
|
].join('|');
|
|
}
|
|
|
|
// Load gateway self-signed CA cert for TLS verification
|
|
const GATEWAY_CA_CERT = (() => {
|
|
try {
|
|
const p = path.join(import.meta.dir, 'gateway-ca.crt');
|
|
return fs.readFileSync(p, 'utf8');
|
|
} catch (_) { return undefined; }
|
|
})();
|
|
|
|
let gatewayWS: WebSocket | null = null;
|
|
let gatewayConnected = false;
|
|
const pendingRequests = new Map<string, {
|
|
resolve: (v: unknown) => void;
|
|
reject: (e: Error) => void;
|
|
timer: ReturnType<typeof setTimeout>;
|
|
}>();
|
|
|
|
// browserSessions ref — set by server.ts after init
|
|
let browserSessions: Map<any, any> | null = null;
|
|
export function setBrowserSessions(map: Map<any, any>) { browserSessions = map; }
|
|
export function isConnected() { return gatewayConnected; }
|
|
|
|
// onTurnDone callback — set by server.ts to clear activeTurnId when a turn completes
|
|
let onTurnDoneCallback: ((sessionKey: string) => void) | null = null;
|
|
export function setOnTurnDone(cb: (sessionKey: string) => void) { onTurnDoneCallback = cb; }
|
|
|
|
// onGatewayDisconnect callback — set by server.ts to abort all in-flight turns on gateway drop
|
|
let onGatewayDisconnectCallback: (() => void) | null = null;
|
|
export function setOnGatewayDisconnect(cb: () => void) { onGatewayDisconnectCallback = cb; }
|
|
|
|
function uuid(): string { return crypto.randomUUID(); }
|
|
|
|
const CONNECT_SCOPES = ['operator.read', 'operator.write', 'operator.admin'];
|
|
|
|
// Sends the connect request after receiving the challenge nonce (or null for no-device-auth fallback)
|
|
function sendConnectRequest(ws: WebSocket, challengeNonce: string | null,
|
|
outerResolve: () => void, outerReject: (e: Error) => void) {
|
|
const signedAtMs = Date.now();
|
|
let device: Record<string, unknown> | undefined;
|
|
|
|
if (deviceIdentity && challengeNonce) {
|
|
const payload = buildV3Payload({
|
|
deviceId: deviceIdentity.deviceId,
|
|
clientId: 'webchat', clientMode: 'webchat', role: 'operator',
|
|
scopes: CONNECT_SCOPES, signedAtMs,
|
|
token: GATEWAY_TOKEN, nonce: challengeNonce,
|
|
});
|
|
const privKey = createPrivateKey({ key: deviceIdentity.privateKeyPem, format: 'pem' });
|
|
const sigBytes = sign(null, Buffer.from(payload, 'utf8'), privKey);
|
|
device = {
|
|
id: deviceIdentity.deviceId,
|
|
publicKey: deviceIdentity.publicKeyPem,
|
|
signature: sigBytes.toString('base64url'),
|
|
signedAt: signedAtMs,
|
|
nonce: challengeNonce,
|
|
};
|
|
}
|
|
|
|
const connectId = uuid();
|
|
pendingRequests.set(connectId, {
|
|
resolve: (payload: unknown) => {
|
|
const granted = (payload as any)?.scopes ?? [];
|
|
console.log('[auth] Connect OK — granted scopes:', granted.join(', ') || '(none listed)');
|
|
gatewayConnected = true;
|
|
outerResolve();
|
|
},
|
|
reject: (err: Error) => {
|
|
console.error('[auth] Connect rejected:', err.message);
|
|
outerReject(err);
|
|
},
|
|
timer: setTimeout(() => {
|
|
pendingRequests.delete(connectId);
|
|
outerReject(new Error('Connect handshake timeout'));
|
|
}, 15000),
|
|
});
|
|
|
|
ws.send(JSON.stringify({
|
|
type: 'req', id: connectId, method: 'connect',
|
|
params: {
|
|
minProtocol: 3, maxProtocol: 3,
|
|
client: { id: 'webchat', version: '1.0.0', platform: 'linux', mode: 'webchat', deviceFamily: 'Linux' },
|
|
role: 'operator', scopes: CONNECT_SCOPES,
|
|
caps: ['tool-events'], commands: [], permissions: {},
|
|
auth: { token: GATEWAY_TOKEN },
|
|
userAgent: 'openclaw-webgateway/1.0',
|
|
...(device ? { device } : {}),
|
|
},
|
|
}));
|
|
const authMode = device ? `yes, nonce=${challengeNonce!.slice(0, 8)}…` : 'no (fallback)';
|
|
console.log(`[auth] Connect sent (device auth: ${authMode})`);
|
|
}
|
|
|
|
export function connectToGateway(): Promise<void> {
|
|
return new Promise((resolve, reject) => {
|
|
const wsUrl = `wss://${GATEWAY_HOST}:${GATEWAY_PORT}`;
|
|
console.log(`🔌 Connecting to gateway: ${wsUrl}`);
|
|
|
|
const ws = new WebSocket(wsUrl, {
|
|
headers: { 'Origin': 'http://localhost', 'User-Agent': 'openclaw-webgateway/1.0' },
|
|
...(GATEWAY_CA_CERT ? { tls: { ca: GATEWAY_CA_CERT } } : {}),
|
|
} as any);
|
|
gatewayWS = ws;
|
|
|
|
ws.addEventListener('open', () => {
|
|
console.log('✅ Gateway WS open — waiting for connect.challenge…');
|
|
// Fallback: if no challenge arrives within 5 s, connect without device auth
|
|
const fallbackTimer = setTimeout(() => {
|
|
console.warn('[auth] No connect.challenge in 5 s — sending without device auth');
|
|
sendConnectRequest(ws, null, resolve, reject);
|
|
}, 5000);
|
|
(ws as any)._challengeFallbackTimer = fallbackTimer;
|
|
});
|
|
|
|
ws.addEventListener('message', (ev: MessageEvent) => {
|
|
try {
|
|
const msg = JSON.parse(typeof ev.data === 'string' ? ev.data : ev.data.toString());
|
|
handleGatewayMessage(msg, resolve, reject);
|
|
} catch (err: any) {
|
|
console.error('Gateway parse error:', err.message);
|
|
}
|
|
});
|
|
|
|
ws.addEventListener('close', () => {
|
|
console.log('❌ Gateway disconnected');
|
|
gatewayConnected = false;
|
|
gatewayWS = null;
|
|
if (onGatewayDisconnectCallback) onGatewayDisconnectCallback();
|
|
setTimeout(() => connectToGateway().catch(() => {}), 3000);
|
|
});
|
|
|
|
ws.addEventListener('error', (ev: Event) => {
|
|
const msg = (ev as any).message || 'unknown error';
|
|
console.error('Gateway error:', msg);
|
|
reject(new Error(msg));
|
|
});
|
|
|
|
setTimeout(() => {
|
|
if (!gatewayConnected) reject(new Error('Gateway connection timeout'));
|
|
}, 20000);
|
|
});
|
|
}
|
|
|
|
export function gatewayRequest(method: string, params: Record<string, unknown>): Promise<unknown> {
|
|
return new Promise((resolve, reject) => {
|
|
if (!gatewayWS || gatewayWS.readyState !== WebSocket.OPEN) {
|
|
reject(new Error('Gateway not connected')); return;
|
|
}
|
|
const id = uuid();
|
|
const timer = setTimeout(() => {
|
|
pendingRequests.delete(id);
|
|
reject(new Error('Gateway request timeout'));
|
|
}, 60000);
|
|
pendingRequests.set(id, { resolve, reject, timer });
|
|
gatewayWS.send(JSON.stringify({ type: 'req', id, method, params }));
|
|
});
|
|
}
|
|
|
|
function handleGatewayMessage(msg: any, connectResolve?: () => void, connectReject?: (e: Error) => void) {
|
|
// Handle connect.challenge — clear fallback timer and send signed connect request
|
|
if (msg.type === 'event' && msg.event === 'connect.challenge') {
|
|
const nonce: string = msg.payload?.nonce ?? '';
|
|
clearTimeout((gatewayWS as any)?._challengeFallbackTimer);
|
|
if (gatewayWS && connectResolve && connectReject) {
|
|
sendConnectRequest(gatewayWS, nonce, connectResolve, connectReject);
|
|
}
|
|
return;
|
|
}
|
|
if (msg.type === 'event') {
|
|
const p = msg.payload || {};
|
|
console.log(`[EV] ev=${msg.event} stream=${p.stream} state=${p.state} phase=${p.data?.phase} tool=${p.tool||p.data?.name} id=${p.id||p.data?.toolCallId} keys=${Object.keys(p).join(',')}`);
|
|
if (p.stream === 'tool' && p.data?.phase !== 'update') console.log(`[EV:tool] phase=${p.data?.phase} tool=${p.data?.name} id=${p.data?.toolCallId}`);
|
|
}
|
|
if (msg.type === 'res' && msg.id) {
|
|
const pending = pendingRequests.get(msg.id);
|
|
if (pending) {
|
|
clearTimeout(pending.timer);
|
|
pendingRequests.delete(msg.id);
|
|
if (msg.ok) pending.resolve(msg.payload);
|
|
else pending.reject(new Error(msg.error?.message || 'Gateway error'));
|
|
}
|
|
return;
|
|
}
|
|
|
|
if (msg.type === 'event') {
|
|
const { event: eventName, payload } = msg;
|
|
const psk = payload?.sessionKey;
|
|
if (psk && browserSessions) {
|
|
// Tool/lifecycle events use the agent's main sessionKey (e.g. agent:titan:main)
|
|
// Browser sessions use web sessionKey (e.g. agent:titan:web:nico)
|
|
// Match by agent ID for tool + lifecycle stream events from agent runs
|
|
const pAgentId = extractAgentId(psk);
|
|
for (const [ws, session] of browserSessions) {
|
|
const sAgentId = extractAgentId(session.sessionKey);
|
|
const exactMatch = session.sessionKey === psk;
|
|
const agentIdMatch = pAgentId && sAgentId === pAgentId;
|
|
const agentStreamMatch = eventName === 'agent'
|
|
&& (payload?.stream === 'tool' || payload?.stream === 'lifecycle')
|
|
&& agentIdMatch;
|
|
if (exactMatch || agentStreamMatch) {
|
|
if (exactMatch && session._handoverHandler) session._handoverHandler(eventName, payload);
|
|
forwardToBrowser(ws, session, eventName, payload);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
function stripNoReply(text: string): string {
|
|
return text.replace(/\s*NO_REPLY\s*$/g, '').trim();
|
|
}
|
|
|
|
function extractAgentId(sessionKey: string | null): string | null {
|
|
if (!sessionKey) return null;
|
|
const parts = sessionKey.split(':');
|
|
return parts.length >= 2 ? parts[1] : null;
|
|
}
|
|
|
|
/**
|
|
* Retroactively emit HUD tool_start/tool_end events from chat.history.
|
|
* Called after chat.done — the gateway does not forward tool stream events
|
|
* to operator WS connections, so we reconstruct them from the stored transcript.
|
|
*/
|
|
async function emitToolHudFromHistory(
|
|
session: any,
|
|
turnId: string,
|
|
hadTurnStarted: boolean,
|
|
turnTs: number,
|
|
send: (data: unknown) => void,
|
|
): Promise<void> {
|
|
const agentId = extractAgentId(session.sessionKey);
|
|
if (!agentId) { if (hadTurnStarted) send(hud.turnEnd(turnId, turnTs)); return; }
|
|
const agentMainKey = `agent:${agentId}:main`;
|
|
|
|
let messages: any[] = [];
|
|
try {
|
|
const res: any = await gatewayRequest('chat.history', { sessionKey: agentMainKey, limit: 40 });
|
|
messages = res?.messages || [];
|
|
} catch {
|
|
if (hadTurnStarted) send(hud.turnEnd(turnId, turnTs));
|
|
return;
|
|
}
|
|
|
|
// Find the last assistant message with tool_use blocks
|
|
let lastAssistantIdx = -1;
|
|
for (let i = messages.length - 1; i >= 0; i--) {
|
|
const m = messages[i];
|
|
if (m.role === 'assistant') {
|
|
const content = Array.isArray(m.content) ? m.content : [];
|
|
if (content.some((c: any) => c.type === 'tool_use')) {
|
|
lastAssistantIdx = i;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (lastAssistantIdx === -1) {
|
|
// No tool calls — just close the turn
|
|
if (hadTurnStarted) send(hud.turnEnd(turnId, turnTs));
|
|
return;
|
|
}
|
|
|
|
// Build a map of tool_result blocks from subsequent user messages
|
|
const resultMap = new Map<string, any>(); // toolCallId → tool_result block
|
|
for (let i = lastAssistantIdx + 1; i < messages.length; i++) {
|
|
const m = messages[i];
|
|
if (m.role === 'user') {
|
|
const content = Array.isArray(m.content) ? m.content : [];
|
|
for (const c of content) {
|
|
if (c.type === 'tool_result' && c.tool_use_id) {
|
|
resultMap.set(c.tool_use_id, c);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Extract tool_use blocks from the assistant message
|
|
const assistantContent = Array.isArray(messages[lastAssistantIdx].content)
|
|
? messages[lastAssistantIdx].content : [];
|
|
const toolUses = assistantContent.filter((c: any) => c.type === 'tool_use');
|
|
|
|
if (toolUses.length === 0) {
|
|
if (hadTurnStarted) send(hud.turnEnd(turnId, turnTs));
|
|
return;
|
|
}
|
|
|
|
// Ensure turn is open
|
|
if (!hadTurnStarted) {
|
|
send(hud.turnStart(turnId));
|
|
}
|
|
|
|
// Emit tool_start + tool_end for each tool call (replay=false so they're live-looking)
|
|
const baseTs = turnTs;
|
|
for (let i = 0; i < toolUses.length; i++) {
|
|
const tu = toolUses[i];
|
|
const callId: string = tu.id || crypto.randomUUID();
|
|
const tool: string = tu.name || 'unknown';
|
|
const args = buildToolArgs(tool, tu.input || {});
|
|
const callTs = baseTs + i * 10; // slight offset to preserve order
|
|
|
|
send(hud.toolStart(callId, turnId, tool, args));
|
|
|
|
const resultBlock = resultMap.get(callId);
|
|
let result: Record<string, any>;
|
|
if (resultBlock) {
|
|
// Unwrap content array if needed
|
|
let raw: string;
|
|
if (Array.isArray(resultBlock.content)) {
|
|
raw = resultBlock.content.filter((c: any) => c.type === 'text').map((c: any) => c.text || '').join('');
|
|
} else {
|
|
raw = typeof resultBlock.content === 'string' ? resultBlock.content : JSON.stringify(resultBlock.content ?? '');
|
|
}
|
|
result = resolveToolResult(tool, tu.input || {}, raw);
|
|
if (resultBlock.is_error === true) (result as any).ok = false;
|
|
} else {
|
|
result = { ok: true };
|
|
}
|
|
send(hud.toolEnd(callId, turnId, tool, result, callTs));
|
|
}
|
|
|
|
// Close turn
|
|
send(hud.turnEnd(turnId, turnTs));
|
|
}
|
|
|
|
function forwardToBrowser(ws: any, session: any, eventName: string, payload: any) {
|
|
if (ws.readyState !== 1 /* OPEN */) return;
|
|
const agentId = extractAgentId(session.sessionKey);
|
|
const send = (data: unknown) => ws.send(JSON.stringify(data));
|
|
|
|
const turnId: string = session.turnId || 'unknown';
|
|
|
|
// Ensure turn_start is emitted before any child HUD events
|
|
function ensureTurnStart() {
|
|
if (!session._hudTurnStarted) {
|
|
session._hudTurnStarted = true;
|
|
session._hudTurnTs = Date.now();
|
|
send(hud.turnStart(turnId));
|
|
}
|
|
}
|
|
|
|
// Close any open thinking span — repeated across multiple event handlers
|
|
function closeThinkIfOpen() {
|
|
if (session._hudThinkId) {
|
|
send(hud.thinkEnd(session._hudThinkId, turnId, session._hudThinkTs || Date.now()));
|
|
session._hudThinkId = null;
|
|
session._hudThinkTs = null;
|
|
}
|
|
}
|
|
|
|
// Close any open turn span
|
|
function closeTurnIfOpen() {
|
|
if (session._hudTurnStarted) {
|
|
send(hud.turnEnd(turnId, session._hudTurnTs || Date.now()));
|
|
session._hudTurnStarted = false;
|
|
session._hudTurnTs = null;
|
|
}
|
|
}
|
|
|
|
switch (eventName) {
|
|
case 'chat.thinking': {
|
|
send({ type: 'thinking', content: payload.reasoning || '' });
|
|
// HUD: turn_start + think_start on first chunk
|
|
ensureTurnStart();
|
|
if (!session._hudThinkId) {
|
|
session._hudThinkId = crypto.randomUUID();
|
|
session._hudThinkTs = Date.now();
|
|
send(hud.thinkStart(session._hudThinkId, turnId));
|
|
}
|
|
break;
|
|
}
|
|
case 'agent': {
|
|
// Live tool events — stream:"tool" phase:"start"|"result"
|
|
// stream:"lifecycle" phase:"end"|"error" — turn done via embedded run
|
|
// stream:"compaction" phase:"start"|"end" — handled by ARCH-6 (TODO)
|
|
if (payload?.stream === 'tool') {
|
|
const phase: string = payload.data?.phase || '';
|
|
const tool: string = payload.data?.name || 'unknown';
|
|
const toolCallId: string = payload.data?.toolCallId || payload.data?.id || '';
|
|
const corrId: string = toolCallId || crypto.randomUUID();
|
|
|
|
if (phase === 'start') {
|
|
const args = buildToolArgs(tool, payload.data?.args || {});
|
|
ensureTurnStart();
|
|
if (!session._hudPendingTools) session._hudPendingTools = new Map();
|
|
session._hudPendingTools.set(corrId, { tool, args, ts: Date.now(), toolCallId });
|
|
session._hudLiveToolSeen = true;
|
|
console.log(`[HUD] tool_start corrId=${corrId} tool=${tool} turnId=${turnId}`);
|
|
send(hud.toolStart(corrId, turnId, tool, args, false, toolCallId));
|
|
} else if (phase === 'result') {
|
|
const raw = (() => {
|
|
const d = payload.data;
|
|
// result.content[{type:"text",text:"..."}] is the primary shape
|
|
if (d?.result?.content) {
|
|
const c = d.result.content;
|
|
return Array.isArray(c) ? c.filter((x: any) => x.type === 'text').map((x: any) => x.text || '').join('') : String(c);
|
|
}
|
|
if (typeof d?.output === 'string') return d.output;
|
|
if (d?.content) return typeof d.content === 'string' ? d.content
|
|
: Array.isArray(d.content) ? d.content.filter((c: any) => c.type === 'text').map((c: any) => c.text || '').join('') : '';
|
|
return '';
|
|
})();
|
|
const pending = session._hudPendingTools?.get(corrId);
|
|
const startTs: number = pending?.ts || Date.now();
|
|
const pendingArgs = pending?.args || {};
|
|
const result = resolveToolResult(tool, pendingArgs, raw);
|
|
console.log(`[HUD] tool_end corrId=${corrId} tool=${tool} deferred=${!!(session._hudPendingTools?.size === 0 && session._hudDeferredTurnEnd)}`);
|
|
send(hud.toolEnd(corrId, turnId, tool, result, startTs, false, toolCallId));
|
|
session._hudPendingTools?.delete(corrId);
|
|
// Drain deferred turn_end if all tools are now done
|
|
if ((session._hudPendingTools?.size ?? 0) === 0 && session._hudDeferredTurnEnd) {
|
|
const { turnId: dTurnId, turnTs: dTurnTs, hadTurnStarted: dHts } = session._hudDeferredTurnEnd;
|
|
session._hudDeferredTurnEnd = null;
|
|
if (dHts) send(hud.turnEnd(dTurnId, dTurnTs));
|
|
}
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
// chat.tool_call / chat.tool_result — dead code (OpenClaw never emits these event names)
|
|
// Kept as no-ops for reference; remove in future cleanup.
|
|
case 'chat.delta': {
|
|
const delta = filterText(payload.delta || '') ?? '';
|
|
session._lastDeltaText = (session._lastDeltaText || '') + delta;
|
|
ensureTurnStart();
|
|
closeThinkIfOpen();
|
|
send({ type: 'delta', content: delta, agentId, turnId: session.turnId || null });
|
|
break;
|
|
}
|
|
case 'chat.done': {
|
|
const accText = session._lastDeltaText || null;
|
|
session._lastDeltaText = '';
|
|
closeThinkIfOpen();
|
|
// Capture turn state before clearing, for async history path
|
|
const hadTurnStarted = session._hudTurnStarted;
|
|
const turnTs = session._hudTurnTs || Date.now();
|
|
session._hudTurnStarted = false;
|
|
session._hudTurnTs = null;
|
|
|
|
// Scenario H: detect NO_REPLY turns — partial deltas may have leaked to browser.
|
|
// Signal suppress=true so frontend can drop the bubble retroactively.
|
|
const isNoReply = /^\s*NO_REPLY\s*$/.test(accText || '');
|
|
|
|
// HUD: close the turn.
|
|
// If live stream:"tool" events were received this turn (_hudPendingTools had entries),
|
|
// skip emitToolHudFromHistory — tools already emitted live, no need to replay from history.
|
|
const hadLiveTools = session._hudLiveToolSeen === true;
|
|
session._hudLiveToolSeen = false;
|
|
if (hadLiveTools) {
|
|
// Live path: defer turn_end until all pending tool_ends have landed.
|
|
// chat.done may race ahead of the final phase:'result' event.
|
|
const pendingCount = session._hudPendingTools?.size ?? 0;
|
|
if (pendingCount > 0) {
|
|
// Stash close on session — tool_end handler will drain and emit
|
|
session._hudDeferredTurnEnd = { turnId, turnTs, hadTurnStarted };
|
|
} else {
|
|
if (hadTurnStarted) send(hud.turnEnd(turnId, turnTs));
|
|
}
|
|
} else {
|
|
// Fallback path: no live tool events — reconstruct from chat.history (reconnect mid-turn, etc.)
|
|
emitToolHudFromHistory(session, turnId, hadTurnStarted, turnTs, send).catch(() => {
|
|
if (hadTurnStarted) send(hud.turnEnd(turnId, turnTs));
|
|
});
|
|
}
|
|
|
|
send({ type: 'done', content: isNoReply ? '' : accText, suppress: isNoReply, usage: payload.usage, agentId, turnId: session.turnId || null });
|
|
if (session.sessionKey && onTurnDoneCallback) onTurnDoneCallback(session.sessionKey);
|
|
break;
|
|
}
|
|
case 'chat.aborted':
|
|
case 'chat.error': {
|
|
closeThinkIfOpen();
|
|
closeTurnIfOpen();
|
|
if (session.sessionKey && onTurnDoneCallback) onTurnDoneCallback(session.sessionKey);
|
|
break;
|
|
}
|
|
case 'chat': {
|
|
const contentArr = payload.message?.content || [];
|
|
const textBlock = contentArr.find((c: any) => c.type === 'text');
|
|
const thinkBlock = contentArr.find((c: any) => c.type === 'thinking');
|
|
if (payload.state === 'delta') {
|
|
// HUD: ensure turn started
|
|
ensureTurnStart();
|
|
// Forward thinking delta + HUD think_start
|
|
if (thinkBlock?.thinking) {
|
|
const lastThink: number = session._lastThinkOffset || 0;
|
|
const thinkChunk = thinkBlock.thinking.slice(lastThink);
|
|
session._lastThinkOffset = thinkBlock.thinking.length;
|
|
if (thinkChunk) {
|
|
if (!session._hudThinkId) {
|
|
session._hudThinkId = crypto.randomUUID();
|
|
session._hudThinkTs = Date.now();
|
|
send(hud.thinkStart(session._hudThinkId, turnId));
|
|
}
|
|
send({ type: 'thinking', content: thinkChunk, agentId });
|
|
}
|
|
}
|
|
// Forward text delta + HUD think_end when text starts
|
|
const fullText = textBlock?.text || '';
|
|
const lastOffset: number = session._lastDeltaOffset || 0;
|
|
const rawChunk = fullText.slice(lastOffset);
|
|
session._lastDeltaOffset = fullText.length;
|
|
const newChars = filterText(rawChunk) ?? '';
|
|
if (newChars) {
|
|
closeThinkIfOpen();
|
|
send({ type: 'delta', content: newChars, agentId });
|
|
}
|
|
} else if (payload.state === 'final') {
|
|
session._lastDeltaOffset = 0;
|
|
session._lastThinkOffset = 0;
|
|
ensureTurnStart();
|
|
closeThinkIfOpen();
|
|
closeTurnIfOpen();
|
|
const finalText = filterText(stripNoReply(textBlock?.text || ''));
|
|
send({ type: 'done', content: finalText, usage: payload.usage, agentId });
|
|
if (session.sessionKey && onTurnDoneCallback) onTurnDoneCallback(session.sessionKey);
|
|
} else if (payload.state === 'aborted' || payload.state === 'error') {
|
|
session._lastDeltaOffset = 0;
|
|
session._lastThinkOffset = 0;
|
|
closeThinkIfOpen();
|
|
closeTurnIfOpen();
|
|
if (session.sessionKey && onTurnDoneCallback) onTurnDoneCallback(session.sessionKey);
|
|
} else {
|
|
send({ type: 'message', content: stripNoReply(textBlock?.text || payload.content || ''), agentId });
|
|
}
|
|
break;
|
|
}
|
|
case 'chat.message':
|
|
send({ type: 'message', content: stripNoReply(payload.content), agentId });
|
|
break;
|
|
// Note: 'agent' case handled above (line ~398) — duplicate removed in refactor
|
|
default:
|
|
send({ type: 'event', event: eventName, payload });
|
|
}
|
|
}
|
|
|
|
export function disconnectGateway() {
|
|
if (gatewayWS) {
|
|
const ws = gatewayWS;
|
|
gatewayConnected = false;
|
|
gatewayWS = null;
|
|
ws.removeEventListener('close', () => {});
|
|
try { ws.close(); } catch (_) {}
|
|
setTimeout(() => connectToGateway().catch(err => console.error('[disco] reconnect failed:', err.message)), 500);
|
|
}
|
|
}
|