Skip to main content

1. 顶层导出与模块入口

# 任务层主入口
from blockx import (
    BlockTableCallConfig,      # 实时:在某 block 上按 trigger 源表跑函数
    CallListCallConfig,        # 回填:一批 positional args 批量跑
    BlockWriteHandler,         # 结果写入目标表
    ReturnValueResultHandler,  # 结果直接回传(不写表)
    TaskBuilder, Task, TaskResult,
    Function,                  # 业务函数封装
)

# 数据来源 / 区块对象来自 blockdb-py
from blockdb import BlockTable, Block, Aligned, Table
from blockdb.block_table import Subscribe

# 底层 dispatcher(进阶 / 定制化才用)
from blockx._grpc_client import blockx as c
一个 Task = 一个 CallConfig(算什么)+ 一个 Handler(结果怎么处理)。

2. 任务层核心 API 字典

2.1 BlockTableCallConfig — 实时区块任务配置

from blockx import BlockTableCallConfig
对应 task 的 functionCallConfigtype = "BlockTableCallConfig")。描述「在某个 block 上、对一组 trigger 源表逐行执行业务函数」。实时流水线用。

BlockTableCallConfig(block=None, triggerSources=None)

方法简述 — 构造实时区块任务配置。 入参
参数类型必填默认说明示例值
blockBlock / dict / 带 .id/.height/.timestamp 的 duck type / NoneNone当前区块;None 序列化为空 dictBlock(id="0x..", height=25015000, timestamp=1717000000)
triggerSourceslist[dict]None源表 + 业务函数列表,单项 schema 见下见下方示例
triggerSources 每一项(dict)的字段
字段类型必填说明示例值
tableBlockTable / str源表(取 .name 或 str 原值作为 table_id)"chaintable.trace.eth"
funccallable / Function / str / None业务函数。callable 抓源码内联;Functionto_trigger_dict()str 当 functionIddef f(record, chain_id): ...
functionIdstr直接传远端 functionId(func=None 时启用)"trace_to_contract"
codestr / dict / None直接传源码:strsourceCode,dict 形如 {"sourceCode": "..."}
paramslist业务函数额外位置参数;${表名} 占位符由服务端按当前 trigger 表的行替换["${chaintable.trace.eth}", "eth"]
operatorstr / dictOperator 预处理管道(来自 blockdb.operator),引擎在业务函数执行前对源表行做 filter / deduplicate;传 to_json() 串或 dict 均可op.to_json()
实例方法
方法返回说明
to_dict()dictwire schema(见下)
to_proto_config()bytesto_dict() 的 UTF-8 JSON bytes;datetime 转 epoch int、bytes"0x<hex>"
to_proto_type()str固定 "BlockTableCallConfig"
to_dict() 输出结构
{
  "block": {"id": "...", "height": 25015000, "timestamp": 1717000000},
  "triggerSources": [
    {
      "table": "chaintable.trace.eth",
      "functionId": "",
      "code": {"sourceCode": "def to_token_transfer(record, chain_id):\n    ...\n\n_ = to_token_transfer\n"},
      "params": ["${chaintable.trace.eth}", "eth"]
    }
  ]
}
示例
from blockx import BlockTableCallConfig
from blockdb import BlockTable

src = BlockTable("chaintable.trace.eth")
call_config = BlockTableCallConfig(
    block=block,
    triggerSources=[dict(table=src, func=to_token_transfer,
                         params=["${chaintable.trace.eth}", "eth"])],
)
附加 Operator 预处理管道(可选) 在 triggerSource 上挂 operator,引擎会在把行喂给业务函数前先跑这条 filter → deduplicate 管道。算子全部来自 blockdb.operator,blockx 只负责透传:
from blockdb import Column
from blockdb.operator import filter as op_filter

op = (op_filter((Column("value") > 0) & (Column("name") == "Transfer"))
      .deduplicate(Column("tx_id")))

call_config = BlockTableCallConfig(
    block=block,
    triggerSources=[dict(table=src, func=to_token_transfer,
                         params=["${chaintable.trace.eth}", "eth"],
                         operator=op.to_json())],   # str 或 dict 均可
)
序列化后 operator 原样进 triggerSources[i]to_dict() 里仅当传了非空 operator 才带该字段。管道的算子语义、to_json() 输出格式见 blockdb-py 的 Operator 文档。

2.2 CallListCallConfig — 回填 / 批处理配置

