1. 顶层导出与模块入口
2. 数据层核心 API 字典
2.1 Table — L1 行级表
name 是表名(如 "demo.event_log")。
BlockDB 里要订阅区块流、查 block/event/state 请用 L2级别BlockTable
不过每张 L2 表底下都有一组 L1 行级映射子表(典型如<l2_name>._height,Aligned内部就靠它查共识高度),需要 row-level 访问时用Table类接入即可。
Table.get_row(row_id)
方法简述 — 按业务主键读取单行。
入参
| 参数 | 类型 | 必填 | 默认 | 说明 | 示例值 |
|---|---|---|---|---|---|
row_id | str / int / bytes | ✅ | — | 业务主键值(类型与表的 id 列匹配) | "0xabc123..." |
dict | None — 单行字典;不存在返回 None。
Table.batch_get_rows(row_ids)
方法简述 — 一次 RPC 批量按主键读取多行。N 个主键场景必用此方法,不要 for 循环 get_row。
入参
| 参数 | 类型 | 必填 | 默认 | 说明 | 示例值 |
|---|---|---|---|---|---|
row_ids | list[str / int / bytes] | ✅ | — | 业务主键值列表 | ["0xabc...", "0xdef..."] |
list[dict] — 命中行列表(不命中的主键被略过)。
Table.filter_rows(filters="", order_by="", limit=0, offset=0)
方法简述 — 条件过滤查询(等价 SQL 的 WHERE ... ORDER BY ... LIMIT ... OFFSET ...),一次性返回结果。
入参
| 参数 | 类型 | 必填 | 默认 | 说明 | 示例值 |
|---|---|---|---|---|---|
filters | str | ❌ | "" | SQL WHERE 子句字符串(服务端拼进 ... WHERE <filters> ORDER BY ...);空串表示无过滤 | "block_height >= 1000" |
order_by | str | ❌ | "" | 排序子句(SQL 语法) | "block_height asc" |
limit | int | ❌ | 0 | 最大返回行数;0 表示不限 | 200 |
offset | int | ❌ | 0 | 跳过前 N 行(分页用) | 0 |
list[dict] — 命中行列表。
filters 走服务端 SQL:有 SQL 关键字 substring 黑名单(如 create / create2 / suicide 整词出现即被拒,和引号无关);外层别再加 LIMIT;bytes 列做 = 比较要 unhex(...)。
Table.scan(sql)
方法简述 — 执行完整 SQL,一次性 buffered 拉完结果。sql**** 完整由调用方拼,Table.name 不会自动 FROM 进去。
入参
| 参数 | 类型 | 必填 | 默认 | 说明 | 示例值 |
|---|---|---|---|---|---|
sql | str | ✅ | — | 完整 SQL(含 FROM)。不能写分号 ****;:一次只能执行一条语句,不支持多 SQL 拼接。 | "SELECT id, name FROM demo.event_log WHERE name='login'" |
list[dict] — 完整结果集。
Table.scan_iter(sql)
方法简述 — 同 scan,但服务端 streaming,逐行 yield。大表 / 不确定结果集大小时首选,内存占用 O(1)。
入参:同 Table.scan。
返回:Generator[dict] — 行 generator。
Table.upsert_rows(rows, sync=False)
方法简述 — 批量 upsert 多行。
入参
| 参数 | 类型 | 必填 | 默认 | 说明 | 示例值 |
|---|---|---|---|---|---|
rows | list[dict] | ✅ | — | 要写入的行;每行必须含主键 id | [{"id": "0x...", "name": "x"}] |
sync | bool | ❌ | False | True 等服务端确认写入完成;False fire-and-forget | True |
str — 服务端 job_id。
Table.delete_rows(row_ids, sync=False)
方法简述 — 批量按主键删除。
入参
| 参数 | 类型 | 必填 | 默认 | 说明 | 示例值 |
|---|---|---|---|---|---|
row_ids | list | ✅ | — | 要删除的主键列表 | ["0xabc..."] |
sync | bool | ❌ | False | 同 upsert_rows.sync | False |
str — job_id。
2.2 BlockTable / Block / Subscribe / Aligned — L2 区块级表
BlockTable(name)
构造 L2 区块级表实例。name 是表名(如 "chaintable.trace.eth")。
BlockTable.get_state(state_id, current_block=None)
方法简述 — 在 current_block 时刻(含之前)查某个 state 的快照。
入参
| 参数 | 类型 | 必填 | 默认 | 说明 | 示例值 |
|---|---|---|---|---|---|
state_id | str / int / bytes | ✅ | — | state 主键 | "0xabc..." |
current_block | Block / dict / 任意带 .id/.height/.timestamp 的 duck type | ✅ | 时间锚点;None 表示最新 | Block(id="0x..", height=1234, timestamp=170...) |
dict | None — 单行;不存在返回 None。
BlockTable.get_event(event_id, current_block=None)
方法简述 — 在 `current_block` 时刻(含之前)查某个 event。
入参 / 返回结构同 get_state。
BlockTable.get_block_rows(block)
方法简述 — 取整个 block 内所有行。
入参
| 参数 | 类型 | 必填 | 默认 | 说明 | 示例值 |
|---|---|---|---|---|---|
block | Block / dict / duck type | ✅ | — | 目标 block | 见上 |
list[dict] — 该 block 全部行。
BlockTable.upsert_block(block, rows, sync=False)
方法简述 — 把一个 block 的所有行打包写入。
入参
| 参数 | 类型 | 必填 | 默认 | 说明 | 示例值 |
|---|---|---|---|---|---|
block | Block / dict / duck type | ✅ | — | 目标 block | — |
rows | list[dict] | ✅ | — | 该 block 的所有行 | [{"id": "0x..", "value": 1.0}] |
sync | bool | ❌ | False | 同 Table.upsert_rows.sync | True |
str — job_id。
BlockTable.upsert_block_bundle(bundle, rows, sync=False)
方法简述 — 按 bundle(整 1000-block 高度区间)批量写入,专用于历史回填。一次调用把整段 1000 块的共识高度推进、bundle hash 算定,比按 block 逐块写快约 ×100(`upsert_block` 每块都要重算整 bundle 的 hash 并按块推进共识,回填会反压客户端)。
入参
| 参数 | 类型 | 必填 | 默认 | 说明 | 示例值 |
|---|---|---|---|---|---|
bundle | int | ✅ | — | bundle 号。bundle=0→高度 0;bundle=N(N≥1)→高度 (N-1)*1000+1 .. N*1000(如 1→1–1000,2→1001–2000)。必须整 1000 对齐,无法表达 500–600 这类区间 | 3 |
rows | list[dict] | ✅ | — | 每行的 block_height 必须落在该 bundle 区间内,否则服务端报 INVALID_ARGUMENT: row N block_height is not in bundle range | [{"id": "0x..", "block_height": 2500, "value": 1.0}] |
sync | bool | ❌ | False | 同 Table.upsert_rows.sync | True |
str — job_id。
写入语义(与直觉不同,务必注意):
-
行数据:只写入你传入的
rows,bundle 号不会自动生成 1000 块数据。传几行写几行。 -
共识/元数据按整段提交:即使只写 1 行,
<table>._height的共识高度也会推进整个 bundle 区间,<table>._bundle记一条{id=bundle号, data_hash, accum_block_count=1000}。 -
自动补全:服务端按
block_height自动补全block_id/block_timestamp(链上真实区块元信息),无需自己传。 - 无回滚:共识区间与 bundle hash 没有 SDK 回滚入口,删主表行也不会重算。
Block(id, height, timestamp)** — 区块数据单位**
方法简述 — Subscribe(L2) / Aligned 对外 yield 的类型;也是 BlockTable 几个方法可接受的入参。
构造入参
| 参数 | 类型 | 必填 | 默认 | 说明 |
|---|---|---|---|---|
id | str | ✅ | — | block id |
height | int | ✅ | — | block 高度;传 None 自动归 0 |
timestamp | int / datetime | ✅ | — | epoch 整数;传 datetime 自动转 epoch;传 None 归 0 |
| 方法 | 说明 |
|---|---|
Block.from_block_data(data) | 从带 .id / .height / .timestamp 的对象构造 Block |
Subscribe(tables=None, start_at=None)** — L2 区块订阅**
| 参数 | 类型 | 必填 | 默认 | 说明 | 示例值 |
|---|---|---|---|---|---|
tables | list[BlockTable / str] | 功能必填 | None | 要订阅的表(多个会聚合在同一个 stream)**签名上可空,但不传等于不订阅任何表,****listen()**会空转——实际必须传至少一张表。* | [trace, token] |
start_at | int / None | ❌ | None | 起始游标(epoch);None 从最新开始 | 1700000000 |
| 方法 | 返回 | 说明 |
|---|---|---|
listen() | Generator[(table_id: str, block: Block)] | 无限生成器,按事件 yield (table_id, Block) 元组。异常会自动用最新游标重连;stream 正常结束才退出。 |
Aligned(sub, loose_align=None, strict_align=None)** — 多表区块高度对齐**
Subscribe.listen() 的乱序多表流,按高度对齐后对外 yield 单一 Block。
对齐通过 ⇔ 该高度在所有 trigger 表都 ready,且不大于每个 depends 表的共识高度。每个高度只 yield 一次。
构造入参
| 参数 | 类型 | 必填 | 默认 | 说明 | 示例值 |
|---|---|---|---|---|---|
sub | Subscribe(L2) | ✅ | — | 上游订阅流 | Subscribe(tables=[...]) |
loose_align | list[BlockTable / str] | 功能必填 | None | trigger 表(你真正想消费的)。签名上可空,但为空时永远拿不到 trigger 高度,listen() 永不 yield——至少要传一张。 | [trace] |
strict_align | list[BlockTable / str] | ❌ | None | depends 表(必须先 ready)。留空 = 不做依赖对齐(合法)。 | [token, addr] |
| 方法 | 返回 | 说明 |
|---|---|---|
listen() | Generator[Block] | yield 已对齐的 Block(从 trigger 表里取) |
T 对应一张子表 T._height(注意是下划线),每行是已处理高度区间。Aligned 把这些区间合并算出”第一个 gap 前的最高高度” = 共识高度(即从高度0开始的全部连续高度)。驱动侧要确保每个块都提交 task(含空块),coverage 才连续。
2.3 TimeTable & align_time_at — L2 时间表
{id, time_at, value}。
TimeTable(name)
构造 TimeTable 实例。
TimeTable.get_value(row_id, time_at)
方法简述 — 查 `row_id` 在 `time_at` 所属时间桶时的状态。`time_at` 自动向后取整。
入参
| 参数 | 类型 | 必填 | 默认 | 说明 | 示例值 |
|---|---|---|---|---|---|
row_id | str | ✅ | — | 业务主键 | "0xabc..." |
time_at | str / datetime | ✅ | — | 目标时间;接受 YYYY-MM-DD HH:MM[:SS] / YYYY-MM-DDTHH:MM[:SS] 或 datetime | "2026-05-18 17:04:35" |
dict | None — {id, value, time_at};不存在返回 None。
TimeTable.upsert_time_rows(time_at, rows, sync=False)
方法简述 — 往 time_at 所属时间桶批量 upsert。行内若带 time_at 会被服务端忽略。
入参
| 参数 | 类型 | 必填 | 默认 | 说明 | 示例值 |
|---|---|---|---|---|---|
time_at | str / datetime | ✅ | — | 目标时间桶 | "2026-05-18 17:04:35" |
rows | list[dict] | ✅ | — | 形如 [{id, value}, ...] | [{"id": "0x..", "value": 1.0}] |
sync | bool | ❌ | False | 同 Table.upsert_rows.sync | True |
str — job_id。
align_time_at(value)
方法简述 — 把 value 向后取整到整分钟、秒归零。TimeTable 的所有方法内部自动调用此函数,调用方一般不需要直接用,给”想提前预览会落到哪个桶”的场景。
入参
| 参数 | 类型 | 必填 | 默认 | 说明 | 示例值 |
|---|---|---|---|---|---|
value | str / datetime | ✅ | — | 目标时间 | "2026-05-18 17:04:35" |
str — 形如 "YYYY-MM-DD HH:MM:00"。
| 输入 | 输出 |
|---|---|
"2026-05-18 17:04:35" | "2026-05-18 17:05:00" |
"2026-05-18 17:04:00" | "2026-05-18 17:04:00"(已对齐,不进位) |
datetime(2026, 5, 18, 17, 4, 35) | "2026-05-18 17:05:00" |
2.4 Column / filter & Operator pipeline
Column(name)** — 列引用**
filter() 和 Operator pipeline。
构造入参
| 参数 | 类型 | 必填 | 默认 | 说明 | 示例值 |
|---|---|---|---|---|---|
name | str | ✅ | — | 列名 | "block_height" |
| Python 表达式 | 等价语义 |
|---|---|
Column("a") == v | a = v(EqualTo) |
Column("a") != v | a != v(NotEqualTo) |
Column("a") < v | a < v(LessThan) |
Column("a") <= v | a <= v(LessThanOrEqual) |
Column("a") > v | a > v(GreaterThan) |
Column("a") >= v | a >= v(GreaterThanOrEqual) |
Column("a").in_([...]) | a IN (...)(In) |
expr1 & expr2 | expr1 AND expr2 |
| <code>expr1 | expr2</code> | expr1 OR expr2 |
~expr | NOT expr |
bool → boolean,int → int,float → double,str → string,bytes → binary(hex),None → null。
filter(expr)
Column 表达式序列化为 filter_rows.filters 接受的字符串。
入参
| 参数 | 类型 | 必填 | 默认 | 说明 | 示例值 |
|---|---|---|---|---|---|
expr | _Expr (Column 表达式) / str | ✅ | — | 表达式或预先准备好的 JSON 串 | (Column("a") > 0) & (Column("b") == "x") |
str — 可直接传给 Table.filter_rows(filters=...)。
Operator** / operator.filter / operator.deduplicate — pipeline**
read → filter → deduplicate pipeline 并序列化为 JSON。
Operator 现阶段的下游消费方还未接入,详见 [§5 Troubleshooting](#5—常见异常与自救指南-troubleshooting)。可以提前用模块级入口to_dict()/to_json()检查输出格式。
| 入口 | 返回 | 说明 |
|---|---|---|
operator.filter(expr) | Operator | 新建 pipeline 并追加一个 filter 步骤;expr 与 blockdb.filter 共用 Column 表达式 |
operator.deduplicate(*cols) | Operator | 新建 pipeline 并追加 deduplicate;cols 接受 Column 或字符串列名,至少一个 |
| 方法 | 返回 | 说明 |
|---|---|---|
.filter(expr) | self | 追加 filter 步骤(链式) |
.deduplicate(*cols) | self | 追加 deduplicate 步骤 |
.to_dict() | dict | 序列化为 {"relations": [...]} |
.to_json() | str | to_dict() 的 JSON 字符串 |
to_dict()** 输出结构**
-
named_table.names = ["source_table"]是占位;下游消费方按实际表名重写。 -
base_schema.names按首次出现顺序收集列名;所有 selection 引用都是该数组下标。 -
比较 / 逻辑算子函数名:
equal / not_equal / lt / lte / gt / gte / and / or / not。 -
Python
int默认i32,超 32 位用i64。 -
In没有原生函数,展开成equal链 +or折叠。
Subscribe** (L1) / **RowEvent
当前服务端未开放此能力,详见 [§5 Troubleshooting](#5—常见异常与自救指南-troubleshooting);用 scan_iter 轮询替代。
构造入参
| 参数 | 类型 | 必填 | 默认 | 说明 |
|---|---|---|---|---|
tables | list[Table / str] | ❌ | None | 订阅的表 |
start_at | int / None | ❌ | None | 起始游标 |
RowEvent dataclass 字段**
| 字段 | 类型 | 说明 |
|---|---|---|
table_id | str | 事件所属表 |
rows | list[dict] | 该批次行数据 |
created_at | int | 事件时间戳 |
3. 类型映射与数据还原`LogicalType`
LogicalType 是 str, Enum,对齐服务端列类型表。
枚举值
| 值 | 用途 |
|---|---|
STRING HASH ADDRESS TOKENID TXID BLOCKID CHAINID TEXT | 字符串系列 |
BLOCKHEIGHT INT UINT256 | 整数(UINT256 服务端发字符串) |
FLOAT | 浮点(覆盖 SQL double / decimal / numeric / real) |
BOOLEAN | 布尔 |
DATE TIMESTAMP | 时间类 |
LIST DICT OBJECT JSON | 复合(JSON 文本自动 json.loads) |
DATASET LOGO | 字符串别名 |
restore_value(logical_type, raw)
方法简述 — 把 raw 值还原成对应 Python 类型。服务端发来的 logical_type 既可能是 LogicalType 名("HASH"),也可能是 SQL 原生类型("varchar(255)", "timestamp(3)" …),SDK 自动归一化后做类型转换。
入参
| 参数 | 类型 | 必填 | 默认 | 说明 | 示例值 |
|---|---|---|---|---|---|
logical_type | str | ✅ | — | LogicalType 名或 SQL 原生类型 | "TIMESTAMP", "varchar(255)" |
raw | Any | ✅ | — | 已 unwrap 的 Python 原语(str / int / float / bool / bytes …) | — |
Any — 转换后的 Python 值。
类型转换映射表
| LogicalType | 还原为 |
|---|---|
STRING / HASH / ADDRESS / TOKENID / TXID / BLOCKID / CHAINID / TEXT / UINT256 / DATASET / LOGO | str |
INT / BLOCKHEIGHT | int |
FLOAT | float |
BOOLEAN | bool |
DATE | datetime.date.fromisoformat(...) |
TIMESTAMP | datetime.datetime(UTC;支持 epoch 数值、ISO8601、带 Z 后缀) |
LIST / DICT / OBJECT / JSON | json.loads(...),失败 fall back 原值 |
这个函数主要给 ResultSet 反序列化用,用户层一般无需直接调;放在这里供进阶 / 工具脚本使用。
4. Dispatcher Meta 管理服务 API
4.1 L1 Table Meta
c.get_table(table_id)
table_id: str → dict:{table, columns, indexes}。
c.create_table(table_id, type="")
table_id: str,type: str(可选)→ None。
c.save_table(table_id, columns=None, indexes=None)
覆盖式写 schema:必须把已有所有列(含 server 自动建的 `id` / `block_*`)一并传,缺列会被删;类型不一致会被拒绝。
| 参数 | 类型 | 必填 | 默认 | 说明 |
|---|---|---|---|---|
table_id | str | ✅ | — | 表名 |
columns | list[dict] | ❌ | None | [{table_id?, name, type, id?}, ...] |
indexes | list[dict] | ❌ | None | [{table_id?, columns: list[str], id?}, ...] |
None。
c.drop_table(table_id)** → **None
c.list_table(regex=".*", type="")** → **list[dict]
4.2 L2 Block Table Meta
c.get_block_table(table_id)
dict:{block_table, block_columns, block_indexes, derived_tables}。block_table 含 {id, type, chain_id}。
c.create_block_table(table_id, type="", chain_id="")** → **None
type 例:"event" / "state"。
c.save_block_table(table_id, columns=None, indexes=None)** → **None
覆盖式,规则同 save_table。
**c.drop_block_table(table_id) → **None
c.list_block_tables(regex=".*", type="")** → **list[dict]
4.3 L2 Time Table Meta
c.get_time_table(table_id)
dict:{time_table, time_columns, derived_tables}。
c.create_time_table(table_id)** → **None
表结构固定为 {id, time_at, value}。
c.save_time_table(table_id, id_type="")** → **None
只能改业务主键 id 的逻辑类型;id_type ∈ STRING / HASH / ADDRESS / TOKENID / TXID / BLOCKID / CHAINID。