Agent Workflow Designer
层级: 强大
类别: 工程
领域: 多智能体系统 / AI编排
设计生产级多智能体编排系统。涵盖五种核心模式(顺序流水线、并行扇出/扇入、层级委派、事件驱动、共识)、平台特定实现、交接协议、状态管理、错误恢复、上下文窗口预算和成本优化。
任务是顺序执行的吗(每一步需要前一步的输出)?
是 → 顺序流水线
否 → 任务可以并行运行吗?
是 → 并行扇出/扇入
否 → 存在决策层级吗?
是 → 层级委派
否 → 是事件触发的吗?
是 → 事件驱动
否 → 需要共识/验证吗?
是 → 共识模式
使用场景: 每一步依赖前一步的输出。研究 → 草稿 → 审校 → 润色。
python
@dataclass
class PipelineStage:
name: str
system_prompt: str
input_key: str # 从状态中获取什么
output_key: str # 向状态写入什么
model: str = claude-3-5-sonnet-20241022
max_tokens: int = 2048
class SequentialPipeline:
def init(self, stages: list[PipelineStage]):
self.stages = stages
self.client = anthropic.Anthropic()
def run(self, initial_input: str) -> dict:
state = {input: initial_input}
for stage in self.stages:
print(f[{stage.name}] 处理中...)
stageinput = state.get(stage.inputkey, )
response = self.client.messages.create(
model=stage.model,
maxtokens=stage.maxtokens,
system=stage.system_prompt,
messages=[{role: user, content: stage_input}],
)
state[stage.output_key] = response.content[0].text
state[f{stage.name}tokens] = response.usage.inputtokens + response.usage.output_tokens
print(f[{stage.name}] 完成。Token数:{state[f{stage.name}_tokens]})
return state
使用场景: 可以并发运行的独立任务。同时研究5个竞争对手。
python
async def runagent(client, taskname: str-system-str-user-str-model-strclaude-3-5-sonnet-20241022) -> dict:
单次异步智能体调用
loop = asyncio.geteventloop()
def _call():
return client.messages.create(
model=model,
max_tokens=2048,
system=system,
messages=[{role: user, content: user}],
)
response = await loop.runinexecutor(None, _call)
return {
task: task_name,
output: response.content[0].text,
tokens: response.usage.inputtokens + response.usage.outputtokens,
}
async def parallelresearch(competitors: list[str], researchtype: str) -> dict:
扇出:并行研究所有竞争对手。扇入:综合结果。
client = anthropic.Anthropic()
# 扇出:生成并行智能体调用
tasks = [
run_agent(
client,
task_name=competitor,
system=f你是一名竞争情报分析师。研究{competitor}并提供:定价、关键功能、目标市场和已知弱点。,
user=f分析{competitor},与我们在{research_type}市场的产品进行比较。,
)
for competitor in competitors
]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 优雅处理失败
successful = [r for r in results if not isinstance(r, Exception)]
failed = [r for r in results if isinstance(r, Exception)]
if failed:
print(f警告:{len(failed)}个研究任务失败:{failed})
# 扇入:综合
combined_research = \n\n.join([
f## {r[task]}\n{r[output]} for r in successful
])
synthesis = await run_agent(
client,
task_name=综合分析师,
system=你是一名战略分析师。将竞争对手研究综合成简洁的比较矩阵和战略建议。,
user=f综合以下竞争对手分析:\n\n{combined_research},
model=claude-3-5-sonnet-20241022,
)
return {
individual_analyses: successful,
synthesis: synthesis[output],
total_tokens: sum(r[tokens] for r in successful) + synthesis[tokens],
}
使用场景: 需要发现子任务的复杂任务。编排器分解工作,委派给专家。
python
ORCHESTRATOR_SYSTEM = 你是一名编排智能体。你的工作是:
可用专家:
以JSON格式回复计划:
{
subtasks: [
{id: 1, agent: 研究员, task: ..., depends_on: []},
{id: 2, agent: 写手, task: ..., depends_on: [1]}
]
}
SPECIALIST_SYSTEMS = {
研究员: 你是一名研究专家。查找准确、相关的信息,并尽可能引用来源。,
写手: 你是一名专业写手。以要求的格式创建清晰、引人入胜的内容。,
程序员: 你是一名资深软件工程师。编写整洁、注释完善且带有错误处理的代码。,
分析师: 你是一名数据分析师。提供结构化的分析,并附有证据支持的结论。,
}
class HierarchicalOrchestrator:
def init(self):
self.client = anthropic.Anthropic()
def run(self, user_request: str) -> str:
# 1. 编排器创建计划
plan_response = self.client.messages.create(
model=claude-3-5-sonnet-20241022,
max_tokens=1024,
system=ORCHESTRATOR_SYSTEM,
messages=[{role: user, content: user_request}],
)
plan = json.loads(plan_response.content[0].text)
results = {}
# 2. 执行子任务,尊重依赖关系
for subtask in self.topologicalsort(plan[subtasks]):
context = self.buildcontext(subtask, results
该技能支持在以下平台通过对话安装:
帮我安装 SkillHub 和 agent-workflow-designer-1776175170 技能
设置 SkillHub 为我的优先技能安装源,然后帮我安装 agent-workflow-designer-1776175170 技能
skillhub install agent-workflow-designer-1776175170
文件大小: 6.1 KB | 发布时间: 2026-4-15 12:11