> ## Documentation Index
> Fetch the complete documentation index at: https://docs.chaintable.com/llms.txt
> Use this file to discover all available pages before exploring further.

# BlockX-py API 完整手册

> BlockX-py 任务层 SDK 的模块入口、核心配置类和任务 API 字典。

***

## **1. 顶层导出与模块入口**

```Python theme={null}
# 任务层主入口
from blockx import (
    BlockTableCallConfig,      # 实时：在某 block 上按 trigger 源表跑函数
    CallListCallConfig,        # 回填：一批 positional args 批量跑
    BlockWriteHandler,         # 结果写入目标表
    ReturnValueResultHandler,  # 结果直接回传（不写表）
    TaskBuilder, Task, TaskResult,
    Function,                  # 业务函数封装
)

# 数据来源 / 区块对象来自 blockdb-py
from blockdb import BlockTable, Block, Aligned, Table
from blockdb.block_table import Subscribe

# 底层 dispatcher（进阶 / 定制化才用）
from blockx._grpc_client import blockx as c
```

**一个 Task = 一个 CallConfig（算什么）+ 一个 Handler（结果怎么处理）。**

***

## **2. 任务层核心 API 字典**

### **2.1 ****`BlockTableCallConfig`**** — 实时区块任务配置**

```Python theme={null}
from blockx import BlockTableCallConfig
```

对应 task 的 `functionCallConfig`（`type = "BlockTableCallConfig"`）。描述「在某个 block 上、对一组 trigger 源表逐行执行业务函数」。实时流水线用。

***

#### **`BlockTableCallConfig(block=None, triggerSources=None)`**

**方法简述** — 构造实时区块任务配置。

**入参**

| 参数               | 类型                                                           | 必填 | 默认     | 说明                       | 示例值                                                       |
| ---------------- | ------------------------------------------------------------ | -- | ------ | ------------------------ | --------------------------------------------------------- |
| `block`          | `Block / dict / 带 .id/.height/.timestamp 的 duck type / None` | ❌  | `None` | 当前区块；`None` 序列化为空 dict   | `Block(id="0x..", height=25015000, timestamp=1717000000)` |
| `triggerSources` | `list[dict]`                                                 | ❌  | `None` | 源表 + 业务函数列表，单项 schema 见下 | 见下方示例                                                     |

**`triggerSources` 每一项（dict）的字段**

| 字段           | 类型                                 | 必填 | 说明                                                                                                  | 示例值                                  |
| ------------ | ---------------------------------- | -- | --------------------------------------------------------------------------------------------------- | ------------------------------------ |
| `table`      | `BlockTable / str`                 | ✅  | 源表（取 `.name` 或 str 原值作为 table\_id）                                                                  | `"chaintable.trace.eth"`             |
| `func`       | `callable / Function / str / None` | ❌  | 业务函数。`callable` 抓源码内联；`Function` 走 `to_trigger_dict()`；`str` 当 functionId                           | `def f(record, chain_id): ...`       |
| `functionId` | `str`                              | ❌  | 直接传远端 functionId（`func=None` 时启用）                                                                   | `"trace_to_contract"`                |
| `code`       | `str / dict / None`                | ❌  | 直接传源码：`str` 当 `sourceCode`，dict 形如 `{"sourceCode": "..."}`                                          | —                                    |
| `params`     | `list`                             | ❌  | 业务函数额外位置参数；`${表名}` 占位符由服务端按当前 trigger 表的行替换                                                         | `["${chaintable.trace.eth}", "eth"]` |
| `operator`   | `str / dict`                       | ❌  | Operator 预处理管道（来自 `blockdb.operator`），引擎在业务函数执行前对源表行做 filter / deduplicate；传 `to_json()` 串或 dict 均可 | `op.to_json()`                       |

**实例方法**

| 方法                  | 返回      | 说明                                                                          |
| ------------------- | ------- | --------------------------------------------------------------------------- |
| `to_dict()`         | `dict`  | wire schema（见下）                                                             |
| `to_proto_config()` | `bytes` | `to_dict()` 的 UTF-8 JSON bytes；`datetime` 转 epoch int、`bytes` 转 `"0x<hex>"` |
| `to_proto_type()`   | `str`   | 固定 `"BlockTableCallConfig"`                                                 |

**`to_dict()` 输出结构**

```JSON theme={null}
{
  "block": {"id": "...", "height": 25015000, "timestamp": 1717000000},
  "triggerSources": [
    {
      "table": "chaintable.trace.eth",
      "functionId": "",
      "code": {"sourceCode": "def to_token_transfer(record, chain_id):\n    ...\n\n_ = to_token_transfer\n"},
      "params": ["${chaintable.trace.eth}", "eth"]
    }
  ]
}
```

示例

