574 lines
27 KiB
TypeScript
574 lines
27 KiB
TypeScript
/**
 * session-watcher.ts — JSONL session file watcher
 *
 * Reads openclaw session JSONL, classifies entries, and streams
 * them to the browser via onEntry callbacks.
 */
|
|
|
|
import fs from 'fs';
|
|
import path from 'path';
|
|
import os from 'os';
|
|
import { filterValue, filterText } from './message-filter';
|
|
import { hud, buildToolArgs, resolveToolResult } from './hud-builder';
|
|
|
|
/** Loosely typed event object exchanged between the watcher and its callbacks. */
export type WatcherEntry = Record<string, unknown>;

/** Receives classified session entries as well as history and status events. */
export type OnEntry = (entry: WatcherEntry) => void;

/** Sends a single message object (diagnostics, token totals, history replays). */
export type WsSend = (data: WatcherEntry) => void;

/**
 * Issues a request identified by `method` with a bag of `params`.
 * The watcher accepts a function of this shape; its semantics are defined by the caller.
 */
export type GatewayRequestFn = (
  method: string,
  params: Record<string, unknown>,
) => Promise<unknown>;
|
|
|
|
/** Handle returned by `createSessionWatcher`. */
export interface SessionWatcher {
  /** Resolve the session's transcript and begin watching it. */
  start(): void;

  /** Close every file-system watcher and cancel the pending debounce timer. */
  stop(): void;

  /**
   * Replay the cached history snapshot through `send`.
   * @returns The snapshot's entry count, or 0 when no snapshot exists yet.
   */
  sendHistoryTo(send: WsSend): number;
}
|
|
|
|
/**
 * Report whether the transcript registered for a session key is present on disk.
 *
 * Looks up `sessionKey` in `~/.openclaw/agents/<agentId>/sessions/sessions.json`
 * and tests the `sessionFile` path recorded for it.
 *
 * @param agentId - Agent whose sessions index is consulted.
 * @param sessionKey - Key of the session inside sessions.json.
 * @returns `'READY'` when the entry names a `sessionFile` that exists;
 *   `'NO_SESSION'` otherwise, including when the index is missing or unparsable.
 */
