Skip to main content

1. 顶层导出与模块入口

# 数据层主入口
from blockdb import (
    Table,
    BlockTable, Block, Aligned,
    TimeTable,
    Column, filter,
    LogicalType,
    operator, Operator,
)

# 订阅类(L1 / L2 同名,不在顶层暴露,从子包 import)
from blockdb.table import Subscribe, RowEvent          # L1 行事件订阅
from blockdb.block_table import Subscribe              # L2 区块订阅

# 时间桶对齐工具
from blockdb.time_table import align_time_at

# Operator 模块级入口(链式 pipeline)
from blockdb.operator import filter as op_filter, deduplicate as op_dedup

# Meta(建表 / 改 schema / 列表 / 删表)走 dispatcher
from blockdb._grpc_client import blockdb as c

2. 数据层核心 API 字典

2.1 Table — L1 行级表

from blockdb import Table

t = Table("demo.event_log")
针对一张 L1(行级)表的薄包装。name 是表名(如 "demo.event_log")。 BlockDB 里要订阅区块流、查 block/event/state 请用 L2级别BlockTable
不过每张 L2 表底下都有一组 L1 行级映射子表(典型如 <l2_name>._heightAligned 内部就靠它查共识高度),需要 row-level 访问时用 Table 类接入即可。

Table.get_row(row_id)

方法简述 — 按业务主键读取单行。 入参
参数类型必填默认说明示例值
row_idstr / int / bytes业务主键值(类型与表的 id 列匹配)"0xabc123..."
返回dict | None — 单行字典;不存在返回 None

Table.batch_get_rows(row_ids)

方法简述 — 一次 RPC 批量按主键读取多行。N 个主键场景必用此方法,不要 for 循环 get_row 入参
参数类型必填默认说明示例值
row_idslist[str / int / bytes]业务主键值列表["0xabc...", "0xdef..."]
返回list[dict] — 命中行列表(不命中的主键被略过)。

Table.filter_rows(filters="", order_by="", limit=0, offset=0)

方法简述 — 条件过滤查询(等价 SQL 的 WHERE ... ORDER BY ... LIMIT ... OFFSET ...),一次性返回结果。 入参
参数类型必填默认说明示例值
filtersstr""SQL WHERE 子句字符串(服务端拼进 ... WHERE <filters> ORDER BY ...);空串表示无过滤"block_height >= 1000"
order_bystr""排序子句(SQL 语法)"block_height asc"
limitint0最大返回行数;0 表示不限200
offsetint0跳过前 N 行(分页用)0
返回list[dict] — 命中行列表。
from blockdb import Table

t = Table("chaintable.event.eth")
rows = t.filter_rows(
    filters="block_height >= 1000 AND name IN ('Transfer', 'Deposit')",
    order_by="block_height asc", limit=200,
)

filters 走服务端 SQL:有 SQL 关键字 substring 黑名单(如 create / create2 / suicide 整词出现即被拒,和引号无关);外层别再加 LIMIT;bytes 列做 = 比较要 unhex(...)

Table.scan(sql)

方法简述 — 执行完整 SQL,一次性 buffered 拉完结果。sql**** 完整由调用方拼,Table.name 不会自动 FROM 进去。 入参
参数类型必填默认说明示例值
sqlstr完整 SQL(含 FROM)。不能写分号 ****;:一次只能执行一条语句,不支持多 SQL 拼接。"SELECT id, name FROM demo.event_log WHERE name='login'"
返回list[dict] — 完整结果集。

Table.scan_iter(sql)

方法简述 — 同 scan,但服务端 streaming,逐行 yield。大表 / 不确定结果集大小时首选,内存占用 O(1)。 入参:同 Table.scan 返回Generator[dict] — 行 generator。
for row in t.scan_iter("SELECT * FROM demo.event_log"):
    ...   # 处理单行;不要 list() 强转,那会退化成 scan

Table.upsert_rows(rows, sync=False)

方法简述 — 批量 upsert 多行。 入参
参数类型必填默认说明示例值
rowslist[dict]要写入的行;每行必须含主键 id[{"id": "0x...", "name": "x"}]
syncboolFalseTrue 等服务端确认写入完成;False fire-and-forgetTrue
返回str — 服务端 job_id

Table.delete_rows(row_ids, sync=False)

