agent-runtime/agent/nodes/thinker_v1.py
Nico bf11312b4b Rename cog -> assay across codebase
- Rename files: cog_cli.py, test_cog.py, k8s/cog-*.yaml
- Update all Python tool names: cog_* -> assay_*
- Update FastAPI titles, MCP server names, URLs
- Update K8s manifests: deployments, services, secrets, ingress
- Update Docker env vars: COG_API -> ASSAY_API
- Domain: cog.loop42.de -> assay.loop42.de
- SQLite path: /tmp/cog_db.sqlite -> /tmp/assay_db.sqlite

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 01:39:05 +02:00

642 lines
30 KiB
Python

"""Thinker Node: S3 — control, reasoning, tool use."""
import json
import logging
import re
from .base import Node
from ..llm import llm_call
from ..process import ProcessManager
from ..types import Command, ThoughtResult
log = logging.getLogger("runtime")
# OpenAI-compatible tool definitions for Thinker
EMIT_ACTIONS_TOOL = {
"type": "function",
"function": {
"name": "emit_actions",
"description": "Show buttons in the user's dashboard. Call this to create, update, or replace UI controls. For stateful buttons (counters, toggles), include var/op in payload.",
"parameters": {
"type": "object",
"properties": {
"actions": {
"type": "array",
"description": "List of buttons to show.",
"items": {
"type": "object",
"properties": {
"label": {"type": "string", "description": "Short button text (2-4 words)"},
"action": {"type": "string", "description": "snake_case action identifier"},
"payload": {
"type": "object",
"description": "Optional. For stateful buttons: {var, op, initial}. Ops: inc, dec, set, toggle.",
},
},
"required": ["label", "action"],
},
},
},
"required": ["actions"],
},
},
}
SET_STATE_TOOL = {
"type": "function",
"function": {
"name": "set_state",
"description": "Set a persistent key-value pair in the dashboard state store. Values survive across turns. The dashboard shows all state as live labels. Sensor picks up changes and pushes deltas. Use for counters, flags, status, progress tracking.",
"parameters": {
"type": "object",
"properties": {
"key": {"type": "string", "description": "State key (snake_case, e.g. 'session_mode', 'progress')"},
"value": {"description": "Any JSON value (string, number, boolean, object, array)"},
},
"required": ["key", "value"],
},
},
}
EMIT_DISPLAY_TOOL = {
"type": "function",
"function": {
"name": "emit_display",
"description": "Show rich formatted data in the dashboard display area. Use for status reports, progress bars, structured info. Rendered per-response (not persistent like set_state).",
"parameters": {
"type": "object",
"properties": {
"items": {
"type": "array",
"description": "Display items to render.",
"items": {
"type": "object",
"properties": {
"type": {"type": "string", "enum": ["kv", "progress", "status", "text"],
"description": "kv=key-value pair, progress=bar with %, status=icon+text, text=plain text"},
"label": {"type": "string", "description": "Label or key"},
"value": {"description": "Value (string/number). For progress: 0-100."},
"style": {"type": "string", "description": "Optional: 'success', 'warning', 'error', 'info'"},
},
"required": ["type", "label"],
},
},
},
"required": ["items"],
},
},
}
CREATE_MACHINE_TOOL = {
"type": "function",
"function": {
"name": "create_machine",
"description": "Create a state machine with states on the dashboard. Each state has a name, buttons, and content. Buttons with 'go' field transition locally without LLM. Machines persist across turns.",
"parameters": {
"type": "object",
"properties": {
"id": {"type": "string", "description": "Unique machine ID (snake_case, e.g. 'nav', 'todo')"},
"initial": {"type": "string", "description": "Name of the initial state"},
"states": {
"type": "array",
"description": "List of states. Each state has name, buttons, and content.",
"items": {
"type": "object",
"properties": {
"name": {"type": "string", "description": "State name"},
"buttons": {
"type": "array",
"items": {
"type": "object",
"properties": {
"label": {"type": "string"},
"action": {"type": "string"},
"go": {"type": "string", "description": "Target state name for local transition"},
},
"required": ["label", "action"],
},
},
"content": {"type": "array", "items": {"type": "string"}},
},
"required": ["name"],
},
},
},
"required": ["id", "initial", "states"],
},
},
}
ADD_STATE_TOOL = {
"type": "function",
"function": {
"name": "add_state",
"description": "Add or replace a state in an existing machine. Use to extend machines at runtime.",
"parameters": {
"type": "object",
"properties": {
"id": {"type": "string", "description": "Machine ID"},
"state": {"type": "string", "description": "State name to add/replace"},
"buttons": {
"type": "array",
"items": {
"type": "object",
"properties": {
"label": {"type": "string"},
"action": {"type": "string"},
"go": {"type": "string"},
},
"required": ["label", "action"],
},
},
"content": {"type": "array", "items": {"type": "string"}},
},
"required": ["id", "state"],
},
},
}
RESET_MACHINE_TOOL = {
"type": "function",
"function": {
"name": "reset_machine",
"description": "Reset a machine to its initial state.",
"parameters": {
"type": "object",
"properties": {
"id": {"type": "string", "description": "Machine ID to reset"},
},
"required": ["id"],
},
},
}
DESTROY_MACHINE_TOOL = {
"type": "function",
"function": {
"name": "destroy_machine",
"description": "Remove a machine from the dashboard entirely.",
"parameters": {
"type": "object",
"properties": {
"id": {"type": "string", "description": "Machine ID to destroy"},
},
"required": ["id"],
},
},
}
QUERY_DB_TOOL = {
"type": "function",
"function": {
"name": "query_db",
"description": """Execute a SQL query against a MariaDB database.
Returns tab-separated text. SELECT/DESCRIBE/SHOW only. Use LIMIT for large tables.
If a query errors, fix the SQL and retry. Use SHOW TABLES and DESCRIBE to explore.
Available databases:
- eras2_production: heating energy settlement (693 customers, 56K devices, German)
- plankiste_test: Kita pedagogical planning (10 activities, methods, age groups, German)
Use SHOW TABLES to discover tables. Use DESCRIBE tablename to explore columns.""",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "SQL SELECT/DESCRIBE/SHOW query"},
"database": {"type": "string", "description": "Database name: eras2_production or plankiste_test",
"enum": ["eras2_production", "plankiste_test"]},
},
"required": ["query", "database"],
},
},
}
THINKER_TOOLS = [EMIT_ACTIONS_TOOL, SET_STATE_TOOL, EMIT_DISPLAY_TOOL,
CREATE_MACHINE_TOOL, ADD_STATE_TOOL, RESET_MACHINE_TOOL, DESTROY_MACHINE_TOOL,
QUERY_DB_TOOL]
class ThinkerNode(Node):
name = "thinker"
model = "openai/gpt-4o-mini"
max_context_tokens = 4000
SYSTEM = """You are the Thinker node — the brain of this cognitive runtime.
You receive a perception of what the user said. Decide: answer directly or use a tool.
CODE EXECUTION — write a ```python code block and it WILL be executed. Use print() for output.
- For math, databases, file ops, any computation: write python. NEVER describe code — write it.
- For simple conversation: respond directly as text.
YOUR ENVIRONMENT:
You are one node in a pipeline: Input (perceives) -> You (reason) -> Output (speaks) + Dashboard (renders).
- Your text response goes to Output, which speaks it to the user.
- You have 3 function tools for the dashboard:
1. emit_actions() — show buttons. Button clicks come back as "ACTION: action_name".
Stateful buttons: include var/op in payload (inc/dec/set/toggle). UI handles locally.
Example: label:"+1", action:"increment", payload:{"var":"count","op":"inc","initial":0}
2. set_state(key, value) — persistent key-value store shown as live labels.
Survives across turns. Use for tracking mode, progress, flags.
Example: set_state("session_mode", "building")
3. emit_display(items) — rich per-response display (status, progress, key-value).
Not persistent. Use for status reports, structured info.
Types: kv (key-value), progress (0-100 bar), status (icon+text), text (plain).
4. STATE MACHINES — persistent interactive components on the dashboard:
create_machine(id, initial) — create empty machine, then add_state for each state.
add_state(id, state, buttons, content) — add a state. Buttons with "go" transition locally.
reset_machine(id) — return machine to initial state.
destroy_machine(id) — remove machine from dashboard.
Example — navigation menu:
create_machine(id="nav", initial="main", states=[
{"name":"main","buttons":[{"label":"Menu 1","action":"menu_1","go":"sub1"},{"label":"Menu 2","action":"menu_2","go":"sub2"}],"content":["Welcome"]},
{"name":"sub1","buttons":[{"label":"Back","action":"back","go":"main"}],"content":["Sub 1 details"]},
{"name":"sub2","buttons":[{"label":"Back","action":"back","go":"main"}],"content":["Sub 2 details"]}
])
PREFER machines over emit_actions for anything with navigation or multiple views.
ALWAYS include states when creating a machine. Never write code — use the tool.
DASHBOARD FEEDBACK:
Your context includes what the user's dashboard currently shows.
- If you see a WARNING about missing or mismatched controls, call emit_actions to fix it.
- Trust the dashboard feedback over your memory.
- NEVER apologize for technical issues. Just fix them and move on naturally.
CRITICAL RULES:
- NEVER apologize. Don't say "sorry", "my apologies", "I apologize". Just fix things and move on.
- NEVER write code blocks alongside tool calls. If you call create_machine or emit_actions, your text response should describe what you did in plain language, NOT show code.
- NEVER output code (Python, JavaScript, TypeScript, or any language) for state machines, counters, or UI components. You are NOT a code assistant. Use the function tools instead.
- Keep button labels short (2-4 words), action is snake_case.
- Use set_state for anything that should persist across turns.
- Use emit_display for one-time status/info that doesn't need to persist.
{memory_context}"""
DB_HOST = "mariadb-eras" # K3s service name
DB_USER = "root"
DB_PASS = "root"
DB_NAME = "eras2_production"
def __init__(self, send_hud, process_manager: ProcessManager = None):
super().__init__(send_hud)
self.pm = process_manager
def _run_db_query(self, query: str, database: str = None) -> str:
"""Execute SQL query against MariaDB (runs in thread pool)."""
import pymysql
# Safety: only SELECT/DESCRIBE/SHOW
trimmed = query.strip().upper()
if not (trimmed.startswith("SELECT") or trimmed.startswith("DESCRIBE") or trimmed.startswith("SHOW")):
return "Error: Only SELECT/DESCRIBE/SHOW queries allowed"
db = database or self.DB_NAME
if db not in ("eras2_production", "plankiste_test"):
return f"Error: Unknown database '{db}'. Use eras2_production or plankiste_test."
conn = pymysql.connect(host=self.DB_HOST, user=self.DB_USER,
password=self.DB_PASS, database=db,
connect_timeout=5, read_timeout=15)
try:
with conn.cursor() as cur:
cur.execute(query)
rows = cur.fetchall()
if not rows:
return "(no results)"
cols = [d[0] for d in cur.description]
lines = ["\t".join(cols)]
for row in rows:
lines.append("\t".join(str(v) if v is not None else "" for v in row))
return "\n".join(lines)
finally:
conn.close()
def _parse_tool_call(self, response: str) -> tuple[str, str] | None:
"""Parse python/sql code blocks from response text for execution."""
text = response.strip()
if text.startswith("TOOL:"):
lines = text.split("\n")
tool_name = lines[0].replace("TOOL:", "").strip()
code_lines = []
in_code = False
for line in lines[1:]:
if line.strip().startswith("```") and not in_code:
in_code = True
continue
elif line.strip().startswith("```") and in_code:
break
elif in_code:
code_lines.append(line)
elif line.strip().startswith("CODE:"):
continue
return (tool_name, "\n".join(code_lines)) if code_lines else None
block_match = re.search(r'```(python|py|sql|sqlite|sh|bash|tool_code)?\s*\n(.*?)```', text, re.DOTALL)
if block_match:
lang = (block_match.group(1) or "").lower()
code = block_match.group(2).strip()
if code and len(code.split("\n")) > 0:
if lang in ("sql", "sqlite"):
wrapped = f'''import sqlite3
conn = sqlite3.connect("/tmp/assay_db.sqlite")
cursor = conn.cursor()
for stmt in """{code}""".split(";"):
stmt = stmt.strip()
if stmt:
cursor.execute(stmt)
conn.commit()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = cursor.fetchall()
for t in tables:
cursor.execute(f"SELECT * FROM {t[0]}")
rows = cursor.fetchall()
cols = [d[0] for d in cursor.description]
print(f"Table: {t[0]}")
print(" | ".join(cols))
for row in rows:
print(" | ".join(str(c) for c in row))
conn.close()'''
return ("python", wrapped)
return ("python", code)
return None
def _strip_code_blocks(self, response: str) -> str:
"""Remove ALL code blocks from response, return plain text."""
text = re.sub(r'```[\s\S]*?```', '', response)
return text.strip()
async def _extract_from_tool_calls(self, tool_calls: list) -> tuple[list[dict], dict, list[dict], list[dict]]:
"""Extract actions, state updates, display items, and machine ops from tool_calls."""
actions = []
state_updates = {}
display_items = []
machine_ops = []
for tc in tool_calls:
fn = tc.get("function", {})
name = fn.get("name", "")
try:
args = json.loads(fn.get("arguments", "{}"))
except (json.JSONDecodeError, Exception) as e:
log.error(f"[thinker] {name} parse error: {e}")
continue
if name == "emit_actions":
actions.extend(args.get("actions", []))
labels = [a.get("label", "?") for a in args.get("actions", [])]
await self.hud("tool_call", tool=name, input=f"buttons: {labels}")
elif name == "set_state":
key = args.get("key", "")
if key:
state_updates[key] = args.get("value")
await self.hud("tool_call", tool=name, input=f"{key} = {args.get('value')}")
elif name == "emit_display":
display_items.extend(args.get("items", []))
await self.hud("tool_call", tool=name, input=f"{len(args.get('items', []))} items")
elif name == "create_machine":
machine_ops.append({"op": "create", **args})
states = [s.get("name", "?") for s in args.get("states", [])]
await self.hud("tool_call", tool=name, input=f"id={args.get('id')} states={states}")
elif name == "add_state":
machine_ops.append({"op": "add_state", **args})
await self.hud("tool_call", tool=name, input=f"{args.get('id')}.{args.get('state')}")
elif name == "reset_machine":
machine_ops.append({"op": "reset", **args})
await self.hud("tool_call", tool=name, input=f"id={args.get('id')}")
elif name == "destroy_machine":
machine_ops.append({"op": "destroy", **args})
await self.hud("tool_call", tool=name, input=f"id={args.get('id')}")
elif name == "query_db":
query = args.get("query", "")
await self.hud("tool_call", tool=name, input=query[:120])
try:
import asyncio
db = args.get("database", "eras2_production")
output = await asyncio.to_thread(self._run_db_query, query, db)
lines = output.split("\n")
if len(lines) > 102:
output = "\n".join(lines[:102]) + f"\n... ({len(lines) - 102} more rows)"
self._db_result = output
await self.hud("tool_result", tool=name, output=output[:200], rows=max(0, len(lines) - 1))
except Exception as e:
self._db_result = f"Error: {e}"
await self.hud("tool_result", tool=name, output=str(e)[:200], rows=0)
return actions, state_updates, display_items, machine_ops
def _parse_actions_fallback(self, response: str) -> tuple[str, list[dict]]:
"""Fallback: extract ACTIONS: JSON line from response text (legacy format)."""
actions = []
lines = response.split("\n")
clean_lines = []
for line in lines:
stripped = line.strip()
if stripped.startswith("ACTIONS:"):
try:
actions = json.loads(stripped[8:].strip())
if not isinstance(actions, list):
actions = []
except (json.JSONDecodeError, Exception):
pass
else:
clean_lines.append(line)
return "\n".join(clean_lines).strip(), actions
async def process(self, command: Command, history: list[dict], memory_context: str = "") -> ThoughtResult:
await self.hud("thinking", detail="reasoning about response")
messages = [
{"role": "system", "content": self.SYSTEM.replace("{memory_context}", memory_context)},
]
for msg in history[-12:]:
messages.append(msg)
a = command.analysis
input_ctx = (
f"Input analysis:\n"
f"- Who: {a.who} | Intent: {a.intent} | Complexity: {a.complexity}\n"
f"- Topic: {a.topic} | Tone: {a.tone} | Language: {a.language}\n"
f"- Context: {a.context}\n"
f"- Original message: {command.source_text}"
)
messages.append({"role": "system", "content": input_ctx})
messages = self.trim_context(messages)
await self.hud("context", messages=messages, tokens=self.last_context_tokens,
max_tokens=self.max_context_tokens, fill_pct=self.context_fill_pct)
# Call with all thinker tools available
response, tool_calls = await llm_call(self.model, messages, tools=THINKER_TOOLS)
if not response and not tool_calls:
response = "[no response from LLM]"
log.info(f"[thinker] response: {(response or '')[:200]}")
if tool_calls:
log.info(f"[thinker] tool_calls: {len(tool_calls)}")
# Extract from function calls
actions, state_updates, display_items, machine_ops = await self._extract_from_tool_calls(tool_calls)
# S3* audit: detect code-without-tools mismatch
has_code = response and "```" in response
has_any_tool = bool(actions or state_updates or display_items or machine_ops or tool_calls)
if has_code and not has_any_tool:
await self.hud("s3_audit", check="code_without_tools",
detail="Thinker wrote code but made no tool calls. Retrying.")
log.info("[thinker] S3* audit: code without tools — retrying")
messages.append({"role": "assistant", "content": response})
messages.append({"role": "system", "content": (
"S3* AUDIT CORRECTION: You wrote code instead of calling function tools. "
"This is wrong. You MUST use emit_actions, create_machine, set_state, query_db etc. "
"Convert what you intended into actual tool calls. Do NOT write code."
)})
messages = self.trim_context(messages)
response, tool_calls = await llm_call(self.model, messages, tools=THINKER_TOOLS)
if not response and not tool_calls:
response = "[no response from LLM]"
retry_a, retry_s, retry_d, retry_m = await self._extract_from_tool_calls(tool_calls)
if retry_a:
actions = retry_a
state_updates.update(retry_s)
display_items.extend(retry_d)
machine_ops.extend(retry_m)
has_any_tool = bool(actions or state_updates or display_items or machine_ops or tool_calls)
if has_any_tool:
await self.hud("s3_audit", check="code_without_tools", detail="Retry succeeded — tools called.")
else:
await self.hud("s3_audit", check="code_without_tools", detail="Retry failed — still no tools.")
# S3* audit: intent-vs-action — did Thinker DO what was requested?
has_any_tool = bool(actions or state_updates or display_items or machine_ops
or getattr(self, '_db_result', None))
if command.analysis.intent in ("request", "action") and not has_any_tool:
await self.hud("s3_audit", check="intent_without_action",
detail=f"Intent={command.analysis.intent} topic={command.analysis.topic} but no tools called. Retrying.")
log.info(f"[thinker] S3* audit: intent without action — retrying")
messages.append({"role": "assistant", "content": response or ""})
messages.append({"role": "system", "content": (
f"S3* AUDIT CORRECTION: The user's intent was '{command.analysis.intent}' "
f"about '{command.analysis.topic}', but you only produced text without calling any tools. "
"You MUST take action — call query_db, emit_actions, create_machine, set_state, etc. "
"DO something, don't just describe what could be done."
)})
messages = self.trim_context(messages)
response, tool_calls = await llm_call(self.model, messages, tools=THINKER_TOOLS)
if not response and not tool_calls:
response = "[no response from LLM]"
retry_a, retry_s, retry_d, retry_m = await self._extract_from_tool_calls(tool_calls)
if retry_a:
actions = retry_a
state_updates.update(retry_s)
display_items.extend(retry_d)
machine_ops.extend(retry_m)
has_any_tool = bool(actions or state_updates or display_items or machine_ops
or tool_calls or getattr(self, '_db_result', None))
if has_any_tool:
await self.hud("s3_audit", check="intent_without_action", detail="Retry succeeded — action taken.")
else:
await self.hud("s3_audit", check="intent_without_action", detail="Retry failed — still no action.")
# DB query result → second LLM call to interpret (with retry on error)
db_result = getattr(self, '_db_result', None)
if db_result is not None:
self._db_result = None
log.info(f"[thinker] db result: {db_result[:200]}")
is_error = db_result.startswith("Error:")
messages.append({"role": "assistant", "content": response or "Querying database..."})
if is_error:
messages.append({"role": "system", "content": f"Query FAILED: {db_result}\nFix the SQL and call query_db again. Table names are lowercase plural (kunden, objekte, geraete, nutzeinheit, geraeteverbraeuche)."})
else:
messages.append({"role": "system", "content": f"Database query result:\n{db_result}"})
messages.append({"role": "user", "content": "Respond based on the query result. Be concise. Present tabular data clearly."})
messages = self.trim_context(messages)
final, final_tool_calls = await llm_call(self.model, messages, tools=THINKER_TOOLS)
if not final:
final = "[no response from LLM]"
final_actions, final_state, final_display, final_machine_ops = await self._extract_from_tool_calls(final_tool_calls)
if final_actions:
actions = final_actions
state_updates.update(final_state)
display_items.extend(final_display)
machine_ops.extend(final_machine_ops)
# If retry produced a new DB result, do one more interpret call
retry_result = getattr(self, '_db_result', None)
if retry_result is not None:
self._db_result = None
log.info(f"[thinker] db retry result: {retry_result[:200]}")
messages.append({"role": "assistant", "content": final or "Retrying..."})
messages.append({"role": "system", "content": f"Database query result:\n{retry_result}"})
messages.append({"role": "user", "content": "Respond based on the query result. Be concise. Present tabular data clearly."})
messages = self.trim_context(messages)
final, final_tool_calls = await llm_call(self.model, messages, tools=THINKER_TOOLS)
if not final:
final = "[no response from LLM]"
r_actions, r_state, r_display, r_machine = await self._extract_from_tool_calls(final_tool_calls)
if r_actions:
actions = r_actions
state_updates.update(r_state)
display_items.extend(r_display)
machine_ops.extend(r_machine)
db_result = retry_result
clean_text = self._strip_code_blocks(final)
await self.hud("decided", instruction=clean_text[:200])
return ThoughtResult(response=clean_text, tool_used="query_db",
tool_output=db_result, actions=actions,
state_updates=state_updates, display_items=display_items,
machine_ops=machine_ops)
# Fallback: check for legacy ACTIONS: line in text
if not actions and response:
response, actions = self._parse_actions_fallback(response)
# Check for python/sql code execution in text
code_call = self._parse_tool_call(response) if response else None
if code_call:
tool_name, code = code_call
if self.pm and tool_name == "python":
proc = await self.pm.execute(tool_name, code)
tool_output = "\n".join(proc.output_lines)
else:
tool_output = f"[unknown tool: {tool_name}]"
log.info(f"[thinker] tool output: {tool_output[:200]}")
# Second call: interpret tool output
messages.append({"role": "assistant", "content": response})
messages.append({"role": "system", "content": f"Tool output:\n{tool_output}"})
messages.append({"role": "user", "content": "Respond to the user based on the tool output. Be natural and concise. Use tools if needed."})
messages = self.trim_context(messages)
final, final_tool_calls = await llm_call(self.model, messages, tools=THINKER_TOOLS)
if not final:
final = "[no response from LLM]"
# Merge from second call
final_actions, final_state, final_display, final_machine_ops = await self._extract_from_tool_calls(final_tool_calls)
if final_actions:
actions = final_actions
state_updates.update(final_state)
display_items.extend(final_display)
machine_ops.extend(final_machine_ops)
clean_text = self._strip_code_blocks(final)
if not actions:
clean_text, actions = self._parse_actions_fallback(clean_text)
if actions:
log.info(f"[thinker] actions: {actions}")
await self.hud("decided", instruction=clean_text[:200])
return ThoughtResult(response=clean_text, tool_used=tool_name,
tool_output=tool_output, actions=actions,
state_updates=state_updates, display_items=display_items,
machine_ops=machine_ops)
clean_text = (self._strip_code_blocks(response) or response) if response else ""
if actions:
log.info(f"[thinker] actions: {actions}")
await self.hud("decided", instruction="direct response (no tools)")
return ThoughtResult(response=clean_text, actions=actions,
state_updates=state_updates, display_items=display_items,
machine_ops=machine_ops)