精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

深入Agno Workflows內核:從Step、Loop到Parallel,我們解析了確定性AI的完整實現 原創

發布于 2025-11-21 10:02
瀏覽
0收藏

Agno 的 工作流(Workflows) 讓你可以通過一系列定義好的步驟(steps) 來編排智能體(Agents)、團隊(Teams)以及函數(Functions),從而構建出 確定性(deterministic)、可控(controlled) 的智能系統。

與自由形式(free-form)的智能體交互不同,工作流提供結構化的自動化控制,保證每次執行的邏輯一致、結果可預測,因此非常適合需要可靠性與可重復性的生產環境。

為什么要使用 Workflows?

工作流讓你能夠對智能系統實現 可預測的控制(deterministic control),從而構建出可靠的自動化管線,每次執行都能得到一致結果。 在以下情況中,工作流尤為關鍵:

確定性執行(Deterministic Execution)

  • 每個步驟都有明確的輸入與輸出;
  • 每次運行都能得到一致結果;
  • 具有清晰的日志與審計記錄,適合生產環境。

復雜編排(Complex Orchestration)

  • 多智能體之間的協作與任務交接;
  • 支持并行處理與條件分支;
  • 支持循環結構(loops)來執行迭代任務。

簡而言之:

  • Workflows(工作流):用于“確定性、可重復的自動化”;
  • Teams(團隊):用于“動態、協作式的問題求解”。

場景

推薦方式

需要固定流程與可控輸出

使用 Workflows

需要靈活協作與智能推理

使用 Teams

工作流的確定性步驟執行(Deterministic Step Execution)

在工作流中,所有操作都按照嚴格定義的順序執行,每個步驟都會生成確定性輸出,作為下一個步驟的輸入。 這讓數據流變得可追蹤、可預測,也避免了自由對話中可能出現的隨機性。

Step 類型(Step Types)

類型

說明

Agents

具備特定能力和指令的單個智能體

Teams

多個智能體協同工作的團隊

Functions

自定義 Python 函數,用于執行特定邏輯或處理任務

確定性執行的優勢(Deterministic Benefits)

通過工作流機制,智能體與團隊仍然保留其獨特的智能與能力,但在一個受控的框架中運行:

  • 可預測執行:步驟按照定義順序運行;
  • 可重復結果:相同輸入總能得到相同輸出;
  • 數據流清晰:上一步輸出即為下一步輸入;
  • 狀態受控:步驟之間可保持會話狀態;
  • 可靠容錯:內置重試與錯誤恢復機制。

工作流 ≈ “智能體自動化的流水線版本”,在保持智能的同時,強調確定性與可控性。

與用戶的直接交互(Direct User Interaction)

如果用戶希望直接與工作流交互(而不是通過程序調用),你可以添加一個 ??WorkflowAgent??,讓工作流具備自然語言對話的能力。

這樣,工作流就能:

  • 像聊天機器人一樣進行對話;
  • 判斷是否能用已有結果回答;
  • 或者根據用戶的新問題自動重新執行工作流。

總結對比

特性

Workflows(工作流)

Teams(團隊)

執行方式

確定性、線性步驟

動態協作、自由分工

控制

嚴格定義的輸入輸出

由團隊領導動態調度

場景

自動化生產任務

復雜推理與多輪協作

典型用例

數據處理、報表生成、任務編排

問答系統、知識推理、內容生成

搭建工作流

Workflows 的作用

Workflow 是 Agno 的“編排層”,可以讓你像搭積木一樣組合多個智能體(Agent)、團隊(Team)或函數(Function)來形成一個完整的處理流程。

比如你可以:

  • 讓一個 Agent 先抓取數據;
  • 再讓另一個函數或 Agent 清洗數據;
  • 最后讓一個 Team 生成報告或發布結果。

Workflows 的核心構件

組件

作用

典型使用場景

??Workflow??

頂層 orchestrator(編排器),控制整個流程的執行

定義整體執行邏輯

??Step??

單個工作單元(核心執行節點)

每個 Step 可以是 Agent、Team 或 Python 函數

