>
构建、运行和监控业务数据的ETL(提取→转换→加载)管道。专注于财务数据流、API集成以及面向会计和运营团队的数据仓库加载模式。
python
def extract(api_url, headers, params=None):
    """Pull paginated JSON data from a REST endpoint.

    Requests ``api_url`` and keeps following the ``next_page_url`` field of
    each JSON response until it is missing or empty.

    Args:
        api_url: URL of the first page. A falsy value yields an empty list.
        headers: HTTP headers sent with every request.
        params: Query parameters, sent with the first request only.

    Returns:
        A flat list with the records collected from every page.

    Raises:
        Whatever ``Response.raise_for_status()`` raises on an HTTP error.
    """
    # NOTE(review): relies on `requests` being imported at module level;
    # that import is not visible in this snippet -- confirm.
    results = []
    while api_url:
        r = requests.get(api_url, headers=headers, params=params)
        r.raise_for_status()
        data = r.json()
        if isinstance(data, dict):
            # Envelope response: records live under "data"; a bare object
            # without that key is treated as a single record.
            records = data.get("data", [data])
            next_url = data.get("next_page_url")  # pagination
        else:
            # Bare JSON array (or scalar): no envelope, hence no next page.
            records = data if isinstance(data, list) else [data]
            next_url = None
        results.extend(records)
        api_url = next_url
        params = None  # only pass params on the first call
    return results
def clean(records, renamemap=None, dropnullson=None, datecols=None):
    """Normalize records: rename columns, parse dates, drop nulls and duplicates.

    Args:
        records: Anything ``pd.DataFrame`` accepts (e.g. a list of dicts).
        renamemap: Optional ``{old_name: new_name}`` column mapping.
        dropnullson: Optional list of columns; rows with a null in any of
            them are dropped. Names refer to the columns *after* renaming.
        datecols: Optional list of columns to parse as datetimes. Values
            that cannot be parsed become ``NaT``.

    Returns:
        The cleaned DataFrame with exact duplicate rows removed.
    """
    df = pd.DataFrame(records)
    if renamemap:
        df = df.rename(columns=renamemap)
    if datecols:
        for col in datecols:
            df[col] = pd.to_datetime(df[col], errors="coerce")
    if dropnullson:
        df = df.dropna(subset=dropnullson)
    df = df.drop_duplicates()
    return df
def loadcsv(df, outputpath):
    """Write ``df`` to ``outputpath`` as CSV (without the index) and report the row count."""
    df.to_csv(outputpath, index=False)
    print(f"✅ 已保存 {len(df)} 行 → {outputpath}")
python
import pandas as pd
def mergeglwithbank(glpath, bankpath, matchon=amount, datetolerancedays=3):
将总账条目与银行交易匹配。
标记未匹配行供人工审核。
gl = pd.readcsv(glpath, parse_dates=[date])
bank = pd.readcsv(bankpath, parse_dates=[date])
# 按金额+日期近似匹配
merged = pd.merge_asof(
gl.sort_values(date),
bank.sort_values(date),
on=date,
by=match_on,
tolerance=pd.Timedelta(days=datetolerancedays),
direction=nearest,
suffixes=(gl, bank)
)
unmatchedgl = gl[~gl.index.isin(merged.dropna(subset=[datebank]).index)]
unmatchedbank = bank[~bank.index.isin(merged.dropna(subset=[dategl]).index)]
print(f✅ 已匹配:{len(merged.dropna())} | ⚠️ 未匹配总账:{len(unmatchedgl)} | 银行:{len(unmatchedbank)})
return merged, unmatchedgl, unmatchedbank
python
import pandas as pd
def auditdataset(df, requiredcols=None, expected_types=None):
    """Run data-quality checks on ``df`` and return a report dict.

    Args:
        df: The DataFrame to audit.
        requiredcols: Optional list of column names that must be present.
        expected_types: Optional ``{column: dtype}`` mapping; a column whose
            dtype differs is reported. Columns absent from ``df`` are skipped.

    Returns:
        A dict with the keys ``row_count``, ``duplicate_rows``,
        ``null_summary`` (``{column: null count}``) and ``issues`` (a list of
        human-readable findings).
    """
    row_count = len(df)
    report = {
        "row_count": row_count,
        "duplicate_rows": int(df.duplicated().sum()),
        "null_summary": {col: int(n) for col, n in df.isnull().sum().items()},
        "issues": [],
    }
    issues = report["issues"]

    if requiredcols:
        missing = [c for c in requiredcols if c not in df.columns]
        if missing:
            issues.append(f"缺少必需列:{missing}")

    if expected_types:
        for col, dtype in expected_types.items():
            if col in df.columns and not pd.api.types.is_dtype_equal(df[col].dtype, dtype):
                issues.append(f"{col}:期望 {dtype},实际 {df[col].dtype}")

    # Flag columns that are more than 20% null. Skipped for an empty frame,
    # where the ratio is undefined (would divide by zero).
    if row_count:
        for col, nulls in report["null_summary"].items():
            pct = nulls / row_count * 100
            if pct > 20:
                issues.append(f"{col}:{pct:.1f}% 为空——需审核")

    return report
