1. 顶层导出与模块入口
2. 任务层核心 API 字典
2.1 BlockTableCallConfig — 实时区块任务配置
functionCallConfig(type = "BlockTableCallConfig")。描述「在某个 block 上、对一组 trigger 源表逐行执行业务函数」。实时流水线用。
BlockTableCallConfig(block=None, triggerSources=None)
方法简述 — 构造实时区块任务配置。
入参
| 参数 | 类型 | 必填 | 默认 | 说明 | 示例值 |
|---|---|---|---|---|---|
block | Block / dict / 带 .id/.height/.timestamp 的 duck type / None | ❌ | None | 当前区块;None 序列化为空 dict | Block(id="0x..", height=25015000, timestamp=1717000000) |
triggerSources | list[dict] | ❌ | None | 源表 + 业务函数列表,单项 schema 见下 | 见下方示例 |
triggerSources 每一项(dict)的字段
| 字段 | 类型 | 必填 | 说明 | 示例值 |
|---|---|---|---|---|
table | BlockTable / str | ✅ | 源表(取 .name 或 str 原值作为 table_id) | "chaintable.trace.eth" |
func | callable / Function / str / None | ❌ | 业务函数。callable 抓源码内联;Function 走 to_trigger_dict();str 当 functionId | def f(record, chain_id): ... |
functionId | str | ❌ | 直接传远端 functionId(func=None 时启用) | "trace_to_contract" |
code | str / dict / None | ❌ | 直接传源码:str 当 sourceCode,dict 形如 {"sourceCode": "..."} | — |
params | list | ❌ | 业务函数额外位置参数;${表名} 占位符由服务端按当前 trigger 表的行替换 | ["${chaintable.trace.eth}", "eth"] |
operator | str / dict | ❌ | Operator 预处理管道(来自 blockdb.operator),引擎在业务函数执行前对源表行做 filter / deduplicate;传 to_json() 串或 dict 均可 | op.to_json() |
| 方法 | 返回 | 说明 |
|---|---|---|
to_dict() | dict | wire schema(见下) |
to_proto_config() | bytes | to_dict() 的 UTF-8 JSON bytes;datetime 转 epoch int、bytes 转 "0x<hex>" |
to_proto_type() | str | 固定 "BlockTableCallConfig" |
to_dict() 输出结构
operator,引擎会在把行喂给业务函数前先跑这条 filter → deduplicate 管道。算子全部来自 blockdb.operator,blockx 只负责透传:
operator 原样进 triggerSources[i];to_dict() 里仅当传了非空 operator 才带该字段。管道的算子语义、to_json() 输出格式见 blockdb-py 的 Operator 文档。
2.2 CallListCallConfig — 回填 / 批处理配置
functionCallConfig.type = "CallListCallConfig"。回填 / 离线批处理用:直接传一个函数 + 一组 callList,每行是一组位置参数。
CallListCallConfig(func=None, callList=None)
方法简述 — 构造回填批处理配置。func 解析规则同 BlockTableCallConfig 的 func。
入参
| 参数 | 类型 | 必填 | 默认 | 说明 | 示例值 |
|---|---|---|---|---|---|
func | callable / Function / str / None | ❌ | None | 业务函数 | def f(record, chain_id): ... |
callList | list[list] | ❌ | None | 每行一组位置参数 | [[record1, "eth"], [record2, "eth"]] |
to_dict() / to_proto_config() / to_proto_type()(固定 "CallListCallConfig")。
to_dict() 输出结构
wire 字段是function(不是functionId),SDK 序列化时做了一次重命名。
2.3 BlockWriteHandler / ReturnValueResultHandler — 结果处理器
BlockWriteHandler(targetTable=None, block=None)
方法简述 — 把业务函数返回的 dict 写入目标 BlockTable。对应 `resultHandler.type = “BlockWriteResultHandler”`。
入参
| 参数 | 类型 | 必填 | 默认 | 说明 | 示例值 |
|---|---|---|---|---|---|
targetTable | str / BlockTable / None | ❌ | None | 目标表名(取 .name 或 str) | "token.token_transfer.eth" |
block | Block / dict / None | ❌ | None | 区块信息(解析同 CallConfig) | — |
to_dict() / to_proto_config() / to_proto_type()(固定 "BlockWriteResultHandler")。
to_dict() 输出结构
debug 模式由环境变量 BLOCKDB_DEBUG 统一控制,不在 handler 上单独配置,见 [§2.7 调试模式](#27-调试模式debug)。
ReturnValueResultHandler()
方法简述 — 不写表,把业务函数全部 return 值收集到 handler_result 回传给客户端。回填 dry-run / 探查常用。对应 resultHandler.type = "ReturnValueResultHandler"。
入参:无。
实例方法:to_dict() → {};to_proto_config() → b"{}";to_proto_type() → "ReturnValueResultHandler"。
2.4 Function — 业务函数封装
triggerSources 里 func= 接受的对象。Function service 未上线,当前把源码内联进 task 直接传给 worker 执行。
Function(source_code=None, function_id="", name="", mode="inline")
方法简述 — 构造函数封装。source_code 与 function_id 至少传一个,否则 ValueError。
入参
| 参数 | 类型 | 必填 | 默认 | 说明 | 示例值 |
|---|---|---|---|---|---|
source_code | str / None | ❌ | None | 内联函数源码。worker 入口约定为 _,建议直接 def _(...): ... 或尾部加 _ = my_func | "def _(record, cid): ..." |
function_id | str | ❌ | "" | 远端已注册 functionId(service 上线后用) | — |
name | str | ❌ | "" | 函数名(当前仅作元信息保存) | "trace_to_contract" |
mode | str | ❌ | "inline" | 仅支持 "inline";其它值抛 NotImplementedError | "inline" |
| 方法 | 返回 | 说明 |
|---|---|---|
to_trigger_dict() | dict | 渲染为 triggerSources[i] 的 functionId / code 合体片段:{"functionId": <id>},有源码时再带 "code": {"sourceCode": <src>} |
2.5 TaskBuilder / Task — 任务构造与提交
TaskBuilder.build(call_config, handler) → Task
方法简述 — 静态方法,把 call_config + handler 组装成 Task。task_id 由 SDK 内部 uuid4 生成,用户不传。
入参
| 参数 | 类型 | 必填 | 默认 | 说明 |
|---|---|---|---|---|
call_config | BlockTableCallConfig / CallListCallConfig / duck type | ✅ | — | 算什么 |
handler | BlockWriteHandler / ReturnValueResultHandler / duck type | ✅ | — | 结果怎么处理 |
Task。
Task** 属性**
| 属性 | 类型 | 说明 |
|---|---|---|
task_id | str | uuid hex;每次构造都是新的 |
call_config | 见上 | 鸭子类型 |
handler | 见上 | 鸭子类型 |
Task.to_dict() → dict
整个 task 的 wire 结构:{task_id, functionCallConfig, resultHandler}。
Task.submit(poll_interval=None, timeout=None, ttl_ms=None, coordinator_addr=None, worker_addr=None, use_watch=None) → TaskResult
方法简述 — 提交任务并等待终态返回。自动跑完整链路:Coordinator.ReserveWorkerSlot → Worker.SubmitTask → Worker.WatchTasks(流异常时降级 GetTaskResult 轮询)。
入参
| 参数 | 类型 | 必填 | 默认 | 说明 | 示例值 |
|---|---|---|---|---|---|
timeout | float / None | ❌ | 60.0 | 整体等待 deadline(秒);超时抛 TimeoutError | 180.0 |
poll_interval | float / None | ❌ | 0.2 | 降级轮询时的间隔(秒) | 0.5 |
use_watch | bool / None | ❌ | True | True 优先 WatchTasks 实时推送;异常 / 流意外结束自动降级到轮询 | True |
ttl_ms | int / None | ❌ | 30000 | slot TTL(毫秒);worker_addr 直连路径才有效 | 30000 |
coordinator_addr | str / None | ❌ | None | 覆盖 Coordinator endpoint | — |
worker_addr | str / None | ❌ | None | 传值则跳过 Coordinator,直连 Worker.RequestTaskSlot(旧路径 / 调试) | — |
TaskResult。
2.6 TaskResult — 任务结果
Task.submit() 的返回,封装成功状态 + handler payload + 原始响应。
属性
| 属性 | 类型 | 说明 |
|---|---|---|
task_result.success | bool | 是否成功 |
task_result.failure_code | str | 失败码(成功时为空) |
task_result.retryable | bool | 是否可重试(失败时有意义) |
handler_result | dict / list / bytes / None | resultHandler 的 plugin payload;UTF-8 JSON 的 bytes 自动 json.loads,否则保留 bytes |
error | str / None | 等价于 failure_code 的便捷字段;成功时 None |
raw | Any | 原始 gRPC 响应(GetTaskResultResponse 或 WatchTasks adapt 后的等价结构) |
-
先按
plugin_name == handler.to_proto_type()匹配(task 没 handler 时回退常量"BlockWriteResultHandler")。 - 没匹配上时取 plugins 列表第一项的非空 payload。
2.7 调试模式(debug)
通过环境变量BLOCKDB_DEBUG=1(1 / true / yes / on均可)开启,与 blockdb-py 共用同一开关。开启后 worker 不再真正写目标表,而是把写入内容(目标表、行数、样本行)作为一条调试日志送入日志通道,方便确认”到底会写什么”。
-
BLOCKDB_DEBUG设一次即可把读侧 + 写侧整条 SDK 链路一起切到 debug,代码里不用改任何东西。 - 读 / 算逻辑不受影响,只有最后落库那一步被替换成打日志。
3. 底层 Dispatcher(Coordinator / Worker)
Task.submit() 就够了。需要定制超时、断点续传、独立观察 worker 状态、或调试 fixture 时才直接调底层 dispatcher。
| 函数 | 返回 | 说明 |
|---|---|---|
c.reserve_worker_slot(task_id, coordinator_addr=None, **kwargs) | resp(含 worker_addr / slot_id) | 连 Coordinator 拿 slot;拿不到按 retry_interval 重试到 timeout |
c.request_task_slot(task_id, ttl_ms=30000, worker_addr=None) | resp | 直连 Worker 申请 slot(旧路径 / 跳过 Coordinator) |
c.submit_task(task_input, slot_id, worker_addr=None) | resp | 把 TaskInputDTO(或已是 pb2 的 TaskInput)提交给 Worker |
c.get_task_result(task_id, worker_addr=None) | GetTaskResultResponse | 单次拉结果 |
c.watch_tasks(task_ids, worker_addr=None, rpc_timeout=None) | stream of TaskUpdate | server-streaming,遇 is_terminal / SUCCEEDED / FAILED 终止 |
c.submit_and_wait(task_input, ...) | GetTaskResultResponse | 上面这些的组合,即 Task.submit() 的底层实现 |
TaskInputDTO
Task._to_dto() 产出的 dataclass,结构:
blockx_pb2.TaskInput;上层只持有 dataclass DTO,不依赖 pb2。
调用路径
| 路径 | 触发条件 | 流程 |
|---|---|---|
| 推荐 | 不传 worker_addr | Coordinator.ReserveWorkerSlot → Worker.SubmitTask → WatchTasks(兜底 GetTaskResult 轮询) |
| 直连 | 传 worker_addr | 跳过 Coordinator,直接 Worker.RequestTaskSlot 拿 slot 后提交 |