这是轻量版 backfill 方案。它在指定时间窗内把 R2 历史日志重新取出,过滤 Worker 子请求,转换为天翼云要求的格式,并按明确限速发送到客户接收端。
整个设计围绕三个客户端硬性要求:
客户接收端无法承受任意速率。Sender 通过下式硬封顶:
max_concurrency × (1000ms / MIN_SENDER_INVOCATION_MS) × BATCH_SIZE
= 20 × (1000 / 200) × 1000
= 100,000 行/秒
这是理论上限,前提是每个 batch 接近 BATCH_SIZE = 1000 行,且客户 ACK 延迟低于 200ms。实际持续吞吐还受 Parser 上游供给限制 — 见下方 吞吐与调速。
.done 与 .queued)尽量减少重复。常见场景(Parser 重试、Queue 重复投递)已经被消除,但还有两种残留场景无法消除:
SEND_TIMEOUT_MS 触发,但 POST 实际已被接收端接受。Worker 无法得知。重试 → 接收端收到第 2 份。正常情况下概率 < 0.001%。.done 写入失败 — R2 PUT 永久性失败(已 3 次内部重试)。没有 .done 时下一次重试会重发。概率 < 0.0001%。客户按父请求计费/审计。Parser 按以下规则在每条记录上过滤:
ParentRayID !== "00" // 即子请求
WorkerSubrequest === true // 显式子请求标记
两条规则都在格式转换之前应用。
生产 worker ctyun-logpush-worker 实时把日志推到同一个客户接收端,量级相近。Backfill 在 Cloudflare 层完全隔离:
| 资源 | 生产 | Backfill | 是否共享 |
|---|---|---|---|
| Worker isolate / CPU 配额 | ctyun-logpush | ctyun-logpush-backfill | 独立 |
| Queues | parse-queue / send-queue | parse-queue-backfill / send-queue-backfill | 独立 |
| R2 bucket | cdn-logs-raw | cdn-logs-raw | 共享(前缀不同) |
| R2 前缀 | processed/ | processed-backfill/<run-id>/ | 独立 |
| 客户接收端 | 共享 | 共享 | 共享(下游压力共担) |
含义:backfill 假死(比如客户接收端卡住)不会消耗生产 worker 的 concurrency 或 queue 槽位。两个 worker 共享接收端带宽,所以 backfill 的显式限速是保护接收端的关键。
R2 logs/
-> parse-queue-backfill
-> Parser (gzip 解码 + 过滤 + 转换 + 切成 1000 行 batch)
-> processed-backfill/<run-id>/{batch}.txt + {batch}.queued
-> send-queue-backfill
-> Sender (gzip + 签名 POST + .done 标记 + 清理)
-> 客户接收端
.gz 文件独立解析并直接写成最终 batch 文件。每文件最后一个 batch 可能不满 1000 行 — 这是可接受的。
[BACKFILL_START_TIME, BACKFILL_END_TIME] 内的日志ParentRayID = "00" 且 WorkerSubrequest != true| 变量 | 默认 | 含义 |
|---|---|---|
BACKFILL_START_TIME | "" | 补传开始时间,ISO 8601 |
BACKFILL_END_TIME | "" | 补传结束时间,ISO 8601,必须 <= 当前时间 |
BACKFILL_ENABLED | "false" | 必须显式 "true" 才运行 |
BACKFILL_RATE | "60" | 每分钟入队文件数(上限 100) |
BATCH_SIZE | "1000" | 每次 POST 行数。在未确认接收端容量前不要调高。 |
SEND_TIMEOUT_MS | "300000" | 等客户 ACK 的最长时间。客户实测 ACK 1–5 秒,长 timeout 是为了最小化 abort 引起的重复。 |
LOG_PREFIX | "logs/" | Logpush 在 R2 的根前缀 |
LOG_LEVEL | "info" | debug / info / warn / error |
最大补传时间窗:7 天(MAX_RANGE_HOURS = 24 * 7)。更大时间窗需要拆成多次运行。
wrangler queues create parse-queue-backfill
wrangler queues create send-queue-backfill
wrangler queues create parse-dlq-backfill
wrangler queues create send-dlq-backfill
wrangler secret put CTYUN_ENDPOINT --config wrangler-backfill.toml
wrangler secret put CTYUN_PRIVATE_KEY --config wrangler-backfill.toml
wrangler secret put CTYUN_URI_EDGE --config wrangler-backfill.toml
wrangler deploy --config wrangler-backfill.toml
pipeline 有两个独立上限,取较低者。下面的数字来自一次 vivo 客户实测(651 个 raw 文件、95 分钟回放窗口、27,908 个 batch、~2.5 小时跑完)。
| 阶段 | 配置 | 理论上限 | 实测吞吐 |
|---|---|---|---|
| Sender | max_concurrency=20 × 单 invocation ~0.55s(含 ACK ~400ms) | ~36 batch/s | ~3 batch/s(利用率 8% — 大部分时间 idle) |
| Parser | max_concurrency=10 × 10 万行单文件 ~150 秒 wall time × ~50 batch/文件 | ~3.3 batch/s | ~3 batch/s(利用率 93% — 满负载) |
如果要更快跑完,唯一安全的杠杆是调高 Parser max_concurrency:
Parser max_concurrency | 预期持续吞吐 | 同样 651 文件预期完成时间 | 风险 |
|---|---|---|---|
10(默认) | ~3 batch/s | ~2.5 小时 | 无 — 当前默认 |
20 | ~6 batch/s | ~1.4 小时 | 低 — R2 操作量翻倍,仍远低于平台限制 |
30 | ~9 batch/s | ~55 分钟 | 中 — 持续 R2 binding 压力可能让单 op 延迟劣化 |
50+ | 不确定 | 不确定 | 不推荐 — 单 op 延迟可能非线性增长 |
max_concurrency 超过 20,不要缩短 MIN_SENDER_INVOCATION_MS 低于 200。Sender 已经有大量富余 — 瓶颈在上游。动这两个参数只会给接收端和同一域名上的生产 worker 增加压力,没有收益。
针对大 raw 文件,parse queue 特意配置为 max_batch_size = 1,这样每个大 gzip 文件独占一个 invocation,不会和另外两个大文件共享同一个 invocation 的 CPU 预算。
注:单 batch 的 parser I/O 成本(每 R2 op ~588 ms × 5 ops)是长 Worker invocation 内多次串行 R2 操作累积的特性,不是单个 R2 操作本身慢。Sender 单 batch invocation 用类似的 R2 调用 <100 ms 就能跑完,因为 invocation 短。
有两个独立的观察通道:HTTP status 接口(聚合进度)和 Worker 日志(per-batch 发送证据)。
/backfill/status — 聚合进度curl https://ctyun-logpush-backfill.<your-subdomain>.workers.dev/backfill/status
关键字段:
| 字段 | 含义 |
|---|---|
summary | 一句话状态总结 |
stage_code | scanning_and_queueing / sending / sent_waiting_cleanup / cleaned |
delivery_completed | batches_pending == 0 且全部 batch 已发送时为 true |
fully_completed | 临时文件清理完才为 true |
matched_raw_files | 时间窗内匹配的 raw 文件总数 |
batches_sent / batches_pending | 已发 / 待发 batch 数 |
log_lines_sent | 已发送到接收端的总行数 |
batch_lines_avg / _min / _max | batch 大小分布(每 POST 行数) |
replay_window_beijing | 北京时间下的补传时间窗 |
last_refresh_beijing | 上次状态更新时间 |
要看原始状态和底层 artifact 统计,用 GET /backfill/status?view=raw。
wrangler tail ctyun-logpush-backfill
每次 Sender invocation 成功都会输出一行:
[BACKFILL][INFO] Sent batch lines=1000 bytes=1064735 (uncompressed)
-> HTTP 200 | ack_ms=408 queue_wait_ms=525 | processed-backfill/.../batch.txt
| 字段 | 含义 |
|---|---|
ack_ms | fetch() 开始到接收端 HTTP 响应的时间(往返 + 接收端处理) |
queue_wait_ms | batch 在 send-queue 里的总等待时间(从 Parser 入队到 Sender 接走+发送完) |
bytes | 解压前的 payload 大小;gzip 压缩后约 1/10 |
Parser 也会输出 Parsing: ... 和 Done: ...,两条日志的时间差就是单文件 wall time。
backfill-state/ 下会出现的文件| 文件 | 什么时候写入 |
|---|---|
progress.json | 每次 cron tick (~1/min) 更新;run 一启动就有 |
status.json | 仅当有人 GET /backfill/status 时写入。如果从未 curl 过这个接口,R2 上就不会出现这个文件。 |
当前代码不写的文件: send-stats.json(聚合 ack_ms / queue_wait_ms 统计)在 queue-only 重构时被移除了。每个 batch 的 ack_ms / queue_wait_ms 只会输出到 Worker 日志(见上面通道 2)— 不会聚合到任何 R2 文件。如果你在某个老 bucket 上看到 send-stats.json,那是旧版本代码留下的残留数据。
所有 batch 发送完成后:
inspectRunArtifacts — list 所有 batch artifact 并核实 .done 都存在cleanup.status = ready — 全部确认已发送后进入此状态cleanup.status = deleting — 批量删除 processed-backfill/<run-id>/*cleanup.status = done — state 标记为 cleaned,run 彻底完成宽限期是有意保留的:给运维方一个窗口在文件被删除前检查发送内容。
如果同一个 [A, B] 时间窗已经成功跑完,需要重新跑:
# 1. 删除 state 文件
wrangler r2 object delete cdn-logs-raw/backfill-state/progress.json
wrangler r2 object delete cdn-logs-raw/backfill-state/status.json
# 2. 重新部署,或等待下一次 cron(保持 BACKFILL_ENABLED="true")
旧的 processed-backfill/<run-id>/ 前缀不会阻塞新的 run — 每次 run 用新的 run_id(带时间戳后缀)。
这是正常的重复补传路径。只要上一轮已经进入 cleaned(或 fully_completed = true),直接修改 BACKFILL_START_TIME / BACKFILL_END_TIME 后重新部署即可。
processed-backfill/<old-run-id>/backfill-state/progress.json / status.json只有当上一轮已经是 cleaned 时,Worker 才会自动按新时间窗初始化下一轮。如果上一轮还是 running / done,不要立刻切换时间窗。
最稳妥的判断条件:
/backfill/status 显示 fully_completed = truesend-queue-backfill backlog 归零(在 Cloudflare Dashboard 可见)