From 20363a1f2fd8ac778c8af74f1137470954d41535 Mon Sep 17 00:00:00 2001 From: Nico Date: Sat, 28 Mar 2026 01:16:26 +0100 Subject: [PATCH] v0.7.2: UI controls + ProcessManager + Thinker upgrade (WIP) - ProcessManager: observable tool execution with start/stop/status - UI controls protocol: buttons, tables, process cards - Frontend renders controls in chat, clicks route back as actions - Thinker upgraded to gemini-2.5-flash-preview - Auto-detect SQL/python/tool_code blocks for execution - SQL blocks auto-wrapped in Python sqlite3 script - WIP: tool execution path needs tuning, controls not yet triggered Co-Authored-By: Claude Opus 4.6 (1M context) --- agent.py | 272 ++++++++++++++++++++++++++++++++++++----------- static/app.js | 75 +++++++++++++ static/style.css | 17 +++ test_cog.py | 2 +- 4 files changed, 305 insertions(+), 61 deletions(-) diff --git a/agent.py b/agent.py index 943d4aa..f415bc6 100644 --- a/agent.py +++ b/agent.py @@ -159,6 +159,108 @@ class ThoughtResult: response: str # what to tell the user (direct or post-tool) tool_used: str = "" # which tool was called (empty if none) tool_output: str = "" # raw tool output + controls: list = field(default_factory=list) # UI controls to render + + +# --- Process Manager (observable tool execution) --- + +class Process: + """A single observable tool execution.""" + _next_id = 0 + + def __init__(self, tool: str, code: str, send_hud): + Process._next_id += 1 + self.pid = Process._next_id + self.tool = tool + self.code = code + self.send_hud = send_hud + self.status = "pending" # pending, running, done, failed, cancelled + self.output_lines: list[str] = [] + self.exit_code: int | None = None + self.started_at: float = 0 + self.ended_at: float = 0 + self._subprocess: subprocess.Popen | None = None + + async def hud(self, event: str, **data): + await self.send_hud({"node": "process", "event": event, "pid": self.pid, + "tool": self.tool, "status": self.status, **data}) + + def run_sync(self) -> str: + """Execute the tool synchronously. Returns output.""" + self.status = "running" + self.started_at = time.time() + try: + with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False, encoding='utf-8') as f: + f.write(self.code) + f.flush() + self._subprocess = subprocess.Popen( + ['python3', f.name], + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + text=True, cwd=tempfile.gettempdir() + ) + stdout, stderr = self._subprocess.communicate(timeout=10) + self.exit_code = self._subprocess.returncode + if stdout: + self.output_lines.extend(stdout.strip().split("\n")) + if self.exit_code != 0 and stderr: + self.output_lines.append(f"[stderr: {stderr.strip()}]") + self.status = "done" if self.exit_code == 0 else "failed" + except subprocess.TimeoutExpired: + if self._subprocess: + self._subprocess.kill() + self.output_lines.append("[error: timed out after 10s]") + self.status = "failed" + self.exit_code = -1 + except Exception as e: + self.output_lines.append(f"[error: {e}]") + self.status = "failed" + self.exit_code = -1 + finally: + self.ended_at = time.time() + return "\n".join(self.output_lines) or "[no output]" + + def cancel(self): + if self._subprocess and self.status == "running": + self._subprocess.kill() + self.status = "cancelled" + self.ended_at = time.time() + self.output_lines.append("[cancelled by user]") + + +class ProcessManager: + """Manages all tool executions as observable processes.""" + + def __init__(self, send_hud): + self.send_hud = send_hud + self.processes: dict[int, Process] = {} + + async def execute(self, tool: str, code: str) -> Process: + """Create and run a process. Returns the completed Process.""" + proc = Process(tool, code, self.send_hud) + self.processes[proc.pid] = proc + + await proc.hud("process_start", code=code[:200]) + + # Run in executor to avoid blocking the event loop + loop = asyncio.get_event_loop() + output = await loop.run_in_executor(None, proc.run_sync) + + elapsed = round(proc.ended_at - proc.started_at, 2) + await proc.hud("process_done", exit_code=proc.exit_code, + output=output[:500], elapsed=elapsed) + return proc + + def cancel(self, pid: int) -> bool: + proc = self.processes.get(pid) + if proc: + proc.cancel() + return True + return False + + def get_status(self) -> list[dict]: + return [{"pid": p.pid, "tool": p.tool, "status": p.status, + "elapsed": round((p.ended_at or time.time()) - p.started_at, 2) if p.started_at else 0} + for p in self.processes.values()] # --- Base Node --- @@ -479,24 +581,27 @@ import tempfile class ThinkerNode(Node): name = "thinker" - model = "google/gemini-2.0-flash-001" + model = "google/gemini-2.5-flash-preview" max_context_tokens = 4000 SYSTEM = """You are the Thinker node — the brain of this cognitive runtime. -You receive a perception of what the user said. Decide: answer directly or use a tool. +You receive a perception of what the user said. Decide: answer directly, use a tool, or show UI controls. -TOOL FORMAT — when you need to compute, query, or create something, respond with ONLY: -TOOL: python -CODE: -``` -print("result here") -``` +TOOLS — write a ```python code block and it WILL be executed. Use print() for output. +- For math, databases, file ops, any computation: write python. NEVER describe code — write it. +- For simple conversation: respond directly as text. -RULES: -- For math, databases, file ops, any computation: write a ```python code block. It WILL be executed. -- For simple conversation (greetings, opinions, knowledge): respond directly as text. -- Your python code runs in a real environment. Use print() for output. -- NEVER describe code — write it. It will run automatically. +UI CONTROLS — to show interactive elements, include a JSON block: +```controls +[ + {{"type": "table", "data": [...], "columns": ["id", "name", "email"]}}, + {{"type": "button", "label": "Add Customer", "action": "add_customer"}}, + {{"type": "button", "label": "Refresh", "action": "refresh_customers"}} +] +``` +Controls render in the chat. User clicks flow back as actions you can handle. + +You can combine text + code + controls in one response. {memory_context}""" @@ -523,23 +628,64 @@ RULES: continue return (tool_name, "\n".join(code_lines)) if code_lines else None - # Auto-detect: if response is mostly a python code block, execute it - if "```python" in text or "```py" in text: - code_lines = [] - in_code = False - for line in text.split("\n"): - if ("```python" in line or "```py" in line) and not in_code: - in_code = True - continue - elif line.strip() == "```" and in_code: - break - elif in_code: - code_lines.append(line) - if code_lines and len(code_lines) > 1: - return ("python", "\n".join(code_lines)) + # Auto-detect: code blocks get executed as python + # Catches ```python, ```py, ```sql, ```sqlite, or bare ``` with code-like content + import re + block_match = re.search(r'```(?:python|py|sql|sqlite|sh|bash|tool_code)?\s*\n(.*?)```', text, re.DOTALL) + if block_match: + code = block_match.group(1).strip() + if code and len(code.split("\n")) > 0: + # If it's SQL, wrap it in a python sqlite3 script + if "```sql" in text or "```sqlite" in text or ("SELECT" in code.upper() and "CREATE" in code.upper()): + wrapped = f'''import sqlite3 +conn = sqlite3.connect("/tmp/cog_db.sqlite") +cursor = conn.cursor() +for stmt in """{code}""".split(";"): + stmt = stmt.strip() + if stmt: + cursor.execute(stmt) +conn.commit() +cursor.execute("SELECT name FROM sqlite_master WHERE type='table'") +tables = cursor.fetchall() +for t in tables: + cursor.execute(f"SELECT * FROM {{t[0]}}") + rows = cursor.fetchall() + cols = [d[0] for d in cursor.description] + print(f"Table: {{t[0]}}") + print(" | ".join(cols)) + for row in rows: + print(" | ".join(str(c) for c in row)) +conn.close()''' + return ("python", wrapped) + return ("python", code) return None + def __init__(self, send_hud, process_manager: ProcessManager = None): + super().__init__(send_hud) + self.pm = process_manager + + def _parse_controls(self, response: str) -> list[dict]: + """Extract ```controls JSON blocks from response.""" + controls = [] + if "```controls" not in response: + return controls + parts = response.split("```controls") + for part in parts[1:]: + end = part.find("```") + if end != -1: + try: + controls.extend(json.loads(part[:end].strip())) + except json.JSONDecodeError: + pass + return controls + + def _strip_blocks(self, response: str) -> str: + """Remove code and control blocks, return plain text.""" + import re + text = re.sub(r'```(?:python|py|controls).*?```', '', response, flags=re.DOTALL) + return text.strip() + async def process(self, command: Command, history: list[dict], memory_context: str = "") -> ThoughtResult: await self.hud("thinking", detail="reasoning about response") @@ -557,54 +703,46 @@ RULES: response = await llm_call(self.model, messages) log.info(f"[thinker] response: {response[:200]}") + # Parse UI controls + controls = self._parse_controls(response) + if controls: + await self.hud("controls", controls=controls) + # Check if Thinker wants to use a tool tool_call = self._parse_tool_call(response) if tool_call: tool_name, code = tool_call - await self.hud("tool_call", tool=tool_name, code=code[:200]) - log.info(f"[thinker] calling tool: {tool_name}") - if tool_name == "python": - loop = asyncio.get_event_loop() - tool_output = await loop.run_in_executor(None, self._run_python_sync, code) + if self.pm and tool_name == "python": + proc = await self.pm.execute(tool_name, code) + tool_output = "\n".join(proc.output_lines) else: tool_output = f"[unknown tool: {tool_name}]" - await self.hud("tool_result", tool=tool_name, output=tool_output[:500]) log.info(f"[thinker] tool output: {tool_output[:200]}") - # Second LLM call: interpret tool output + # Second LLM call: interpret tool output + optionally add controls messages.append({"role": "assistant", "content": response}) messages.append({"role": "system", "content": f"Tool output:\n{tool_output}"}) - messages.append({"role": "user", "content": "Now respond to the user based on the tool output. Be natural and concise."}) + messages.append({"role": "user", "content": "Respond to the user based on the tool output. If showing data, include a ```controls block with a table. Be natural and concise."}) messages = self.trim_context(messages) final = await llm_call(self.model, messages) - await self.hud("decided", instruction=final[:200]) - return ThoughtResult(response=final, tool_used=tool_name, tool_output=tool_output) + + # Parse controls from the follow-up too + more_controls = self._parse_controls(final) + if more_controls: + controls.extend(more_controls) + await self.hud("controls", controls=more_controls) + + clean_text = self._strip_blocks(final) + await self.hud("decided", instruction=clean_text[:200]) + return ThoughtResult(response=clean_text, tool_used=tool_name, + tool_output=tool_output, controls=controls) # No tool needed — pass through + clean_text = self._strip_blocks(response) or response await self.hud("decided", instruction="direct response (no tools)") - return ThoughtResult(response=response) - - def _run_python_sync(self, code: str) -> str: - """Sync wrapper for subprocess execution.""" - try: - with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False, encoding='utf-8') as f: - f.write(code) - f.flush() - result = subprocess.run( - ['python3', f.name], - capture_output=True, text=True, timeout=10, - cwd=tempfile.gettempdir() - ) - output = result.stdout - if result.returncode != 0: - output += f"\n[stderr: {result.stderr.strip()}]" - return output.strip() or "[no output]" - except subprocess.TimeoutExpired: - return "[error: execution timed out after 10s]" - except Exception as e: - return f"[error: {e}]" + return ThoughtResult(response=clean_text, controls=controls) # --- Memorizer Node (S2 — shared state / coordination) --- @@ -716,7 +854,8 @@ class Runtime: self.history: list[dict] = [] self.MAX_HISTORY = 40 # sliding window — oldest messages drop off self.input_node = InputNode(send_hud=self._send_hud) - self.thinker = ThinkerNode(send_hud=self._send_hud) + self.process_manager = ProcessManager(send_hud=self._send_hud) + self.thinker = ThinkerNode(send_hud=self._send_hud, process_manager=self.process_manager) self.output_node = OutputNode(send_hud=self._send_hud) self.memorizer = MemorizerNode(send_hud=self._send_hud) self.sensor = SensorNode(send_hud=self._send_hud) @@ -792,6 +931,10 @@ class Runtime: response = await self.output_node.process(command, self.history, self.ws, memory_context=mem_ctx) self.history.append({"role": "assistant", "content": response}) + # Send UI controls if Thinker produced any + if thought.controls: + await self.ws.send_text(json.dumps({"type": "controls", "controls": thought.controls})) + # Memorizer updates shared state after each exchange await self.memorizer.update(self.history) @@ -856,7 +999,16 @@ async def ws_endpoint(ws: WebSocket, token: str | None = Query(None), access_tok while True: data = await ws.receive_text() msg = json.loads(data) - await runtime.handle_message(msg["text"]) + if msg.get("type") == "action": + # User clicked a UI control + action_text = f"[user clicked: {msg.get('action', 'unknown')}]" + if msg.get("data"): + action_text += f" data: {json.dumps(msg['data'])}" + await runtime.handle_message(action_text) + elif msg.get("type") == "cancel_process": + runtime.process_manager.cancel(msg.get("pid", 0)) + else: + await runtime.handle_message(msg.get("text", "")) except WebSocketDisconnect: runtime.sensor.stop() if _active_runtime is runtime: diff --git a/static/app.js b/static/app.js index 2e6125d..781d2e2 100644 --- a/static/app.js +++ b/static/app.js @@ -129,6 +129,9 @@ function connect() { } else if (data.type === 'done') { if (currentEl) currentEl.classList.remove('streaming'); currentEl = null; + + } else if (data.type === 'controls') { + renderControls(data.controls); } }; } @@ -235,6 +238,78 @@ function addTrace(node, event, text, cls, detail) { scroll(traceEl); } +function renderControls(controls) { + const container = document.createElement('div'); + container.className = 'controls-container'; + + for (const ctrl of controls) { + if (ctrl.type === 'button') { + const btn = document.createElement('button'); + btn.className = 'control-btn'; + btn.textContent = ctrl.label; + btn.onclick = () => { + if (ws && ws.readyState === 1) { + ws.send(JSON.stringify({ type: 'action', action: ctrl.action, data: ctrl.data || {} })); + addTrace('runtime', 'action', ctrl.action); + } + }; + container.appendChild(btn); + + } else if (ctrl.type === 'table') { + const table = document.createElement('table'); + table.className = 'control-table'; + // Header + if (ctrl.columns) { + const thead = document.createElement('tr'); + for (const col of ctrl.columns) { + const th = document.createElement('th'); + th.textContent = col; + thead.appendChild(th); + } + table.appendChild(thead); + } + // Rows + for (const row of (ctrl.data || [])) { + const tr = document.createElement('tr'); + if (Array.isArray(row)) { + for (const cell of row) { + const td = document.createElement('td'); + td.textContent = cell; + tr.appendChild(td); + } + } else if (typeof row === 'object') { + for (const col of (ctrl.columns || Object.keys(row))) { + const td = document.createElement('td'); + td.textContent = row[col] ?? ''; + tr.appendChild(td); + } + } + table.appendChild(tr); + } + container.appendChild(table); + + } else if (ctrl.type === 'process') { + const card = document.createElement('div'); + card.className = 'process-card ' + (ctrl.status || 'running'); + card.innerHTML = + '' + esc(ctrl.tool || 'python') + '' + + '' + esc(ctrl.status || 'running') + '' + + (ctrl.status === 'running' ? '' : '') + + '
' + esc(ctrl.output || '') + '
'; + container.appendChild(card); + } + } + + msgs.appendChild(container); + scroll(msgs); +} + +function cancelProcess(pid) { + if (ws && ws.readyState === 1) { + ws.send(JSON.stringify({ type: 'cancel_process', pid })); + } +} + function updateMeter(node, tokens, maxTokens, fillPct) { const meter = document.getElementById('meter-' + node); if (!meter) return; diff --git a/static/style.css b/static/style.css index 28c15c3..be07364 100644 --- a/static/style.css +++ b/static/style.css @@ -64,6 +64,23 @@ button:hover { background: #1d4ed8; } .trace-data.state { color: #c084fc; } .trace-data.context { color: #666; } +/* UI Controls */ +.controls-container { padding: 0.4rem 0; display: flex; flex-wrap: wrap; gap: 0.4rem; align-items: flex-start; } +.control-btn { padding: 0.35rem 0.75rem; background: #1e3a5f; color: #60a5fa; border: 1px solid #2563eb; border-radius: 0.3rem; cursor: pointer; font-size: 0.8rem; } +.control-btn:hover { background: #2563eb; color: white; } +.control-table { width: 100%; border-collapse: collapse; font-size: 0.8rem; background: #111; border-radius: 0.3rem; overflow: hidden; } +.control-table th { background: #1a1a2e; color: #a78bfa; padding: 0.3rem 0.5rem; text-align: left; font-weight: 600; border-bottom: 1px solid #333; } +.control-table td { padding: 0.25rem 0.5rem; border-bottom: 1px solid #1a1a1a; color: #ccc; } +.control-table tr:hover td { background: #1a1a2e; } +.process-card { background: #111; border: 1px solid #333; border-radius: 0.3rem; padding: 0.4rem 0.6rem; font-size: 0.75rem; width: 100%; } +.process-card.running { border-color: #f59e0b; } +.process-card.done { border-color: #22c55e; } +.process-card.failed { border-color: #ef4444; } +.pc-tool { font-weight: 700; color: #fb923c; margin-right: 0.5rem; } +.pc-status { color: #888; margin-right: 0.5rem; } +.pc-stop { padding: 0.15rem 0.4rem; background: #ef4444; color: white; border: none; border-radius: 0.2rem; cursor: pointer; font-size: 0.7rem; } +.pc-output { margin-top: 0.3rem; color: #888; white-space: pre-wrap; max-height: 8rem; overflow-y: auto; } + /* Expandable trace detail */ .trace-line.expandable { cursor: pointer; } .trace-detail { display: none; padding: 0.3rem 0.4rem 0.3rem 12rem; font-size: 0.65rem; color: #777; white-space: pre-wrap; word-break: break-all; max-height: 10rem; overflow-y: auto; background: #0d0d14; border-bottom: 1px solid #1a1a2e; } diff --git a/test_cog.py b/test_cog.py index 2627653..2374cc8 100644 --- a/test_cog.py +++ b/test_cog.py @@ -16,7 +16,7 @@ def clear(): tests = [ ("hello!", None), ("what is 42 * 137?", None), - ("create a sqlite db with 5 customers and show them", None), + ("create a sqlite db with 5 customers and show them in a table", None), ("wie spaet ist es?", None), ]