Skip to main content

3 分钟快速上手 (Quick Start)

一个 blockx「任务」干的事永远是同一句话:在某个区块上,对源表的每一行跑你的函数,把返回的 dict 写进目标表。为什么BlockDB已经完成写入闭环,还需要BlockX?简而言之是性能原因,详见BlockX:大规模链上数据的并发函数计算引擎 提一下bundle
from blockx import BlockTableCallConfig, BlockWriteHandler, TaskBuilder
from blockdb import Block

# 1) 写一个转换函数:入参是源表的一行 dict,返回 dict 就写入目标表,返回 None 就丢弃这行
#    worker 在隔离沙箱里执行它:常量 / 辅助逻辑都要写进函数体,不能引用函数外的全局名
def to_token_transfer(record, chain_id):
    value = float(record.get("value") or 0)        # FLOAT 列以字符串传入,先转一次再用
    if value <= 0 or not record.get("tx_id"):
        return None                                  # 不合格的行直接过滤掉
    return {
        "id":        record["id"],
        "from_addr": record["from_addr"],
        "to_addr":   record["to_addr"],
        "value":     value,
        "tx_id":     record["tx_id"],
    }

# 2) 指定要处理哪个区块
block = Block(id="0xblock...", height=25015000, timestamp=1717000000)

# 3) CallConfig:在这个 block 上、对源表的每一行跑 to_token_transfer
call_config = BlockTableCallConfig(
    block=block,
    triggerSources=[dict(
        table="chaintable.trace.eth",                 # 源表
        func=to_token_transfer,                        # 你的转换函数
        params=["${chaintable.trace.eth}", "eth"],     # ${表名} = 把该表本块的行逐行喂给函数
    )],
)

# 4) Handler:把函数返回的 dict 写到目标表
handler = BlockWriteHandler(targetTable="token.token_transfer.eth", block=block)

# 5) 组装 + 提交。task.submit() 自动跑完「派活 → 执行 → 等结果」整条链路
task = TaskBuilder.build(call_config=call_config, handler=handler)
result = task.submit()

print(result.task_result.success, result.handler_result)
# -> True {"written_rows": 42}
心智模型
  • 数据层(blockdb) 负责存取 —— BlockTable / Table / Subscribe
  • 任务层(blockx) 负责算 —— 把「源表 + 函数 + 目标表」打包成一个 `Task` 丢给远端 worker 跑。
  • 一个 Task = 一个 CallConfig(算什么)+ 一个 Handler(结果怎么处理)。
实时流水线就是「每来一个新 block,构造一个 Task 提交一次」。

核心场景 Cookbook

场景 2.1 — 实时流水线:单源表 → 目标表

场景描述 最常见的生产形态:实时监听源表的新区块,每出一个块就把它转换、写进目标表。把上面的「单块」例子接到 Subscribe 的实时流上即可——单源表不需要 ****Aligned 示例代码
from blockx import BlockTableCallConfig, BlockWriteHandler, TaskBuilder
from blockdb import BlockTable
from blockdb.block_table import Subscribe        # L2 区块订阅(注意是 block_table 子包)

SOURCE = "chaintable.trace.eth"
TARGET = "token.token_transfer.eth"

def to_token_transfer(record, chain_id):
    value = float(record.get("value") or 0)
    if value <= 0 or not record.get("tx_id"):
        return None
    return {"id": record["id"], "from_addr": record["from_addr"],
            "to_addr": record["to_addr"], "value": value, "tx_id": record["tx_id"]}

src = BlockTable(SOURCE)

# listen() 是无限生成器,断连自动重连——直接 for 消费即可,不用自己写重试
for table_id, block in Subscribe(tables=[src]).listen():
    call_config = BlockTableCallConfig(
        block=block,
        triggerSources=[dict(
            table=src,
            func=to_token_transfer,
            params=[f"${{{SOURCE}}}", "eth"],
        )],
    )
    handler = BlockWriteHandler(targetTable=TARGET, block=block)
    task = TaskBuilder.build(call_config=call_config, handler=handler)

    res = task.submit(timeout=120.0)
    if res.task_result.success:
        n = (res.handler_result or {}).get("written_rows", 0)
        print(f"h={block.height} written={n}")
    else:
        print(f"h={block.height} error={res.error}")
最佳实践提示
  • 业务函数实时 / 回填两条路径可以完全复用同一份——只是驱动侧喂数据的方式不同(见场景 2.3)。
  • params 里的 ${表名} 是占位符:worker 会把该表在当前 block 的行逐行作为第一个参数喂进你的函数;其余 params(如 "eth")原样作为后续位置参数。
  • 默认 timeout=60s;单块行数多、函数重的链路适当调大(如 120~180)。

场景 2.2 — worker 内查在线表 + 多表高度对齐

