Deep Dive into the Nanobot-Agent Core Engine: An AgentLoop Code Walkthrough
Nanobot-Agent is a lightweight, open-source personal AI assistant from the HKUDS team (GitHub repo: https://github.com/HKUDS/nanobot). Inspired by OpenClaw, it aims to deliver an efficient, extensible agent in roughly 4,000 lines of code, with multi-LLM support, multi-platform chat integration, and tool calling. Its "heart" is the AgentLoop class in agent/loop.py: an asynchronous, event-driven processing engine responsible for receiving messages, building context, calling the LLM, executing tools, and sending responses. This article is a hands-on walkthrough: overall structure, section-by-section analysis, how it runs, and tuning suggestions, all based on the latest code on the main branch, with pseudocode and tables to help. Let's go! 🚀
1. A Quick Look at Nanobot-Agent: Why Is AgentLoop the Core?
Nanobot is a minimalist AI agent framework focused on staying lightweight and research-friendly:
- Core features: support for 11+ LLM providers (e.g., OpenAI, Anthropic), 8+ chat platforms (e.g., Telegram, Discord), built-in tools (file operations, web search, shell execution, and more), subagent spawning, and scheduled tasks.
- Design philosophy: 99% smaller than OpenClaw (the core is ~3,500 lines), launched from a single config file, well suited to researchers and developers doing rapid prototyping.
- AgentLoop's role: as the core processing engine (its module docstring reads "Agent loop: the core processing engine."), it manages the agent's entire lifecycle, from receiving a message through iterative execution until the task completes. It ties together context, memory, tools, and subagents, runs asynchronously for efficiency, and supports high concurrency (e.g., many simultaneous sessions).
Project layout at a glance (inferred from the README and code):
- agent/loop.py: the main loop engine.
- agent/context.py: prompt construction.
- agent/memory.py: persistent memory.
- agent/tools/: tool registration and execution.
- agent/subagent.py: subagent management.
- Overall: driven by an event bus (MessageBus) and implemented with asyncio.
If you come from Java, this resembles an asynchronous event loop (think the Reactor pattern), just in a more concise Python style.
2. Overall Structure of AgentLoop: Modular, Asynchronous Design
The code lives in nanobot/agent/loop.py, about 500 lines (excluding imports), and defines a single class. The structure is clear: initialization, default tool registration, MCP connection, the main loop, message processing, and shutdown logic.
Full source (extracted from GitHub raw, unabridged):
"""Agent loop: the core processing engine."""
from __future__ import annotations
import asyncio
import json
import re
import weakref
from contextlib import AsyncExitStack
from pathlib import Path
from typing import TYPE_CHECKING, Any, Awaitable, Callable
from loguru import logger
from nanobot.agent.context import ContextBuilder
from nanobot.agent.memory import MemoryStore
from nanobot.agent.subagent import SubagentManager
from nanobot.agent.tools.cron import CronTool
from nanobot.agent.tools.filesystem import EditFileTool, ListDirTool, ReadFileTool, WriteFileTool
from nanobot.agent.tools.message import MessageTool
from nanobot.agent.tools.registry import ToolRegistry
from nanobot.agent.tools.shell import ExecTool
from nanobot.agent.tools.spawn import SpawnTool
from nanobot.agent.tools.web import WebFetchTool, WebSearchTool
from nanobot.bus.events import InboundMessage, OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMProvider
from nanobot.session.manager import Session, SessionManager
if TYPE_CHECKING:
from nanobot.config.schema import ChannelsConfig, ExecToolConfig
from nanobot.cron.service import CronService
class AgentLoop:
"""
The agent loop is the core processing engine.
It:
1. Receives messages from the bus
2. Builds context with history, memory, skills
3. Calls the LLM
4. Executes tool calls
5. Sends responses back
"""
_TOOL_RESULT_MAX_CHARS = 500
def __init__(
self,
bus: MessageBus,
provider: LLMProvider,
workspace: Path,
model: str | None = None,
max_iterations: int = 40,
temperature: float = 0.1,
max_tokens: int = 4096,
memory_window: int = 100,
reasoning_effort: str | None = None,
brave_api_key: str | None = None,
web_proxy: str | None = None,
exec_config: ExecToolConfig | None = None,
cron_service: CronService | None = None,
restrict_to_workspace: bool = False,
session_manager: SessionManager | None = None,
mcp_servers: dict | None = None,
channels_config: ChannelsConfig | None = None,
):
from nanobot.config.schema import ExecToolConfig
self.bus = bus
self.channels_config = channels_config
self.provider = provider
self.workspace = workspace
self.model = model or provider.get_default_model()
self.max_iterations = max_iterations
self.temperature = temperature
self.max_tokens = max_tokens
self.memory_window = memory_window
self.reasoning_effort = reasoning_effort
self.brave_api_key = brave_api_key
self.web_proxy = web_proxy
self.exec_config = exec_config or ExecToolConfig()
self.cron_service = cron_service
self.restrict_to_workspace = restrict_to_workspace
self.context = ContextBuilder(workspace)
self.sessions = session_manager or SessionManager(workspace)
self.tools = ToolRegistry()
self.subagents = SubagentManager(
provider=provider,
workspace=workspace,
bus=bus,
model=self.model,
temperature=self.temperature,
max_tokens=self.max_tokens,
reasoning_effort=reasoning_effort,
brave_api_key=brave_api_key,
web_proxy=web_proxy,
exec_config=self.exec_config,
restrict_to_workspace=restrict_to_workspace,
)
self._running = False
self._mcp_servers = mcp_servers or {}
self._mcp_stack: AsyncExitStack | None = None
self._mcp_connected = False
self._mcp_connecting = False
self._consolidating: set[str] = set() # Session keys with consolidation in progress
self._consolidation_tasks: set[asyncio.Task] = set() # Strong refs to in-flight tasks
self._consolidation_locks: weakref.WeakValueDictionary[str, asyncio.Lock] = weakref.WeakValueDictionary()
self._active_tasks: dict[str, list[asyncio.Task]] = {} # session_key -> tasks
self._processing_lock = asyncio.Lock()
self._register_default_tools()
def _register_default_tools(self) -> None:
"""Register the default set of tools."""
allowed_dir = self.workspace if self.restrict_to_workspace else None
for cls in (ReadFileTool, WriteFileTool, EditFileTool, ListDirTool):
self.tools.register(cls(workspace=self.workspace, allowed_dir=allowed_dir))
self.tools.register(ExecTool(
working_dir=str(self.workspace),
timeout=self.exec_config.timeout,
restrict_to_workspace=self.restrict_to_workspace,
path_append=self.exec_config.path_append,
))
self.tools.register(WebSearchTool(api_key=self.brave_api_key, proxy=self.web_proxy))
self.tools.register(WebFetchTool(proxy=self.web_proxy))
self.tools.register(MessageTool(send_callback=self.bus.publish_outbound))
self.tools.register(SpawnTool(manager=self.subagents))
if self.cron_service:
self.tools.register(CronTool(self.cron_service))
async def _connect_mcp(self) -> None:
"""Connect to configured MCP servers (one-time, lazy)."""
if self._mcp_connected or self._mcp_connecting or not self._mcp_servers:
return
self._mcp_connecting = True
from nanobot.agent.tools.mcp import connect_mcp_servers
try:
self._mcp_stack = AsyncExitStack()
await self._mcp_stack.__aenter__()
await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack)
self._mcp_connected = True
except Exception as e:
logger.error("Failed to connect MCP servers (will retry next message): {}", e)
if self._mcp_stack:
try:
await self._mcp_stack.aclose()
except Exception:
pass
self._mcp_stack = None
finally:
self._mcp_connecting = False
def _set_tool_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None:
"""Update context for all tools that need routing info."""
for name in ("message", "spawn", "cron"):
if tool := self.tools.get(name):
if hasattr(tool, "set_context"):
tool.set_context(channel, chat_id, *([message_id] if name == "message" else []))
@staticmethod
def _strip_think(text: str | None) -> str | None:
"""Remove <think>…</think> blocks that some models embed in content."""
if not text:
return None
return re.sub(r"<think>[\s\S]*?</think>", "", text).strip() or None
@staticmethod
def _tool_hint(tool_calls: list) -> str:
"""Format tool calls as concise hint, e.g. 'web_search("query")'."""
def _fmt(tc):
args = (tc.arguments[0] if isinstance(tc.arguments, list) else tc.arguments) or {}
val = next(iter(args.values()), None) if isinstance(args, dict) else None
if not isinstance(val, str):
return tc.name
return f'{tc.name}("{val[:40]}…")' if len(val) > 40 else f'{tc.name}("{val}")'
return ", ".join(_fmt(tc) for tc in tool_calls)
async def _run_agent_loop(
self,
initial_messages: list[dict],
on_progress: Callable[..., Awaitable[None]] | None = None,
) -> tuple[str | None, list[str], list[dict]]:
"""Run the agent iteration loop. Returns (final_content, tools_used, messages)."""
messages = initial_messages
iteration = 0
final_content = None
tools_used: list[str] = []
while iteration < self.max_iterations:
iteration += 1
response = await self.provider.chat(
messages=messages,
tools=self.tools.get_definitions(),
model=self.model,
temperature=self.temperature,
max_tokens=self.max_tokens,
reasoning_effort=self.reasoning_effort,
)
if response.has_tool_calls:
if on_progress:
thoughts = [
self._strip_think(response.content),
response.reasoning_content,
*(
f"Thinking [{b.get('signature', '...')}]:\n{b.get('thought', '...')}"
for b in (response.thinking_blocks or [])
if isinstance(b, dict) and "signature" in b
),
]
combined_thoughts = "\n\n".join(filter(None, thoughts))
if combined_thoughts:
await on_progress(combined_thoughts)
await on_progress(self._tool_hint(response.tool_calls), tool_hint=True)
tool_call_dicts = [
{
"id": tc.id,
"type": "function",
"function": {
"name": tc.name,
"arguments": json.dumps(tc.arguments, ensure_ascii=False)
}
}
for tc in response.tool_calls
]
messages = self.context.add_assistant_message(
messages, response.content, tool_call_dicts,
reasoning_content=response.reasoning_content,
thinking_blocks=response.thinking_blocks,
)
for tool_call in response.tool_calls:
tools_used.append(tool_call.name)
args_str = json.dumps(tool_call.arguments, ensure_ascii=False)
logger.info("Tool call: {}({})", tool_call.name, args_str[:200])
result = await self.tools.execute(tool_call.name, tool_call.arguments)
messages = self.context.add_tool_result(
messages, tool_call.id, tool_call.name, result
)
else:
clean = self._strip_think(response.content)
# Don't persist error responses to session history — they can
# poison the context and cause permanent 400 loops (#1303).
if response.finish_reason == "error":
logger.error("LLM returned error: {}", (clean or "")[:200])
final_content = clean or "Sorry, I encountered an error calling the AI model."
break
messages = self.context.add_assistant_message(
messages, clean, reasoning_content=response.reasoning_content,
thinking_blocks=response.thinking_blocks,
)
final_content = clean
break
if final_content is None and iteration >= self.max_iterations:
logger.warning("Max iterations ({}) reached", self.max_iterations)
final_content = (
f"I reached the maximum number of tool call iterations ({self.max_iterations}) "
"without completing the task. You can try breaking the task into smaller steps."
)
return final_content, tools_used, messages
async def run(self) -> None:
"""Run the agent loop, dispatching messages as tasks to stay responsive to /stop."""
self._running = True
await self._connect_mcp()
logger.info("Agent loop started")
while self._running:
try:
msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0)
except asyncio.TimeoutError:
continue
if msg.content.strip().lower() == "/stop":
await self._handle_stop(msg)
else:
task = asyncio.create_task(self._dispatch(msg))
self._active_tasks.setdefault(msg.session_key, []).append(task)
task.add_done_callback(lambda t, k=msg.session_key: self._active_tasks.get(k, []) and self._active_tasks[k].remove(t) if t in self._active_tasks.get(k, []) else None)
async def _handle_stop(self, msg: InboundMessage) -> None:
"""Cancel all active tasks and subagents for the session."""
tasks = self._active_tasks.pop(msg.session_key, [])
cancelled = sum(1 for t in tasks if not t.done() and t.cancel())
for t in tasks:
try:
await t
except (asyncio.CancelledError, Exception):
pass
sub_cancelled = await self.subagents.cancel_by_session(msg.session_key)
total = cancelled + sub_cancelled
content = f"Stopped {total} task(s)." if total else "No active task to stop."
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id, content=content,
))
async def _dispatch(self, msg: InboundMessage) -> None:
"""Process a message under the global lock."""
async with self._processing_lock:
try:
response = await self._process_message(msg)
if response is not None:
await self.bus.publish_outbound(response)
elif msg.channel == "cli":
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id,
content="", metadata=msg.metadata or {},
))
except asyncio.CancelledError:
logger.info("Task cancelled for session {}", msg.session_key)
raise
except Exception:
logger.exception("Error processing message for session {}", msg.session_key)
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id,
content="Sorry, I encountered an error.",
))
async def close_mcp(self) -> None:
"""Close MCP connections."""
if self._mcp_stack:
try:
await self._mcp_stack.aclose()
except (RuntimeError, BaseExceptionGroup):
pass # MCP SDK cancel scope cleanup is noisy but harmless
self._mcp_stack = None
def stop(self) -> None:
"""Signal the loop to stop."""
self._running = False
async def _process_message(self, msg: InboundMessage) -> OutboundMessage | None:
"""Process a single inbound message."""
logger.info("Processing message from {}:{}: {}", msg.channel, msg.chat_id, msg.content[:100])
await self._connect_mcp()
self._set_tool_context(msg.channel, msg.chat_id, msg.message_id)
session_key = msg.session_key
session = await self.sessions.get(session_key)
initial_messages = self.context.build(
user_message=msg.content,
history=session.history[-self.memory_window:],
memory=session.memory,
skills=session.skills,
reasoning_effort=self.reasoning_effort,
)
async def on_progress(content: str, tool_hint: bool = False):
if msg.channel != "cli":
return
if tool_hint:
print(f"\033[90m[{content}]\033[0m", flush=True)
else:
print(f"\033[90m{content}\033[0m\n", flush=True)
final_content, tools_used, messages = await self._run_agent_loop(
initial_messages,
on_progress=on_progress if msg.channel == "cli" else None,
)
# Don't persist tool call errors to history
if final_content and "error" not in final_content.lower():
session.history = messages
session.memory = await self._consolidate_memory(session_key, messages, session.memory)
if final_content:
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
message_id=msg.message_id,
content=final_content,
metadata=msg.metadata or {},
)
return None
async def _consolidate_memory(
self, session_key: str, messages: list[dict], current_memory: MemoryStore
) -> MemoryStore:
"""Consolidate conversation into long-term memory if needed."""
if not self.context.memory_consolidation_prompt:
return current_memory
if session_key in self._consolidating:
return current_memory
lock = self._consolidation_locks.setdefault(session_key, asyncio.Lock())
async with lock:
if session_key in self._consolidating:
return current_memory
self._consolidating.add(session_key)
task = asyncio.create_task(self._do_consolidate(session_key, messages, current_memory))
self._consolidation_tasks.add(task)
task.add_done_callback(self._consolidation_tasks.discard)
task.add_done_callback(lambda t, k=session_key: self._consolidating.discard(k))
return await task
async def _do_consolidate(
self, session_key: str, messages: list[dict], current_memory: MemoryStore
) -> MemoryStore:
"""Perform memory consolidation."""
logger.info("Consolidating memory for session {}", session_key)
consolidation_messages = self.context.build_memory_consolidation(
messages=messages,
current_memory=current_memory,
)
response = await self.provider.chat(
messages=consolidation_messages,
model=self.model,
temperature=0.1, # Conservative for consolidation
max_tokens=self.max_tokens,
)
try:
new_memory = json.loads(response.content)
return MemoryStore.validate(new_memory)
except (json.JSONDecodeError, ValueError) as e:
logger.warning("Memory consolidation failed: {}", e)
return current_memory
Detailed Explanation of AgentLoop
The AgentLoop class is the central orchestrator of the nanobot-agent. It implements an asynchronous message-processing loop that integrates LLM reasoning, tool execution, session management, and subagent handling. The design is event-driven, leveraging asyncio for concurrency, making it suitable for high-throughput AI assistants (e.g., handling multiple chat sessions simultaneously).
I’ll break it down section-by-section, with line references (approximate, based on the code structure), explanations, and how it contributes to the core engine.
Imports and TYPE_CHECKING (Lines 1-40)
- Purpose: Import the modules needed for async operation (`asyncio`), JSON handling, regex text processing, logging (`loguru`), and project components such as `ContextBuilder`, `MemoryStore`, and `ToolRegistry`.
- Key Insight: Uses `TYPE_CHECKING` so type-only imports add no runtime overhead. This wires up the loop's dependencies: the bus for messaging, the provider for the LLM, tools for skills, and sessions for persistence.
- How it works as core: Establishes the foundation of the engine's modularity; tools and providers are pluggable.
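The `TYPE_CHECKING` idiom is worth seeing in isolation. A minimal sketch (the module and class names below are hypothetical, not nanobot's):

```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Seen only by type checkers; never executed at runtime, so there is
    # no import cost and no circular-import risk.
    from some_heavy_module import ChannelsConfig  # hypothetical module


def describe(config: "ChannelsConfig | None") -> str:
    # The annotation stays a string at runtime (postponed evaluation),
    # so the guarded import above is never actually needed to run this.
    return "no channels" if config is None else "configured"
```

This is why loop.py can annotate parameters like `channels_config: ChannelsConfig | None` while importing `ChannelsConfig` only under the `if TYPE_CHECKING:` guard.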
Class Definition: AgentLoop (Lines 41-end)
- Class Docstring: Describes the loop’s role: receive messages, build context, call LLM, execute tools, send responses. This is the “core processing engine” as per the module docstring.
- Private Constants: `_TOOL_RESULT_MAX_CHARS = 500` limits tool output length to keep it from overflowing prompts.
`__init__` Method (Lines 45-120)
- Purpose: Initializes the AgentLoop with configuration parameters (e.g., bus, provider, workspace, model, etc.).
- Key Parameters:
  - `bus: MessageBus`: event bus for inbound/outbound messages (the core communication hub).
  - `provider: LLMProvider`: LLM backend (e.g., OpenAI or Claude integration).
  - `workspace: Path`: file-system root for tools and memory.
  - Hyperparameters such as `max_iterations=40` (prevents infinite loops), `temperature=0.1` (conservative reasoning), and `memory_window=100` (history limit to avoid token overflow).
  - Optional: API keys, proxies, a cron service for scheduled tasks, and MCP servers for external protocol integration.
- Internal Setup:
  - Creates a `ContextBuilder` for prompts.
  - Initializes a `SessionManager` for per-user state.
  - Sets up the `ToolRegistry` and `SubagentManager` (for spawning child agents).
  - Flags such as `_running = False` and the MCP connection state.
  - Data structures: sets tracking in-flight consolidations and tasks, locks for concurrency safety, and a dict of active tasks per session.
  - Calls `_register_default_tools()` to register the built-in tools (file ops, exec, web search/fetch, message, spawn, cron).
- How it works as core: Configures the engine's "brain" (LLM), "memory" (sessions), "hands" (tools), and "nerves" (bus/locks), and keeps shared state safe in the async environment.
_register_default_tools Method (Lines 121-140)
- Purpose: Registers the default set of tools into the `ToolRegistry`.
- Key Tools:
  - File system: `ReadFileTool`, `WriteFileTool`, etc. (with optional workspace restriction).
  - Shell: `ExecTool` (with timeout and restrictions for safety).
  - Web: `WebSearchTool` (uses the Brave API) and `WebFetchTool`.
  - Messaging: `MessageTool` (publishes to the bus).
  - Spawning: `SpawnTool` (uses the `SubagentManager`).
  - Cron: `CronTool`, registered only if a cron service is provided.
- How it works as core: Populates the agent’s “skill set”. Tools are callable by LLM, enabling external interactions. Restrictions prevent jailbreaks (e.g., no arbitrary exec).
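The register/lookup/execute pattern the loop relies on can be sketched in a few lines. This is a toy registry, not nanobot's actual `ToolRegistry`; the method names (`register`, `get`, `get_definitions`, `execute`) are inferred from how loop.py calls them:

```python
import asyncio


class EchoTool:
    """Toy tool standing in for nanobot's real tools (hypothetical)."""
    name = "echo"
    description = "Echo the input back."

    async def run(self, arguments: dict) -> str:
        return str(arguments.get("text", ""))


class MiniToolRegistry:
    """Minimal sketch of the register/lookup/execute pattern."""

    def __init__(self) -> None:
        self._tools: dict[str, object] = {}

    def register(self, tool) -> None:
        self._tools[tool.name] = tool

    def get(self, name: str):
        return self._tools.get(name)

    def get_definitions(self) -> list[dict]:
        # Shape loosely follows the OpenAI function-calling format.
        return [
            {"type": "function",
             "function": {"name": t.name, "description": t.description}}
            for t in self._tools.values()
        ]

    async def execute(self, name: str, arguments: dict) -> str:
        tool = self._tools.get(name)
        if tool is None:
            return f"Unknown tool: {name}"
        return await tool.run(arguments)


registry = MiniToolRegistry()
registry.register(EchoTool())
result = asyncio.run(registry.execute("echo", {"text": "hi"}))
```

Because the LLM only ever sees `get_definitions()` and the loop only ever calls `execute()`, adding a custom tool is just another `register()` call.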
_connect_mcp Method (Lines 141-165)
- Purpose: Lazily connects to MCP (Model Context Protocol) servers for external tool integration.
- Logic: If not yet connected and servers are configured, it uses an `AsyncExitStack` for resource management and calls `connect_mcp_servers` to register the MCP tools.
- Error Handling: Failures are logged and retried on the next message.
- How it works as core: Enables the MCP protocol for secure, real-time external API calls (e.g., a weather service). MCP is optional but key for extending the agent beyond local tools.
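The lazy, one-shot connection pattern with guard flags and an `AsyncExitStack` can be reproduced in a self-contained sketch; the `fake_server` context manager below stands in for a real MCP connection:

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

log: list[str] = []


@asynccontextmanager
async def fake_server(name: str):
    # Stand-in for an MCP server connection (hypothetical).
    log.append(f"open:{name}")
    try:
        yield name
    finally:
        log.append(f"close:{name}")


class LazyConnector:
    """Sketch of _connect_mcp's lazy, one-shot connection with guard flags."""

    def __init__(self, servers: list[str]) -> None:
        self._servers = servers
        self._stack: AsyncExitStack | None = None
        self._connected = False
        self._connecting = False

    async def connect(self) -> None:
        if self._connected or self._connecting or not self._servers:
            return  # already connected, in progress, or nothing to do
        self._connecting = True
        try:
            self._stack = AsyncExitStack()
            await self._stack.__aenter__()
            for name in self._servers:
                await self._stack.enter_async_context(fake_server(name))
            self._connected = True
        finally:
            self._connecting = False

    async def close(self) -> None:
        if self._stack:
            await self._stack.aclose()
            self._stack = None


async def main() -> None:
    conn = LazyConnector(["weather"])
    await conn.connect()
    await conn.connect()  # second call is a no-op thanks to the flags
    await conn.close()    # aclose() unwinds every entered context

asyncio.run(main())
```

The stack keeps all server connections alive for the agent's lifetime, and a single `aclose()` tears them down in reverse order.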
_set_tool_context Method (Lines 166-175)
- Purpose: Injects channel/chat/message IDs into the tools that need routing info (`message`, `spawn`, `cron`).
- How it works as core: Makes tools context-aware, e.g., `MessageTool` knows where to send replies. Critical for multi-channel support (Telegram vs. CLI).
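A simplified sketch of this duck-typed context injection. Unlike the real method, it passes `message_id` to every opted-in tool, and the stub classes are hypothetical:

```python
class MessageToolStub:
    """Hypothetical stand-in for a tool that accepts routing context."""

    def __init__(self) -> None:
        self.channel = self.chat_id = self.message_id = None

    def set_context(self, channel, chat_id, message_id=None):
        self.channel, self.chat_id, self.message_id = channel, chat_id, message_id


class PlainTool:
    """A tool with no set_context; the injector simply skips it."""


def set_tool_context(tools: dict, channel: str, chat_id: str, message_id=None):
    # Mirrors the spirit of _set_tool_context: duck-typed via hasattr,
    # so only tools that opt in receive routing info.
    for tool in tools.values():
        if hasattr(tool, "set_context"):
            tool.set_context(channel, chat_id, message_id)


tools = {"message": MessageToolStub(), "plain": PlainTool()}
set_tool_context(tools, "telegram", "chat-42", "msg-7")
```

The `hasattr` check means new tools can ignore routing entirely without breaking the loop.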
_strip_think and _tool_hint Static Methods (Lines 176-195)
- `_strip_think`: Removes `<think>…</think>` blocks from LLM output (some models embed reasoning tags).
- `_tool_hint`: Formats tool calls as concise strings (e.g., `web_search("query")`) for logging and progress display.
- How it works as core: Cleans LLM responses for persistence/use; provides user-friendly hints during execution (e.g., CLI progress).
_run_agent_loop Method (Lines 196-280)
- Purpose: The heart of the engine—runs the iterative Agent loop for a message: LLM calls + tool executions until done or max iterations.
- Inputs: Initial messages, optional progress callback.
- Outputs: Final content, used tools, updated messages.
- Logic Breakdown:
  - Loop while `iteration < max_iterations`:
    - Call the LLM (`provider.chat`) with the messages plus tool definitions.
    - If there are tool calls: log thoughts/hints, append an assistant message carrying the calls, execute each tool, and append the results to the messages.
    - Otherwise: clean the content and break; error responses are not persisted to history, since they could poison the context.
  - On hitting max iterations, fall back to an explanatory message.
- Key Features: Supports `reasoning_effort` (e.g., chain-of-thought effort levels) and model-specific `thinking_blocks`.
- How it works as core: Implements a perceive-decide-act cycle: the LLM decides, tools act, and tool results feed back as new perceptions. The iteration cap prevents runaway loops.
run Method (Lines 281-305)
- Purpose: Starts the main event loop, consuming inbound messages from bus.
- Logic: While running, poll bus (timeout 1s), handle /stop or dispatch messages as async tasks.
- How it works as core: Makes the agent “alive”—responsive to multiple sessions, cancellable tasks. Uses task tracking for clean shutdown.
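The timeout-polling idiom that keeps `run()` responsive can be shown with a plain `asyncio.Queue`; for the demo, the loop stops itself after the first idle poll instead of waiting for a `stop()` call:

```python
import asyncio


async def poll_loop() -> list[str]:
    """Sketch of run()'s pattern: a short wait_for timeout means the loop
    regularly wakes up and re-checks its running flag instead of blocking
    forever on an empty queue."""
    queue: asyncio.Queue[str] = asyncio.Queue()
    await queue.put("hello")
    seen: list[str] = []
    running = True
    while running:
        try:
            msg = await asyncio.wait_for(queue.get(), timeout=0.05)
        except asyncio.TimeoutError:
            # No message arrived within the window; in the real loop we would
            # `continue` and re-check _running. Here we stop for the demo.
            running = False
            continue
        seen.append(msg)
    return seen


messages = asyncio.run(poll_loop())
```

This is why `stop()` only has to flip `_running`: within at most one timeout period the loop notices and exits.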
_handle_stop Method (Lines 306-320)
- Purpose: Processes /stop command: cancels tasks/subagents for session, reports count.
- How it works as core: Graceful interruption, essential for long-running agents.
_dispatch Method (Lines 321-340)
- Purpose: Wraps message processing under global lock, catches errors, sends responses.
- How it works as core: Ensures serial processing per message, logs exceptions.
close_mcp Method (Lines 341-350)
- Purpose: Closes MCP connections safely.
- How it works as core: Resource cleanup for MCP integrations.
stop Method (Lines 351-353)
- Purpose: Signals loop to stop.
- How it works as core: External shutdown hook.
_process_message Method (Lines 354-385)
- Purpose: Core per-message handler: connects MCP, sets tool context, loads/builds session, runs loop, updates history/memory, returns response.
- How it works as core: Ties everything—session persistence, memory consolidation (_consolidate_memory), tool usage tracking.
_consolidate_memory and _do_consolidate Methods (Lines 386-end)
- Purpose: Asynchronously consolidates conversation into long-term memory using LLM (if prompt configured).
- Logic: Uses locks/tasks to avoid concurrent consolidation; calls LLM with build_memory_consolidation prompt, parses JSON, validates.
- How it works as core: Enables “learning”—compresses history into key facts, reduces token usage in future prompts.
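The parse-then-fall-back safety net in `_do_consolidate` boils down to the following; an `isinstance` check stands in for `MemoryStore.validate`:

```python
import json


def consolidate(raw_response: str, current_memory: dict) -> dict:
    """Sketch of _do_consolidate's safety net: parse the LLM's JSON output,
    but keep the old memory untouched if the output is malformed."""
    try:
        new_memory = json.loads(raw_response)
        if not isinstance(new_memory, dict):  # stand-in for MemoryStore.validate
            raise ValueError("memory must be a JSON object")
        return new_memory
    except (json.JSONDecodeError, ValueError):
        # A bad consolidation run must never destroy existing memory.
        return current_memory


old = {"facts": ["user likes Python"]}
good = consolidate('{"facts": ["user likes Python", "works on nanobot"]}', old)
bad = consolidate("Sorry, I cannot produce JSON.", old)
```

The key design choice is that failure is non-destructive: a model that rambles instead of emitting JSON costs one wasted call, not the session's accumulated memory.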
3. How Does AgentLoop Work as the Core Engine?
- Overall flow: the async loop (run) listens on the bus and dispatches each message to _process_message, whose inner _run_agent_loop iterates LLM calls and tool executions until it produces final_content.
- Async advantages: asyncio plus locks handle concurrent sessions, and tool execution does not block the loop.
- Extensibility: tools are registered dynamically, and MCP/subagents cover external integrations and subtasks.
- Error handling: an iteration cap, error fallbacks, and logging.
- Performance: the memory window limits history length, and tool results are truncated.
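Both performance guards are one-liners. A sketch; note the truncation helper is illustrative, since the excerpt shows the `_TOOL_RESULT_MAX_CHARS` constant but not the exact site where it is applied:

```python
def build_window(history: list[dict], memory_window: int = 100) -> list[dict]:
    # Same slicing as _process_message: only the newest N turns reach the prompt.
    return history[-memory_window:]


def truncate_tool_result(result: str, max_chars: int = 500) -> str:
    # Illustrates the intent of _TOOL_RESULT_MAX_CHARS: cap tool output
    # before it is fed back into the context.
    return result if len(result) <= max_chars else result[:max_chars] + "…"


history = [{"role": "user", "content": f"msg {i}"} for i in range(150)]
window = build_window(history, memory_window=100)
clipped = truncate_tool_result("x" * 1000, max_chars=500)
```

Together these keep prompt size roughly bounded regardless of how long a session runs or how verbose a tool is.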
Hands-on advice: clone the repo, add a custom tool (e.g., a weather API) in _register_default_tools, and test with nanobot agent -m "query". For tuning, adjust max_iterations to keep long-running tasks in check.
Nanobot's AgentLoop is a model of "minimal yet efficient" design: the code is highly readable and well suited for research. A good next step is to run a demo yourself or compare it against OpenClaw. 💪