> ## Documentation Index
> Fetch the complete documentation index at: https://docs.chaintable.com/llms.txt
> Use this file to discover all available pages before exploring further.

# 实战：多表区块高度对齐订阅

> 如何在 Chaintable 中利用 Aligned 状态机优雅解决跨表乱序事件流消费难题

***

# 实战：跨表/多表区块高度对齐订阅 (L2 实时流)

## 业务背景与痛点

在 Web3 级联实时分析场景中，以下需求极其常见：

> “我的核心 Notebook 任务需要订阅消费以太坊的 `trace` 表。但我的清洗逻辑强依赖另外两张正在并行计算的业务中间表：`token_transfers` 和 `addresses_metadata`。我必须保证在一个区块高度 $H$ 上，另外两张依赖表也**已经把高度 $H$ 写入完毕**，我的清洗代码才能安全运行。否则就会因为读不到依赖状态而产生脏数据。”

如果直接写代码同时订阅三张表的底层事件流，由于分布式集群的计算快慢不一，你会收到完全乱序的高度通知（例如 `trace` 已经到了 18000010，而 `token` 还在 18000005）。

Chaintable SDK 内置的 **`Aligned` 对齐状态机**，就是为了在数据库变更流层面彻底把这个难题“自动做掉”。

***

## 架构设计：Trigger 表与 Depends 表

`Aligned` 状态机引入了两种清晰的流驱动角色：

1. **Trigger 表 (`loose_align`)**：你真正想要增量消费的核心事件源（如原始交易流）。
2. **Depends 表 (`strict_align`)**：你的业务逻辑所卡死的上游依赖表。

`Aligned` 会在后台高频检查所有 Depends 表的底层 `_height` 进度管理表。只有当 **Trigger 表在高度 $H$ 上写入 Ready，且每一张 Depends 表的当前共识高度均 $\ge H$ 时**，状态机才会将高度 $H$ 的 Block 上下文释放出来。**每个高度有且仅会放行一次。**

***

## 完整实战代码示例

在 Notebook 任务中，通过简单的几行声明式配置，即可启动这个高可靠的对齐数据管道：

````python theme={null}
from blockdb import BlockTable, Aligned
from blockdb.block_table import Subscribe  # L2 高级订阅组件

# 1) 初始化逻辑表实例（L2 逻辑表不实际存储数据，仅提供区块链语义）
trace = BlockTable("chaintable.trace.eth")          # 核心驱动源 (Trigger)
token = BlockTable("token.token_transfer.eth")      # 强依赖表 A (Depends)
addr  = BlockTable("address.addresses.eth")         # 强依赖表 B (Depends)

# 2) 开启全托管的底层多路复用订阅流
sub = Subscribe(tables=[trace, token, addr])

# 3) 将订阅流喂给 Aligned 状态机进行高度锁死对齐
aligned = Aligned(
    sub,
    loose_align=[trace],          # 只有 trace 产出新高度，才具备触发资格
    strict_align=[token, addr],   # 另外两张表若进度滞后，则强行阻塞放行
)

# 4) 像消费普通 Python 生成器一样，在区块时间心跳下安全消费
for block in aligned.listen():
    print(f"[高度对齐安全放行] 当前区块高度: {block.height}, 块哈希: {block.id}")
    
    # 此刻，trace、token、addr 在 block.height 下已经全域强一致性就绪
    # 你的清洗逻辑可以百分之百放心地读取上游表，绝无脏数据风险
    current_block_rows = trace.get_block_rows(block)
    
    # 执行你的业务计算并写回下游
    # ...

**最佳实践提示**

- `Subscribe` 内部有重连机制：异常会自动用最新游标重连，正常 stream 结束才退出。**直接 ****`for ... in aligned.listen():`**** 即可**，不需要自己写 try/except 重试。

- 不需要多表对齐？只订阅一张表就好：

    ```Python
    for table_id, block in Subscribe(tables=[trace]).listen():
        ...
````

* 依赖表如果漏处理了中间块（含空块没提交），"共识高度"会停在 gap 前——这就是 `Aligned`想要的语义：**没全 ready 的高度坚决不放行**。所以驱动侧要保证每个块都提交 task（空块提交空 task 推进 watermark）。