场景描述 转换逻辑常常需要「查另一张表的当前状态」——比如把 ERC20 Transfer 事件换算成人类可读金额时,要先查 token.token.ethdecimals。这有两层:
  1. worker 内查表:函数体里直接 blockdb.Table(...).get_row(...)(worker 沙箱里要重新 import)。
  2. 高度对齐:要保证查在线表时它已经追平到当前高度,就把依赖表一起喂进 Aligned,等共识高度齐了再处理,共识高度条件如下,需全部满足才会Align当前Block
    1. Trigger表(loose_align):该高度有数据即可
    2. Depends表(strict_align):该高度及之前所有高度都有数据
示例代码
from blockx import BlockTableCallConfig, BlockWriteHandler, TaskBuilder
from blockdb import BlockTable, Aligned
from blockdb.block_table import Subscribe

def event_to_token_transfer(record, chain_id):
    from blockdb import Table as _Table            # ★ worker 沙箱内重新 import

    if record.get("name") != "Transfer":
        return None
    contract_id = record.get("contract_id")

    # ★ worker 内 Online 表 lookup:查 token.token.<chain> 拿 decimals
    token = _Table(f"token.token.{chain_id}").get_row(contract_id)
    if not token:
        return None
    decimals = token.get("decimals")
    if decimals is None:
        return None

    import json
    params = json.loads(record["params"]) if isinstance(record.get("params"), str) else record.get("params")
    from_addr, to_addr, raw_amount = params
    return {
        "id": record["id"],
        "from_addr": from_addr.lower(),
        "to_addr": to_addr.lower(),
        "token_id": token["id"],
        "value": raw_amount / 10 ** decimals,
        "tx_id": record.get("tx_id") or "0x",
    }

event = BlockTable("chaintable.event.eth")        # trigger:我要消费的源
token = BlockTable("token.token.eth")             # depends:必须已追平到该高度

# 所有表(trigger + depends)都喂进同一个 Subscribe,再用 Aligned 包一层
sub = Subscribe(tables=[event, token])
aligned = Aligned(sub, loose_align=[event], strict_align=[token])

for block in aligned.listen():
    # 此刻 token 的共识高度 >= block.height,函数内查 token.token.eth 一定是追平的状态
    call_config = BlockTableCallConfig(
        block=block,
        triggerSources=[dict(
            table=event,
            func=event_to_token_transfer,
            params=["${chaintable.event.eth}", "eth"],
        )],
    )
    handler = BlockWriteHandler(targetTable="token.token_transfer.eth", block=block)
    TaskBuilder.build(call_config=call_config, handler=handler).submit()
相关提示
  • Aligned 的语义:trigger 表(loose_align)在高度 H ready,且每个 depends 表(strict_align)的共识高度 ≥ H,才 yield 这个 H,每个高度只 yield 一次。
  • 共识高度读的是 depends 表的 <table>._height 子表,每行是一段已处理区间,连续覆盖到第一个 gap 前就是水位。
  • 驱动侧要保证每个块都提交 task(空块也提交空 task 推进 watermark),否则 coverage 出现 gap、共识高度卡住,Aligned 会一直 hold 不 yield。

场景 2.3 — 历史回填 / 批处理(一次性跑一批,不订阅)

场景描述 回填历史数据时不需要订阅区块流,而是手里已经有一批 records,想一次性批量跑同一个函数。这时用 CallListCallConfig:每行是一组位置参数。配 ReturnValueResultHandler 可以只拿结果不写表(适合先 dry-run 看输出)。 示例代码
from blockx import CallListCallConfig, ReturnValueResultHandler, BlockWriteHandler, TaskBuilder

# 业务函数与实时路径完全复用
def to_token_transfer(record, chain_id):
    value = float(record.get("value") or 0)
    if value <= 0 or not record.get("tx_id"):
        return None
    return {"id": record["id"], "value": value, "tx_id": record["tx_id"]}

# 假设 records 来自 blockdb 的历史扫描:Table("...").scan_iter("SELECT ...")
records = [...]

# callList:每行 = 一组位置参数 [record, chain_id]
calls = [[r, "eth"] for r in records]

# A) 只看结果、不落库(dry-run):用 ReturnValueResultHandler
task = TaskBuilder.build(
    call_config=CallListCallConfig(func=to_token_transfer, callList=calls),
    handler=ReturnValueResultHandler(),
)
res = task.submit()
print(res.handler_result)        # 函数所有 return 值收集回来

# B) 直接落库:换成 BlockWriteHandler(targetTable=...) 即可
task = TaskBuilder.build(
    call_config=CallListCallConfig(func=to_token_transfer, callList=calls),
    handler=BlockWriteHandler(targetTable="token.token_transfer.eth"),
)
task.submit()
最佳实践提示
  • 回填大区间时,自己按高度 / 数量切片成多个 task,用线程池并行提交,吞吐远高于串行。
  • ReturnValueResultHandler 可先 dry-run 看一眼函数到底产出什么,确认无误再换 BlockWriteHandler 真正落库。
超大回填区间,未来会使用**BlockBundle**(暂未上线) 要点:
  • bundle是解决回填效率问题的,有高级封装的接口block_bundle_init的接口,但是有必要介绍一下内部原理
    • 内部会用spark s3副本 + bundle blockX task
    • 看init进度:接口内打系统日志,total blocks,complete blocks,running
    • 要点:基于trigger表的S3副本,S3副本是异步同步的,所以调init前用户需要保证上游S3副本是最新数据
      • 现状,上游如果刚修复了,要等1h/内部沟通,未来产品体验,应该会提供异步s3同步的监控