方法简述 — 批量按主键删除。 入参
参数类型必填默认说明示例值
row_idslist要删除的主键列表["0xabc..."]
syncboolFalseupsert_rows.syncFalse
返回strjob_id

2.2 BlockTable / Block / Subscribe / Aligned — L2 区块级表

from blockdb import BlockTable, Block, Aligned
from blockdb.block_table import Subscribe

BlockTable(name)

构造 L2 区块级表实例。name 是表名(如 "chaintable.trace.eth")。

BlockTable.get_state(state_id, current_block=None)

方法简述 — 在 current_block 时刻(含之前)查某个 state 的快照。 入参
参数类型必填默认说明示例值
state_idstr / int / bytesstate 主键"0xabc..."
current_blockBlock / dict / 任意带 .id/.height/.timestamp 的 duck type时间锚点;None 表示最新Block(id="0x..", height=1234, timestamp=170...)
返回dict | None — 单行;不存在返回 None

BlockTable.get_event(event_id, current_block=None)

方法简述 — 在 `current_block` 时刻(含之前)查某个 event。 入参 / 返回结构同 get_state

BlockTable.get_block_rows(block)

方法简述 — 取整个 block 内所有行。 入参
参数类型必填默认说明示例值
blockBlock / dict / duck type目标 block见上
返回list[dict] — 该 block 全部行。

BlockTable.upsert_block(block, rows, sync=False)

方法简述 — 把一个 block 的所有行打包写入。 入参
参数类型必填默认说明示例值
blockBlock / dict / duck type目标 block
rowslist[dict]该 block 的所有行[{"id": "0x..", "value": 1.0}]
syncboolFalseTable.upsert_rows.syncTrue
返回strjob_id

BlockTable.upsert_block_bundle(bundle, rows, sync=False)

方法简述 — 按 bundle(整 1000-block 高度区间)批量写入,专用于历史回填。一次调用把整段 1000 块的共识高度推进、bundle hash 算定,比按 block 逐块写快约 ×100(`upsert_block` 每块都要重算整 bundle 的 hash 并按块推进共识,回填会反压客户端)。 入参
参数类型必填默认说明示例值
bundleintbundle 号。bundle=0→高度 0;bundle=N(N≥1)→高度 (N-1)*1000+1 .. N*1000(如 1→1–1000,2→1001–2000)。必须整 1000 对齐,无法表达 500–600 这类区间3
rowslist[dict]每行的 block_height 必须落在该 bundle 区间内,否则服务端报 INVALID_ARGUMENT: row N block_height is not in bundle range[{"id": "0x..", "block_height": 2500, "value": 1.0}]
syncboolFalseTable.upsert_rows.syncTrue
返回strjob_id 写入语义(与直觉不同,务必注意)
  • 行数据:只写入你传入的 rows,bundle 号不会自动生成 1000 块数据。传几行写几行。
  • 共识/元数据按整段提交:即使只写 1 行,<table>._height 的共识高度也会推进整个 bundle 区间<table>._bundle 记一条 {id=bundle号, data_hash, accum_block_count=1000}
  • 自动补全:服务端按 block_height 自动补全 block_id / block_timestamp(链上真实区块元信息),无需自己传。
  • 无回滚:共识区间与 bundle hash 没有 SDK 回滚入口,删主表行也不会重算。

Block(id, height, timestamp)** — 区块数据单位**

方法简述Subscribe(L2) / Aligned 对外 yield 的类型;也是 BlockTable 几个方法可接受的入参。 构造入参
参数类型必填默认说明
idstrblock id
heightintblock 高度;传 None 自动归 0
timestampint / datetimeepoch 整数;传 datetime 自动转 epoch;传 None 归 0
类方法
方法说明
Block.from_block_data(data)从带 .id / .height / .timestamp 的对象构造 Block

Subscribe(tables=None, start_at=None)** — L2 区块订阅**

from blockdb.block_table import Subscribe
方法简述 — 订阅一张或多张 L2 表的区块流。 构造入参
参数类型必填默认说明示例值
tableslist[BlockTable / str]功能必填None要订阅的表(多个会聚合在同一个 stream)**签名上可空,但不传等于不订阅任何表,****listen()**会空转——实际必须传至少一张表。*[trace, token]
start_atint / NoneNone起始游标(epoch);None 从最新开始1700000000
实例方法
方法返回说明
listen()Generator[(table_id: str, block: Block)]无限生成器,按事件 yield (table_id, Block) 元组。异常会自动用最新游标重连;stream 正常结束才退出。

