Monstea
Articles

© 2026 Monstea Blog · Built with Next.js

← Back to articles

以太坊全网扫描实时AML风险监控报警系统设计

January 4, 2026
区块链AML风险监控系统设计以太坊

以太坊全网扫描实时AML风险监控报警系统设计

背景与目标

我们要在以太坊主网做全网扫描的实时 AML 风险监控报警,约束如下:

  • 监听对象:约 200 个 ERC-20 合约(稳定币白名单为主),重点处理 Transfer 事件
  • 触发时机:交易进入确认状态后报警,确认数设为 N=2
  • 报警对象:同时覆盖
    • address-centric(以地址为主体的风险报警)
    • counterparty-centric(以对手方/实体为主体或在报警中突出对手方)
  • 撤销能力:具备“风险提示取消/撤销”的通用机制(用于 reorg、未来 pending 预警等)
  • 下游消费:不设全局上限,可能多个 worker 进行归因与检验
  • 反骚扰策略:
    • 24h 内逐笔报警最多 3 次/地址
    • 超过阈值进入“热门地址”模式:不再逐笔,改为每 10 分钟汇总报警
    • “热门地址”维持方式:使用滚动窗口积分维持(非固定时长)

1. 为什么“全网扫描”可行:从吞吐视角看问题

我们并不处理全链所有交易,而是处理指定 200 合约的 Transfer logs:

  • 以太坊出块约 12 秒,日志量随市场波动显著
  • 只监听 200 合约,事件流通常仍可控,但应按峰值做容量:
    • 工程上建议按 500–2000 logs/s 的峰值做弹性(非常粗略,留足安全边际)
  • 真正的成本通常不在解码,而在:
    • 每条事件的标签/画像/状态查询(KV I/O)
    • 去重、节流、聚合、热门地址汇总
    • reorg 处理与“撤销/更正”消息

系统总体架构:事件流、确认管理、风险计算、报警与任务

核心采用“可回放的流式管道”,将采集与计算解耦:

  1. Chain Ingest(链数据接入)
  • 订阅新区块头
  • 对每个区块批量调用 eth_getLogs 拉取:
    • address filter:200 个合约
    • topic0:Transfer(address,address,uint256) 事件签名
  • 规范化事件并写入消息总线(Kafka/Pulsar)
  1. Confirmation Manager(确认管理,N=2)
  • 将“已上链事件(MINED)”缓冲并在到达 N=2 后输出“已确认事件(CONFIRMED)”
  • 记录 block_hash,发现重组则标记相关事件 INVALIDATED,并触发撤销/更正机制
  1. Risk Engine(逐事件风险评估)
  • 对每条 confirmed event,生成结构化风险信号(signals)
  • 依赖在线标签/画像 KV 与滑窗状态(可用流处理 state)
  1. Alert Engine(报警生成与反骚扰)
  • 非热门:逐笔报警(每地址 24h 最多 3 次 + reason cooldown)
  • 热门:每 10 分钟汇总报警(不逐笔)
  • 输出两条通道:
    • alerts:对外可见报警
    • cases/tasks:下游多 worker 归因/检验任务(可选强制)
  1. Evidence & Audit(证据与审计)
  • 明细事件热存(如 ClickHouse)+ 冷存(对象存储 Parquet)
  • 标签版本化(label_version/as_of)确保可回放与审计一致性

事件规范化与幂等:全链高吞吐的底线

3.1 唯一事件 ID

对每条 Transfer log 定义:

  • event_id = (chain_id, tx_hash, log_index)

用于:

  • 去重(采集重试、重复拉 logs)
  • 幂等落库
  • 证据引用(订阅/审计)

3.2 标准化字段(建议最小集)

  • 链与区块:chain_id, block_number, block_hash, block_time
  • 交易与日志:tx_hash, log_index
  • 合约与金额:token_contract, amount_raw
  • 地址:from, to
  • 可选:amount_usd_approx(稳定币近似等价 USD,但仍需处理 decimals)

