Skip to main content

为什么需要blockX

从BlockDB的block table介绍中,我们已经了解到,利用block table的事件订阅API,可以很容易的实现 “数据生产 -> 事件通知 -> 下游消费” 的区块数据处理闭环了,那么在生产中,实现这个闭环消费的notebook时,还会遇到什么问题呢? 事实上,这个闭环的下游消费环节,还需要认真考虑数据处理速度的问题。上游表的每个区块的数据行数有很大差异,有可能每个区块只有几条,也有可能一个区块有上千条,而出块频率是跟链相关的,比如bsc链,你必须在4s内处理完上游的区块数据,不然就会掉队且永远跟不上最新数据。 blockX:从单进程处理升级到serverless并行计算 如果我们在订阅BlockDB的notebook端,直接处理数据,这属于串行模式。
  • 痛点:串行处理1000条数据若需100s,区块链每12秒出新块,会永远追不上最新数据,需要考虑单机多并发,且受限核数,并发后的吞吐上限也很低。
  • BlockX方案:它是 Serverless 的分布式并发引擎,即开即用。它将一个区块的数据按行全并发执行,不论多少行数据,都接近单行数据处理的时间延迟,确保你的业务逻辑永远与出块频率同步。
因此,在chaintable上搭建链上数据处理链路,最佳实践为:
  • Notebook端:负责监听blockDB事件,执行blockX的task submit命令。
  • BlockX端:执行大规模、高并发的计算和原子化数据写入。

BlockX核心概念

在 BlockX 中,你只需定义一个 Task(任务)。一个 Task 由以下三部分组成:

Function (业务逻辑)

由用户编写的纯 Python 函数(无状态)。
  • 轻量化:函数执行过程应该是轻量级的
    1. cpu、内存占用极低
    2. 总耗时很短 — 一般200ms
    3. 只有纯逻辑+轻量级网络服务访问
  • 单行状态计算:函数只负责“单行输入 -> 逻辑处理 -> 结果输出”
    • 不关心其他并发调用
    • 不需要聚合上游多条数据
函数的返回值必须为dict或者dict list,后续ResultHandler会将所有dict收集为一个list集中处理。
  • 每个dict会视为一个数据单位,比如需要落表的一行数据,一个完整计算结果等等
  • 单次函数需要返回多个待处理的结果时,需要用dict list返回
典型的无状态处理函数示例:
def trace_to_token_transfer(record):
    # 防御性过滤:排除代理调用等非转账行为
    if record['call_type'] == 'delegatecall':
        return None

    # 必须有 value 且有 tx_id
    if not (record.get('value', 0) > 0 and record.get('tx_id')):
        return None

    # 仅返回 dict / list[dict] / None
    return dict(
        id=record['id'],
        token_id='0x000000000000000000000000000000000000EEEE',  # 原生 ETH 标识
        from_addr=record['from_addr'],
        to_addr=record['to_addr'],
        value=record['value'],
        tx_id=record['tx_id'],
    )

FunctionCallConfig(计算配置)

用于定义“处理哪些数据”以及“如何预处理”。 目前有两种类型的FunctionCallConfig:

BlockTableCallConfig

