hermes/backend/session-watcher.ts
Nico ccee249618 v0.6.42: Hermes chat UI — Vue3/TS/Vite, audio STT/TTS, sidebar rail, MCP event loop
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-30 19:35:10 +02:00

574 lines
27 KiB
TypeScript

/**
* session-watcher.ts — JSONL session file watcher
*
* Reads openclaw session JSONL, classifies entries, and streams
* them to the browser via onEntry callbacks.
*/
import fs from 'fs';
import path from 'path';
import os from 'os';
import { filterValue, filterText } from './message-filter';
import { hud, buildToolArgs, resolveToolResult } from './hud-builder';
/** Loosely-typed event object (string keys, arbitrary values) exchanged between the watcher and its consumers. */
export type WatcherEntry = Record<string, unknown>;
/** Callback that receives each event produced by the watcher (classified entries, `session_history`, `session_status`). */
export type OnEntry = (entry: WatcherEntry) => void;
/** Callback used for out-of-band messages in this file: diagnostics, token totals, truncation warnings and history replay. */
export type WsSend = (data: WatcherEntry) => void;
/**
 * Asynchronous request function taking a method name and a parameter record.
 * NOTE(review): createSessionWatcher accepts one of these but nothing in the code visible here calls it.
 */
export type GatewayRequestFn = (method: string, params: Record<string, unknown>) => Promise<unknown>;
/** Handle returned by createSessionWatcher. */
export interface SessionWatcher {
/** Read sessions.json, attach to the session's JSONL file (or wait for it to appear) and begin emitting entries. */
start(): void;
/** Cancel the pending debounce timer and close every file-system watcher held by this instance. */
stop(): void;
/**
 * Replay the current history snapshot (history, status and, when non-zero, token totals) through `send`.
 * @returns the snapshot's entry count, or 0 when no snapshot exists yet (nothing is sent in that case).
 */
sendHistoryTo(send: WsSend): number;
}
/**
 * Report whether a session's transcript is present on disk.
 *
 * Looks up `sessionKey` in the agent's `sessions.json` and checks that the
 * `sessionFile` recorded there exists.
 *
 * @param agentId    agent whose `~/.openclaw/agents/<agentId>/sessions` directory is inspected
 * @param sessionKey key of the session inside `sessions.json`
 * @returns `'READY'` when the referenced file exists; `'NO_SESSION'` when the
 *          index is missing/unreadable, has no such key, or the file is absent
 */