Aligned(sub, loose_align=None, strict_align=None)** — 多表区块高度对齐**

from blockdb import Aligned
方法简述 — 消费 Subscribe.listen() 的乱序多表流,按高度对齐后对外 yield 单一 Block 对齐通过 ⇔ 该高度在所有 trigger 表都 ready,且不大于每个 depends 表的共识高度。每个高度只 yield 一次。 构造入参
参数类型必填默认说明示例值
subSubscribe(L2)上游订阅流Subscribe(tables=[...])
loose_alignlist[BlockTable / str]功能必填Nonetrigger 表(你真正想消费的)。签名上可空,但为空时永远拿不到 trigger 高度,listen() 永不 yield——至少要传一张。[trace]
strict_alignlist[BlockTable / str]Nonedepends 表(必须先 ready)。留空 = 不做依赖对齐(合法)。[token, addr]
实例方法
方法返回说明
listen()Generator[Block]yield 已对齐的 Block(从 trigger 表里取)
> 内部细节:每个 depends 表 T 对应一张子表 T._height(注意是下划线),每行是已处理高度区间。Aligned 把这些区间合并算出”第一个 gap 前的最高高度” = 共识高度(即从高度0开始的全部连续高度)。驱动侧要确保每个块都提交 task(含空块),coverage 才连续。

2.3 TimeTable & align_time_at — L2 时间表

from blockdb import TimeTable
from blockdb.time_table import align_time_at
描述某个对象随物理时间变化的状态序列。时间桶固定 60 秒,读写都自动向后取整到整分钟。表结构固定为 {id, time_at, value}

TimeTable(name)

构造 TimeTable 实例。

TimeTable.get_value(row_id, time_at)

方法简述 — 查 `row_id` 在 `time_at` 所属时间桶时的状态。`time_at` 自动向后取整。 入参
参数类型必填默认说明示例值
row_idstr业务主键"0xabc..."
time_atstr / datetime目标时间;接受 YYYY-MM-DD HH:MM[:SS] / YYYY-MM-DDTHH:MM[:SS]datetime"2026-05-18 17:04:35"
返回dict | None{id, value, time_at};不存在返回 None

TimeTable.upsert_time_rows(time_at, rows, sync=False)

方法简述 — 往 time_at 所属时间桶批量 upsert。行内若带 time_at 会被服务端忽略。 入参
参数类型必填默认说明示例值
time_atstr / datetime目标时间桶"2026-05-18 17:04:35"
rowslist[dict]形如 [{id, value}, ...][{"id": "0x..", "value": 1.0}]
syncboolFalseTable.upsert_rows.syncTrue
返回strjob_id

align_time_at(value)

方法简述 — 把 value 向后取整到整分钟、秒归零。TimeTable 的所有方法内部自动调用此函数,调用方一般不需要直接用,给”想提前预览会落到哪个桶”的场景。 入参
参数类型必填默认说明示例值
valuestr / datetime目标时间"2026-05-18 17:04:35"
返回str — 形如 "YYYY-MM-DD HH:MM:00"
输入输出
"2026-05-18 17:04:35""2026-05-18 17:05:00"
"2026-05-18 17:04:00""2026-05-18 17:04:00"(已对齐,不进位)
datetime(2026, 5, 18, 17, 4, 35)"2026-05-18 17:05:00"

2.4 Column / filter & Operator pipeline

Column(name)** — 列引用**

from blockdb import Column
方法简述 — 用 Python 表达式描述列上的条件,可用于 filter()Operator pipeline。 构造入参
参数类型必填默认说明示例值
namestr列名"block_height"
支持的运算符
Python 表达式等价语义
Column("a") == va = vEqualTo
Column("a") != va != vNotEqualTo
Column("a") < va < vLessThan
Column("a") <= va <= vLessThanOrEqual
Column("a") > va > vGreaterThan
Column("a") >= va >= vGreaterThanOrEqual
Column("a").in_([...])a IN (...)In
expr1 & expr2expr1 AND expr2
<code>expr1 | expr2</code>expr1 OR expr2
~exprNOT expr
字面量类型推断:bool → booleanint → intfloat → doublestr → stringbytes → binary(hex),None → null

