- Memorizer tracks user_expectation (conversational/delegated/waiting_input/observing) - Output node adjusts phrasing per expectation - PA retry loop: reformulates job on expert failure (all retries exhausted or tool skip) - Machine state in PA context: get_machine_summary includes current state, buttons, stored data - Expert writes to machine state via update_machine + transition_machine - Expanded baked schema coverage - Awareness panel shows color-coded expectation state - Dashboard and workspace component updates Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
413 lines
19 KiB
Python
413 lines
19 KiB
Python
"""Runtime: wires all nodes together into a processing pipeline."""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Callable
|
|
|
|
from .types import Envelope, Command, InputAnalysis, ThoughtResult, DirectorPlan
|
|
from .process import ProcessManager
|
|
from .engine import load_graph, instantiate_nodes, list_graphs, get_graph_for_cytoscape
|
|
from .frame_engine import FrameEngine
|
|
|
|
# Logger shared by every class in this module.
log = logging.getLogger("runtime")

# HUD trace entries are appended to this JSON-lines file, which lives one
# directory above the directory containing this module.
TRACE_FILE = Path(__file__).parent.parent.joinpath("trace.jsonl")

# Graph loaded by a new Runtime when no explicit graph_name is passed.
# Default graph — can be switched at runtime
_active_graph_name = "v4-eras"
|
|
|
|
|
|
class OutputSink:
    """Collects output. Optionally streams to attached WebSocket.

    Each ``send_*`` method first records what it needs locally (response
    text, controls, done flag), then forwards a JSON frame to the attached
    WebSocket, if there is one. Forwarding is best-effort: a closed or
    failing socket never interrupts the pipeline. Dropped frames are logged
    at DEBUG level rather than silently discarded.
    """

    _log = logging.getLogger("runtime")

    def __init__(self):
        # Any object exposing an async ``send_text(str)``; None when no client is attached.
        self.ws = None
        self.response: str = ""   # text accumulated from send_delta since the last reset
        self.controls: list = []  # controls passed to the most recent send_controls
        self.done: bool = False   # set by send_done

    def attach(self, ws):
        """Start forwarding frames to ``ws``."""
        self.ws = ws

    def detach(self):
        """Stop forwarding frames; locally collected output is kept."""
        self.ws = None

    def reset(self):
        """Clear collected response text, controls and the done flag."""
        self.response = ""
        self.controls = []
        self.done = False

    async def _send(self, payload: dict):
        """Serialize ``payload`` to JSON and send it to the attached WebSocket.

        Does nothing when no WebSocket is attached. Serialization and
        transport errors are logged and swallowed on purpose, so a
        disconnected client cannot break message processing.
        """
        if not self.ws:
            return
        try:
            await self.ws.send_text(json.dumps(payload))
        except Exception as exc:  # deliberate best-effort, see docstring
            self._log.debug("[sink] dropped %r frame: %s", payload.get("type"), exc)

    async def send_delta(self, text: str):
        """Append ``text`` to the collected response and stream it as a delta frame."""
        self.response += text
        await self._send({"type": "delta", "content": text})

    async def send_controls(self, controls: list):
        """Remember ``controls`` and forward them to the client."""
        self.controls = controls
        await self._send({"type": "controls", "controls": controls})

    async def send_artifacts(self, artifacts: list):
        """Forward ``artifacts`` to the client; nothing is stored locally."""
        await self._send({"type": "artifacts", "artifacts": artifacts})

    async def send_hud(self, data: dict):
        """Forward a HUD event; keys of ``data`` are merged into the frame after ``type``."""
        await self._send({"type": "hud", **data})

    async def send_done(self):
        """Mark the response as complete and notify the client."""
        self.done = True
        await self._send({"type": "done"})