??Loop??

循環執行一個或多個 Step

重復運行直到條件滿足

??Parallel??

并行執行多個 Step

同時調用多個 Agent/Team 并合并結果

??Condition??

條件分支執行

根據條件決定是否執行某步

??Router??

動態路由執行

根據內容決定下一步走向(if/else 多分支邏輯)

Step 的輸入與輸出

當 Step 是函數時,Agno 提供了標準化接口:

  • ??StepInput??:每步的輸入結構體;
  • ??StepOutput???:輸出結果,包含??content?? 字段(可包含 Agent 的返回內容)。

這樣,不論 Step 是函數還是智能體,輸入輸出格式都統一了,方便后續編排和復用。

示例:混合執行工作流

from agno.workflow import Step, Workflow, StepOutput

def data_preprocessor(step_input):
    # 自定義數據預處理邏輯
    return StepOutput(content=f"Processed: {step_input.input}")

workflow = Workflow(
    name="Mixed Execution Pipeline",
    steps=[
        research_team,      # 團隊成員(Team)
        data_preprocessor,  # 自定義函數
        content_agent,      # Agent
    ]
)

workflow.print_response("Analyze the competitive landscape for fintech startups", markdown=True)

執行邏輯:

  1. 輸入“Analyze the competitive landscape for fintech startups”;
  2. ??research_team??(團隊)先執行研究;
  3. ??data_preprocessor?? 處理研究結果;
  4. ??content_agent?? 生成最終輸出;
  5. 最終在終端打印格式化的結果。

設計理念總結

Agno 的工作流設計遵循:

  • 清晰(clarity):每個 Step 只負責一件事;
  • 可組合(composability):Step 可以是 Agent、Team 或函數;
  • 可擴展(extensibility):你能輕松添加循環、并行或條件分支;
  • 數據流標準化(StepInput / StepOutput):簡化了復雜流程中的數據傳遞。

運行工作流

Workflow 執行的核心接口

Agno 提供三種運行方式:

函數

描述

返回類型

??workflow.run()??

同步運行工作流

??WorkflowRunOutput??

 對象

??workflow.arun()??

異步運行工作流

??WorkflowRunOutput??

 或異步迭代器

??workflow.print_response()??

封裝版打印輸出(內部調用 ??run()??)

直接打印 Markdown 輸出

Workflow 示例結構(標準流程)

from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.db.sqlite import SqliteDb
from agno.team import Team
from agno.tools.duckduckgo import DuckDuckGoTools
from agno.tools.hackernews import HackerNewsTools
from agno.workflow import Workflow
from agno.utils.pprint import pprint_run_response

# 1 定義智能體
hackernews_agent = Agent(
    name="Hackernews Agent",
    model=OpenAIChat(id="gpt-5-mini"),
    tools=[HackerNewsTools()],
    role="Extract key insights from Hackernews posts",
)

web_agent = Agent(
    name="Web Agent",
    model=OpenAIChat(id="gpt-5-mini"),
    tools=[DuckDuckGoTools()],
    role="Search the web for the latest trends",
)

# 2 定義團隊
research_team = Team(
    name="Research Team",
    members=[hackernews_agent, web_agent],
    instructions="Research tech topics from Hackernews and the web",
)

# 3 定義內容規劃 Agent
content_planner = Agent(
    name="Content Planner",
    model=OpenAIChat(id="gpt-5-mini"),
    instructions=[
        "Plan a 4-week content schedule for the given topic",
        "Ensure 3 posts per week",
    ],
)

# 4 定義工作流
content_creation_workflow = Workflow(
    name="Content Creation Workflow",
    description="Automated content creation from research to scheduling",
    db=SqliteDb(db_file="tmp/workflow.db"),
    steps=[research_team, content_planner],
)

# 5 執行工作流
if __name__ == "__main__":
    response = content_creation_workflow.run(
        input="AI trends in 2024",
        markdown=True,
    )
    pprint_run_response(response, markdown=True)

