hermes/backend/gateway.ts
Nico ccee249618 v0.6.42: Hermes chat UI — Vue3/TS/Vite, audio STT/TTS, sidebar rail, MCP event loop
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-30 19:35:10 +02:00

582 lines
26 KiB
TypeScript

/**
* gateway.ts — OpenClaw gateway WebSocket connection (Bun)
*
* Manages the single upstream connection to openclaw gateway,
* request/response tracking, and event forwarding to browser sessions.
*/
import * as fs from 'fs';
import * as path from 'path';
import { sign, createPrivateKey } from 'crypto';
import { filterText, filterValue } from './message-filter';
import { hud, buildToolArgs, resolveToolResult, buildExecResult, buildGenericResult } from './hud-builder';
// Upstream gateway endpoint and shared auth token. Each value falls back to a
// baked-in default when its env var is unset OR empty (`||`, not `??`).
// SECURITY(review): the token fallback looks like a real credential committed to
// source — consider requiring GATEWAY_TOKEN from the environment and rotating it.
const GATEWAY_HOST: string = process.env['GATEWAY_HOST'] || '10.0.0.10';
const GATEWAY_PORT: string = process.env['GATEWAY_PORT'] || '18789';
const GATEWAY_TOKEN: string = process.env['GATEWAY_TOKEN'] || 'bd162cc05dac9371b92d613307350ae4bae3858f79240654';
// Device identity for gateway auth (required since openclaw 2026.3.13 for operator.write scope)
interface DeviceIdentity { deviceId: string; publicKeyPem: string; privateKeyPem: string; }
const DEVICE_IDENTITY_PATH = path.join(
  process.env.HOME || '/home/openclaw', '.openclaw/identity/device.json'
);
/**
 * Device identity loaded once at module init, or null when it is unusable.
 * A null identity makes the connect handshake fall back to token-only auth.
 *
 * Each failure mode is logged with its real cause:
 * missing file, unreadable file, invalid JSON, or missing/non-string
 * deviceId / publicKeyPem / privateKeyPem fields.
 */
const deviceIdentity: DeviceIdentity | null = (() => {
  let raw: string;
  try {
    raw = fs.readFileSync(DEVICE_IDENTITY_PATH, 'utf8');
  } catch (err: unknown) {
    const code = (err as { code?: unknown } | null)?.code;
    if (code === 'ENOENT') {
      console.warn('[auth] device.json not found — connecting without device auth');
    } else {
      const reason = err instanceof Error ? err.message : String(err);
      console.warn(`[auth] device.json unreadable (${reason}) — connecting without device auth`);
    }
    return null;
  }
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch (err: unknown) {
    const reason = err instanceof Error ? err.message : String(err);
    console.warn(`[auth] device.json is not valid JSON (${reason}) — connecting without device auth`);
    return null;
  }
  // Validate the fields the signing path dereferences; a partial identity would
  // otherwise only fail later, inside the WS message handler.
  const rec = parsed as Partial<Record<keyof DeviceIdentity, unknown>> | null;
  if (!rec
    || typeof rec.deviceId !== 'string'
    || typeof rec.publicKeyPem !== 'string'
    || typeof rec.privateKeyPem !== 'string') {
    console.warn('[auth] device.json lacks deviceId/publicKeyPem/privateKeyPem — connecting without device auth');
    return null;
  }
  return rec as DeviceIdentity;
})();
/**
 * Builds the canonical v3 device-auth string that gets Ed25519-signed.
 * Mirrors openclaw's buildDeviceAuthPayloadV3: fields joined by '|', scopes
 * comma-joined, and platform + deviceFamily fixed to lowercase 'linux'.
 */
function buildV3Payload(p: {
  deviceId: string; clientId: string; clientMode: string; role: string;
  scopes: string[]; signedAtMs: number; token: string; nonce: string;
}): string {
  const { deviceId, clientId, clientMode, role, scopes, signedAtMs, token, nonce } = p;
  const platform = 'linux';
  const deviceFamily = 'linux';
  const fields: string[] = [
    'v3',
    deviceId,
    clientId,
    clientMode,
    role,
    scopes.join(','),
    `${signedAtMs}`,
    token,
    nonce,
    platform,
    deviceFamily,
  ];
  return fields.join('|');
}
// Load gateway self-signed CA cert for TLS verification.
// `import.meta.dir` is Bun's directory of this module. A missing cert file is an
// accepted configuration (→ undefined, so no custom CA is passed to the WebSocket);
// any other failure (permissions, not a file, …) is logged instead of being
// silently ignored, since it would otherwise surface only as an opaque TLS error.
const GATEWAY_CA_CERT = (() => {
  try {
    const p = path.join(import.meta.dir, 'gateway-ca.crt');
    return fs.readFileSync(p, 'utf8');
  } catch (err: unknown) {
    const code = (err as { code?: unknown } | null)?.code;
    if (code !== 'ENOENT') {
      const reason = err instanceof Error ? err.message : String(err);
      console.warn(`[tls] gateway-ca.crt could not be loaded (${reason}) — connecting without a custom CA`);
    }
    return undefined;
  }
})();
/** One in-flight gateway request, settled by its 'res' frame or by its timer. */
type PendingGatewayRequest = {
  resolve: (v: unknown) => void;
  reject: (e: Error) => void;
  timer: ReturnType<typeof setTimeout>;
};

