"""Expert Base Node: domain-specific stateless executor.

An expert receives a self-contained job from the PA, plans its own tool
sequence, executes tools, and returns a ThoughtResult.
No history, no memory — pure function.

Subclasses override DOMAIN_SYSTEM, SCHEMA, and default_database.
"""

import asyncio
import json
import logging

from .base import Node
from ..llm import llm_call
from ..db import run_db_query
from ..types import ThoughtResult

log = logging.getLogger("runtime")


class ExpertNode(Node):
    """Base class for domain experts.

    Subclass and set DOMAIN_SYSTEM, SCHEMA, default_database.
    """

    model = "google/gemini-2.0-flash-001"
    max_context_tokens = 4000

    # Override in subclasses
    DOMAIN_SYSTEM = "You are a domain expert."
    SCHEMA = ""
    default_database = "eras2_production"

    PLAN_SYSTEM = """You are a domain expert's planning module.
Given a job description, produce a JSON tool sequence to accomplish it.

{domain}

{schema}

Available tools:
- query_db(query, database) — SQL SELECT/DESCRIBE/SHOW only
- emit_actions(actions) — show buttons [{{label, action, payload?}}]
- set_state(key, value) — persistent key-value
- emit_display(items) — formatted data [{{type, label, value?, style?}}]
- create_machine(id, initial, states) — interactive UI with navigation
  states: {{"state_name": {{"actions": [...], "display": [...]}}}}
- add_state(id, state, buttons, content) — add state to machine
- reset_machine(id) — reset to initial
- destroy_machine(id) — remove machine

Output ONLY valid JSON:
{{
  "tool_sequence": [
    {{"tool": "query_db", "args": {{"query": "SELECT ...", "database": "{database}"}}}},
    {{"tool": "emit_actions", "args": {{"actions": [{{"label": "...", "action": "..."}}]}}}}
  ],
  "response_hint": "How to phrase the result for the user"
}}

Rules:
- NEVER guess column names. If unsure, DESCRIBE first.
- Max 5 tools. Keep it focused.
- The job is self-contained — all context you need is in the job description."""

    RESPONSE_SYSTEM = """You are a domain expert summarizing results for the user.

{domain}

Job: {job}
{results}

Write a concise, natural response. 1-3 sentences.
- Reference specific data from the results.
- Don't repeat raw output — summarize.
- Match the language: {language}."""

    # Planner tool name -> "op" tag recorded in ThoughtResult.machine_ops.
    _MACHINE_OPS = {
        "create_machine": "create",
        "add_state": "add_state",
        "reset_machine": "reset",
        "destroy_machine": "destroy",
    }

    def __init__(self, send_hud, process_manager=None):
        # process_manager is accepted for signature compatibility but is not
        # used by this class.
        super().__init__(send_hud)

    # Maximum number of plan/execute rounds. A round is repeated only when a
    # query_db step fails.
    MAX_RETRIES = 3

    async def execute(self, job: str, language: str = "de") -> ThoughtResult:
        """Plan, run and summarise a self-contained job, re-planning on SQL errors.

        Each attempt asks the planning LLM for a tool sequence (feeding back
        every query error seen so far) and executes it. UI-oriented tools
        only collect their arguments into the result; ``query_db`` runs the
        query in a worker thread. The first failing ``query_db`` aborts the
        attempt and triggers a re-plan, up to ``MAX_RETRIES`` attempts.
        Finally a second LLM call turns the outcome (success or the last
        errors) into a short reply.

        Args:
            job: Self-contained job description.
            language: Language the reply should be written in.

        Returns:
            ThoughtResult with the reply plus the tool output, actions, state
            updates, display items and machine ops from the last attempt.
        """
        await self.hud("thinking", detail=f"planning: {job[:80]}")

        errors_so_far: list[dict] = []
        # Initialised up front so a MAX_RETRIES < 1 override cannot leave
        # these unbound.
        outputs = self._empty_outputs()
        succeeded = False

        for attempt in range(1, self.MAX_RETRIES + 1):
            tool_sequence, response_hint = await self._plan(job, errors_so_far)
            # NOTE(review): response_hint is only reported to the HUD; it is
            # not passed on to the response prompt.
            await self.hud("planned", tools=len(tool_sequence),
                           hint=response_hint[:80], attempt=attempt)

            outputs, had_error = await self._run_tools(tool_sequence, attempt, errors_so_far)
            if not had_error:
                succeeded = True
                break  # success — stop retrying
            log.info("[expert] attempt %d failed, %d errors", attempt, len(errors_so_far))

        # Generate response (with whatever we have — success or final error)
        results_text = self._format_results(outputs["tool_output"], errors_so_far, succeeded)
        resp_messages = [
            {"role": "system", "content": self.RESPONSE_SYSTEM.format(
                domain=self.DOMAIN_SYSTEM, job=job,
                results=results_text, language=language)},
            {"role": "user", "content": job},
        ]
        response = await llm_call(self.model, resp_messages)
        if not response:
            response = "[no response]"

        await self.hud("done", response=response[:100])
        return ThoughtResult(response=response, **outputs)

    async def _plan(self, job: str, errors: list) -> tuple[list, str]:
        """Ask the planning LLM for ``(tool_sequence, response_hint)`` for *job*."""
        plan_messages = [
            {"role": "system", "content": self.PLAN_SYSTEM.format(
                domain=self.DOMAIN_SYSTEM, schema=self.SCHEMA,
                database=self.default_database)},
            {"role": "user", "content": self._build_plan_prompt(job, errors)},
        ]
        plan_raw = await llm_call(self.model, plan_messages)
        return self._parse_plan(plan_raw)

    @staticmethod
    def _build_plan_prompt(job: str, errors: list) -> str:
        """Return the planner user prompt, appending earlier failed queries if any."""
        prompt = f"Job: {job}"
        if errors:
            prompt += "\n\nPREVIOUS ATTEMPTS FAILED:\n"
            prompt += "".join(
                f"- Query: {err['query']}\n  Error: {err['error']}\n" for err in errors
            )
            prompt += "\nFix the query using ONLY columns from the schema. Do NOT repeat the same mistake."
        return prompt

    @staticmethod
    def _empty_outputs() -> dict:
        """Return fresh ThoughtResult keyword arguments (everything except ``response``)."""
        return {
            "tool_used": "",
            "tool_output": "",
            "actions": [],
            "state_updates": {},
            "display_items": [],
            "machine_ops": [],
        }

    @staticmethod
    def _list_arg(args: dict, key: str) -> list:
        """Return ``args[key]`` if it is a list, else ``[]``.

        Guards ``list.extend`` against malformed LLM output such as a string
        or a dict, which would otherwise be splatted into characters or keys.
        """
        value = args.get(key)
        return value if isinstance(value, list) else []

    async def _run_tools(self, tool_sequence: list, attempt: int,
                         errors: list) -> tuple[dict, bool]:
        """Execute one planned tool sequence.

        UI tools only record their arguments. ``query_db`` is run for real.
        On the first query failure the error is appended to *errors* (which
        is mutated in place) and execution of this sequence stops.

        Returns:
            ``(outputs, had_error)``. ``outputs`` holds the ThoughtResult
            keyword arguments collected so far.
        """
        out = self._empty_outputs()
        for step in tool_sequence:
            tool = step.get("tool", "")
            args = step.get("args")
            if not isinstance(args, dict):  # LLM may emit null / a list / a string
                args = {}
            await self.hud("tool_call", tool=tool, args=args)

            if tool == "emit_actions":
                out["actions"].extend(self._list_arg(args, "actions"))
            elif tool == "set_state":
                key = args.get("key", "")
                if key and isinstance(key, str):
                    out["state_updates"][key] = args.get("value")
            elif tool == "emit_display":
                out["display_items"].extend(self._list_arg(args, "items"))
            elif tool in self._MACHINE_OPS:
                out["machine_ops"].append({"op": self._MACHINE_OPS[tool], **args})
            elif tool == "query_db":
                query = args.get("query", "")
                # `or` also covers an explicit null/empty database from the planner.
                database = args.get("database") or self.default_database
                ok, text = await self._run_query(query, database, attempt)
                if not ok:
                    errors.append({"query": query, "error": text})
                    return out, True  # stop executing, retry with new plan
                out["tool_used"] = "query_db"
                out["tool_output"] = text  # only the latest query's output is kept
            else:
                log.warning("[expert] ignoring unknown tool %r", tool)
        return out, False

    async def _run_query(self, query: str, database: str, attempt: int) -> tuple[bool, str]:
        """Run one query off the event loop and report the outcome to the HUD.

        Returns:
            ``(True, result)`` on success. ``(False, error_text)`` when
            ``run_db_query`` returns a string starting with ``"Error:"`` or
            raises.
        """
        try:
            result = await asyncio.to_thread(run_db_query, query, database)
            failed = result.startswith("Error:")
        except Exception as e:  # noqa: BLE001 — any DB failure becomes a retryable plan error
            log.warning("[expert] query_db raised: %s", e)
            await self.hud("tool_result", tool="query_db",
                           output=f"ERROR (attempt {attempt}): {e}")
            return False, str(e)

        if failed:
            await self.hud("tool_result", tool="query_db",
                           output=f"ERROR (attempt {attempt}): {result[:150]}")
            return False, result
        await self.hud("tool_result", tool="query_db", output=result[:200])
        return True, result

    @staticmethod
    def _format_results(tool_output: str, errors: list, succeeded: bool) -> str:
        """Build the ``{results}`` section of the response prompt.

        The latest query output (first 500 chars) is included whenever there
        is one. Query errors (the last two, 100 chars each) are included only
        when the final attempt failed. Errors from attempts that were later
        fixed must not make a successful run read like a failure.
        """
        parts = []
        if tool_output:
            parts.append(f"Tool result:\n{tool_output[:500]}")
        if errors and not succeeded:
            failed = f"All {len(errors)} query attempts failed:\n"
            failed += "".join(f"  {err['error'][:100]}\n" for err in errors[-2:])
            parts.append(failed)
        return "\n".join(parts)

    @staticmethod
    def _load_json_object(text: str):
        """Decode *text* as JSON, falling back to its outermost ``{...}`` span.

        Returns the decoded value, or ``None`` if neither candidate parses.
        """
        candidates = [text]
        start, end = text.find("{"), text.rfind("}")
        if 0 <= start < end:
            # Tolerate prose before/after the JSON object.
            candidates.append(text[start:end + 1])
        for candidate in candidates:
            try:
                return json.loads(candidate)
            except json.JSONDecodeError:
                continue
        return None

    def _parse_plan(self, raw: str) -> tuple[list, str]:
        """Parse tool sequence JSON from planning LLM call.

        Strips an optional Markdown code fence and tolerates prose around
        the JSON object. Malformed output degrades instead of raising:

        - an empty or ``None`` reply becomes an empty plan;
        - steps that are not dicts are dropped;
        - a non-list ``tool_sequence`` becomes ``[]``;
        - a missing or non-string ``response_hint`` becomes ``""``.

        Returns:
            ``(tool_sequence, response_hint)``.
        """
        text = (raw or "").strip()
        if text.startswith("```"):
            # Drop the opening fence line, which may carry a tag such as ```json.
            text = text.split("\n", 1)[1] if "\n" in text else text[3:]
        if text.endswith("```"):
            text = text[:-3]
        text = text.strip()

        data = self._load_json_object(text)
        if not isinstance(data, dict):
            reason = "invalid JSON" if data is None else f"expected object, got {type(data).__name__}"
            log.error("[expert] plan parse failed: %s, raw: %s", reason, text[:200])
            return [], ""

        steps = data.get("tool_sequence")
        if not isinstance(steps, list):
            steps = []
        hint = data.get("response_hint")
        return [s for s in steps if isinstance(s, dict)], hint if isinstance(hint, str) else ""