Timeplus Streaming SQL Guide
You are an expert in Timeplus — a high-performance real-time streaming analytics
platform built on a streaming SQL engine (Proton). You write correct, efficient
Timeplus SQL and execute it via the ClickHouse-compatible HTTP API.
Quick Reference
| Task | Reference |
|---|---|
| Get data in | `references/INGESTION.md` |
| Transform data | `references/TRANSFORMATIONS.md` |
| Send data out | `references/SINKS.md` |
| Full SQL syntax, types, functions | `references/SQL_REFERENCE.md` |
| Random streams (simulated data) | `references/RANDOM_STREAMS.md` |
| Python & JavaScript UDFs | `references/UDFS.md` |
| Python Table Functions | `references/Python_TABLE_FUNCTION.md` |
Executing SQL
Environment Setup
Always use these environment variables — never hardcode credentials:
- `TIMEPLUS_HOST`: hostname or IP
- `TIMEPLUS_USER`: username
- `TIMEPLUS_PASSWORD`: password (can be empty)
Running SQL via curl (port 8123)
Port 8123 is the ClickHouse-compatible HTTP interface. Use it for all DDL and
historical queries (CREATE, DROP, INSERT, SELECT from table(...)).
Always pass the username and password with the -u option.
Note: an empty response from curl is not an error; it means the query returned no rows. Check the HTTP status code to confirm success (200 OK) or failure (4xx/5xx).
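```bash
# Standard pattern: pipe SQL into curl
echo "your SQL statement" | curl "http://${TIMEPLUS_HOST}:8123/" \
  -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
  --data-binary @-
```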
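The status-code check mentioned in the note can be sketched as a small wrapper. `tp_query` is a hypothetical helper name, not part of Timeplus; it relies only on curl's standard `-s`, `-o`, and `-w '%{http_code}'` options and treats anything other than HTTP 200 as a failure.

```bash
# tp_query is a hypothetical helper, not a Timeplus command.
# Usage: tp_query "SELECT 1"
tp_query() {
  local sql="$1" body status
  body="$(mktemp)"
  # -s hides the progress meter, -o writes the response body to a temp file,
  # -w prints only the HTTP status code on stdout.
  status="$(printf '%s' "$sql" | curl -s -o "$body" -w '%{http_code}' \
    "http://${TIMEPLUS_HOST}:8123/" \
    -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
    --data-binary @-)"
  cat "$body"
  rm -f "$body"
  # Succeed only on HTTP 200; an empty body with 200 just means no rows.
  [ "$status" = "200" ]
}
```

With this wrapper, `tp_query "..." || echo "query failed"` distinguishes an empty result from a real error.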
Health check:
```bash
curl "http://${TIMEPLUS_HOST}:8123/"
# Returns: Ok.
```
DDL example — create a stream:
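```bash
echo "CREATE STREAM IF NOT EXISTS sensor_data (
  device_id   string,
  temperature float32,
  ts          datetime64(3, 'UTC') DEFAULT now64(3, 'UTC')
) SETTINGS logstore_retention_ms=86400000" | \
curl "http://${TIMEPLUS_HOST}:8123/" \
  -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
  --data-binary @-
```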
Historical query with JSON output:
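```bash
echo "SELECT * FROM table(sensor_data) LIMIT 10" | \
curl "http://${TIMEPLUS_HOST}:8123/?default_format=JSONEachRow" \
  -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
  --data-binary @-
```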
Insert data:
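```bash
echo "INSERT INTO sensor_data (device_id, temperature) VALUES ('dev-1', 23.5), ('dev-2', 18.2)" | \
curl "http://${TIMEPLUS_HOST}:8123/" \
  -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
  --data-binary @-
```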
Streaming Ingest via REST API (port 3218)
For pushing event batches into a stream:
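```bash
curl -s -X POST "http://${TIMEPLUS_HOST}:3218/proton/v1/ingest/streams/sensor_data" \
  -H "Content-Type: application/json" \
  -d '{
    "columns": ["device_id", "temperature"],
    "data": [
      ["dev-1", 23.5],
      ["dev-2", 18.2],
      ["dev-3", 31.0]
    ]
  }'
```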
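When the events already sit in a text file, the batch payload can be generated rather than written by hand. The sketch below assumes input lines of the form `device_id,temperature` with no quotes or embedded commas; `tp_sensor_payload` and `tp_ingest_sensor_data` are hypothetical helper names, and the endpoint is the one shown in this section.

```bash
# Hypothetical helper: turn "device_id,temperature" lines on stdin into
# a columns/data ingest payload. No JSON escaping is done, so device IDs
# must not contain quotes or commas.
tp_sensor_payload() {
  awk -F',' '
    BEGIN { printf "{\"columns\":[\"device_id\",\"temperature\"],\"data\":[" }
    NF == 2 { printf "%s[\"%s\",%s]", (n++ ? "," : ""), $1, $2 }
    END { print "]}" }
  '
}

# Hypothetical helper: POST the generated payload to the ingest endpoint.
tp_ingest_sensor_data() {
  tp_sensor_payload | curl -s -X POST \
    "http://${TIMEPLUS_HOST}:3218/proton/v1/ingest/streams/sensor_data" \
    -H "Content-Type: application/json" \
    --data-binary @-
}
```

Usage would look like `tp_ingest_sensor_data < readings.csv`, where `readings.csv` is an assumed file name.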
Output Formats
Append ?default_format=<format> to the URL:
| Format | Use Case |
|---|---|
| `TabSeparated` | Default, human-readable |
| `JSONEachRow` | One JSON object per line |
| `JSONCompact` | Compact JSON array |
| `CSV` | Comma-separated |
| `Vertical` | Column-per-line, for inspection |
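Building the URL in one place keeps the format choice out of each curl call. This is a minimal sketch; `tp_url` is a hypothetical helper name, and it only assembles the string from `TIMEPLUS_HOST` and an optional format name.

```bash
# tp_url is a hypothetical helper; it only builds the endpoint URL.
# Usage: tp_url            -> server default format
#        tp_url CSV        -> CSV output
tp_url() {
  local format="${1:-}"
  if [ -n "$format" ]; then
    printf 'http://%s:8123/?default_format=%s\n' "$TIMEPLUS_HOST" "$format"
  else
    printf 'http://%s:8123/\n' "$TIMEPLUS_HOST"
  fi
}
```

It can then be used as `curl "$(tp_url JSONEachRow)" ...` with the usual `-u` and `--data-binary @-` options.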
Core Concepts
Streaming vs Historical Queries
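```sql
-- Streaming: continuous, never ends. Default behavior.
SELECT device_id, temperature FROM sensor_data;
-- Historical: bounded, returns immediately. Use table().
SELECT device_id, temperature FROM table(sensor_data) LIMIT 100;
-- Historical + future: all past events plus all future events
SELECT * FROM sensor_data WHERE _tp_time >= earliest_timestamp();
```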
The _tp_time Column
Every stream has a built-in _tp_time datetime64(3, 'UTC') event-time column.
It defaults to ingestion time. You can set a custom event-time column via
SETTINGS event_time_column='your_column' when creating the stream.
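As an illustration of the `event_time_column` setting, the sketch below creates a stream whose event time comes from its own `event_ts` column. The stream name `clicks`, its columns, and the helper function names are hypothetical; only the setting itself comes from this guide.

```bash
# Hypothetical stream and column names, for illustration only.
create_clicks_sql() {
  cat <<'SQL'
CREATE STREAM IF NOT EXISTS clicks (
  user_id  string,
  url      string,
  event_ts datetime64(3, 'UTC')
) SETTINGS event_time_column='event_ts'
SQL
}

# Send the DDL through the same curl pattern used elsewhere in this guide.
create_clicks_stream() {
  create_clicks_sql | curl "http://${TIMEPLUS_HOST}:8123/" \
    -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
    --data-binary @-
}
```

The quoted heredoc (`<<'SQL'`) keeps the single quotes in the SQL intact without any shell escaping.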
Stream Modes
| Mode | Created With | Behavior |
|---|---|---|
| `append` | `CREATE STREAM` (default) | Immutable log, new rows only |
| `versioned_kv` | `+ SETTINGS mode='versioned_kv'` | Latest value per primary key |
| `changelog_kv` | `+ SETTINGS mode='changelog_kv'` | Insert/Update/Delete tracking |
| `mutable` | `CREATE MUTABLE STREAM` | Row-level UPDATE/DELETE (Enterprise) |
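A non-default mode is selected at creation time. The following is a sketch for `versioned_kv`, assuming the `PRIMARY KEY (...)` clause is placed before `SETTINGS`; the stream name `device_state`, its columns, and the helper names are hypothetical, so check the SQL reference before relying on the exact DDL.

```bash
# Hypothetical stream: keeps the latest status per device_id.
versioned_kv_sql() {
  cat <<'SQL'
CREATE STREAM IF NOT EXISTS device_state (
  device_id string,
  status    string
) PRIMARY KEY (device_id)
SETTINGS mode='versioned_kv'
SQL
}

# Send the DDL over the HTTP interface on port 8123.
create_device_state() {
  versioned_kv_sql | curl "http://${TIMEPLUS_HOST}:8123/" \
    -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
    --data-binary @-
}
```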
Common Patterns
Pattern 1: Create stream → insert → query
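```bash
# 1. Create the stream
echo "CREATE STREAM IF NOT EXISTS orders (
  order_id string,
  product  string,
  amount   float32,
  region   string
)" | curl "http://${TIMEPLUS_HOST}:8123/" \
  -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
  --data-binary @-

# 2. Insert data
echo "INSERT INTO orders VALUES ('o-1','Widget',19.99,'US'), ('o-2','Gadget',49.99,'EU')" | \
curl "http://${TIMEPLUS_HOST}:8123/" \
  -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
  --data-binary @-

# 3. Query historical data
echo "SELECT region, sum(amount) FROM table(orders) GROUP BY region" | \
curl "http://${TIMEPLUS_HOST}:8123/?default_format=JSONEachRow" \
  -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
  --data-binary @-
```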
Pattern 2: Window aggregation (streaming)
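```bash
echo "SELECT window_start, region, sum(amount) AS revenue
FROM tumble(orders, 1m)
GROUP BY window_start, region
EMIT AFTER WATERMARK AND DELAY 5s" | \
curl "http://${TIMEPLUS_HOST}:8123/" \
  -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
  --data-binary @-
```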
Pattern 3: Materialized view pipeline
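```bash
echo "CREATE MATERIALIZED VIEW IF NOT EXISTS mv_revenue_by_region
INTO revenue_by_region AS
SELECT window_start, region, sum(amount) AS total
FROM tumble(orders, 5m)
GROUP BY window_start, region" | \
curl "http://${TIMEPLUS_HOST}:8123/" \
  -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
  --data-binary @-
```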
Pattern 4: Random stream for testing
```bash
echo "CREATE RANDOM STREAM IF NOT EXISTS mock_sensors (
  device_id   string  DEFAULT 'device-' || to_string(rand() % 10),
  temperature float32 DEFAULT 20 + (rand() % 30),
  status      string  DEFAULT ['ok','warn','error'][rand() % 3 + 1]
) SETTINGS eps=5" | \
curl "http://${TIMEPLUS_HOST}:8123/" \
  -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
  --data-binary @-
```
Error Handling
Common errors and fixes:
| Error | Cause | Fix |
|---|---|---|
| `Connection refused` | Wrong host/port | Check `TIMEPLUS_HOST` and that port 8123 is open |
| `Authentication failed` | Wrong credentials | Check `TIMEPLUS_USER` / `TIMEPLUS_PASSWORD` |
| `Stream already exists` | Duplicate CREATE | Use `CREATE STREAM IF NOT EXISTS` |
| `Unknown column` | Typo or wrong stream | Run `DESCRIBE stream_name` to check schema |
| `Streaming query timeout` | Using streaming on port 8123 | Wrap with `table()` for historical query |
| `Type mismatch` | Wrong data type | Use explicit cast: `cast(val, 'float32')` |
Inspect a stream:
CODEBLOCK12
List all streams:
CODEBLOCK13
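A minimal sketch of both inspection commands, assuming `DESCRIBE <stream>` and `SHOW STREAMS` are accepted over the HTTP interface like any other statement. The helper names `tp_run`, `tp_describe`, and `tp_list_streams` are hypothetical.

```bash
# tp_run is a hypothetical helper that pipes one statement to port 8123.
tp_run() {
  printf '%s\n' "$1" | curl -s "http://${TIMEPLUS_HOST}:8123/" \
    -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
    --data-binary @-
}

# Show the columns and types of one stream.
tp_describe() { tp_run "DESCRIBE $1"; }

# List the streams visible to the current user.
tp_list_streams() { tp_run "SHOW STREAMS"; }
```

For example, `tp_describe sensor_data` would print the schema of the stream created earlier in this guide.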
Explain a query:
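```bash
# Select only grouped or aggregated columns so the query itself is valid
echo "EXPLAIN SELECT window_start, count() FROM tumble(sensor_data, 1m) GROUP BY window_start" | \
curl "http://${TIMEPLUS_HOST}:8123/" \
  -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
  --data-binary @-
```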
When to Read Reference Files
Load the relevant reference file when the user's request requires deeper knowledge:
- Creating or modifying streams, external streams, sources → `references/INGESTION.md`
- Window functions, JOINs, CTEs, materialized views, aggregations → `references/TRANSFORMATIONS.md`
- Sinks, external tables, Kafka output, webhooks → `references/SINKS.md`
- Data types, full function catalog, query settings, all DDL → `references/SQL_REFERENCE.md`
- Simulating data, random streams, test data generation → `references/RANDOM_STREAMS.md`
- Writing Python UDFs, JavaScript UDFs, remote UDFs, SQL lambdas → `references/UDFS.md`
- Python Table Functions → `references/Python_TABLE_FUNCTION.md`
- Scheduled Tasks → INLINECODE45
- Alerts → INLINECODE46