// The single upstream socket, and whether its connect handshake has succeeded.
let gatewayWS: WebSocket | null = null;
let gatewayConnected = false;
// In-flight requests keyed by request id.
const pendingRequests: Map<string, PendingGatewayRequest> = new Map();

// Browser socket → session map; injected by server.ts after init.
let browserSessions: Map<any, any> | null = null;
export function setBrowserSessions(map: Map<any, any>) {
  browserSessions = map;
}
/** Whether the gateway handshake has completed on the current socket. */
export function isConnected() {
  return gatewayConnected;
}

// Called with a sessionKey when a turn completes (server.ts clears activeTurnId).
let onTurnDoneCallback: ((sessionKey: string) => void) | null = null;
export function setOnTurnDone(cb: (sessionKey: string) => void) {
  onTurnDoneCallback = cb;
}
// Called when the gateway socket drops (server.ts aborts all in-flight turns).
let onGatewayDisconnectCallback: (() => void) | null = null;
export function setOnGatewayDisconnect(cb: () => void) {
  onGatewayDisconnectCallback = cb;
}

/** Random UUID used for gateway request ids. */
function uuid(): string {
  return crypto.randomUUID();
}

// Scopes requested in the connect handshake (and included in the signed device payload).
const CONNECT_SCOPES = ['operator.read', 'operator.write', 'operator.admin'];
/**
 * Sends the `connect` handshake on `ws` and registers it in pendingRequests.
 *
 * With a device identity and a non-empty challenge nonce, the request carries an
 * Ed25519 signature over the v3 payload; otherwise (challengeNonce null/empty, no
 * identity, or signing failed) only the shared token is sent.
 * A successful response sets gatewayConnected and calls `outerResolve`; a rejected
 * response or no response within 15 s calls `outerReject`.
 *
 * NOTE(review): if connect.challenge arrives after the 5 s no-challenge fallback has
 * already fired, this runs twice for the same socket (token-only, then signed);
 * confirm the gateway tolerates a second connect.
 */
