深入Agno Workflows內核:從Step、Loop到Parallel,我們解析了確定性AI的完整實現 原創
Agno 的 工作流(Workflows) 讓你可以通過一系列定義好的步驟(steps) 來編排智能體(Agents)、團隊(Teams)以及函數(Functions),從而構建出 確定性(deterministic)、可控(controlled) 的智能系統。
與自由形式(free-form)的智能體交互不同,工作流提供結構化的自動化控制,保證每次執行的邏輯一致、結果可預測,因此非常適合需要可靠性與可重復性的生產環境。
為什么要使用 Workflows?
工作流讓你能夠對智能系統實現 可預測的控制(deterministic control),從而構建出可靠的自動化管線,每次執行都能得到一致結果。 在以下情況中,工作流尤為關鍵:
確定性執行(Deterministic Execution)
- 每個步驟都有明確的輸入與輸出;
- 每次運行都能得到一致結果;
- 具有清晰的日志與審計記錄,適合生產環境。
復雜編排(Complex Orchestration)
- 多智能體之間的協作與任務交接;
- 支持并行處理與條件分支;
- 支持循環結構(loops)來執行迭代任務。
簡而言之:
- Workflows(工作流):用于“確定性、可重復的自動化”;
- Teams(團隊):用于“動態、協作式的問題求解”。
場景 | 推薦方式 |
需要固定流程與可控輸出 | 使用 Workflows |
需要靈活協作與智能推理 | 使用 Teams |
工作流的確定性步驟執行(Deterministic Step Execution)
在工作流中,所有操作都按照嚴格定義的順序執行,每個步驟都會生成確定性輸出,作為下一個步驟的輸入。 這讓數據流變得可追蹤、可預測,也避免了自由對話中可能出現的隨機性。
Step 類型(Step Types)
類型 | 說明 |
Agents | 具備特定能力和指令的單個智能體 |
Teams | 多個智能體協同工作的團隊 |
Functions | 自定義 Python 函數,用于執行特定邏輯或處理任務 |
確定性執行的優勢(Deterministic Benefits)
通過工作流機制,智能體與團隊仍然保留其獨特的智能與能力,但在一個受控的框架中運行:
- 可預測執行:步驟按照定義順序運行;
- 可重復結果:相同輸入總能得到相同輸出;
- 數據流清晰:上一步輸出即為下一步輸入;
- 狀態受控:步驟之間可保持會話狀態;
- 可靠容錯:內置重試與錯誤恢復機制。
工作流 ≈ “智能體自動化的流水線版本”,在保持智能的同時,強調確定性與可控性。
與用戶的直接交互(Direct User Interaction)
如果用戶希望直接與工作流交互(而不是通過程序調用),你可以添加一個 ??WorkflowAgent??,讓工作流具備自然語言對話的能力。
這樣,工作流就能:
- 像聊天機器人一樣進行對話;
- 判斷是否能用已有結果回答;
- 或者根據用戶的新問題自動重新執行工作流。
總結對比
特性 | Workflows(工作流) | Teams(團隊) |
執行方式 | 確定性、線性步驟 | 動態協作、自由分工 |
控制 | 嚴格定義的輸入輸出 | 由團隊領導動態調度 |
場景 | 自動化生產任務 | 復雜推理與多輪協作 |
典型用例 | 數據處理、報表生成、任務編排 | 問答系統、知識推理、內容生成 |
搭建工作流
Workflows 的作用
Workflow 是 Agno 的“編排層”,可以讓你像搭積木一樣組合多個智能體(Agent)、團隊(Team)或函數(Function)來形成一個完整的處理流程。
比如你可以:
- 讓一個 Agent 先抓取數據;
- 再讓另一個函數或 Agent 清洗數據;
- 最后讓一個 Team 生成報告或發布結果。
Workflows 的核心構件
組件 | 作用 | 典型使用場景 |
? | 頂層 orchestrator(編排器),控制整個流程的執行 | 定義整體執行邏輯 |
? | 單個工作單元(核心執行節點) | 每個 Step 可以是 Agent、Team 或 Python 函數 |
? | 循環執行一個或多個 Step | 重復運行直到條件滿足 |
? | 并行執行多個 Step | 同時調用多個 Agent/Team 并合并結果 |
? | 條件分支執行 | 根據條件決定是否執行某步 |
? | 動態路由執行 | 根據內容決定下一步走向(if/else 多分支邏輯) |
Step 的輸入與輸出
當 Step 是函數時,Agno 提供了標準化接口:
- ?
?StepInput??:每步的輸入結構體; - ?
?StepOutput???:輸出結果,包含??content?? 字段(可包含 Agent 的返回內容)。
這樣,不論 Step 是函數還是智能體,輸入輸出格式都統一了,方便后續編排和復用。
示例:混合執行工作流
from agno.workflow import Step, Workflow, StepOutput
def data_preprocessor(step_input):
# 自定義數據預處理邏輯
return StepOutput(content=f"Processed: {step_input.input}")
workflow = Workflow(
name="Mixed Execution Pipeline",
steps=[
research_team, # 團隊成員(Team)
data_preprocessor, # 自定義函數
content_agent, # Agent
]
)
workflow.print_response("Analyze the competitive landscape for fintech startups", markdown=True)執行邏輯:
- 輸入“Analyze the competitive landscape for fintech startups”;
- ?
?research_team??(團隊)先執行研究; - ?
?data_preprocessor?? 處理研究結果; - ?
?content_agent?? 生成最終輸出; - 最終在終端打印格式化的結果。
設計理念總結
Agno 的工作流設計遵循:
- 清晰(clarity):每個 Step 只負責一件事;
- 可組合(composability):Step 可以是 Agent、Team 或函數;
- 可擴展(extensibility):你能輕松添加循環、并行或條件分支;
- 數據流標準化(StepInput / StepOutput):簡化了復雜流程中的數據傳遞。
運行工作流
Workflow 執行的核心接口
Agno 提供三種運行方式:
函數 | 描述 | 返回類型 |
? | 同步運行工作流 | ? 對象 |
? | 異步運行工作流 | ? 或異步迭代器 |
? | 封裝版打印輸出(內部調用 ? | 直接打印 Markdown 輸出 |
Workflow 示例結構(標準流程)
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.db.sqlite import SqliteDb
from agno.team import Team
from agno.tools.duckduckgo import DuckDuckGoTools
from agno.tools.hackernews import HackerNewsTools
from agno.workflow import Workflow
from agno.utils.pprint import pprint_run_response
# 1 定義智能體
hackernews_agent = Agent(
name="Hackernews Agent",
model=OpenAIChat(id="gpt-5-mini"),
tools=[HackerNewsTools()],
role="Extract key insights from Hackernews posts",
)
web_agent = Agent(
name="Web Agent",
model=OpenAIChat(id="gpt-5-mini"),
tools=[DuckDuckGoTools()],
role="Search the web for the latest trends",
)
# 2 定義團隊
research_team = Team(
name="Research Team",
members=[hackernews_agent, web_agent],
instructions="Research tech topics from Hackernews and the web",
)
# 3 定義內容規劃 Agent
content_planner = Agent(
name="Content Planner",
model=OpenAIChat(id="gpt-5-mini"),
instructions=[
"Plan a 4-week content schedule for the given topic",
"Ensure 3 posts per week",
],
)
# 4 定義工作流
content_creation_workflow = Workflow(
name="Content Creation Workflow",
description="Automated content creation from research to scheduling",
db=SqliteDb(db_file="tmp/workflow.db"),
steps=[research_team, content_planner],
)
# 5 執行工作流
if __name__ == "__main__":
response = content_creation_workflow.run(
input="AI trends in 2024",
markdown=True,
)
pprint_run_response(response, markdown=True)執行邏輯:
- ?
?research_team?? 調用 HackerNews 和 DuckDuckGo 搜索; - 輸出結果交給?
?content_planner??; - 生成 4 周的內容計劃。
異步執行(Async)
Agno 支持異步執行 ??arun()??,可以與 FastAPI、AsyncIO 集成:
response = await workflow.arun(input="Recent breakthroughs in quantum computing")流式輸出(Streaming)
流式執行可以實時獲取每個事件(例如步驟開始、結束、Agent 輸出):
response = workflow.run(
input="AI trends in 2024",
stream=True, # 打開流模式
stream_events=True, # 輸出所有事件類型
)可迭代輸出:
for event in response:
print(event.event, event.data)事件系統(Events)
Agno 的事件機制提供了完整的生命周期追蹤。以下是關鍵事件類型表:
分類 | 事件類型 | 描述 |
核心事件 | ? , ? | 表示工作流開始/結束/錯誤 |
步驟事件 | ? , ? | 每個 Step 的執行狀態 |
條件事件 | ? , ? | 條件執行的開始和結束 |
并行事件 | ? , ? | 并行執行的開始與結束 |
循環事件 | ? , ? | 循環過程中的生命周期 |
路由事件 | ? , ? | 路由控制開始/結束 |
這些事件都封裝在 ??WorkflowRunOutputEvent?? 對象中。
事件存儲與分析
工作流可以將所有執行事件存儲到數據庫,用于:
- 調試(Debugging)
- 審計(Audit Trails)
- 性能分析(Performance)
- 錯誤溯源(Error tracing)
from agno.run.workflow import WorkflowRunEvent
workflow = Workflow(
name="Debug Workflow",
store_events=True, # 啟用事件存儲
events_to_skip=[
WorkflowRunEvent.step_started, # 可過濾無用事件
WorkflowRunEvent.parallel_execution_started,
],
steps=[...]
)存儲結果可以從:
- ?
?workflow.run_response.events?? 獲取; - 或直接在數據庫中查詢。
關閉遙測(Telemetry)
Agno 默認會記錄模型使用統計,可關閉:
export AGNO_TELEMETRY=false或在代碼中:
workflow = Workflow(..., telemetry=False)適用場景總結
目標 | 建議用法 |
簡單工作流快速測試 | ? |
異步應用(如 FastAPI) | ? |
實時輸出進度 | ? |
生產監控 / 調試 | ? |
性能優化 | 跳過不必要事件 ? |
總結
Workflows(工作流)是構建確定性、可重復自動化系統的核心工具。它通過預定義的步驟序列,將智能體、團隊和函數組織成結構化的執行流程,確保每次運行邏輯一致、結果可靠。與動態協作的Teams不同,Workflows強調流程控制和輸出可預測性,非常適合生產環境中需要嚴格編排的任務。
本文轉載自???AI 博物院?? 作者:longyunfeigu

















