"""Expert Base Node: domain-specific stateless executor.

An expert receives a self-contained job from the PA, plans its own tool
sequence, executes tools, and returns a ThoughtResult. No history, no
memory — pure function.

Subclasses override DOMAIN_SYSTEM, SCHEMA, and default_database.
"""

import asyncio
import json
import logging
import re

from .base import Node
from ..llm import llm_call
from ..db import run_db_query
from ..types import ThoughtResult

log = logging.getLogger("runtime")

# Table names that follow FROM / JOIN in a SQL string. Only bare \w+ identifiers
# are captured, so a match is safe to interpolate into a DESCRIBE statement.
_TABLE_REF_RE = re.compile(r"FROM\s+(\w+)|JOIN\s+(\w+)", re.IGNORECASE)


class ExpertNode(Node):
    """Base class for domain experts.

    Subclass and set DOMAIN_SYSTEM, SCHEMA, default_database.
    """

    model = "google/gemini-2.0-flash-001"
    max_context_tokens = 4000

    # Override in subclasses
    DOMAIN_SYSTEM = "You are a domain expert."
    SCHEMA = ""
    default_database = "eras2_production"

    # Maximum plan/execute rounds. A new round is started only when a query_db
    # call fails; values below 1 are treated as 1.
    MAX_RETRIES = 3

    # Plan-step tools forwarded as machine_ops entries, mapped to the "op" tag
    # each entry receives.
    _MACHINE_OPS = {
        "create_machine": "create",
        "add_state": "add_state",
        "reset_machine": "reset",
        "destroy_machine": "destroy",
    }

    PLAN_SYSTEM = """You are a domain expert's planning module.
Given a job description, produce a JSON tool sequence to accomplish it.

{domain}

{schema}

Available tools:
- query_db(query, database) — SQL SELECT/DESCRIBE/SHOW only
- emit_actions(actions) — show buttons [{{label, action, payload?}}]
- set_state(key, value) — persistent key-value
- create_machine(id, initial, states) — interactive UI navigation
- add_state / reset_machine / destroy_machine — machine lifecycle

NOTE: Cards are generated automatically in the response step from query results.
Do NOT plan emit_card or emit_list — just query the data and the system handles display.

Output ONLY valid JSON:
{{
  "tool_sequence": [
    {{"tool": "query_db", "args": {{"query": "SELECT ...", "database": "{database}"}}}}
  ],
  "response_hint": "How to phrase the result"
}}

Rules:
- NEVER guess column names. Use ONLY columns from the schema.
- Max 5 tools. Keep it focused.
- For entity details: query all relevant fields, the response step creates the card.
- For lists: query multiple rows, the table renders automatically.
- The job is self-contained."""

    RESPONSE_SYSTEM = """You are a domain expert summarizing results for the user.
{domain}

Job: {job}
{results}

Output a JSON object with "text" (response to user) and optionally "card" (structured display):
{{
  "text": "Concise natural response, 1-3 sentences. Reference data. Match language: {language}.",
  "card": {{
    "title": "Entity Name or ID",
    "subtitle": "Type or category",
    "fields": [{{"label": "Field", "value": "actual value from results"}}],
    "actions": [{{"label": "Next action", "action": "action_id"}}]
  }}
}}

Rules:
- "text" is REQUIRED. Keep it short.
- "card" is OPTIONAL. Include it for single-entity details (Kunde, Objekt, Auftrag).
- Card fields must use ACTUAL values from the query results, never templates/placeholders.
- For lists of multiple entities, use multiple fields or skip the card.
- If no card makes sense, just return {{"text": "..."}}.
- Output ONLY valid JSON."""

    def __init__(self, send_hud, process_manager=None):
        # NOTE(review): process_manager is accepted but not used by experts —
        # presumably to share a constructor signature with other nodes; confirm.
        super().__init__(send_hud)

    async def execute(self, job: str, language: str = "de") -> ThoughtResult:
        """Execute a self-contained job, re-planning when a SQL query fails.

        Each attempt asks the LLM for a tool plan and runs it, abandoning the
        plan at the first failing query_db call. The failure (plus an
        auto-DESCRIBE of the first referenced table for unknown-column errors)
        is fed into the next attempt's plan prompt. After a successful attempt,
        or once MAX_RETRIES attempts are used up, a second LLM call phrases the
        response and may contribute a card to display_items.

        Args:
            job: Self-contained job description.
            language: Language the response text should be written in.

        Returns:
            ThoughtResult holding the response text, the last successful query
            output as tool_output, and the actions / state updates / display
            items / machine ops collected during the final attempt.
        """
        await self.hud("thinking", detail=f"planning: {job[:80]}")

        plan_system = self.PLAN_SYSTEM.format(
            domain=self.DOMAIN_SYSTEM, schema=self.SCHEMA, database=self.default_database,
        )
        errors_so_far: list[dict] = []

        for attempt in range(1, max(1, self.MAX_RETRIES) + 1):
            # Plan (or re-plan with the errors collected so far).
            plan_messages = [
                {"role": "system", "content": plan_system},
                {"role": "user", "content": self._build_plan_prompt(job, errors_so_far)},
            ]
            plan_raw = await llm_call(self.model, plan_messages)
            tool_sequence, response_hint = self._parse_plan(plan_raw)
            # response_hint is only surfaced on the HUD; the response step does not use it.
            await self.hud("planned", tools=len(tool_sequence), hint=response_hint[:80], attempt=attempt)

            # Outputs are reset per attempt; only the final attempt's survive.
            actions: list = []
            state_updates: dict = {}
            display_items: list = []
            machine_ops: list = []
            tool_used = ""
            tool_output = ""
            query_outputs: list[str] = []
            had_error = False

            for step in tool_sequence:
                tool = step.get("tool") if isinstance(step, dict) else None
                if not isinstance(tool, str):
                    log.warning("[expert] skipping malformed plan step: %r", step)
                    continue
                args = step.get("args")
                if not isinstance(args, dict):
                    args = {}
                await self.hud("tool_call", tool=tool, args=args)

                if tool == "query_db":
                    query = args.get("query", "")
                    database = args.get("database", self.default_database)
                    result, error = await self._run_query(query, database, attempt)
                    if error is not None:
                        # Abandon the rest of this plan; the next attempt re-plans
                        # with this error (and any auto-DESCRIBE) in its prompt.
                        errors_so_far.append(error)
                        had_error = True
                        break
                    tool_used = "query_db"
                    tool_output = result
                    query_outputs.append(result)
                elif tool == "emit_actions":
                    new_actions = args.get("actions", [])
                    if isinstance(new_actions, list):
                        actions.extend(new_actions)
                elif tool in ("emit_card", "emit_list"):
                    # PLAN_SYSTEM tells the model not to plan these, but they are
                    # still honoured if it does.
                    kind = "card" if tool == "emit_card" else "list"
                    item = args.get(kind, args)
                    if isinstance(item, dict):
                        display_items.append({**item, "type": kind})
                elif tool == "emit_display":
                    items = args.get("items", [])
                    if isinstance(items, list):
                        display_items.extend(items)
                elif tool == "set_state":
                    key = args.get("key", "")
                    if isinstance(key, str) and key:
                        state_updates[key] = args.get("value")
                elif tool in self._MACHINE_OPS:
                    machine_ops.append({"op": self._MACHINE_OPS[tool], **args})
                else:
                    log.warning("[expert] ignoring unknown tool in plan: %r", tool)

            if not had_error:
                break  # success — stop retrying
            log.info("[expert] attempt %d failed, %d errors", attempt, len(errors_so_far))

        # Generate the response from whatever the final attempt produced.
        results_text = self._format_results(query_outputs, errors_so_far)
        resp_messages = [
            {"role": "system", "content": self.RESPONSE_SYSTEM.format(
                domain=self.DOMAIN_SYSTEM, job=job, results=results_text, language=language)},
            {"role": "user", "content": job},
        ]
        raw_response = await llm_call(self.model, resp_messages)
        response, card = self._parse_response(raw_response)
        if card is not None:
            display_items.append(card)

        await self.hud("done", response=response[:100])
        return ThoughtResult(
            response=response,
            tool_used=tool_used,
            tool_output=tool_output,
            actions=actions,
            state_updates=state_updates,
            display_items=display_items,
            machine_ops=machine_ops,
        )

    @staticmethod
    def _build_plan_prompt(job: str, errors: list) -> str:
        """Build the planner's user message, appending earlier failures for a re-plan."""
        if not errors:
            return f"Job: {job}"
        parts = [f"Job: {job}", "\n\nPREVIOUS ATTEMPTS FAILED:\n"]
        for err in errors:
            parts.append(f"- Query: {err['query']}\n Error: {err['error']}\n")
            if "describe" in err:
                parts.append(f" DESCRIBE result: {err['describe'][:300]}\n")
        parts.append(
            "\nFix the query. If a column was unknown, use the DESCRIBE result above "
            "or try SELECT * LIMIT 3 to see actual columns."
        )
        return "".join(parts)

    async def _run_query(self, query, database, attempt: int) -> "tuple[str, dict | None]":
        """Run one planned query_db call and report its outcome on the HUD.

        Returns:
            (result, None) on success, or ("", err_entry) on failure, where
            err_entry has "query" and "error" keys plus "describe" when an
            auto-DESCRIBE succeeded. A failure is either an exception from the
            DB call or a result string starting with "Error:".
        """
        try:
            result = await asyncio.to_thread(run_db_query, query, database)
            failed = result.startswith("Error:")
        except Exception as e:
            await self.hud("tool_result", tool="query_db", output=f"ERROR (attempt {attempt}): {e}")
            return "", {"query": query, "error": str(e)}

        if not failed:
            await self.hud("tool_result", tool="query_db", output=result[:200])
            return result, None

        err_entry = {"query": query, "error": result}
        # Auto-DESCRIBE on column errors so the re-plan can see real column names.
        if "Unknown column" in result or "1054" in result:
            describe = await self._auto_describe(query, database)
            if describe:
                err_entry["describe"] = describe
        await self.hud("tool_result", tool="query_db", output=f"ERROR (attempt {attempt}): {result[:150]}")
        return "", err_entry

    async def _auto_describe(self, query, database) -> str:
        """DESCRIBE the first table named after FROM/JOIN in *query*.

        Best effort: returns "" when no table is found or the DESCRIBE call
        raises; otherwise "<table>: <first 300 chars of the DESCRIBE output>".
        """
        if not isinstance(query, str):
            return ""
        for from_table, join_table in _TABLE_REF_RE.findall(query):
            tname = from_table or join_table
            if not tname:
                continue
            try:
                desc = await asyncio.to_thread(run_db_query, f"DESCRIBE {tname}", database)
            except Exception as e:
                log.debug("[expert] auto-DESCRIBE %s failed: %s", tname, e)
                return ""
            await self.hud("tool_result", tool="describe", output=f"Auto-DESCRIBE {tname}")
            return f"{tname}: {desc[:300]}"
        return ""

    @staticmethod
    def _format_results(query_outputs: list, errors: list) -> str:
        """Render tool results for the response prompt.

        Successful query outputs of the final attempt take precedence (each
        truncated to 500 chars), even when empty. Without any, the last two
        recorded errors are summarised; with neither, the result is "".
        """
        if len(query_outputs) == 1:
            return f"Tool result:\n{query_outputs[0][:500]}"
        if query_outputs:
            return "\n\n".join(
                f"Tool result {i}:\n{output[:500]}" for i, output in enumerate(query_outputs, 1)
            )
        if errors:
            lines = [f"All {len(errors)} query attempts failed:\n"]
            lines.extend(f" {err['error'][:100]}\n" for err in errors[-2:])
            return "".join(lines)
        return ""

    @classmethod
    def _parse_response(cls, raw_response) -> "tuple[str, dict | None]":
        """Split the response-step LLM output into (text, card).

        Expects a JSON object {"text": ..., "card": {...}}, optionally wrapped in
        a ``` fence. Whenever that shape is not met the raw output is used as
        text ("[no response]" when empty). card is None unless the object held
        a non-empty dict card; a returned card is a copy tagged type="card".
        """
        if not raw_response:
            return "[no response]", None
        if not isinstance(raw_response, str):
            return str(raw_response), None
        try:
            data = json.loads(cls._strip_code_fence(raw_response))
        except json.JSONDecodeError:
            return raw_response, None  # not JSON — use raw response as text
        if not isinstance(data, dict):
            return raw_response, None
        text = data.get("text", raw_response)
        if not isinstance(text, str):
            # e.g. "text": null — fall back rather than crash on slicing later.
            text = raw_response
        card = data.get("card")
        if isinstance(card, dict) and card:
            return text, {**card, "type": "card"}
        return text, None

    def _parse_plan(self, raw: str) -> tuple[list, str]:
        """Parse the tool sequence JSON from the planning LLM call.

        Returns:
            (tool_sequence, response_hint). Malformed output — missing, not
            JSON, not an object — yields ([], ""); a field of the wrong type is
            replaced by its empty default, so callers always get a list and a str.
        """
        if not isinstance(raw, str):
            log.error("[expert] plan parse failed: no plan text, raw: %r", raw)
            return [], ""
        text = self._strip_code_fence(raw)
        try:
            data = json.loads(text)
        except json.JSONDecodeError as e:
            log.error("[expert] plan parse failed: %s, raw: %s", e, text[:200])
            return [], ""
        if not isinstance(data, dict):
            log.error("[expert] plan parse failed: expected a JSON object, raw: %s", text[:200])
            return [], ""
        tool_sequence = data.get("tool_sequence", [])
        response_hint = data.get("response_hint", "")
        if not isinstance(tool_sequence, list):
            tool_sequence = []
        if not isinstance(response_hint, str):
            response_hint = ""
        return tool_sequence, response_hint

    @staticmethod
    def _strip_code_fence(raw: str) -> str:
        """Strip whitespace and a surrounding Markdown ``` fence from *raw*.

        The whole opening fence line (including any language tag such as
        ```json) and a trailing ``` are removed.
        """
        text = raw.strip()
        if text.startswith("```"):
            text = text.split("\n", 1)[1] if "\n" in text else text[3:]
        if text.endswith("```"):
            text = text[:-3]
        return text.strip()