地址与对手方双视角:统一建模

对每个 confirmed Transfer 事件,派生两条评估单元(assessment):

  • 视角 A(from 作为主体):subject = from, counterparty = to, direction = out
  • 视角 B(to 作为主体):subject = to, counterparty = from, direction = in

好处:

  • 规则、节流、热门判定统一围绕 subject_id
  • 同时满足 address-centric 与 counterparty 信息呈现需求(报警中带上 top counterparties/标签)

风险信号分层:锚点、行为、暴露

5.1 Anchor Signals(强锚点,高优先级)

命中强标签(举例):

  • sanctioned_entity
  • mixer / tumbler
  • scam / phishing / drainer / stolen_funds
  • darknet / ransomware

特点:

  • 可解释性强
  • 高置信度时可直接 high severity

5.2 Behavioral Signals(滑窗行为)

基于近 5m/1h/24h:

  • rapid_outflow(入金后快速外流)
  • burst_counterparties(对手方激增)
  • structuring(拆分转账)
  • high_frequency(频次异常)

5.3 Exposure Signals(关系暴露)

建议以离线预计算为主、在线查询为辅:

  • 近 7/30 天与高风险集合 1-hop/2-hop 暴露分数快照
  • 在线只点查结果,避免实时跑 3-hop

6. 撤销/取消能力:将“更正”做成协议能力

即使我们主线是 confirmed 才报警,仍必须具备撤销能力(应对 reorg、未来 pending 预警、规则更正)。

建议对外协议定义 3 类消息:

  • ALERT:逐笔报警
  • HOT_SUMMARY:热门地址汇总报警
  • CANCEL:撤销/取消(指向已有的 alert_id/summary_id)

幂等要求:

  • 消息携带稳定 ID(alert_id 或 summary_id)
  • 下游按 ID 去重/覆盖
  • CANCEL 可重复投递不出错


第二部分:反骚扰与热门地址

需求回顾:报警风暴控制策略

  • 非热门地址:逐笔报警
    • 约束:每个地址 24h 最多 3 次
    • 补充:同原因 reason_code 需 cooldown(例如 30 分钟)防重复
  • 热门地址:不再逐笔
    • 改为:每 10 分钟输出一条汇总报警
    • 热门维持:用滚动 24h 积分窗口维持(非固定时长)

关键点:热门维持不能依赖“逐笔报警次数”,否则进入热门后不逐笔会导致热门无法维持。必须用“事件强度/风险积分”来维持。

积分模型与计算

对每条 confirmed assessment(subject 视角)产生一个事件权重 [w]:

  • high => [w = 5]
  • med => [w = 2]
  • low => [w = 0](不计入热门积分,减少噪声与状态规模)

滚动 24h 积分:

  • [S_{24} = \sum w]

热门阈值与滞回防抖

建议初始阈值:

  • 进入热门:[;S_{24} \ge 7;]
  • 退出热门:[;S_{24} \le 4;]

解释:

  • 1 个 high(5) + 1 个 med(2) 进入热门
  • 或 4 个 med(8) 进入热门
  • 滞回避免进出抖动(7/4 可调)

10分钟桶滑窗实现

为什么选择10分钟桶

  • 与"每 10 分钟汇总"的频率一致
  • 24h = 144 个桶,滑窗更新为 O(1)
  • 比逐事件时间戳队列更稳定、内存可控

热门积分状态结构

以 subject_id 为 key,保存:

  • score_buckets[144]:过去 24h 每个 10 分钟桶的积分
  • bucket_start_ts:当前桶起始时间(对齐到 10 分钟)
  • cursor:当前桶索引
  • S24:24h 总积分
  • is_hot:当前是否热门

更新方法:

  • 事件到来时计算其归属桶 start_ts
  • 若跨桶,则循环前进 cursor,逐桶清零,并同步更新 S24(减去被覆盖桶旧值)
  • 将当前桶加上 w,S24 += w

非热门逐笔预算:24h最多3次

