agent-runtime/agent/nodes/expert_base.py
Nico 2d649fa448 v0.15.3: Domain context, iterative plan-execute, FK mappings, ES6 node inspector
Eras Expert domain context:
- Full Heizkostenabrechnung business model (Kunde>Objekte>Nutzeinheiten>Geraete)
- Known PK/FK mappings: kunden.Kundennummer, objekte.KundenID, etc.
- Correct JOIN example in SCHEMA prompt
- PA knows domain hierarchy for better job formulation

Iterative plan-execute in ExpertNode:
- DESCRIBE queries execute first, results injected into re-plan
- Re-plan uses actual column names from DESCRIBE
- Eliminates "Unknown column" errors on first query

Frontend:
- Node inspector: per-node cards with model, tokens, progress, last event
- Graph switcher buttons in top bar
- Clear button in top bar
- Nodes panel 300px wide
- WS reconnect on 1006 (deploy) without showing login
- Model info emitted on context HUD events

Domain context test: 21/21 (hierarchy, JOINs, FK, PA job quality)
Default graph: v4-eras

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 18:34:42 +02:00

222 lines
8.6 KiB
Python

"""Expert Base Node: domain-specific stateless executor.
An expert receives a self-contained job from the PA, plans its own tool sequence,
executes tools, and returns a ThoughtResult. No history, no memory — pure function.
Subclasses override DOMAIN_SYSTEM, SCHEMA, and default_database.
"""
import asyncio
import json
import logging
from .base import Node
from ..llm import llm_call
from ..db import run_db_query
from ..types import ThoughtResult
log = logging.getLogger("runtime")
class ExpertNode(Node):
    """Base class for domain experts. Subclass and set DOMAIN_SYSTEM, SCHEMA, default_database.

    An expert is a stateless executor: it receives a self-contained job string
    from the PA, plans its own tool sequence with one LLM call, executes the
    tools, and summarizes the results with a second LLM call. DESCRIBE/SHOW
    queries are executed first and their output is injected into a re-plan so
    the final SQL uses actual column names. No history, no memory.
    """

    model = "google/gemini-2.0-flash-001"
    max_context_tokens = 4000

    # Override in subclasses
    DOMAIN_SYSTEM = "You are a domain expert."
    SCHEMA = ""
    default_database = "eras2_production"

    # Prompt for the planning LLM call. Placeholders: {domain}, {schema},
    # {database}. Literal JSON braces are doubled for str.format.
    PLAN_SYSTEM = """You are a domain expert's planning module.
Given a job description, produce a JSON tool sequence to accomplish it.
{domain}
{schema}
Available tools:
- query_db(query, database) — SQL SELECT/DESCRIBE/SHOW only
- emit_actions(actions) — show buttons [{{label, action, payload?}}]
- set_state(key, value) — persistent key-value
- emit_display(items) — formatted data [{{type, label, value?, style?}}]
- create_machine(id, initial, states) — interactive UI with navigation
states: {{"state_name": {{"actions": [...], "display": [...]}}}}
- add_state(id, state, buttons, content) — add state to machine
- reset_machine(id) — reset to initial
- destroy_machine(id) — remove machine
Output ONLY valid JSON:
{{
"tool_sequence": [
{{"tool": "query_db", "args": {{"query": "SELECT ...", "database": "{database}"}}}},
{{"tool": "emit_actions", "args": {{"actions": [{{"label": "...", "action": "..."}}]}}}}
],
"response_hint": "How to phrase the result for the user"
}}
Rules:
- NEVER guess column names. If unsure, DESCRIBE first.
- Max 5 tools. Keep it focused.
- The job is self-contained — all context you need is in the job description."""

    # Prompt for the summarizing LLM call. Placeholders: {domain}, {job},
    # {results}, {language}.
    RESPONSE_SYSTEM = """You are a domain expert summarizing results for the user.
{domain}
Job: {job}
{results}
Write a concise, natural response. 1-3 sentences.
- Reference specific data from the results.
- Don't repeat raw output — summarize.
- Match the language: {language}."""

    def __init__(self, send_hud, process_manager=None):
        """Create the expert.

        process_manager is accepted for constructor-signature compatibility
        with other node types but is intentionally unused here — experts are
        stateless and spawn no processes.
        """
        super().__init__(send_hud)

    async def execute(self, job: str, language: str = "de") -> ThoughtResult:
        """Execute a self-contained job and return a ThoughtResult.

        Uses iterative plan-execute: DESCRIBE/SHOW queries found in the plan
        run first, their results are injected into a re-plan (so follow-up
        queries use real column names), then the remaining tools execute.

        Args:
            job: Self-contained job description from the PA.
            language: Language the final response should be written in.
        """
        await self.hud("thinking", detail=f"planning: {job[:80]}")

        # Step 1: Plan tool sequence.
        schema_context = self.SCHEMA
        plan_messages = [
            {"role": "system", "content": self.PLAN_SYSTEM.format(
                domain=self.DOMAIN_SYSTEM, schema=schema_context,
                database=self.default_database)},
            {"role": "user", "content": f"Job: {job}"},
        ]
        plan_raw = await llm_call(self.model, plan_messages)
        tool_sequence, response_hint = self._parse_plan(plan_raw)

        # Step 1b: Execute DESCRIBE/SHOW queries first, then re-plan with the
        # actual schema — this eliminates "Unknown column" errors on the first
        # real query.
        describe_results = {}
        remaining_tools = []
        for step in tool_sequence:
            if step.get("tool") == "query_db":
                normalized = step.get("args", {}).get("query", "").strip().upper()
                if normalized.startswith(("DESCRIBE", "SHOW")):
                    await self.hud("tool_call", tool="query_db", args=step.get("args", {}))
                    try:
                        # run_db_query blocks; run it off the event loop.
                        result = await asyncio.to_thread(
                            run_db_query, step["args"]["query"],
                            step["args"].get("database", self.default_database))
                        describe_results[step["args"]["query"]] = result
                        await self.hud("tool_result", tool="query_db", output=result[:200])
                    except Exception as e:
                        # A failed DESCRIBE is reported on the HUD but dropped
                        # from the plan — the re-plan proceeds without it.
                        await self.hud("tool_result", tool="query_db", output=str(e)[:200])
                else:
                    remaining_tools.append(step)
            else:
                remaining_tools.append(step)

        # Re-plan if we got DESCRIBE results (now we know actual column names).
        if describe_results:
            schema_update = "Actual column names from DESCRIBE:\n"
            for q, result in describe_results.items():
                schema_update += f"\n{q}:\n{result[:500]}\n"
            replan_messages = [
                {"role": "system", "content": self.PLAN_SYSTEM.format(
                    domain=self.DOMAIN_SYSTEM,
                    schema=schema_context + "\n\n" + schema_update,
                    database=self.default_database)},
                {"role": "user", "content": f"Job: {job}\n\nUse ONLY the actual column names from DESCRIBE above. Do NOT include DESCRIBE steps — they are already done."},
            ]
            replan_raw = await llm_call(self.model, replan_messages)
            new_tools, new_hint = self._parse_plan(replan_raw)
            # Keep the original plan/hint if the re-plan failed to parse.
            if new_tools:
                remaining_tools = new_tools
            if new_hint:
                response_hint = new_hint
        tool_sequence = remaining_tools
        await self.hud("planned", tools=len(tool_sequence), hint=response_hint[:80])

        # Step 2: Execute remaining tools. Non-DB tools only accumulate
        # payloads for the ThoughtResult; query_db actually runs.
        actions = []
        state_updates = {}
        display_items = []
        machine_ops = []
        tool_used = ""
        tool_output = ""  # NOTE: only the LAST query_db result is kept.
        for step in tool_sequence:
            tool = step.get("tool", "")
            args = step.get("args", {})
            await self.hud("tool_call", tool=tool, args=args)
            if tool == "emit_actions":
                actions.extend(args.get("actions", []))
            elif tool == "set_state":
                key = args.get("key", "")
                if key:
                    state_updates[key] = args.get("value")
            elif tool == "emit_display":
                display_items.extend(args.get("items", []))
            elif tool == "create_machine":
                machine_ops.append({"op": "create", **args})
            elif tool == "add_state":
                machine_ops.append({"op": "add_state", **args})
            elif tool == "reset_machine":
                machine_ops.append({"op": "reset", **args})
            elif tool == "destroy_machine":
                machine_ops.append({"op": "destroy", **args})
            elif tool == "query_db":
                query = args.get("query", "")
                database = args.get("database", self.default_database)
                try:
                    result = await asyncio.to_thread(run_db_query, query, database)
                    tool_used = "query_db"
                    tool_output = result
                    await self.hud("tool_result", tool="query_db", output=result[:200])
                except Exception as e:
                    # Surface the error text to the response LLM instead of
                    # failing the whole job.
                    tool_used = "query_db"
                    tool_output = f"Error: {e}"
                    await self.hud("tool_result", tool="query_db", output=str(e)[:200])

        # Step 3: Generate the user-facing response.
        results_text = ""
        if tool_output:
            results_text = f"Tool result:\n{tool_output[:500]}"
        resp_messages = [
            {"role": "system", "content": self.RESPONSE_SYSTEM.format(
                domain=self.DOMAIN_SYSTEM, job=job, results=results_text, language=language)},
            {"role": "user", "content": job},
        ]
        response = await llm_call(self.model, resp_messages)
        if not response:
            response = "[no response]"
        await self.hud("done", response=response[:100])
        return ThoughtResult(
            response=response,
            tool_used=tool_used,
            tool_output=tool_output,
            actions=actions,
            state_updates=state_updates,
            display_items=display_items,
            machine_ops=machine_ops,
        )

    def _parse_plan(self, raw: str) -> tuple[list, str]:
        """Parse the planning LLM output into (tool_sequence, response_hint).

        Strips an optional markdown code fence (e.g. ```json ... ```), then
        parses JSON. Any failure — malformed JSON, or a JSON scalar/list
        where an object was expected — is logged and yields an empty plan
        rather than raising.
        """
        text = raw.strip()
        if text.startswith("```"):
            # Drop the opening fence line; if there is no newline after the
            # fence, drop just the three backticks.
            text = text.split("\n", 1)[1] if "\n" in text else text[3:]
        if text.endswith("```"):
            text = text[:-3]
        text = text.strip()
        try:
            data = json.loads(text)
            return data.get("tool_sequence", []), data.get("response_hint", "")
        except Exception as e:
            # Broad on purpose: json.JSONDecodeError covers bad JSON, and
            # AttributeError covers a non-dict top-level value (.get fails).
            log.error(f"[expert] plan parse failed: {e}, raw: {text[:200]}")
            return [], ""