export function checkSessionOnDisk(agentId: string, sessionKey: string): 'READY' | 'NO_SESSION' {
  const indexPath = path.join(os.homedir(), '.openclaw', 'agents', agentId, 'sessions', 'sessions.json');
  try {
    const index = JSON.parse(fs.readFileSync(indexPath, 'utf-8'));
    const transcript = index[sessionKey]?.sessionFile;
    if (!transcript) return 'NO_SESSION';
    return fs.existsSync(transcript) ? 'READY' : 'NO_SESSION';
  } catch {
    // Unreadable or malformed index is treated the same as "no session".
    return 'NO_SESSION';
  }
}
|
|
|
|
export function createSessionWatcher(
|
|
sessionKey: string,
|
|
agentId: string,
|
|
onEntry: OnEntry,
|
|
wsSend: WsSend,
|
|
gatewayRequestFn: GatewayRequestFn,
|
|
): SessionWatcher {
|
|
const sessionsPath = path.join(os.homedir(), '.openclaw', 'agents', agentId, 'sessions', 'sessions.json');
|
|
let fileWatcher: fs.FSWatcher | null = null;
|
|
let directoryWatcher: fs.FSWatcher | null = null;
|
|
let sessionsJsonWatcher: fs.FSWatcher | null = null;
|
|
let sessionsChangeTimer: ReturnType<typeof setTimeout> | null = null;
|
|
let filePosition = 0;
|
|
let currentJsonlPath: string | null = null;
|
|
let cumulativeTokens = { input: 0, output: 0, cacheRead: 0, total: 0 };
|
|
const suppressedIds = new Set<string>();
|
|
let lastHistorySnapshot: { entries: WatcherEntry[]; file: string; entryCount: number } | null = null;
|
|
// HUD tool-call pairing state — keyed by toolCallId (OpenClaw-assigned, unique per call)
|
|
// Name-based fallback removed: unreliable when same tool is called multiple times in one turn.
|
|
const hudPendingByCallId: Map<string, { tool: string; args: any; ts: number }> = new Map();
|
|
|
|
/**
 * Decide whether a (already cleaned) user message is an injected system prompt
 * rather than something the human typed.
 *
 * @param text - Message text after metadata/timestamp prefixes were stripped.
 * @returns true for empty text or text starting with one of the known prompt prefixes.
 */
function isSystemMessage(text: string): boolean {
  if (!text) return true;
  if (/^Read HEARTBEAT\.md/.test(text)) return true;

  const systemPrefixes = [
    'Pre-compaction memory flush',
    'HEARTBEAT',
    "You're starting fresh. Read HANDOVER.md",
    "Write HANDOVER.md to your workspace",
    "/new",
  ];
  return systemPrefixes.some((prefix) => text.startsWith(prefix));
}
|
|
|
|
/**
 * Convert one parsed JSONL record into zero or more browser-facing entries.
 *
 * - `user` → a `user_message` entry, optionally preceded by a split-out
 *   `session_context` entry. Injected system prompts are dropped and their ids
 *   are remembered in `suppressedIds`.
 * - `toolResult` → a HUD tool-end event, paired with the stashed call by `toolCallId`.
 * - `assistant` with tool calls → one HUD tool-start event per call.
 * - `assistant` with stopReason `stop`/`length` → an `assistant_text` entry,
 *   followed by a `usage` entry when the message carries usage data.
 *
 * Side effects: mutates `suppressedIds` and `hudPendingByCallId`; for live
 * records it also updates `cumulativeTokens` and may call `wsSend`.
 *
 * @param parsed - One decoded JSONL record.
 * @param isHistory - true while replaying the existing file; suppresses live-only
 *   side effects (truncation warning, running token totals).
 * @returns A single entry, a list of entries, or null when nothing should be shown.
 */
function classifyEntry(parsed: any, isHistory = false): WatcherEntry | WatcherEntry[] | null {
  const msg = parsed.message;
  if (!msg) return null;

  const ts = parsed.timestamp;
  const id = parsed.id;
  const parentId = parsed.parentId;

  if (parentId && suppressedIds.has(parentId) && msg.role === 'assistant') {
    // Replies to a suppressed prompt are suppressed too; remember this id so the
    // suppression carries on to assistant messages chained underneath it.
    if (id) suppressedIds.add(id);
    const hasToolCall = msg.content?.some((c: any) => c.type === 'toolCall');
    // Text-only reply: hide it. A reply with tool calls still falls through so
    // the HUD receives its tool_start events.
    if (!hasToolCall) return null;
  }

  if (msg.role === 'user') {
    let text: string = msg.content?.[0]?.text || '';
    // Strip the sender-metadata block and the leading "[Day YYYY-MM-DD HH:MM tz]" stamp.
    text = text.replace(/^Sender \(untrusted metadata\):[\s\S]*?```\s*\n\n?/, '').trim();
    text = text.replace(/^\[\w{3} \d{4}-\d{2}-\d{2} \d{2}:\d{2} [^\]]+\]\s*/, '').trim();
    if (isSystemMessage(text)) {
      if (id) suppressedIds.add(id);
      return null;
    }

    // Detect an injected session-context prompt and split it from the user's own text.
    const promptMatch = text.match(/^(IMPORTANT: This is a (?:private|PUBLIC|private 1:1)[^\n]*(?:\n[^\n]*)*?(?:Address [^\n]*\.))\n\n([\s\S]*)$/);
    if (!promptMatch) {
      return { type: 'session_entry', entry_type: 'user_message', content: text, ts };
    }
    const entries: WatcherEntry[] = [
      { type: 'session_entry', entry_type: 'session_context', content: promptMatch[1], ts },
    ];
    const userPart = promptMatch[2].trim();
    if (userPart) {
      entries.push({ type: 'session_entry', entry_type: 'user_message', content: userPart, ts });
    }
    return entries;
  }

  if (msg.role === 'toolResult') {
    const tool: string = msg.toolName || 'unknown';
    const raw: string = msg.content?.[0]?.text || '';
    // Pairing is by toolCallId only; a name-based fallback would mis-pair
    // repeated calls to the same tool within one turn.
    const toolCallId: string = msg.toolCallId || '';
    const callId: string = toolCallId || crypto.randomUUID();
    const pending = toolCallId ? hudPendingByCallId.get(toolCallId) : undefined;
    if (toolCallId) hudPendingByCallId.delete(toolCallId);

    const result = resolveToolResult(tool, pending?.args || {}, raw);
    const endTs: number = ts ? new Date(ts).getTime() : Date.now();
    // Without a stashed start the call is treated as instantaneous.
    const startTs: number = pending?.ts || endTs;
    const durationMs: number = Math.max(0, endTs - startTs);
    const ev = hud.toolEnd(callId, 'history', tool, result, startTs, isHistory, toolCallId || undefined);
    return { ...ev, durationMs, ts } as any;
  }

  if (msg.role === 'assistant') {
    const toolCalls = msg.content?.filter((c: any) => c.type === 'toolCall') ?? [];
    if (toolCalls.length > 0) {
      const startTs = ts ? new Date(ts).getTime() : Date.now();
      const events: WatcherEntry[] = [];
      for (const toolCall of toolCalls) {
        const tool: string = toolCall.name || 'unknown';
        const toolCallId: string = toolCall.id || '';
        const callId: string = toolCallId || crypto.randomUUID();
        const args = buildToolArgs(tool, toolCall.arguments);
        // Stash for pairing with the matching toolResult.
        if (toolCallId) hudPendingByCallId.set(toolCallId, { tool, args, ts: startTs });
        events.push({
          ...hud.toolStart(callId, 'history', tool, args, isHistory, toolCallId || undefined),
          ts,
        } as any);
      }
      // NOTE(review): usage on tool-call messages is not added to the live running
      // total here, although the history pre-scan counts it — confirm which is intended.
      return events;
    }

    if (msg.stopReason === 'stop' || msg.stopReason === 'length') {
      const truncated = msg.stopReason === 'length';
      const text = filterText(
        (msg.content?.filter((c: any) => c.type === 'text') ?? [])
          .map((c: any) => c.text || '').join('').trim()
      ) ?? '';
      const result: WatcherEntry = {
        type: 'session_entry',
        entry_type: 'assistant_text',
        content: text,
        ts,
        ...(truncated ? { truncated: true } : {}),
      };
      if (truncated && !isHistory && wsSend) {
        wsSend({ type: 'truncated_warning' });
      }
      if (!msg.usage) return result;

      // The history loader already sums usage for every assistant record before
      // calling this function, so accumulating here during replay would count
      // these messages twice and emit one totals message per replayed line.
      if (!isHistory) {
        cumulativeTokens.input += msg.usage.input || 0;
        cumulativeTokens.output += msg.usage.output || 0;
        cumulativeTokens.cacheRead += msg.usage.cacheRead || 0;
        cumulativeTokens.total += msg.usage.totalTokens || 0;
        if (wsSend) {
          wsSend({
            type: 'session_total_tokens',
            input_tokens: cumulativeTokens.input,
            cache_read_tokens: cumulativeTokens.cacheRead,
            output_tokens: cumulativeTokens.output,
          });
        }
      }
      return [
        result,
        {
          type: 'session_entry',
          entry_type: 'usage',
          input_tokens: msg.usage.input || 0,
          output_tokens: msg.usage.output || 0,
          cache_read_tokens: msg.usage.cacheRead || 0,
          cache_write_tokens: msg.usage.cacheWrite || 0,
          total_tokens: msg.usage.totalTokens || 0,
          cost: msg.usage.cost?.total ?? msg.usage.cost ?? 0,
          ts,
        },
      ];
    }
  }

  return null;
}
|
|
|
|
/**
 * Classify freshly appended JSONL lines and forward the resulting entries.
 *
 * Each emitted entry is appended to the cached history snapshot (when one
 * exists) so late-joining tabs receive it, then passed to `onEntry`.
 * Lines that fail to parse or classify are reported through `wsSend` and skipped.
 *
 * @param lines - Raw lines read from the session file; blank lines are ignored.
 */
function processLines(lines: string[]) {
  for (const line of lines) {
    const trimmed = line.trim();
    if (!trimmed) continue;

    // entryCount is seeded as the number of non-blank lines in the file, so it
    // advances once per line here — not once per emitted entry, which can be 0..n.
    if (lastHistorySnapshot) lastHistorySnapshot.entryCount++;

    try {
      const classified = classifyEntry(JSON.parse(trimmed));
      if (!classified) continue;
      const entries = Array.isArray(classified) ? classified : [classified];
      for (const entry of entries) {
        if (lastHistorySnapshot) lastHistorySnapshot.entries.push(entry);
        onEntry(entry);
      }
    } catch (e: unknown) {
      const reason = e instanceof Error ? e.message : String(e);
      wsSend({ type: 'diagnostic', level: 'warn', message: `Backend: Skipped malformed line: ${reason}` });
    }
  }
}
|
|
|
|
/**
 * Re-shape a flat list of history entries into turn-structured HUD events.
 *
 * A turn opens at a `user_message` and closes at the next `assistant_text`,
 * the next `user_message`, or the end of the list. When a turn collected at
 * least one tool_start/tool_end event, those events are emitted between
 * synthetic `turn_start` / `turn_end` nodes and re-parented to the turn's id.
 * Turns without tools produce no wrapper; tool events outside any turn and all
 * other entries pass through unchanged.
 *
 * @param entries - Classified history entries in file order.
 * @returns A new list with turn wrappers inserted.
 */
function groupIntoTurns(entries: WatcherEntry[]): WatcherEntry[] {
  type OpenTurn = { startedAt: number; endedAt: number | null; tools: WatcherEntry[] };

  const out: WatcherEntry[] = [];
  let open: OpenTurn | null = null;

  // Close the current turn (if any), emitting its wrapper only when it has tools.
  const closeTurn = (): void => {
    const turn = open;
    open = null;
    if (!turn || turn.tools.length === 0) return;

    const turnId = crypto.randomUUID();
    const endTs = turn.endedAt ?? Date.now();
    out.push({ type: 'hud', event: 'turn_start', correlationId: turnId, ts: turn.startedAt, replay: true } as WatcherEntry);
    for (const toolEvent of turn.tools) {
      out.push({ ...toolEvent, parentId: turnId });
    }
    out.push({
      type: 'hud',
      event: 'turn_end',
      correlationId: turnId,
      ts: endTs,
      durationMs: Math.max(0, endTs - turn.startedAt),
      replay: true,
    } as WatcherEntry);
  };

  for (const entry of entries) {
    const e = entry as any;
    const isSessionEntry = e.type === 'session_entry';

    if (isSessionEntry && e.entry_type === 'user_message') {
      // Turn boundary: finish the previous turn, then open a new one.
      closeTurn();
      out.push(entry);
      open = {
        startedAt: e.ts ? new Date(e.ts).getTime() : Date.now(),
        endedAt: null,
        tools: [],
      };
    } else if (e.type === 'hud' && (e.event === 'tool_start' || e.event === 'tool_end')) {
      if (!open) {
        // Tool event with no preceding user message: emit at the root.
        out.push(entry);
      } else {
        if (e.event === 'tool_end') {
          // The turn lasts at least until its latest tool completion.
          const finishedAt = e.ts ? new Date(e.ts).getTime() : Date.now();
          open.endedAt = Math.max(open.endedAt ?? 0, finishedAt);
        }
        open.tools.push(entry);
      }
    } else if (isSessionEntry && e.entry_type === 'assistant_text') {
      if (open && e.ts) {
        open.endedAt = Math.max(open.endedAt ?? 0, new Date(e.ts).getTime());
      }
      closeTurn();
      out.push(entry);
    } else {
      // Usage, session context, diagnostics, ... are forwarded untouched.
      out.push(entry);
    }
  }

  // A turn still open at the end of the list is flushed as well.
  closeTurn();
  return out;
}
|
|
|
|
/**
 * Load an existing session JSONL file as history, then follow it for appended lines.
 *
 * Steps:
 *  1. Reset per-file state (watcher, suppression set, pending tool calls, token totals).
 *  2. Read the file, classify every complete line with `isHistory = true`, group the
 *     result into turns, cache it as the history snapshot and emit
 *     `session_history` / `session_status` through `onEntry`.
 *  3. Watch the file; on each change read only the bytes after `filePosition`
 *     and hand complete lines to `processLines`.
 *
 * If the file disappears (ENOENT) the watcher is closed and sessions.json is
 * re-read to attach to the session's new file, when one exists.
 *
 * @param filePath - Path of the session JSONL file to load and follow.
 */
function startFileWatcher(filePath: string) {
  /**
   * Split a raw byte chunk into lines and report how many bytes may be consumed.
   * A trailing fragment without a newline is consumed only when it already
   * parses as JSON; otherwise it is presumably still being written and is left
   * for the next read, so half-written lines are neither reported as malformed
   * nor skipped.
   */
  function takeCompleteLines(chunk: Buffer): { lines: string[]; consumed: number } {
    const afterLastNewline = chunk.lastIndexOf(0x0a) + 1;
    const lines = afterLastNewline > 0
      ? chunk.subarray(0, afterLastNewline).toString('utf8').split('\n')
      : [];
    const tail = chunk.subarray(afterLastNewline).toString('utf8');
    if (!tail.trim()) return { lines, consumed: chunk.length };
    try {
      JSON.parse(tail);
    } catch {
      return { lines, consumed: afterLastNewline };
    }
    lines.push(tail);
    return { lines, consumed: chunk.length };
  }

  // Never leave a previous watcher running: a second attach would otherwise
  // leak it and deliver every appended line twice.
  if (fileWatcher) { fileWatcher.close(); fileWatcher = null; }
  currentJsonlPath = filePath;
  suppressedIds.clear();
  hudPendingByCallId.clear(); // unanswered tool calls from another file must not pair here
  cumulativeTokens = { input: 0, output: 0, cacheRead: 0, total: 0 };
  filePosition = 0;
  lastHistorySnapshot = null;

  // Work on raw bytes so filePosition is an exact byte offset into the file.
  const initial = takeCompleteLines(fs.readFileSync(filePath));
  const rawHistoryEntries: WatcherEntry[] = [];
  let lineCount = 0;

  for (const line of initial.lines) {
    const trimmed = line.trim();
    if (!trimmed) continue;
    lineCount++;
    try {
      const parsed = JSON.parse(trimmed);
      // Session totals cover every assistant record that carries usage.
      if (parsed.message?.role === 'assistant' && parsed.message.usage) {
        cumulativeTokens.input += parsed.message.usage.input || 0;
        cumulativeTokens.output += parsed.message.usage.output || 0;
        cumulativeTokens.cacheRead += parsed.message.usage.cacheRead || 0;
        cumulativeTokens.total += parsed.message.usage.totalTokens || 0;
      }
      const result = classifyEntry(parsed, true);
      if (!result) continue;
      const entries = Array.isArray(result) ? result : [result];
      for (const entry of entries) rawHistoryEntries.push(entry);
    } catch (e: unknown) {
      const reason = e instanceof Error ? e.message : String(e);
      wsSend({ type: 'diagnostic', level: 'warn', message: `Backend: Skipped malformed line during history parse: ${reason}` });
    }
  }

  const historyEntries = groupIntoTurns(rawHistoryEntries);

  filePosition = initial.consumed;
  lastHistorySnapshot = { entries: historyEntries, file: filePath, entryCount: lineCount };
  onEntry({ type: 'session_history', entries: historyEntries });
  onEntry({ type: 'session_status', status: 'watching', file: filePath, entries: lineCount });

  if (wsSend && (cumulativeTokens.input > 0 || cumulativeTokens.output > 0 || cumulativeTokens.cacheRead > 0)) {
    wsSend({
      type: 'session_total_tokens',
      input_tokens: cumulativeTokens.input,
      cache_read_tokens: cumulativeTokens.cacheRead,
      output_tokens: cumulativeTokens.output,
    });
  }

  fileWatcher = fs.watch(filePath, () => {
    try {
      const stat = fs.statSync(filePath);
      if (stat.size <= filePosition) return;

      const buf = Buffer.alloc(stat.size - filePosition);
      const fd = fs.openSync(filePath, 'r');
      let bytesRead = 0;
      try {
        bytesRead = fs.readSync(fd, buf, 0, buf.length, filePosition);
      } finally {
        // Release the descriptor even when the read throws.
        fs.closeSync(fd);
      }

      const { lines, consumed } = takeCompleteLines(buf.subarray(0, bytesRead));
      if (consumed === 0) return; // only a partial line so far — wait for more data
      filePosition += consumed;
      processLines(lines);
    } catch (e: unknown) {
      const err = e as NodeJS.ErrnoException;
      if (err.code === 'ENOENT') {
        // Session file was rotated — stop watching the old file, look for the new one.
        wsSend({ type: 'diagnostic', level: 'warn', message: `Backend: Session file rotated, re-attaching...` });
        if (fileWatcher) { fileWatcher.close(); fileWatcher = null; }
        try {
          const sessions = JSON.parse(fs.readFileSync(sessionsPath, 'utf8'));
          const session = sessions[sessionKey];
          if (session?.sessionFile && fs.existsSync(session.sessionFile)) {
            // startFileWatcher resets all per-file state before loading.
            startFileWatcher(session.sessionFile);
          }
        } catch (_) {
          wsSend({ type: 'diagnostic', level: 'error', message: `Backend: Failed to re-attach after rotation` });
        }
      } else {
        wsSend({ type: 'diagnostic', level: 'error', message: `Backend: Error reading session file: ${err.message}` });
      }
    }
  });
}
|
|
|
|
/**
 * Watch sessions.json and re-attach when this session's `sessionFile` changes.
 *
 * Change events are debounced by 200 ms. After the quiet period the index is
 * re-read; if it names an existing file different from the one currently
 * followed, the old watchers are closed, per-file state is reset and
 * `startFileWatcher` is invoked on the new path. Calling this again while a
 * watcher is already installed is a no-op.
 */
function startSessionsWatcher() {
  if (sessionsJsonWatcher) return; // already watching

  // Runs once the debounce window elapses.
  const reattachIfRotated = (): void => {
    sessionsChangeTimer = null;
    try {
      const index = JSON.parse(fs.readFileSync(sessionsPath, 'utf8'));
      const newPath: string | undefined = index[sessionKey]?.sessionFile;
      if (!newPath || newPath === currentJsonlPath || !fs.existsSync(newPath)) return;

      wsSend({ type: 'diagnostic', level: 'info', message: `Backend: Session rotated for ${sessionKey}, re-attaching to ${newPath}` });
      if (fileWatcher) {
        fileWatcher.close();
        fileWatcher = null;
      }
      if (directoryWatcher) {
        directoryWatcher.close();
        directoryWatcher = null;
      }
      filePosition = 0;
      cumulativeTokens = { input: 0, output: 0, cacheRead: 0, total: 0 };
      suppressedIds.clear();
      lastHistorySnapshot = null;
      startFileWatcher(newPath);
    } catch (_) {
      // sessions.json can be unreadable in the middle of a write — ignore.
    }
  };

  try {
    sessionsJsonWatcher = fs.watch(sessionsPath, () => {
      // A single write may raise several events; restart the timer on each.
      if (sessionsChangeTimer) clearTimeout(sessionsChangeTimer);
      sessionsChangeTimer = setTimeout(reattachIfRotated, 200);
    });
  } catch (_) {
    // sessions.json does not exist — nothing to watch.
  }
}
|
|
|
|
/**
 * Begin watching the session identified by `sessionKey`.
 *
 * Installs the sessions.json watcher, then resolves the session's JSONL path:
 *  - sessions.json unreadable or no entry → emits a `no_session` status and returns;
 *  - file exists → loads and follows it via `startFileWatcher`;
 *  - file missing → emits `no_session` and watches the parent directory until a
 *    `rename` event for the expected file name appears.
 */
function start() {
  startSessionsWatcher();

  let sessions: any;
  try {
    sessions = JSON.parse(fs.readFileSync(sessionsPath, 'utf8'));
  } catch (_) {
    onEntry({ type: 'session_status', status: 'no_session', reason: 'sessions.json unreadable' });
    return;
  }

  // Optional chaining: a sessions.json containing `null` must not throw here.
  const session = sessions?.[sessionKey];
  if (!session?.sessionFile) {
    onEntry({ type: 'session_status', status: 'no_session', reason: `no entry for ${sessionKey}` });
    return;
  }

  const rawJsonlPath: string = session.sessionFile;
  if (fs.existsSync(rawJsonlPath)) {
    startFileWatcher(rawJsonlPath);
    return;
  }

  const jsonlDir = path.dirname(rawJsonlPath);
  const jsonlFilename = path.basename(rawJsonlPath);
  onEntry({ type: 'session_status', status: 'no_session', reason: 'jsonl file not found, watching directory', directory: jsonlDir, expectedFile: jsonlFilename });

  // A repeated start() must not leak the previous directory watcher.
  if (directoryWatcher) { directoryWatcher.close(); directoryWatcher = null; }

  try {
    directoryWatcher = fs.watch(jsonlDir, (eventType, filename) => {
      if (eventType !== 'rename' || filename !== jsonlFilename) return;
      const fullPath = path.join(jsonlDir, jsonlFilename);
      // Short delay before checking that the file is really there.
      setTimeout(() => {
        // A null watcher means stop() ran or another path already attached
        // (duplicate rename event, sessions.json change). Attaching again
        // would create a second, leaked file watcher.
        if (!directoryWatcher) return;
        if (!fs.existsSync(fullPath)) return;
        directoryWatcher.close();
        directoryWatcher = null;
        startFileWatcher(fullPath);
      }, 100);
    });
  } catch (e: unknown) {
    // fs.watch throws when the directory itself does not exist; report it
    // instead of letting the exception escape from start().
    const reason = e instanceof Error ? e.message : String(e);
    wsSend({ type: 'diagnostic', level: 'error', message: `Backend: Cannot watch session directory ${jsonlDir}: ${reason}` });
  }
}
|
|
|
|
/**
 * Tear down the watcher: cancel the pending sessions.json debounce timer and
 * close the file, directory and sessions.json watchers, nulling each handle.
 */
function stop() {
  if (sessionsChangeTimer) {
    clearTimeout(sessionsChangeTimer);
    sessionsChangeTimer = null;
  }

  fileWatcher?.close();
  fileWatcher = null;

  directoryWatcher?.close();
  directoryWatcher = null;

  sessionsJsonWatcher?.close();
  sessionsJsonWatcher = null;
}
|
|
|
|
/**
 * Replay the cached history to a single recipient (e.g. a late-joining tab).
 *
 * Sends `session_history`, then `session_status`, then — when any input,
 * output or cache-read tokens have been counted — `session_total_tokens`.
 *
 * @param send - Callback that delivers one message to the recipient.
 * @returns The snapshot's entry count, or 0 when no snapshot exists (nothing is sent).
 */
function sendHistoryTo(send: WsSend): number {
  const snapshot = lastHistorySnapshot;
  if (!snapshot) return 0;

  send({ type: 'session_history', entries: snapshot.entries });
  send({ type: 'session_status', status: 'watching', file: snapshot.file, entries: snapshot.entryCount });

  const { input, output, cacheRead } = cumulativeTokens;
  const anyTokensCounted = input > 0 || output > 0 || cacheRead > 0;
  if (anyTokensCounted) {
    send({ type: 'session_total_tokens', input_tokens: input, cache_read_tokens: cacheRead, output_tokens: output });
  }

  return snapshot.entryCount;
}
|
|
|
|
return { start, stop, sendHistoryTo };
|
|
}
|
|
|
|
// ── Previous session helpers ──────────────────────────────────
|
|
|
|
/** Location and reset time of an archived (`.jsonl.reset.*`) session file. */
export interface PreviousSessionInfo {
  /** Path of the archived file inside the agent's sessions directory. */
  path: string;

  /**
   * The file-name suffix after `.jsonl.reset.`, with an `HH-MM-SS.` time
   * portion rewritten to `HH:MM:SS.`.
   */
  resetTimestamp: string;
}
|
|
|
|
/**
 * List an agent's archived session files (`*.jsonl.reset.*`), newest first.
 *
 * Ordering is by the text after `.jsonl.reset.` in the file name, descending.
 *
 * @param agentId - Agent whose `~/.openclaw/agents/<agentId>/sessions` directory is scanned.
 * @param count - Maximum number of files to return (default 1).
 * @param skip - Number of newest files to skip, for pagination (default 0).
 * @returns The selected files; an empty list when the directory cannot be read.
 */
export function findPreviousSessionFiles(agentId: string, count = 1, skip = 0): PreviousSessionInfo[] {
  const marker = '.jsonl.reset.';
  const suffixOf = (fileName: string): string => fileName.split(marker)[1] || '';

  try {
    const sessionsDir = path.join(os.homedir(), '.openclaw', 'agents', agentId, 'sessions');
    const newestFirst = fs.readdirSync(sessionsDir)
      .filter((fileName) => fileName.includes(marker))
      .map((fileName) => ({ fileName, suffix: suffixOf(fileName) }))
      .sort((left, right) => right.suffix.localeCompare(left.suffix));

    const page: PreviousSessionInfo[] = [];
    for (const { fileName, suffix } of newestFirst.slice(skip, skip + count)) {
      page.push({
        path: path.join(sessionsDir, fileName),
        // File names carry the time as HH-MM-SS; restore the colons.
        resetTimestamp: suffix.replace(/(\d{2})-(\d{2})-(\d{2})\./, '$1:$2:$3.'),
      });
    }
    return page;
  } catch {
    return [];
  }
}
|
|
|
|
/**
 * Convenience wrapper: the single most recent archived session file.
 *
 * @param agentId - Agent whose sessions directory is scanned.
 * @returns The newest archived file, or null when there is none.
 */
export function findPreviousSessionFile(agentId: string): PreviousSessionInfo | null {
  const [latest] = findPreviousSessionFiles(agentId, 1, 0);
  return latest ?? null;
}
|
|
|
|
/** One user or assistant message extracted from an archived session file. */
export interface PreviousMessage {
  /** Who authored the message. */
  role: 'user' | 'assistant';

  /** Message text after prompt/metadata stripping. */
  content: string;

  /** The record's `timestamp` field, when the JSONL line carried one. */
  timestamp?: string;
}
|
|
|
|
/**
 * Extract the human-visible conversation from a session JSONL file.
 *
 * Keeps user messages and assistant messages whose stopReason is `stop` or
 * `length`. Skips tool traffic, records without a `message`, unparsable lines,
 * injected system prompts, and the non-user replies chained (via `parentId`)
 * beneath a suppressed message. Sender metadata, the leading bracketed
 * timestamp and the injected session-context prompt are stripped from user text.
 *
 * @param filePath - Path of the JSONL file to read.
 * @returns Messages in file order; an empty list when the file is larger than
 *   5 MB or cannot be read.
 */
export function parseSessionFile(filePath: string): PreviousMessage[] {
  const maxBytes = 5 * 1024 * 1024;

  // Prompts injected by the system rather than typed by the user.
  const isInjectedPrompt = (text: string): boolean =>
    !text ||
    text.startsWith('Pre-compaction memory flush') ||
    text.startsWith('HEARTBEAT') ||
    /^Read HEARTBEAT\.md/.test(text) ||
    text.startsWith("/new") ||
    text.startsWith("You're starting fresh") ||
    text.startsWith("Write HANDOVER.md");

  try {
    if (fs.statSync(filePath).size > maxBytes) return []; // too large

    const messages: PreviousMessage[] = [];
    const suppressedIds = new Set<string>();

    for (const line of fs.readFileSync(filePath, 'utf-8').split('\n')) {
      if (!line.trim()) continue;

      let parsed: any;
      try { parsed = JSON.parse(line); } catch { continue; }
      // `parsed?.` — a literal `null` line must not abort the whole file.
      const msg = parsed?.message;
      if (!msg) continue;

      const ts = parsed.timestamp;
      const id = parsed.id;
      const parentId = parsed.parentId;
      // Non-array content is treated as "no parts" instead of throwing.
      const parts: any[] = Array.isArray(msg.content) ? msg.content : [];

      // Suppress replies to system prompts. User messages are exempt: each one
      // is judged on its own text, otherwise a linear parentId chain would hide
      // everything after the first suppressed prompt.
      if (msg.role !== 'user' && parentId && suppressedIds.has(parentId)) {
        if (id) suppressedIds.add(id);
        continue;
      }

      if (msg.role === 'user') {
        const firstText = parts[0]?.text;
        let text: string = typeof firstText === 'string' ? firstText : '';

        // Strip sender metadata (may be the entire message or a prefix). The
        // closing fence must be followed by a newline or end of text so that an
        // opening fence with a language tag (```json) is not mistaken for it.
        text = text.replace(/^Sender \(untrusted metadata\):[\s\S]*?```\s*(?:\n\n?|$)/, '').trim();
        text = text.replace(/^\[\w{3} \d{4}-\d{2}-\d{2} \d{2}:\d{2} [^\]]+\]\s*/, '').trim();

        if (isInjectedPrompt(text)) {
          if (id) suppressedIds.add(id);
          continue;
        }

        // Skip or strip the injected session-context prompt.
        if (/^(?:IMPORTANT: )?This is a (?:private|PUBLIC|private 1:1)/.test(text)) {
          const afterPrompt = text.match(/\n\n([\s\S]+)$/);
          if (!afterPrompt) continue; // entire message is just the prompt
          text = afterPrompt[1].trim();
        }
        if (!text) continue;

        messages.push({ role: 'user', content: text, timestamp: ts });
      } else if (msg.role === 'assistant' && (msg.stopReason === 'stop' || msg.stopReason === 'length')) {
        const text = parts
          .filter((c: any) => c?.type === 'text')
          .map((c: any) => c.text || '')
          .join('')
          .trim();
        if (text) messages.push({ role: 'assistant', content: text, timestamp: ts });
      }
    }

    return messages;
  } catch {
    return [];
  }
}
|