该预算仅作用于“非热门状态”。状态字段:

  • sent_alert_times[]:最近 24h 逐笔报警发送时间(保留最近 3~10 个即可)
  • sent_count_24h:可由数组长度计算
  • last_alert_time_by_reason[reason_code]:原因级节流(cooldown)

判断顺序(推荐):

  1. 若 is_hot=true:不走逐笔预算,直接聚合
  2. 若非热门:
    • 先 reason cooldown
    • 再地址预算(sent_count_24h < 3)
    • 通过则发逐笔 ALERT 并记录时间戳

热门地址:每10分钟汇总报警

汇总桶key与幂等ID

汇总的唯一键建议:

  • summary_id = (chain_id, subject_id, bucket_start_ts)

要求:

  • 每个 summary_id 只发一次“最终版”,最易幂等
  • 若需要重试,重复发送同 summary_id,下游按 ID 覆盖/去重

汇总内容与字段

每个 10 分钟桶内聚合:

  • tx_count_in/out
  • sum_amount_in/out(稳定币可近似 USD;或按 token 汇总)
  • unique_counterparty_count(可近似)
  • top_counterparties_by_amount(Top 5)
  • top_counterparties_by_risk(Top 5)
  • top_reasons(Top 3:anchor/behavior/exposure 子类)
  • highest_severity
  • evidence_refs(1–3 条代表性证据引用:A/C/B 策略)
    • A:锚点命中事件(若桶内存在)
    • C:桶内最新 confirmed
    • B:桶内最大额(参考)

汇总发送触发机制

两种推荐方式(二选一):

方式 A(更准):定时 flush

  • 每 10 分钟由定时器(或流处理的 punctuate/watermark)flush 所有活跃热门地址的当前桶
  • 优点:时间对齐、行为稳定
  • 缺点:实现需定时机制与活跃集合

方式 B(更简):跨桶时 flush 上一桶

  • 当检测到事件跨入新桶时,立即发送上一桶的 summary
  • 优点:实现简单
  • 缺点:如果桶内后半段没事件,summary 可能延迟到下次事件才发(不符合“固定每 10 分钟”)

你要求“每 10 分钟一次”,更匹配方式 A。

多worker归因与检验

建议将“报警”与“归因任务”拆开:

  • alerts topic:对外/运营消费
  • cases 或 tasks topic:给多个 worker 的检验任务

热门地址场景:

  • 每个 summary_id 可对应一个 case_id(一桶一 case)
  • worker 产出 case_update:
    • 归因:实体类型修正、标签置信度、是否误报、建议加入白名单/重点监控
    • 证据补充:可异步生成 2-hop/3-hop 路径解释(按需)

并发与幂等:

  • case_id 稳定
  • 消费分区键建议用 subject_id,减少同一地址跨 worker 冲突
  • 写回采用 upsert(按 case_id)保证幂等

撤销、更正与热门汇总

  • 逐笔 ALERT:可能因 reorg 触发 CANCEL(极少,但必须支持)
  • HOT_SUMMARY:
    • 若你采用“最终版每桶一次”,通常不需要撤销
    • 如遇 reorg 影响已汇总桶,可发同 summary_id 的 CORRECTION(或 CANCEL + 重新发)

第三部分:链路、确认管理与存储分层

采集层:按区块拉logs保证补洞能力

建议按“区块高度”驱动采集:

  • 订阅新区块头(WS)
  • 对每个区块 [B]:
    • 用 eth_getLogs 拉取范围 [B, B] 的 logs
    • filter:
      • address:200 合约列表
      • topic0:Transfer 事件 signature
  • 解码后写入 mined_events topic

为什么要按块拉:

  • WS 可能丢消息
  • RPC 抖动需要重试
  • 按块可做“缺块补采”,可回溯一致性更好

幂等:

  • 同一块重复拉取不会造成重复,因为 downstream 用 event_id=(tx_hash,log_index) 去重/upsert

确认管理:N=2的状态机

