From 067cbccea6f37734aac0848cff4d802363725a74 Mon Sep 17 00:00:00 2001 From: Nico Date: Sun, 29 Mar 2026 19:34:01 +0200 Subject: [PATCH] v0.15.8: Expert retry loop, fixed geraete schema, action routing, stable nodes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Expert retry loop (max 3 attempts): - On SQL error, re-plans with error context injected - "PREVIOUS ATTEMPTS FAILED" section tells LLM what went wrong - Breaks out of tool sequence on error, retries full plan - Only reports failure after exhausting retries - Recovery test: 13/13 Schema fixes: - geraete: Geraetenummer, Bezeichnung, Beschreibung (were Fabriknummer, Funkkennung) - geraeteverbraeuche: all columns verified - nutzer: all columns verified Action routing: - Button clicks route through PA→Expert in v4 (was missing has_pa check) - WS handler catches exceptions, sends error HUD instead of crashing Nodes panel: - Fixed pipeline order, no re-sorting - Normalized names (pa_v1→pa, expert_eras→eras) Co-Authored-By: Claude Opus 4.6 (1M context) --- agent/nodes/eras_expert.py | 18 +++-- agent/nodes/expert_base.py | 135 +++++++++++++++++++++-------------- testcases/expert_recovery.md | 27 +++++++ 3 files changed, 124 insertions(+), 56 deletions(-) create mode 100644 testcases/expert_recovery.md diff --git a/agent/nodes/eras_expert.py b/agent/nodes/eras_expert.py index 04514c3..c9d376e 100644 --- a/agent/nodes/eras_expert.py +++ b/agent/nodes/eras_expert.py @@ -96,17 +96,27 @@ IstGesperrt, Selbstableser (bool) === geraete (56726 rows) — meters/devices === PK: ID (int) NutzeinheitID (FK → nutzeinheit.ID) -ArtikelID (FK → geraetekatalog), GeraeteTypID (FK) -Fabriknummer, Funkkennung (longtext) — serial numbers -Einbaudatum, Ausbaudatum, GeeichtBis (datetime) +Geraetenummer (longtext) — device number/serial +Bezeichnung (longtext) — device name/label +Beschreibung (longtext) — description +ArtikelID (FK), NutzergruppenID (FK), Einheit (int) +Einbaudatum, Ausbaudatum, GeeichtBis, GeeichtAm, ErstInbetriebnahme, DefektAb (datetime) +FirmwareVersion, LaufendeNummer, GruppenKennung, Memo, AllgemeinesMemo (longtext) AnsprechpartnerID, ZugeordneterRaumID, CustomStatusKeyID (FK) +Gemietet, Gewartet, KeinAndruck, IstAbzuziehendesGeraet, HatHistorie (bool) === geraeteverbraeuche (1.3M rows) — consumption readings === PK: ID (int) GeraetID (FK → geraete.ID) -Ablesedatum (datetime), Ablesung, Verbrauch, Faktor (double) +Ablesedatum (datetime) — reading date +Ablesung (double) — meter reading value +Verbrauch (double) — consumption value +Faktor (double) — factor +Aenderungsdatum (datetime) AbleseartID (FK), Schaetzung (int), Status (int) IstRekonstruiert (bool), Herkunft (int) +ManuellerWert (double), Rohablesung (double) +Anmerkung, Fehler, Ampullenfarbe (longtext) JOIN PATTERNS (use exactly): Kunde → Objekte: JOIN objektkunde ok ON ok.KundeID = k.ID JOIN objekte o ON o.ID = ok.ObjektID diff --git a/agent/nodes/expert_base.py b/agent/nodes/expert_base.py index 8fa6b5f..eb79283 100644 --- a/agent/nodes/expert_base.py +++ b/agent/nodes/expert_base.py @@ -76,69 +76,100 @@ Write a concise, natural response. 1-3 sentences. def __init__(self, send_hud, process_manager=None): super().__init__(send_hud) + MAX_RETRIES = 3 + async def execute(self, job: str, language: str = "de") -> ThoughtResult: - """Execute a self-contained job. Returns ThoughtResult. - Expert knows the schema — plan once, execute, respond.""" + """Execute a self-contained job with retry on SQL errors. + Expert knows the schema — plan, execute, retry if needed, respond.""" await self.hud("thinking", detail=f"planning: {job[:80]}") - # Step 1: Plan tool sequence (expert knows schema, no DESCRIBE needed) - plan_messages = [ - {"role": "system", "content": self.PLAN_SYSTEM.format( - domain=self.DOMAIN_SYSTEM, schema=self.SCHEMA, - database=self.default_database)}, - {"role": "user", "content": f"Job: {job}"}, - ] - plan_raw = await llm_call(self.model, plan_messages) - tool_sequence, response_hint = self._parse_plan(plan_raw) + errors_so_far = [] + tool_sequence = [] + response_hint = "" - await self.hud("planned", tools=len(tool_sequence), hint=response_hint[:80]) + for attempt in range(1, self.MAX_RETRIES + 1): + # Plan (or re-plan with error context) + plan_prompt = f"Job: {job}" + if errors_so_far: + plan_prompt += "\n\nPREVIOUS ATTEMPTS FAILED:\n" + for err in errors_so_far: + plan_prompt += f"- Query: {err['query']}\n Error: {err['error']}\n" + plan_prompt += "\nFix the query using ONLY columns from the schema. Do NOT repeat the same mistake." - # Step 2: Execute remaining tools - actions = [] - state_updates = {} - display_items = [] - machine_ops = [] - tool_used = "" - tool_output = "" + plan_messages = [ + {"role": "system", "content": self.PLAN_SYSTEM.format( + domain=self.DOMAIN_SYSTEM, schema=self.SCHEMA, + database=self.default_database)}, + {"role": "user", "content": plan_prompt}, + ] + plan_raw = await llm_call(self.model, plan_messages) + tool_sequence, response_hint = self._parse_plan(plan_raw) + await self.hud("planned", tools=len(tool_sequence), + hint=response_hint[:80], attempt=attempt) - for step in tool_sequence: - tool = step.get("tool", "") - args = step.get("args", {}) - await self.hud("tool_call", tool=tool, args=args) + # Execute tools + actions = [] + state_updates = {} + display_items = [] + machine_ops = [] + tool_used = "" + tool_output = "" + had_error = False - if tool == "emit_actions": - actions.extend(args.get("actions", [])) - elif tool == "set_state": - key = args.get("key", "") - if key: - state_updates[key] = args.get("value") - elif tool == "emit_display": - display_items.extend(args.get("items", [])) - elif tool == "create_machine": - machine_ops.append({"op": "create", **args}) - elif tool == "add_state": - machine_ops.append({"op": "add_state", **args}) - elif tool == "reset_machine": - machine_ops.append({"op": "reset", **args}) - elif tool == "destroy_machine": - machine_ops.append({"op": "destroy", **args}) - elif tool == "query_db": - query = args.get("query", "") - database = args.get("database", self.default_database) - try: - result = await asyncio.to_thread(run_db_query, query, database) - tool_used = "query_db" - tool_output = result - await self.hud("tool_result", tool="query_db", output=result[:200]) - except Exception as e: - tool_used = "query_db" - tool_output = f"Error: {e}" - await self.hud("tool_result", tool="query_db", output=str(e)[:200]) + for step in tool_sequence: + tool = step.get("tool", "") + args = step.get("args", {}) + await self.hud("tool_call", tool=tool, args=args) - # Step 3: Generate response + if tool == "emit_actions": + actions.extend(args.get("actions", [])) + elif tool == "set_state": + key = args.get("key", "") + if key: + state_updates[key] = args.get("value") + elif tool == "emit_display": + display_items.extend(args.get("items", [])) + elif tool == "create_machine": + machine_ops.append({"op": "create", **args}) + elif tool == "add_state": + machine_ops.append({"op": "add_state", **args}) + elif tool == "reset_machine": + machine_ops.append({"op": "reset", **args}) + elif tool == "destroy_machine": + machine_ops.append({"op": "destroy", **args}) + elif tool == "query_db": + query = args.get("query", "") + database = args.get("database", self.default_database) + try: + result = await asyncio.to_thread(run_db_query, query, database) + if result.startswith("Error:"): + errors_so_far.append({"query": query, "error": result}) + had_error = True + await self.hud("tool_result", tool="query_db", + output=f"ERROR (attempt {attempt}): {result[:150]}") + break # stop executing, retry with new plan + tool_used = "query_db" + tool_output = result + await self.hud("tool_result", tool="query_db", output=result[:200]) + except Exception as e: + errors_so_far.append({"query": query, "error": str(e)}) + had_error = True + await self.hud("tool_result", tool="query_db", + output=f"ERROR (attempt {attempt}): {e}") + break + + if not had_error: + break # success — stop retrying + log.info(f"[expert] attempt {attempt} failed, {len(errors_so_far)} errors") + + # Generate response (with whatever we have — success or final error) results_text = "" if tool_output: results_text = f"Tool result:\n{tool_output[:500]}" + elif errors_so_far: + results_text = f"All {len(errors_so_far)} query attempts failed:\n" + for err in errors_so_far[-2:]: + results_text += f" {err['error'][:100]}\n" resp_messages = [ {"role": "system", "content": self.RESPONSE_SYSTEM.format( diff --git a/testcases/expert_recovery.md b/testcases/expert_recovery.md new file mode 100644 index 0000000..65f1ed3 --- /dev/null +++ b/testcases/expert_recovery.md @@ -0,0 +1,27 @@ +# Expert Recovery + +Tests that the expert recovers from SQL errors by retrying with corrected queries, +not by reporting the error and stopping. + +## Setup +- clear history + +## Steps + +### 1. Expert recovers from column error silently +- send: zeig mir alle Geraete von Objekt 4 mit Bezeichnung und Einbaudatum +- expect_trace: has tool_call +- expect_response: not contains "Unknown column" or "1054" or "error" or "Error" +- expect_response: length > 20 + +### 2. Multi-table query with potential errors +- send: zeig mir alle Nutzer und ihre Geraete fuer Kunde 2 +- expect_trace: has tool_call +- expect_response: not contains "Unknown column" or "1054" or "error" or "Error" +- expect_response: length > 20 + +### 3. Expert does not give up on first failure +- send: zeig mir Verbrauchswerte fuer Geraet 50 im letzten Monat +- expect_trace: has tool_call +- expect_response: not contains "I need assistance" or "developer" or "schema issue" +- expect_response: length > 10