agent-runtime/agent/mcp_server.py
Nico 5f447dfd53 v0.14.0: v2 Director-drives architecture + 3-pod K8s split
Architecture:
- director_v2: always-on brain, produces DirectorPlan with tool_sequence
- thinker_v2: pure executor, runs tools from DirectorPlan
- interpreter_v1: factual result summarizer, no hallucination
- v2_director_drives graph: Input -> Director -> Thinker -> Output

Infrastructure:
- Split into 3 pods: cog-frontend (nginx), cog-runtime (FastAPI), cog-mcp (SSE proxy)
- MCP survives runtime restarts (separate pod, proxies via HTTP)
- Async send pipeline: /api/send/check -> /api/send -> /api/result with progress
- Zero-downtime rolling updates (maxUnavailable: 0)
- Dynamic graph visualization (fetched from API, not hardcoded)

Tests: 22 new mocked unit tests (director_v2: 7, thinker_v2: 8, interpreter_v1: 7)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 04:17:44 +02:00

169 lines
7.5 KiB
Python

"""MCP server for cog — exposes runtime tools to any MCP client."""
import json
import logging
from pathlib import Path
from mcp.server import Server
from mcp.server.sse import SseServerTransport # re-exported for __init__.py
from mcp.types import TextContent, Tool
log = logging.getLogger("mcp")
TRACE_FILE = Path(__file__).parent.parent / "trace.jsonl"
server = Server("cog")
# Reference to active runtime — set by api.py when WS connects
_get_runtime = lambda: None
def set_runtime_getter(fn):
global _get_runtime
_get_runtime = fn
@server.list_tools()
async def list_tools():
return [
Tool(name="cog_send", description="Send a message to the cognitive agent and get a response.",
inputSchema={"type": "object", "properties": {
"text": {"type": "string", "description": "Message text to send"},
"database": {"type": "string", "description": "Optional: database name for query_db context"},
}, "required": ["text"]}),
Tool(name="cog_trace", description="Get recent trace events from the pipeline (HUD events, tool calls, audit).",
inputSchema={"type": "object", "properties": {
"last": {"type": "integer", "description": "Number of recent events (default 20)", "default": 20},
"filter": {"type": "string", "description": "Comma-separated event types to filter (e.g. 'tool_call,controls')"},
}}),
Tool(name="cog_history", description="Get recent chat messages from the active session.",
inputSchema={"type": "object", "properties": {
"last": {"type": "integer", "description": "Number of recent messages (default 20)", "default": 20},
}}),
Tool(name="cog_state", description="Get the current memorizer state (mood, topic, language, facts).",
inputSchema={"type": "object", "properties": {}}),
Tool(name="cog_clear", description="Clear the active session (history, state, controls).",
inputSchema={"type": "object", "properties": {}}),
Tool(name="cog_graph", description="Get the active graph definition (nodes, edges, description).",
inputSchema={"type": "object", "properties": {}}),
Tool(name="cog_graph_list", description="List all available graph definitions.",
inputSchema={"type": "object", "properties": {}}),
Tool(name="cog_graph_switch", description="Switch the active graph for new sessions.",
inputSchema={"type": "object", "properties": {
"name": {"type": "string", "description": "Graph name to switch to"},
}, "required": ["name"]}),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict):
runtime = _get_runtime()
if name == "cog_send":
if not runtime:
return [TextContent(type="text", text="ERROR: No active session — someone must be connected via WebSocket first.")]
text = arguments.get("text", "").strip()
if not text:
return [TextContent(type="text", text="ERROR: Missing 'text' argument.")]
await runtime.handle_message(text)
response = runtime.history[-1]["content"] if runtime.history else "(no response)"
return [TextContent(type="text", text=response)]
elif name == "cog_trace":
last = arguments.get("last", 20)
filt = arguments.get("filter", "").split(",") if arguments.get("filter") else None
if not TRACE_FILE.exists():
return [TextContent(type="text", text="(no trace events)")]
lines = TRACE_FILE.read_text(encoding="utf-8").strip().split("\n")
parsed = []
for line in lines[-last:]:
try:
parsed.append(json.loads(line))
except json.JSONDecodeError:
continue
if filt:
parsed = [t for t in parsed if t.get("event", "") in filt]
# Format for readability
out = []
for t in parsed:
event = t.get("event", "")
node = t.get("node", "")
if event == "tool_call":
out.append(f"CALL: {t.get('tool')} -> {str(t.get('input', ''))[:150]}")
elif event == "tool_result":
out.append(f"RESULT: {t.get('tool')} ({t.get('rows', '?')} rows) -> {str(t.get('output', ''))[:150]}")
elif event == "controls":
ctrls = t.get("controls", [])
types = {}
for c in ctrls:
types[c.get("type", "?")] = types.get(c.get("type", "?"), 0) + 1
out.append(f"CONTROLS: {types}")
elif event == "s3_audit":
out.append(f"S3*: {t.get('check', '')}{t.get('detail', '')}")
elif event == "director_plan":
out.append(f"PLAN: {t.get('goal', '')} [{len(t.get('steps', []))} steps]")
else:
detail = t.get("instruction", t.get("detail", t.get("id", "")))
out.append(f"{node:12} {event:20} {str(detail)[:120]}")
return [TextContent(type="text", text="\n".join(out) if out else "(no matching events)")]
elif name == "cog_history":
if not runtime:
return [TextContent(type="text", text="(no active session)")]
last = arguments.get("last", 20)
msgs = runtime.history[-last:]
out = []
for m in msgs:
out.append(f"--- {m['role']} ---")
out.append(m["content"][:400])
out.append("")
return [TextContent(type="text", text="\n".join(out) if out else "(no messages)")]
elif name == "cog_state":
if not runtime:
return [TextContent(type="text", text="(no active session)")]
return [TextContent(type="text", text=json.dumps(runtime.memorizer.state, indent=2, ensure_ascii=False))]
elif name == "cog_clear":
if not runtime:
return [TextContent(type="text", text="ERROR: No active session.")]
runtime.history.clear()
runtime.ui_node.state.clear()
runtime.ui_node.bindings.clear()
runtime.ui_node.current_controls.clear()
runtime.ui_node.machines.clear()
return [TextContent(type="text", text="Session cleared.")]
elif name == "cog_graph":
from .engine import load_graph, get_graph_for_cytoscape
from .runtime import _active_graph_name
graph = load_graph(_active_graph_name)
return [TextContent(type="text", text=json.dumps({
"name": graph["name"],
"description": graph["description"],
"nodes": graph["nodes"],
"edges": graph["edges"],
"conditions": graph.get("conditions", {}),
"audit": graph.get("audit", {}),
}, indent=2))]
elif name == "cog_graph_list":
from .engine import list_graphs
return [TextContent(type="text", text=json.dumps(list_graphs(), indent=2))]
elif name == "cog_graph_switch":
from .engine import load_graph
import agent.runtime as rt
gname = arguments.get("name", "")
if not gname:
return [TextContent(type="text", text="ERROR: Missing 'name' argument.")]
try:
graph = load_graph(gname)
except Exception as e:
return [TextContent(type="text", text=f"ERROR: {e}")]
rt._active_graph_name = gname
return [TextContent(type="text", text=f"Switched to graph '{graph['name']}'. New sessions will use this graph.")]
else:
return [TextContent(type="text", text=f"Unknown tool: {name}")]