agent-runtime/agent/engine.py
Nico 1000411eb2 v0.15.0: Frame engine (v3), PA + Expert architecture (v4-eras), live test streaming
Frame Engine (v3-framed):
- Tick-based deterministic pipeline: frames advance on completion, not timers
- FrameRecord/FrameTrace dataclasses for structured per-message tracing
- /api/frames endpoint: queryable frame trace history (last 20 messages)
- frame_trace HUD event with full pipeline visibility
- Reflex=2F, Director=4F, Director+Interpreter=5F deterministic frame counts

Expert Architecture (v4-eras):
- PA node (pa_v1): routes to domain experts, holds user context
- ExpertNode base: stateless executor with plan+execute two-LLM-call pattern
- ErasExpertNode: eras2_production DB specialist with DESCRIBE-first discipline
- Schema caching: DESCRIBE results reused across queries within session
- Progress streaming: PA streams thinking message, expert streams per-tool progress
- PARouting type for structured routing decisions

UI Controls Split:
- Separate thinker_controls from machine controls (current_controls is now a property)
- Machine buttons persist across Thinker responses
- Machine state parser handles both dict and list formats from Director
- Normalized button format with go/payload field mapping

WebSocket Architecture:
- /ws/test: dedicated debug socket for test runner progress
- /ws/trace: dedicated debug socket for HUD/frame trace events
- /ws (chat): cleaned up, only deltas/controls/done/cleared
- WS survives graph switch (re-attaches to new runtime)
- Pipeline result reset on clear

Test Infrastructure:
- Live test streaming: on_result callback fires per check during execution
- Frontend polling fallback (500ms) for proxy-buffered WS
- frame_trace-first trace assertion (fixes stale perceived event bug)
- action_match supports "or" patterns and multi-pattern matching
- Trace window increased to 40 events
- Graph-agnostic assertions (has X or Y)

Test Suites:
- smoketest.md: 12 steps covering all categories (~2min)
- fast.md: 10 quick checks (~1min)
- fast_v4.md: 10 v4-eras specific checks
- expert_eras.md: eras domain tests (routing, DB, schema, errors)
- expert_progress.md: progress streaming tests

Other:
- Shared db.py extracted from thinker_v2 (reused by experts)
- InputNode prompt: few-shot examples, history as context summary
- Director prompt: full tool signatures for add_state/reset_machine/destroy_machine
- nginx no-cache headers for static files during development
- Cache-busted static file references

Scores: v3 smoketest 39/40, v4-eras fast 28/28, expert_eras 23/23

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 17:10:31 +02:00

108 lines
3.7 KiB
Python

"""Graph Engine: loads graph definitions, instantiates nodes, executes pipelines."""

import importlib
import logging
from pathlib import Path

from .nodes import NODE_REGISTRY
from .process import ProcessManager

log = logging.getLogger("runtime")

GRAPHS_DIR = Path(__file__).parent / "graphs"


def list_graphs() -> list[dict]:
    """List all available graph definitions."""
    graphs = []
    for f in sorted(GRAPHS_DIR.glob("*.py")):
        # Skip private modules such as __init__.py
        if f.name.startswith("_"):
            continue
        mod = _load_graph_module(f.stem)
        if mod:
            graphs.append({
                "name": getattr(mod, "NAME", f.stem),
                "description": getattr(mod, "DESCRIPTION", ""),
                "file": f.name,
            })
    return graphs


def load_graph(name: str) -> dict:
    """Load a graph definition by name. Returns the module's attributes as a dict."""
    # Try matching by NAME attribute first, then by filename
    for f in GRAPHS_DIR.glob("*.py"):
        if f.name.startswith("_"):
            continue
        mod = _load_graph_module(f.stem)
        if mod and getattr(mod, "NAME", "") == name:
            return _graph_from_module(mod)
    # Fallback: match by filename stem
    mod = _load_graph_module(name)
    if mod:
        return _graph_from_module(mod)
    raise ValueError(f"Graph '{name}' not found")


def _load_graph_module(stem: str):
    """Import a graph module by stem name. Returns None if the import fails."""
    try:
        return importlib.import_module(f".graphs.{stem}", package="agent")
    except ImportError as e:
        # ModuleNotFoundError is a subclass of ImportError, so it is caught here too.
        log.error(f"[engine] failed to load graph '{stem}': {e}")
        return None


def _graph_from_module(mod) -> dict:
    """Extract graph definition from a module."""
    return {
        "name": getattr(mod, "NAME", "unknown"),
        "description": getattr(mod, "DESCRIPTION", ""),
        "nodes": getattr(mod, "NODES", {}),
        "edges": getattr(mod, "EDGES", []),
        "conditions": getattr(mod, "CONDITIONS", {}),
        "audit": getattr(mod, "AUDIT", {}),
        "engine": getattr(mod, "ENGINE", "imperative"),
    }


def instantiate_nodes(graph: dict, send_hud, process_manager: ProcessManager = None) -> dict:
    """Create node instances from a graph definition. Returns {role: node_instance}."""
    nodes = {}
    for role, impl_name in graph["nodes"].items():
        cls = NODE_REGISTRY.get(impl_name)
        if not cls:
            log.error(f"[engine] node class not found: {impl_name}")
            continue
        # Thinker and Expert nodes accept process_manager
        if impl_name.startswith("thinker") or impl_name.endswith("_expert"):
            nodes[role] = cls(send_hud=send_hud, process_manager=process_manager)
        else:
            nodes[role] = cls(send_hud=send_hud)
        log.info(f"[engine] {role} = {impl_name} ({cls.__name__})")
    return nodes


def get_graph_for_cytoscape(graph: dict) -> dict:
    """Convert graph definition to Cytoscape-compatible elements for frontend."""
    elements = {"nodes": [], "edges": []}
    for role in graph["nodes"]:
        elements["nodes"].append({"data": {"id": role, "label": role}})
    for edge in graph["edges"]:
        src = edge["from"]
        # "to" may be a single role or a list of roles; normalize to a list
        targets = edge["to"] if isinstance(edge["to"], list) else [edge["to"]]
        edge_type = edge.get("type", "data")
        for tgt in targets:
            elements["edges"].append({
                "data": {
                    "id": f"e-{src}-{tgt}",
                    "source": src,
                    "target": tgt,
                    "edge_type": edge_type,
                    "condition": edge.get("condition", ""),
                    "carries": edge.get("carries", ""),
                    "method": edge.get("method", ""),
                },
            })
    return elements
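
Usage sketch (not part of engine.py): the snippet below illustrates the shape of the dict that `_graph_from_module()` produces and how a list-valued `"to"` fans out into one Cytoscape edge per target. It is a minimal, standalone approximation: `SAMPLE_GRAPH`, its role names, the implementation names `input_v1` and `output_v1`, and the helper `to_cytoscape` are all hypothetical, and `to_cytoscape` is a condensed restatement of `get_graph_for_cytoscape()` so the example runs without importing the `agent` package.

```python
# Hypothetical graph definition, shaped like the dict returned by
# _graph_from_module(). Role and implementation names are illustrative only.
SAMPLE_GRAPH = {
    "name": "sample",
    "description": "Illustrative three-node graph",
    "nodes": {"input": "input_v1", "thinker": "thinker_v2", "output": "output_v1"},
    "edges": [
        # Single target; the edge type falls back to "data".
        {"from": "input", "to": "thinker"},
        # List of targets; expands into one Cytoscape edge per target.
        {"from": "thinker", "to": ["output", "input"],
         "type": "control", "condition": "done"},
    ],
    "conditions": {},
    "audit": {},
    "engine": "imperative",
}


def to_cytoscape(graph: dict) -> dict:
    """Condensed restatement of get_graph_for_cytoscape(), for illustration."""
    elements = {"nodes": [], "edges": []}
    for role in graph["nodes"]:
        elements["nodes"].append({"data": {"id": role, "label": role}})
    for edge in graph["edges"]:
        # Normalize "to" so single targets and lists are handled the same way.
        targets = edge["to"] if isinstance(edge["to"], list) else [edge["to"]]
        for tgt in targets:
            elements["edges"].append({"data": {
                "id": f"e-{edge['from']}-{tgt}",
                "source": edge["from"],
                "target": tgt,
                "edge_type": edge.get("type", "data"),
                "condition": edge.get("condition", ""),
            }})
    return elements


elements = to_cytoscape(SAMPLE_GRAPH)
edge_ids = [e["data"]["id"] for e in elements["edges"]]
print(edge_ids)  # ['e-input-thinker', 'e-thinker-output', 'e-thinker-input']
```

Because edge ids are derived only from source and target, two edges between the same pair of roles would share an id; a graph definition that needs parallel edges would have to account for that.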