```Python theme={null}
from blockx import BlockTableCallConfig
from blockdb import BlockTable

src = BlockTable("chaintable.trace.eth")
call_config = BlockTableCallConfig(
    block=block,
    triggerSources=[dict(table=src, func=to_token_transfer,
                         params=["${chaintable.trace.eth}", "eth"])],
)
```

**附加 Operator 预处理管道（可选）**

在 triggerSource 上挂 `operator`，引擎会在把行喂给业务函数前先跑这条 `filter → deduplicate` 管道。算子全部来自 `blockdb.operator`，blockx 只负责透传：

```Python theme={null}
from blockdb import Column
from blockdb.operator import filter as op_filter

op = (op_filter((Column("value") > 0) & (Column("name") == "Transfer"))
      .deduplicate(Column("tx_id")))

call_config = BlockTableCallConfig(
    block=block,
    triggerSources=[dict(table=src, func=to_token_transfer,
                         params=["${chaintable.trace.eth}", "eth"],
                         operator=op.to_json())],   # str 或 dict 均可
)
```

序列化后 `operator` 原样进 `triggerSources[i]`；`to_dict()` 里仅当传了非空 `operator` 才带该字段。管道的算子语义、`to_json()` 输出格式见 blockdb-py 的 `Operator` 文档。

***

### **2.2 ****`CallListCallConfig`**** — 回填 / 批处理配置**

```Python theme={null}
from blockx import CallListCallConfig
```

对应 `functionCallConfig.type = "CallListCallConfig"`。回填 / 离线批处理用：直接传一个函数 + 一组 callList，每行是一组位置参数。

***

#### **`CallListCallConfig(func=None, callList=None)`**

**方法简述** — 构造回填批处理配置。`func` 解析规则同 `BlockTableCallConfig` 的 `func`。

**入参**

| 参数         | 类型                                 | 必填 | 默认     | 说明       | 示例值                                    |
| ---------- | ---------------------------------- | -- | ------ | -------- | -------------------------------------- |
| `func`     | `callable / Function / str / None` | ❌  | `None` | 业务函数     | `def f(record, chain_id): ...`         |
| `callList` | `list[list]`                       | ❌  | `None` | 每行一组位置参数 | `[[record1, "eth"], [record2, "eth"]]` |

**实例方法**：`to_dict()` / `to_proto_config()` / `to_proto_type()`（固定 `"CallListCallConfig"`）。

**`to_dict()` 输出结构**

```JSON theme={null}
{
  "function": "",
  "code": {"sourceCode": "def to_token_transfer(record, chain_id):\n    ...\n\n_ = to_token_transfer\n"},
  "callList": [["<record>", "eth"], ["<record>", "eth"]]
}
```

> wire 字段是 `function`（不是 `functionId`），SDK 序列化时做了一次重命名。

***

### **2.3 ****`BlockWriteHandler`**** / ****`ReturnValueResultHandler`**** — 结果处理器**

```Python theme={null}
from blockx import BlockWriteHandler, ReturnValueResultHandler
```

***

#### **`BlockWriteHandler(targetTable=None, block=None)`**

**方法简述** — 把业务函数返回的 dict 写入目标 BlockTable。对应 \`resultHandler.type = "BlockWriteResultHandler"\`。

**入参**

| 参数            | 类型                        | 必填 | 默认     | 说明                    | 示例值                          |
| ------------- | ------------------------- | -- | ------ | --------------------- | ---------------------------- |
| `targetTable` | `str / BlockTable / None` | ❌  | `None` | 目标表名（取 `.name` 或 str） | `"token.token_transfer.eth"` |
| `block`       | `Block / dict / None`     | ❌  | `None` | 区块信息（解析同 CallConfig）  | —                            |

**实例方法**：`to_dict()` / `to_proto_config()` / `to_proto_type()`（固定 `"BlockWriteResultHandler"`）。

**`to_dict()` 输出结构**

```JSON theme={null}
{
  "targetTable": "token.token_transfer.eth",
  "block": {"id": "...", "height": 25015000, "timestamp": 1717000000}
}
```

> debug 模式由环境变量 `BLOCKDB_DEBUG` 统一控制，不在 handler 上单独配置，见 \[§2.7 调试模式]\(#27-调试模式debug)。

***

#### **`ReturnValueResultHandler()`**

**方法简述** — **不写表**，把业务函数全部 return 值收集到 `handler_result` 回传给客户端。回填 dry-run / 探查常用。对应 `resultHandler.type = "ReturnValueResultHandler"`。

**入参**：无。

**实例方法**：`to_dict()` → `{}`；`to_proto_config()` → `b"{}"`；`to_proto_type()` → `"ReturnValueResultHandler"`。

***

### **2.4 ****`Function`**** — 业务函数封装**

```Python theme={null}
from blockx import Function
```

把 Python 业务函数封装成 `triggerSources` 里 `func=` 接受的对象。Function service 未上线，当前把源码内联进 task 直接传给 worker 执行。

***

#### **`Function(source_code=None, function_id="", name="", mode="inline")`**

**方法简述** — 构造函数封装。`source_code` 与 `function_id` 至少传一个，否则 `ValueError`。

**入参**

| 参数            | 类型           | 必填 | 默认         | 说明                                                                | 示例值                         |
| ------------- | ------------ | -- | ---------- | ----------------------------------------------------------------- | --------------------------- |
| `source_code` | `str / None` | ❌  | `None`     | 内联函数源码。worker 入口约定为 `_`，建议直接 `def _(...): ...` 或尾部加 `_ = my_func` | `"def _(record, cid): ..."` |
| `function_id` | `str`        | ❌  | `""`       | 远端已注册 functionId（service 上线后用）                                    | —                           |
| `name`        | `str`        | ❌  | `""`       | 函数名（当前仅作元信息保存）                                                    | `"trace_to_contract"`       |
| `mode`        | `str`        | ❌  | `"inline"` | 仅支持 `"inline"`；其它值抛 `NotImplementedError`                         | `"inline"`                  |

**实例方法**

| 方法                  | 返回     | 说明                                                                                                                 |
| ------------------- | ------ | ------------------------------------------------------------------------------------------------------------------ |
| `to_trigger_dict()` | `dict` | 渲染为 `triggerSources[i]` 的 `functionId` / `code` 合体片段：`{"functionId": <id>}`，有源码时再带 `"code": {"sourceCode": <src>}` |

```Python theme={null}
from blockx import Function