function sendConnectRequest(ws: WebSocket, challengeNonce: string | null,
  outerResolve: () => void, outerReject: (e: Error) => void) {
  const signedAtMs = Date.now();
  let device: Record<string, unknown> | undefined;
  if (deviceIdentity && challengeNonce) {
    // Signing can throw (malformed PEM, non-Ed25519 key). Uncaught, that escapes
    // into the WS message listener, is logged as a parse error, and no connect is
    // ever sent — so degrade to token-only auth instead.
    try {
      const payload = buildV3Payload({
        deviceId: deviceIdentity.deviceId,
        clientId: 'webchat', clientMode: 'webchat', role: 'operator',
        scopes: CONNECT_SCOPES, signedAtMs,
        token: GATEWAY_TOKEN, nonce: challengeNonce,
      });
      const privKey = createPrivateKey({ key: deviceIdentity.privateKeyPem, format: 'pem' });
      // Ed25519 takes no digest algorithm, hence `null`.
      const sigBytes = sign(null, Buffer.from(payload, 'utf8'), privKey);
      device = {
        id: deviceIdentity.deviceId,
        publicKey: deviceIdentity.publicKeyPem,
        signature: sigBytes.toString('base64url'),
        signedAt: signedAtMs,
        nonce: challengeNonce,
      };
    } catch (err: unknown) {
      const reason = err instanceof Error ? err.message : String(err);
      console.error(`[auth] Device signing failed (${reason}) — connecting without device auth`);
      device = undefined;
    }
  }
  const connectId = uuid();
  pendingRequests.set(connectId, {
    resolve: (payload: unknown) => {
      const granted = (payload as any)?.scopes ?? [];
      console.log('[auth] Connect OK — granted scopes:', granted.join(', ') || '(none listed)');
      gatewayConnected = true;
      outerResolve();
    },
    reject: (err: Error) => {
      console.error('[auth] Connect rejected:', err.message);
      outerReject(err);
    },
    timer: setTimeout(() => {
      pendingRequests.delete(connectId);
      outerReject(new Error('Connect handshake timeout'));
    }, 15000),
  });
  ws.send(JSON.stringify({
    type: 'req', id: connectId, method: 'connect',
    params: {
      minProtocol: 3, maxProtocol: 3,
      client: { id: 'webchat', version: '1.0.0', platform: 'linux', mode: 'webchat', deviceFamily: 'Linux' },
      role: 'operator', scopes: CONNECT_SCOPES,
      caps: ['tool-events'], commands: [], permissions: {},
      auth: { token: GATEWAY_TOKEN },
      userAgent: 'openclaw-webgateway/1.0',
      ...(device ? { device } : {}),
    },
  }));
  // `device` is only set when challengeNonce was a non-empty string.
  const authMode = device ? `yes, nonce=${challengeNonce!.slice(0, 8)}` : 'no (fallback)';
  console.log(`[auth] Connect sent (device auth: ${authMode})`);
}
/**
 * Opens the upstream gateway WebSocket and performs the connect handshake.
 *
 * The new socket becomes the current connection (gatewayWS) immediately.
 * Resolves once the gateway accepts `connect`. Rejects on a socket error, on a
 * rejected or timed-out handshake, or if not connected within 20 s.
 *
 * On close:
 *  - if a newer socket has already replaced this one, only this socket's timer is
 *    cleared; the newer connection's state is left alone and no reconnect is queued;
 *  - otherwise connection state is reset, every pending request is rejected with
 *    'Gateway disconnected', and the disconnect callback runs;
 *  - a reconnect is scheduled after 3 s only when this socket was still current,
 *    i.e. the drop was unexpected. disconnectGateway() clears gatewayWS before
 *    closing and schedules its own reconnect, so it does not get a second one here.
 */
export function connectToGateway(): Promise<void> {
  return new Promise((resolve, reject) => {
    const wsUrl = `wss://${GATEWAY_HOST}:${GATEWAY_PORT}`;
    console.log(`🔌 Connecting to gateway: ${wsUrl}`);
    const ws = new WebSocket(wsUrl, {
      headers: { 'Origin': 'http://localhost', 'User-Agent': 'openclaw-webgateway/1.0' },
      ...(GATEWAY_CA_CERT ? { tls: { ca: GATEWAY_CA_CERT } } : {}),
    } as any);
    gatewayWS = ws;
    ws.addEventListener('open', () => {
      console.log('✅ Gateway WS open — waiting for connect.challenge…');
      // Fallback: if no challenge arrives within 5 s, connect without device auth
      const fallbackTimer = setTimeout(() => {
        console.warn('[auth] No connect.challenge in 5 s — sending without device auth');
        sendConnectRequest(ws, null, resolve, reject);
      }, 5000);
      (ws as any)._challengeFallbackTimer = fallbackTimer;
    });
    ws.addEventListener('message', (ev: MessageEvent) => {
      try {
        const msg = JSON.parse(typeof ev.data === 'string' ? ev.data : ev.data.toString());
        handleGatewayMessage(msg, resolve, reject);
      } catch (err: any) {
        console.error('Gateway parse error:', err.message);
      }
    });
    ws.addEventListener('close', () => {
      // A socket that closes before the challenge must not fire the fallback later.
      clearTimeout((ws as any)._challengeFallbackTimer);
      const wasCurrent = gatewayWS === ws;
      if (!wasCurrent && gatewayWS !== null) {
        // Stale socket: a newer connection owns gatewayWS/gatewayConnected now.
        console.log('❌ Gateway disconnected (stale socket, newer connection active)');
        return;
      }
      console.log('❌ Gateway disconnected');
      gatewayConnected = false;
      gatewayWS = null;
      // Fail in-flight requests now rather than leaving them to their timeouts.
      const orphaned = [...pendingRequests.values()];
      pendingRequests.clear();
      for (const entry of orphaned) {
        clearTimeout(entry.timer);
        entry.reject(new Error('Gateway disconnected'));
      }
      if (onGatewayDisconnectCallback) onGatewayDisconnectCallback();
      if (wasCurrent) setTimeout(() => connectToGateway().catch(() => {}), 3000);
    });
    ws.addEventListener('error', (ev: Event) => {
      const msg = (ev as any).message || 'unknown error';
      console.error('Gateway error:', msg);
      reject(new Error(msg));
    });
    setTimeout(() => {
      if (!gatewayConnected) reject(new Error('Gateway connection timeout'));
    }, 20000);
  });
}
/**
 * Sends a `req` frame to the gateway and resolves with the matching response payload.
 *
 * @param method    gateway RPC method name (e.g. 'chat.history')
 * @param params    request parameters, sent verbatim
 * @param timeoutMs how long to wait for the matching 'res' frame (default 60 s)
 * @returns the `payload` of a successful (`ok`) response
 *
 * Rejects with:
 *  - 'Gateway not connected' when there is no open socket;
 *  - 'Gateway request timeout' after `timeoutMs`;
 *  - the gateway's error message for a non-ok response;
 *  - the underlying error if the frame cannot be serialized or sent.
 */