场景 2.4 — 用 Operator 管道在函数执行前预过滤 / 去重

场景描述 有些过滤 / 去重逻辑不必塞进业务函数里——可以用 blockdb.operator 拼一条 filter → deduplicate 管道,挂到 triggerSource 的 operator 字段上,让引擎在把行喂给你的函数之前先做掉。源表行先被砍一刀,函数只处理真正关心的子集。 示例代码
from blockdb import Column, BlockTable
from blockdb.operator import filter as op_filter
from blockx import BlockTableCallConfig, BlockWriteHandler, TaskBuilder

trace = BlockTable("chaintable.trace.eth")

# 管道:只保留 value > 0 的 Transfer 行,再按 tx_id 去重
op = (
    op_filter((Column("value") > 0) & (Column("name") == "Transfer"))
    .deduplicate(Column("tx_id"))
)

def to_token_transfer(record, chain_id):
    return {"id": record["id"], "value": float(record.get("value") or 0), "tx_id": record["tx_id"]}

call_config = BlockTableCallConfig(
    block=block,
    triggerSources=[dict(
        table=trace,
        func=to_token_transfer,
        params=[f"${{{trace.name}}}", "eth"],
        operator=op.to_json(),          # ★ 挂上预处理管道(str 或 dict 均可)
    )],
)
handler = BlockWriteHandler(targetTable="token.token_transfer.eth", block=block)
TaskBuilder.build(call_config=call_config, handler=handler).submit()
最佳实践提示
  • 管道的算子(filter / deduplicate / Column 表达式)全部来自 blockdb;blockx 只负责把它挂到 triggerSource 上透传,引擎在函数前执行。
  • operator 接受 op.to_json() 字符串或 dict,二选一。
  • 管道可选:简单逻辑直接写函数体里更直观;行数大、过滤比例高时用 operator 先砍一刀更省。

高阶参数与性能调优

task.submit(...)** 的参数**

参数类型默认说明
timeoutfloat60.0整体等待上限(秒),超时抛 TimeoutError。单块重、行数多时调大
poll_intervalfloat0.2轮询结果的间隔(秒),仅在 watch 流不可用、降级轮询时生效
use_watchboolTrueTrue 走实时推送(WatchTasks)拿终态;流异常时自动降级到轮询。一般不用动
ttl_msint30000slot 存活时间(毫秒)
coordinator_addr / worker_addrstrNone覆盖默认服务地址;传 worker_addr 会跳过调度直连 worker(旧路径,调试用)

func** 的传法**

写法行为适用
func=callable
SDK 用 inspect.getsource 抓函数源码内联传给 worker日常首选,直接写 def
func=Function(source_code="...")显式持有源码字符串想把代码当数据管理、动态拼装
[未上线]
func="some_function_id"
当作远端已注册的 functionIdFunction service 上线后用(当前未上线)
(临时)在 worker 函数体内 from blockx_sdk import call_subfunc,再 call_subfunc(function_id, *args)function内call function暂时使用,未来会下线。blockx_sdk不是blockx的内容
不论哪种写法,worker 入口约定函数名为 _。传 callable / def 时 SDK 自动补一行 _ = <你的函数名> 别名,你不用关心。

实时 vs 回填 对照

维度实时回填 (未来会被BlockBundle代替)
数据来源Subscribe.listen() 推 block自己用 Table.scan_iter() 等拉历史行
CallConfigBlockTableCallConfigCallListCallConfig
HandlerBlockWriteHandlerBlockWriteHandler(落库)/ ReturnValueResultHandler(只回传)
单 task 粒度一个 block一段 callList
并发单流按 block 顺序自行切片 + 线程池并行
业务函数两边同一份

Debug 模式(写入预演 / 不落库)

联调新转换逻辑时,最怕脏数据被误写进目标表。通过环境变量 BLOCKDB_DEBUG 开启 debug 模式(和 blockdb-py 完全一样的开关): 开启后,worker 端不会真正写目标表,而是把「这次会写什么」转发给日志系统展示(日志系统开发中)。读 / 算逻辑照常跑,只有最后一步落库被替换成打日志。
export BLOCKDB_DEBUG=1     # 1 / true / yes / on 都算开
BLOCKDB_DEBUG 是和 blockdb-py 共用的同一开关,设一次就把整条 SDK 链路(读侧 + 写侧)一起切到 debug,代码里不用改任何东西。 适用场景:新写入逻辑上线前自检、数据清洗联调期反复跑、demo / 教学不污染共享表。
上线前记得关掉 BLOCKDB_DEBUG,否则 prod 任务的写入会全部进日志而不落库。

反馈通道

本 SDK 处于快速迭代期。各位数据组的同学在数据开发链路上如果遇到任何「用得不爽、接口不符合 Python 习惯、报错信息难懂」的地方,请随时在 **开发体验汇总群 **反馈! 更多内容请查看BlockX-py API完整手册