3 分钟快速上手 (Quick Start)
一个 blockx「任务」干的事永远是同一句话:在某个区块上,对源表的每一行跑你的函数,把返回的 dict 写进目标表。为什么BlockDB已经完成写入闭环,还需要BlockX?简而言之是性能原因,详见BlockX:大规模链上数据的并发函数计算引擎 提一下bundle-
数据层(blockdb) 负责存取 ——
BlockTable/Table/Subscribe。 - 任务层(blockx) 负责算 —— 把「源表 + 函数 + 目标表」打包成一个 `Task` 丢给远端 worker 跑。
-
一个
Task= 一个 CallConfig(算什么)+ 一个 Handler(结果怎么处理)。
实时流水线就是「每来一个新 block,构造一个 Task 提交一次」。
核心场景 Cookbook
场景 2.1 — 实时流水线:单源表 → 目标表
场景描述 最常见的生产形态:实时监听源表的新区块,每出一个块就把它转换、写进目标表。把上面的「单块」例子接到Subscribe 的实时流上即可——单源表不需要 ****Aligned。
示例代码
- 业务函数实时 / 回填两条路径可以完全复用同一份——只是驱动侧喂数据的方式不同(见场景 2.3)。
-
params里的${表名}是占位符:worker 会把该表在当前 block 的行逐行作为第一个参数喂进你的函数;其余params(如"eth")原样作为后续位置参数。 -
默认
timeout=60s;单块行数多、函数重的链路适当调大(如120~180)。
场景 2.2 — worker 内查在线表 + 多表高度对齐
场景描述 转换逻辑常常需要「查另一张表的当前状态」——比如把 ERC20Transfer 事件换算成人类可读金额时,要先查 token.token.eth 拿 decimals。这有两层:
-
worker 内查表:函数体里直接
blockdb.Table(...).get_row(...)(worker 沙箱里要重新import)。 -
高度对齐:要保证查在线表时它已经追平到当前高度,就把依赖表一起喂进
Aligned,等共识高度齐了再处理,共识高度条件如下,需全部满足才会Align当前Block- Trigger表(loose_align):该高度有数据即可
- Depends表(strict_align):该高度及之前所有高度都有数据
-
Aligned的语义:trigger 表(loose_align)在高度 H ready,且每个 depends 表(strict_align)的共识高度 ≥ H,才 yield 这个 H,每个高度只 yield 一次。 -
共识高度读的是 depends 表的
<table>._height子表,每行是一段已处理区间,连续覆盖到第一个 gap 前就是水位。 -
驱动侧要保证每个块都提交 task(空块也提交空 task 推进 watermark),否则 coverage 出现 gap、共识高度卡住,
Aligned会一直 hold 不 yield。
场景 2.3 — 历史回填 / 批处理(一次性跑一批,不订阅)
场景描述 回填历史数据时不需要订阅区块流,而是手里已经有一批 records,想一次性批量跑同一个函数。这时用CallListCallConfig:每行是一组位置参数。配 ReturnValueResultHandler 可以只拿结果不写表(适合先 dry-run 看输出)。
示例代码
- 回填大区间时,自己按高度 / 数量切片成多个 task,用线程池并行提交,吞吐远高于串行。
-
ReturnValueResultHandler可先 dry-run 看一眼函数到底产出什么,确认无误再换BlockWriteHandler真正落库。
BlockBundle**(暂未上线)
要点:
-
bundle是解决回填效率问题的,有高级封装的接口block_bundle_init的接口,但是有必要介绍一下内部原理
- 内部会用spark s3副本 + bundle blockX task
- 看init进度:接口内打系统日志,total blocks,complete blocks,running
-
要点:基于trigger表的S3副本,S3副本是异步同步的,所以调init前用户需要保证上游S3副本是最新数据
- 现状,上游如果刚修复了,要等1h/内部沟通,未来产品体验,应该会提供异步s3同步的监控
场景 2.4 — 用 Operator 管道在函数执行前预过滤 / 去重
场景描述 有些过滤 / 去重逻辑不必塞进业务函数里——可以用blockdb.operator 拼一条 filter → deduplicate 管道,挂到 triggerSource 的 operator 字段上,让引擎在把行喂给你的函数之前先做掉。源表行先被砍一刀,函数只处理真正关心的子集。
示例代码
-
管道的算子(
filter/deduplicate/Column表达式)全部来自blockdb;blockx 只负责把它挂到 triggerSource 上透传,引擎在函数前执行。 -
operator接受op.to_json()字符串或 dict,二选一。 - 管道可选:简单逻辑直接写函数体里更直观;行数大、过滤比例高时用 operator 先砍一刀更省。
高阶参数与性能调优
task.submit(...)** 的参数**
| 参数 | 类型 | 默认 | 说明 |
|---|---|---|---|
timeout | float | 60.0 | 整体等待上限(秒),超时抛 TimeoutError。单块重、行数多时调大 |
poll_interval | float | 0.2 | 轮询结果的间隔(秒),仅在 watch 流不可用、降级轮询时生效 |
use_watch | bool | True | True 走实时推送(WatchTasks)拿终态;流异常时自动降级到轮询。一般不用动 |
ttl_ms | int | 30000 | slot 存活时间(毫秒) |
coordinator_addr / worker_addr | str | None | 覆盖默认服务地址;传 worker_addr 会跳过调度直连 worker(旧路径,调试用) |
func** 的传法**
| 写法 | 行为 | 适用 |
|---|---|---|
func=callable | SDK 用 inspect.getsource 抓函数源码内联传给 worker | 日常首选,直接写 def |
func=Function(source_code="...") | 显式持有源码字符串 | 想把代码当数据管理、动态拼装 |
[未上线]func="some_function_id" | 当作远端已注册的 functionId | Function service 上线后用(当前未上线) |
(临时)在 worker 函数体内 from blockx_sdk import call_subfunc,再 call_subfunc(function_id, *args) | function内call function | 暂时使用,未来会下线。blockx_sdk不是blockx的内容 |
不论哪种写法,worker 入口约定函数名为_。传 callable /def时 SDK 自动补一行_ = <你的函数名>别名,你不用关心。
实时 vs 回填 对照
| 维度 | 实时 | 回填 (未来会被BlockBundle代替) |
|---|---|---|
| 数据来源 | Subscribe.listen() 推 block | 自己用 Table.scan_iter() 等拉历史行 |
| CallConfig | BlockTableCallConfig | CallListCallConfig |
| Handler | BlockWriteHandler | BlockWriteHandler(落库)/ ReturnValueResultHandler(只回传) |
| 单 task 粒度 | 一个 block | 一段 callList |
| 并发 | 单流按 block 顺序 | 自行切片 + 线程池并行 |
Debug 模式(写入预演 / 不落库)
联调新转换逻辑时,最怕脏数据被误写进目标表。通过环境变量BLOCKDB_DEBUG 开启 debug 模式(和 blockdb-py 完全一样的开关):
开启后,worker 端不会真正写目标表,而是把「这次会写什么」转发给日志系统展示(日志系统开发中)。读 / 算逻辑照常跑,只有最后一步落库被替换成打日志。
BLOCKDB_DEBUG 是和 blockdb-py 共用的同一开关,设一次就把整条 SDK 链路(读侧 + 写侧)一起切到 debug,代码里不用改任何东西。
适用场景:新写入逻辑上线前自检、数据清洗联调期反复跑、demo / 教学不污染共享表。
上线前记得关掉 BLOCKDB_DEBUG,否则 prod 任务的写入会全部进日志而不落库。