- Rename files: cog_cli.py, test_cog.py, k8s/cog-*.yaml - Update all Python tool names: cog_* -> assay_* - Update FastAPI titles, MCP server names, URLs - Update K8s manifests: deployments, services, secrets, ingress - Update Docker env vars: COG_API -> ASSAY_API - Domain: cog.loop42.de -> assay.loop42.de - SQLite path: /tmp/cog_db.sqlite -> /tmp/assay_db.sqlite Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
642 lines
30 KiB
Python
642 lines
30 KiB
Python
"""Thinker Node: S3 — control, reasoning, tool use."""
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
|
|
from .base import Node
|
|
from ..llm import llm_call
|
|
from ..process import ProcessManager
|
|
from ..types import Command, ThoughtResult
|
|
|
|
log = logging.getLogger("runtime")
|
|
|
|
# OpenAI-compatible tool definitions for Thinker
|
|
|
|
EMIT_ACTIONS_TOOL = {
|
|
"type": "function",
|
|
"function": {
|
|
"name": "emit_actions",
|
|
"description": "Show buttons in the user's dashboard. Call this to create, update, or replace UI controls. For stateful buttons (counters, toggles), include var/op in payload.",
|
|
"parameters": {
|
|
"type": "object",
|
|
"properties": {
|
|
"actions": {
|
|
"type": "array",
|
|
"description": "List of buttons to show.",
|
|
"items": {
|
|
"type": "object",
|
|
"properties": {
|
|
"label": {"type": "string", "description": "Short button text (2-4 words)"},
|
|
"action": {"type": "string", "description": "snake_case action identifier"},
|
|
"payload": {
|
|
"type": "object",
|
|
"description": "Optional. For stateful buttons: {var, op, initial}. Ops: inc, dec, set, toggle.",
|
|
},
|
|
},
|
|
"required": ["label", "action"],
|
|
},
|
|
},
|
|
},
|
|
"required": ["actions"],
|
|
},
|
|
},
|
|
}
|
|
|
|
SET_STATE_TOOL = {
|
|
"type": "function",
|
|
"function": {
|
|
"name": "set_state",
|
|
"description": "Set a persistent key-value pair in the dashboard state store. Values survive across turns. The dashboard shows all state as live labels. Sensor picks up changes and pushes deltas. Use for counters, flags, status, progress tracking.",
|
|
"parameters": {
|
|
"type": "object",
|
|
"properties": {
|
|
"key": {"type": "string", "description": "State key (snake_case, e.g. 'session_mode', 'progress')"},
|
|
"value": {"description": "Any JSON value (string, number, boolean, object, array)"},
|
|
},
|
|
"required": ["key", "value"],
|
|
},
|
|
},
|
|
}
|
|
|
|
EMIT_DISPLAY_TOOL = {
|
|
"type": "function",
|
|
"function": {
|
|
"name": "emit_display",
|
|
"description": "Show rich formatted data in the dashboard display area. Use for status reports, progress bars, structured info. Rendered per-response (not persistent like set_state).",
|
|
"parameters": {
|
|
"type": "object",
|
|
"properties": {
|
|
"items": {
|
|
"type": "array",
|
|
"description": "Display items to render.",
|
|
"items": {
|
|
"type": "object",
|
|
"properties": {
|
|
"type": {"type": "string", "enum": ["kv", "progress", "status", "text"],
|
|
"description": "kv=key-value pair, progress=bar with %, status=icon+text, text=plain text"},
|
|
"label": {"type": "string", "description": "Label or key"},
|
|
"value": {"description": "Value (string/number). For progress: 0-100."},
|
|
"style": {"type": "string", "description": "Optional: 'success', 'warning', 'error', 'info'"},
|
|
},
|
|
"required": ["type", "label"],
|
|
},
|
|
},
|
|
},
|
|
"required": ["items"],
|
|
},
|
|
},
|
|
}
|
|
|
|
CREATE_MACHINE_TOOL = {
|
|
"type": "function",
|
|
"function": {
|
|
"name": "create_machine",
|
|
"description": "Create a state machine with states on the dashboard. Each state has a name, buttons, and content. Buttons with 'go' field transition locally without LLM. Machines persist across turns.",
|
|
"parameters": {
|
|
"type": "object",
|
|
"properties": {
|
|
"id": {"type": "string", "description": "Unique machine ID (snake_case, e.g. 'nav', 'todo')"},
|
|
"initial": {"type": "string", "description": "Name of the initial state"},
|
|
"states": {
|
|
"type": "array",
|
|
"description": "List of states. Each state has name, buttons, and content.",
|
|
"items": {
|
|
"type": "object",
|
|
"properties": {
|
|
"name": {"type": "string", "description": "State name"},
|
|
"buttons": {
|
|
"type": "array",
|
|
"items": {
|
|
"type": "object",
|
|
"properties": {
|
|
"label": {"type": "string"},
|
|
"action": {"type": "string"},
|
|
"go": {"type": "string", "description": "Target state name for local transition"},
|
|
},
|
|
"required": ["label", "action"],
|
|
},
|
|
},
|
|
"content": {"type": "array", "items": {"type": "string"}},
|
|
},
|
|
"required": ["name"],
|
|
},
|
|
},
|
|
},
|
|
"required": ["id", "initial", "states"],
|
|
},
|
|
},
|
|
}
|
|
|
|
ADD_STATE_TOOL = {
|
|
"type": "function",
|
|
"function": {
|
|
"name": "add_state",
|
|
"description": "Add or replace a state in an existing machine. Use to extend machines at runtime.",
|
|
"parameters": {
|
|
"type": "object",
|
|
"properties": {
|
|
"id": {"type": "string", "description": "Machine ID"},
|
|
"state": {"type": "string", "description": "State name to add/replace"},
|
|
"buttons": {
|
|
"type": "array",
|
|
"items": {
|
|
"type": "object",
|
|
"properties": {
|
|
"label": {"type": "string"},
|
|
"action": {"type": "string"},
|
|
"go": {"type": "string"},
|
|
},
|
|
"required": ["label", "action"],
|
|
},
|
|
},
|
|
"content": {"type": "array", "items": {"type": "string"}},
|
|
},
|
|
"required": ["id", "state"],
|
|
},
|
|
},
|
|
}
|
|
|
|
RESET_MACHINE_TOOL = {
|
|
"type": "function",
|
|
"function": {
|
|
"name": "reset_machine",
|
|
"description": "Reset a machine to its initial state.",
|
|
"parameters": {
|
|
"type": "object",
|
|
"properties": {
|
|
"id": {"type": "string", "description": "Machine ID to reset"},
|
|
},
|
|
"required": ["id"],
|
|
},
|
|
},
|
|
}
|
|
|
|
DESTROY_MACHINE_TOOL = {
|
|
"type": "function",
|
|
"function": {
|
|
"name": "destroy_machine",
|
|
"description": "Remove a machine from the dashboard entirely.",
|
|
"parameters": {
|
|
"type": "object",
|
|
"properties": {
|
|
"id": {"type": "string", "description": "Machine ID to destroy"},
|
|
},
|
|
"required": ["id"],
|
|
},
|
|
},
|
|
}
|
|
|
|
QUERY_DB_TOOL = {
|
|
"type": "function",
|
|
"function": {
|
|
"name": "query_db",
|
|
"description": """Execute a SQL query against a MariaDB database.
|
|
Returns tab-separated text. SELECT/DESCRIBE/SHOW only. Use LIMIT for large tables.
|
|
If a query errors, fix the SQL and retry. Use SHOW TABLES and DESCRIBE to explore.
|
|
|
|
Available databases:
|
|
- eras2_production: heating energy settlement (693 customers, 56K devices, German)
|
|
- plankiste_test: Kita pedagogical planning (10 activities, methods, age groups, German)
|
|
|
|
Use SHOW TABLES to discover tables. Use DESCRIBE tablename to explore columns.""",
|
|
"parameters": {
|
|
"type": "object",
|
|
"properties": {
|
|
"query": {"type": "string", "description": "SQL SELECT/DESCRIBE/SHOW query"},
|
|
"database": {"type": "string", "description": "Database name: eras2_production or plankiste_test",
|
|
"enum": ["eras2_production", "plankiste_test"]},
|
|
},
|
|
"required": ["query", "database"],
|
|
},
|
|
},
|
|
}
|
|
|
|
THINKER_TOOLS = [EMIT_ACTIONS_TOOL, SET_STATE_TOOL, EMIT_DISPLAY_TOOL,
|
|
CREATE_MACHINE_TOOL, ADD_STATE_TOOL, RESET_MACHINE_TOOL, DESTROY_MACHINE_TOOL,
|
|
QUERY_DB_TOOL]
|
|
|
|
|
|
class ThinkerNode(Node):
|
|
name = "thinker"
|
|
model = "openai/gpt-4o-mini"
|
|
max_context_tokens = 4000
|
|
|
|
SYSTEM = """You are the Thinker node — the brain of this cognitive runtime.
|
|
You receive a perception of what the user said. Decide: answer directly or use a tool.
|
|
|
|
CODE EXECUTION — write a ```python code block and it WILL be executed. Use print() for output.
|
|
- For math, databases, file ops, any computation: write python. NEVER describe code — write it.
|
|
- For simple conversation: respond directly as text.
|
|
|
|
YOUR ENVIRONMENT:
|
|
You are one node in a pipeline: Input (perceives) -> You (reason) -> Output (speaks) + Dashboard (renders).
|
|
- Your text response goes to Output, which speaks it to the user.
|
|
- You have 3 function tools for the dashboard:
|
|
|
|
1. emit_actions() — show buttons. Button clicks come back as "ACTION: action_name".
|
|
Stateful buttons: include var/op in payload (inc/dec/set/toggle). UI handles locally.
|
|
Example: label:"+1", action:"increment", payload:{"var":"count","op":"inc","initial":0}
|
|
|
|
2. set_state(key, value) — persistent key-value store shown as live labels.
|
|
Survives across turns. Use for tracking mode, progress, flags.
|
|
Example: set_state("session_mode", "building")
|
|
|
|
3. emit_display(items) — rich per-response display (status, progress, key-value).
|
|
Not persistent. Use for status reports, structured info.
|
|
Types: kv (key-value), progress (0-100 bar), status (icon+text), text (plain).
|
|
|
|
4. STATE MACHINES — persistent interactive components on the dashboard:
|
|
create_machine(id, initial) — create empty machine, then add_state for each state.
|
|
add_state(id, state, buttons, content) — add a state. Buttons with "go" transition locally.
|
|
reset_machine(id) — return machine to initial state.
|
|
destroy_machine(id) — remove machine from dashboard.
|
|
Example — navigation menu:
|
|
create_machine(id="nav", initial="main", states=[
|
|
{"name":"main","buttons":[{"label":"Menu 1","action":"menu_1","go":"sub1"},{"label":"Menu 2","action":"menu_2","go":"sub2"}],"content":["Welcome"]},
|
|
{"name":"sub1","buttons":[{"label":"Back","action":"back","go":"main"}],"content":["Sub 1 details"]},
|
|
{"name":"sub2","buttons":[{"label":"Back","action":"back","go":"main"}],"content":["Sub 2 details"]}
|
|
])
|
|
PREFER machines over emit_actions for anything with navigation or multiple views.
|
|
ALWAYS include states when creating a machine. Never write code — use the tool.
|
|
|
|
DASHBOARD FEEDBACK:
|
|
Your context includes what the user's dashboard currently shows.
|
|
- If you see a WARNING about missing or mismatched controls, call emit_actions to fix it.
|
|
- Trust the dashboard feedback over your memory.
|
|
- NEVER apologize for technical issues. Just fix them and move on naturally.
|
|
|
|
CRITICAL RULES:
|
|
- NEVER apologize. Don't say "sorry", "my apologies", "I apologize". Just fix things and move on.
|
|
- NEVER write code blocks alongside tool calls. If you call create_machine or emit_actions, your text response should describe what you did in plain language, NOT show code.
|
|
- NEVER output code (Python, JavaScript, TypeScript, or any language) for state machines, counters, or UI components. You are NOT a code assistant. Use the function tools instead.
|
|
- Keep button labels short (2-4 words), action is snake_case.
|
|
- Use set_state for anything that should persist across turns.
|
|
- Use emit_display for one-time status/info that doesn't need to persist.
|
|
|
|
{memory_context}"""
|
|
|
|
DB_HOST = "mariadb-eras" # K3s service name
|
|
DB_USER = "root"
|
|
DB_PASS = "root"
|
|
DB_NAME = "eras2_production"
|
|
|
|
def __init__(self, send_hud, process_manager: ProcessManager = None):
|
|
super().__init__(send_hud)
|
|
self.pm = process_manager
|
|
|
|
def _run_db_query(self, query: str, database: str = None) -> str:
|
|
"""Execute SQL query against MariaDB (runs in thread pool)."""
|
|
import pymysql
|
|
# Safety: only SELECT/DESCRIBE/SHOW
|
|
trimmed = query.strip().upper()
|
|
if not (trimmed.startswith("SELECT") or trimmed.startswith("DESCRIBE") or trimmed.startswith("SHOW")):
|
|
return "Error: Only SELECT/DESCRIBE/SHOW queries allowed"
|
|
db = database or self.DB_NAME
|
|
if db not in ("eras2_production", "plankiste_test"):
|
|
return f"Error: Unknown database '{db}'. Use eras2_production or plankiste_test."
|
|
conn = pymysql.connect(host=self.DB_HOST, user=self.DB_USER,
|
|
password=self.DB_PASS, database=db,
|
|
connect_timeout=5, read_timeout=15)
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute(query)
|
|
rows = cur.fetchall()
|
|
if not rows:
|
|
return "(no results)"
|
|
cols = [d[0] for d in cur.description]
|
|
lines = ["\t".join(cols)]
|
|
for row in rows:
|
|
lines.append("\t".join(str(v) if v is not None else "" for v in row))
|
|
return "\n".join(lines)
|
|
finally:
|
|
conn.close()
|
|
|
|
def _parse_tool_call(self, response: str) -> tuple[str, str] | None:
|
|
"""Parse python/sql code blocks from response text for execution."""
|
|
text = response.strip()
|
|
|
|
if text.startswith("TOOL:"):
|
|
lines = text.split("\n")
|
|
tool_name = lines[0].replace("TOOL:", "").strip()
|
|
code_lines = []
|
|
in_code = False
|
|
for line in lines[1:]:
|
|
if line.strip().startswith("```") and not in_code:
|
|
in_code = True
|
|
continue
|
|
elif line.strip().startswith("```") and in_code:
|
|
break
|
|
elif in_code:
|
|
code_lines.append(line)
|
|
elif line.strip().startswith("CODE:"):
|
|
continue
|
|
return (tool_name, "\n".join(code_lines)) if code_lines else None
|
|
|
|
block_match = re.search(r'```(python|py|sql|sqlite|sh|bash|tool_code)?\s*\n(.*?)```', text, re.DOTALL)
|
|
if block_match:
|
|
lang = (block_match.group(1) or "").lower()
|
|
code = block_match.group(2).strip()
|
|
if code and len(code.split("\n")) > 0:
|
|
if lang in ("sql", "sqlite"):
|
|
wrapped = f'''import sqlite3
|
|
conn = sqlite3.connect("/tmp/assay_db.sqlite")
|
|
cursor = conn.cursor()
|
|
for stmt in """{code}""".split(";"):
|
|
stmt = stmt.strip()
|
|
if stmt:
|
|
cursor.execute(stmt)
|
|
conn.commit()
|
|
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
|
|
tables = cursor.fetchall()
|
|
for t in tables:
|
|
cursor.execute(f"SELECT * FROM {t[0]}")
|
|
rows = cursor.fetchall()
|
|
cols = [d[0] for d in cursor.description]
|
|
print(f"Table: {t[0]}")
|
|
print(" | ".join(cols))
|
|
for row in rows:
|
|
print(" | ".join(str(c) for c in row))
|
|
conn.close()'''
|
|
return ("python", wrapped)
|
|
return ("python", code)
|
|
|
|
return None
|
|
|
|
def _strip_code_blocks(self, response: str) -> str:
|
|
"""Remove ALL code blocks from response, return plain text."""
|
|
text = re.sub(r'```[\s\S]*?```', '', response)
|
|
return text.strip()
|
|
|
|
async def _extract_from_tool_calls(self, tool_calls: list) -> tuple[list[dict], dict, list[dict], list[dict]]:
|
|
"""Extract actions, state updates, display items, and machine ops from tool_calls."""
|
|
actions = []
|
|
state_updates = {}
|
|
display_items = []
|
|
machine_ops = []
|
|
for tc in tool_calls:
|
|
fn = tc.get("function", {})
|
|
name = fn.get("name", "")
|
|
try:
|
|
args = json.loads(fn.get("arguments", "{}"))
|
|
except (json.JSONDecodeError, Exception) as e:
|
|
log.error(f"[thinker] {name} parse error: {e}")
|
|
continue
|
|
if name == "emit_actions":
|
|
actions.extend(args.get("actions", []))
|
|
labels = [a.get("label", "?") for a in args.get("actions", [])]
|
|
await self.hud("tool_call", tool=name, input=f"buttons: {labels}")
|
|
elif name == "set_state":
|
|
key = args.get("key", "")
|
|
if key:
|
|
state_updates[key] = args.get("value")
|
|
await self.hud("tool_call", tool=name, input=f"{key} = {args.get('value')}")
|
|
elif name == "emit_display":
|
|
display_items.extend(args.get("items", []))
|
|
await self.hud("tool_call", tool=name, input=f"{len(args.get('items', []))} items")
|
|
elif name == "create_machine":
|
|
machine_ops.append({"op": "create", **args})
|
|
states = [s.get("name", "?") for s in args.get("states", [])]
|
|
await self.hud("tool_call", tool=name, input=f"id={args.get('id')} states={states}")
|
|
elif name == "add_state":
|
|
machine_ops.append({"op": "add_state", **args})
|
|
await self.hud("tool_call", tool=name, input=f"{args.get('id')}.{args.get('state')}")
|
|
elif name == "reset_machine":
|
|
machine_ops.append({"op": "reset", **args})
|
|
await self.hud("tool_call", tool=name, input=f"id={args.get('id')}")
|
|
elif name == "destroy_machine":
|
|
machine_ops.append({"op": "destroy", **args})
|
|
await self.hud("tool_call", tool=name, input=f"id={args.get('id')}")
|
|
elif name == "query_db":
|
|
query = args.get("query", "")
|
|
await self.hud("tool_call", tool=name, input=query[:120])
|
|
try:
|
|
import asyncio
|
|
db = args.get("database", "eras2_production")
|
|
output = await asyncio.to_thread(self._run_db_query, query, db)
|
|
lines = output.split("\n")
|
|
if len(lines) > 102:
|
|
output = "\n".join(lines[:102]) + f"\n... ({len(lines) - 102} more rows)"
|
|
self._db_result = output
|
|
await self.hud("tool_result", tool=name, output=output[:200], rows=max(0, len(lines) - 1))
|
|
except Exception as e:
|
|
self._db_result = f"Error: {e}"
|
|
await self.hud("tool_result", tool=name, output=str(e)[:200], rows=0)
|
|
return actions, state_updates, display_items, machine_ops
|
|
|
|
def _parse_actions_fallback(self, response: str) -> tuple[str, list[dict]]:
|
|
"""Fallback: extract ACTIONS: JSON line from response text (legacy format)."""
|
|
actions = []
|
|
lines = response.split("\n")
|
|
clean_lines = []
|
|
for line in lines:
|
|
stripped = line.strip()
|
|
if stripped.startswith("ACTIONS:"):
|
|
try:
|
|
actions = json.loads(stripped[8:].strip())
|
|
if not isinstance(actions, list):
|
|
actions = []
|
|
except (json.JSONDecodeError, Exception):
|
|
pass
|
|
else:
|
|
clean_lines.append(line)
|
|
return "\n".join(clean_lines).strip(), actions
|
|
|
|
async def process(self, command: Command, history: list[dict], memory_context: str = "") -> ThoughtResult:
|
|
await self.hud("thinking", detail="reasoning about response")
|
|
|
|
messages = [
|
|
{"role": "system", "content": self.SYSTEM.replace("{memory_context}", memory_context)},
|
|
]
|
|
for msg in history[-12:]:
|
|
messages.append(msg)
|
|
a = command.analysis
|
|
input_ctx = (
|
|
f"Input analysis:\n"
|
|
f"- Who: {a.who} | Intent: {a.intent} | Complexity: {a.complexity}\n"
|
|
f"- Topic: {a.topic} | Tone: {a.tone} | Language: {a.language}\n"
|
|
f"- Context: {a.context}\n"
|
|
f"- Original message: {command.source_text}"
|
|
)
|
|
messages.append({"role": "system", "content": input_ctx})
|
|
messages = self.trim_context(messages)
|
|
|
|
await self.hud("context", messages=messages, tokens=self.last_context_tokens,
|
|
max_tokens=self.max_context_tokens, fill_pct=self.context_fill_pct)
|
|
|
|
# Call with all thinker tools available
|
|
response, tool_calls = await llm_call(self.model, messages, tools=THINKER_TOOLS)
|
|
if not response and not tool_calls:
|
|
response = "[no response from LLM]"
|
|
log.info(f"[thinker] response: {(response or '')[:200]}")
|
|
if tool_calls:
|
|
log.info(f"[thinker] tool_calls: {len(tool_calls)}")
|
|
|
|
# Extract from function calls
|
|
actions, state_updates, display_items, machine_ops = await self._extract_from_tool_calls(tool_calls)
|
|
|
|
# S3* audit: detect code-without-tools mismatch
|
|
has_code = response and "```" in response
|
|
has_any_tool = bool(actions or state_updates or display_items or machine_ops or tool_calls)
|
|
if has_code and not has_any_tool:
|
|
await self.hud("s3_audit", check="code_without_tools",
|
|
detail="Thinker wrote code but made no tool calls. Retrying.")
|
|
log.info("[thinker] S3* audit: code without tools — retrying")
|
|
messages.append({"role": "assistant", "content": response})
|
|
messages.append({"role": "system", "content": (
|
|
"S3* AUDIT CORRECTION: You wrote code instead of calling function tools. "
|
|
"This is wrong. You MUST use emit_actions, create_machine, set_state, query_db etc. "
|
|
"Convert what you intended into actual tool calls. Do NOT write code."
|
|
)})
|
|
messages = self.trim_context(messages)
|
|
response, tool_calls = await llm_call(self.model, messages, tools=THINKER_TOOLS)
|
|
if not response and not tool_calls:
|
|
response = "[no response from LLM]"
|
|
retry_a, retry_s, retry_d, retry_m = await self._extract_from_tool_calls(tool_calls)
|
|
if retry_a:
|
|
actions = retry_a
|
|
state_updates.update(retry_s)
|
|
display_items.extend(retry_d)
|
|
machine_ops.extend(retry_m)
|
|
has_any_tool = bool(actions or state_updates or display_items or machine_ops or tool_calls)
|
|
if has_any_tool:
|
|
await self.hud("s3_audit", check="code_without_tools", detail="Retry succeeded — tools called.")
|
|
else:
|
|
await self.hud("s3_audit", check="code_without_tools", detail="Retry failed — still no tools.")
|
|
|
|
# S3* audit: intent-vs-action — did Thinker DO what was requested?
|
|
has_any_tool = bool(actions or state_updates or display_items or machine_ops
|
|
or getattr(self, '_db_result', None))
|
|
if command.analysis.intent in ("request", "action") and not has_any_tool:
|
|
await self.hud("s3_audit", check="intent_without_action",
|
|
detail=f"Intent={command.analysis.intent} topic={command.analysis.topic} but no tools called. Retrying.")
|
|
log.info(f"[thinker] S3* audit: intent without action — retrying")
|
|
messages.append({"role": "assistant", "content": response or ""})
|
|
messages.append({"role": "system", "content": (
|
|
f"S3* AUDIT CORRECTION: The user's intent was '{command.analysis.intent}' "
|
|
f"about '{command.analysis.topic}', but you only produced text without calling any tools. "
|
|
"You MUST take action — call query_db, emit_actions, create_machine, set_state, etc. "
|
|
"DO something, don't just describe what could be done."
|
|
)})
|
|
messages = self.trim_context(messages)
|
|
response, tool_calls = await llm_call(self.model, messages, tools=THINKER_TOOLS)
|
|
if not response and not tool_calls:
|
|
response = "[no response from LLM]"
|
|
retry_a, retry_s, retry_d, retry_m = await self._extract_from_tool_calls(tool_calls)
|
|
if retry_a:
|
|
actions = retry_a
|
|
state_updates.update(retry_s)
|
|
display_items.extend(retry_d)
|
|
machine_ops.extend(retry_m)
|
|
has_any_tool = bool(actions or state_updates or display_items or machine_ops
|
|
or tool_calls or getattr(self, '_db_result', None))
|
|
if has_any_tool:
|
|
await self.hud("s3_audit", check="intent_without_action", detail="Retry succeeded — action taken.")
|
|
else:
|
|
await self.hud("s3_audit", check="intent_without_action", detail="Retry failed — still no action.")
|
|
|
|
# DB query result → second LLM call to interpret (with retry on error)
|
|
db_result = getattr(self, '_db_result', None)
|
|
if db_result is not None:
|
|
self._db_result = None
|
|
log.info(f"[thinker] db result: {db_result[:200]}")
|
|
is_error = db_result.startswith("Error:")
|
|
|
|
messages.append({"role": "assistant", "content": response or "Querying database..."})
|
|
if is_error:
|
|
messages.append({"role": "system", "content": f"Query FAILED: {db_result}\nFix the SQL and call query_db again. Table names are lowercase plural (kunden, objekte, geraete, nutzeinheit, geraeteverbraeuche)."})
|
|
else:
|
|
messages.append({"role": "system", "content": f"Database query result:\n{db_result}"})
|
|
messages.append({"role": "user", "content": "Respond based on the query result. Be concise. Present tabular data clearly."})
|
|
messages = self.trim_context(messages)
|
|
final, final_tool_calls = await llm_call(self.model, messages, tools=THINKER_TOOLS)
|
|
if not final:
|
|
final = "[no response from LLM]"
|
|
final_actions, final_state, final_display, final_machine_ops = await self._extract_from_tool_calls(final_tool_calls)
|
|
if final_actions:
|
|
actions = final_actions
|
|
state_updates.update(final_state)
|
|
display_items.extend(final_display)
|
|
machine_ops.extend(final_machine_ops)
|
|
|
|
# If retry produced a new DB result, do one more interpret call
|
|
retry_result = getattr(self, '_db_result', None)
|
|
if retry_result is not None:
|
|
self._db_result = None
|
|
log.info(f"[thinker] db retry result: {retry_result[:200]}")
|
|
messages.append({"role": "assistant", "content": final or "Retrying..."})
|
|
messages.append({"role": "system", "content": f"Database query result:\n{retry_result}"})
|
|
messages.append({"role": "user", "content": "Respond based on the query result. Be concise. Present tabular data clearly."})
|
|
messages = self.trim_context(messages)
|
|
final, final_tool_calls = await llm_call(self.model, messages, tools=THINKER_TOOLS)
|
|
if not final:
|
|
final = "[no response from LLM]"
|
|
r_actions, r_state, r_display, r_machine = await self._extract_from_tool_calls(final_tool_calls)
|
|
if r_actions:
|
|
actions = r_actions
|
|
state_updates.update(r_state)
|
|
display_items.extend(r_display)
|
|
machine_ops.extend(r_machine)
|
|
db_result = retry_result
|
|
|
|
clean_text = self._strip_code_blocks(final)
|
|
await self.hud("decided", instruction=clean_text[:200])
|
|
return ThoughtResult(response=clean_text, tool_used="query_db",
|
|
tool_output=db_result, actions=actions,
|
|
state_updates=state_updates, display_items=display_items,
|
|
machine_ops=machine_ops)
|
|
|
|
# Fallback: check for legacy ACTIONS: line in text
|
|
if not actions and response:
|
|
response, actions = self._parse_actions_fallback(response)
|
|
|
|
# Check for python/sql code execution in text
|
|
code_call = self._parse_tool_call(response) if response else None
|
|
if code_call:
|
|
tool_name, code = code_call
|
|
|
|
if self.pm and tool_name == "python":
|
|
proc = await self.pm.execute(tool_name, code)
|
|
tool_output = "\n".join(proc.output_lines)
|
|
else:
|
|
tool_output = f"[unknown tool: {tool_name}]"
|
|
|
|
log.info(f"[thinker] tool output: {tool_output[:200]}")
|
|
|
|
# Second call: interpret tool output
|
|
messages.append({"role": "assistant", "content": response})
|
|
messages.append({"role": "system", "content": f"Tool output:\n{tool_output}"})
|
|
messages.append({"role": "user", "content": "Respond to the user based on the tool output. Be natural and concise. Use tools if needed."})
|
|
messages = self.trim_context(messages)
|
|
final, final_tool_calls = await llm_call(self.model, messages, tools=THINKER_TOOLS)
|
|
if not final:
|
|
final = "[no response from LLM]"
|
|
|
|
# Merge from second call
|
|
final_actions, final_state, final_display, final_machine_ops = await self._extract_from_tool_calls(final_tool_calls)
|
|
if final_actions:
|
|
actions = final_actions
|
|
state_updates.update(final_state)
|
|
display_items.extend(final_display)
|
|
machine_ops.extend(final_machine_ops)
|
|
|
|
clean_text = self._strip_code_blocks(final)
|
|
if not actions:
|
|
clean_text, actions = self._parse_actions_fallback(clean_text)
|
|
|
|
if actions:
|
|
log.info(f"[thinker] actions: {actions}")
|
|
await self.hud("decided", instruction=clean_text[:200])
|
|
return ThoughtResult(response=clean_text, tool_used=tool_name,
|
|
tool_output=tool_output, actions=actions,
|
|
state_updates=state_updates, display_items=display_items,
|
|
machine_ops=machine_ops)
|
|
|
|
clean_text = (self._strip_code_blocks(response) or response) if response else ""
|
|
if actions:
|
|
log.info(f"[thinker] actions: {actions}")
|
|
await self.hud("decided", instruction="direct response (no tools)")
|
|
return ThoughtResult(response=clean_text, actions=actions,
|
|
state_updates=state_updates, display_items=display_items,
|
|
machine_ops=machine_ops)
|