状态定义

  • MINED:事件来自某个块,已知 block_number 与 block_hash
  • CONFIRMED:当链头高度 H 满足 H ≥ block_number + 2 时输出

reorg处理机制

维护 block_number -> block_hash 映射(至少覆盖最近若干百块):

  • 若同高度的 block_hash 变化:
    • 标记受影响高度范围为 reorged
    • 这些高度内的 MINED 事件全部 INVALIDATED
    • 若其中存在已输出 CONFIRMED 并已报警的:
      • 发送 CANCEL(指向 alert_id/summary_id)

N=2 已显著降低 reorg 概率,但不能假设为 0。

实现方式与选择

两条主流路径:

  • 流处理实现(推荐):
    • Kafka Streams/Flink 维护按 block_number 的 buffer 与 block_hash 状态
    • 输出 confirmed_events topic
  • 自研服务 + 状态表:
    • blocks 表记录高度/hash
    • events_by_block 存 block_number -> event_ids
    • 轮询推进确认

流处理的优势:天然适合“回放、扩容、状态一致性”。

风险引擎与报警引擎

在线必需的数据

  • 标签与画像KV(读多写少):
    • address -> {tags, risk_level, entity_type, confidence, label_version}
  • 热门、节流、聚合状态(写多读多):
    • 适合放在流处理 state;或 Redis(但写压大)

事件明细与证据引用

即使你实时不做复杂图搜索,也要保证“证据可回放”:

  • 每条报警至少带:
    • tx_hash, log_index, token, from, to, amount, block_number
  • 热门汇总带 1–3 条 evidence_refs(A/C/B)

这样订阅增强、人工审核、审计复盘都有稳定抓手。

存储分层:热查询与冷归档

明细事件存储

  • 热存:ClickHouse/Pinot(保 7–30 天,支持快速检索)
  • 冷存:对象存储(S3/OSS)+ 分区 Parquet(按 date、token、chain_id 分区)

标签与画像

  • 在线:Redis/Scylla/Bigtable(秒级更新生效)
  • 离线来源:批处理管道(合作方/规则/模型/人工修正)
  • 版本化字段:label_version, as_of, source, confidence

汇总数据分层

  • 10 分钟热门汇总本身就可作为“近实时摘要层”
  • 同时可离线生成 90 天摘要(之前讨论过)用于地址页与解释:
    • addr-token 90d summary
    • addr-counterparty 90d summary
    • 中高风险按需生成 3-hop 解释(异步)

订阅与证据增强

实时报警阶段

  • 证据=当前事件(逐笔)或桶内 1–3 条代表性事件(汇总)

解释增强(订阅与人工)

  • 可异步拉取该地址近 90 天摘要与对手方摘要
  • 若高风险:触发 2-hop/3-hop 路径生成(终止于 CEX/DEX Router/Bridge/超级节点)
  • 在边上附 1–3 条交易证据(A+C 主导,B 参考)

运营与可观测性

关键监控指标

  • Ingest:每块 logs 拉取成功率、延迟、缺块补采次数
  • Confirm:确认延迟分布、reorg 触发率、CANCEL 数量
  • Risk/Alert:每秒处理事件数、P95 处理时延、逐笔报警数、热门汇总数、热门地址数量
  • 质量:anchor 命中率、误报率(来自 worker 反馈)、热门抖动率(进出频率)
  • 状态规模:活跃 subject 数、state TTL 命中率、topN 截断次数

总结与扩展点

本设计的核心特点与扩展方向:

  • 先用 confirmed N=2 + anchor/行为/暴露信号跑起来
  • 用积分窗口把高频地址收敛成 10 分钟汇总,避免报警风暴
  • 用 cases/tasks 通道承载多 worker 归因与检验,形成标签闭环
  • 解释(2-hop/3-hop)作为异步增强,不阻塞实时报警

数据流向图

数据流向图(Go Indexer + Kafka + 确认/风控/热门汇总 + 多 Worker)

说明:方框=服务/组件;圆柱=Kafka Topic(日志流);箭头=数据流向
Producer=写入 Kafka;Consumer=从 Kafka 读取

