Frame Engine (v3-framed):
- Tick-based deterministic pipeline: frames advance on completion, not timers
- FrameRecord/FrameTrace dataclasses for structured per-message tracing
- /api/frames endpoint: queryable frame trace history (last 20 messages)
- frame_trace HUD event with full pipeline visibility
- Reflex=2F, Director=4F, Director+Interpreter=5F deterministic frame counts

Expert Architecture (v4-eras):
- PA node (pa_v1): routes to domain experts, holds user context
- ExpertNode base: stateless executor with plan+execute two-LLM-call pattern
- ErasExpertNode: eras2_production DB specialist with DESCRIBE-first discipline
- Schema caching: DESCRIBE results reused across queries within session
- Progress streaming: PA streams thinking message, expert streams per-tool progress
- PARouting type for structured routing decisions

UI Controls Split:
- Separate thinker_controls from machine controls (current_controls is now a property)
- Machine buttons persist across Thinker responses
- Machine state parser handles both dict and list formats from Director
- Normalized button format with go/payload field mapping

WebSocket Architecture:
- /ws/test: dedicated debug socket for test runner progress
- /ws/trace: dedicated debug socket for HUD/frame trace events
- /ws (chat): cleaned up, only deltas/controls/done/cleared
- WS survives graph switch (re-attaches to new runtime)
- Pipeline result reset on clear

Test Infrastructure:
- Live test streaming: on_result callback fires per check during execution
- Frontend polling fallback (500ms) for proxy-buffered WS
- frame_trace-first trace assertion (fixes stale perceived event bug)
- action_match supports "or" patterns and multi-pattern matching
- Trace window increased to 40 events
- Graph-agnostic assertions (has X or Y)

Test Suites:
- smoketest.md: 12 steps covering all categories (~2min)
- fast.md: 10 quick checks (~1min)
- fast_v4.md: 10 v4-eras specific checks
- expert_eras.md: eras domain tests (routing, DB, schema, errors)
- expert_progress.md: progress streaming tests

Other:
- Shared db.py extracted from thinker_v2 (reused by experts)
- InputNode prompt: few-shot examples, history as context summary
- Director prompt: full tool signatures for add_state/reset_machine/destroy_machine
- nginx no-cache headers for static files during development
- Cache-busted static file references

Scores: v3 smoketest 39/40, v4-eras fast 28/28, expert_eras 23/23

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
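
The tick-based frame counting listed under Frame Engine can be illustrated with a minimal sketch. The commit only names the FrameRecord and FrameTrace dataclasses and the resulting frame counts; the field names, the node names, and the run_pipeline helper below are assumptions made for illustration, not the actual implementation.

```python
from dataclasses import dataclass, field


@dataclass
class FrameRecord:
    # Hypothetical fields; the real dataclass may carry different data.
    frame: int
    node: str
    output: str


@dataclass
class FrameTrace:
    # One trace per user message, holding one record per completed frame.
    message: str
    frames: list = field(default_factory=list)

    @property
    def total_frames(self) -> int:
        return len(self.frames)


def run_pipeline(message: str, nodes: list) -> FrameTrace:
    """Advance one frame each time a node completes; no timers are involved."""
    trace = FrameTrace(message=message)
    for tick, node in enumerate(nodes, start=1):
        output = f"{node} handled {message!r}"  # stand-in for real node work
        trace.frames.append(FrameRecord(frame=tick, node=node, output=output))
    return trace


# Assumed two-node path for a reflex reply; the real node names are not
# given in the commit message.
reflex = run_pipeline("hi", ["input", "output"])
print(reflex.total_frames)
```

Because the counter moves only when a node finishes, the same path through the graph always yields the same number of frames, which is what makes counts such as Reflex=2F assertable in tests.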
68 lines
2.8 KiB
Python
"""Eras Expert: heating/energy customer database specialist."""

import asyncio
import logging

from .expert_base import ExpertNode
from ..db import run_db_query

log = logging.getLogger("runtime")


class ErasExpertNode(ExpertNode):
    name = "eras_expert"
    default_database = "eras2_production"

    DOMAIN_SYSTEM = """You are the Eras expert — specialist for heating and energy customer data.
You work with the eras2_production database containing customer, device, and billing data.
All table and column names are German (lowercase). Common queries involve customer lookups,
device counts, consumption analysis, and billing reports."""

    SCHEMA = """Known tables (eras2_production):
- kunden — customers
- objekte — properties/objects linked to customers
- nutzeinheit — usage units within objects
- geraete — devices/meters
- geraeteverbraeuche — device consumption readings
- abrechnungen — billing records

CRITICAL: You do NOT know the exact column names. They are German and unpredictable.
Your FIRST tool_sequence step for ANY SELECT query MUST be DESCRIBE on the target table.
Then use the actual column names from the DESCRIBE result in your SELECT.

Example tool_sequence for "show me 5 customers":
[
  {{"tool": "query_db", "args": {{"query": "DESCRIBE kunden", "database": "eras2_production"}}}},
  {{"tool": "query_db", "args": {{"query": "SELECT * FROM kunden LIMIT 5", "database": "eras2_production"}}}}
]"""

    def __init__(self, send_hud, process_manager=None):
        super().__init__(send_hud, process_manager)
        self._schema_cache: dict[str, str] = {}  # table_name -> DESCRIBE result

    async def execute(self, job: str, language: str = "de"):
        """Execute with schema auto-discovery. Caches DESCRIBE results."""
        # Keep the caller's job text for table detection: the schema context
        # injected below names every cached table and would skew the match.
        original_job = job.lower()

        # Inject cached schema into the job context
        if self._schema_cache:
            schema_ctx = "Known column names from previous DESCRIBE:\n"
            for table, desc in self._schema_cache.items():
                # First 6 lines (header row plus 5 columns) to keep it compact
                lines = desc.strip().split("\n")[:6]
                schema_ctx += f"\n{table}:\n" + "\n".join(lines) + "\n"
            job = job + "\n\n" + schema_ctx

        result = await super().execute(job, language)

        # Cache any DESCRIBE results from this execution
        # Parse from tool_output if it looks like a DESCRIBE result
        if result.tool_output and "Field\t" in result.tool_output:
            # Try to identify which table was described. "geraeteverbraeuche"
            # is checked before "geraete" because the latter is its prefix.
            for table in ["kunden", "objekte", "nutzeinheit", "geraeteverbraeuche",
                          "geraete", "abrechnungen"]:
                if table in original_job or table in result.tool_output.lower():
                    self._schema_cache[table] = result.tool_output
                    log.info(f"[eras] cached schema for {table}")
                    break

        return result
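
The table-detection step at the end of execute can be tried in isolation. The sketch below assumes DESCRIBE output is tab-separated with a Field header, as the "Field\t" check implies. The identify_table helper and the sample output are hypothetical and not part of the module. It matches whole words with a regular expression, a stricter variant than the substring test used in execute, so that a job naming geraeteverbraeuche is not attributed to geraete.

```python
import re
from typing import Optional

KNOWN_TABLES = ["kunden", "objekte", "nutzeinheit", "geraete",
                "geraeteverbraeuche", "abrechnungen"]


def identify_table(job: str, tool_output: str) -> Optional[str]:
    """Return the first known table named as a whole word, else None."""
    if "Field\t" not in tool_output:
        return None  # does not look like a DESCRIBE result
    haystack = f"{job}\n{tool_output}".lower()
    for table in KNOWN_TABLES:
        # \b keeps "geraete" from matching inside "geraeteverbraeuche"
        if re.search(rf"\b{re.escape(table)}\b", haystack):
            return table
    return None


# Hypothetical DESCRIBE output; the real column names are not known here.
sample = "Field\tType\tNull\nid\tint\tNO\nname\tvarchar(255)\tYES"

schema_cache = {}  # table_name -> DESCRIBE result
table = identify_table("DESCRIBE geraeteverbraeuche", sample)
if table:
    schema_cache[table] = sample
print(table)
```

Whole-word matching keeps the table list order-independent, at the cost of missing table names embedded in longer identifiers.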