> ## Documentation Index
> Fetch the complete documentation index at: https://docs.chaintable.com/llms.txt
> Use this file to discover all available pages before exploring further.

# BlockX-py 快速开始

> 使用 BlockX-py 提交实时区块任务、对齐多表订阅并执行历史回填。

***

## **3 分钟快速上手 (Quick Start)**

一个 blockx「任务」干的事永远是同一句话：**在某个区块上，对源表的每一行跑你的函数，把返回的 dict 写进目标表。为什么BlockDB已经完成写入闭环，还需要BlockX？简而言之是性能原因，详见**[BlockX：大规模链上数据的并发函数计算引擎](https://debankglobal.sg.larksuite.com/wiki/KUrhwlFN4iCOeYkZPt6l9k4agqe)

提一下bundle

```Python theme={null}
from blockx import BlockTableCallConfig, BlockWriteHandler, TaskBuilder
from blockdb import Block

# 1) 写一个转换函数：入参是源表的一行 dict，返回 dict 就写入目标表，返回 None 就丢弃这行
#    worker 在隔离沙箱里执行它：常量 / 辅助逻辑都要写进函数体，不能引用函数外的全局名
def to_token_transfer(record, chain_id):
    value = float(record.get("value") or 0)        # FLOAT 列以字符串传入，先转一次再用
    if value <= 0 or not record.get("tx_id"):
        return None                                  # 不合格的行直接过滤掉
    return {
        "id":        record["id"],
        "from_addr": record["from_addr"],
        "to_addr":   record["to_addr"],
        "value":     value,
        "tx_id":     record["tx_id"],
    }

# 2) 指定要处理哪个区块
block = Block(id="0xblock...", height=25015000, timestamp=1717000000)

# 3) CallConfig：在这个 block 上、对源表的每一行跑 to_token_transfer
call_config = BlockTableCallConfig(
    block=block,
    triggerSources=[dict(
        table="chaintable.trace.eth",                 # 源表
        func=to_token_transfer,                        # 你的转换函数
        params=["${chaintable.trace.eth}", "eth"],     # ${表名} = 把该表本块的行逐行喂给函数
    )],
)

# 4) Handler：把函数返回的 dict 写到目标表
handler = BlockWriteHandler(targetTable="token.token_transfer.eth", block=block)

# 5) 组装 + 提交。task.submit() 自动跑完「派活 → 执行 → 等结果」整条链路
task = TaskBuilder.build(call_config=call_config, handler=handler)
result = task.submit()

print(result.task_result.success, result.handler_result)
# -> True {"written_rows": 42}
```

**心智模型**：

* **数据层（blockdb）** 负责存取 —— `BlockTable` / `Table` / `Subscribe`。

* **任务层（blockx）** 负责算 —— 把「源表 + 函数 + 目标表」打包成一个 \`Task\` 丢给远端 worker 跑。

* 一个 `Task` = 一个 **CallConfig**（算什么）+ 一个 **Handler**（结果怎么处理）。

> 实时流水线就是「每来一个新 block，构造一个 Task 提交一次」。

***

## **核心场景 Cookbook**

### **场景 2.1 — 实时流水线：单源表 → 目标表**

**场景描述**

最常见的生产形态：实时监听源表的新区块，每出一个块就把它转换、写进目标表。把上面的「单块」例子接到 `Subscribe` 的实时流上即可——**单源表不需要 \*\*\*\*`Aligned`**。

**示例代码**

```Python theme={null}
from blockx import BlockTableCallConfig, BlockWriteHandler, TaskBuilder
from blockdb import BlockTable
from blockdb.block_table import Subscribe        # L2 区块订阅（注意是 block_table 子包）

SOURCE = "chaintable.trace.eth"
TARGET = "token.token_transfer.eth"

def to_token_transfer(record, chain_id):
    value = float(record.get("value") or 0)
    if value <= 0 or not record.get("tx_id"):
        return None
    return {"id": record["id"], "from_addr": record["from_addr"],
            "to_addr": record["to_addr"], "value": value, "tx_id": record["tx_id"]}

src = BlockTable(SOURCE)

# listen() 是无限生成器，断连自动重连——直接 for 消费即可，不用自己写重试
for table_id, block in Subscribe(tables=[src]).listen():
    call_config = BlockTableCallConfig(
        block=block,
        triggerSources=[dict(
            table=src,
            func=to_token_transfer,
            params=[f"${{{SOURCE}}}", "eth"],
        )],
    )
    handler = BlockWriteHandler(targetTable=TARGET, block=block)
    task = TaskBuilder.build(call_config=call_config, handler=handler)

    res = task.submit(timeout=120.0)
    if res.task_result.success:
        n = (res.handler_result or {}).get("written_rows", 0)
        print(f"h={block.height} written={n}")
    else:
        print(f"h={block.height} error={res.error}")
```

**最佳实践提示**

* 业务函数实时 / 回填两条路径**可以完全复用同一份**——只是驱动侧喂数据的方式不同（见场景 2.3）。

* `params` 里的 `${表名}` 是占位符：worker 会把该表在当前 block 的行**逐行**作为第一个参数喂进你的函数；其余 `params`（如 `"eth"`）原样作为后续位置参数。

* 默认 `timeout=60s`；单块行数多、函数重的链路适当调大（如 `120~180`）。

***

### **场景 2.2 — worker 内查在线表 + 多表高度对齐**

**场景描述**

转换逻辑常常需要「查另一张表的当前状态」——比如把 ERC20 `Transfer` 事件换算成人类可读金额时，要先查 `token.token.eth` 拿 `decimals`。这有两层：

1. **worker 内查表**：函数体里直接 `blockdb.Table(...).get_row(...)`（worker 沙箱里要重新 `import`）。

2. **高度对齐**：要保证查在线表时它已经追平到当前高度，就把依赖表一起喂进 `Aligned`，等共识高度齐了再处理，共识高度条件如下，需全部满足才会Align当前Block

   1. Trigger表(loose\_align)：该高度有数据即可

   2. Depends表(strict\_align)：该高度及之前所有高度都有数据

**示例代码**

```Python theme={null}
from blockx import BlockTableCallConfig, BlockWriteHandler, TaskBuilder
from blockdb import BlockTable, Aligned
from blockdb.block_table import Subscribe

def event_to_token_transfer(record, chain_id):
    from blockdb import Table as _Table            # ★ worker 沙箱内重新 import

    if record.get("name") != "Transfer":
        return None
    contract_id = record.get("contract_id")

    # ★ worker 内 Online 表 lookup：查 token.token.<chain> 拿 decimals
    token = _Table(f"token.token.{chain_id}").get_row(contract_id)
    if not token:
        return None
    decimals = token.get("decimals")
    if decimals is None:
        return None

    import json
    params = json.loads(record["params"]) if isinstance(record.get("params"), str) else record.get("params")
    from_addr, to_addr, raw_amount = params
    return {
        "id": record["id"],
        "from_addr": from_addr.lower(),
        "to_addr": to_addr.lower(),
        "token_id": token["id"],
        "value": raw_amount / 10 ** decimals,
        "tx_id": record.get("tx_id") or "0x",
    }

event = BlockTable("chaintable.event.eth")        # trigger：我要消费的源
token = BlockTable("token.token.eth")             # depends：必须已追平到该高度

# 所有表（trigger + depends）都喂进同一个 Subscribe，再用 Aligned 包一层
sub = Subscribe(tables=[event, token])
aligned = Aligned(sub, loose_align=[event], strict_align=[token])

for block in aligned.listen():
    # 此刻 token 的共识高度 >= block.height，函数内查 token.token.eth 一定是追平的状态
    call_config = BlockTableCallConfig(
        block=block,
        triggerSources=[dict(
            table=event,
            func=event_to_token_transfer,
            params=["${chaintable.event.eth}", "eth"],
        )],
    )
    handler = BlockWriteHandler(targetTable="token.token_transfer.eth", block=block)
    TaskBuilder.build(call_config=call_config, handler=handler).submit()
```

**相关提示**

* `Aligned` 的语义：trigger 表（`loose_align`）在高度 H ready，且每个 depends 表（`strict_align`）的共识高度 ≥ H，才 yield 这个 H，每个高度只 yield 一次。

* 共识高度读的是 depends 表的 `<table>._height` 子表，每行是一段已处理区间，连续覆盖到第一个 gap 前就是水位。

* 驱动侧要保证**每个块都提交 task**（空块也提交空 task 推进 watermark），否则 coverage 出现 gap、共识高度卡住，`Aligned` 会一直 hold 不 yield。

***

### **场景 2.3 — 历史回填 / 批处理（一次性跑一批，不订阅）**

**场景描述**

回填历史数据时不需要订阅区块流，而是手里已经有一批 records，想**一次性批量跑同一个函数**。这时用 `CallListCallConfig`：每行是一组位置参数。配 `ReturnValueResultHandler` 可以**只拿结果不写表**（适合先 dry-run 看输出）。

**示例代码**

```Python theme={null}
from blockx import CallListCallConfig, ReturnValueResultHandler, BlockWriteHandler, TaskBuilder

# 业务函数与实时路径完全复用
def to_token_transfer(record, chain_id):
    value = float(record.get("value") or 0)
    if value <= 0 or not record.get("tx_id"):
        return None
    return {"id": record["id"], "value": value, "tx_id": record["tx_id"]}

# 假设 records 来自 blockdb 的历史扫描：Table("...").scan_iter("SELECT ...")
records = [...]

# callList：每行 = 一组位置参数 [record, chain_id]
calls = [[r, "eth"] for r in records]

# A) 只看结果、不落库（dry-run）：用 ReturnValueResultHandler
task = TaskBuilder.build(
    call_config=CallListCallConfig(func=to_token_transfer, callList=calls),
    handler=ReturnValueResultHandler(),
)
res = task.submit()
print(res.handler_result)        # 函数所有 return 值收集回来

# B) 直接落库：换成 BlockWriteHandler(targetTable=...) 即可
task = TaskBuilder.build(
    call_config=CallListCallConfig(func=to_token_transfer, callList=calls),
    handler=BlockWriteHandler(targetTable="token.token_transfer.eth"),
)
task.submit()
```

**最佳实践提示**

* 回填大区间时，**自己按高度 / 数量切片成多个 task**，用线程池并行提交，吞吐远高于串行。

* `ReturnValueResultHandler` 可先 dry-run 看一眼函数到底产出什么，确认无误再换 `BlockWriteHandler` 真正落库。

超大回填区间，未来会使用\*\*`BlockBundle`\*\*(暂未上线)

要点：

* bundle是解决回填效率问题的，有高级封装的接口block\_bundle\_init的接口，但是有必要介绍一下内部原理

  * 内部会用spark s3副本 + bundle blockX task

  * 看init进度：接口内打系统日志，total blocks，complete blocks，running

  * 要点：基于trigger表的S3副本，S3副本是异步同步的，所以调init前用户需要保证上游S3副本是最新数据

    * 现状，上游如果刚修复了，要等1h/内部沟通，未来产品体验，应该会提供异步s3同步的监控

***

### **场景 2.4 — 用 Operator 管道在函数执行前预过滤 / 去重**

**场景描述**

有些过滤 / 去重逻辑不必塞进业务函数里——可以用 `blockdb.operator` 拼一条 `filter → deduplicate` 管道，挂到 triggerSource 的 `operator` 字段上，让引擎在把行喂给你的函数**之前**先做掉。源表行先被砍一刀，函数只处理真正关心的子集。

**示例代码**

```Python theme={null}
from blockdb import Column, BlockTable
from blockdb.operator import filter as op_filter
from blockx import BlockTableCallConfig, BlockWriteHandler, TaskBuilder

trace = BlockTable("chaintable.trace.eth")

# 管道：只保留 value > 0 的 Transfer 行，再按 tx_id 去重
op = (
    op_filter((Column("value") > 0) & (Column("name") == "Transfer"))
    .deduplicate(Column("tx_id"))
)

def to_token_transfer(record, chain_id):
    return {"id": record["id"], "value": float(record.get("value") or 0), "tx_id": record["tx_id"]}

call_config = BlockTableCallConfig(
    block=block,
    triggerSources=[dict(
        table=trace,
        func=to_token_transfer,
        params=[f"${{{trace.name}}}", "eth"],
        operator=op.to_json(),          # ★ 挂上预处理管道（str 或 dict 均可）
    )],
)
handler = BlockWriteHandler(targetTable="token.token_transfer.eth", block=block)
TaskBuilder.build(call_config=call_config, handler=handler).submit()
```

**最佳实践提示**

* 管道的算子（`filter` / `deduplicate` / `Column` 表达式）全部来自 `blockdb`；blockx 只负责把它挂到 triggerSource 上透传，引擎在函数前执行。

* `operator` 接受 `op.to_json()` 字符串或 dict，二选一。

* 管道**可选**：简单逻辑直接写函数体里更直观；行数大、过滤比例高时用 operator 先砍一刀更省。

***

## **高阶参数与性能调优**

### **`task.submit(...)`**\*\* 的参数\*\*

| 参数                                 | 类型      | 默认      | 说明                                               |
| ---------------------------------- | ------- | ------- | ------------------------------------------------ |
| `timeout`                          | `float` | `60.0`  | 整体等待上限（秒），超时抛 `TimeoutError`。单块重、行数多时调大          |
| `poll_interval`                    | `float` | `0.2`   | 轮询结果的间隔（秒），仅在 watch 流不可用、降级轮询时生效                 |
| `use_watch`                        | `bool`  | `True`  | `True` 走实时推送（`WatchTasks`）拿终态；流异常时自动降级到轮询。一般不用动  |
| `ttl_ms`                           | `int`   | `30000` | slot 存活时间（毫秒）                                    |
| `coordinator_addr` / `worker_addr` | `str`   | `None`  | 覆盖默认服务地址；传 `worker_addr` 会跳过调度直连 worker（旧路径，调试用） |

### **`func`**\*\* 的传法\*\*

| 写法                                                                                           | 行为                                         | 适用                                |
| -------------------------------------------------------------------------------------------- | ------------------------------------------ | --------------------------------- |
| `func=callable`<br />                                                                        | SDK 用 `inspect.getsource` 抓函数源码内联传给 worker | 日常首选，直接写 `def`                    |
| `func=Function(source_code="...")`                                                           | 显式持有源码字符串                                  | 想把代码当数据管理、动态拼装                    |
| \[未上线]<br />`func="some_function_id"`                                                        | 当作远端已注册的 functionId                        | Function service 上线后用（当前未上线）      |
| （临时）在 worker 函数体内 `from blockx_sdk import call_subfunc`，再 `call_subfunc(function_id, *args)` | function内call function                     | 暂时使用，未来会下线。blockx\_sdk不是blockx的内容 |

> 不论哪种写法，worker 入口约定函数名为 `_`。传 callable / `def` 时 SDK 自动补一行 `_ = <你的函数名>` 别名，你不用关心。

### **实时 vs 回填 对照**

| 维度         | 实时                           | 回填 （未来会被BlockBundle代替）                                   |
| ---------- | ---------------------------- | -------------------------------------------------------- |
| 数据来源       | `Subscribe.listen()` 推 block | 自己用 `Table.scan_iter()` 等拉历史行                            |
| CallConfig | `BlockTableCallConfig`       | `CallListCallConfig`                                     |
| Handler    | `BlockWriteHandler`          | `BlockWriteHandler`（落库）/ `ReturnValueResultHandler`（只回传） |
| 单 task 粒度  | 一个 block                     | 一段 callList                                              |
| 并发         | 单流按 block 顺序                 | 自行切片 + 线程池并行                                             |

业务函数两边**同一份**。

***

## **Debug 模式（写入预演 / 不落库）**

联调新转换逻辑时，最怕脏数据被误写进目标表。通过环境变量 `BLOCKDB_DEBUG` 开启 debug 模式（和 blockdb-py 完全一样的开关）：

**开启后，worker 端不会真正写目标表**，而是把「这次会写什么」转发给日志系统展示（日志系统开发中）。读 / 算逻辑照常跑，只有最后一步落库被替换成打日志。

```Bash theme={null}
export BLOCKDB_DEBUG=1     # 1 / true / yes / on 都算开
```

`BLOCKDB_DEBUG` 是和 blockdb-py 共用的同一开关，设一次就把整条 SDK 链路（读侧 + 写侧）一起切到 debug，代码里不用改任何东西。

**适用场景**：新写入逻辑上线前自检、数据清洗联调期反复跑、demo / 教学不污染共享表。

> 上线前记得关掉 `BLOCKDB_DEBUG`，否则 prod 任务的写入会全部进日志而不落库。

***

## **反馈通道**

本 SDK 处于快速迭代期。各位数据组的同学在数据开发链路上如果遇到任何「用得不爽、接口不符合 Python 习惯、报错信息难懂」的地方，请随时在 \*\*开发体验汇总群 \*\*反馈！

**更多内容请查看**[BlockX-py API完整手册](https://debankglobal.sg.larksuite.com/wiki/FkLjwi67TiEdyakZjj6lMCR0gnG)