flowchart LR
  %% ===== Sources =====
  subgraph ETH[以太坊网络 / RPC 节点]
    RPC[(RPC / Archive Node)]
  end

  %% ===== Ingest =====
  subgraph ING[采集层]
    IDX["Go Indexer / Ingest Service\n(Producer)"]
  end

  %% ===== Kafka =====
  subgraph KFK[Kafka Cluster]
    T_MINED[(Topic: mined_events\nkey: token_contract 或 block_number)]
    T_CONF[(Topic: confirmed_events\nkey: subject_id 或 tx_hash)]
    T_ALERT[(Topic: alerts\n逐笔 ALERT + 撤销 CANCEL)]
    T_SUM[(Topic: hot_summaries\n每10分钟 HOT_SUMMARY)]
    T_CASE[(Topic: cases/tasks\n给归因/检验 Worker)]
    T_UPD[(Topic: case_updates\nWorker 回写结果)]
    T_LABEL[(Topic: label_updates\n标签/画像更新流)]
  end

  %% ===== Processing =====
  subgraph PROC[处理层]
    CM["Confirmation Manager (N=2)\nConsumer+Producer"]
    RE[Risk Engine\nConsumer+Producer]
    AE[Alert/Hot Engine\nConsumer+Producer]
  end

  %% ===== Data Stores =====
  subgraph DS[数据存储/服务]
    LKV[(Labels/Profile KV\nRedis/Scylla/etc.)]
    CH[(ClickHouse/Pinot\n事件明细/查询)]
    OBJ[(Object Storage\n冷归档 Parquet)]
  end

  %% ===== Workers =====
  subgraph WRK[下游归因与检验]
    W1["Worker Group\n(Consumer)"]
  end

  %% ===== Sinks =====
  subgraph SINK[下游消费/通知]
    OPS[运营台/告警系统]
    SUB[订阅推送/Webhook]
  end

  %% ===== Flows =====
  RPC -->|eth_getLogs by block| IDX
  IDX -->|Transfer logs 规范化事件| T_MINED

  T_MINED -->|按块缓冲&reorg检测| CM
  CM -->|达到 N=2 输出| T_CONF
  CM -->|reorg 导致失效| T_ALERT

  T_CONF -->|生成 subject/counterparty 评估| RE
  RE -->|带风险信号的评估事件| AE

  AE -->|非热门: 逐笔报警| T_ALERT
  AE -->|热门: 10分钟汇总| T_SUM
  AE -->|"归因/检验任务(可选)"| T_CASE

  T_CASE --> W1
  W1 -->|结论/归因/误报标记| T_UPD
  T_UPD -->|"更新标签/画像(回灌)"| LKV
  T_LABEL -->|标签更新订阅| RE

  %% Evidence storage
  T_MINED --> CH
  T_CONF --> CH
  T_ALERT --> CH
  T_SUM --> CH
  CH --> OBJ

  %% Downstream notifications
  T_ALERT --> OPS
  T_SUM --> OPS
  T_ALERT --> SUB
  T_SUM --> SUB

图中角色对照(你最关心的 Producer/Consumer)

  • Go Indexer:Producer(把链上 logs 写入 mined_events)
  • Confirmation Manager:Consumer(mined_events) + Producer(confirmed_events/alerts(CANCEL))
  • Risk Engine:Consumer(confirmed_events + label_updates) + Producer(评估输出给 Alert Engine)
  • Alert/Hot Engine:Consumer(评估流) + Producer(alerts/hot_summaries/cases)
  • Worker 组:Consumer(cases) + Producer(case_updates)
  • Labels/Profile KV:通常被 Risk Engine/Worker 读写(不一定经 Kafka;也可用 label_updates 广播)

如果你告诉我你希望 mined_events 的分区 key 用 block_number 还是 token_contract(以及 confirmed_events 用 subject_id 还是 tx_hash),我可以把“按分区保证顺序/并行度”的版本也画出来。