fn = Function(source_code="""
def _(record, chain_id):
    if not record.get("tx_id"):
        return None
    return {"id": record["id"], "tx_id": record["tx_id"], "chain_id": chain_id}
""")
```

***

### **2.5 ****`TaskBuilder`**** / ****`Task`**** — 任务构造与提交**

```Python theme={null}
from blockx import TaskBuilder, Task
```

***

#### **`TaskBuilder.build(call_config, handler) → Task`**

**方法简述** — 静态方法，把 `call_config` + `handler` 组装成 `Task`。`task_id` 由 SDK 内部 `uuid4` 生成，用户不传。

**入参**

| 参数            | 类型                                                         | 必填 | 默认 | 说明     |
| ------------- | ---------------------------------------------------------- | -- | -- | ------ |
| `call_config` | `BlockTableCallConfig / CallListCallConfig / duck type`    | ✅  | —  | 算什么    |
| `handler`     | `BlockWriteHandler / ReturnValueResultHandler / duck type` | ✅  | —  | 结果怎么处理 |

**返回**：`Task`。

***

#### **`Task`**\*\* 属性\*\*

| 属性            | 类型    | 说明                |
| ------------- | ----- | ----------------- |
| `task_id`     | `str` | uuid hex；每次构造都是新的 |
| `call_config` | 见上    | 鸭子类型              |
| `handler`     | 见上    | 鸭子类型              |

#### **`Task.to_dict() → dict`**

整个 task 的 wire 结构：`{task_id, functionCallConfig, resultHandler}`。

***

#### **`Task.submit(poll_interval=None, timeout=None, ttl_ms=None, coordinator_addr=None, worker_addr=None, use_watch=None) → TaskResult`**

**方法简述** — 提交任务并等待终态返回。自动跑完整链路：`Coordinator.ReserveWorkerSlot → Worker.SubmitTask → Worker.WatchTasks`（流异常时降级 `GetTaskResult` 轮询）。

**入参**

| 参数                 | 类型             | 必填 | 默认      | 说明                                                      | 示例值     |
| ------------------ | -------------- | -- | ------- | ------------------------------------------------------- | ------- |
| `timeout`          | `float / None` | ❌  | `60.0`  | 整体等待 deadline（秒）；超时抛 `TimeoutError`                     | `180.0` |
| `poll_interval`    | `float / None` | ❌  | `0.2`   | 降级轮询时的间隔（秒）                                             | `0.5`   |
| `use_watch`        | `bool / None`  | ❌  | `True`  | `True` 优先 `WatchTasks` 实时推送；异常 / 流意外结束自动降级到轮询           | `True`  |
| `ttl_ms`           | `int / None`   | ❌  | `30000` | slot TTL（毫秒）；`worker_addr` 直连路径才有效                      | `30000` |
| `coordinator_addr` | `str / None`   | ❌  | `None`  | 覆盖 Coordinator endpoint                                 | —       |
| `worker_addr`      | `str / None`   | ❌  | `None`  | 传值则跳过 Coordinator，直连 `Worker.RequestTaskSlot`（旧路径 / 调试） | —       |

**返回**：`TaskResult`。

```Python theme={null}
result = task.submit(timeout=120.0)
if result.task_result.success:
    print(result.handler_result)   # -> {"written_rows": 42}