執行邏輯:

  1. ??research_team?? 調用 HackerNews 和 DuckDuckGo 搜索;
  2. 輸出結果交給??content_planner??;
  3. 生成 4 周的內容計劃。

異步執行(Async)

Agno 支持異步執行 ??arun()??,可以與 FastAPI、AsyncIO 集成:

response = await workflow.arun(input="Recent breakthroughs in quantum computing")

流式輸出(Streaming)

流式執行可以實時獲取每個事件(例如步驟開始、結束、Agent 輸出):

response = workflow.run(
    input="AI trends in 2024",
    stream=True,            # 打開流模式
    stream_events=True,     # 輸出所有事件類型
)

可迭代輸出:

for event in response:
    print(event.event, event.data)

事件系統(Events)

Agno 的事件機制提供了完整的生命周期追蹤。以下是關鍵事件類型表:

分類

事件類型

描述

核心事件

??WorkflowStarted??

, ??WorkflowCompleted???, ??WorkflowError??

表示工作流開始/結束/錯誤

步驟事件

??StepStarted??

, ??StepCompleted???, ??StepError??

每個 Step 的執行狀態

條件事件

??ConditionExecutionStarted??

, ??ConditionExecutionCompleted??

條件執行的開始和結束

并行事件

??ParallelExecutionStarted??

, ??ParallelExecutionCompleted??

并行執行的開始與結束

循環事件

??LoopExecutionStarted??

, ??LoopIterationStartedEvent???, ??LoopIterationCompletedEvent???, ??LoopExecutionCompleted??

循環過程中的生命周期

路由事件

??RouterExecutionStarted??

, ??RouterExecutionCompleted??

路由控制開始/結束

這些事件都封裝在 ??WorkflowRunOutputEvent?? 對象中。

事件存儲與分析

工作流可以將所有執行事件存儲到數據庫,用于:

  • 調試(Debugging)
  • 審計(Audit Trails)
  • 性能分析(Performance)
  • 錯誤溯源(Error tracing)

from agno.run.workflow import WorkflowRunEvent

workflow = Workflow(
    name="Debug Workflow",
    store_events=True,  # 啟用事件存儲
    events_to_skip=[
        WorkflowRunEvent.step_started,  # 可過濾無用事件
        WorkflowRunEvent.parallel_execution_started,
    ],
    steps=[...]
)

存儲結果可以從:

  • ??workflow.run_response.events?? 獲取;
  • 或直接在數據庫中查詢。

關閉遙測(Telemetry)

Agno 默認會記錄模型使用統計,可關閉:

export AGNO_TELEMETRY=false

或在代碼中:

workflow = Workflow(..., telemetry=False)

適用場景總結

目標

建議用法

簡單工作流快速測試

??workflow.print_response()??


異步應用(如 FastAPI)

??await workflow.arun()??

實時輸出進度

??stream=True, stream_events=True??

生產監控 / 調試

??store_events=True??

性能優化

跳過不必要事件 ??events_to_skip??

總結

Workflows(工作流)是構建確定性、可重復自動化系統的核心工具。它通過預定義的步驟序列,將智能體、團隊和函數組織成結構化的執行流程,確保每次運行邏輯一致、結果可靠。與動態協作的Teams不同,Workflows強調流程控制和輸出可預測性,非常適合生產環境中需要嚴格編排的任務。


本文轉載自???AI 博物院?? 作者:longyunfeigu

