无服务器日志管道 · Cloudflare Workers + R2 + Queues · CDN 合作方日志对接
将 Cloudflare 边缘节点产生的 HTTP 访问日志(Logpush JSON 格式)自动转换为 CDN 合作方要求的自定义格式,并近实时推送到对方日志接收服务器。
全部基于 Cloudflare 原生服务:Logpush + R2 + Queues + Workers。无需服务器,无需运维,系统自动扩缩容。
145 字段 \u0001 分隔纯文本格式,HTTP Body 需 Gzip 压缩,URL 签名采用 MD5 鉴权方式。
从请求产生到日志送达约 70 秒,通常远优于合作方 SLA 要求(典型要求为 3 分钟内)。
| 服务 | 作用 | Dashboard 位置 |
|---|---|---|
| Logpush | 每约 1 分钟自动将边缘 HTTP 请求日志打包(ndjson.gz)写入 R2 | 域名 → Analytics & Logs → Logpush |
| R2 Object Storage | 存储原始日志文件及处理中的临时批次文件 | Dashboard → R2 Object Storage |
| R2 Event Notifications | R2 每有新文件写入,立即触发 Queue 消息(秒级响应) | R2 Bucket → Settings → Event Notifications |
| Queues | 解耦解析与发送两个阶段,保证 at-least-once 投递,自动重试 | Dashboard → Workers & Pages → Queues |
| Workers | 单脚本内包含两个逻辑 Worker:Parser(解析转换)和 Sender(签名推送) | 通过 wrangler deploy 或 GitHub Actions 部署 |
原始日志文件通过 DecompressionStream + ReadableStream 流式处理。Worker 无需将整个文件载入内存,逐行解压转换,内存占用恒定,不受文件大小影响。这是 Workers 128 MB 内存限制下的必要设计。
parse-queue 负责 R2 → 格式转换;send-queue 负责 R2 → HTTP 推送。两级分离实现独立重试策略:解析失败重试 2 次,发送失败重试 5 次。每个 Queue 均配置独立死信队列(DLQ)用于可观测性与人工恢复。
Queues 采用 at-least-once 语义。Sender 处理重复投递时,若对应 R2 临时文件已被删除(表明已成功发送),则静默 ack,不重复发送。
Parser 自动识别 EdgeStartTimestamp 的三种格式:Unix 秒整数、Unix 毫秒整数、RFC 3339 字符串。推荐 Logpush 配置 timestamp_format = unix 以保证一致性。
目标格式要求的是 CDN 节点所在国家(非客户端国家)。EdgeColoCode(IATA 机场三字码,如 NRT = 东京)通过内置映射表转换为 ISO 3166-1 alpha-2 国家码,覆盖全球 200+ 机场。若未命中则默认返回 CN。
Workers 的 SubtleCrypto API 不支持 MD5 算法,因此 src/index.js 内嵌了纯 JS 实现的 RFC 1321 MD5。鉴权 URL 签名格式为:MD5({uri}-{ts}-{rand}-{privateKey})。
http_requests 数据集没有直接暴露 Content-Length 字段。字段 #21 从 ResponseHeaders['content-length'] 取值,需通过 API 配置 Logpush Custom Fields(见部署步骤 6b)。未配置或响应头中不含该字段时,统一返回 -。
-,与该格式中其他无数据字段的处理方式一致。| 故障场景 | 系统行为 | 恢复方式 |
|---|---|---|
| 接收端临时不可用 | send-queue 指数退避重试,最多 5 次(约 15 分钟窗口) | 自动恢复;R2 临时文件保留至确认成功 |
| 原始文件含格式错误行 | 逐行捕获异常,跳过该行,继续处理 | 错误计数写入日志;文件级处理成功 |
| parse-queue 处理失败 | 重试 2 次后转入 parse-dlq | 人工检查 DLQ 消息及 R2 原始文件 |
| Queue 重复投递 | R2 文件不存在 → 静默 ack,不重发 | 自动处理,无数据重复 |
实测时间:2026-03-30,6小时窗口(11:00–17:00 GMT+8)· HTTP 请求总量:13.31M · 数据传输:70.39 GB
| 指标 | 实测数据 | 评估 |
|---|---|---|
| Worker 调用次数(6h) | 2k | 与 Logpush 文件批次数吻合 |
| 内部错误数 | 0 | 健康 |
| CPU 时间超限次数 | 3 | 发生在流量峰值期,需持续观察 |
| 内存超限次数 | 0 | 流式处理设计有效 |
| 中位 CPU Time | 3.52 ms | 远低于 30s 限制 |
| P90 CPU Time | 355 ms | 处理大文件批次时正常 |
| P99 CPU Time | 3.17s | 在 30s 限制内 |
| Wall Time P50 / P99 | 1.13s / 18.97s | 含网络等待,在限制内 |
| 5xx 子请求(发往接收端) | 36 / 15k(0.24%) | 接收端间歇性问题,已自动重试 |
| 指标 | 实测数据 | 评估 |
|---|---|---|
| 消息投递量(6h) | 355 | 与 Logpush 文件数吻合 |
| 消息确认量(ack率) | 347(97.7%) | 健康 |
| 消息重试次数 | 0 | 无解析失败 |
| 实时积压 | 0 | 实时消费,无积压 |
| 平均积压(均值/峰值) | 5.47 / 13 | 轻载 |
| 消费者延迟(均值/峰值) | 15.13s / ~250s | 峰值时延偏高,流量突发所致,可接受 |
| 消费者并发(均值/峰值) | 1.37 / 14 | 自动弹性扩容有效 |
| DLQ 消息数 | 0 | 零数据丢失 |
| 指标 | 实测数据 | 评估 |
|---|---|---|
| 消息投递量(6h) | 14.9k | 约 1490 万条日志行成功推送 |
| 消息确认量 | 15.45k | 含历史积压清零 |
| 消息重试次数 | 0 | 发送失败未上升至 Queue 级重试 |
| 实时积压 | 0 | 实时消费 |
| 平均积压(均值/峰值) | 32.6 / 106 | 峰值积压明显,在容量范围内 |
| 消费者延迟(均值/峰值) | 5.62s / ~55s | 可接受 |
| 消费者并发(均值/峰值) | 22.07 / ~220 | 峰值接近 250 并发上限 |
| DLQ 消息数 | 0 | 零数据丢失 |
| 维度 | 当前负载 | 5倍流量 | 10倍流量 |
|---|---|---|---|
| Worker CPU | 约 5% 上限 | 安全 | 大文件时 P99 接近上限 |
| Worker 内存 | 0次超限 | 安全(流式设计) | 安全 |
| parse-queue 延迟 | 均值 15s,峰值 250s | 峰值延迟 5–10 分钟 | 持续积压风险 |
| send-queue 并发 | 均值 22,峰值 220 | 峰值接近 250 上限 | 超出 250 上限 |
| 接收端可用性 | 5xx 0.24% | 需监控 5xx 率 | 可能成为瓶颈 |
| 优先级 | 指标 | 建议告警阈值 |
|---|---|---|
| ★★★ | DLQ 消息数(两个队列) | > 0,立即处理 |
| ★★★ | 实时积压(持续) | > 100 条持续超过 5 分钟 |
| ★★★ | CPU 时间超限次数 | > 10 次/小时 |
| ★★ | parse-queue 消费者延迟(持续) | 持续超过 5 分钟 |
| ★★ | 5xx 子请求比例 | > 0.5% 总请求量 |
| ★★ | send-queue 消费者并发(持续) | 持续 > 200 |
| ★ | P99 CPU Time | > 20s |
npm install -g wrangler);已准备好 Cloudflare API Token,权限范围限定为目标账号,需包含以下三项:Workers Scripts:Edit、Workers R2 Storage:Edit、Workers Queue:Edit(可使用 Edit Cloudflare Workers 模板后手动追加后两项)。Dashboard → R2 Object Storage → Create bucket
cdn-logs-rawDashboard → Workers & Pages → Queues → Create queue
依次创建以下 4 个队列(名称须完全一致):
| Queue 名称 | 用途 |
|---|---|
parse-queue | 接收新日志文件通知,触发 Parser Worker |
send-queue | 接收转换完成通知,触发 Sender Worker |
parse-dlq | 解析彻底失败的死信队列 |
send-dlq | 发送彻底失败的死信队列 |
R2 → cdn-logs-raw → Settings → Event Notifications → Add notification
object-createparse-queuelogs/.gz在项目目录下,通过 Wrangler 设置加密 Secret:
# 日志接收服务器地址(不含 URI 路径,不含末尾斜杠) wrangler secret put CTYUN_ENDPOINT # 鉴权私钥 wrangler secret put CTYUN_PRIVATE_KEY # 接收 URI 路径(如 /logpost/yourpath) wrangler secret put CTYUN_URI_EDGE # 验证已设置的 Secret wrangler secret list
wrangler secret put 直接写入 Cloudflare。它们会持久保存在 CF 云端,无需每次部署时重新上传。wrangler deploy
main 分支即可,CI/CD 会自动执行 wrangler deploy。Dashboard → [目标域名] → Analytics & Logs → Logpush → Create a Logpush job
cdn-logs-raw · Path prefix:logs/{DATE}/1(100% 采样)默认情况下,字段 #21(sent_http_content_length)回退使用 EdgeResponseBodyBytes。如需采集真实的 Content-Length 响应头值,需通过 API 配置 Logpush Custom Fields。这是一次性的 Zone 级别配置。
# 列出所有 ruleset,找 http_log_custom_fields 阶段
curl "https://api.cloudflare.com/client/v4/zones/$ZONE_ID/rulesets" \
--request GET \
--header "Authorization: Bearer $CLOUDFLARE_API_TOKEN" \
| jq '.result[] | select(.phase == "http_log_custom_fields") | {id, phase}'
若有返回,记录 id 作为 RULESET_ID,跳至步骤 C。若无返回,执行步骤 B。
curl "https://api.cloudflare.com/client/v4/zones/$ZONE_ID/rulesets" \
--request POST \
--header "Authorization: Bearer $CLOUDFLARE_API_TOKEN" \
--json '{
"name": "Zone-level phase entry point",
"kind": "zone",
"description": "Custom log fields for Logpush",
"phase": "http_log_custom_fields"
}' | jq '.result.id'
记录返回的 id 作为 RULESET_ID。
使用 raw_response_fields 捕获源站返回的原始 content-length 值(CF Transform Rules 修改之前)。
curl "https://api.cloudflare.com/client/v4/zones/$ZONE_ID/rulesets/$RULESET_ID" \
--request PUT \
--header "Authorization: Bearer $CLOUDFLARE_API_TOKEN" \
--json '{
"rules": [
{
"action": "log_custom_field",
"expression": "true",
"description": "Capture content-length response header for Logpush",
"action_parameters": {
"raw_response_fields": [
{ "name": "content-length" }
]
}
}
]
}'
# 先获取 Logpush Job ID curl "https://api.cloudflare.com/client/v4/zones/$ZONE_ID/logpush/jobs" \ --request GET \ --header "Authorization: Bearer $CLOUDFLARE_API_TOKEN" \ | jq '.result[] | {id, name, dataset}' # 更新 Job,加入 ResponseHeaders 字段 curl "https://api.cloudflare.com/client/v4/zones/$ZONE_ID/logpush/jobs/$JOB_ID" \ --request PUT \ --header "Authorization: Bearer $CLOUDFLARE_API_TOKEN" \ --json '{ "output_options": { "field_names": [ "CacheCacheStatus","ClientCountry","ClientIP","ClientRequestBytes", "ClientRequestHost","ClientRequestMethod","ClientRequestProtocol", "ClientRequestReferer","ClientRequestScheme","ClientRequestURI", "ClientRequestUserAgent","ClientSSLProtocol","ClientSrcPort", "EdgeColoCode","EdgeResponseBodyBytes","EdgeResponseBytes", "EdgeResponseContentType","EdgeResponseStatus","EdgeServerIP", "EdgeStartTimestamp","EdgeTimeToFirstByteMs","OriginIP", "OriginRequestHeaderSendDurationMs","OriginResponseDurationMs", "OriginResponseHeaderReceiveDurationMs","OriginResponseStatus", "OriginTLSHandshakeDurationMs","ParentRayID","RayID", "ResponseHeaders" ], "timestamp_format": "unix" } }'
ResponseHeaders 字段将包含 {"content-length": "12345"},Worker 自动将其用于字段 #21。Content-Length 头(改用 chunked transfer encoding),此时字段 #21 输出 -。等待约 2 分钟(Logpush 首次写入需初始化),逐步验证以下三处:
cdn-logs-raw/logs/ 目录下出现 .log.gz 文件parse-queue 的 Messages Processed 数量持续增长wrangler tail ctyun-logpush 查看实时日志# 正常日志输出示例(LOG_LEVEL=info)
[INFO] Parsing: logs/20260328/xxx.log.gz
[INFO] Done: logs/20260328/xxx.log.gz | lines=73 batches=1 errors=0
[INFO] Sent 73 lines → HTTP 200 | processed/xxx.txt
name = "ctyun-logpush" main = "src/index.js" compatibility_date = "2026-03-27" compatibility_flags = ["nodejs_compat"] account_id = "<your-account-id>" [[r2_buckets]] binding = "RAW_BUCKET" bucket_name = "cdn-logs-raw" [[queues.producers]] binding = "PARSE_QUEUE" queue = "parse-queue" [[queues.producers]] binding = "SEND_QUEUE" queue = "send-queue" [[queues.consumers]] queue = "parse-queue" max_batch_size = 5 max_batch_timeout = 10 max_retries = 2 dead_letter_queue = "parse-dlq" [[queues.consumers]] queue = "send-queue" max_batch_size = 50 max_batch_timeout = 5 max_retries = 5 dead_letter_queue = "send-dlq" [vars] BATCH_SIZE = "1000" # 每次 POST 的最大日志行数 LOG_LEVEL = "info" # debug | info | warn | error workers_dev = false preview_urls = false
| 变量名 | 类型 | 示例值 | 说明 |
|---|---|---|---|
| CTYUN_ENDPOINT | Secret | http://log.example.com:5580 | 日志接收服务器地址,不含 URI 路径,不含末尾斜杠 |
| CTYUN_PRIVATE_KEY | Secret | YourKey@1234 | MD5 鉴权私钥,由对接方提供 |
| CTYUN_URI_EDGE | Secret | /logpost/yourpath | HTTP POST 目标 URI 路径,由对接方提供 |
| BATCH_SIZE | Var | 1000 | 每批 POST 包含的日志行数 |
| LOG_LEVEL | Var | info | Worker 日志级别。排查问题时改为 debug,生产恢复 info |
wrangler secret put <NAME> 一次性设置后持久保存在 Cloudflare 云端,任何情况下不得写入 wrangler.toml 或提交至代码仓库。仓库中包含 .github/workflows/deploy.yml。向 main 分支推送代码时,自动触发 wrangler deploy(仅部署代码和配置)。
CI/CD 所需 GitHub secret 仅一个:CLOUDFLARE_API_TOKEN。Worker secrets(CTYUN_ENDPOINT、CTYUN_PRIVATE_KEY、CTYUN_URI_EDGE)通过 wrangler secret put 独立管理。
| 检查项 | 正常状态 | 异常处理 |
|---|---|---|
R2 logs/ 目录 | 每约 1 分钟有新 .gz 文件 | 检查 Logpush Job 是否处于 Enabled 状态 |
| parse-queue 积压 | 接近 0 | 持续积压 → 查看 Worker 日志排查解析错误 |
| send-queue 积压 | 接近 0 | 持续积压 → 检查接收端服务器可用性 |
| parse-dlq | Inactive(0 消息) | 有消息 → 检查对应 R2 原始文件是否损坏 |
| send-dlq | Inactive(0 消息) | 有消息 → 检查接收端,修复后从 R2 手动补传 |
R2 processed/ 目录 | 无文件积压 | 文件积压 → Sender 异常,查看 Worker 日志 |
| 信号 | 含义 | 处理方式 |
|---|---|---|
| Backlog = 0 + Lag < 30s | 系统实时消化,运行健康 | 正常,无需操作 |
| Backlog 持续增长 | 消费速度跟不上生产速度,真正的瓶颈信号 | 检查 Worker 错误日志;必要时扩容 |
| CPU Time Exceeded > 0 | 单个 Logpush 文件过大,单次调用 CPU 超限 | 持续监控频率;不要降低 max_batch_size |
| Messages Retried > 0 | 接收端不稳定 | 排查 5xx 子请求;确认接收端容量 |
| DLQ > 0 | 有数据丢失风险,最高优先级 | 立即检查 DLQ 消息;从 R2 手动补传 |
| send-queue 并发 > 200 | 接近 250 并发上限 | 提高 send-queue max_batch_size |
| 变更类型 | 操作方式 | 是否需要重新部署 |
|---|---|---|
| 轮换 CTYUN_ENDPOINT / CTYUN_PRIVATE_KEY / CTYUN_URI_EDGE | wrangler secret put <NAME> | 否,立即生效 |
| 修改 BATCH_SIZE / LOG_LEVEL | 编辑 wrangler.toml → wrangler deploy | 是 |
更新 Worker 代码(src/index.js) | git push → CI/CD 自动部署 | 是(CI/CD 处理) |