diff --git a/agent/nodes/eras_expert.py b/agent/nodes/eras_expert.py index 280146f..04514c3 100644 --- a/agent/nodes/eras_expert.py +++ b/agent/nodes/eras_expert.py @@ -1,7 +1,7 @@ -"""Eras Expert: heating cost billing domain specialist. +"""Eras Expert: Heizkostenabrechnung domain specialist. -Eras is a German software company for Heizkostenabrechnung (heating cost billing). -Users are Hausverwaltungen and Messdienste who manage properties, meters, and billings. +The expert knows the full database schema. No DESCRIBE at runtime. +All queries use verified column names and JOIN patterns. """ import asyncio @@ -17,117 +17,109 @@ class ErasExpertNode(ExpertNode): name = "eras_expert" default_database = "eras2_production" - DOMAIN_SYSTEM = """You are the Eras domain expert — specialist for heating cost billing (Heizkostenabrechnung). + DOMAIN_SYSTEM = """You are the Eras domain expert for Heizkostenabrechnung (German heating cost billing). BUSINESS CONTEXT: -Eras is a German software company. The software manages Heizkostenabrechnung according to German law (HeizKV). -The USER of this software is a Hausverwaltung (property management) or Messdienst (metering service). -They use Eras to manage their customers' properties, meters, consumption readings, and billings. +Eras is software for Hausverwaltungen and Messdienste who manage properties, meters, and billings. +The USER of this agent is an Eras customer exploring their data. They think in domain terms +(Kunden, Objekte, Wohnungen, Zaehler) — NOT in SQL. Never expose SQL or table names to the user. -DOMAIN MODEL (how the data relates): -- Kunden (customers) = the Hausverwaltungen or property managers that the Eras user serves - Each Kunde has a Kundennummer and contact data (Name, Adresse, etc.) +DOMAIN MODEL: +- Kunden = property managers (Hausverwaltungen). 693 in the system. +- Objekte = buildings/Liegenschaften managed by Kunden. 780 total. Linked via objektkunde (m:n). +- Nutzeinheiten = apartments/units inside Objekte. 4578 total. +- Nutzer = tenants/occupants of Nutzeinheiten. 8206 total. +- Geraete = measurement devices (Heizkostenverteiler, Zaehler). 56726 total. +- Verbraeuche = consumption readings from Geraete. 1.3M readings. +- Adressen = postal addresses, linked via objektadressen/kundenadressen. -- Objekte (properties/buildings/Liegenschaften) = physical buildings managed by a Kunde - A Kunde can have many Objekte. Each Objekt has an address and is linked to a Kunde. +RESPOND IN DOMAIN LANGUAGE: +- Say "Kunde Jaeger hat 3 Objekte" not "SELECT COUNT..." +- Say "12 Wohnungen mit 45 Geraeten" not "nutzeinheit rows" +- Present data as summaries, not raw tables""" -- Nutzeinheiten (usage units/apartments) = individual units within an Objekt - An Objekt contains multiple Nutzeinheiten (e.g., Wohnung 1, Wohnung 2). - Each Nutzeinheit has Nutzer (tenants/occupants). + SCHEMA = """COMPLETE DATABASE SCHEMA (eras2_production) — use these exact column names: -- Geraete (devices/meters) = measurement devices installed in Nutzeinheiten - Heizkostenverteiler, Waermezaehler, Wasserzaehler, etc. - Each Geraet is linked to a Nutzeinheit and has a Geraetetyp. +=== kunden (693 rows) === +PK: ID (int) +Name1, Name2, Name3 (longtext) — customer name parts +Kundennummer (longtext) — customer number +AnredeID (FK), BriefanredeID (FK), ZugeordneterKomplettdruckID (FK) +Anmerkung, Fremdnummer, Ansprechpartner (longtext) +Steuernummer, UmsatzsteuerID (longtext) +HatHistorie, IstWebkunde, IstNettoKunde, BrennstoffkostenNachFIFO, BelegePerEmail (bool) +MietpreisAnpassungProzent (decimal) -- Geraeteverbraeuche (consumption readings) = measured values from Geraete - Ablesewerte collected by Monteure or remote reading systems. +=== objektkunde (911 rows) — JUNCTION: kunden ↔ objekte (many-to-many) === +PK: ID (int) +KundeID (FK → kunden.ID) +ObjektID (FK → objekte.ID) +ZeitraumVon, ZeitraumBis (datetime) +IstKunde, IstEigentuemer, IstRechnungsempfaenger, IstAbrechnungsempfaenger (bool) -- Abrechnungen (billings) = Heizkostenabrechnungen generated per Objekt/period - The core output: distributes heating costs to Nutzeinheiten based on consumption. +=== objekte (780 rows) === +PK: ID (int) +Objektnummer (longtext) — building reference number +AbleserID, MonteurID, UVIRefObjektID, ZugeordneterKomplettdruckID (FK) +Anmerkung, AnmerkungIntern (longtext) +HatHistorie, VorauszahlungGetrennt, Selbstablesung, IstObjektFreigegeben (bool) -- Auftraege (work orders) = tasks for Monteure (technicians) - Device installation, reading collection, maintenance. +=== objektadressen — JUNCTION: objekte ↔ adressen === +PK: ID, ObjektID (FK → objekte.ID), AdresseID (FK → adressen.ID), IstPrimaer (bool) -HIERARCHY (via JOINs): - Kunde ←→ objektkunde ←→ Objekt (many-to-many via junction table!) - Objekt → Nutzeinheiten → Geraete → Verbraeuche - Nutzeinheit → Nutzer - Kunde → Abrechnungen - Kunde → Auftraege +=== kundenadressen — JUNCTION: kunden ↔ adressen === +PK: ID, KundeID (FK → kunden.ID), AdresseID (FK → adressen.ID), TypDerAdresseID (FK) -CRITICAL: kunden and objekte are linked through the objektkunde junction table, NOT directly. +=== adressen (1762 rows) === +PK: ID (int) +Strasse, Hausnummer, Postleitzahl, Ort, Adresszusatz, Postfach (longtext) +LandID (FK), Laengengrad, Breitengrad (double) -IMPORTANT NOTES: -- All table/column names are German, lowercase -- Foreign keys often use patterns like KundenID, ObjektID, NutzeinheitID -- The database is eras2_production -- Always DESCRIBE tables before writing JOINs to verify actual column names -- Common user questions: customer overview, device counts, billing status, Objekt details""" +=== nutzeinheit (4578 rows) === +PK: ID (int) +ObjektID (FK → objekte.ID) +NeNummerInt (longtext) — unit number +Lage, Stockwerk, Flaeche, Nutzflaeche (various) +AdresseID (FK), CustomStatusKeyID (FK) - SCHEMA = """Known tables (eras2_production): -- kunden — customers (Hausverwaltungen) -- objekte — properties/buildings (Liegenschaften) -- nutzeinheit — apartments/units within Objekte -- nutzer — tenants/occupants of Nutzeinheiten -- geraete — measurement devices (Heizkostenverteiler, etc.) -- geraeteverbraeuche — consumption readings -- abrechnungen — heating cost billings -- auftraege — work orders for Monteure -- auftragspositionen — line items within Auftraege -- geraetetypen — device type catalog -- geraetekatalog — device model catalog -- heizbetriebskosten — heating operation costs -- nebenkosten — additional costs (Nebenkosten) +=== kundenutzeinheit — JUNCTION: kunden ↔ nutzeinheit === +PK: ID, KundeID (FK → kunden.ID), NutzeinheitID (FK → nutzeinheit.ID), Von, Bis (datetime) -KNOWN SCHEMA (verified — ONLY use these column names without DESCRIBE): -All tables use ID (int, auto_increment) as primary key. +=== nutzer (8206 rows) — tenants/occupants === +PK: ID (int) +NutzeinheitID (FK → nutzeinheit.ID) +Name1, Name2, Name3, Name4 (longtext) — tenant name +NutzungVon, NutzungBis (datetime) +ArtDerNutzung (int), AnredeID (FK), BriefanredeID (FK) +IstGesperrt, Selbstableser (bool) -- kunden: PK=ID. Known columns: Name1, Name2, Name3, Kundennummer -- objekte: PK=ID. Known columns: Objektnummer -- objektkunde: JUNCTION TABLE for kunden↔objekte (many-to-many!) - PK=ID, FK: KundeID→kunden.ID, ObjektID→objekte.ID -- nutzeinheit: PK=ID, FK: ObjektID→objekte.ID -- geraete: PK=ID, FK: NutzeinheitID→nutzeinheit.ID -- geraeteverbraeuche: linked to geraete -- nutzer: linked to nutzeinheit (DESCRIBE to find FK column name) +=== geraete (56726 rows) — meters/devices === +PK: ID (int) +NutzeinheitID (FK → nutzeinheit.ID) +ArtikelID (FK → geraetekatalog), GeraeteTypID (FK) +Fabriknummer, Funkkennung (longtext) — serial numbers +Einbaudatum, Ausbaudatum, GeeichtBis (datetime) +AnsprechpartnerID, ZugeordneterRaumID, CustomStatusKeyID (FK) -For ANY column not listed above, you MUST DESCRIBE the table first. +=== geraeteverbraeuche (1.3M rows) — consumption readings === +PK: ID (int) +GeraetID (FK → geraete.ID) +Ablesedatum (datetime), Ablesung, Verbrauch, Faktor (double) +AbleseartID (FK), Schaetzung (int), Status (int) +IstRekonstruiert (bool), Herkunft (int) -JOIN PATTERNS (use these exactly): -- Kunde → Objekte: JOIN objektkunde ok ON ok.KundeID = k.ID JOIN objekte o ON o.ID = ok.ObjektID -- Objekt → Nutzeinheiten: JOIN nutzeinheit n ON n.ObjektID = o.ID -- Nutzeinheit → Geraete: JOIN geraete g ON g.NutzeinheitID = n.ID +JOIN PATTERNS (use exactly): +Kunde → Objekte: JOIN objektkunde ok ON ok.KundeID = k.ID JOIN objekte o ON o.ID = ok.ObjektID +Objekt → Adresse: JOIN objektadressen oa ON oa.ObjektID = o.ID JOIN adressen a ON a.ID = oa.AdresseID +Kunde → Adresse: JOIN kundenadressen ka ON ka.KundeID = k.ID JOIN adressen a ON a.ID = ka.AdresseID +Objekt → NE: JOIN nutzeinheit ne ON ne.ObjektID = o.ID +NE → Nutzer: JOIN nutzer nu ON nu.NutzeinheitID = ne.ID +NE → Geraete: JOIN geraete g ON g.NutzeinheitID = ne.ID +Geraet → Verbrauch: JOIN geraeteverbraeuche gv ON gv.GeraetID = g.ID -IMPORTANT: For tables not listed above, always DESCRIBE first. -The junction table objektkunde is REQUIRED to link kunden and objekte. - -Example for "how many Objekte per Kunde": -[ - {{"tool": "query_db", "args": {{"query": "SELECT k.ID, k.Name1, COUNT(DISTINCT o.ID) as AnzahlObjekte FROM kunden k JOIN objektkunde ok ON ok.KundeID = k.ID JOIN objekte o ON o.ID = ok.ObjektID GROUP BY k.ID, k.Name1 ORDER BY AnzahlObjekte DESC LIMIT 20", "database": "eras2_production"}}}} -]""" - - def __init__(self, send_hud, process_manager=None): - super().__init__(send_hud, process_manager) - self._schema_cache: dict[str, str] = {} - - async def execute(self, job: str, language: str = "de"): - """Execute with schema auto-discovery. Caches DESCRIBE results.""" - if self._schema_cache: - schema_ctx = "Known column names from previous DESCRIBE:\n" - for table, desc in self._schema_cache.items(): - lines = desc.strip().split("\n")[:8] - schema_ctx += f"\n{table}:\n" + "\n".join(lines) + "\n" - job = job + "\n\n" + schema_ctx - - result = await super().execute(job, language) - - # Cache DESCRIBE results - if result.tool_output and "Field\t" in result.tool_output: - for table in ["kunden", "objekte", "nutzeinheit", "nutzer", "geraete", - "geraeteverbraeuche", "abrechnungen", "auftraege"]: - if table in job.lower() or table in result.tool_output.lower(): - self._schema_cache[table] = result.tool_output - log.info(f"[eras] cached schema for {table}") - break - - return result +RULES: +- NEVER use DESCRIBE at runtime. You know the schema. +- NEVER guess column names. Use ONLY columns listed above. +- For unknown tables: return an error, do not explore. +- Always LIMIT large queries (max 50 rows). +- Use LEFT JOIN when results might be empty.""" diff --git a/agent/nodes/expert_base.py b/agent/nodes/expert_base.py index 1a43d2f..8fa6b5f 100644 --- a/agent/nodes/expert_base.py +++ b/agent/nodes/expert_base.py @@ -78,63 +78,19 @@ Write a concise, natural response. 1-3 sentences. async def execute(self, job: str, language: str = "de") -> ThoughtResult: """Execute a self-contained job. Returns ThoughtResult. - Uses iterative plan-execute: if DESCRIBE queries are in the plan, - execute them first, inject results into a re-plan, then execute the rest.""" + Expert knows the schema — plan once, execute, respond.""" await self.hud("thinking", detail=f"planning: {job[:80]}") - # Step 1: Plan tool sequence - schema_context = self.SCHEMA + # Step 1: Plan tool sequence (expert knows schema, no DESCRIBE needed) plan_messages = [ {"role": "system", "content": self.PLAN_SYSTEM.format( - domain=self.DOMAIN_SYSTEM, schema=schema_context, + domain=self.DOMAIN_SYSTEM, schema=self.SCHEMA, database=self.default_database)}, {"role": "user", "content": f"Job: {job}"}, ] plan_raw = await llm_call(self.model, plan_messages) tool_sequence, response_hint = self._parse_plan(plan_raw) - # Step 1b: Execute DESCRIBE queries first, then re-plan with actual schema - describe_results = {} - remaining_tools = [] - for step in tool_sequence: - if step.get("tool") == "query_db": - query = step.get("args", {}).get("query", "").strip().upper() - if query.startswith("DESCRIBE") or query.startswith("SHOW"): - await self.hud("tool_call", tool="query_db", args=step.get("args", {})) - try: - result = await asyncio.to_thread( - run_db_query, step["args"]["query"], - step["args"].get("database", self.default_database)) - describe_results[step["args"]["query"]] = result - await self.hud("tool_result", tool="query_db", output=result[:200]) - except Exception as e: - await self.hud("tool_result", tool="query_db", output=str(e)[:200]) - else: - remaining_tools.append(step) - else: - remaining_tools.append(step) - - # Re-plan if we got DESCRIBE results (now we know actual column names) - if describe_results: - schema_update = "Actual column names from DESCRIBE:\n" - for q, result in describe_results.items(): - schema_update += f"\n{q}:\n{result[:500]}\n" - - replan_messages = [ - {"role": "system", "content": self.PLAN_SYSTEM.format( - domain=self.DOMAIN_SYSTEM, - schema=schema_context + "\n\n" + schema_update, - database=self.default_database)}, - {"role": "user", "content": f"Job: {job}\n\nUse ONLY the actual column names from DESCRIBE above. Do NOT include DESCRIBE steps — they are already done."}, - ] - replan_raw = await llm_call(self.model, replan_messages) - new_tools, new_hint = self._parse_plan(replan_raw) - if new_tools: - remaining_tools = new_tools - if new_hint: - response_hint = new_hint - - tool_sequence = remaining_tools await self.hud("planned", tools=len(tool_sequence), hint=response_hint[:80]) # Step 2: Execute remaining tools diff --git a/testcases/eras_domain.md b/testcases/eras_domain.md new file mode 100644 index 0000000..6b32c71 --- /dev/null +++ b/testcases/eras_domain.md @@ -0,0 +1,64 @@ +# Eras Domain Mastery + +Tests that the expert knows the schema cold — no DESCRIBE at runtime, no SQL errors, +domain-correct responses. The expert is a Heizkostenabrechnung specialist, not a SQL explorer. + +## Setup +- clear history + +## Steps + +### 1. Customer overview +- send: zeig mir die ersten 5 Kunden +- expect_trace: has tool_call +- expect_response: not contains "Unknown column" or "1054" or "error" or "Error" +- expect_response: length > 20 + +### 2. Objekte per Kunde (junction table) +- send: welcher Kunde hat die meisten Objekte? +- expect_trace: has tool_call +- expect_response: not contains "Unknown column" or "1054" or "error" or "Error" +- expect_response: length > 20 + +### 3. Nutzeinheiten in an Objekt +- send: wie viele Nutzeinheiten hat Objekt 4? +- expect_trace: has tool_call +- expect_response: not contains "Unknown column" or "1054" or "error" or "Error" +- expect_response: length > 5 + +### 4. Geraete count per Objekt +- send: welches Objekt hat die meisten Geraete? +- expect_trace: has tool_call +- expect_response: not contains "Unknown column" or "1054" or "error" or "Error" +- expect_response: length > 20 + +### 5. Full hierarchy traversal (4 tables) +- send: zeig mir alle Nutzer von Kunde 2 +- expect_trace: has tool_call +- expect_response: not contains "Unknown column" or "1054" or "error" or "Error" +- expect_response: length > 10 + +### 6. Address lookup via junction +- send: was ist die Adresse von Objekt 4? +- expect_trace: has tool_call +- expect_response: not contains "Unknown column" or "1054" or "error" or "Error" +- expect_response: length > 10 + +### 7. Verbrauchsdaten query +- send: zeig mir die letzten 5 Verbrauchswerte von Geraet 100 +- expect_trace: has tool_call +- expect_response: not contains "Unknown column" or "1054" or "error" or "Error" +- expect_response: length > 10 + +### 8. Domain language response (not SQL dump) +- send: gib mir eine Zusammenfassung von Kunde 103 +- expect_trace: has tool_call +- expect_response: not contains "SELECT" or "JOIN" or "FROM" +- expect_response: length > 30 + +### 9. Expert does NOT describe at runtime +- send: wie viele Geraete hat Kunde 63? +- expect_trace: has tool_call +- expect_response: not contains "Unknown column" or "1054" or "error" or "Error" +- expect_response: not contains "DESCRIBE" or "describe" +- expect_response: length > 5