?著作權歸作者所有,如需轉載,請注明出處,否則將追究法律責任
收藏
回復
舉報
回復
相關推薦
欧美成人高清视频| 51精品久久久久久久蜜臀| 美女被啪啪一区二区| 欧美一级黄视频| 亚洲成人一区| 精品处破学生在线二十三| 日本中文字幕片| 欧美日韩在线资源| 成人午夜精品一区二区三区| 国产国产精品人在线视| 久草手机视频在线观看| 露出调教综合另类| 欧美高清www午色夜在线视频| 日韩精品视频在线观看视频| 中文字幕在线播放| 成人福利视频在线| 成人做爽爽免费视频| 天天操天天干视频| 亚洲女同另类| 亚洲午夜精品久久久久久性色| 亚洲免费成人在线视频| 国产高清不卡| 亚洲国产人成综合网站| 亚洲国产精品综合| 日韩在线免费播放| 国产成+人+日韩+欧美+亚洲| 国产精品视频在线观看| 国产又大又黑又粗免费视频| 亚洲国产不卡| 永久免费看mv网站入口亚洲| 人妻 日韩 欧美 综合 制服| 日韩成人免费av| 欧美性猛交xxxx免费看| 亚洲国产一二三精品无码| 亚洲成人三级| 久久日韩精品一区二区五区| 国产精品18毛片一区二区| 91精品人妻一区二区三区果冻| 国产日韩欧美一区二区三区在线观看| 欧美尺度大的性做爰视频| 中文字幕第24页| 一本久久青青| 亚洲久久久久久久久久| 国产探花一区二区三区| 亚洲免费看片| 欧美日韩电影在线| 密臀av一区二区三区| 国产乱码午夜在线视频| 亚洲国产精品影院| 国产人妻人伦精品| 91高清在线观看视频| 国产精品日韩成人| 少妇免费毛片久久久久久久久| 日本一级在线观看| 91视频观看视频| 激情视频一区二区| 熟妇高潮一区二区高潮| 成人av在线一区二区| 97久久夜色精品国产九色| 国产精品久久久久久免费免熟| 奇米四色…亚洲| 国产精品99久久久久久白浆小说| 天天爱天天做天天爽| 久久国产欧美| 国产精品久久久久久久久借妻| 黄色污污网站在线观看| 日韩中文字幕区一区有砖一区| 日韩av片永久免费网站| 日韩免费av网站| 日韩黄色片在线观看| 国产精品久久中文| 一级做a爰片久久毛片16| 寂寞少妇一区二区三区| av色综合网| 亚洲狼人综合网| aaa亚洲精品| 欧美二区在线看| 成年人视频网站在线| 国产精品水嫩水嫩| 成人手机视频在线| 污片视频在线免费观看| 午夜影院在线观看欧美| 国产精品亚洲二区在线观看| 成人一级视频| 日韩免费视频线观看| 国产情侣久久久久aⅴ免费| 女人抽搐喷水高潮国产精品| 国产亚洲一区二区在线| 欧美极品aaaaabbbbb| 国产模特精品视频久久久久| 国产免费一区视频观看免费 | 免费日本黄色网址| 日韩人体视频| 精品国产欧美一区二区五十路| 欧美成人免费观看视频| 免费日韩视频| 91青草视频久久| 婷婷国产在线| 日韩理论在线观看| 成年人网站国产| av成人免费看| 亚洲黄色av网站| 一本色道久久88| 影音先锋一区| 国产拍精品一二三| 香蕉视频国产在线| 亚洲色图制服诱惑| 日韩手机在线观看视频| 欧洲精品99毛片免费高清观看 | 欧美成人视屏| 亚洲成av人影院在线观看网| 午夜在线观看av| 北条麻妃在线一区二区免费播放| 在线观看成人黄色| 日本熟妇成熟毛茸茸| 久久 天天综合| 欧美日韩国产精品一区二区| 蜜桃成人365av| 欧美久久久久久久久久| 超碰97人人干| 亚洲国产精品第一区二区三区| 国产日韩欧美日韩大片| 欧美色视频免费| 午夜视频在线观看一区二区三区| 亚洲欧美手机在线| 成人影院在线| 国产99久久精品一区二区永久免费 | 欧美jjzz| 国产精品久久久久影院日本| 亚洲av成人精品毛片| 亚洲最新在线观看| 欧美激情第3页| 国产成人三级| 国产91精品久久久| 日韩在线视频免费| 亚洲精品国产无套在线观| 天堂在线一区二区三区| 欧美久久综合网| 国产99久久精品一区二区 夜夜躁日日躁| 亚洲欧美另类日韩| 亚洲综合清纯丝袜自拍| 国产九九九视频| 久久久人成影片免费观看| 国产精品一区av| av资源种子在线观看| 日本精品一区二区三区高清 | 日韩av一二三四区| 在线一区二区三区视频| 欧美日本啪啪无遮挡网站| 国产成人免费看一级大黄| 亚洲视频在线一区二区| 特黄特黄一级片| 亚洲欧美色图| 99re视频在线播放| 色网在线观看| 亚洲国产精品99| 日韩 欧美 精品| hitomi一区二区三区精品| 全黄性性激高免费视频| 久久97精品| 日本高清视频精品| 九色在线播放| 欧美日韩另类一区| 战狼4完整免费观看在线播放版| 免费成人美女在线观看| 亚洲自拍三区| 国产精品麻豆| 欧美黄色片视频| 色噜噜在线播放| 欧美午夜美女看片| 欧美精品日韩在线| 精品一区二区三区久久| 国产精品igao激情视频| 国产精品中文字幕制服诱惑| 欧美又大又硬又粗bbbbb| 国家队第一季免费高清在线观看| 欧美性猛片xxxx免费看久爱| 少妇aaaaa| av不卡免费电影| 不卡av免费在线| 911精品美国片911久久久| www.久久艹| 欧美成人h版| 久久人人爽亚洲精品天堂| www.成人免费视频| 精品久久久久久| 顶级黑人搡bbw搡bbbb搡| 国产福利一区在线| 日本三级免费观看| 久久久久国产| 欧美一区二区综合| 日韩精品一级毛片在线播放| 色综合久久中文字幕综合网小说| 五月婷婷激情在线| 欧美日韩国产123区| 久久午夜无码鲁丝片| 国产午夜精品一区二区三区视频| 亚洲第一色av| 久久久久国产精品一区三寸| 欧美美女黄色网| 精品亚洲成人| 国产精品手机视频| 97久久网站| 欧美影院在线播放| 最新日本在线观看| 亚洲视频视频在线| 亚洲精品国偷拍自产在线观看蜜桃| 色综合一个色综合亚洲| 国产一级做a爱免费视频| 欧美激情综合五月色丁香| www男人天堂| 久久成人免费日本黄色| 777精品久无码人妻蜜桃| 欧美一区高清| 亚洲欧美日韩精品久久久| 欧美偷窥清纯综合图区| 92看片淫黄大片看国产片| 日韩毛片免费观看| 欧美激情亚洲另类| 麻豆视频网站在线观看| 国产亚洲精品va在线观看| 欧日韩在线视频| 欧美一区二区三区视频免费播放| 久久久蜜桃一区二区| 香蕉乱码成人久久天堂爱免费| 五月综合色婷婷| 国产精品色婷婷| 国精产品一区二区三区| 成人午夜大片免费观看| 手机在线观看日韩av| 看片的网站亚洲| 高清一区在线观看| 日韩综合在线视频| av免费在线播放网站| 亚洲专区在线| 男人的天堂狠狠干| 狠狠干综合网| 国产成人亚洲综合无码| 国产精品久久观看| 自拍另类欧美| 99久久www免费| 一级做a爰片久久| 欧美电影三区| 一区二区三区四区| 欧美大人香蕉在线| 亚洲欧洲国产精品久久| 日本精品黄色| 图片区小说区区亚洲五月| 国产一区日韩| 亚洲欧美国产精品桃花| 97精品一区二区| 伊人情人网综合| 国产精品久久久乱弄| 亚洲一二三区在线| 欧美a级成人淫片免费看| 正在播放精油久久| 66视频精品| 国产精品va在线观看无码| 伊人精品成人久久综合软件| www.av91| 亚洲免费中文| 欧美午夜性生活| 精品一区二区精品| 美女被艹视频网站| 成人在线视频首页| 精品人妻一区二区三区视频| 国产亚洲自拍一区| 四虎成人免费影院| 成人免费小视频| 久久久久久久极品内射| 黄色成人av网| 久久久999久久久| 制服丝袜中文字幕亚洲| 亚洲精品一级片| 日韩av在线影院| 日本国产在线| 色噜噜狠狠色综合网图区| 99热国产在线| 91精品成人久久| 国产麻豆久久| 3d精品h动漫啪啪一区二区 | 欧美13videosex性极品| 国产激情久久久| 日本一区精品视频| 久久五月天婷婷| 色喇叭免费久久综合| 成人小视频在线观看免费| 国产日韩专区| 中文国产在线观看| youjizz国产精品| 欧美激情 一区| 亚洲妇熟xx妇色黄| 九九热最新视频| 日韩你懂的在线播放| 国产网站在线播放| 久热精品视频在线| 在线观看的黄色| 亚洲wwwav| 国产欧美久久一区二区三区| 欧美精品一区二区性色a+v| 国产精品一国产精品k频道56| 亚洲第一区第二区第三区| 91日韩精品一区| 亚洲一级生活片| 欧美影视一区二区三区| 好吊色一区二区三区| 色琪琪综合男人的天堂aⅴ视频| 白浆在线视频| 91色中文字幕| 欧美亚洲精品在线| aa在线观看视频| 国产精品一卡二卡在线观看| 亚洲成人黄色av| 欧美日韩国产激情| www日本视频| 日韩中文字幕免费视频| 欧美日韩美女| 精品国产免费人成电影在线观...| 99久久夜色精品国产亚洲1000部| 国产在线青青草| 国产1区2区3区精品美女| 国产色无码精品视频国产| 91黄色免费观看| 暖暖视频在线免费观看| 午夜精品一区二区三区av| 国产95亚洲| 亚洲精品中文字幕在线| 久久久久看片| 免费a级黄色片| 亚洲成人高清在线| 午夜精品久久久久久久96蜜桃 | 欧美久久高跟鞋激| 成年人在线免费观看| 欧美在线观看网址综合| 美女一区2区| 妞干网视频在线观看| 国产成人亚洲综合a∨猫咪 | 精品高清美女精品国产区| 精品人妻av一区二区三区| 久久精品亚洲热| 91精品麻豆| 中文字幕av导航| 加勒比av一区二区| 一级片一级片一级片| 欧美日韩在线免费视频| 在线免费观看黄| 国产日韩欧美在线| 视频在线不卡免费观看| 欧美精品久久久久久久久25p| 中文久久乱码一区二区| 中文字幕+乱码+中文字幕明步| 色婷婷av一区二区三区久久| 成人国产精品| 艳母动漫在线免费观看| 国产精品一区二区久久精品爱涩 | 一本到不卡精品视频在线观看| 天天综合天天色| 欧美亚洲国产视频小说| 欧美极品在线观看| 最近中文字幕一区二区| 中文字幕一区二区三| 96亚洲精品久久久蜜桃| 久久偷看各类女兵18女厕嘘嘘| gogo大尺度成人免费视频| 小泽玛利亚av在线| 国产凹凸在线观看一区二区| 精品无码人妻一区二区三区| 亚洲第一二三四五区| 三妻四妾完整版在线观看电视剧 | 欧美久色视频| 无码任你躁久久久久久老妇| 欧美午夜电影在线| 中文日本在线观看| 99精彩视频在线观看免费| 91久久在线| 嘿嘿视频在线观看| 91精品国产综合久久婷婷香蕉 | 欧美成人免费| 91精品国产自产在线| 激情综合自拍| 久久久久亚洲av成人无码电影| 欧美日韩午夜影院| 任你弄在线视频免费观看| 欧美精品欧美精品系列c| 麻豆精品精品国产自在97香蕉 | 午夜在线播放| 国产精品久久久久久久小唯西川| 视频一区免费在线观看| 玖玖爱这里只有精品| 亚洲精品456在线播放狼人| 精品欧美日韩精品| www国产无套内射com| 国产午夜精品福利| 国产福利资源在线| 日韩av电影免费观看高清| 亚洲国产精品日韩专区av有中文 | 天天做夜夜做人人爱精品| 亚洲娇小娇小娇小| 狠狠色狠狠色综合日日小说| 国产欧美黑人|