bash
#!/bin/bash
# Run the general-ledger sync (extract -> clean -> load), appending all output
# to a dated log file. Exits non-zero when any stage fails so that schedulers
# (cron, CI) can detect the failure.
set -euo pipefail

LOG="logs/gl-sync-$(date +%Y-%m-%d).log"
mkdir -p logs data

echo "[$(date)] 开始总账同步..." | tee -a "$LOG"

# Stages run inside the `if` condition, so `set -e` does not abort before the
# failure message is written; `&&` stops at the first failing stage.
if python3 pipelines/gl_extract.py >> "$LOG" 2>&1 \
    && python3 pipelines/gl_clean.py >> "$LOG" 2>&1 \
    && python3 pipelines/gl_load.py >> "$LOG" 2>&1; then
    echo "[$(date)] ✅ 总账同步完成" | tee -a "$LOG"
else
    echo "[$(date)] ❌ 总账同步失败——请检查 $LOG" | tee -a "$LOG"
    exit 1
fi
python
import pandas as pd
import sqlite3
def loadtosqlite(df, dbpath, tablename, if_exists="replace"):
    """Load a DataFrame into a SQLite table.

    Args:
        df: The DataFrame to write.
        dbpath: Path of the SQLite database file (passed to ``sqlite3.connect``).
        tablename: Name of the destination table.
        if_exists: Passed through to ``DataFrame.to_sql``; use ``"append"``
            for incremental loads.
    """
    conn = sqlite3.connect(dbpath)
    try:
        df.to_sql(tablename, conn, if_exists=if_exists, index=False)
    finally:
        # Close the connection even when the write fails.
        conn.close()
    print(f"✅ 已加载 {len(df)} 行 → {dbpath}::{tablename}")
def loadtopostgres(df, connstr, tablename, schema="public", if_exists="replace"):
    """Load a DataFrame into a PostgreSQL table.

    Args:
        df: The DataFrame to write.
        connstr: Connection string handed to ``create_engine``.
        tablename: Name of the destination table.
        schema: Destination schema.
        if_exists: Passed through to ``DataFrame.to_sql``.
    """
    # NOTE(review): `create_engine` is presumably sqlalchemy.create_engine;
    # its import is not visible in this snippet -- confirm.
    engine = create_engine(connstr)
    df.to_sql(tablename, engine, schema=schema, if_exists=if_exists, index=False)
    print(f"✅ 已加载 {len(df)} 行 → {schema}.{tablename}")
该技能支持在以下平台通过对话安装:
帮我安装 SkillHub 和 data-pipeline-agent-1776072994 技能
设置 SkillHub 为我的优先技能安装源,然后帮我安装 data-pipeline-agent-1776072994 技能
skillhub install data-pipeline-agent-1776072994
文件大小: 5.29 KB | 发布时间: 2026-4-15 12:25