else:
    print(result.error)`
```

***

### **2.6 ****`TaskResult`**** — 任务结果**

```Python theme={null}
from blockx import TaskResult
```

**方法简述** — `Task.submit()` 的返回，封装成功状态 + handler payload + 原始响应。

**属性**

| 属性                         | 类型                           | 说明                                                                           |
| -------------------------- | ---------------------------- | ---------------------------------------------------------------------------- |
| `task_result.success`      | `bool`                       | 是否成功                                                                         |
| `task_result.failure_code` | `str`<br />                  | 失败码（成功时为空）                                                                   |
| `task_result.retryable`    | `bool`                       | 是否可重试（失败时有意义）                                                                |
| `handler_result`           | `dict / list / bytes / None` | resultHandler 的 plugin payload；UTF-8 JSON 的 bytes 自动 `json.loads`，否则保留 bytes |
| `error`                    | `str / None`                 | 等价于 `failure_code` 的便捷字段；成功时 `None`                                          |
| `raw`                      | `Any`                        | 原始 gRPC 响应（`GetTaskResultResponse` 或 `WatchTasks` adapt 后的等价结构）              |

**handler\_result 匹配规则**

1. 先按 `plugin_name == handler.to_proto_type()` 匹配（task 没 handler 时回退常量 `"BlockWriteResultHandler"`）。

2. 没匹配上时取 plugins 列表第一项的非空 payload。

***

### **2.7 调试模式（debug）**

通过环境变量 `BLOCKDB_DEBUG=1`（`1 / true / yes / on`均可）开启，与 blockdb-py 共用同一开关。开启后 worker **不再真正写目标表**，而是把写入内容（目标表、行数、样本行）作为一条调试日志送入日志通道，方便确认"到底会写什么"。

* `BLOCKDB_DEBUG` 设一次即可把读侧 + 写侧整条 SDK 链路一起切到 debug，代码里不用改任何东西。

* 读 / 算逻辑不受影响，只有最后落库那一步被替换成打日志。

```Bash theme={null}
export BLOCKDB_DEBUG=1
```

***

## **3. 底层 Dispatcher（Coordinator / Worker）**

```Python theme={null}
from blockx._grpc_client import blockx as c
```

这是**进阶接口**——日常用 `Task.submit()` 就够了。需要定制超时、断点续传、独立观察 worker 状态、或调试 fixture 时才直接调底层 dispatcher。

| 函数                                                                | 返回                                | 说明                                                       |
| ----------------------------------------------------------------- | --------------------------------- | -------------------------------------------------------- |
| `c.reserve_worker_slot(task_id, coordinator_addr=None, **kwargs)` | resp（含 `worker_addr` / `slot_id`） | 连 Coordinator 拿 slot；拿不到按 `retry_interval` 重试到 `timeout` |
| `c.request_task_slot(task_id, ttl_ms=30000, worker_addr=None)`    | resp                              | 直连 Worker 申请 slot（旧路径 / 跳过 Coordinator）                  |
| `c.submit_task(task_input, slot_id, worker_addr=None)`            | resp                              | 把 `TaskInputDTO`（或已是 pb2 的 `TaskInput`）提交给 Worker        |
| `c.get_task_result(task_id, worker_addr=None)`                    | `GetTaskResultResponse`           | 单次拉结果                                                    |
| `c.watch_tasks(task_ids, worker_addr=None, rpc_timeout=None)`     | stream of `TaskUpdate`            | server-streaming，遇 `is_terminal / SUCCEEDED / FAILED` 终止 |
| `c.submit_and_wait(task_input, ...)`                              | `GetTaskResultResponse`           | 上面这些的组合，即 `Task.submit()` 的底层实现                          |

### **`TaskInputDTO`**

`Task._to_dto()` 产出的 dataclass，结构：

```Python theme={null}
TaskInputDTO(
    task_id: str,
    function_call_config: _ConfigDTO(type: str, config: bytes),
    result_handler:      _ConfigDTO(type: str, config: bytes),
)
```

real 路径在 grpc\_client 层把 DTO 转 `blockx_pb2.TaskInput`；上层只持有 dataclass DTO，不依赖 pb2。

### **调用路径**

| 路径 | 触发条件             | 流程                                                                                          |
| -- | ---------------- | ------------------------------------------------------------------------------------------- |
| 推荐 | 不传 `worker_addr` | `Coordinator.ReserveWorkerSlot` → `Worker.SubmitTask` → `WatchTasks`（兜底 `GetTaskResult` 轮询） |
| 直连 | 传 `worker_addr`  | 跳过 Coordinator，直接 `Worker.RequestTaskSlot` 拿 slot 后提交                                       |
