Prefect Flow Builder
Overview
Use this skill to add or refactor Prefect-managed offline workflows in this repository without mixing orchestration concerns into business logic.
Workflow
- 1. Classify the change.
- New flow or major refactor: read references/flow-design.md and references/template-prefect-yaml.md.
- Deployment-only or config change: read references/deployment-patterns.md.
- Resource or concurrency tuning: read references/resources-and-concurrency.md.
- If the system model is unclear, read references/architecture.md first.
- 2. Keep orchestration separate from compute.
- Put heavy business logic in reusable job or service modules under src/core/...
- Keep Prefect wrappers in src/prefect_flows/...
- Use tasks for independently observable units or meaningful side effects; avoid exploding one logical step into many tiny tasks just for structure.
- Choose task invocation mode deliberately: direct call for simple serial execution, .submit() or .map() for in-flow concurrency, and .delay() only for background execution on separate infrastructure.
- If a child flow is intentionally serial, call tasks directly inside the loop and let the child flow own error aggregation and final failure semantics. Do not default to .submit() just because the unit is a task.
- Reserve .submit() for cases that actually need Prefect futures, parallel fan-out, or non-blocking wait and collection behavior, and make sure terminal futures are resolved before the flow returns.
- Introduce child flows only when you need a separate scaling, resource, or failure boundary.
- 3. Put concurrency controls at the right layer.
- Use task-runner concurrency for in-flow task execution only; it is not a substitute for deployment or infrastructure throttling.
- Use deployment, work-pool, worker, or work-queue limits to control how many flow runs the platform launches.
- Use tag-based concurrency limits when many tasks across flows share an external bottleneck such as a database, API, or memory-heavy resource.
- 4. Treat prefect.yaml as the deployment source of truth.
- Put deployment names, schedules, work pool selection, resources, concurrency policies, and deployment parameter defaults in prefect.yaml.
- Let CI provide only deploy-time values such as PREFECT_API_URL and PREFECT_DEPLOY_IMAGE.
- Keep runtime business environment variables in Kubernetes via env_from, not in CI.
- Keep infrastructure choices aligned with Prefect's worker model: deployments target work pools, and workers poll compatible pools to execute runs.
- 5. Make failures observable.
- Raise exceptions instead of silently returning non-zero codes.
- Include key context in exception messages and logs.
- Use readable task and flow names so the Prefect UI can identify the failing unit quickly.
- When a batch must continue after per-item failures, convert each item into a structured result, finish the batch, then raise once at the flow boundary if the aggregate outcome should be failed.
- 6. Validate in this order.
- Run focused tests for changed flow and job code.
- Review prefect.yaml for deployment or resource drift.
- Trigger a small manual run before enabling or changing schedules.
- If the question is about Prefect semantics rather than repo conventions, verify against the official v3 docs before codifying a pattern.
Repository Anchors
- Flow entrypoints: src/prefect_flows/
- Reusable compute logic: src/core/
- Deployment definitions: prefect.yaml
- Team guide: docs/cgofflineprediction/prefectorchestration_overview.md
Prefect v3 Defaults
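- Tasks support three distinct execution modes in v3: a direct call blocks and returns a result, .submit() returns a PrefectFuture for concurrent execution within the same flow run, and .delay() enqueues a background run that is executed by a separately running task worker (for example, one started with the task's .serve() method).
- Configuring a task runner is optional; flows default to a thread-pool task runner. If you are not intentionally using concurrency, do not introduce .submit() or a task runner configuration just because a function is decorated as a task.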
.submit() or a task runner configuration just to preserve the task decorator. - Task states are orchestrated client-side and may appear with eventual consistency in the UI, so design recovery and resume logic around durable business outcomes instead of assuming every intermediate task-state transition is the source of truth.
References
- references/architecture.md for the system model and lifecycle.
- references/deployment-patterns.md for deployment design and config ownership.
- references/flow-design.md for wrapping existing jobs as flows and tasks.
- references/resources-and-concurrency.md for sizing and concurrency decisions.
- references/template-prefect-yaml.md for reusable deployment templates.
- Prefect v3 Tasks: https://docs.prefect.io/v3/concepts/tasks
- Prefect v3 Task runners: https://docs.prefect.io/v3/concepts/task-runners/
- Prefect v3 Flows: https://docs.prefect.io/v3/concepts/flows
- Prefect v3 Deployments: https://docs.prefect.io/v3/concepts/deployments
- Prefect v3 Workers: https://docs.prefect.io/v3/concepts/workers
- Prefect v3 Tag-based concurrency limits: https://docs.prefect.io/v3/concepts/tag-based-concurrency-limits