export function gatewayRequest(
  method: string,
  params: Record<string, unknown>,
  timeoutMs = 60000,
): Promise<unknown> {
  return new Promise((resolve, reject) => {
    // Capture the socket once so the checks and the send target the same object.
    const ws = gatewayWS;
    if (!ws || ws.readyState !== WebSocket.OPEN) {
      reject(new Error('Gateway not connected'));
      return;
    }
    const id = uuid();
    const timer = setTimeout(() => {
      pendingRequests.delete(id);
      reject(new Error('Gateway request timeout'));
    }, timeoutMs);
    pendingRequests.set(id, { resolve, reject, timer });
    try {
      ws.send(JSON.stringify({ type: 'req', id, method, params }));
    } catch (err: unknown) {
      // The frame never left: don't keep a pending entry and a live timer for it.
      clearTimeout(timer);
      pendingRequests.delete(id);
      reject(err instanceof Error ? err : new Error(String(err)));
    }
  });
}
/**
 * Dispatches one parsed frame received on the gateway socket.
 *
 * - 'res' frames settle the matching pendingRequests entry (ok → resolve with
 *   payload, otherwise reject with the gateway's error message).
 * - The 'connect.challenge' event cancels the no-challenge fallback timer and sends
 *   the signed connect request.
 * - Every other event is debug-logged, then forwarded to each browser session whose
 *   sessionKey equals the payload's sessionKey. Agent 'tool'/'lifecycle' stream events
 *   are also forwarded to sessions of the same agent id, because those events carry
 *   the agent's main key (agent:titan:main) rather than the web key
 *   (agent:titan:web:nico).
 */
function handleGatewayMessage(msg: any, connectResolve?: () => void, connectReject?: (e: Error) => void) {
  if (msg.type === 'res') {
    if (!msg.id) return;
    const entry = pendingRequests.get(msg.id);
    if (!entry) return;
    clearTimeout(entry.timer);
    pendingRequests.delete(msg.id);
    if (msg.ok) {
      entry.resolve(msg.payload);
    } else {
      entry.reject(new Error(msg.error?.message || 'Gateway error'));
    }
    return;
  }

  if (msg.type !== 'event') return;
  const eventName = msg.event;

  if (eventName === 'connect.challenge') {
    const nonce: string = msg.payload?.nonce ?? '';
    clearTimeout((gatewayWS as any)?._challengeFallbackTimer);
    const sock = gatewayWS;
    if (sock && connectResolve && connectReject) {
      sendConnectRequest(sock, nonce, connectResolve, connectReject);
    }
    return;
  }

  const p = msg.payload || {};
  console.log(`[EV] ev=${msg.event} stream=${p.stream} state=${p.state} phase=${p.data?.phase} tool=${p.tool||p.data?.name} id=${p.id||p.data?.toolCallId} keys=${Object.keys(p).join(',')}`);
  if (p.stream === 'tool' && p.data?.phase !== 'update') console.log(`[EV:tool] phase=${p.data?.phase} tool=${p.data?.name} id=${p.data?.toolCallId}`);

  // Forward the original payload object (not the `|| {}` logging default).
  const payload = msg.payload;
  const psk = payload?.sessionKey;
  if (!psk || !browserSessions) return;

  const eventAgentId = extractAgentId(psk);
  const isAgentRunStream = eventName === 'agent'
    && (payload?.stream === 'tool' || payload?.stream === 'lifecycle');

  for (const [browserWs, session] of browserSessions) {
    const exactMatch = session.sessionKey === psk;
    const sameAgentStream = isAgentRunStream
      && !!eventAgentId
      && extractAgentId(session.sessionKey) === eventAgentId;
    if (!exactMatch && !sameAgentStream) continue;
    if (exactMatch && session._handoverHandler) session._handoverHandler(eventName, payload);
    forwardToBrowser(browserWs, session, eventName, payload);
  }
}
/**
 * Removes a trailing NO_REPLY sentinel (plus surrounding whitespace) from agent
 * text, then trims the result.
 *
 * Non-string input — e.g. a `chat.message` payload without `content` — yields ''
 * instead of throwing a TypeError from `.replace`.
 */
