返回顶部
d

data-pipeline-agent数据管道代理

>

作者: admin | 来源: ClawHub
源自
ClawHub
版本
V 98.0.1
安全检测
已通过
219
下载量
免费
免费
0
收藏
概述
安装方式
版本历史

data-pipeline-agent

数据管道代理

构建、运行和监控业务数据的ETL(提取→转换→加载)管道。专注于财务数据流、API集成以及面向会计和运营团队的数据仓库加载模式。

使用场景

  • - 从API(QBO、Stripe、Salesforce、银行数据源等)提取数据
  • 清理和规范化杂乱的电子表格或CSV导出文件
  • 将多个来源的数据合并为一个规范数据集
  • 将转换后的数据加载到数据库、数据仓库或Google Sheets
  • 调度定期数据同步(每日总账提取、每周应收账款账龄刷新等)
  • 审计数据质量——检测空值、重复项、类型不匹配

不适用场景

  • - 实时流处理——使用Kafka、Kinesis或Pub/Sub处理亚秒级延迟
  • 交互式仪表盘——此代理输出数据;可视化属于BI工具范畴
  • 原始SQL查询优化——使用DBA工具处理查询计划和索引
  • 一次性手动导出——如果只发生一次,直接下载CSV即可
  • 对客户端系统的写入操作——除非获得Irfan批准,否则仅进行只读提取

管道模式

模式1:API提取→清理→CSV

python

从REST API提取,清理,输出CSV


import requests, pandas as pd, json
from datetime import datetime, timedelta

def extract(api_url, headers, params=None):
从任意REST端点拉取分页JSON数据。
results = []
while api_url:
r = requests.get(api_url, headers=headers, params=params)
r.raiseforstatus()
data = r.json()
results.extend(data.get(data, data if isinstance(data, list) else [data]))
apiurl = data.get(nextpage_url) # 分页
params = None # 仅在首次调用时传递参数
return results

def clean(records, renamemap=None, dropnullson=None, datecols=None):
规范化、重命名、解析日期、删除空值。
df = pd.DataFrame(records)
if rename_map:
df = df.rename(columns=rename_map)
if date_cols:
for col in date_cols:
df[col] = pd.to_datetime(df[col], errors=coerce)
if dropnullson:
df = df.dropna(subset=dropnullson)
df = df.drop_duplicates()
return df

def loadcsv(df, outputpath):
df.tocsv(outputpath, index=False)
print(f✅ 已保存 {len(df)} 行 → {output_path})

示例:QBO发票提取