|
|
|
|
|
|
class Runtime:
    """Wire the nodes of one graph into a per-session processing pipeline.

    A Runtime owns the session's conversation history, its OutputSink and
    the node instances created from the loaded graph. Graphs whose
    definition has ``"engine": "frames"`` hand every message to FrameEngine.
    All other graphs run the v1/v2 pipeline implemented in
    ``handle_message`` and ``handle_action``.
    """

    # Substrings (German and English) that mark a v1 request as a data
    # request. Together with a long enough message they trigger Director
    # pre-planning. Matched against the lower-cased message text.
    _DATA_KEYWORDS = (
        "daten", "data", "database", "db", "tabelle", "table",
        "query", "abfrage", "untersuche", "investigate", "explore",
        "analyse", "analyze", "umsatz", "revenue", "billing",
        "abrechnung", "customer", "kunde", "geraete", "device",
        "objekt", "object", "how many", "wieviele", "welche",
    )

    def __init__(self, user_claims: dict = None, origin: str = "",
                 broadcast: Callable = None, graph_name: str = None):
        """Load the graph, instantiate its nodes and seed the session identity.

        Args:
            user_claims: auth claims. ``name``, ``preferred_username`` and
                ``username`` are tried in that order for the identity.
            origin: channel the session came from; empty means a local session.
            broadcast: callback receiving every HUD trace entry (default: no-op).
            graph_name: graph to load; defaults to the module-level
                ``_active_graph_name``.
        """
        self.sink = OutputSink()
        self.history: list[dict] = []
        self.MAX_HISTORY = 40
        self._broadcast = broadcast or (lambda e: None)

        # Load graph and instantiate nodes
        gname = graph_name or _active_graph_name
        self.graph = load_graph(gname)
        self.process_manager = ProcessManager(send_hud=self._send_hud)
        nodes = instantiate_nodes(self.graph, send_hud=self._send_hud,
                                  process_manager=self.process_manager)

        # Bind nodes by role (pipeline code references these)
        self.input_node = nodes["input"]
        self.thinker = nodes.get("thinker")  # v1/v2/v3
        self.output_node = nodes["output"]
        self.ui_node = nodes["ui"]
        self.memorizer = nodes["memorizer"]
        self.director = nodes.get("director")  # v1/v2/v3, None in v4
        self.sensor = nodes["sensor"]
        self.interpreter = nodes.get("interpreter")  # v2+ only

        # Detect graph type: a v2 director exposes decide(); frame graphs opt in via "engine".
        self.is_v2 = self.director is not None and hasattr(self.director, "decide")
        self.use_frames = self.graph.get("engine") == "frames"
        self.sensor.start(
            get_memo_state=lambda: self.memorizer.state,
            get_server_controls=lambda: self.ui_node.current_controls,
        )

        claims = user_claims or {}
        log.info(f"[runtime] user_claims: {claims}")
        self.identity = self._resolve_identity(claims, "unknown")
        log.info(f"[runtime] resolved identity: {self.identity}")
        self.channel = origin or "unknown"

        self.memorizer.state["user_name"] = self.identity
        self.memorizer.state["situation"] = f"authenticated on {self.channel}" if origin else "local session"

        # Frame engine (for v3+ graphs). It shares self.history by reference,
        # which is why history is always trimmed in place (see _trim_history).
        if self.use_frames:
            self.frame_engine = FrameEngine(
                graph=self.graph, nodes=nodes, sink=self.sink,
                history=self.history, send_hud=self._send_hud,
                sensor=self.sensor, memorizer=self.memorizer,
                ui_node=self.ui_node, identity=self.identity,
                channel=self.channel, broadcast=self._broadcast)
            log.info(f"[runtime] using FrameEngine for graph '{gname}'")

    # ------------------------------------------------------------------ identity / transport

    @staticmethod
    def _resolve_identity(claims: dict, fallback: str) -> str:
        """Return the first non-empty of name / preferred_username / username, else ``fallback``."""
        claims = claims or {}
        return (claims.get("name") or claims.get("preferred_username")
                or claims.get("username") or fallback)

    def attach_ws(self, ws):
        """Attach a WebSocket for real-time streaming."""
        self.sink.attach(ws)
        log.info("[runtime] WS attached")

    def detach_ws(self):
        """Detach WebSocket. Runtime keeps running."""
        self.sink.detach()
        log.info("[runtime] WS detached")

    def update_identity(self, user_claims: dict, origin: str = ""):
        """Update identity from WS auth claims; keeps the current identity if no claim matches."""
        self.identity = self._resolve_identity(user_claims, self.identity)
        if origin:
            self.channel = origin
        self.memorizer.state["user_name"] = self.identity
        self.memorizer.state["situation"] = f"authenticated on {self.channel}"
        log.info(f"[runtime] identity updated: {self.identity}")

    @staticmethod
    def _trace_timestamp(now: float) -> str:
        """Format epoch seconds ``now`` as local time ``YYYY-MM-DD HH:MM:SS.mmm``."""
        # Derive seconds and milliseconds from the same instant. The clamp
        # guards against float rounding of (now % 1) * 1000 up to 1000.
        millis = min(int((now % 1) * 1000), 999)
        return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now)) + f".{millis:03d}"

    async def _send_hud(self, data: dict):
        """Send a HUD event to the client, append it to TRACE_FILE and broadcast it.

        Trace-file errors are logged and do not stop the broadcast.
        """
        await self.sink.send_hud(data)
        trace_entry = {"ts": self._trace_timestamp(time.time()), **data}
        try:
            with open(TRACE_FILE, "a", encoding="utf-8") as f:
                f.write(json.dumps(trace_entry, ensure_ascii=False) + "\n")
            # Cap the trace file: past ~500 KB keep only the newest 500 entries.
            if TRACE_FILE.stat().st_size > 500_000:
                lines = TRACE_FILE.read_text(encoding="utf-8").strip().split("\n")
                TRACE_FILE.write_text("\n".join(lines[-500:]) + "\n", encoding="utf-8")
        except Exception as e:
            log.error(f"trace write error: {e}")
        self._broadcast(trace_entry)

    async def _stream_text(self, text: str):
        """Stream pre-formed text to the client as 12-character deltas, then signal done."""
        self.sink.reset()
        chunk_size = 12
        for i in range(0, len(text), chunk_size):
            await self.sink.send_delta(text[i:i + chunk_size])
        await self.sink.send_done()

    async def _run_output_and_ui(self, thought, mem_ctx):
        """Run Output and UI nodes in parallel.

        Returns the response text produced by the Output node. Controls
        produced by the UI node are forwarded to the sink, not returned.
        """
        self.sink.reset()
        output_task = asyncio.create_task(
            self.output_node.process(thought, self.history, self.sink, memory_context=mem_ctx))
        ui_task = asyncio.create_task(
            self.ui_node.process(thought, self.history, memory_context=mem_ctx))

        response, controls = await asyncio.gather(output_task, ui_task)

        if controls:
            await self.sink.send_controls(controls)
        return response

    # ------------------------------------------------------------------ shared pipeline helpers

    def _trim_history(self):
        """Drop the oldest entries beyond MAX_HISTORY, in place.

        Mutating in place, rather than rebinding ``self.history``, keeps any
        holder of the same list (e.g. FrameEngine) in sync.
        """
        if len(self.history) > self.MAX_HISTORY:
            del self.history[:-self.MAX_HISTORY]

    def _record_exchange(self, user_content: str, assistant_content: str):
        """Append a user/assistant pair for a turn that bypassed the LLM, then trim."""
        self.history.append({"role": "user", "content": user_content})
        self.history.append({"role": "assistant", "content": assistant_content})
        self._trim_history()

    async def _finish_turn(self, response: str):
        """Record the assistant response, update memory/director state and trim history."""
        self.history.append({"role": "assistant", "content": response})
        await self.memorizer.update(self.history)
        # v1 directors update after each turn; v2 directors plan via decide() instead.
        if not self.is_v2 and self.director:
            await self.director.update(self.history, self.memorizer.state)
        self._trim_history()

    @staticmethod
    def _format_sensor_flags(sensor_flags) -> str:
        """Render sensor flags as a context block; returns "" when there are none."""
        if not sensor_flags:
            return ""
        flag_lines = ["Sensor flags:"]
        for flag in sensor_flags:
            kind = flag.get("type")
            if kind == "idle_return":
                flag_lines.append(f" - User returned after {flag['away_duration']} away. Welcome them back briefly, mention what's on their dashboard.")
            elif kind == "workspace_mismatch":
                flag_lines.append(f" - Workspace mismatch detected: {flag['detail']}. Check if controls need re-emitting.")
        return "\n".join(flag_lines)

    def _build_memory_context(self, sensor_lines, machine_summary: str = "",
                              dashboard: list = None, flag_block: str = "") -> str:
        """Assemble the memory context passed to the LLM nodes.

        Sections, separated by blank lines and each included only when
        non-empty: memorizer block, director line, machine summary,
        dashboard description, sensor flags.
        """
        director_line = self.director.get_context_line() if self.director else ""
        mem_ctx = self.memorizer.get_context_block(sensor_lines=sensor_lines, ui_state=self.ui_node.state)
        if director_line:
            mem_ctx += f"\n\n{director_line}"
        if machine_summary:
            mem_ctx += f"\n\n{machine_summary}"
        if dashboard is not None:
            mem_ctx += f"\n\n{self._format_dashboard(dashboard)}"
        if flag_block:
            mem_ctx += f"\n\n{flag_block}"
        return mem_ctx

    async def _think_v2(self, command, mem_ctx: str, source_text: str):
        """v2 flow: Director decides, Thinker executes, Interpreter reads tool results.

        When the Thinker used a tool and an Interpreter exists, the Thinker's
        response is replaced by the Interpreter's factual summary.
        """
        plan = await self.director.decide(command, self.history, memory_context=mem_ctx)
        thought = await self.thinker.process(command, plan, self.history, memory_context=mem_ctx)
        if self.interpreter and thought.tool_used and thought.tool_output:
            interpreted = await self.interpreter.interpret(
                thought.tool_used, thought.tool_output, source_text)
            thought.response = interpreted.summary
        return thought

    def _needs_planning(self, command, text: str) -> bool:
        """Decide whether a v1 request warrants Director pre-planning.

        True for complex requests, or for long (> 8 words) request/action
        messages that contain a data keyword.
        """
        analysis = command.analysis
        if analysis.complexity == "complex":
            return True
        if analysis.intent not in ("request", "action"):
            return False
        lowered = text.lower()
        is_data_request = any(k in lowered for k in self._DATA_KEYWORDS)
        return is_data_request and len(text.split()) > 8

    @staticmethod
    def _parse_action_text(text: str):
        """Split ``"ACTION: name | data: {json}"`` into ``(name, data)``.

        Only a leading ``ACTION:`` / ``data:`` prefix is removed, so JSON
        values containing those words survive intact. ``data`` is None when
        the payload is absent or is not valid JSON.
        """
        head, sep, tail = text.partition("|")
        action = head.strip()
        if action.startswith("ACTION:"):
            action = action[len("ACTION:"):].strip()
        data = None
        if sep:
            payload = tail.strip()
            if payload.startswith("data:"):
                payload = payload[len("data:"):].strip()
            if payload:
                try:
                    data = json.loads(payload)
                except (ValueError, RecursionError) as exc:
                    log.warning(f"[runtime] ignoring unparsable action data: {exc}")
        return action, data

    # ------------------------------------------------------------------ entry points

    async def handle_action(self, action: str, data: dict = None):
        """Handle a structured UI action (button click etc.).

        Tried cheapest first:
          1. machine transition (``go:`` target) — no LLM;
          2. local UI action (inc, dec, toggle) — no LLM;
          3. full reasoning through Director/Thinker, then Output + UI.
        """
        self.sensor.note_user_activity()

        # Try machine transition first (go: target — no LLM needed)
        handled, transition_result = self.ui_node.try_machine_transition(action)
        if handled:
            await self._send_hud({"node": "ui", "event": "machine_transition",
                                  "action": action, "detail": transition_result})
            # Re-render all controls: machine controls first, then non-machine buttons and labels.
            controls = self.ui_node.get_machine_controls()
            for ctrl in self.ui_node.current_controls:
                if not ctrl.get("machine_id"):
                    controls.append(ctrl)
            self.ui_node.current_controls = controls
            await self.sink.send_controls(controls)
            await self._send_hud({"node": "ui", "event": "controls", "controls": controls})
            await self._stream_text(transition_result)
            self._record_exchange(f"[clicked {action}]", transition_result)
            return

        # Try local UI action next (inc, dec, toggle — no LLM needed)
        result, controls = await self.ui_node.process_local_action(action, data)
        if result is not None:
            if controls:
                await self.sink.send_controls(controls)
            await self._stream_text(result)
            self._record_exchange(f"[clicked {action}]", result)
            return

        # Complex action — needs Thinker reasoning.
        # NOTE(review): graphs without a thinker node would fail below; confirm
        # such graphs never reach this path.
        action_desc = f"ACTION: {action}"
        if data:
            action_desc += f" | data: {json.dumps(data)}"
        self.history.append({"role": "user", "content": action_desc})

        mem_ctx = self._build_memory_context(self.sensor.get_context_lines())

        command = Command(
            analysis=InputAnalysis(intent="action", topic=action, complexity="simple"),
            source_text=action_desc)
        if self.is_v2:
            thought = await self._think_v2(command, mem_ctx, action_desc)
        else:
            thought = await self.thinker.process(command, self.history, memory_context=mem_ctx)

        response = await self._run_output_and_ui(thought, mem_ctx)
        await self._finish_turn(response)

    def _format_dashboard(self, dashboard: list) -> str:
        """Format dashboard controls into a context string for Thinker.

        Compares browser-reported state against server-side controls to
        detect mismatches. Non-dict entries are ignored.
        """
        server_controls = self.ui_node.current_controls or []
        server_buttons = [str(c.get("label", "")) for c in server_controls
                          if isinstance(c, dict) and c.get("type") == "button"]
        browser_buttons = [str(c.get("label", "")) for c in (dashboard or [])
                           if isinstance(c, dict) and c.get("type") == "button"]

        lines = []

        # Mismatch detection (S3* audit)
        if server_buttons and not browser_buttons:
            lines.append(f"WARNING: Server sent {len(server_buttons)} controls but dashboard shows NONE.")
            lines.append(f" Expected buttons: {', '.join(server_buttons)}")
            lines.append(" Controls failed to render or were lost. You MUST re-emit them in ACTIONS.")
        elif server_buttons and sorted(server_buttons) != sorted(browser_buttons):
            lines.append("WARNING: Dashboard mismatch.")
            lines.append(f" Server sent: {', '.join(server_buttons)}")
            lines.append(f" Browser shows: {', '.join(browser_buttons) or 'nothing'}")
            lines.append(" Re-emit correct controls in ACTIONS if needed.")

        if not dashboard:
            lines.append("Dashboard: empty (user sees nothing)")
            return "\n".join(lines)

        lines.append("Dashboard (what user currently sees):")
        for ctrl in dashboard:
            if not isinstance(ctrl, dict):
                continue  # malformed browser entry; nothing describable
            ctype = ctrl.get("type", "unknown")
            if ctype == "button":
                lines.append(f" - Button: {ctrl.get('label', '?')}")
            elif ctype == "label":
                lines.append(f" - Label: {ctrl.get('text', '?')} = {ctrl.get('value', '?')}")
            elif ctype == "table":
                cols = ctrl.get("columns") or []
                rows = len(ctrl.get("data") or [])
                lines.append(f" - Table: {', '.join(str(c) for c in cols)} ({rows} rows)")
            else:
                lines.append(f" - {ctype}: {ctrl.get('label', ctrl.get('text', '?'))}")
        return "\n".join(lines)

    async def handle_message(self, text: str, dashboard: list = None):
        """Process one user message through the active pipeline.

        Frame graphs delegate entirely to FrameEngine and return its result.
        Messages starting with ``ACTION:`` (API/test runner) are routed to
        ``handle_action``. Everything else runs Input, then Thinker (with
        optional Director/Interpreter), then Output + UI.
        """
        # Frame engine: delegate entirely
        if self.use_frames:
            return await self.frame_engine.process_message(text, dashboard)

        # Detect ACTION: prefix from API/test runner
        if text.startswith("ACTION:"):
            action, data = self._parse_action_text(text)
            return await self.handle_action(action, data)

        # NOTE(review): user_id/session_id look like test placeholders; confirm
        # whether self.identity should be used here.
        envelope = Envelope(
            text=text,
            user_id="bob",
            session_id="test",
            timestamp=time.strftime("%Y-%m-%d %H:%M:%S"),
        )

        self.sensor.note_user_activity()
        if dashboard is not None:
            self.sensor.update_browser_dashboard(dashboard)
        self.history.append({"role": "user", "content": text})

        # Sensor flags (idle return, workspace mismatch) are consumed once, so
        # keep the rendered block for any later context rebuild.
        flag_block = self._format_sensor_flags(self.sensor.consume_flags())
        sensor_lines = self.sensor.get_context_lines()
        machine_summary = self.ui_node.get_machine_summary()
        mem_ctx = self._build_memory_context(sensor_lines, machine_summary=machine_summary,
                                             dashboard=dashboard, flag_block=flag_block)

        command = await self.input_node.process(
            envelope, self.history, memory_context=mem_ctx,
            identity=self.identity, channel=self.channel)

        # Reflex path: trivial social messages skip Thinker entirely
        if command.analysis.intent == "social" and command.analysis.complexity == "trivial":
            await self._send_hud({"node": "runtime", "event": "reflex_path",
                                  "detail": f"{command.analysis.intent}/{command.analysis.complexity}"})
            thought = ThoughtResult(response=command.source_text, actions=[])
            response = await self._run_output_and_ui(thought, mem_ctx)
            await self._finish_turn(response)
            return

        if self.is_v2:
            thought = await self._think_v2(command, mem_ctx, text)
        else:
            # v1 flow: optional Director pre-planning for complex/data requests
            if self.director and self._needs_planning(command, text):
                plan = await self.director.plan(self.history, self.memorizer.state, text)
                if plan:
                    # Rebuild so the new plan's director line is included.
                    mem_ctx = self._build_memory_context(sensor_lines, machine_summary=machine_summary,
                                                         dashboard=dashboard, flag_block=flag_block)
            thought = await self.thinker.process(command, self.history, memory_context=mem_ctx)
            if self.director:
                self.director.current_plan = ""

        # Output (voice) and UI (screen) run in parallel
        response = await self._run_output_and_ui(thought, mem_ctx)
        await self._finish_turn(response)