function stripNoReply(text: string | null | undefined): string {
  if (typeof text !== 'string') return '';
  // Anchored at `$`, so at most one match exists; no `g` flag needed.
  return text.replace(/\s*NO_REPLY\s*$/, '').trim();
}
/**
 * Returns the agent-id segment (the second ':'-separated field) of a session key,
 * e.g. 'agent:titan:web:nico' → 'titan'.
 * Returns null for a missing/empty key or a key with no ':' separator.
 */
function extractAgentId(sessionKey: string | null): string | null {
  if (!sessionKey) return null;
  const [, agentSegment] = sessionKey.split(':');
  return agentSegment ?? null;
}
/**
 * Retroactively emit HUD tool_start/tool_end events from chat.history.
 *
 * Called after chat.done when no live stream:"tool" events were seen for the turn.
 * Tool calls are reconstructed from the stored transcript of the agent's main
 * session (`agent:<id>:main`, last 40 messages).
 *
 * Only the CURRENT turn is replayed. Walking back from the newest message, the scan
 * stops at:
 *  - the most recent user prompt (a user message carrying no tool_result blocks), or
 *  - an older assistant reply that made no tool calls (an earlier turn's final answer).
 * Without this bound, a turn that used no tools would re-emit the previous turn's
 * tool calls. All tool_use blocks inside the window are emitted, so multi-step tool
 * loops are covered, not just the final step.
 * (Assumes history is ordered oldest → newest — TODO confirm against chat.history.)
 *
 * The turn span is always closed: turn_end is sent if a turn was already open, or
 * after opening one here to host reconstructed tools, including when the history
 * fetch fails.
 *
 * @param session        browser session; only `sessionKey` is read
 * @param turnId         HUD turn id the reconstructed events attach to
 * @param hadTurnStarted whether turn_start was already sent for this turn
 * @param turnTs         turn start timestamp; tool timestamps are offset from it
 * @param send           sink for HUD frames
 */
async function emitToolHudFromHistory(
  session: any,
  turnId: string,
  hadTurnStarted: boolean,
  turnTs: number,
  send: (data: unknown) => void,
): Promise<void> {
  // Content blocks of a transcript message; non-array content has none.
  const blocksOf = (m: any): any[] => (Array.isArray(m?.content) ? m.content : []);
  const closeOpenTurn = () => { if (hadTurnStarted) send(hud.turnEnd(turnId, turnTs)); };

  const agentId = extractAgentId(session.sessionKey);
  if (!agentId) { closeOpenTurn(); return; }
  const agentMainKey = `agent:${agentId}:main`;
  let messages: any[] = [];
  try {
    const res: any = await gatewayRequest('chat.history', { sessionKey: agentMainKey, limit: 40 });
    messages = res?.messages || [];
  } catch {
    closeOpenTurn();
    return;
  }

  // Locate the first message belonging to the current turn.
  let turnStart = 0;
  let sawAssistant = false;
  for (let i = messages.length - 1; i >= 0; i--) {
    const m = messages[i];
    const blocks = blocksOf(m);
    if (m?.role === 'user' && !blocks.some((c: any) => c?.type === 'tool_result')) {
      turnStart = i + 1;
      break;
    }
    if (m?.role === 'assistant') {
      // Within one turn, every assistant message before the final one makes tool calls.
      if (sawAssistant && !blocks.some((c: any) => c?.type === 'tool_use')) {
        turnStart = i + 1;
        break;
      }
      sawAssistant = true;
    }
  }

  // Gather this turn's tool calls (in order) and their results by tool_use id.
  const toolUses: any[] = [];
  const resultMap = new Map<string, any>(); // toolCallId → tool_result block
  for (const m of messages.slice(turnStart)) {
    for (const c of blocksOf(m)) {
      if (m?.role === 'assistant' && c?.type === 'tool_use') {
        toolUses.push(c);
      } else if (m?.role === 'user' && c?.type === 'tool_result' && c.tool_use_id) {
        resultMap.set(c.tool_use_id, c);
      }
    }
  }

  if (toolUses.length === 0) {
    // No tool calls this turn — just close the turn
    closeOpenTurn();
    return;
  }

  // Ensure turn is open
  if (!hadTurnStarted) {
    send(hud.turnStart(turnId));
  }
  // Emit tool_start + tool_end for each tool call (replay=false so they're live-looking)
  const baseTs = turnTs;
  for (let i = 0; i < toolUses.length; i++) {
    const tu = toolUses[i];
    const callId: string = tu.id || crypto.randomUUID();
    const tool: string = tu.name || 'unknown';
    const args = buildToolArgs(tool, tu.input || {});
    const callTs = baseTs + i * 10; // slight offset to preserve order
    send(hud.toolStart(callId, turnId, tool, args));
    const resultBlock = resultMap.get(callId);
    let result: Record<string, any>;
    if (resultBlock) {
      // tool_result content is either a string or an array of blocks; keep only text
      let raw: string;
      if (Array.isArray(resultBlock.content)) {
        raw = resultBlock.content.filter((c: any) => c.type === 'text').map((c: any) => c.text || '').join('');
      } else {
        raw = typeof resultBlock.content === 'string' ? resultBlock.content : JSON.stringify(resultBlock.content ?? '');
      }
      result = resolveToolResult(tool, tu.input || {}, raw);
      if (resultBlock.is_error === true) (result as any).ok = false;
    } else {
      result = { ok: true };
    }
    send(hud.toolEnd(callId, turnId, tool, result, callTs));
  }
  // Close turn
  send(hud.turnEnd(turnId, turnTs));
}
/**
 * Translates one gateway event into browser frames for a single session.
 *
 * Emits chat frames ({type: 'thinking' | 'delta' | 'done' | 'message' | 'event'})
 * plus HUD span events (turn / think / tool) built via `hud`. Per-turn bookkeeping
 * is kept on the mutable `session` object under `_hud*` / `_last*` keys.
 *
 * @param ws        browser socket; nothing is sent unless readyState is 1 (OPEN)
 * @param session   per-browser session state (sessionKey, turnId, `_hud*` fields)
 * @param eventName gateway event name ('chat', 'chat.delta', 'agent', …)
 * @param payload   raw gateway event payload
 */