filter(expr)

from blockdb import filter
方法简述 — 把 Column 表达式序列化为 filter_rows.filters 接受的字符串。 入参
参数类型必填默认说明示例值
expr_Expr (Column 表达式) / str表达式或预先准备好的 JSON 串(Column("a") > 0) & (Column("b") == "x")
返回str — 可直接传给 Table.filter_rows(filters=...)
from blockdb import Table, Column, filter

t = Table("demo.event_log")
where = filter(
    (Column("created_at") >= 1700000000)
    & ((Column("name") == "login") | (Column("name") == "signup"))
    & ~(Column("value") == 0)
)
t.filter_rows(filters=where, order_by="created_at asc", limit=100)

Operator** / operator.filter / operator.deduplicate — pipeline**

from blockdb import Operator, operator
from blockdb.operator import filter as op_filter, deduplicate as op_dedup
方法简述 — 构造一条链式 read → filter → deduplicate pipeline 并序列化为 JSON。
Operator 现阶段的下游消费方还未接入,详见 [§5 Troubleshooting](#5—常见异常与自救指南-troubleshooting)。可以提前用 to_dict() / to_json() 检查输出格式。
模块级入口
入口返回说明
operator.filter(expr)Operator新建 pipeline 并追加一个 filter 步骤;exprblockdb.filter 共用 Column 表达式
operator.deduplicate(*cols)Operator新建 pipeline 并追加 deduplicate;cols 接受 Column 或字符串列名,至少一个
直接构造
Operator(source: str = "source_table")
实例方法
方法返回说明
.filter(expr)self追加 filter 步骤(链式)
.deduplicate(*cols)self追加 deduplicate 步骤
.to_dict()dict序列化为 {"relations": [...]}
.to_json()strto_dict() 的 JSON 字符串
to_dict()** 输出结构**
{
  "relations": [
    {"read": {
        "common": {"direct": {}},
        "base_schema": {"names": ["..."]},
        "named_table": {"names": ["source_table"]}
    }},
    {"filter": {
        "common": {"direct": {}},
        "input": {"relation_id": 0},
        "condition": "<scalar_function tree>"
    }},
    {"deduplicate": {
        "common": {"direct": {}},
        "input": {"relation_id": 1},
        "keys": ["<selection>"]
    }}
  ]
}
约定:
  • named_table.names = ["source_table"] 是占位;下游消费方按实际表名重写。
  • base_schema.names 按首次出现顺序收集列名;所有 selection 引用都是该数组下标。
  • 比较 / 逻辑算子函数名:equal / not_equal / lt / lte / gt / gte / and / or / not
  • Python int 默认 i32,超 32 位用 i64
  • In 没有原生函数,展开成 equal 链 + or 折叠。
from blockdb import Column
from blockdb.operator import filter as op_filter

op = (
    op_filter(
        (Column("value") > 0)
        & ((Column("name") == "Transfer") | (Column("name") == "Deposit"))
    )
    .deduplicate(Column("tx_id"), Column("from_addr"))
)
print(op.to_json())

Subscribe** (L1) / **RowEvent

from blockdb.table import Subscribe, RowEvent
方法简述 — 订阅 L1 行级事件流。
当前服务端未开放此能力,详见 [§5 Troubleshooting](#5—常见异常与自救指南-troubleshooting);用 scan_iter 轮询替代。
构造入参
参数类型必填默认说明
tableslist[Table / str]None订阅的表
start_atint / NoneNone起始游标
** RowEvent dataclass 字段**
字段类型说明
table_idstr事件所属表
rowslist[dict]该批次行数据
created_atint事件时间戳

3. 类型映射与数据还原`LogicalType`

from blockdb import LogicalType
from blockdb.logical_type import restore_value
LogicalTypestr, Enum,对齐服务端列类型表。

枚举值

用途
STRING HASH ADDRESS TOKENID TXID BLOCKID CHAINID TEXT字符串系列
BLOCKHEIGHT INT UINT256整数(UINT256 服务端发字符串)
FLOAT浮点(覆盖 SQL double / decimal / numeric / real
BOOLEAN布尔
DATE TIMESTAMP时间类
LIST DICT OBJECT JSON复合(JSON 文本自动 json.loads
DATASET LOGO字符串别名

restore_value(logical_type, raw)

方法简述 — 把 raw 值还原成对应 Python 类型。服务端发来的 logical_type 既可能是 LogicalType 名("HASH"),也可能是 SQL 原生类型("varchar(255)", "timestamp(3)" …),SDK 自动归一化后做类型转换。 入参
参数类型必填默认说明示例值
logical_typestrLogicalType 名或 SQL 原生类型"TIMESTAMP", "varchar(255)"
rawAny已 unwrap 的 Python 原语(str / int / float / bool / bytes …)
返回Any — 转换后的 Python 值。

类型转换映射表

LogicalType还原为
STRING / HASH / ADDRESS / TOKENID / TXID / BLOCKID / CHAINID / TEXT / UINT256 / DATASET / LOGOstr
INT / BLOCKHEIGHTint
FLOATfloat
BOOLEANbool
DATEdatetime.date.fromisoformat(...)
TIMESTAMPdatetime.datetime(UTC;支持 epoch 数值、ISO8601、带 Z 后缀)
LIST / DICT / OBJECT / JSONjson.loads(...),失败 fall back 原值
这个函数主要给 ResultSet 反序列化用,用户层一般无需直接调;放在这里供进阶 / 工具脚本使用。

4. Dispatcher Meta 管理服务 API

from blockdb._grpc_client import blockdb as c
> 这是进阶管理接口——日常查询用不到。建表、改 schema、列表、删表等管理操作走这里。

4.1 L1 Table Meta

c.get_table(table_id)

table_id: strdict{table, columns, indexes}

c.create_table(table_id, type="")

table_id: strtype: str(可选)→ None

c.save_table(table_id, columns=None, indexes=None)

覆盖式写 schema:必须把已有所有列(含 server 自动建的 `id` / `block_*`)一并传,缺列会被删;类型不一致会被拒绝。
参数类型必填默认说明
table_idstr表名
columnslist[dict]None[{table_id?, name, type, id?}, ...]
indexeslist[dict]None[{table_id?, columns: list[str], id?}, ...]
None

c.drop_table(table_id)** → **None

c.list_table(regex=".*", type="")** → **list[dict]


4.2 L2 Block Table Meta

c.get_block_table(table_id)

dict{block_table, block_columns, block_indexes, derived_tables}block_table{id, type, chain_id}

c.create_block_table(table_id, type="", chain_id="")** → **None

type 例:"event" / "state"

c.save_block_table(table_id, columns=None, indexes=None)** → **None

覆盖式,规则同 save_table

**c.drop_block_table(table_id) → **None

c.list_block_tables(regex=".*", type="")** → **list[dict]


4.3 L2 Time Table Meta

c.get_time_table(table_id)

dict{time_table, time_columns, derived_tables}

c.create_time_table(table_id)** → **None

表结构固定为 {id, time_at, value}

c.save_time_table(table_id, id_type="")** → **None

只能改业务主键 id 的逻辑类型;id_typeSTRING / HASH / ADDRESS / TOKENID / TXID / BLOCKID / CHAINID

c.drop_time_table(table_id)** → **None

c.list_time_tables(regex=".*")** → **list[dict]


完整示例

from blockdb._grpc_client import blockdb as c

# 1) 建 event 类型 BlockTable
c.create_block_table("token.token_transfer.eth", type="event", chain_id="eth")

# 2) 覆盖式写 schema(含 server 自动建的列也必须一起传)
c.save_block_table("token.token_transfer.eth", columns=[
    {"name": "id",              "type": "HASH"},
    {"name": "block_height",    "type": "INTEGER"},
    {"name": "block_timestamp", "type": "TIMESTAMP"},
    {"name": "block_id",        "type": "TX_HASH"},
    {"name": "token_id",        "type": "ADDRESS"},
    {"name": "from_addr",       "type": "ADDRESS"},
    {"name": "to_addr",         "type": "ADDRESS"},
    {"name": "value",           "type": "FLOAT"},
    {"name": "tx_id",           "type": "TXID"},
])

# 3) 建 TimeTable 并把主键改为 ADDRESS
c.create_time_table("time_table_demo")
c.save_time_table("time_table_demo", id_type="ADDRESS")
print(c.list_time_tables(regex="time_table_.*"))