Frame Engine (v3-framed): - Tick-based deterministic pipeline: frames advance on completion, not timers - FrameRecord/FrameTrace dataclasses for structured per-message tracing - /api/frames endpoint: queryable frame trace history (last 20 messages) - frame_trace HUD event with full pipeline visibility - Reflex=2F, Director=4F, Director+Interpreter=5F deterministic frame counts Expert Architecture (v4-eras): - PA node (pa_v1): routes to domain experts, holds user context - ExpertNode base: stateless executor with plan+execute two-LLM-call pattern - ErasExpertNode: eras2_production DB specialist with DESCRIBE-first discipline - Schema caching: DESCRIBE results reused across queries within session - Progress streaming: PA streams thinking message, expert streams per-tool progress - PARouting type for structured routing decisions UI Controls Split: - Separate thinker_controls from machine controls (current_controls is now a property) - Machine buttons persist across Thinker responses - Machine state parser handles both dict and list formats from Director - Normalized button format with go/payload field mapping WebSocket Architecture: - /ws/test: dedicated debug socket for test runner progress - /ws/trace: dedicated debug socket for HUD/frame trace events - /ws (chat): cleaned up, only deltas/controls/done/cleared - WS survives graph switch (re-attaches to new runtime) - Pipeline result reset on clear Test Infrastructure: - Live test streaming: on_result callback fires per check during execution - Frontend polling fallback (500ms) for proxy-buffered WS - frame_trace-first trace assertion (fixes stale perceived event bug) - action_match supports "or" patterns and multi-pattern matching - Trace window increased to 40 events - Graph-agnostic assertions (has X or Y) Test Suites: - smoketest.md: 12 steps covering all categories (~2min) - fast.md: 10 quick checks (~1min) - fast_v4.md: 10 v4-eras specific checks - expert_eras.md: eras domain tests (routing, DB, schema, errors) - expert_progress.md: progress streaming tests Other: - Shared db.py extracted from thinker_v2 (reused by experts) - InputNode prompt: few-shot examples, history as context summary - Director prompt: full tool signatures for add_state/reset_machine/destroy_machine - nginx no-cache headers for static files during development - Cache-busted static file references Scores: v3 smoketest 39/40, v4-eras fast 28/28, expert_eras 23/23 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
633 lines
28 KiB
Python
633 lines
28 KiB
Python
"""Frame Engine: tick-based deterministic pipeline execution.
|
|
|
|
Replaces the imperative handle_message() with a frame-stepping model:
|
|
- Each frame dispatches all nodes that have pending input
|
|
- Frames advance on completion (not on a timer)
|
|
- 0ms when idle, engine awaits external input
|
|
- Deterministic ordering: reflex=2 frames, thinker=3-4, interpreter=5
|
|
|
|
Works with any graph definition (v1, v2, v3). Node implementations unchanged.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import time
|
|
from dataclasses import dataclass, field, asdict
|
|
|
|
from .types import Envelope, Command, InputAnalysis, ThoughtResult, DirectorPlan, PARouting
|
|
|
|
log = logging.getLogger("runtime")
|
|
|
|
|
|
# --- Frame Trace ---
|
|
|
|
@dataclass
|
|
class FrameRecord:
|
|
"""One frame's execution record."""
|
|
frame: int
|
|
node: str # which node ran ("input", "director", ...)
|
|
started: float = 0.0 # time.monotonic()
|
|
ended: float = 0.0
|
|
duration_ms: float = 0.0
|
|
input_summary: str = "" # what the node received
|
|
output_summary: str = "" # what the node produced
|
|
route: str = "" # where output goes next ("director", "output+ui", ...)
|
|
condition: str = "" # if a condition was evaluated ("reflex=True", ...)
|
|
error: str = "" # if the node failed
|
|
|
|
|
|
@dataclass
|
|
class FrameTrace:
|
|
"""Complete trace of one message through the pipeline."""
|
|
message: str = ""
|
|
graph: str = ""
|
|
total_frames: int = 0
|
|
total_ms: float = 0.0
|
|
started: float = 0.0
|
|
path: str = "" # "reflex", "director", "director+interpreter"
|
|
frames: list = field(default_factory=list) # list of FrameRecord
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"message": self.message[:100],
|
|
"graph": self.graph,
|
|
"total_frames": self.total_frames,
|
|
"total_ms": round(self.total_ms, 1),
|
|
"path": self.path,
|
|
"frames": [
|
|
{
|
|
"frame": f.frame,
|
|
"node": f.node,
|
|
"duration_ms": round(f.duration_ms, 1),
|
|
"input": f.input_summary[:200],
|
|
"output": f.output_summary[:200],
|
|
"route": f.route,
|
|
"condition": f.condition,
|
|
"error": f.error,
|
|
}
|
|
for f in self.frames
|
|
],
|
|
}
|
|
|
|
|
|
class FrameEngine:
|
|
"""Tick-based engine that steps through graph nodes frame by frame."""
|
|
|
|
def __init__(self, graph: dict, nodes: dict, sink, history: list,
|
|
send_hud, sensor, memorizer, ui_node, identity: str = "unknown",
|
|
channel: str = "unknown", broadcast=None):
|
|
self.graph = graph
|
|
self.nodes = nodes
|
|
self.sink = sink
|
|
self.history = history
|
|
self._send_hud = send_hud
|
|
self.sensor = sensor
|
|
self.memorizer = memorizer
|
|
self.ui_node = ui_node
|
|
self.identity = identity
|
|
self.channel = channel
|
|
self._broadcast = broadcast or (lambda e: None)
|
|
|
|
self.frame = 0
|
|
self.bus = {}
|
|
self.conditions = graph.get("conditions", {})
|
|
self.edges = [e for e in graph.get("edges", []) if e.get("type") == "data"]
|
|
|
|
self.has_director = "director" in nodes and hasattr(nodes.get("director"), "decide")
|
|
self.has_interpreter = "interpreter" in nodes
|
|
self.has_pa = "pa" in nodes and hasattr(nodes.get("pa"), "route")
|
|
|
|
# Discover available experts in this graph
|
|
self._experts = {}
|
|
for role, node in nodes.items():
|
|
if role.startswith("expert_"):
|
|
expert_name = role[7:] # "expert_eras" → "eras"
|
|
self._experts[expert_name] = node
|
|
if self.has_pa and self._experts:
|
|
nodes["pa"].set_available_experts(list(self._experts.keys()))
|
|
log.info(f"[frame] PA with experts: {list(self._experts.keys())}")
|
|
|
|
# Frame trace — last message's complete trace, queryable via API
|
|
self.last_trace: FrameTrace = FrameTrace()
|
|
# History of recent traces (last 20 messages)
|
|
self.trace_history: list[dict] = []
|
|
self.MAX_TRACE_HISTORY = 20
|
|
|
|
# --- Frame lifecycle helpers ---
|
|
|
|
def _begin_frame(self, frame_num: int, node: str, input_summary: str = "") -> FrameRecord:
|
|
"""Start a new frame. Returns the record to fill in."""
|
|
self.frame = frame_num
|
|
rec = FrameRecord(
|
|
frame=frame_num,
|
|
node=node,
|
|
started=time.monotonic(),
|
|
input_summary=input_summary,
|
|
)
|
|
self.last_trace.frames.append(rec)
|
|
return rec
|
|
|
|
def _end_frame(self, rec: FrameRecord, output_summary: str = "",
|
|
route: str = "", condition: str = ""):
|
|
"""Complete a frame record with output and timing."""
|
|
rec.ended = time.monotonic()
|
|
rec.duration_ms = (rec.ended - rec.started) * 1000
|
|
rec.output_summary = output_summary
|
|
rec.route = route
|
|
rec.condition = condition
|
|
log.info(f"[frame] F{rec.frame} {rec.node} "
|
|
f"{rec.duration_ms:.0f}ms -> {route or 'done'}")
|
|
|
|
def _begin_trace(self, text: str) -> FrameTrace:
|
|
"""Start a new message trace."""
|
|
trace = FrameTrace(
|
|
message=text,
|
|
graph=self.graph.get("name", "unknown"),
|
|
started=time.monotonic(),
|
|
)
|
|
self.last_trace = trace
|
|
self.frame = 0
|
|
return trace
|
|
|
|
def _end_trace(self, path: str):
|
|
"""Finalize the trace and emit as HUD event."""
|
|
t = self.last_trace
|
|
t.total_frames = self.frame
|
|
t.total_ms = (time.monotonic() - t.started) * 1000
|
|
t.path = path
|
|
# Store in history
|
|
self.trace_history.append(t.to_dict())
|
|
if len(self.trace_history) > self.MAX_TRACE_HISTORY:
|
|
self.trace_history = self.trace_history[-self.MAX_TRACE_HISTORY:]
|
|
log.info(f"[frame] trace: {path} {t.total_frames}F {t.total_ms:.0f}ms")
|
|
|
|
async def _emit_trace_hud(self):
|
|
"""Emit the completed frame trace as a single HUD event."""
|
|
t = self.last_trace
|
|
await self._send_hud({
|
|
"node": "frame_engine",
|
|
"event": "frame_trace",
|
|
"trace": t.to_dict(),
|
|
})
|
|
|
|
# --- Main entry point ---
|
|
|
|
async def process_message(self, text: str, dashboard: list = None) -> dict:
|
|
"""Process a message through the frame pipeline.
|
|
Returns {response, controls, memorizer, frames, trace}."""
|
|
|
|
self._begin_trace(text)
|
|
|
|
# Handle ACTION: prefix
|
|
if text.startswith("ACTION:"):
|
|
return await self._handle_action(text, dashboard)
|
|
|
|
# Setup
|
|
envelope = Envelope(
|
|
text=text, user_id=self.identity,
|
|
session_id="test", timestamp=time.strftime("%Y-%m-%d %H:%M:%S"),
|
|
)
|
|
self.sensor.note_user_activity()
|
|
if dashboard is not None:
|
|
self.sensor.update_browser_dashboard(dashboard)
|
|
self.history.append({"role": "user", "content": text})
|
|
|
|
# --- Frame 1: Input ---
|
|
mem_ctx = self._build_context(dashboard)
|
|
rec = self._begin_frame(1, "input", input_summary=text[:100])
|
|
|
|
command = await self.nodes["input"].process(
|
|
envelope, self.history, memory_context=mem_ctx,
|
|
identity=self.identity, channel=self.channel)
|
|
|
|
a = command.analysis
|
|
cmd_summary = f"intent={a.intent} language={a.language} tone={a.tone} complexity={a.complexity}"
|
|
|
|
# Check reflex condition
|
|
is_reflex = self._check_condition("reflex", command=command)
|
|
if is_reflex:
|
|
self._end_frame(rec, output_summary=cmd_summary,
|
|
route="output (reflex)", condition="reflex=True")
|
|
await self._send_hud({"node": "runtime", "event": "reflex_path",
|
|
"detail": f"{a.intent}/{a.complexity}"})
|
|
return await self._run_reflex(command, mem_ctx)
|
|
else:
|
|
next_node = "pa" if self.has_pa else ("director" if self.has_director else "thinker")
|
|
self._end_frame(rec, output_summary=cmd_summary,
|
|
route=next_node, condition=f"reflex=False")
|
|
|
|
# --- Frame 2+: Pipeline ---
|
|
if self.has_pa:
|
|
return await self._run_expert_pipeline(command, mem_ctx, dashboard)
|
|
elif self.has_director:
|
|
return await self._run_director_pipeline(command, mem_ctx, dashboard)
|
|
else:
|
|
return await self._run_thinker_pipeline(command, mem_ctx, dashboard)
|
|
|
|
# --- Pipeline variants ---
|
|
|
|
async def _run_reflex(self, command: Command, mem_ctx: str) -> dict:
|
|
"""Reflex: Input(F1) → Output(F2)."""
|
|
rec = self._begin_frame(2, "output", input_summary="reflex passthrough")
|
|
|
|
thought = ThoughtResult(response=command.source_text, actions=[])
|
|
response = await self._run_output_and_ui(thought, mem_ctx)
|
|
|
|
self.history.append({"role": "assistant", "content": response})
|
|
await self.memorizer.update(self.history)
|
|
self._trim_history()
|
|
|
|
self._end_frame(rec, output_summary=f"response[{len(response)}]")
|
|
self._end_trace("reflex")
|
|
await self._emit_trace_hud()
|
|
return self._make_result(response)
|
|
|
|
async def _run_expert_pipeline(self, command: Command, mem_ctx: str,
|
|
dashboard: list = None) -> dict:
|
|
"""Expert pipeline: Input(F1) → PA(F2) → Expert(F3) → [Interpreter(F4)] → Output."""
|
|
a = command.analysis
|
|
|
|
# Frame 2: PA routes
|
|
rec = self._begin_frame(2, "pa",
|
|
input_summary=f"intent={a.intent} topic={a.topic}")
|
|
routing = await self.nodes["pa"].route(
|
|
command, self.history, memory_context=mem_ctx,
|
|
identity=self.identity, channel=self.channel)
|
|
route_summary = f"expert={routing.expert} job={routing.job[:60]}"
|
|
self._end_frame(rec, output_summary=route_summary,
|
|
route=f"expert_{routing.expert}" if routing.expert != "none" else "output")
|
|
|
|
# Stream thinking message to user
|
|
if routing.thinking_message:
|
|
await self.sink.send_delta(routing.thinking_message + "\n\n")
|
|
|
|
# Direct PA response (no expert needed)
|
|
if routing.expert == "none":
|
|
rec = self._begin_frame(3, "output+ui",
|
|
input_summary=f"pa_direct: {routing.response_hint[:80]}")
|
|
thought = ThoughtResult(response=routing.response_hint, actions=[])
|
|
response = await self._run_output_and_ui(thought, mem_ctx)
|
|
self.history.append({"role": "assistant", "content": response})
|
|
await self.memorizer.update(self.history)
|
|
self._trim_history()
|
|
self._end_frame(rec, output_summary=f"response[{len(response)}]")
|
|
self._end_trace("pa_direct")
|
|
await self._emit_trace_hud()
|
|
return self._make_result(response)
|
|
|
|
# Frame 3: Expert executes
|
|
expert = self._experts.get(routing.expert)
|
|
if not expert:
|
|
log.error(f"[frame] expert '{routing.expert}' not found")
|
|
thought = ThoughtResult(response=f"Expert '{routing.expert}' not available.")
|
|
rec = self._begin_frame(3, "output+ui", input_summary="expert_not_found")
|
|
response = await self._run_output_and_ui(thought, mem_ctx)
|
|
self.history.append({"role": "assistant", "content": response})
|
|
self._end_frame(rec, output_summary="error", error=f"expert '{routing.expert}' not found")
|
|
self._end_trace("expert_error")
|
|
await self._emit_trace_hud()
|
|
return self._make_result(response)
|
|
|
|
rec = self._begin_frame(3, f"expert_{routing.expert}",
|
|
input_summary=f"job: {routing.job[:80]}")
|
|
|
|
# Wrap expert's HUD to stream progress to user
|
|
original_hud = expert.send_hud
|
|
expert.send_hud = self._make_progress_wrapper(original_hud, routing.language)
|
|
|
|
try:
|
|
thought = await expert.execute(routing.job, routing.language)
|
|
finally:
|
|
expert.send_hud = original_hud
|
|
|
|
thought_summary = (f"response[{len(thought.response)}] tool={thought.tool_used or 'none'} "
|
|
f"actions={len(thought.actions)}")
|
|
has_tool = bool(thought.tool_used and thought.tool_output)
|
|
|
|
# Interpreter (conditional)
|
|
if self.has_interpreter and has_tool:
|
|
self._end_frame(rec, output_summary=thought_summary,
|
|
route="interpreter", condition="has_tool_output=True")
|
|
rec = self._begin_frame(4, "interpreter",
|
|
input_summary=f"tool={thought.tool_used} output[{len(thought.tool_output)}]")
|
|
interpreted = await self.nodes["interpreter"].interpret(
|
|
thought.tool_used, thought.tool_output, routing.job)
|
|
thought.response = interpreted.summary
|
|
self._end_frame(rec, output_summary=f"summary[{len(interpreted.summary)}]", route="output+ui")
|
|
|
|
rec = self._begin_frame(5, "output+ui",
|
|
input_summary=f"interpreted: {interpreted.summary[:80]}")
|
|
path = "expert+interpreter"
|
|
else:
|
|
self._end_frame(rec, output_summary=thought_summary,
|
|
route="output+ui",
|
|
condition="has_tool_output=False" if not has_tool else "")
|
|
rec = self._begin_frame(4, "output+ui",
|
|
input_summary=f"response: {thought.response[:80]}")
|
|
path = "expert"
|
|
|
|
# Clear progress text, render final response
|
|
self.sink.reset()
|
|
response = await self._run_output_and_ui(thought, mem_ctx)
|
|
self.history.append({"role": "assistant", "content": response})
|
|
await self.memorizer.update(self.history)
|
|
self._trim_history()
|
|
|
|
controls_count = len(self.ui_node.current_controls)
|
|
self._end_frame(rec, output_summary=f"response[{len(response)}] controls={controls_count}")
|
|
self._end_trace(path)
|
|
await self._emit_trace_hud()
|
|
return self._make_result(response)
|
|
|
|
def _make_progress_wrapper(self, original_hud, language: str):
|
|
"""Wrap an expert's send_hud to also stream progress deltas to the user."""
|
|
sink = self.sink
|
|
progress_map = {
|
|
"tool_call": {"query_db": "Daten werden abgerufen..." if language == "de" else "Fetching data...",
|
|
"emit_actions": "UI wird erstellt..." if language == "de" else "Building UI...",
|
|
"create_machine": "Maschine wird erstellt..." if language == "de" else "Creating machine...",
|
|
"_default": "Verarbeite..." if language == "de" else "Processing..."},
|
|
"tool_result": {"_default": ""}, # silent on result
|
|
"planned": {"_default": "Plan erstellt..." if language == "de" else "Plan ready..."},
|
|
}
|
|
|
|
async def wrapper(data: dict):
|
|
await original_hud(data)
|
|
event = data.get("event", "")
|
|
if event in progress_map:
|
|
tool = data.get("tool", "_default")
|
|
msg = progress_map[event].get(tool, progress_map[event].get("_default", ""))
|
|
if msg:
|
|
await sink.send_delta(msg + "\n")
|
|
|
|
return wrapper
|
|
|
|
async def _run_director_pipeline(self, command: Command, mem_ctx: str,
|
|
dashboard: list = None) -> dict:
|
|
"""Director: Input(F1) → Director(F2) → Thinker(F3) → [Interpreter(F4)] → Output."""
|
|
a = command.analysis
|
|
|
|
# Frame 2: Director
|
|
rec = self._begin_frame(2, "director",
|
|
input_summary=f"intent={a.intent} topic={a.topic}")
|
|
plan = await self.nodes["director"].decide(command, self.history, memory_context=mem_ctx)
|
|
plan_summary = f"goal={plan.goal} tools={len(plan.tool_sequence)} hint={plan.response_hint[:50]}"
|
|
self._end_frame(rec, output_summary=plan_summary, route="thinker")
|
|
|
|
# Frame 3: Thinker
|
|
rec = self._begin_frame(3, "thinker",
|
|
input_summary=plan_summary[:100])
|
|
thought = await self.nodes["thinker"].process(
|
|
command, plan, self.history, memory_context=mem_ctx)
|
|
thought_summary = (f"response[{len(thought.response)}] tool={thought.tool_used or 'none'} "
|
|
f"actions={len(thought.actions)} machines={len(thought.machine_ops)}")
|
|
has_tool = bool(thought.tool_used and thought.tool_output)
|
|
|
|
# Check interpreter condition
|
|
if self.has_interpreter and has_tool:
|
|
self._end_frame(rec, output_summary=thought_summary,
|
|
route="interpreter", condition="has_tool_output=True")
|
|
|
|
# Frame 4: Interpreter
|
|
rec = self._begin_frame(4, "interpreter",
|
|
input_summary=f"tool={thought.tool_used} output[{len(thought.tool_output)}]")
|
|
interpreted = await self.nodes["interpreter"].interpret(
|
|
thought.tool_used, thought.tool_output, command.source_text)
|
|
thought.response = interpreted.summary
|
|
interp_summary = f"summary[{len(interpreted.summary)}] facts={interpreted.key_facts}"
|
|
self._end_frame(rec, output_summary=interp_summary, route="output+ui")
|
|
|
|
# Frame 5: Output
|
|
rec = self._begin_frame(5, "output+ui",
|
|
input_summary=f"interpreted: {interpreted.summary[:80]}")
|
|
path = "director+interpreter"
|
|
else:
|
|
self._end_frame(rec, output_summary=thought_summary,
|
|
route="output+ui",
|
|
condition="has_tool_output=False" if not has_tool else "")
|
|
|
|
# Frame 4: Output
|
|
rec = self._begin_frame(4, "output+ui",
|
|
input_summary=f"response: {thought.response[:80]}")
|
|
path = "director"
|
|
|
|
response = await self._run_output_and_ui(thought, mem_ctx)
|
|
self.history.append({"role": "assistant", "content": response})
|
|
await self.memorizer.update(self.history)
|
|
self._trim_history()
|
|
|
|
controls_count = len(self.ui_node.current_controls)
|
|
self._end_frame(rec, output_summary=f"response[{len(response)}] controls={controls_count}")
|
|
self._end_trace(path)
|
|
await self._emit_trace_hud()
|
|
return self._make_result(response)
|
|
|
|
async def _run_thinker_pipeline(self, command: Command, mem_ctx: str,
|
|
dashboard: list = None) -> dict:
|
|
"""v1: Input(F1) → Thinker(F2) → Output(F3)."""
|
|
a = command.analysis
|
|
|
|
# Frame 2: Thinker
|
|
rec = self._begin_frame(2, "thinker",
|
|
input_summary=f"intent={a.intent} topic={a.topic}")
|
|
|
|
director = self.nodes.get("director")
|
|
if director and hasattr(director, "plan"):
|
|
is_complex = command.analysis.complexity == "complex"
|
|
text = command.source_text
|
|
is_data_request = (command.analysis.intent in ("request", "action")
|
|
and any(k in text.lower()
|
|
for k in ["daten", "data", "database", "db", "tabelle", "table",
|
|
"query", "abfrage", "untersuche", "investigate",
|
|
"analyse", "analyze", "customer", "kunde"]))
|
|
if is_complex or (is_data_request and len(text.split()) > 8):
|
|
await director.plan(self.history, self.memorizer.state, text)
|
|
mem_ctx = self._build_context(dashboard)
|
|
|
|
thought = await self.nodes["thinker"].process(command, self.history, memory_context=mem_ctx)
|
|
if director and hasattr(director, "current_plan"):
|
|
director.current_plan = ""
|
|
|
|
thought_summary = f"response[{len(thought.response)}] tool={thought.tool_used or 'none'}"
|
|
self._end_frame(rec, output_summary=thought_summary, route="output+ui")
|
|
|
|
# Frame 3: Output
|
|
rec = self._begin_frame(3, "output+ui",
|
|
input_summary=f"response: {thought.response[:80]}")
|
|
response = await self._run_output_and_ui(thought, mem_ctx)
|
|
self.history.append({"role": "assistant", "content": response})
|
|
await self.memorizer.update(self.history)
|
|
if director and hasattr(director, "update"):
|
|
await director.update(self.history, self.memorizer.state)
|
|
self._trim_history()
|
|
|
|
self._end_frame(rec, output_summary=f"response[{len(response)}]")
|
|
self._end_trace("thinker")
|
|
await self._emit_trace_hud()
|
|
return self._make_result(response)
|
|
|
|
async def _handle_action(self, text: str, dashboard: list = None) -> dict:
|
|
"""Handle ACTION: messages (button clicks)."""
|
|
parts = text.split("|", 1)
|
|
action = parts[0].replace("ACTION:", "").strip()
|
|
data = None
|
|
if len(parts) > 1:
|
|
try:
|
|
data = json.loads(parts[1].replace("data:", "").strip())
|
|
except (json.JSONDecodeError, Exception):
|
|
pass
|
|
|
|
self.sensor.note_user_activity()
|
|
|
|
# Frame 1: Try machine transition (no LLM)
|
|
rec = self._begin_frame(1, "ui", input_summary=f"action={action}")
|
|
handled, transition_result = self.ui_node.try_machine_transition(action)
|
|
if handled:
|
|
await self._send_hud({"node": "ui", "event": "machine_transition",
|
|
"action": action, "detail": transition_result})
|
|
controls = self.ui_node.get_machine_controls()
|
|
for ctrl in self.ui_node.current_controls:
|
|
if not ctrl.get("machine_id"):
|
|
controls.append(ctrl)
|
|
self.ui_node.current_controls = controls
|
|
await self.sink.send_controls(controls)
|
|
await self._send_hud({"node": "ui", "event": "controls", "controls": controls})
|
|
self.sink.reset()
|
|
for i in range(0, len(transition_result), 12):
|
|
await self.sink.send_delta(transition_result[i:i + 12])
|
|
await self.sink.send_done()
|
|
self.history.append({"role": "user", "content": f"[clicked {action}]"})
|
|
self.history.append({"role": "assistant", "content": transition_result})
|
|
|
|
self._end_frame(rec, output_summary=f"machine_transition: {transition_result[:80]}")
|
|
self._end_trace("action_machine")
|
|
await self._emit_trace_hud()
|
|
return self._make_result(transition_result)
|
|
|
|
# Try local UI action
|
|
result, controls = await self.ui_node.process_local_action(action, data)
|
|
if result is not None:
|
|
if controls:
|
|
await self.sink.send_controls(controls)
|
|
self.sink.reset()
|
|
for i in range(0, len(result), 12):
|
|
await self.sink.send_delta(result[i:i + 12])
|
|
await self.sink.send_done()
|
|
self.history.append({"role": "user", "content": f"[clicked {action}]"})
|
|
self.history.append({"role": "assistant", "content": result})
|
|
|
|
self._end_frame(rec, output_summary=f"local_action: {result[:80]}")
|
|
self._end_trace("action_local")
|
|
await self._emit_trace_hud()
|
|
return self._make_result(result)
|
|
|
|
# Complex action — needs full pipeline
|
|
self._end_frame(rec, output_summary="no local handler", route="director/thinker")
|
|
|
|
action_desc = f"ACTION: {action}"
|
|
if data:
|
|
action_desc += f" | data: {json.dumps(data)}"
|
|
self.history.append({"role": "user", "content": action_desc})
|
|
|
|
mem_ctx = self._build_context(dashboard)
|
|
command = Command(
|
|
analysis=InputAnalysis(intent="action", topic=action, complexity="simple"),
|
|
source_text=action_desc)
|
|
|
|
if self.has_director:
|
|
return await self._run_director_pipeline(command, mem_ctx, dashboard)
|
|
else:
|
|
return await self._run_thinker_pipeline(command, mem_ctx, dashboard)
|
|
|
|
# --- Helpers ---
|
|
|
|
def _build_context(self, dashboard: list = None) -> str:
|
|
"""Build the full context string for nodes."""
|
|
sensor_lines = self.sensor.get_context_lines()
|
|
director = self.nodes.get("director")
|
|
director_line = director.get_context_line() if director else ""
|
|
mem_ctx = self.memorizer.get_context_block(
|
|
sensor_lines=sensor_lines, ui_state=self.ui_node.state)
|
|
if director_line:
|
|
mem_ctx += f"\n\n{director_line}"
|
|
machine_summary = self.ui_node.get_machine_summary()
|
|
if machine_summary:
|
|
mem_ctx += f"\n\n{machine_summary}"
|
|
if dashboard is not None:
|
|
mem_ctx += f"\n\n{self._format_dashboard(dashboard)}"
|
|
sensor_flags = self.sensor.consume_flags()
|
|
if sensor_flags:
|
|
flag_lines = ["Sensor flags:"]
|
|
for f in sensor_flags:
|
|
if f["type"] == "idle_return":
|
|
flag_lines.append(f" - User returned after {f['away_duration']} away.")
|
|
elif f["type"] == "workspace_mismatch":
|
|
flag_lines.append(f" - Workspace mismatch: {f['detail']}")
|
|
mem_ctx += "\n\n" + "\n".join(flag_lines)
|
|
return mem_ctx
|
|
|
|
def _format_dashboard(self, dashboard: list) -> str:
|
|
"""Format dashboard controls into context string."""
|
|
server_controls = self.ui_node.current_controls
|
|
server_buttons = [str(c.get("label", "")) for c in server_controls if isinstance(c, dict) and c.get("type") == "button"]
|
|
browser_buttons = [str(c.get("label", "")) for c in dashboard if isinstance(c, dict) and c.get("type") == "button"] if dashboard else []
|
|
lines = []
|
|
if server_buttons and not browser_buttons:
|
|
lines.append(f"WARNING: Server sent {len(server_buttons)} controls but dashboard shows NONE.")
|
|
lines.append(f" Expected: {', '.join(server_buttons)}")
|
|
lines.append(" Controls failed to render. You MUST re-emit them in ACTIONS.")
|
|
elif server_buttons and sorted(server_buttons) != sorted(browser_buttons):
|
|
lines.append("WARNING: Dashboard mismatch.")
|
|
lines.append(f" Server: {', '.join(server_buttons)}")
|
|
lines.append(f" Browser: {', '.join(browser_buttons) or 'nothing'}")
|
|
if not dashboard:
|
|
lines.append("Dashboard: empty")
|
|
else:
|
|
lines.append("Dashboard (user sees):")
|
|
for ctrl in dashboard:
|
|
ctype = ctrl.get("type", "unknown")
|
|
if ctype == "button":
|
|
lines.append(f" - Button: {ctrl.get('label', '?')}")
|
|
elif ctype == "table":
|
|
lines.append(f" - Table: {len(ctrl.get('data', []))} rows")
|
|
else:
|
|
lines.append(f" - {ctype}: {ctrl.get('label', ctrl.get('text', '?'))}")
|
|
return "\n".join(lines)
|
|
|
|
async def _run_output_and_ui(self, thought: ThoughtResult, mem_ctx: str) -> str:
|
|
"""Run Output and UI nodes in parallel. Returns response text."""
|
|
self.sink.reset()
|
|
output_task = asyncio.create_task(
|
|
self.nodes["output"].process(thought, self.history, self.sink, memory_context=mem_ctx))
|
|
ui_task = asyncio.create_task(
|
|
self.ui_node.process(thought, self.history, memory_context=mem_ctx))
|
|
response, controls = await asyncio.gather(output_task, ui_task)
|
|
if controls:
|
|
await self.sink.send_controls(controls)
|
|
return response
|
|
|
|
def _check_condition(self, name: str, command: Command = None,
|
|
thought: ThoughtResult = None) -> bool:
|
|
"""Evaluate a named condition."""
|
|
if name == "reflex" and command:
|
|
return (command.analysis.intent == "social"
|
|
and command.analysis.complexity == "trivial")
|
|
if name == "has_tool_output" and thought:
|
|
return bool(thought.tool_used and thought.tool_output)
|
|
return False
|
|
|
|
def _make_result(self, response: str) -> dict:
|
|
"""Build the result dict returned to callers."""
|
|
return {
|
|
"response": response,
|
|
"controls": self.ui_node.current_controls,
|
|
"memorizer": self.memorizer.state,
|
|
"frames": self.frame,
|
|
"trace": self.last_trace.to_dict(),
|
|
}
|
|
|
|
def _trim_history(self):
|
|
if len(self.history) > 40:
|
|
self.history[:] = self.history[-40:]
|