BlockTableCallConfig是最典型的链上数据处理场景的task计算配置。 功能,直接表达如何对上游trigger表的区块读取,以及对读取数据的filter和去重需求,生成最终的function call list。
{
    "type": "BlockTableCallConfig",
    "config": {
      "block": {
        "id": "0x123...",
        "height": 18000000,
        "timestamp": 1712345678
      },
      "triggerSources": [
        {
          "table": "ethereum.trace",
          "operator": {
            "AST JSON — trace 表的 filter 执行计划树"
          },
          "function": "trace_to_token_transfer"
        },
        {
          "table": "ethereum.event",
          "function": "event_to_token_transfer"
        }
      ]
    }
  • block,标识当前处理的区块信息
  • triggerSource,可以配置多个
    • 表达需要处理那个上游表,对应block的所有数据
    • 其中operators字段,支持配置filter和dedup表达,针对上游表数据进行过滤和去重预处理
    • function,预处理后的每行数据,需要调用的函数

CallListCallConfig

场景,用户自己生成call list,批量的call list和function一起交付blockX并行执行。
{
    "type": "CallListCallConfig",
    "config": {
      "function": "trace_to_token_transfer",
      "callList": [
       [param1, param2, param3, ...], // 每个param可以是任意逻辑类型
       [...]
      ]
    }
}

ResultHandler (结果处理器)

结果处理器,定义了task中所有函数计算完,结果需要做什么处理。ResultHandler可以为空,因为也有可能函数结果不需要再做处理。 目前有两种结果处理器。

BlockWriteResultHandler

BlockWriteResultHandler是链上数据处理场景的专用Result Handler,它会自动将所有的results用block table的区块粒度写API,写入block table。
  • targetTable,目标表,类型必须是block state table或block event table
  • block,写入的区块信息,BlockWriteResultHandler会调用blockDB区块写API执行批量写入,并保证区块粒度的写入原子性
{
    "type": "BlockWriteResultHandler",
    "config": {
      "targetTable": "token.token_transfer.eth",
      "block": {
        "block_id": "0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
        "block_height": 18000000,
        "block_timestamp": 1712345678
      }
}

ReturnValueResultHandler

ReturnValueResultHandler用于用户需要自行处理task所有函数结果的场景。ReturnValueResultHandler会将函数计算的结果收集,并设置在task response.handlerResult字段,直接返回给client端处理。
{
    "type": "ReturnValueResultHandler",
    "config": {
    }
}
response结果样例:
  • results为函数返回的dict list全集
{
  "failureCode": "0",
  "handlerResult": {
    "handlerName": "ReturnValueResultHandler",
    "success": true,
**    "results": [**
      {
        "tx_hash": "0x5c504ed432cb51138bcf09aa5e8a410dd4a1e204ef84bfed1be16dfba1b22026",
        "from": "0x742d35Cc6634C0532925a3b844Bc454e4438f44e",
        "to": "0x32Be343B94f860124dC4fEe278FDCBD38C102D88",
        "value": "1.5 ETH",
        "block_number": 19560200
      },
      {
        "tx_hash": "0x8d9a2636a0f8b8e8f8d9a2636a0f8b8e8f8d9a2636a0f8b8e8f8d9a2636a0f8b",
        "from": "0x32Be343B94f860124dC4fEe278FDCBD38C102D88",
        "to": "0x1234567890abcdef1234567890abcdef12345678",
        "value": "0.25 ETH",
        "block_number": 19560201
      }
    ]
  }
}

BlockX task是怎么加速函数计算的

BlockX 之所以能实现极致的性能,并非单纯依靠“多开几个线程”,而是通过全并发执行、全局**** IO ****拦截缓存、以及高效的任务调度合力实现的。

Task的核心优化思路

每一个 Task 在进入 BlockX 后,BlockX会在百毫秒内确定最合适的执行worker并完成任务分配,而worker会通过特定优化的三个阶段独立完成整个task:
  1. CallBuilder 插件根据任务描述生成调用列表(Call List)。这个过程可能会利用高速缓存快速读取上游source表的区块数据,并行执行数据过滤和去重预处理。
  2. 函数全并发执行,生成的数千个 Call 进入全并发执行阶段,尽可能使用worker所有的计算资源。
  3. 集中处理所有 Call 产生的 Results(如批量写入或直接返回),完成计算闭环。
其中,阶段2,函数全并发执行是task的耗时关键路径,blockX对于这个阶段的function call全并发执行优化是基于一个潜在约定:所有 Call 假定在当前 Task 周期内,外部世界状态是完全相同的。
  • 这个假设是合理的,以block数据中每行都需要执行函数计算为例来理解,实际每个block中的数据没有先后关系,均为同一个区块时刻,因此针对每一行的全并发函数执行,可以视为同一时刻的链上数据处理
  • 因此,blockX会协调同一task内所有子函数调用和其他IO操作,视为同一时刻的IO调用,完全相同的IO操作,实际都只需要执行一次,并发和后继的function call都可以直接复用缓存结果
基于IO状态全缓存,和call全并发执行特性,用户无需担心每个task内call list多大,也不用担心存在重复的call或者外部IO,blockX可以稳定的以接近单次函数执行的延迟完成task。 为了配合 BlockX 的加速机制,开发者编写 Function 时应遵循以下“轻量化”原则:
  1. 无状态:Call 之间不应有任何顺序依赖,也不应修改会影响其他 Call 的全局变量。
  2. 低流量:网络访问只限快速结束的,不支持长链接,尽量减少大数据量的传输。
  3. 计算纯粹:将 CPU 和内存占用降到最低,只保留核心逻辑 + 轻量级网络访问。
function中,开发者完全不需要自己写表。

BlockX总结

最终,再次总结用户应该在何时使用blockX计算,何时在notebook本地计算。
场景特征建议方案与理由
低频、少量数据notebook 端处理逻辑简单,单机性能足以覆盖,链路更短。
高频、海量数据 (如:Transfer、事件解析)BlockX (必选)只有分布式并行才能追上出块速度。
需要写入 Block 表BlockX (首选)内置 Handler 自动处理数据写入,安全可靠。
需要智能缓存 (如:函数内频繁访问同一合约)BlockX (必选)自动合并重复的网络 IO,大幅降低 RPC 节点压力。