from blockx import CallListCallConfig
对应 functionCallConfig.type = "CallListCallConfig"。回填 / 离线批处理用:直接传一个函数 + 一组 callList,每行是一组位置参数。

CallListCallConfig(func=None, callList=None)

方法简述 — 构造回填批处理配置。func 解析规则同 BlockTableCallConfigfunc 入参
参数类型必填默认说明示例值
funccallable / Function / str / NoneNone业务函数def f(record, chain_id): ...
callListlist[list]None每行一组位置参数[[record1, "eth"], [record2, "eth"]]
实例方法to_dict() / to_proto_config() / to_proto_type()(固定 "CallListCallConfig")。 to_dict() 输出结构
{
  "function": "",
  "code": {"sourceCode": "def to_token_transfer(record, chain_id):\n    ...\n\n_ = to_token_transfer\n"},
  "callList": [["<record>", "eth"], ["<record>", "eth"]]
}
wire 字段是 function(不是 functionId),SDK 序列化时做了一次重命名。

2.3 BlockWriteHandler / ReturnValueResultHandler — 结果处理器

from blockx import BlockWriteHandler, ReturnValueResultHandler

BlockWriteHandler(targetTable=None, block=None)

方法简述 — 把业务函数返回的 dict 写入目标 BlockTable。对应 `resultHandler.type = “BlockWriteResultHandler”`。 入参
参数类型必填默认说明示例值
targetTablestr / BlockTable / NoneNone目标表名(取 .name 或 str)"token.token_transfer.eth"
blockBlock / dict / NoneNone区块信息(解析同 CallConfig)
实例方法to_dict() / to_proto_config() / to_proto_type()(固定 "BlockWriteResultHandler")。 to_dict() 输出结构
{
  "targetTable": "token.token_transfer.eth",
  "block": {"id": "...", "height": 25015000, "timestamp": 1717000000}
}
debug 模式由环境变量 BLOCKDB_DEBUG 统一控制,不在 handler 上单独配置,见 [§2.7 调试模式](#27-调试模式debug)。

ReturnValueResultHandler()

方法简述不写表,把业务函数全部 return 值收集到 handler_result 回传给客户端。回填 dry-run / 探查常用。对应 resultHandler.type = "ReturnValueResultHandler" 入参:无。 实例方法to_dict(){}to_proto_config()b"{}"to_proto_type()"ReturnValueResultHandler"

2.4 Function — 业务函数封装

from blockx import Function
把 Python 业务函数封装成 triggerSourcesfunc= 接受的对象。Function service 未上线,当前把源码内联进 task 直接传给 worker 执行。

Function(source_code=None, function_id="", name="", mode="inline")

方法简述 — 构造函数封装。source_codefunction_id 至少传一个,否则 ValueError 入参
参数类型必填默认说明示例值
source_codestr / NoneNone内联函数源码。worker 入口约定为 _,建议直接 def _(...): ... 或尾部加 _ = my_func"def _(record, cid): ..."
function_idstr""远端已注册 functionId(service 上线后用)
namestr""函数名(当前仅作元信息保存)"trace_to_contract"
modestr"inline"仅支持 "inline";其它值抛 NotImplementedError"inline"
实例方法
方法返回说明
to_trigger_dict()dict渲染为 triggerSources[i]functionId / code 合体片段:{"functionId": <id>},有源码时再带 "code": {"sourceCode": <src>}
from blockx import Function

fn = Function(source_code="""
def _(record, chain_id):
    if not record.get("tx_id"):
        return None
    return {"id": record["id"], "tx_id": record["tx_id"], "chain_id": chain_id}
""")

2.5 TaskBuilder / Task — 任务构造与提交

from blockx import TaskBuilder, Task

TaskBuilder.build(call_config, handler) → Task

方法简述 — 静态方法,把 call_config + handler 组装成 Tasktask_id 由 SDK 内部 uuid4 生成,用户不传。 入参
参数类型必填默认说明
call_configBlockTableCallConfig / CallListCallConfig / duck type算什么
handlerBlockWriteHandler / ReturnValueResultHandler / duck type结果怎么处理
返回Task

Task** 属性**

属性类型说明
task_idstruuid hex;每次构造都是新的
call_config见上鸭子类型
handler见上鸭子类型

Task.to_dict() → dict

整个 task 的 wire 结构:{task_id, functionCallConfig, resultHandler}

Task.submit(poll_interval=None, timeout=None, ttl_ms=None, coordinator_addr=None, worker_addr=None, use_watch=None) → TaskResult

方法简述 — 提交任务并等待终态返回。自动跑完整链路:Coordinator.ReserveWorkerSlot → Worker.SubmitTask → Worker.WatchTasks(流异常时降级 GetTaskResult 轮询)。 入参
参数类型必填默认说明示例值
timeoutfloat / None60.0整体等待 deadline(秒);超时抛 TimeoutError180.0
poll_intervalfloat / None0.2降级轮询时的间隔(秒)0.5
use_watchbool / NoneTrueTrue 优先 WatchTasks 实时推送;异常 / 流意外结束自动降级到轮询True
ttl_msint / None30000slot TTL(毫秒);worker_addr 直连路径才有效30000
coordinator_addrstr / NoneNone覆盖 Coordinator endpoint
worker_addrstr / NoneNone传值则跳过 Coordinator,直连 Worker.RequestTaskSlot(旧路径 / 调试)
返回TaskResult
result = task.submit(timeout=120.0)
if result.task_result.success:
    print(result.handler_result)   # -> {"written_rows": 42}
else:
    print(result.error)`

2.6 TaskResult — 任务结果

from blockx import TaskResult
方法简述Task.submit() 的返回,封装成功状态 + handler payload + 原始响应。 属性
属性类型说明
task_result.successbool是否成功
task_result.failure_codestr
失败码(成功时为空)
task_result.retryablebool是否可重试(失败时有意义)
handler_resultdict / list / bytes / NoneresultHandler 的 plugin payload;UTF-8 JSON 的 bytes 自动 json.loads,否则保留 bytes
errorstr / None等价于 failure_code 的便捷字段;成功时 None
rawAny原始 gRPC 响应(GetTaskResultResponseWatchTasks adapt 后的等价结构)
handler_result 匹配规则
  1. 先按 plugin_name == handler.to_proto_type() 匹配(task 没 handler 时回退常量 "BlockWriteResultHandler")。
  2. 没匹配上时取 plugins 列表第一项的非空 payload。

2.7 调试模式(debug)

通过环境变量 BLOCKDB_DEBUG=11 / true / yes / on均可)开启,与 blockdb-py 共用同一开关。开启后 worker 不再真正写目标表,而是把写入内容(目标表、行数、样本行)作为一条调试日志送入日志通道,方便确认”到底会写什么”。
  • BLOCKDB_DEBUG 设一次即可把读侧 + 写侧整条 SDK 链路一起切到 debug,代码里不用改任何东西。
  • 读 / 算逻辑不受影响,只有最后落库那一步被替换成打日志。
export BLOCKDB_DEBUG=1

3. 底层 Dispatcher(Coordinator / Worker)

from blockx._grpc_client import blockx as c
这是进阶接口——日常用 Task.submit() 就够了。需要定制超时、断点续传、独立观察 worker 状态、或调试 fixture 时才直接调底层 dispatcher。
函数返回说明
c.reserve_worker_slot(task_id, coordinator_addr=None, **kwargs)resp(含 worker_addr / slot_id连 Coordinator 拿 slot;拿不到按 retry_interval 重试到 timeout
c.request_task_slot(task_id, ttl_ms=30000, worker_addr=None)resp直连 Worker 申请 slot(旧路径 / 跳过 Coordinator)
c.submit_task(task_input, slot_id, worker_addr=None)respTaskInputDTO(或已是 pb2 的 TaskInput)提交给 Worker
c.get_task_result(task_id, worker_addr=None)GetTaskResultResponse单次拉结果
c.watch_tasks(task_ids, worker_addr=None, rpc_timeout=None)stream of TaskUpdateserver-streaming,遇 is_terminal / SUCCEEDED / FAILED 终止
c.submit_and_wait(task_input, ...)GetTaskResultResponse上面这些的组合,即 Task.submit() 的底层实现

TaskInputDTO

Task._to_dto() 产出的 dataclass,结构:
TaskInputDTO(
    task_id: str,
    function_call_config: _ConfigDTO(type: str, config: bytes),
    result_handler:      _ConfigDTO(type: str, config: bytes),
)
real 路径在 grpc_client 层把 DTO 转 blockx_pb2.TaskInput;上层只持有 dataclass DTO,不依赖 pb2。

调用路径

路径触发条件流程
推荐不传 worker_addrCoordinator.ReserveWorkerSlotWorker.SubmitTaskWatchTasks(兜底 GetTaskResult 轮询)
直连worker_addr跳过 Coordinator,直接 Worker.RequestTaskSlot 拿 slot 后提交