Data Pipeline Builder
Build ETL pipelines without code.
What This Does
- - Extract — Pull data from databases, APIs, files, cloud storage
- Transform — Clean, filter, aggregate, join, enrich
- Load — Push to databases, APIs, files, cloud storage
- Schedule — Run pipelines on cron schedules
- Monitor — Track pipeline health and performance
Quick Start
CODEBLOCK0
Supported Sources
| Source | Type | Auth |
|---|
| PostgreSQL | Database | Connection string |
| MySQL |
Database | Connection string |
| MongoDB | Database | Connection string |
| SQLite | Database | File path |
| Google Sheets | Cloud | OAuth / API Key |
| Airtable | Cloud | API Key |
| Notion | Cloud | API Key |
| REST API | API | Bearer / API Key |
| GraphQL | API | Bearer / API Key |
| CSV | File | File path |
| JSON | File | File path |
| S3 | Cloud | Access Key |
| GCS | Cloud | Service Account |
Supported Destinations
Same as sources, plus:
- - Webhooks
- Email
- Slack
- Discord
Commands
create
Create a new pipeline.
CODEBLOCK1
extract
Configure extraction step.
CODEBLOCK2
transform
Add transformation step.
CODEBLOCK3
load
Configure load step.
CODEBLOCK4
run
Execute pipeline.
CODEBLOCK5
schedule
Schedule pipeline.
CODEBLOCK6
status
Check pipeline status.
CODEBLOCK7
Example Pipelines
1. Sync PostgreSQL to Google Sheets
CODEBLOCK8
2. API to Database
CODEBLOCK9
3. CSV to Airtable
CODEBLOCK10
Pipeline Configuration
Pipelines are stored as JSON:
CODEBLOCK11
Monitoring
CODEBLOCK12
See Also
- -
references/connectors.md — Source/destination connectors - INLINECODE1 — Transformation functions
- INLINECODE2 — Main implementation
数据管道构建器
无需编码即可构建ETL管道。
功能说明
- - 提取 — 从数据库、API、文件、云存储中拉取数据
- 转换 — 清洗、过滤、聚合、连接、丰富数据
- 加载 — 推送至数据库、API、文件、云存储
- 调度 — 按cron计划运行管道
- 监控 — 追踪管道健康状态与性能
快速开始
bash
创建管道
neckr0ik-etl-builder create --name sync-users --source postgres --destination sheets
添加转换
neckr0ik-etl-builder transform --pipeline sync-users --type filter --field active --value true
运行管道
neckr0ik-etl-builder run --name sync-users
调度管道
neckr0ik-etl-builder schedule --name sync-users --cron 0
支持的数据源
| 数据源 | 类型 | 认证方式 |
|---|
| PostgreSQL | 数据库 | 连接字符串 |
| MySQL |
数据库 | 连接字符串 |
| MongoDB | 数据库 | 连接字符串 |
| SQLite | 数据库 | 文件路径 |
| Google Sheets | 云服务 | OAuth / API密钥 |
| Airtable | 云服务 | API密钥 |
| Notion | 云服务 | API密钥 |
| REST API | API | Bearer令牌 / API密钥 |
| GraphQL | API | Bearer令牌 / API密钥 |
| CSV | 文件 | 文件路径 |
| JSON | 文件 | 文件路径 |
| S3 | 云服务 | 访问密钥 |
| GCS | 云服务 | 服务账号 |
支持的目标端
与数据源相同,另加:
- - Webhooks
- 电子邮件
- Slack
- Discord
命令
create
创建新管道。
bash
neckr0ik-etl-builder create --name <名称> [选项]
选项:
--source <类型> 数据源类型(postgres, mysql, api, csv...)
--destination <类型> 目标端类型
--config <文件> 配置文件
extract
配置提取步骤。
bash
neckr0ik-etl-builder extract --pipeline <名称> [选项]
选项:
--table <名称> 要提取的表(适用于数据库)
--query 自定义查询
--endpoint API端点
--file <路径> 文件路径
transform
添加转换步骤。
bash
neckr0ik-etl-builder transform --pipeline <名称> [选项]
转换类型:
filter 按条件过滤行
map 映射字段值
aggregate 聚合数据(求和、计数、平均值...)
join 与另一个数据源连接
enrich 用外部数据丰富
clean 清洗空值、修剪字符串
validate 验证数据质量
load
配置加载步骤。
bash
neckr0ik-etl-builder load --pipeline <名称> [选项]
选项:
--mode <模式> 加载模式(追加、替换、更新插入)
--table <名称> 目标表
--mapping <文件> 字段映射
run
执行管道。
bash
neckr0ik-etl-builder run --name <名称> [选项]
选项:
--dry-run 预览而不执行
--limit <数量> 仅处理N条记录
--parallel 并行运行阶段
schedule
调度管道。
bash
neckr0ik-etl-builder schedule --name <名称> --cron <表达式>
status
检查管道状态。
bash
neckr0ik-etl-builder status --name <名称>
示例管道
1. 同步PostgreSQL到Google Sheets
bash
创建管道
neckr0ik-etl-builder create --name user-sync --source postgres --destination sheets
配置提取
neckr0ik-etl-builder extract --pipeline user-sync \
--query SELECT * FROM users WHERE updated_at > NOW() - INTERVAL 1 day
添加转换
neckr0ik-etl-builder transform --pipeline user-sync --type clean
neckr0ik-etl-builder transform --pipeline user-sync --type filter --field active --value true
按小时调度
neckr0ik-etl-builder schedule --name user-sync --cron 0
2. API到数据库
bash
创建管道
neckr0ik-etl-builder create --name api-sync --source api --destination postgres
配置提取
neckr0ik-etl-builder extract --pipeline api-sync \
--endpoint https://api.example.com/users \
--auth bearer \
--token $API_TOKEN
转换
neckr0ik-etl-builder transform --pipeline api-sync --type map --field id --to user_id
neckr0ik-etl-builder transform --pipeline api-sync --type clean
加载
neckr0ik-etl-builder load --pipeline api-sync --table api_users --mode upsert
3. CSV到Airtable
bash
创建管道
neckr0ik-etl-builder create --name csv-import --source csv --destination airtable
配置
neckr0ik-etl-builder extract --pipeline csv-import --file ./data.csv
neckr0ik-etl-builder transform --pipeline csv-import --type clean
neckr0ik-etl-builder load --pipeline csv-import --table Imports --mapping ./mapping.json
管道配置
管道以JSON格式存储:
json
{
name: user-sync,
source: {
type: postgres,
connection: postgresql://...,
query: SELECT * FROM users
},
transformations: [
{type: filter, field: active, value: true},
{type: clean},
{type: map, from: id, to: user_id}
],
destination: {
type: google_sheets,
spreadsheet_id: ...,
range: Sheet1!A1
},
schedule: 0
}
监控
bash
查看管道历史
neckr0ik-etl-builder history --name user-sync --limit 10
查看失败运行
neckr0ik-etl-builder failures --name user-sync
导出日志
neckr0ik-etl-builder logs --name user-sync --output ./logs.json
参见
- - references/connectors.md — 数据源/目标端连接器
- references/transforms.md — 转换函数
- scripts/pipeline.py — 主要实现