Agent Task Queue
Use this skill when a task needs queue-based orchestration for multiple agents or workers. The bundled runtime is in src/ and covers:
- - Priority queue, delayed queue, and dead-letter queue behavior
- Scheduler polling with concurrency limits, retries, and timeouts
- Dependency validation, dependency gating, and result propagation
- Task logs and aggregate metrics
- Pluggable storage through in-memory, SQLite, or Redis backends
Workflow
- 1. Import
TaskQueue and Scheduler from src/index.ts. - Pick storage:
- Default
InMemoryStorage for local execution or tests
-
SQLiteStorage for single-node persistence
-
RedisStorage for distributed workers
- 3. Register handlers with
scheduler.register(taskType, handler). - Enqueue tasks with
priority, runAt, dependencies, retryPolicy, and timeoutMs as needed. - Drive execution via
scheduler.tick() or scheduler.start(). - Inspect
queue.logs(), queue.metrics(), queue.getSnapshot(), and queue.get(taskId) for state and traceability.
Key Files
- -
src/TaskQueue.ts: queue lifecycle, ready/dead-letter snapshots, logs, metrics - INLINECODE20 : polling loop, concurrency control, retries, timeout handling
- INLINECODE21 : DAG validation and dependency result propagation
- INLINECODE22 : storage implementations
- INLINECODE23 : behavior coverage
- INLINECODE24 : end-to-end usage
Implementation Notes
- - Dependency tasks must already exist when a dependent task is enqueued.
- A task becomes runnable only after all dependencies are
completed. - Completed dependency results are stored and exposed to downstream handlers through
context.dependencies. - When retries are exhausted, the task is moved to
dead_letter. - Timeout cancellation uses
AbortSignal; long-running handlers should watch context.signal.
Validation
Run:
CODEBLOCK0
If Redis or SQLite packages are unavailable in the environment, install dependencies first with npm install.
Agent Task Queue
当任务需要基于队列的多代理或多工作者编排时,使用此技能。打包的运行时位于 src/ 目录中,涵盖以下功能:
- - 优先级队列、延迟队列和死信队列行为
- 带有并发限制、重试和超时机制的调度器轮询
- 依赖验证、依赖门控和结果传播
- 任务日志和聚合指标
- 通过内存、SQLite 或 Redis 后端实现可插拔存储
工作流程
- 1. 从 src/index.ts 导入 TaskQueue 和 Scheduler。
- 选择存储方式:
- 本地执行或测试使用默认的 InMemoryStorage
- 单节点持久化使用 SQLiteStorage
- 分布式工作者使用 RedisStorage
- 3. 使用 scheduler.register(taskType, handler) 注册处理器。
- 根据需要设置 priority、runAt、dependencies、retryPolicy 和 timeoutMs 来入队任务。
- 通过 scheduler.tick() 或 scheduler.start() 驱动执行。
- 使用 queue.logs()、queue.metrics()、queue.getSnapshot() 和 queue.get(taskId) 检查状态和可追溯性。
关键文件
- - src/TaskQueue.ts:队列生命周期、就绪/死信快照、日志、指标
- src/Scheduler.ts:轮询循环、并发控制、重试、超时处理
- src/DependencyManager.ts:DAG 验证和依赖结果传播
- src/storage/:存储实现
- tests/task-queue.test.ts:行为覆盖测试
- examples/basic.ts:端到端使用示例
实现说明
- - 当依赖任务入队时,依赖任务必须已经存在。
- 任务只有在所有依赖项都 completed 后才能变为可运行状态。
- 已完成的依赖结果会被存储,并通过 context.dependencies 暴露给下游处理器。
- 当重试次数耗尽时,任务会被移动到 dead_letter。
- 超时取消使用 AbortSignal;长时间运行的处理器应监听 context.signal。
验证
运行:
bash
npm run check
如果环境中没有 Redis 或 SQLite 包,请先使用 npm install 安装依赖。