export function checkSessionOnDisk(agentId: string, sessionKey: string): 'READY' | 'NO_SESSION' {
  try {
    const indexFile = path.join(os.homedir(), '.openclaw', 'agents', agentId, 'sessions', 'sessions.json');
    const registry = JSON.parse(fs.readFileSync(indexFile, 'utf-8'));
    const transcript = registry[sessionKey]?.sessionFile;
    const present = Boolean(transcript) && fs.existsSync(transcript);
    return present ? 'READY' : 'NO_SESSION';
  } catch {
    // Any failure (missing index, bad JSON, null registry) means "no session".
    return 'NO_SESSION';
  }
}
export function createSessionWatcher(
sessionKey: string,
agentId: string,
onEntry: OnEntry,
wsSend: WsSend,
gatewayRequestFn: GatewayRequestFn,
): SessionWatcher {
const sessionsPath = path.join(os.homedir(), '.openclaw', 'agents', agentId, 'sessions', 'sessions.json');
let fileWatcher: fs.FSWatcher | null = null;
let directoryWatcher: fs.FSWatcher | null = null;
let sessionsJsonWatcher: fs.FSWatcher | null = null;
let sessionsChangeTimer: ReturnType<typeof setTimeout> | null = null;
let filePosition = 0;
let currentJsonlPath: string | null = null;
let cumulativeTokens = { input: 0, output: 0, cacheRead: 0, total: 0 };
const suppressedIds = new Set<string>();
let lastHistorySnapshot: { entries: WatcherEntry[]; file: string; entryCount: number } | null = null;
// HUD tool-call pairing state — keyed by toolCallId (OpenClaw-assigned, unique per call)
// Name-based fallback removed: unreliable when same tool is called multiple times in one turn.
const hudPendingByCallId: Map<string, { tool: string; args: any; ts: number }> = new Map();
/**
 * Decide whether a (already cleaned) user message is automation noise rather
 * than something a human typed.
 *
 * @param text user text after metadata/timestamp prefixes were stripped
 * @returns true for empty text and for the known system prompt openings
 */
function isSystemMessage(text: string): boolean {
  if (!text) return true;

  // Literal openings that mark injected/automated prompts.
  const systemOpenings = [
    'Pre-compaction memory flush',
    'HEARTBEAT',
    "You're starting fresh. Read HANDOVER.md",
    "Write HANDOVER.md to your workspace",
    "/new",
  ];
  if (systemOpenings.some(opening => text.startsWith(opening))) return true;

  return /^Read HEARTBEAT\.md/.test(text);
}
/**
 * Convert one parsed JSONL record into zero or more browser-facing events.
 *
 * - `user` records become `session_entry/user_message`. When an injected
 *   session prompt prefix is detected it is split out as `session_context`.
 *   System/automation prompts are dropped and their id is remembered in
 *   `suppressedIds`.
 * - `toolResult` records become a HUD `tool_end` event, paired with the
 *   stashed `tool_start` through `toolCallId`.
 * - `assistant` records containing `toolCall` blocks become one HUD
 *   `tool_start` per call.
 * - `assistant` records with `stopReason` `stop`/`length` become
 *   `assistant_text`, followed by a `usage` entry when usage is present.
 *
 * Token accounting: `cumulativeTokens` is only updated (and the running total
 * only broadcast) for live entries. When `isHistory` is true the history
 * loader in this file tallies usage for every assistant record itself, so
 * accumulating here as well would count each final reply twice.
 *
 * @param parsed    one JSON-decoded line of the session file
 * @param isHistory true while replaying content that already existed on disk
 * @returns a single entry, a list of entries, or null when nothing is emitted
 */
function classifyEntry(parsed: any, isHistory = false): WatcherEntry | WatcherEntry[] | null {
  const msg = parsed.message;
  const ts = parsed.timestamp;
  const id = parsed.id;
  const parentId = parsed.parentId;
  if (!msg) return null;

  if (parentId && suppressedIds.has(parentId) && msg.role === 'assistant') {
    // Reply to a suppressed prompt: hide its text. If it carries a tool call we
    // still fall through so the HUD shows the tool_start. Either way the id is
    // remembered so assistant records chained to this one are hidden as well.
    const toolCall = msg.content?.find((c: any) => c.type === 'toolCall');
    if (!toolCall) { if (id) suppressedIds.add(id); return null; }
    if (id) suppressedIds.add(id);
  }

  if (msg.role === 'user') {
    let text: string = msg.content?.[0]?.text || '';
    // Strip the "Sender (untrusted metadata)" block and the "[Day YYYY-MM-DD HH:MM tz]" stamp.
    text = text.replace(/^Sender \(untrusted metadata\):[\s\S]*?```\s*\n\n?/, '').trim();
    text = text.replace(/^\[\w{3} \d{4}-\d{2}-\d{2} \d{2}:\d{2} [^\]]+\]\s*/, '').trim();
    if (isSystemMessage(text)) { if (id) suppressedIds.add(id); return null; }

    // Detect injected session context prompt and split it out
    const promptMatch = text.match(/^(IMPORTANT: This is a (?:private|PUBLIC|private 1:1)[^\n]*(?:\n[^\n]*)*?(?:Address [^\n]*\.))\n\n([\s\S]*)$/);
    if (promptMatch) {
      const entries: WatcherEntry[] = [
        { type: 'session_entry', entry_type: 'session_context', content: promptMatch[1], ts },
      ];
      if (promptMatch[2].trim()) {
        entries.push({ type: 'session_entry', entry_type: 'user_message', content: promptMatch[2].trim(), ts });
      }
      return entries;
    }
    return { type: 'session_entry', entry_type: 'user_message', content: text, ts };
  }

  if (msg.role === 'toolResult') {
    const tool: string = msg.toolName || 'unknown';
    const raw: string = msg.content?.[0]?.text || '';
    // Use toolCallId exclusively — name fallback removed (breaks on repeated same-tool calls)
    const toolCallId: string = msg.toolCallId || '';
    const callId: string = toolCallId || crypto.randomUUID();
    const pending = toolCallId ? hudPendingByCallId.get(toolCallId) : undefined;
    const pendingArgs = pending?.args || {};
    if (toolCallId) hudPendingByCallId.delete(toolCallId);
    const result = resolveToolResult(tool, pendingArgs, raw);
    // Without a stashed start the result's own timestamp is used, giving a zero duration.
    const startTs: number = pending?.ts || (ts ? new Date(ts).getTime() : Date.now());
    const endTs: number = ts ? new Date(ts).getTime() : Date.now();
    const durationMs: number = Math.max(0, endTs - startTs);
    const ev = hud.toolEnd(callId, 'history', tool, result, startTs, isHistory, toolCallId || undefined);
    return { ...ev, durationMs, ts } as any;
  }

  if (msg.role === 'assistant') {
    const toolCalls = msg.content?.filter((c: any) => c.type === 'toolCall') ?? [];
    if (toolCalls.length > 0) {
      const startTs = ts ? new Date(ts).getTime() : Date.now();
      const events: WatcherEntry[] = [];
      for (const toolCall of toolCalls) {
        const tool: string = toolCall.name || 'unknown';
        const toolCallId: string = toolCall.id || '';
        const callId: string = toolCallId || crypto.randomUUID();
        const args = buildToolArgs(tool, toolCall.arguments);
        // Stash for pairing with toolResult — keyed by toolCallId only
        if (toolCallId) hudPendingByCallId.set(toolCallId, { tool, args, ts: startTs });
        events.push({
          ...hud.toolStart(callId, 'history', tool, args, isHistory, toolCallId || undefined),
          ts,
        } as any);
      }
      return events;
    }

    if (msg.stopReason === 'stop' || msg.stopReason === 'length') {
      const truncated = msg.stopReason === 'length';
      const text = filterText(
        (msg.content?.filter((c: any) => c.type === 'text') ?? [])
          .map((c: any) => c.text || '').join('').trim()
      ) ?? '';
      const result: WatcherEntry = { type: 'session_entry', entry_type: 'assistant_text', content: text, ts, ...(truncated ? { truncated: true } : {}) };
      if (truncated && !isHistory && wsSend) {
        wsSend({ type: 'truncated_warning' });
      }
      if (msg.usage) {
        if (!isHistory) {
          // Live entry: fold into the running totals and broadcast them.
          cumulativeTokens.input += msg.usage.input || 0;
          cumulativeTokens.output += msg.usage.output || 0;
          cumulativeTokens.cacheRead += msg.usage.cacheRead || 0;
          cumulativeTokens.total += msg.usage.totalTokens || 0;
          if (wsSend) {
            wsSend({
              type: 'session_total_tokens',
              input_tokens: cumulativeTokens.input,
              cache_read_tokens: cumulativeTokens.cacheRead,
              output_tokens: cumulativeTokens.output,
            });
          }
        }
        return [
          result,
          {
            type: 'session_entry',
            entry_type: 'usage',
            input_tokens: msg.usage.input || 0,
            output_tokens: msg.usage.output || 0,
            cache_read_tokens: msg.usage.cacheRead || 0,
            cache_write_tokens: msg.usage.cacheWrite || 0,
            total_tokens: msg.usage.totalTokens || 0,
            cost: msg.usage.cost?.total ?? msg.usage.cost ?? 0,
            ts,
          },
        ];
      }
      return result;
    }
  }
  return null;
}
/**
 * Handle freshly appended JSONL lines: classify each non-blank line, record
 * the resulting entries in the history snapshot (when one exists) and forward
 * them through `onEntry`. A line that fails is reported as a `warn`
 * diagnostic and skipped.
 */
function processLines(lines: string[]) {
  lines
    .map(rawLine => rawLine.trim())
    .filter(text => text.length > 0)
    .forEach(text => {
      try {
        const classified = classifyEntry(JSON.parse(text));
        if (!classified) return;
        const batch: WatcherEntry[] = Array.isArray(classified) ? classified : [classified];
        batch.forEach(item => {
          // Keep snapshot up to date so late-joining tabs get full history
          if (lastHistorySnapshot) {
            lastHistorySnapshot.entries.push(item);
            lastHistorySnapshot.entryCount++;
          }
          onEntry(item);
        });
      } catch (err: any) {
        wsSend({ type: 'diagnostic', level: 'warn', message: `Backend: Skipped malformed line: ${err.message}` });
      }
    });
}
/**
 * Wrap replayed tool activity in synthetic turn nodes.
 *
 * A turn opens at a `user_message` and closes at the next `assistant_text`,
 * the next `user_message`, or the end of the list. Tool HUD events
 * (`tool_start` / `tool_end`) seen while a turn is open are buffered; when the
 * turn closes with at least one buffered tool they are emitted between a
 * `turn_start` and a `turn_end` HUD event, each tool re-parented to the
 * turn's freshly generated id. Turns without tools produce no shell, and every
 * other entry is passed through in place.
 *
 * @param entries flat, chronologically ordered history entries
 * @returns the same entries with turn shells inserted around tool events
 */
function groupIntoTurns(entries: WatcherEntry[]): WatcherEntry[] {
  const out: WatcherEntry[] = [];
  let bufferedTools: WatcherEntry[] = [];
  let turnOpen = false;
  let startedAt: number | null = null;
  let finishedAt: number | null = null;

  const toMillis = (stamp: any): number => new Date(stamp).getTime();

  // Move the turn's end forward to the latest completion seen so far.
  const extendEnd = (ms: number): void => {
    finishedAt = Math.max(finishedAt ?? 0, ms);
  };

  // Emit whatever the current turn collected and return to the idle state.
  const closeTurn = (): void => {
    if (turnOpen && bufferedTools.length > 0) {
      const turnId = crypto.randomUUID();
      const begin = startedAt ?? Date.now();
      const finish = finishedAt ?? Date.now();
      out.push({ type: 'hud', event: 'turn_start', correlationId: turnId, ts: begin, replay: true } as WatcherEntry);
      for (const tool of bufferedTools) {
        // Re-parent each tool node under this turn
        out.push({ ...tool, parentId: turnId });
      }
      out.push({
        type: 'hud',
        event: 'turn_end',
        correlationId: turnId,
        ts: finish,
        durationMs: Math.max(0, finish - begin),
        replay: true,
      } as WatcherEntry);
    } else {
      // No shell needed — pass through anything that was buffered
      out.push(...bufferedTools);
    }
    bufferedTools = [];
    turnOpen = false;
    startedAt = null;
    finishedAt = null;
  };

  for (const entry of entries) {
    const item = entry as any;
    const sessionKind = item.type === 'session_entry' ? item.entry_type : undefined;
    const isToolEvent = item.type === 'hud' && (item.event === 'tool_start' || item.event === 'tool_end');

    if (sessionKind === 'user_message') {
      // Turn boundary: finish the previous turn, then open a new one
      closeTurn();
      out.push(entry);
      turnOpen = true;
      startedAt = item.ts ? toMillis(item.ts) : Date.now();
    } else if (isToolEvent) {
      if (!turnOpen) {
        // Tool without a preceding user message — leave it at the root
        out.push(entry);
        continue;
      }
      if (item.event === 'tool_end') {
        extendEnd(item.ts ? toMillis(item.ts) : Date.now());
      }
      bufferedTools.push(entry);
    } else if (sessionKind === 'assistant_text') {
      // The assistant's reply closes the turn
      if (turnOpen && item.ts) extendEnd(toMillis(item.ts));
      closeTurn();
      out.push(entry);
    } else {
      // usage, session_context, diagnostics, … — untouched
      out.push(entry);
    }
  }

  // A turn still open at the end of the file is closed here
  closeTurn();
  return out;
}
/**
 * Attach to a session JSONL file: replay what is already on disk as history,
 * then tail the file for appended records.
 *
 * History phase: every complete line is classified with `isHistory = true`,
 * grouped into turns, stored as the snapshot for late-joining clients and
 * emitted as `session_history` + `session_status`. Token totals for the file
 * are computed in a local tally and then assigned, so they reflect exactly
 * one pass over the file regardless of what classification does.
 *
 * Tail phase: on each change event the bytes after `filePosition` are read and
 * only complete records are consumed. A half-written last line stays
 * unconsumed until the rest of it arrives, instead of being parsed (and lost)
 * as two malformed fragments.
 *
 * If the file disappears (ENOENT) the watcher is closed and sessions.json is
 * consulted for a replacement file.
 *
 * @param filePath path of the JSONL session file to attach to
 */
function startFileWatcher(filePath: string) {
  // Number of leading bytes of `chunk` that hold complete JSONL records.
  // Everything up to the last newline is complete. The remainder counts as
  // complete only when it is blank or already parses as JSON (a truncated JSON
  // object never parses), which also covers writers that omit the final newline.
  const completeByteLength = (chunk: Buffer): number => {
    const lastNewline = chunk.lastIndexOf(0x0a);
    const tail = chunk.subarray(lastNewline + 1).toString('utf8').trim();
    if (!tail) return chunk.length;
    try {
      JSON.parse(tail);
      return chunk.length;
    } catch {
      return lastNewline + 1;
    }
  };

  // Never keep two tail watchers alive: a repeated attach would otherwise leak the old one.
  if (fileWatcher) { fileWatcher.close(); fileWatcher = null; }
  currentJsonlPath = filePath;
  suppressedIds.clear();

  // Read raw bytes so the tail offset is measured in file bytes, not in re-encoded characters.
  const rawContent = fs.readFileSync(filePath);
  const usableBytes = completeByteLength(rawContent);
  const lines = rawContent.subarray(0, usableBytes).toString('utf8').split('\n');
  const lineCount = lines.filter(l => l.trim()).length;

  const historyTotals = { input: 0, output: 0, cacheRead: 0, total: 0 };
  const rawHistoryEntries: WatcherEntry[] = [];
  for (const line of lines) {
    const trimmed = line.trim();
    if (!trimmed) continue;
    try {
      const parsed = JSON.parse(trimmed);
      // Usage is tallied for every assistant record, including tool-call steps.
      if (parsed.message?.role === 'assistant' && parsed.message.usage) {
        historyTotals.input += parsed.message.usage.input || 0;
        historyTotals.output += parsed.message.usage.output || 0;
        historyTotals.cacheRead += parsed.message.usage.cacheRead || 0;
        historyTotals.total += parsed.message.usage.totalTokens || 0;
      }
      const result = classifyEntry(parsed, true);
      if (!result) continue;
      const entries = Array.isArray(result) ? result : [result];
      for (const entry of entries) rawHistoryEntries.push(entry);
    } catch (e: any) {
      wsSend({ type: 'diagnostic', level: 'warn', message: `Backend: Skipped malformed line during history parse: ${e.message}` });
    }
  }
  // Authoritative totals for this file; replaces anything accumulated before or during replay.
  cumulativeTokens = historyTotals;

  const historyEntries = groupIntoTurns(rawHistoryEntries);
  filePosition = usableBytes;
  lastHistorySnapshot = { entries: historyEntries, file: filePath, entryCount: lineCount };
  onEntry({ type: 'session_history', entries: historyEntries });
  onEntry({ type: 'session_status', status: 'watching', file: filePath, entries: lineCount });
  if (wsSend && (cumulativeTokens.input > 0 || cumulativeTokens.output > 0 || cumulativeTokens.cacheRead > 0)) {
    wsSend({
      type: 'session_total_tokens',
      input_tokens: cumulativeTokens.input,
      cache_read_tokens: cumulativeTokens.cacheRead,
      output_tokens: cumulativeTokens.output,
    });
  }

  fileWatcher = fs.watch(filePath, () => {
    try {
      const stat = fs.statSync(filePath);
      if (stat.size <= filePosition) return;
      const buf = Buffer.alloc(stat.size - filePosition);
      const fd = fs.openSync(filePath, 'r');
      let bytesRead = 0;
      try {
        bytesRead = fs.readSync(fd, buf, 0, buf.length, filePosition);
      } finally {
        // Always release the descriptor, even when the read throws.
        fs.closeSync(fd);
      }
      const chunk = buf.subarray(0, bytesRead);
      const consumable = completeByteLength(chunk);
      if (consumable === 0) return; // only a partial record so far — wait for the rest
      filePosition += consumable;
      processLines(chunk.subarray(0, consumable).toString('utf8').split('\n'));
    } catch (e: any) {
      if ((e as NodeJS.ErrnoException).code === 'ENOENT') {
        // Session file was rotated — stop watching old file, check for new one
        wsSend({ type: 'diagnostic', level: 'warn', message: `Backend: Session file rotated, re-attaching...` });
        if (fileWatcher) { fileWatcher.close(); fileWatcher = null; }
        // Re-read sessions.json to find new file
        try {
          const sessions = JSON.parse(fs.readFileSync(sessionsPath, 'utf8'));
          const session = sessions[sessionKey];
          if (session?.sessionFile && fs.existsSync(session.sessionFile)) {
            filePosition = 0;
            cumulativeTokens = { input: 0, output: 0, cacheRead: 0, total: 0 };
            suppressedIds.clear();
            lastHistorySnapshot = null;
            startFileWatcher(session.sessionFile);
          }
        } catch (_) {
          wsSend({ type: 'diagnostic', level: 'error', message: `Backend: Failed to re-attach after rotation` });
        }
      } else {
        wsSend({ type: 'diagnostic', level: 'error', message: `Backend: Error reading session file: ${e.message}` });
      }
    }
  });
}
/**
 * Watch sessions.json so a session that moves to a different JSONL file is
 * followed automatically. Change events are debounced by 200 ms; when the
 * recorded `sessionFile` differs from the file currently attached and exists
 * on disk, all per-file state is reset and the new file is attached.
 * Does nothing when a watcher is already installed or sessions.json cannot
 * be watched.
 */
function startSessionsWatcher() {
  if (sessionsJsonWatcher) return; // already watching

  // Runs once the debounce window has elapsed.
  const reattachIfRotated = () => {
    sessionsChangeTimer = null;
    try {
      const registry = JSON.parse(fs.readFileSync(sessionsPath, 'utf8'));
      const target: string | undefined = registry[sessionKey]?.sessionFile;
      if (!target || target === currentJsonlPath || !fs.existsSync(target)) return;

      wsSend({ type: 'diagnostic', level: 'info', message: `Backend: Session rotated for ${sessionKey}, re-attaching to ${target}` });
      if (fileWatcher) { fileWatcher.close(); fileWatcher = null; }
      if (directoryWatcher) { directoryWatcher.close(); directoryWatcher = null; }
      filePosition = 0;
      cumulativeTokens = { input: 0, output: 0, cacheRead: 0, total: 0 };
      suppressedIds.clear();
      lastHistorySnapshot = null;
      startFileWatcher(target);
    } catch (_) {
      // sessions.json temporarily unreadable during write — ignore
    }
  };

  // sessions.json may fire multiple events per write, so restart the timer each time.
  const onIndexChanged = () => {
    if (sessionsChangeTimer) clearTimeout(sessionsChangeTimer);
    sessionsChangeTimer = setTimeout(reattachIfRotated, 200);
  };

  try {
    sessionsJsonWatcher = fs.watch(sessionsPath, onIndexChanged);
  } catch (_) {
    // sessions.json doesn't exist — nothing to watch
  }
}
/**
 * Begin watching the session.
 *
 * Installs the sessions.json watcher, then resolves the session's JSONL file:
 * - sessions.json unreadable, or no usable `sessionFile` for `sessionKey`:
 *   emits a `no_session` status and returns;
 * - file exists: attaches to it immediately;
 * - file missing: emits `no_session` and watches the parent directory until a
 *   `rename` event for the expected filename shows the file has appeared.
 *
 * The directory watch is guarded in two ways. A missing or unwatchable
 * directory is reported as `no_session` instead of throwing out of `start()`.
 * The delayed attach only runs while this directory watcher is still the
 * active one, so a burst of `rename` events, a `stop()`, or a rotation handled
 * elsewhere cannot attach twice or attach after shutdown.
 */
function start() {
  startSessionsWatcher();

  let sessions: any;
  try { sessions = JSON.parse(fs.readFileSync(sessionsPath, 'utf8')); }
  catch (_) { onEntry({ type: 'session_status', status: 'no_session', reason: 'sessions.json unreadable' }); return; }

  // `sessions` may legitimately parse to null; a non-string sessionFile is unusable.
  const session = sessions?.[sessionKey];
  if (!session?.sessionFile || typeof session.sessionFile !== 'string') {
    onEntry({ type: 'session_status', status: 'no_session', reason: `no entry for ${sessionKey}` });
    return;
  }

  const rawJsonlPath: string = session.sessionFile;
  const jsonlDir = path.dirname(rawJsonlPath);
  const jsonlFilename = path.basename(rawJsonlPath);

  if (fs.existsSync(rawJsonlPath)) {
    startFileWatcher(rawJsonlPath);
    return;
  }

  try {
    const watcher: fs.FSWatcher = fs.watch(jsonlDir, (eventType, filename) => {
      if (eventType !== 'rename' || filename !== jsonlFilename) return;
      const fullPath = path.join(jsonlDir, jsonlFilename);
      // Short delay so the writer can finish creating the file.
      setTimeout(() => {
        // Another timeout, stop(), or the sessions.json watcher already retired this watcher.
        if (directoryWatcher !== watcher) return;
        if (!fs.existsSync(fullPath)) return;
        watcher.close();
        directoryWatcher = null;
        startFileWatcher(fullPath);
      }, 100);
    });
    directoryWatcher = watcher;
    onEntry({ type: 'session_status', status: 'no_session', reason: 'jsonl file not found, watching directory', directory: jsonlDir, expectedFile: jsonlFilename });
  } catch (_) {
    // fs.watch throws when the directory is missing or cannot be watched.
    onEntry({ type: 'session_status', status: 'no_session', reason: 'jsonl file not found, directory not watchable', directory: jsonlDir, expectedFile: jsonlFilename });
  }
}
/**
 * Shut the watcher down: cancel the pending sessions.json debounce timer and
 * close the file, directory and sessions.json watchers, clearing each handle.
 */
function stop() {
  if (sessionsChangeTimer) {
    clearTimeout(sessionsChangeTimer);
    sessionsChangeTimer = null;
  }

  fileWatcher?.close();
  fileWatcher = null;

  directoryWatcher?.close();
  directoryWatcher = null;

  sessionsJsonWatcher?.close();
  sessionsJsonWatcher = null;
}
/**
 * Replay the current history snapshot to a single client.
 *
 * Sends `session_history`, then `session_status`, then (only when any of the
 * input/output/cache-read totals is positive) `session_total_tokens`.
 *
 * @param send transport for this client
 * @returns the snapshot's entry count, or 0 when there is no snapshot (nothing is sent)
 */
function sendHistoryTo(send: WsSend): number {
  if (!lastHistorySnapshot) return 0;

  const { entries, file } = lastHistorySnapshot;
  send({ type: 'session_history', entries });
  send({ type: 'session_status', status: 'watching', file, entries: lastHistorySnapshot.entryCount });

  const { input, output, cacheRead } = cumulativeTokens;
  const hasUsage = [input, output, cacheRead].some(amount => amount > 0);
  if (hasUsage) {
    send({ type: 'session_total_tokens', input_tokens: input, cache_read_tokens: cacheRead, output_tokens: output });
  }

  return lastHistorySnapshot.entryCount;
}
return { start, stop, sendHistoryTo };
}
// ── Previous session helpers ──────────────────────────────────
/** Location and reset time of an archived (`.jsonl.reset.*`) session file. */
export interface PreviousSessionInfo {
/** Full path of the archived file inside the agent's sessions directory. */
path: string;
/** The filename suffix after `.jsonl.reset.`, with the first `NN-NN-NN.` group rewritten to `NN:NN:NN.`. */
resetTimestamp: string;
}
/**
 * List an agent's archived session files (`*.jsonl.reset.<timestamp>`),
 * newest first according to the timestamp suffix.
 *
 * @param agentId agent whose `~/.openclaw/agents/<agentId>/sessions` directory is scanned
 * @param count   maximum number of files to return (default 1)
 * @param skip    number of newest files to skip first, for pagination (default 0)
 * @returns the selected files; an empty list when the directory cannot be read
 */
export function findPreviousSessionFiles(agentId: string, count = 1, skip = 0): PreviousSessionInfo[] {
  const marker = '.jsonl.reset.';
  // Text following the first marker occurrence, or '' when there is none.
  const stampOf = (fileName: string): string => fileName.split(marker)[1] || '';

  try {
    const dir = path.join(os.homedir(), '.openclaw', 'agents', agentId, 'sessions');
    const archived = fs.readdirSync(dir).filter(fileName => fileName.includes(marker));
    archived.sort((left, right) => stampOf(right).localeCompare(stampOf(left))); // newest first

    const page: PreviousSessionInfo[] = [];
    for (const fileName of archived.slice(skip, skip + count)) {
      // Filenames carry the time as HH-MM-SS; restore the colons.
      const resetTimestamp = stampOf(fileName).replace(/(\d{2})-(\d{2})-(\d{2})\./, '$1:$2:$3.');
      page.push({ path: path.join(dir, fileName), resetTimestamp });
    }
    return page;
  } catch {
    return [];
  }
}
/**
 * Shortcut for the single most recent archived session file.
 * @returns the newest archive, or null when the agent has none
 */
export function findPreviousSessionFile(agentId: string): PreviousSessionInfo | null {
  const [latest] = findPreviousSessionFiles(agentId, 1, 0);
  return latest ?? null;
}
/** One conversational message extracted from an archived session file by parseSessionFile. */
export interface PreviousMessage {
/** Who produced the message; tool results and other roles are never emitted. */
role: 'user' | 'assistant';
/** Trimmed message text with metadata, timestamp and injected-prompt prefixes removed (user) or joined text blocks (assistant). */
content: string;
/** The record's `timestamp` field as found in the file, when present. */
timestamp?: string;
}
/**
 * Parse a session JSONL file and extract the user + assistant conversation.
 *
 * Skipped: tool calls/results, usage, records without a `message`, lines that
 * are not valid JSON, and system/automation prompts together with the
 * assistant/tool records that answer them.
 *
 * Suppression follows `parentId` links but never swallows a `user` record. A
 * user record is always judged on its own text, so in a transcript where each
 * record's parent is simply the previous record, one heartbeat does not hide
 * the rest of the conversation.
 *
 * Size guard: files larger than 5 MB yield an empty list.
 *
 * @param filePath path of the JSONL file to read
 * @returns messages in file order; an empty list when the file is missing,
 *          unreadable or too large
 */
export function parseSessionFile(filePath: string): PreviousMessage[] {
  const maxBytes = 5 * 1024 * 1024;
  // Same prompt openings the live watcher treats as system messages.
  const systemOpenings = [
    'Pre-compaction memory flush',
    'HEARTBEAT',
    "/new",
    "You're starting fresh",
    "Write HANDOVER.md",
  ];
  const isSystemPrompt = (text: string): boolean =>
    !text || systemOpenings.some(opening => text.startsWith(opening)) || /^Read HEARTBEAT\.md/.test(text);

  try {
    const stat = fs.statSync(filePath);
    if (stat.size > maxBytes) return []; // too large

    const lines = fs.readFileSync(filePath, 'utf-8').split('\n').filter(l => l.trim());
    const messages: PreviousMessage[] = [];
    const suppressedIds = new Set<string>();

    for (const line of lines) {
      let parsed: any;
      try { parsed = JSON.parse(line); } catch { continue; }
      // A line holding `null` (valid JSON) must not abort the whole file.
      const msg = parsed?.message;
      if (!msg) continue;
      const ts = parsed.timestamp;
      const id = parsed.id;
      const parentId = parsed.parentId;

      // Suppress replies (assistant / tool records) descending from a system prompt.
      if (msg.role !== 'user' && parentId && suppressedIds.has(parentId)) {
        if (id) suppressedIds.add(id);
        continue;
      }

      if (msg.role === 'user') {
        const firstText = msg.content?.[0]?.text;
        let text: string = typeof firstText === 'string' ? firstText : '';
        // Strip sender metadata (may be entire message or prefix)
        text = text.replace(/^Sender \(untrusted metadata\):[\s\S]*?```\s*\n?\n?/, '').trim();
        text = text.replace(/^\[\w{3} \d{4}-\d{2}-\d{2} \d{2}:\d{2} [^\]]+\]\s*/, '').trim();

        if (isSystemPrompt(text)) {
          if (id) suppressedIds.add(id);
          continue;
        }

        // Injected session context prompt: keep only what follows the first blank line.
        if (/^(?:IMPORTANT: )?This is a (?:private|PUBLIC|private 1:1)/.test(text)) {
          const afterPrompt = text.match(/\n\n([\s\S]+)$/);
          if (!afterPrompt) continue; // entire message is just the prompt
          text = afterPrompt[1].trim();
        }
        if (!text) continue;
        messages.push({ role: 'user', content: text, timestamp: ts });
      } else if (msg.role === 'assistant' && (msg.stopReason === 'stop' || msg.stopReason === 'length')) {
        // Non-array content has no text blocks; treat it as empty rather than throwing.
        const blocks: any[] = Array.isArray(msg.content) ? msg.content : [];
        const text = blocks
          .filter((c: any) => c?.type === 'text')
          .map((c: any) => c.text || '')
          .join('')
          .trim();
        if (text) messages.push({ role: 'assistant', content: text, timestamp: ts });
      }
    }
    return messages;
  } catch {
    return [];
  }
}