FastAPI + Agno HITL = 實時交互式AI!我們用SSE和WebSocket做出了可對話的AI管家 原創
在AI Agent的實際應用中,完全自主的決策往往存在風險。特別是在涉及敏感操作、重要決策或關鍵業務流程時,人類的監督和干預顯得尤為重要。Agno框架作為一個高性能的多智能體開發框架,在1.5.4版本中引入了強大的Human-in-Loop(HITL)功能,讓開發者能夠優雅地實現人機協作的智能體系統。
本文將深入探討Agno框架的HITL實現機制、流式輸出架構,并提供生產級的代碼示例,幫助開發者快速構建可控、高效的智能體應用。
一、Agno框架概述
1.1 核心特性
Agno是一個輕量級、高性能的智能體框架,具有以下突出特點:
- 極致性能:智能體實例化僅需~2μs,內存占用僅~3.75KiB,比LangGraph快10,000倍
- 原生多模態:支持文本、圖像、音頻、視頻的輸入和輸出
- 推理優先:內置三種推理方法(推理模型、推理工具、自定義思維鏈)
- 團隊協作:支持多智能體團隊架構,實現專業化分工
- 生產就緒:提供預構建的FastAPI路由,快速部署上線
1.2 架構設計理念
Agno采用模塊化、可組合的設計理念,每個組件都是即插即用的模塊:
# 簡潔的聲明式接口
from agno.agent import Agent
from agno.models.openai import OpenAIChat
agent = Agent(
model=OpenAIChat(id="gpt-4o"),
tools=[...],
memory=True,
reasoning=True
)二、Human-in-Loop機制詳解
2.1 HITL的設計模式
Agno提供了四種主要的HITL控制流模式:
2.1.1 用戶確認流(User Confirmation Flow)
對于敏感操作,在執行前要求用戶確認:
from agno.tools import tool
from agno.agent import Agent
from agno.models.openai import OpenAIChat
@tool(requires_confirmation=True)
def delete_database(db_name: str) -> str:
"""刪除數據庫的敏感操作"""
# 實際的刪除邏輯
returnf"已成功刪除數據庫: {db_name}"
@tool(requires_confirmation=True)
def transfer_funds(amount: float, to_account: str) -> str:
"""轉賬操作,需要用戶確認"""
# 轉賬邏輯
returnf"已轉賬 ${amount} 到賬戶 {to_account}"
# 創建需要確認的智能體
agent = Agent(
model=OpenAIChat(id="gpt-4o"),
tools=[delete_database, transfer_funds],
description="財務管理助手",
markdown=True
)
# 執行需要確認的操作
response = agent.run("刪除test_db數據庫")
# 處理確認流程
if agent.is_paused:
for tool in agent.run_response.tools_requiring_confirmation:
print(f"工具 {tool.tool_name} 需要確認")
print(f"參數: {tool.tool_args}")
# 獲取用戶確認
confirmed = input("確認執行?(y/n): ").lower() == "y"
tool.confirmed = confirmed
# 繼續執行
response = agent.continue_run()
print(response.content)2.1.2 用戶輸入流(User Input Flow)
當需要收集額外信息時,暫停執行并等待用戶輸入:
from typing import List
from agno.tools import tool
from agno.tools.function import UserInputField
@tool(requires_user_input=True, user_input_fields=["to_address", "cc_addresses"])
asyncdef send_email(subject: str, body: str, to_address: str, cc_addresses: str = None) -> str:
"""
發送郵件,需要用戶提供收件人地址
Args:
subject: 郵件主題
body: 郵件正文
to_address: 收件人地址(需要用戶輸入)
cc_addresses: 抄送地址(可選)
"""
cc_text = f",抄送給 {cc_addresses}"if cc_addresses else""
returnf"已發送郵件到 {to_address}{cc_text},主題:{subject}"
# 異步執行智能體
import asyncio
agent = Agent(
model=OpenAIChat(id="gpt-4o-mini"),
tools=[send_email],
markdown=True,
debug_mode=True
)
asyncdef handle_user_input():
await agent.arun("發送會議通知郵件,主題是'季度總結會議'")
if agent.is_paused:
for tool in agent.run_response.tools_requiring_user_input:
input_schema: List[UserInputField] = tool.user_input_schema
for field in input_schema:
print(f"\n需要輸入: {field.name}")
print(f"類型: {field.field_type.__name__}")
print(f"描述: {field.description}")
if field.value isNone:
user_value = input(f"請輸入 {field.name}: ")
field.value = user_value
# 提供用戶輸入后繼續執行
response = await agent.acontinue_run()
print(response.content)
# 運行異步函數
asyncio.run(handle_user_input())2.1.3 外部工具執行流(External Tool Execution)
標記某些工具在智能體上下文之外執行:
@tool(external_execution=True)
def execute_shell_command(command: str) -> str:
"""
執行shell命令(外部執行)
注意:這個函數不會在智能體內部執行
"""
pass# 實際執行邏輯在外部處理
agent = Agent(
model=OpenAIChat(id="gpt-4o"),
tools=[execute_shell_command],
description="系統管理助手"
)
response = agent.run("列出當前目錄的所有文件")
if agent.is_paused:
for tool in agent.run_response.tools_to_execute_externally:
print(f"需要外部執行: {tool.tool_name}")
print(f"參數: {tool.tool_args}")
# 在外部環境執行
import subprocess
if tool.tool_name == "execute_shell_command":
result = subprocess.run(
tool.tool_args['command'],
shell=True,
capture_output=True,
text=True
)
tool.result = result.stdout
# 將結果返回給智能體
response = agent.continue_run()2.1.4 動態用戶輸入(Dynamic User Input)
使用UserControlFlowTools讓智能體動態決定何時需要用戶輸入:
from agno.tools.user_control_flow import UserControlFlowTools
agent = Agent(
model=OpenAIChat(id="gpt-4o"),
tools=[UserControlFlowTools()],
description="交互式助手",
markdown=True
)
response = agent.run("幫我創建一個項目計劃")
while response.is_paused:
if response.tools_requiring_user_input:
for tool in response.tools_requiring_user_input:
# 智能體決定需要什么信息
print(f"智能體請求: {tool.description}")
user_input = input("您的輸入: ")
tool.value = user_input
response = agent.continue_run()2.2 工具級別的控制
Agno允許在工具包級別精細控制哪些工具需要確認:
from agno.tools.yfinance import YFinanceTools
from agno.tools.duckduckgo import DuckDuckGoTools
agent = Agent(
model=OpenAIChat(id="gpt-4o"),
tools=[
# 只有股票價格查詢需要確認
YFinanceTools(
requires_confirmation_tools=["get_current_stock_price"],
stock_price=True,
analyst_recommendations=True
),
# 網絡搜索不需要確認
DuckDuckGoTools()
],
description="金融分析助手"
)三、流式輸出架構實現
3.1 基礎流式響應
Agno支持流式輸出,提供更好的用戶體驗:
from typing import Iterator
from agno.agent import Agent, RunResponse
from agno.models.openai import OpenAIChat
agent = Agent(
model=OpenAIChat(id="gpt-4o"),
markdown=True,
show_tool_calls=True
)
# 方式1:使用print_response直接流式輸出
agent.print_response(
"分析NVDA股票的投資價值",
stream=True,
show_full_reasoning=True,
stream_intermediate_steps=True
)
# 方式2:獲取流式響應迭代器
run_response: Iterator[RunResponse] = agent.run(
"生成季度財務報告",
stream=True
)
for chunk in run_response:
if chunk.content:
print(chunk.content, end="", flush=True)3.2 與Web框架集成的流式輸出
3.2.1 FastAPI SSE(Server-Sent Events)實現
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from agno.agent import Agent
from agno.models.openai import OpenAIChat
import json
import asyncio
app = FastAPI()
# 創建全局智能體實例
agent = Agent(
model=OpenAIChat(id="gpt-4o"),
tools=[...],
markdown=True
)
asyncdef generate_sse_response(query: str):
"""生成SSE格式的流式響應"""
asyncfor chunk in agent.arun(query, stream=True):
if chunk.content:
# SSE格式:data: {json}\n\n
yieldf"data: {json.dumps({'content': chunk.content})}\n\n"
# 如果需要確認
if chunk.is_paused:
yieldf"data: {json.dumps({'type': 'confirmation_required', 'tools': [t.dict() for t in chunk.tools_requiring_confirmation]})}\n\n"
@app.post("/chat/stream")
asyncdef stream_chat(request: dict):
"""流式聊天接口"""
query = request.get("query")
return StreamingResponse(
generate_sse_response(query),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no"# 禁用Nginx緩沖
}
)
@app.post("/chat/confirm")
asyncdef confirm_tool(request: dict):
"""確認工具執行"""
tool_id = request.get("tool_id")
confirmed = request.get("confirmed", False)
# 處理確認邏輯
if agent.is_paused:
for tool in agent.run_response.tools_requiring_confirmation:
if tool.id == tool_id:
tool.confirmed = confirmed
# 繼續執行并返回流式響應
return StreamingResponse(
generate_sse_response(None), # 繼續之前的執行
media_type="text/event-stream"
)3.2.2 WebSocket實現雙向通信
from fastapi import WebSocket, WebSocketDisconnect
import json
@app.websocket("/ws/agent")
asyncdef websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
whileTrue:
# 接收客戶端消息
data = await websocket.receive_text()
message = json.loads(data)
if message["type"] == "query":
# 處理查詢請求
asyncfor chunk in agent.arun(message["content"], stream=True):
if chunk.content:
await websocket.send_json({
"type": "content",
"data": chunk.content
})
# 處理HITL
if chunk.is_paused:
await websocket.send_json({
"type": "confirmation_required",
"tools": [
{
"id": tool.id,
"name": tool.tool_name,
"args": tool.tool_args
}
for tool in chunk.tools_requiring_confirmation
]
})
elif message["type"] == "confirm":
# 處理確認消息
tool_id = message["tool_id"]
confirmed = message["confirmed"]
for tool in agent.run_response.tools_requiring_confirmation:
if tool.id == tool_id:
tool.confirmed = confirmed
# 繼續執行
asyncfor chunk in agent.acontinue_run(stream=True):
if chunk.content:
await websocket.send_json({
"type": "content",
"data": chunk.content
})
except WebSocketDisconnect:
print("Client disconnected")本文轉載自??AI 博物院?? 作者:longyunfeigu
?著作權歸作者所有,如需轉載,請注明出處,否則將追究法律責任
已于2025-11-18 08:44:00修改
贊
收藏
回復
分享
微博
QQ
微信
舉報
回復
相關推薦

















