Deep Dive into the nanobot-agent Core Engine: An AgentLoop Code Walkthrough

Nanobot-Agent is an ultra-lightweight personal AI assistant project open-sourced by the HKUDS team (GitHub repo: https://github.com/HKUDS/nanobot). Inspired by OpenClaw, its core goal is efficient, extensible agent functionality: multi-LLM support, multi-chat-platform integration, and tool calling in roughly 4,000 lines of code. Its "heart" is the AgentLoop class in agent/loop.py, an asynchronous, event-driven processing engine responsible for receiving messages, building context, calling the LLM, executing tools, and sending responses. This post is a hard-core code walkthrough: overall structure, section-by-section analysis, runtime behavior, and tuning suggestions, all based on the latest code on the main branch, with pseudocode and structured notes along the way so you can start modifying or extending it right after reading. Let's go! 🚀

1. Nanobot-Agent at a Glance: Why Is AgentLoop the Core?

Nanobot is a "minimalist" AI agent framework focused on being lightweight and research-friendly:

  • Core features: 11+ LLM providers (e.g., OpenAI, Anthropic), 8+ chat platforms (e.g., Telegram, Discord), built-in tools (file operations, web search, shell execution, and more), subagent spawning, and scheduled tasks.
  • Design philosophy: 99% smaller than OpenClaw (the core is ~3,500 lines), launched from a single config file, well suited for researchers and developers who want fast prototypes.
  • AgentLoop's role: as the "core processing engine" (per the module docstring: "Agent loop: the core processing engine."), it manages the agent's entire lifecycle, from receiving a message through iterative execution until the task completes. It ties together context, memory, tools, and subagents, runs asynchronously for efficiency, and supports high concurrency (e.g., multiple sessions).

Project layout at a glance (inferred from the README and code):

  • agent/loop.py: the main loop engine.
  • agent/context.py: prompt construction.
  • agent/memory.py: persistent memory.
  • agent/tools/: tool registration and execution.
  • agent/subagent.py: subagent management.
  • Overall: driven by an event bus (MessageBus), implemented with asyncio.

If you come from Java, this resembles an asynchronous event loop (think the Reactor pattern), but in a more concise Python style.
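As a minimal illustration of that event-loop pattern (hypothetical names, not nanobot's actual API), a queue can stand in for the MessageBus and one async function for the whole process-message pipeline:

```python
import asyncio

# Reactor-style sketch: a queue acts as the message bus, and a loop
# consumes each message and dispatches it to an async handler.
# All names here are illustrative, not nanobot's real classes.

async def handle(msg: str) -> str:
    # Stand-in for: build context -> call LLM -> execute tools -> reply.
    return f"reply to: {msg}"

async def run_loop(bus: asyncio.Queue, replies: list[str]) -> None:
    while True:
        msg = await bus.get()
        if msg is None:  # sentinel value signals shutdown
            break
        replies.append(await handle(msg))

async def main() -> list[str]:
    bus: asyncio.Queue = asyncio.Queue()
    replies: list[str] = []
    for m in ("hello", "what's the weather?", None):
        bus.put_nowait(m)
    await run_loop(bus, replies)
    return replies
```

The real AgentLoop differs mainly in that each message is dispatched as its own task (so the loop stays responsive to /stop) rather than awaited inline.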

2. AgentLoop Overall Structure: Modular + Async Design

The code lives in nanobot/agent/loop.py, roughly 500 lines excluding imports, as a single class definition. The structure is clear: initialization + default tool registration + MCP connection + main loop + message handling + shutdown logic.

Full source (extracted from GitHub raw, unabridged):

"""Agent loop: the core processing engine."""

from __future__ import annotations

import asyncio
import json
import re
import weakref
from contextlib import AsyncExitStack
from pathlib import Path
from typing import TYPE_CHECKING, Any, Awaitable, Callable

from loguru import logger

from nanobot.agent.context import ContextBuilder
from nanobot.agent.memory import MemoryStore
from nanobot.agent.subagent import SubagentManager
from nanobot.agent.tools.cron import CronTool
from nanobot.agent.tools.filesystem import EditFileTool, ListDirTool, ReadFileTool, WriteFileTool
from nanobot.agent.tools.message import MessageTool
from nanobot.agent.tools.registry import ToolRegistry
from nanobot.agent.tools.shell import ExecTool
from nanobot.agent.tools.spawn import SpawnTool
from nanobot.agent.tools.web import WebFetchTool, WebSearchTool
from nanobot.bus.events import InboundMessage, OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMProvider
from nanobot.session.manager import Session, SessionManager

if TYPE_CHECKING:
    from nanobot.config.schema import ChannelsConfig, ExecToolConfig
    from nanobot.cron.service import CronService


class AgentLoop:
    """
    The agent loop is the core processing engine.

    It:
    1. Receives messages from the bus
    2. Builds context with history, memory, skills
    3. Calls the LLM
    4. Executes tool calls
    5. Sends responses back
    """

    _TOOL_RESULT_MAX_CHARS = 500

    def __init__(
        self,
        bus: MessageBus,
        provider: LLMProvider,
        workspace: Path,
        model: str | None = None,
        max_iterations: int = 40,
        temperature: float = 0.1,
        max_tokens: int = 4096,
        memory_window: int = 100,
        reasoning_effort: str | None = None,
        brave_api_key: str | None = None,
        web_proxy: str | None = None,
        exec_config: ExecToolConfig | None = None,
        cron_service: CronService | None = None,
        restrict_to_workspace: bool = False,
        session_manager: SessionManager | None = None,
        mcp_servers: dict | None = None,
        channels_config: ChannelsConfig | None = None,
    ):
        from nanobot.config.schema import ExecToolConfig
        self.bus = bus
        self.channels_config = channels_config
        self.provider = provider
        self.workspace = workspace
        self.model = model or provider.get_default_model()
        self.max_iterations = max_iterations
        self.temperature = temperature
        self.max_tokens = max_tokens
        self.memory_window = memory_window
        self.reasoning_effort = reasoning_effort
        self.brave_api_key = brave_api_key
        self.web_proxy = web_proxy
        self.exec_config = exec_config or ExecToolConfig()
        self.cron_service = cron_service
        self.restrict_to_workspace = restrict_to_workspace

        self.context = ContextBuilder(workspace)
        self.sessions = session_manager or SessionManager(workspace)
        self.tools = ToolRegistry()
        self.subagents = SubagentManager(
            provider=provider,
            workspace=workspace,
            bus=bus,
            model=self.model,
            temperature=self.temperature,
            max_tokens=self.max_tokens,
            reasoning_effort=reasoning_effort,
            brave_api_key=brave_api_key,
            web_proxy=web_proxy,
            exec_config=self.exec_config,
            restrict_to_workspace=restrict_to_workspace,
        )

        self._running = False
        self._mcp_servers = mcp_servers or {}
        self._mcp_stack: AsyncExitStack | None = None
        self._mcp_connected = False
        self._mcp_connecting = False
        self._consolidating: set[str] = set()  # Session keys with consolidation in progress
        self._consolidation_tasks: set[asyncio.Task] = set()  # Strong refs to in-flight tasks
        self._consolidation_locks: weakref.WeakValueDictionary[str, asyncio.Lock] = weakref.WeakValueDictionary()
        self._active_tasks: dict[str, list[asyncio.Task]] = {}  # session_key -> tasks
        self._processing_lock = asyncio.Lock()
        self._register_default_tools()

    def _register_default_tools(self) -> None:
        """Register the default set of tools."""
        allowed_dir = self.workspace if self.restrict_to_workspace else None
        for cls in (ReadFileTool, WriteFileTool, EditFileTool, ListDirTool):
            self.tools.register(cls(workspace=self.workspace, allowed_dir=allowed_dir))
        self.tools.register(ExecTool(
            working_dir=str(self.workspace),
            timeout=self.exec_config.timeout,
            restrict_to_workspace=self.restrict_to_workspace,
            path_append=self.exec_config.path_append,
        ))
        self.tools.register(WebSearchTool(api_key=self.brave_api_key, proxy=self.web_proxy))
        self.tools.register(WebFetchTool(proxy=self.web_proxy))
        self.tools.register(MessageTool(send_callback=self.bus.publish_outbound))
        self.tools.register(SpawnTool(manager=self.subagents))
        if self.cron_service:
            self.tools.register(CronTool(self.cron_service))

    async def _connect_mcp(self) -> None:
        """Connect to configured MCP servers (one-time, lazy)."""
        if self._mcp_connected or self._mcp_connecting or not self._mcp_servers:
            return
        self._mcp_connecting = True
        from nanobot.agent.tools.mcp import connect_mcp_servers
        try:
            self._mcp_stack = AsyncExitStack()
            await self._mcp_stack.__aenter__()
            await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack)
            self._mcp_connected = True
        except Exception as e:
            logger.error("Failed to connect MCP servers (will retry next message): {}", e)
            if self._mcp_stack:
                try:
                    await self._mcp_stack.aclose()
                except Exception:
                    pass
                self._mcp_stack = None
        finally:
            self._mcp_connecting = False

    def _set_tool_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None:
        """Update context for all tools that need routing info."""
        for name in ("message", "spawn", "cron"):
            if tool := self.tools.get(name):
                if hasattr(tool, "set_context"):
                    tool.set_context(channel, chat_id, *([message_id] if name == "message" else []))

    @staticmethod
    def _strip_think(text: str | None) -> str | None:
        """Remove <think>…</think> blocks that some models embed in content."""
        if not text:
            return None
        return re.sub(r"<think>[\s\S]*?</think>", "", text).strip() or None

    @staticmethod
    def _tool_hint(tool_calls: list) -> str:
        """Format tool calls as concise hint, e.g. 'web_search("query")'."""
        def _fmt(tc):
            args = (tc.arguments[0] if isinstance(tc.arguments, list) else tc.arguments) or {}
            val = next(iter(args.values()), None) if isinstance(args, dict) else None
            if not isinstance(val, str):
                return tc.name
            return f'{tc.name}("{val[:40]}…")' if len(val) > 40 else f'{tc.name}("{val}")'
        return ", ".join(_fmt(tc) for tc in tool_calls)

    async def _run_agent_loop(
        self,
        initial_messages: list[dict],
        on_progress: Callable[..., Awaitable[None]] | None = None,
    ) -> tuple[str | None, list[str], list[dict]]:
        """Run the agent iteration loop. Returns (final_content, tools_used, messages)."""
        messages = initial_messages
        iteration = 0
        final_content = None
        tools_used: list[str] = []

        while iteration < self.max_iterations:
            iteration += 1

            response = await self.provider.chat(
                messages=messages,
                tools=self.tools.get_definitions(),
                model=self.model,
                temperature=self.temperature,
                max_tokens=self.max_tokens,
                reasoning_effort=self.reasoning_effort,
            )

            if response.has_tool_calls:
                if on_progress:
                    thoughts = [
                        self._strip_think(response.content),
                        response.reasoning_content,
                        *(
                            f"Thinking [{b.get('signature', '...')}]:\n{b.get('thought', '...')}"
                            for b in (response.thinking_blocks or [])
                            if isinstance(b, dict) and "signature" in b
                        ),
                    ]
                    combined_thoughts = "\n\n".join(filter(None, thoughts))
                    if combined_thoughts:
                        await on_progress(combined_thoughts)
                    await on_progress(self._tool_hint(response.tool_calls), tool_hint=True)

                tool_call_dicts = [
                    {
                        "id": tc.id,
                        "type": "function",
                        "function": {
                            "name": tc.name,
                            "arguments": json.dumps(tc.arguments, ensure_ascii=False)
                        }
                    }
                    for tc in response.tool_calls
                ]
                messages = self.context.add_assistant_message(
                    messages, response.content, tool_call_dicts,
                    reasoning_content=response.reasoning_content,
                    thinking_blocks=response.thinking_blocks,
                )

                for tool_call in response.tool_calls:
                    tools_used.append(tool_call.name)
                    args_str = json.dumps(tool_call.arguments, ensure_ascii=False)
                    logger.info("Tool call: {}({})", tool_call.name, args_str[:200])
                    result = await self.tools.execute(tool_call.name, tool_call.arguments)
                    messages = self.context.add_tool_result(
                        messages, tool_call.id, tool_call.name, result
                    )
            else:
                clean = self._strip_think(response.content)
                # Don't persist error responses to session history — they can
                # poison the context and cause permanent 400 loops (#1303).
                if response.finish_reason == "error":
                    logger.error("LLM returned error: {}", (clean or "")[:200])
                    final_content = clean or "Sorry, I encountered an error calling the AI model."
                    break
                messages = self.context.add_assistant_message(
                    messages, clean, reasoning_content=response.reasoning_content,
                    thinking_blocks=response.thinking_blocks,
                )
                final_content = clean
                break

        if final_content is None and iteration >= self.max_iterations:
            logger.warning("Max iterations ({}) reached", self.max_iterations)
            final_content = (
                f"I reached the maximum number of tool call iterations ({self.max_iterations}) "
                "without completing the task. You can try breaking the task into smaller steps."
            )

        return final_content, tools_used, messages

    async def run(self) -> None:
        """Run the agent loop, dispatching messages as tasks to stay responsive to /stop."""
        self._running = True
        await self._connect_mcp()
        logger.info("Agent loop started")

        while self._running:
            try:
                msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0)
            except asyncio.TimeoutError:
                continue

            if msg.content.strip().lower() == "/stop":
                await self._handle_stop(msg)
            else:
                task = asyncio.create_task(self._dispatch(msg))
                self._active_tasks.setdefault(msg.session_key, []).append(task)
                task.add_done_callback(lambda t, k=msg.session_key: self._active_tasks.get(k, []) and self._active_tasks[k].remove(t) if t in self._active_tasks.get(k, []) else None)

    async def _handle_stop(self, msg: InboundMessage) -> None:
        """Cancel all active tasks and subagents for the session."""
        tasks = self._active_tasks.pop(msg.session_key, [])
        cancelled = sum(1 for t in tasks if not t.done() and t.cancel())
        for t in tasks:
            try:
                await t
            except (asyncio.CancelledError, Exception):
                pass
        sub_cancelled = await self.subagents.cancel_by_session(msg.session_key)
        total = cancelled + sub_cancelled
        content = f"Stopped {total} task(s)." if total else "No active task to stop."
        await self.bus.publish_outbound(OutboundMessage(
            channel=msg.channel, chat_id=msg.chat_id, content=content,
        ))

    async def _dispatch(self, msg: InboundMessage) -> None:
        """Process a message under the global lock."""
        async with self._processing_lock:
            try:
                response = await self._process_message(msg)
                if response is not None:
                    await self.bus.publish_outbound(response)
                elif msg.channel == "cli":
                    await self.bus.publish_outbound(OutboundMessage(
                        channel=msg.channel, chat_id=msg.chat_id,
                        content="", metadata=msg.metadata or {},
                    ))
            except asyncio.CancelledError:
                logger.info("Task cancelled for session {}", msg.session_key)
                raise
            except Exception:
                logger.exception("Error processing message for session {}", msg.session_key)
                await self.bus.publish_outbound(OutboundMessage(
                    channel=msg.channel, chat_id=msg.chat_id,
                    content="Sorry, I encountered an error.",
                ))

    async def close_mcp(self) -> None:
        """Close MCP connections."""
        if self._mcp_stack:
            try:
                await self._mcp_stack.aclose()
            except (RuntimeError, BaseExceptionGroup):
                pass  # MCP SDK cancel scope cleanup is noisy but harmless
            self._mcp_stack = None

    def stop(self) -> None:
        """Signal the loop to stop."""
        self._running = False

    async def _process_message(self, msg: InboundMessage) -> OutboundMessage | None:
        """Process a single inbound message."""
        logger.info("Processing message from {}:{}: {}", msg.channel, msg.chat_id, msg.content[:100])
        await self._connect_mcp()
        self._set_tool_context(msg.channel, msg.chat_id, msg.message_id)

        session_key = msg.session_key
        session = await self.sessions.get(session_key)

        initial_messages = self.context.build(
            user_message=msg.content,
            history=session.history[-self.memory_window:],
            memory=session.memory,
            skills=session.skills,
            reasoning_effort=self.reasoning_effort,
        )

        async def on_progress(content: str, tool_hint: bool = False):
            if msg.channel != "cli":
                return
            if tool_hint:
                print(f"\033[90m[{content}]\033[0m", flush=True)
            else:
                print(f"\033[90m{content}\033[0m\n", flush=True)

        final_content, tools_used, messages = await self._run_agent_loop(
            initial_messages,
            on_progress=on_progress if msg.channel == "cli" else None,
        )

        # Don't persist tool call errors to history
        if final_content and "error" not in final_content.lower():
            session.history = messages
            session.memory = await self._consolidate_memory(session_key, messages, session.memory)

        if final_content:
            return OutboundMessage(
                channel=msg.channel,
                chat_id=msg.chat_id,
                message_id=msg.message_id,
                content=final_content,
                metadata=msg.metadata or {},
            )
        return None

    async def _consolidate_memory(
        self, session_key: str, messages: list[dict], current_memory: MemoryStore
    ) -> MemoryStore:
        """Consolidate conversation into long-term memory if needed."""
        if not self.context.memory_consolidation_prompt:
            return current_memory

        if session_key in self._consolidating:
            return current_memory

        lock = self._consolidation_locks.setdefault(session_key, asyncio.Lock())
        async with lock:
            if session_key in self._consolidating:
                return current_memory

            self._consolidating.add(session_key)
            task = asyncio.create_task(self._do_consolidate(session_key, messages, current_memory))
            self._consolidation_tasks.add(task)
            task.add_done_callback(self._consolidation_tasks.discard)
            task.add_done_callback(lambda t, k=session_key: self._consolidating.discard(k))
            return await task

    async def _do_consolidate(
        self, session_key: str, messages: list[dict], current_memory: MemoryStore
    ) -> MemoryStore:
        """Perform memory consolidation."""
        logger.info("Consolidating memory for session {}", session_key)

        consolidation_messages = self.context.build_memory_consolidation(
            messages=messages,
            current_memory=current_memory,
        )

        response = await self.provider.chat(
            messages=consolidation_messages,
            model=self.model,
            temperature=0.1,  # Conservative for consolidation
            max_tokens=self.max_tokens,
        )

        try:
            new_memory = json.loads(response.content)
            return MemoryStore.validate(new_memory)
        except (json.JSONDecodeError, ValueError) as e:
            logger.warning("Memory consolidation failed: {}", e)
            return current_memory

Detailed Explanation of AgentLoop

The AgentLoop class is the central orchestrator of the nanobot-agent. It implements an asynchronous message-processing loop that integrates LLM reasoning, tool execution, session management, and subagent handling. The design is event-driven, leveraging asyncio for concurrency, making it suitable for high-throughput AI assistants (e.g., handling multiple chat sessions simultaneously).

I’ll break it down section-by-section, with line references (approximate, based on the code structure), explanations, and how it contributes to the core engine.

Imports and TYPE_CHECKING (Lines 1-40)

  • Purpose: Import necessary modules for async operations (asyncio), JSON handling, regex for text processing, logging (loguru), and project-specific components like ContextBuilder, MemoryStore, ToolRegistry, etc.
  • Key Insight: Uses TYPE_CHECKING for type hints without runtime overhead. This sets up dependencies for the loop: bus for messaging, provider for LLM, tools for skills, sessions for persistence.
  • How it works as core: Establishes the foundation for the engine’s modularity—tools and providers are pluggable.

Class Definition: AgentLoop (Lines 41-end)

  • Class Docstring: Describes the loop’s role: receive messages, build context, call LLM, execute tools, send responses. This is the “core processing engine” as per the module docstring.
  • Private Constants: _TOOL_RESULT_MAX_CHARS = 500 limits tool output to prevent overflow in prompts.

__init__ Method (Lines 45-120)

  • Purpose: Initializes the AgentLoop with configuration parameters (e.g., bus, provider, workspace, model, etc.).
  • Key Parameters:
    • bus: MessageBus: Event bus for inbound/outbound messages (core communication hub).
    • provider: LLMProvider: LLM backend (e.g., OpenAI, Claude integration).
    • workspace: Path: File system root for tools/memory.
    • Hyperparams like max_iterations=40 (prevent infinite loops), temperature=0.1 (conservative reasoning), memory_window=100 (history limit to avoid token overflow).
    • Optional: API keys, proxies, cron service for scheduled tasks, MCP servers for external protocol integration.
  • Internal Setup:
    • Creates ContextBuilder for prompts.
    • Initializes SessionManager for per-user state.
    • Sets up ToolRegistry and SubagentManager (for spawning child agents).
    • Flags like _running = False and MCP connection state.
    • Data structures: sets for tracking consolidating sessions and in-flight tasks; locks for concurrency safety; a dict of active tasks per session.
    • Calls _register_default_tools(): registers built-in tools (file ops, exec, web search/fetch, message, spawn, cron).
  • How it works as core: Configures the engine’s “brain” (LLM), “memory” (sessions), “hands” (tools), and “nerves” (bus/locks). Ensures thread-safety in async env.

_register_default_tools Method (Lines 121-140)

  • Purpose: Registers a set of default tools into the ToolRegistry.
  • Key Tools:
    • File system: ReadFileTool, WriteFileTool, etc. (with optional workspace restriction).
    • Shell: ExecTool (with timeout/restrictions for safety).
    • Web: WebSearchTool (uses the Brave API), WebFetchTool.
    • Messaging: MessageTool (publishes to the bus).
    • Spawning: SpawnTool (uses SubagentManager).
    • Cron: CronTool, registered only if a cron service is provided.
  • How it works as core: Populates the agent's "skill set". Tools are callable by the LLM, enabling external interactions; restrictions (workspace confinement, exec timeouts) limit the blast radius of unsafe tool calls.
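The registry pattern behind this can be sketched with a simplified stand-in (illustrative names and signatures only; nanobot's real ToolRegistry also exposes JSON tool definitions for the LLM via get_definitions):

```python
from typing import Any, Callable

# Simplified stand-in for the tool-registry pattern: tools register under a
# name and are later executed by name with keyword arguments.

class SimpleRegistry:
    def __init__(self) -> None:
        self._tools: dict[str, Callable[..., Any]] = {}

    def register(self, name: str, fn: Callable[..., Any]) -> None:
        self._tools[name] = fn

    def execute(self, name: str, args: dict) -> Any:
        # Unknown tool names return an error string instead of raising,
        # so the LLM can see and recover from its own mistake.
        if name not in self._tools:
            return f"Unknown tool: {name}"
        return self._tools[name](**args)

registry = SimpleRegistry()
registry.register("read_file", lambda path: f"<contents of {path}>")
```

Adding a custom tool to the real AgentLoop follows the same shape: construct the tool and hand it to self.tools.register() inside _register_default_tools.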

_connect_mcp Method (Lines 141-165)

  • Purpose: Lazily connects to MCP (Model Context Protocol) servers for external tool integration.
  • Logic: If not connected and servers configured, use AsyncExitStack for resource management, call connect_mcp_servers to register tools with MCP.
  • Error Handling: Logs failures, retries on next message.
  • How it works as core: Enables the MCP protocol for real-time external API integrations (e.g., a weather service). MCP is optional but key for extending the agent beyond the built-in local tools.

_set_tool_context Method (Lines 166-175)

  • Purpose: Injects channel/chat/message IDs into tools that need routing info (e.g., message, spawn, cron).
  • How it works as core: Ensures tools are context-aware, e.g., MessageTool knows where to send replies. Critical for multi-channel support (Telegram vs CLI).

_strip_think and _tool_hint Static Methods (Lines 176-195)

  • _strip_think: Removes <think>...</think> blocks from LLM output (some models embed reasoning tags).
  • _tool_hint: Formats tool calls as concise strings (e.g., web_search("query")) for logging/progress.
  • How it works as core: Cleans LLM responses for persistence/use; provides user-friendly hints during execution (e.g., CLI progress).

_run_agent_loop Method (Lines 196-280)

  • Purpose: The heart of the engine—runs the iterative Agent loop for a message: LLM calls + tool executions until done or max iterations.
  • Inputs: Initial messages, optional progress callback.
  • Outputs: Final content, used tools, updated messages.
  • Logic Breakdown:
    • Loop while iteration < max_iterations:
      • Call the LLM (provider.chat) with messages + tool definitions.
      • If there are tool calls: log thoughts/hints, add the assistant message with the calls, execute each tool, and append results to messages.
      • Else: clean the content, add it to messages (unless the finish reason is "error", which is never persisted), and break.
    • Handle errors/max iterations with fallback messages.
  • Key Features: Supports reasoning_effort (e.g., CoT), thinking_blocks (model-specific).
  • How it works as core: Implements a perceive/decide/act cycle: the LLM decides, tools act, and tool results feed back into the context as new perceptions. The iteration cap prevents runaway loops.
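Stripped of providers and context handling, the control flow of _run_agent_loop reduces to this sketch (a scripted response list stands in for provider.chat, and execute_tool for ToolRegistry.execute):

```python
# Sketch of the iterate-until-no-tool-calls pattern. Names are illustrative;
# the real loop also threads assistant messages, thinking blocks, and error
# handling through ContextBuilder.

def run_loop(provider_responses: list[dict], execute_tool, max_iterations: int = 40):
    messages: list[dict] = []
    responses = iter(provider_responses)
    for _ in range(max_iterations):
        resp = next(responses)               # stands in for provider.chat(...)
        if resp.get("tool_calls"):
            for call in resp["tool_calls"]:  # act: run each requested tool
                result = execute_tool(call["name"], call["args"])
                messages.append({"role": "tool", "content": result})
        else:
            return resp["content"], messages  # done: final answer
    return None, messages                     # iteration cap hit
```

The feedback step is the key idea: each tool result is appended to messages, so the next LLM call perceives what the previous action produced.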

run Method (Lines 281-305)

  • Purpose: Starts the main event loop, consuming inbound messages from bus.
  • Logic: While running, poll bus (timeout 1s), handle /stop or dispatch messages as async tasks.
  • How it works as core: Makes the agent “alive”—responsive to multiple sessions, cancellable tasks. Uses task tracking for clean shutdown.
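The 1-second timeout in run() is the standard poll-with-timeout idiom, shown here in isolation:

```python
import asyncio

# Poll-with-timeout sketch: waiting on the queue with a short timeout lets
# the caller periodically re-check a running flag instead of blocking
# forever on an empty queue (mirrors the asyncio.wait_for call in run()).

async def poll_once(queue: asyncio.Queue, timeout: float = 1.0):
    try:
        return await asyncio.wait_for(queue.get(), timeout=timeout)
    except asyncio.TimeoutError:
        return None  # nothing arrived; caller loops and re-checks _running
```

Without the timeout, stop() would have no effect until the next message arrived, since the loop would be parked inside consume_inbound.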

_handle_stop Method (Lines 306-320)

  • Purpose: Processes /stop command: cancels tasks/subagents for session, reports count.
  • How it works as core: Graceful interruption, essential for long-running agents.
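The cancel-then-await pattern used here can be shown in isolation (a sketch of the task-handling half only; the real _handle_stop also cancels subagents and reports a count back on the bus):

```python
import asyncio

# Cancel-and-await sketch: cancel() only *requests* cancellation, so each
# task is awaited afterwards to let it actually unwind before we report.

async def stop_tasks(tasks: list[asyncio.Task]) -> int:
    cancelled = sum(1 for t in tasks if not t.done() and t.cancel())
    for t in tasks:
        try:
            await t
        except asyncio.CancelledError:
            pass  # expected for tasks we just cancelled
    return cancelled
```

Counting `not t.done() and t.cancel()` matters: already-finished tasks are skipped, so the user-facing "Stopped N task(s)" message reflects work that was genuinely interrupted.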

_dispatch Method (Lines 321-340)

  • Purpose: Wraps message processing under global lock, catches errors, sends responses.
  • How it works as core: Ensures serial processing per message, logs exceptions.

close_mcp Method (Lines 341-350)

  • Purpose: Closes MCP connections safely.
  • How it works as core: Resource cleanup for MCP integrations.

stop Method (Lines 351-353)

  • Purpose: Signals loop to stop.
  • How it works as core: External shutdown hook.

_process_message Method (Lines 354-385)

  • Purpose: Core per-message handler: connects MCP, sets tool context, loads/builds session, runs loop, updates history/memory, returns response.
  • How it works as core: Ties everything—session persistence, memory consolidation (_consolidate_memory), tool usage tracking.

_consolidate_memory and _do_consolidate Methods (Lines 386-end)

  • Purpose: Asynchronously consolidates conversation into long-term memory using LLM (if prompt configured).
  • Logic: Uses locks/tasks to avoid concurrent consolidation; calls LLM with build_memory_consolidation prompt, parses JSON, validates.
  • How it works as core: Enables “learning”—compresses history into key facts, reduces token usage in future prompts.
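The parse-or-keep fallback at the end of _do_consolidate can be isolated as follows (the isinstance check is a hypothetical stand-in for MemoryStore.validate):

```python
import json

# Parse-or-keep sketch: if the LLM's consolidation output is not valid
# JSON (or fails validation), the current memory is kept unchanged, so a
# bad model response can never corrupt long-term memory.

def consolidate(llm_output: str, current_memory: dict) -> dict:
    try:
        new_memory = json.loads(llm_output)
        if not isinstance(new_memory, dict):  # stand-in for MemoryStore.validate
            raise ValueError("memory must be a JSON object")
        return new_memory
    except (json.JSONDecodeError, ValueError):
        return current_memory
```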

3. How Does AgentLoop Work as the Core Engine?

  • Overall flow: the async loop (run) listens on the bus and dispatches messages to _process_message; internally, _run_agent_loop iterates LLM calls + tool execution until final_content is produced.
  • Async advantages: asyncio plus locks handle concurrent sessions, and tool execution does not block the loop.
  • Extensibility: tools register dynamically; MCP and subagents cover external integrations and subtasks.
  • Error handling: iteration cap, error fallbacks, and logging throughout.
  • Performance: memory_window caps history length, and tool results are truncated.
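Both context-size guards in that last bullet are essentially one-liners; a sketch mirroring the memory_window slice and the _TOOL_RESULT_MAX_CHARS = 500 cap (the ellipsis marker is an assumption, not nanobot's exact formatting):

```python
# Two context-size guards, sketched: cap how much history enters the prompt,
# and cap how long any single tool result can be.

TOOL_RESULT_MAX_CHARS = 500  # mirrors AgentLoop._TOOL_RESULT_MAX_CHARS

def truncate_result(result: str, limit: int = TOOL_RESULT_MAX_CHARS) -> str:
    # Keep short results intact; cut long ones and mark the truncation.
    return result if len(result) <= limit else result[:limit] + "…"

def window_history(history: list, window: int = 100) -> list:
    # Same idea as session.history[-self.memory_window:] in _process_message.
    return history[-window:]
```

Together these keep prompt size roughly bounded per turn, which is what makes long-lived sessions affordable in tokens.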

Hands-on suggestion: clone the repo, extend _register_default_tools with a custom tool (e.g., a weather API), and test with nanobot agent -m "query". For tuning, adjust max_iterations to keep long-running tasks in check.

Nanobot's AgentLoop is a model of minimalist efficiency: highly readable code, well suited for research. Want a runnable demo or a comparison against OpenClaw? Just ask! 💪