HEADERS = {Authorization: Bearer , Accept: application/json} records = extract(https://quickbooks.api.intuit.com/v3/company//query?query=SELECT * FROM Invoice, HEADERS) df = clean(records, renamemap={TxnDate: invoicedate, TotalAmt: amount}, datecols=[invoicedate]) loadcsv(df, fdata/invoices{datetime.today().date()}.csv)

模式2:多源合并

python
import pandas as pd

def mergeglwithbank(glpath, bankpath, matchon=amount, datetolerancedays=3):

将总账条目与银行交易匹配。
标记未匹配行供人工审核。

gl = pd.readcsv(glpath, parse_dates=[date])
bank = pd.readcsv(bankpath, parse_dates=[date])

# 按金额+日期近似匹配
merged = pd.merge_asof(
gl.sort_values(date),
bank.sort_values(date),
on=date,
by=match_on,
tolerance=pd.Timedelta(days=datetolerancedays),
direction=nearest,
suffixes=(gl, bank)
)

unmatchedgl = gl[~gl.index.isin(merged.dropna(subset=[datebank]).index)]
unmatchedbank = bank[~bank.index.isin(merged.dropna(subset=[dategl]).index)]

print(f✅ 已匹配:{len(merged.dropna())} | ⚠️ 未匹配总账:{len(unmatchedgl)} | 银行:{len(unmatchedbank)})
return merged, unmatchedgl, unmatchedbank

模式3:数据质量审计

python
import pandas as pd

def auditdataset(df, requiredcols=None, expected_types=None):

运行数据质量检查。返回报告字典。

report = {
row_count: len(df),
duplicate_rows: int(df.duplicated().sum()),
nullsummary: df.isnull().sum().todict(),
issues: []
}

if required_cols:
missing = [c for c in required_cols if c not in df.columns]
if missing:
report[issues].append(f缺少必需列:{missing})

if expected_types:
for col, dtype in expected_types.items():
if col in df.columns and not pd.api.types.isdtypeequal(df[col].dtype, dtype):
report[issues].append(f{col}:期望 {dtype},实际 {df[col].dtype})

# 标记空值超过20%的列
for col, nulls in report[null_summary].items():
pct = nulls / len(df) * 100
if pct > 20:
report[issues].append(f{col}:{pct:.1f}% 为空——需审核)

return report

使用示例

df = pd.readcsv(data/araging.csv) report = audit_dataset( df, requiredcols=[customerid, invoicedate, amount, duedate], expectedtypes={amount: float64, customerid: object} ) print(report)

模式4:定时Cron管道

bash
#!/bin/bash

daily-gl-sync.sh — 通过cron或OpenClaw cron工具运行


提取总账,清理,加载到SQLite,出错时通知

set -euo pipefail
LOG=logs/gl-sync-$(date +%Y-%m-%d).log
mkdir -p logs data

echo [$(date)] 开始总账同步... | tee -a $LOG

python3 pipelines/gl_extract.py >> $LOG 2>&1 && \
python3 pipelines/gl_clean.py >> $LOG 2>&1 && \
python3 pipelines/gl_load.py >> $LOG 2>&1 && \
echo [$(date)] ✅ 总账同步完成 | tee -a $LOG || \
echo [$(date)] ❌ 总账同步失败——请检查 $LOG | tee -a $LOG

模式5:加载到SQLite/PostgreSQL

python
import pandas as pd
import sqlite3

def loadtosqlite(df, dbpath, tablename, if_exists=replace):

将DataFrame加载到SQLite。增量加载使用if_exists=append。

conn = sqlite3.connect(db_path)
df.tosql(tablename, conn, ifexists=ifexists, index=False)
conn.close()
print(f✅ 已加载 {len(df)} 行 → {dbpath}::{tablename})

PostgreSQL版本(需要psycopg2 + sqlalchemy)

from sqlalchemy import create_engine

def loadtopostgres(df, connstr, tablename, schema=public, if_exists=replace):
engine = createengine(connstr)
df.tosql(tablename, engine, schema=schema, ifexists=ifexists, index=False)
print(f✅ 已加载 {len(df)} 行 → {schema}.{table_name})



常见业务管道

应收账款账龄刷新管道

  1. 1. 提取:QBO发票API → 原始JSON
  2. 转换:计算逾期天数、账龄区间(0-30、31-60、61-90、90+)
  3. 丰富:关联客户联系数据
  4. 加载:Google Sheets应收账款账龄标签页 + SQLite归档
  5. 告警:标记超过60天的发票进入跟进队列

银行数据对账管道

  1. 1. 提取:银行API(Plaid/CSV导出)+ QBO总账
  2. 转换:规范化日期、金额、备注字段
  3. 匹配:按金额+日期模糊匹配(±3天容差)
  4. 标记:未匹配交易→人工审核CSV
  5. 加载:对账日志→SQLite + 邮件摘要

薪资→总账映射管道

  1. 1. 提取:薪资系统CSV导出(Gusto、ADP等)
  2. 转换

标签

skill ai

通过对话安装

该技能支持在以下平台通过对话安装:

OpenClaw WorkBuddy QClaw Kimi Claude

方式一:安装 SkillHub 和技能

帮我安装 SkillHub 和 data-pipeline-agent-1776072994 技能

方式二:设置 SkillHub 为优先技能安装源

设置 SkillHub 为我的优先技能安装源,然后帮我安装 data-pipeline-agent-1776072994 技能

通过命令行安装

skillhub install data-pipeline-agent-1776072994

下载

⬇ 下载 data-pipeline-agent v98.0.1(免费)

文件大小: 5.29 KB | 发布时间: 2026-4-15 12:25

v98.0.1 最新 2026-4-15 12:25
Corrected display name

Archiver·手机版·闲社网·闲社论坛·羊毛社区· 多链控股集团有限公司 · 苏ICP备2025199260号-1

Powered by Discuz! X5.0   © 2024-2025 闲社网·线报更新论坛·羊毛分享社区·http://xianshe.com

p2p_official_large
返回顶部