function forwardToBrowser(ws: any, session: any, eventName: string, payload: any) {
  if (ws.readyState !== 1 /* OPEN */) return;
  const agentId = extractAgentId(session.sessionKey);
  // Readiness is re-checked per send: the chat.done history fallback sends after an
  // await, by which time the browser socket may already be closed.
  const send = (data: unknown) => {
    if (ws.readyState === 1 /* OPEN */) ws.send(JSON.stringify(data));
  };
  const turnId: string = session.turnId || 'unknown';
  // Ensure turn_start is emitted before any child HUD events
  function ensureTurnStart() {
    if (!session._hudTurnStarted) {
      session._hudTurnStarted = true;
      session._hudTurnTs = Date.now();
      send(hud.turnStart(turnId));
    }
  }
  // Close any open thinking span — repeated across multiple event handlers
  function closeThinkIfOpen() {
    if (session._hudThinkId) {
      send(hud.thinkEnd(session._hudThinkId, turnId, session._hudThinkTs || Date.now()));
      session._hudThinkId = null;
      session._hudThinkTs = null;
    }
  }
  // Close any open turn span
  function closeTurnIfOpen() {
    if (session._hudTurnStarted) {
      send(hud.turnEnd(turnId, session._hudTurnTs || Date.now()));
      session._hudTurnStarted = false;
      session._hudTurnTs = null;
    }
  }
  // On abort/error: close tool spans that will get no result (ok:false), flush any
  // turn_end deferred by chat.done, and reset per-turn accumulators. Otherwise a
  // leftover _hudPendingTools entry defers the NEXT turn's turn_end indefinitely,
  // and aborted partial text is prepended to the next chat.done content.
  function abandonTurnState() {
    const pendingTools: Map<string, any> | undefined = session._hudPendingTools;
    if (pendingTools) {
      for (const [corrId, entry] of pendingTools) {
        send(hud.toolEnd(corrId, turnId, entry.tool, { ok: false }, entry.ts || Date.now(), false, entry.toolCallId));
      }
      pendingTools.clear();
    }
    const deferred = session._hudDeferredTurnEnd;
    if (deferred) {
      session._hudDeferredTurnEnd = null;
      if (deferred.hadTurnStarted) send(hud.turnEnd(deferred.turnId, deferred.turnTs));
    }
    session._hudLiveToolSeen = false;
    session._lastDeltaText = '';
  }
  switch (eventName) {
    case 'chat.thinking': {
      send({ type: 'thinking', content: payload.reasoning || '' });
      // HUD: turn_start + think_start on first chunk
      ensureTurnStart();
      if (!session._hudThinkId) {
        session._hudThinkId = crypto.randomUUID();
        session._hudThinkTs = Date.now();
        send(hud.thinkStart(session._hudThinkId, turnId));
      }
      break;
    }
    case 'agent': {
      // Live tool events — stream:"tool" phase:"start"|"result"
      // stream:"lifecycle" phase:"end"|"error" — turn done via embedded run
      // stream:"compaction" phase:"start"|"end" — handled by ARCH-6 (TODO)
      if (payload?.stream === 'tool') {
        const phase: string = payload.data?.phase || '';
        const tool: string = payload.data?.name || 'unknown';
        const toolCallId: string = payload.data?.toolCallId || payload.data?.id || '';
        // NOTE(review): without a toolCallId, start and result get different random
        // corrIds, so the result cannot be matched to its pending start.
        const corrId: string = toolCallId || crypto.randomUUID();
        if (phase === 'start') {
          const args = buildToolArgs(tool, payload.data?.args || {});
          ensureTurnStart();
          if (!session._hudPendingTools) session._hudPendingTools = new Map();
          session._hudPendingTools.set(corrId, { tool, args, ts: Date.now(), toolCallId });
          session._hudLiveToolSeen = true;
          console.log(`[HUD] tool_start corrId=${corrId} tool=${tool} turnId=${turnId}`);
          send(hud.toolStart(corrId, turnId, tool, args, false, toolCallId));
        } else if (phase === 'result') {
          const raw = (() => {
            const d = payload.data;
            // result.content[{type:"text",text:"..."}] is the primary shape
            if (d?.result?.content) {
              const c = d.result.content;
              return Array.isArray(c) ? c.filter((x: any) => x.type === 'text').map((x: any) => x.text || '').join('') : String(c);
            }
            if (typeof d?.output === 'string') return d.output;
            if (d?.content) return typeof d.content === 'string' ? d.content
              : Array.isArray(d.content) ? d.content.filter((c: any) => c.type === 'text').map((c: any) => c.text || '').join('') : '';
            return '';
          })();
          const pending = session._hudPendingTools?.get(corrId);
          const startTs: number = pending?.ts || Date.now();
          const pendingArgs = pending?.args || {};
          const result = resolveToolResult(tool, pendingArgs, raw);
          console.log(`[HUD] tool_end corrId=${corrId} tool=${tool} deferred=${!!(session._hudPendingTools?.size === 0 && session._hudDeferredTurnEnd)}`);
          send(hud.toolEnd(corrId, turnId, tool, result, startTs, false, toolCallId));
          session._hudPendingTools?.delete(corrId);
          // Drain deferred turn_end if all tools are now done
          if ((session._hudPendingTools?.size ?? 0) === 0 && session._hudDeferredTurnEnd) {
            const { turnId: dTurnId, turnTs: dTurnTs, hadTurnStarted: dHts } = session._hudDeferredTurnEnd;
            session._hudDeferredTurnEnd = null;
            if (dHts) send(hud.turnEnd(dTurnId, dTurnTs));
          }
        }
      }
      break;
    }
    // There are no dedicated cases for chat.tool_call / chat.tool_result (OpenClaw is
    // not known to emit them); if they ever arrive they are forwarded by `default`.
    case 'chat.delta': {
      const delta = filterText(payload.delta || '') ?? '';
      session._lastDeltaText = (session._lastDeltaText || '') + delta;
      ensureTurnStart();
      closeThinkIfOpen();
      send({ type: 'delta', content: delta, agentId, turnId: session.turnId || null });
      break;
    }
    case 'chat.done': {
      const accText = session._lastDeltaText || null;
      session._lastDeltaText = '';
      closeThinkIfOpen();
      // Capture turn state before clearing, for async history path
      const hadTurnStarted = session._hudTurnStarted;
      const turnTs = session._hudTurnTs || Date.now();
      session._hudTurnStarted = false;
      session._hudTurnTs = null;
      // Scenario H: detect NO_REPLY turns — partial deltas may have leaked to browser.
      // Signal suppress=true so frontend can drop the bubble retroactively.
      const isNoReply = /^\s*NO_REPLY\s*$/.test(accText || '');
      // HUD: close the turn.
      // If live stream:"tool" events were received this turn (_hudLiveToolSeen),
      // skip emitToolHudFromHistory — tools were already emitted live.
      const hadLiveTools = session._hudLiveToolSeen === true;
      session._hudLiveToolSeen = false;
      if (hadLiveTools) {
        // Live path: defer turn_end until all pending tool_ends have landed.
        // chat.done may race ahead of the final phase:'result' event.
        const pendingCount = session._hudPendingTools?.size ?? 0;
        if (pendingCount > 0) {
          // Stash close on session — tool_end handler will drain and emit
          session._hudDeferredTurnEnd = { turnId, turnTs, hadTurnStarted };
        } else {
          if (hadTurnStarted) send(hud.turnEnd(turnId, turnTs));
        }
      } else {
        // Fallback path: no live tool events — reconstruct from chat.history (reconnect mid-turn, etc.)
        emitToolHudFromHistory(session, turnId, hadTurnStarted, turnTs, send).catch(() => {
          if (hadTurnStarted) send(hud.turnEnd(turnId, turnTs));
        });
      }
      send({ type: 'done', content: isNoReply ? '' : accText, suppress: isNoReply, usage: payload.usage, agentId, turnId: session.turnId || null });
      if (session.sessionKey && onTurnDoneCallback) onTurnDoneCallback(session.sessionKey);
      break;
    }
    case 'chat.aborted':
    case 'chat.error': {
      closeThinkIfOpen();
      abandonTurnState();
      closeTurnIfOpen();
      if (session.sessionKey && onTurnDoneCallback) onTurnDoneCallback(session.sessionKey);
      break;
    }
    case 'chat': {
      const contentArr = payload.message?.content || [];
      const textBlock = contentArr.find((c: any) => c.type === 'text');
      const thinkBlock = contentArr.find((c: any) => c.type === 'thinking');
      if (payload.state === 'delta') {
        // HUD: ensure turn started
        ensureTurnStart();
        // Forward thinking delta + HUD think_start (payload carries the cumulative text)
        if (thinkBlock?.thinking) {
          const lastThink: number = session._lastThinkOffset || 0;
          const thinkChunk = thinkBlock.thinking.slice(lastThink);
          session._lastThinkOffset = thinkBlock.thinking.length;
          if (thinkChunk) {
            if (!session._hudThinkId) {
              session._hudThinkId = crypto.randomUUID();
              session._hudThinkTs = Date.now();
              send(hud.thinkStart(session._hudThinkId, turnId));
            }
            send({ type: 'thinking', content: thinkChunk, agentId });
          }
        }
        // Forward text delta + HUD think_end when text starts
        const fullText = textBlock?.text || '';
        const lastOffset: number = session._lastDeltaOffset || 0;
        const rawChunk = fullText.slice(lastOffset);
        session._lastDeltaOffset = fullText.length;
        const newChars = filterText(rawChunk) ?? '';
        if (newChars) {
          closeThinkIfOpen();
          send({ type: 'delta', content: newChars, agentId });
        }
      } else if (payload.state === 'final') {
        session._lastDeltaOffset = 0;
        session._lastThinkOffset = 0;
        ensureTurnStart();
        closeThinkIfOpen();
        closeTurnIfOpen();
        const finalText = filterText(stripNoReply(textBlock?.text || ''));
        send({ type: 'done', content: finalText, usage: payload.usage, agentId });
        if (session.sessionKey && onTurnDoneCallback) onTurnDoneCallback(session.sessionKey);
      } else if (payload.state === 'aborted' || payload.state === 'error') {
        session._lastDeltaOffset = 0;
        session._lastThinkOffset = 0;
        closeThinkIfOpen();
        abandonTurnState();
        closeTurnIfOpen();
        if (session.sessionKey && onTurnDoneCallback) onTurnDoneCallback(session.sessionKey);
      } else {
        send({ type: 'message', content: stripNoReply(textBlock?.text || payload.content || ''), agentId });
      }
      break;
    }
    case 'chat.message':
      send({ type: 'message', content: stripNoReply(payload.content), agentId });
      break;
    // 'agent' events are handled by the dedicated case above.
    default:
      send({ type: 'event', event: eventName, payload });
  }
}
/**
 * Drops the current gateway connection and schedules a fresh one after 500 ms.
 *
 * gatewayWS and gatewayConnected are cleared before close(). Callers therefore
 * immediately see the gateway as disconnected, and gatewayRequest() rejects with
 * 'Gateway not connected' instead of writing to a closing socket.
 * No-op when there is no current socket.
 *
 * NOTE(review): the anonymous 'close' listener attached in connectToGateway cannot be
 * removed by reference (a fresh `() => {}` never matches it), so it stays registered.
 * Confirm it does not schedule a second reconnect alongside the one scheduled here.
 */
export function disconnectGateway() {
  const ws = gatewayWS;
  if (!ws) return;
  gatewayConnected = false;
  gatewayWS = null;
  // Best-effort: a socket that is already closing may throw; the reconnect proceeds regardless.
  try { ws.close(); } catch (_) {}
  setTimeout(() => connectToGateway().catch(err => console.error('[disco] reconnect failed:', err.message)), 500);
}