/** * session-watcher.ts — JSONL session file watcher * * Reads openclaw session JSONL, classifies entries, and streams * them to the browser via onEntry callbacks. */ import fs from 'fs'; import path from 'path'; import os from 'os'; import { filterValue, filterText } from './message-filter'; import { hud, buildToolArgs, resolveToolResult } from './hud-builder'; export type WatcherEntry = Record; export type OnEntry = (entry: WatcherEntry) => void; export type WsSend = (data: WatcherEntry) => void; export type GatewayRequestFn = (method: string, params: Record) => Promise; export interface SessionWatcher { start(): void; stop(): void; sendHistoryTo(send: WsSend): number; } /** * Check if a session exists on disk for a given session key. * Returns 'READY' if the .jsonl file exists, 'NO_SESSION' otherwise. * Same logic as createSessionWatcher.start() — single source of truth. */ export function checkSessionOnDisk(agentId: string, sessionKey: string): 'READY' | 'NO_SESSION' { try { const sessionsPath = path.join(os.homedir(), '.openclaw', 'agents', agentId, 'sessions', 'sessions.json'); const sessions = JSON.parse(fs.readFileSync(sessionsPath, 'utf-8')); const session = sessions[sessionKey]; if (session?.sessionFile && fs.existsSync(session.sessionFile)) { return 'READY'; } return 'NO_SESSION'; } catch { return 'NO_SESSION'; } } export function createSessionWatcher( sessionKey: string, agentId: string, onEntry: OnEntry, wsSend: WsSend, gatewayRequestFn: GatewayRequestFn, ): SessionWatcher { const sessionsPath = path.join(os.homedir(), '.openclaw', 'agents', agentId, 'sessions', 'sessions.json'); let fileWatcher: fs.FSWatcher | null = null; let directoryWatcher: fs.FSWatcher | null = null; let sessionsJsonWatcher: fs.FSWatcher | null = null; let sessionsChangeTimer: ReturnType | null = null; let filePosition = 0; let currentJsonlPath: string | null = null; let cumulativeTokens = { input: 0, output: 0, cacheRead: 0, total: 0 }; const suppressedIds = new Set(); let lastHistorySnapshot: { entries: WatcherEntry[]; file: string; entryCount: number } | null = null; // HUD tool-call pairing state — keyed by toolCallId (OpenClaw-assigned, unique per call) // Name-based fallback removed: unreliable when same tool is called multiple times in one turn. const hudPendingByCallId: Map = new Map(); function isSystemMessage(text: string): boolean { return ( !text || text.startsWith('Pre-compaction memory flush') || text.startsWith('HEARTBEAT') || !!text.match(/^Read HEARTBEAT\.md/) || text.startsWith("You're starting fresh. Read HANDOVER.md") || text.startsWith("Write HANDOVER.md to your workspace") || text.startsWith("/new") ); } function classifyEntry(parsed: any, isHistory = false): WatcherEntry | WatcherEntry[] | null { const msg = parsed.message; const ts = parsed.timestamp; const id = parsed.id; const parentId = parsed.parentId; if (!msg) return null; if (parentId && suppressedIds.has(parentId) && msg.role === 'assistant') { // If this assistant message contains a tool call, still emit HUD tool_start // but suppress the text content. Add its id to suppressedIds so toolResults // for non-tool-call responses (greeting text) are also suppressed. const toolCall = msg.content?.find((c: any) => c.type === 'toolCall'); if (!toolCall) { if (id) suppressedIds.add(id); return null; } // Has a tool call — fall through to emit tool_start; add to suppressedIds for // any sibling assistant-text that might follow if (id) suppressedIds.add(id); } if (msg.role === 'user') { let text: string = msg.content?.[0]?.text || ''; text = text.replace(/^Sender \(untrusted metadata\):[\s\S]*?```\s*\n\n?/, '').trim(); text = text.replace(/^\[\w{3} \d{4}-\d{2}-\d{2} \d{2}:\d{2} [^\]]+\]\s*/, '').trim(); if (isSystemMessage(text)) { if (id) suppressedIds.add(id); return null; } // Detect injected session context prompt and split it out const promptMatch = text.match(/^(IMPORTANT: This is a (?:private|PUBLIC|private 1:1)[^\n]*(?:\n[^\n]*)*?(?:Address [^\n]*\.))\n\n([\s\S]*)$/); if (promptMatch) { const entries: WatcherEntry[] = [ { type: 'session_entry', entry_type: 'session_context', content: promptMatch[1], ts }, ]; if (promptMatch[2].trim()) { entries.push({ type: 'session_entry', entry_type: 'user_message', content: promptMatch[2].trim(), ts }); } return entries; } return { type: 'session_entry', entry_type: 'user_message', content: text, ts }; } if (msg.role === 'toolResult') { const tool: string = msg.toolName || 'unknown'; const raw: string = msg.content?.[0]?.text || ''; // Use toolCallId exclusively — name fallback removed (breaks on repeated same-tool calls) const toolCallId: string = msg.toolCallId || ''; const callId: string = toolCallId || crypto.randomUUID(); const pending = toolCallId ? hudPendingByCallId.get(toolCallId) : undefined; const pendingArgs = pending?.args || {}; if (toolCallId) hudPendingByCallId.delete(toolCallId); const result = resolveToolResult(tool, pendingArgs, raw); const startTs: number = pending?.ts || (ts ? new Date(ts).getTime() : Date.now()); const endTs: number = ts ? new Date(ts).getTime() : Date.now(); const durationMs: number = Math.max(0, endTs - startTs); const ev = hud.toolEnd(callId, 'history', tool, result, startTs, isHistory, toolCallId || undefined); return { ...ev, durationMs, ts } as any; } if (msg.role === 'assistant') { const toolCalls = msg.content?.filter((c: any) => c.type === 'toolCall') ?? []; if (toolCalls.length > 0) { const startTs = ts ? new Date(ts).getTime() : Date.now(); const events: WatcherEntry[] = []; for (const toolCall of toolCalls) { const tool: string = toolCall.name || 'unknown'; const toolCallId: string = toolCall.id || ''; const callId: string = toolCallId || crypto.randomUUID(); const args = buildToolArgs(tool, toolCall.arguments); // Stash for pairing with toolResult — keyed by toolCallId only if (toolCallId) hudPendingByCallId.set(toolCallId, { tool, args, ts: startTs }); events.push({ ...hud.toolStart(callId, 'history', tool, args, isHistory, toolCallId || undefined), ts, } as any); } return events; } if (msg.stopReason === 'stop' || msg.stopReason === 'length') { const truncated = msg.stopReason === 'length'; const text = filterText( (msg.content?.filter((c: any) => c.type === 'text') ?? []) .map((c: any) => c.text || '').join('').trim() ) ?? ''; const result: WatcherEntry = { type: 'session_entry', entry_type: 'assistant_text', content: text, ts, ...(truncated ? { truncated: true } : {}) }; if (truncated && !isHistory && wsSend) { wsSend({ type: 'truncated_warning' }); } if (msg.usage) { cumulativeTokens.input += msg.usage.input || 0; cumulativeTokens.output += msg.usage.output || 0; cumulativeTokens.cacheRead += msg.usage.cacheRead || 0; cumulativeTokens.total += msg.usage.totalTokens || 0; if (wsSend) { wsSend({ type: 'session_total_tokens', input_tokens: cumulativeTokens.input, cache_read_tokens: cumulativeTokens.cacheRead, output_tokens: cumulativeTokens.output, }); } return [ result, { type: 'session_entry', entry_type: 'usage', input_tokens: msg.usage.input || 0, output_tokens: msg.usage.output || 0, cache_read_tokens: msg.usage.cacheRead || 0, cache_write_tokens: msg.usage.cacheWrite || 0, total_tokens: msg.usage.totalTokens || 0, cost: msg.usage.cost?.total ?? msg.usage.cost ?? 0, ts, }, ]; } return result; } } return null; } function processLines(lines: string[]) { for (const line of lines) { const trimmed = line.trim(); if (!trimmed) continue; try { const parsed = JSON.parse(trimmed); const result = classifyEntry(parsed); if (!result) continue; const entries = Array.isArray(result) ? result : [result]; for (const entry of entries) { // Keep snapshot up to date so late-joining tabs get full history if (lastHistorySnapshot) { lastHistorySnapshot.entries.push(entry); lastHistorySnapshot.entryCount++; } onEntry(entry); } } catch (e: any) { wsSend({ type: 'diagnostic', level: 'warn', message: `Backend: Skipped malformed line: ${e.message}` }); } } } /** * Group flat history entries into turn-structured HUD events. * Each turn = one user message → N tool calls/results → one assistant text. * Injects synthetic turn_start/turn_end HUD nodes wrapping tool nodes. */ function groupIntoTurns(entries: WatcherEntry[]): WatcherEntry[] { const result: WatcherEntry[] = []; // Collect tool HUD events (tool_start/tool_end) since last user message let pendingTools: WatcherEntry[] = []; let turnStartTs: number | null = null; let turnEndTs: number | null = null; let inTurn = false; function flushTurn() { if (!inTurn || pendingTools.length === 0) { // No tools in this turn — emit nothing for the turn shell for (const t of pendingTools) result.push(t); pendingTools = []; inTurn = false; turnStartTs = null; turnEndTs = null; return; } const turnId = crypto.randomUUID(); const startTs = turnStartTs ?? Date.now(); const endTs = turnEndTs ?? Date.now(); const durationMs = Math.max(0, endTs - startTs); // Rewrite parentId on all pending tools to this turn's id const rewritten = pendingTools.map(t => ({ ...t, parentId: turnId })); result.push({ type: 'hud', event: 'turn_start', correlationId: turnId, ts: startTs, replay: true } as WatcherEntry); for (const t of rewritten) result.push(t); result.push({ type: 'hud', event: 'turn_end', correlationId: turnId, ts: endTs, durationMs, replay: true } as WatcherEntry); pendingTools = []; inTurn = false; turnStartTs = null; turnEndTs = null; } for (const entry of entries) { const e = entry as any; // User message → turn boundary: flush previous turn, start new if (e.type === 'session_entry' && e.entry_type === 'user_message') { flushTurn(); result.push(entry); inTurn = true; turnStartTs = e.ts ? new Date(e.ts).getTime() : Date.now(); continue; } // HUD tool events — collect instead of emitting directly if (e.type === 'hud' && (e.event === 'tool_start' || e.event === 'tool_end')) { if (inTurn) { if (e.event === 'tool_end') { // Update turn end ts to track last tool completion const ets = e.ts ? new Date(e.ts).getTime() : Date.now(); turnEndTs = Math.max(turnEndTs ?? 0, ets); } pendingTools.push(entry); } else { // Tool with no preceding user message (shouldn't happen, emit at root) result.push(entry); } continue; } // Assistant text → turn end marker if (e.type === 'session_entry' && e.entry_type === 'assistant_text') { if (inTurn && e.ts) { const ets = new Date(e.ts).getTime(); turnEndTs = Math.max(turnEndTs ?? 0, ets); } flushTurn(); result.push(entry); continue; } // Everything else (usage, diagnostics, etc.) — emit as-is result.push(entry); } // Flush any open turn at end of file flushTurn(); return result; } function startFileWatcher(filePath: string) { currentJsonlPath = filePath; suppressedIds.clear(); const content = fs.readFileSync(filePath, 'utf8'); const lines = content.split('\n'); const rawHistoryEntries: WatcherEntry[] = []; for (const line of lines) { const trimmed = line.trim(); if (!trimmed) continue; try { const parsed = JSON.parse(trimmed); if (parsed.message?.role === 'assistant' && parsed.message.usage) { cumulativeTokens.input += parsed.message.usage.input || 0; cumulativeTokens.output += parsed.message.usage.output || 0; cumulativeTokens.cacheRead += parsed.message.usage.cacheRead || 0; cumulativeTokens.total += parsed.message.usage.totalTokens || 0; } const result = classifyEntry(parsed, true); if (!result) continue; const entries = Array.isArray(result) ? result : [result]; for (const entry of entries) rawHistoryEntries.push(entry); } catch (e: any) { wsSend({ type: 'diagnostic', level: 'warn', message: `Backend: Skipped malformed line during history parse: ${e.message}` }); } } const historyEntries = groupIntoTurns(rawHistoryEntries); filePosition = Buffer.byteLength(content, 'utf8'); lastHistorySnapshot = { entries: historyEntries, file: filePath, entryCount: lines.filter(l => l.trim()).length }; onEntry({ type: 'session_history', entries: historyEntries }); onEntry({ type: 'session_status', status: 'watching', file: filePath, entries: lines.filter(l => l.trim()).length }); if (wsSend && (cumulativeTokens.input > 0 || cumulativeTokens.output > 0 || cumulativeTokens.cacheRead > 0)) { wsSend({ type: 'session_total_tokens', input_tokens: cumulativeTokens.input, cache_read_tokens: cumulativeTokens.cacheRead, output_tokens: cumulativeTokens.output, }); } fileWatcher = fs.watch(filePath, () => { try { const stat = fs.statSync(filePath); if (stat.size <= filePosition) return; const fd = fs.openSync(filePath, 'r'); const buf = Buffer.alloc(stat.size - filePosition); fs.readSync(fd, buf, 0, buf.length, filePosition); fs.closeSync(fd); filePosition = stat.size; processLines(buf.toString('utf8').split('\n')); } catch (e: any) { if ((e as NodeJS.ErrnoException).code === 'ENOENT') { // Session file was rotated — stop watching old file, check for new one wsSend({ type: 'diagnostic', level: 'warn', message: `Backend: Session file rotated, re-attaching...` }); if (fileWatcher) { fileWatcher.close(); fileWatcher = null; } // Re-read sessions.json to find new file try { const sessions = JSON.parse(fs.readFileSync(sessionsPath, 'utf8')); const session = sessions[sessionKey]; if (session?.sessionFile && fs.existsSync(session.sessionFile)) { filePosition = 0; cumulativeTokens = { input: 0, output: 0, cacheRead: 0, total: 0 }; suppressedIds.clear(); lastHistorySnapshot = null; startFileWatcher(session.sessionFile); } } catch (_) { wsSend({ type: 'diagnostic', level: 'error', message: `Backend: Failed to re-attach after rotation` }); } } else { wsSend({ type: 'diagnostic', level: 'error', message: `Backend: Error reading session file: ${e.message}` }); } } }); } function startSessionsWatcher() { if (sessionsJsonWatcher) return; // already watching try { sessionsJsonWatcher = fs.watch(sessionsPath, () => { // Debounce: sessions.json may fire multiple events per write if (sessionsChangeTimer) clearTimeout(sessionsChangeTimer); sessionsChangeTimer = setTimeout(() => { sessionsChangeTimer = null; try { const sessions = JSON.parse(fs.readFileSync(sessionsPath, 'utf8')); const session = sessions[sessionKey]; const newPath: string | undefined = session?.sessionFile; if (newPath && newPath !== currentJsonlPath && fs.existsSync(newPath)) { wsSend({ type: 'diagnostic', level: 'info', message: `Backend: Session rotated for ${sessionKey}, re-attaching to ${newPath}` }); if (fileWatcher) { fileWatcher.close(); fileWatcher = null; } if (directoryWatcher) { directoryWatcher.close(); directoryWatcher = null; } filePosition = 0; cumulativeTokens = { input: 0, output: 0, cacheRead: 0, total: 0 }; suppressedIds.clear(); lastHistorySnapshot = null; startFileWatcher(newPath); } } catch (_) { // sessions.json temporarily unreadable during write — ignore } }, 200); }); } catch (_) { // sessions.json doesn't exist — nothing to watch } } function start() { startSessionsWatcher(); let sessions: any; try { sessions = JSON.parse(fs.readFileSync(sessionsPath, 'utf8')); } catch (_) { onEntry({ type: 'session_status', status: 'no_session', reason: 'sessions.json unreadable' }); return; } const session = sessions[sessionKey]; if (!session?.sessionFile) { onEntry({ type: 'session_status', status: 'no_session', reason: `no entry for ${sessionKey}` }); return; } const rawJsonlPath: string = session.sessionFile; const jsonlDir = path.dirname(rawJsonlPath); const jsonlFilename = path.basename(rawJsonlPath); if (fs.existsSync(rawJsonlPath)) { startFileWatcher(rawJsonlPath); } else { onEntry({ type: 'session_status', status: 'no_session', reason: 'jsonl file not found, watching directory', directory: jsonlDir, expectedFile: jsonlFilename }); directoryWatcher = fs.watch(jsonlDir, (eventType, filename) => { if (eventType === 'rename' && filename === jsonlFilename) { const fullPath = path.join(jsonlDir, filename as string); setTimeout(() => { if (fs.existsSync(fullPath)) { if (directoryWatcher) { directoryWatcher.close(); directoryWatcher = null; } startFileWatcher(fullPath); } }, 100); } }); } } function stop() { if (sessionsChangeTimer) { clearTimeout(sessionsChangeTimer); sessionsChangeTimer = null; } if (fileWatcher) { fileWatcher.close(); fileWatcher = null; } if (directoryWatcher) { directoryWatcher.close(); directoryWatcher = null; } if (sessionsJsonWatcher) { sessionsJsonWatcher.close(); sessionsJsonWatcher = null; } } function sendHistoryTo(send: WsSend): number { if (!lastHistorySnapshot) return 0; send({ type: 'session_history', entries: lastHistorySnapshot.entries }); send({ type: 'session_status', status: 'watching', file: lastHistorySnapshot.file, entries: lastHistorySnapshot.entryCount }); if (cumulativeTokens.input > 0 || cumulativeTokens.output > 0 || cumulativeTokens.cacheRead > 0) { send({ type: 'session_total_tokens', input_tokens: cumulativeTokens.input, cache_read_tokens: cumulativeTokens.cacheRead, output_tokens: cumulativeTokens.output }); } return lastHistorySnapshot.entryCount; } return { start, stop, sendHistoryTo }; } // ── Previous session helpers ────────────────────────────────── export interface PreviousSessionInfo { path: string; resetTimestamp: string; } /** * Find previous session .jsonl.reset.* files for an agent. * Returns files sorted newest-first. Use `skip` to paginate. */ export function findPreviousSessionFiles(agentId: string, count = 1, skip = 0): PreviousSessionInfo[] { try { const sessionsDir = path.join(os.homedir(), '.openclaw', 'agents', agentId, 'sessions'); const files = fs.readdirSync(sessionsDir) .filter(f => f.includes('.jsonl.reset.')) .sort((a, b) => { const tsA = a.split('.jsonl.reset.')[1] || ''; const tsB = b.split('.jsonl.reset.')[1] || ''; return tsB.localeCompare(tsA); // newest first }); return files.slice(skip, skip + count).map(f => { const tsRaw = f.split('.jsonl.reset.')[1] || ''; const resetTimestamp = tsRaw.replace(/(\d{2})-(\d{2})-(\d{2})\./, '$1:$2:$3.'); return { path: path.join(sessionsDir, f), resetTimestamp }; }); } catch { return []; } } /** Convenience: find just the most recent one */ export function findPreviousSessionFile(agentId: string): PreviousSessionInfo | null { const files = findPreviousSessionFiles(agentId, 1, 0); return files.length > 0 ? files[0] : null; } export interface PreviousMessage { role: 'user' | 'assistant'; content: string; timestamp?: string; } /** * Parse a session JSONL file and extract user + assistant messages. * Skips tools, usage, system messages, session headers. * Size guard: skips files > 5MB. */ export function parseSessionFile(filePath: string): PreviousMessage[] { try { const stat = fs.statSync(filePath); if (stat.size > 5 * 1024 * 1024) return []; // too large const lines = fs.readFileSync(filePath, 'utf-8').split('\n').filter(l => l.trim()); const messages: PreviousMessage[] = []; const suppressedIds = new Set(); for (const line of lines) { let parsed: any; try { parsed = JSON.parse(line); } catch { continue; } const msg = parsed.message; const ts = parsed.timestamp; const id = parsed.id; const parentId = parsed.parentId; if (!msg) continue; // Suppress children of system messages if (parentId && suppressedIds.has(parentId)) { if (id) suppressedIds.add(id); continue; } if (msg.role === 'user') { let text: string = msg.content?.[0]?.text || ''; // Strip sender metadata (may be entire message or prefix) text = text.replace(/^Sender \(untrusted metadata\):[\s\S]*?```\s*\n?\n?/, '').trim(); text = text.replace(/^\[\w{3} \d{4}-\d{2}-\d{2} \d{2}:\d{2} [^\]]+\]\s*/, '').trim(); // Skip system messages if (!text || text.startsWith('Pre-compaction memory flush') || text.startsWith('HEARTBEAT') || text.startsWith("/new") || text.startsWith("You're starting fresh") || text.startsWith("Write HANDOVER.md")) { if (id) suppressedIds.add(id); continue; } // Strip injected session prompt // Skip or strip session context prompt if (/^(?:IMPORTANT: )?This is a (?:private|PUBLIC|private 1:1)/.test(text)) { // Try to extract user content after the prompt const afterPrompt = text.match(/\n\n([\s\S]+)$/); if (afterPrompt) { text = afterPrompt[1].trim(); } else { continue; } // entire message is just the prompt } if (!text) continue; messages.push({ role: 'user', content: text, timestamp: ts }); } else if (msg.role === 'assistant' && (msg.stopReason === 'stop' || msg.stopReason === 'length')) { const text = (msg.content?.filter((c: any) => c.type === 'text') ?? []) .map((c: any) => c.text || '').join('').trim(); if (text) messages.push({ role: 'assistant', content: text, timestamp: ts }); } } return messages; } catch { return []; } }