Cloudflare Enterprise — 技术参考文档

CF Logpush — 日志格式转换与自动推送

无服务器日志管道 · Cloudflare Workers + R2 + Queues · CDN 合作方日志对接

1
方案概述
🎯

目标

将 Cloudflare 边缘节点产生的 HTTP 访问日志(Logpush JSON 格式)自动转换为 CDN 合作方要求的自定义格式,并近实时推送到对方日志接收服务器。

🏗️

技术栈

全部基于 Cloudflare 原生服务:Logpush + R2 + Queues + Workers。无需服务器,无需运维,系统自动扩缩容。

📋

目标格式

145 字段 \u0001 分隔纯文本格式,HTTP Body 需 Gzip 压缩,URL 签名采用 MD5 鉴权方式。

端到端延迟

从请求产生到日志送达约 70 秒,通常远优于合作方 SLA 要求(典型要求为 3 分钟内)。

涉及的 Cloudflare 服务

服务作用Dashboard 位置
Logpush每约 1 分钟自动将边缘 HTTP 请求日志打包(ndjson.gz)写入 R2域名 → Analytics & Logs → Logpush
R2 Object Storage存储原始日志文件及处理中的临时批次文件Dashboard → R2 Object Storage
R2 Event NotificationsR2 每有新文件写入,立即触发 Queue 消息(秒级响应)R2 Bucket → Settings → Event Notifications
Queues解耦解析与发送两个阶段,保证 at-least-once 投递,自动重试Dashboard → Workers & Pages → Queues
Workers单脚本内包含两个逻辑 Worker:Parser(解析转换)和 Sender(签名推送)通过 wrangler deploy 或 GitHub Actions 部署
2
架构设计
┌─────────────────────────────────────────────────────────────────────────────────────┐ │ 【第一步】日志的产生 │ │ │ │ 用户打开网页 / App │ │ │ │ │ ▼ │ │ 请求到达 Cloudflare 边缘节点(全球300+个城市) │ │ │ │ │ │ CF 记录一条日志:用户IP、时间、URL、响应码、大小、缓存状态、耗时... │ │ ▼ │ │ CF 边缘处理完请求,日志暂存在 CF 内部系统 │ └───────────────────────────────────────┬─────────────────────────────────────────────┘ │ 每约1分钟,Logpush 自动打包推送 │ ▼ ┌─────────────────────────────────────────────────────────────────────────────────────┐ │ 【第二步】Logpush 自动推送日志到 R2 │ │ │ │ Logpush 服务(全自动,无需人工): │ │ 1. 打包过去1分钟的所有日志(ndjson格式,每行一条JSON日志) │ │ 2. Gzip 压缩(大幅减小文件体积) │ │ 3. 写入 R2 存储桶 │ │ │ │ ┌─────────────────────────────────────────┐ │ │ │ R2 Bucket: cdn-logs-raw │ │ │ │ logs/ │ │ │ │ └── 20260328/ │ ← 按日期自动分目录 │ │ │ ├── 20260328T090000Z_xxx.log.gz │ ← 每文件几KB到几MB │ │ │ ├── 20260328T090100Z_xxx.log.gz │ ← 文件名含时间戳,全球唯一 │ │ │ └── ...(每分钟一个新文件) │ │ │ └─────────────────────────────────────────┘ │ └───────────────────────────────────────┬─────────────────────────────────────────────┘ │ R2 检测到新文件 → 立即触发(秒级)Event Notification │ ▼ ┌─────────────────────────────────────────────────────────────────────────────────────┐ │ 【第三步】R2 事件通知 → parse-queue 排队 │ │ │ │ R2 Event Notification 自动发送消息到 parse-queue: │ │ { "bucket": "cdn-logs-raw", "object": { "key": "logs/20260328/xxx.log.gz" } } │ │ (消息只有文件路径,几十字节,永远不超128KB限制) │ │ │ │ ┌─────────────────────────────────────────┐ │ │ │ Queue: parse-queue(解析任务队列) │ │ │ │ 消息1: { key: "logs/.../a.log.gz" } │ ← 只是文件路径,不含日志内容 │ │ │ 消息2: { key: "logs/.../b.log.gz" } │ ← 自动批量、自动重试、自动扩缩容 │ │ │ ... │ │ │ └─────────────────────────────────────────┘ │ └───────────────────────────────────────┬─────────────────────────────────────────────┘ │ Queue 有消息时自动唤醒 Parser Worker(约3-5秒) │ ▼ ┌─────────────────────────────────────────────────────────────────────────────────────┐ │ 【第四步】Parser Worker — 读取、解压、解析、格式转换 │ │ │ │ ① 从 R2 取得 .gz 文件流(不全量下载,只拿到流引用) │ │ │ │ │ ▼ │ │ ② 实时解压(DecompressionStream)边读边解压,内存只有当前几KB数据 │ │ │ │ │ ▼ │ │ ③ 按行切割,逐行解析 JSON(每行一条CF原始日志) │ │ │ │ │ ▼ │ │ ④ 格式转换(transformEdge函数) │ │ CF JSON → \u0001 分隔的145字段纯文本格式 │ │ │ │ 转换前:{ "RayID":"9e34...", "ClientIP":"221.229...", "EdgeColoCode":"HGH" } │ │ 转换后:cs_vod_v3.0\u0001[28/Mar/2026:16:42:25 +0800]\u00019e34...\u0001200 │ │ \u00011774680145.000\u00010.149\u0001...(共145字段) │ │ │ │ │ ▼ │ │ ⑤ 每累积1000条(或文件处理完)→ 写入 R2 临时文件 + 发消息到 send-queue │ └───────────────────────────────────────┬─────────────────────────────────────────────┘ │ 写入 R2 临时文件,发消息 │ ▼ ┌─────────────────────────────────────────────────────────────────────────────────────┐ │ 【第五步】R2 临时文件 + send-queue 排队 │ │ │ │ R2 临时文件(processed/ 目录,已转换为目标格式): │ │ processed/logs_20260328_xxx-0-1774683718956.txt ← 最多1000条日志,纯文本 │ │ │ │ ┌─────────────────────────────────────────┐ │ │ │ Queue: send-queue(发送任务队列) │ │ │ │ 消息: { "key": "processed/...txt" } │ ← 只传文件路径,几十字节 │ │ │ ... │ ← 发送失败自动重试5次,彻底失败进DLQ │ │ └─────────────────────────────────────────┘ │ └───────────────────────────────────────┬─────────────────────────────────────────────┘ │ Queue 有消息时自动唤醒 Sender Worker(约3-5秒) │ ▼ ┌─────────────────────────────────────────────────────────────────────────────────────┐ │ 【第六步】Sender Worker — 压缩、签名、发送 │ │ │ │ ① 从 R2 读取临时文件(已转换好的最多1000条目标格式日志) │ │ │ │ │ ▼ │ │ ② Gzip 压缩 Body(合作方规范要求) │ │ │ │ │ ▼ │ │ ③ 计算 MD5 鉴权签名: │ │ ts = 当前Unix时间 + 300秒 │ │ rand = 随机数 │ │ 签名串 = "{uri}-{ts}-{rand}-{私钥}" │ │ md5hash = MD5(签名串) │ │ URL = {endpoint}{uri}?auth_key={ts}-{rand}-{md5} │ │ │ │ │ ▼ │ │ ④ HTTP POST 发送到合作方日志接收服务器(Content-Encoding: gzip) │ │ │ │ │ ├── 成功(HTTP 200) → ack消息 + 删除R2临时文件 ✅ │ │ └── 失败 → retry → 最多5次 → 彻底失败进 send-dlq ⚠️ │ └───────────────────────────────────────┬─────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────────────────────┐ │ 【第七步】合作方接收入库 │ │ 验证鉴权 → 解压Gzip → 按\u0001拆分145字段 → 写入计费/分析数据库 ✓ │ └─────────────────────────────────────────────────────────────────────────────────────┘
Queue 消息中仅传递 R2 对象路径(数十字节),从不携带日志内容,彻底规避 Queue 128 KB 消息大小限制。
3
核心设计决策

流式 Gzip 解压

原始日志文件通过 DecompressionStream + ReadableStream 流式处理。Worker 无需将整个文件载入内存,逐行解压转换,内存占用恒定,不受文件大小影响。这是 Workers 128 MB 内存限制下的必要设计。

两级 Queue 架构

parse-queue 负责 R2 → 格式转换;send-queue 负责 R2 → HTTP 推送。两级分离实现独立重试策略:解析失败重试 2 次,发送失败重试 5 次。每个 Queue 均配置独立死信队列(DLQ)用于可观测性与人工恢复。

幂等投递处理

Queues 采用 at-least-once 语义。Sender 处理重复投递时,若对应 R2 临时文件已被删除(表明已成功发送),则静默 ack,不重复发送。

时间戳兼容

Parser 自动识别 EdgeStartTimestamp 的三种格式:Unix 秒整数、Unix 毫秒整数、RFC 3339 字符串。推荐 Logpush 配置 timestamp_format = unix 以保证一致性。

CDN 节点国家码映射

目标格式要求的是 CDN 节点所在国家(非客户端国家)。EdgeColoCode(IATA 机场三字码,如 NRT = 东京)通过内置映射表转换为 ISO 3166-1 alpha-2 国家码,覆盖全球 200+ 机场。若未命中则默认返回 CN

MD5 鉴权

Workers 的 SubtleCrypto API 不支持 MD5 算法,因此 src/index.js 内嵌了纯 JS 实现的 RFC 1321 MD5。鉴权 URL 签名格式为:MD5({uri}-{ts}-{rand}-{privateKey})

响应内容长度字段(#21 sent_http_content_length)

http_requests 数据集没有直接暴露 Content-Length 字段。字段 #21 从 ResponseHeaders['content-length'] 取值,需通过 API 配置 Logpush Custom Fields(见部署步骤 6b)。未配置或响应头中不含该字段时,统一返回 -

Content-Length 的 Custom Fields 配置是可选项。不配置时字段 #21 固定输出 -,与该格式中其他无数据字段的处理方式一致。

可靠性模型

故障场景系统行为恢复方式
接收端临时不可用send-queue 指数退避重试,最多 5 次(约 15 分钟窗口)自动恢复;R2 临时文件保留至确认成功
原始文件含格式错误行逐行捕获异常,跳过该行,继续处理错误计数写入日志;文件级处理成功
parse-queue 处理失败重试 2 次后转入 parse-dlq人工检查 DLQ 消息及 R2 原始文件
Queue 重复投递R2 文件不存在 → 静默 ack,不重发自动处理,无数据重复
4
性能指标

实测时间:2026-03-30,6小时窗口(11:00–17:00 GMT+8)· HTTP 请求总量:13.31M · 数据传输:70.39 GB

Worker

指标实测数据评估
Worker 调用次数(6h)2k与 Logpush 文件批次数吻合
内部错误数0健康
CPU 时间超限次数3发生在流量峰值期,需持续观察
内存超限次数0流式处理设计有效
中位 CPU Time3.52 ms远低于 30s 限制
P90 CPU Time355 ms处理大文件批次时正常
P99 CPU Time3.17s在 30s 限制内
Wall Time P50 / P991.13s / 18.97s含网络等待,在限制内
5xx 子请求(发往接收端)36 / 15k(0.24%)接收端间歇性问题,已自动重试

parse-queue

指标实测数据评估
消息投递量(6h)355与 Logpush 文件数吻合
消息确认量(ack率)347(97.7%)健康
消息重试次数0无解析失败
实时积压0实时消费,无积压
平均积压(均值/峰值)5.47 / 13轻载
消费者延迟(均值/峰值)15.13s / ~250s峰值时延偏高,流量突发所致,可接受
消费者并发(均值/峰值)1.37 / 14自动弹性扩容有效
DLQ 消息数0零数据丢失

send-queue

指标实测数据评估
消息投递量(6h)14.9k约 1490 万条日志行成功推送
消息确认量15.45k含历史积压清零
消息重试次数0发送失败未上升至 Queue 级重试
实时积压0实时消费
平均积压(均值/峰值)32.6 / 106峰值积压明显,在容量范围内
消费者延迟(均值/峰值)5.62s / ~55s可接受
消费者并发(均值/峰值)22.07 / ~220峰值接近 250 并发上限
DLQ 消息数0零数据丢失

负载余量评估

维度当前负载5倍流量10倍流量
Worker CPU约 5% 上限安全大文件时 P99 接近上限
Worker 内存0次超限安全(流式设计)安全
parse-queue 延迟均值 15s,峰值 250s峰值延迟 5–10 分钟持续积压风险
send-queue 并发均值 22,峰值 220峰值接近 250 上限超出 250 上限
接收端可用性5xx 0.24%需监控 5xx 率可能成为瓶颈
综合评估:当前流量下系统运行轻松健康。5倍以内流量可稳定承载。超过5倍时,send-queue 并发和 parse-queue 延迟将成为主要约束,建议提前与接收端确认对端容量上限。

关键监控指标建议

优先级指标建议告警阈值
★★★DLQ 消息数(两个队列)> 0,立即处理
★★★实时积压(持续)> 100 条持续超过 5 分钟
★★★CPU 时间超限次数> 10 次/小时
★★parse-queue 消费者延迟(持续)持续超过 5 分钟
★★5xx 子请求比例> 0.5% 总请求量
★★send-queue 消费者并发(持续)持续 > 200
P99 CPU Time> 20s
5
部署指南
前置条件:本地已安装 Node.js v18+;已安装 Wrangler CLI(npm install -g wrangler);已准备好 Cloudflare API Token,权限范围限定为目标账号,需包含以下三项:Workers Scripts:EditWorkers R2 Storage:EditWorkers Queue:Edit(可使用 Edit Cloudflare Workers 模板后手动追加后两项)。
部署前请向日志对接方确认:推送服务器地址(Endpoint)、鉴权私钥(Private Key)、接收 URI 路径(URI Path)。
1

创建 R2 存储桶

Dashboard → R2 Object Storage → Create bucket

  • 名称:cdn-logs-raw
  • Location:Automatic(默认)
2

创建 4 个 Queue

Dashboard → Workers & Pages → Queues → Create queue

依次创建以下 4 个队列(名称须完全一致):

Queue 名称用途
parse-queue接收新日志文件通知,触发 Parser Worker
send-queue接收转换完成通知,触发 Sender Worker
parse-dlq解析彻底失败的死信队列
send-dlq发送彻底失败的死信队列
3

配置 R2 Event Notification

R2 → cdn-logs-raw → Settings → Event Notifications → Add notification

  • Event type:object-create
  • Queue:parse-queue
  • Prefix(可选):logs/
  • Suffix(可选):.gz
4

设置 Worker Secrets

在项目目录下,通过 Wrangler 设置加密 Secret:

# 日志接收服务器地址(不含 URI 路径,不含末尾斜杠)
wrangler secret put CTYUN_ENDPOINT

# 鉴权私钥
wrangler secret put CTYUN_PRIVATE_KEY

# 接收 URI 路径(如 /logpost/yourpath)
wrangler secret put CTYUN_URI_EDGE

# 验证已设置的 Secret
wrangler secret list
若使用 GitHub Actions CI/CD,Secrets 仍需通过 wrangler secret put 直接写入 Cloudflare。它们会持久保存在 CF 云端,无需每次部署时重新上传。
5

部署 Worker

wrangler deploy
预期输出中应确认:parse-queue 和 send-queue 的 Producer 与 Consumer 绑定均已创建。Current Version ID 已分配。
若通过 GitHub Actions 部署,直接 push 到 main 分支即可,CI/CD 会自动执行 wrangler deploy
6

配置 Logpush Job

Dashboard → [目标域名] → Analytics & Logs → Logpush → Create a Logpush job

  1. 目的地:选 R2
  2. Bucket:cdn-logs-raw  ·  Path prefix:logs/{DATE}/
  3. 数据集:HTTP requests
  4. 字段:全选所有字段(代码对缺失字段有容错处理)
  5. Timestamp format:Unix
  6. Sample rate:1(100% 采样)
Timestamp format 必须选 Unix,否则时间戳解析可能出错。
6b

开启 Content-Length 日志采集 — 可选

默认情况下,字段 #21(sent_http_content_length)回退使用 EdgeResponseBodyBytes。如需采集真实的 Content-Length 响应头值,需通过 API 配置 Logpush Custom Fields。这是一次性的 Zone 级别配置。

此配置只能通过 API 完成,Cloudflare Dashboard UI 不支持设置 raw response fields。

步骤 A — 查找是否已有 Custom Fields Ruleset

# 列出所有 ruleset,找 http_log_custom_fields 阶段
curl "https://api.cloudflare.com/client/v4/zones/$ZONE_ID/rulesets" \
  --request GET \
  --header "Authorization: Bearer $CLOUDFLARE_API_TOKEN" \
  | jq '.result[] | select(.phase == "http_log_custom_fields") | {id, phase}'

若有返回,记录 id 作为 RULESET_ID,跳至步骤 C。若无返回,执行步骤 B。

步骤 B — 创建 Custom Fields Ruleset(仅步骤 A 无结果时执行)

curl "https://api.cloudflare.com/client/v4/zones/$ZONE_ID/rulesets" \
  --request POST \
  --header "Authorization: Bearer $CLOUDFLARE_API_TOKEN" \
  --json '{
    "name": "Zone-level phase entry point",
    "kind": "zone",
    "description": "Custom log fields for Logpush",
    "phase": "http_log_custom_fields"
  }' | jq '.result.id'

记录返回的 id 作为 RULESET_ID

步骤 C — 配置采集 content-length 响应头

使用 raw_response_fields 捕获源站返回的原始 content-length 值(CF Transform Rules 修改之前)。

curl "https://api.cloudflare.com/client/v4/zones/$ZONE_ID/rulesets/$RULESET_ID" \
  --request PUT \
  --header "Authorization: Bearer $CLOUDFLARE_API_TOKEN" \
  --json '{
    "rules": [
      {
        "action": "log_custom_field",
        "expression": "true",
        "description": "Capture content-length response header for Logpush",
        "action_parameters": {
          "raw_response_fields": [
            { "name": "content-length" }
          ]
        }
      }
    ]
  }'

步骤 D — 在 Logpush Job 中加入 ResponseHeaders 字段

# 先获取 Logpush Job ID
curl "https://api.cloudflare.com/client/v4/zones/$ZONE_ID/logpush/jobs" \
  --request GET \
  --header "Authorization: Bearer $CLOUDFLARE_API_TOKEN" \
  | jq '.result[] | {id, name, dataset}'

# 更新 Job,加入 ResponseHeaders 字段
curl "https://api.cloudflare.com/client/v4/zones/$ZONE_ID/logpush/jobs/$JOB_ID" \
  --request PUT \
  --header "Authorization: Bearer $CLOUDFLARE_API_TOKEN" \
  --json '{
    "output_options": {
      "field_names": [
        "CacheCacheStatus","ClientCountry","ClientIP","ClientRequestBytes",
        "ClientRequestHost","ClientRequestMethod","ClientRequestProtocol",
        "ClientRequestReferer","ClientRequestScheme","ClientRequestURI",
        "ClientRequestUserAgent","ClientSSLProtocol","ClientSrcPort",
        "EdgeColoCode","EdgeResponseBodyBytes","EdgeResponseBytes",
        "EdgeResponseContentType","EdgeResponseStatus","EdgeServerIP",
        "EdgeStartTimestamp","EdgeTimeToFirstByteMs","OriginIP",
        "OriginRequestHeaderSendDurationMs","OriginResponseDurationMs",
        "OriginResponseHeaderReceiveDurationMs","OriginResponseStatus",
        "OriginTLSHandshakeDurationMs","ParentRayID","RayID",
        "ResponseHeaders"
      ],
      "timestamp_format": "unix"
    }
  }'
配置完成后,每条日志的 ResponseHeaders 字段将包含 {"content-length": "12345"},Worker 自动将其用于字段 #21。
注意:HTTP/2 响应或 CF 开启 gzip 压缩时,源站可能不发送 Content-Length 头(改用 chunked transfer encoding),此时字段 #21 输出 -
7

验证端到端链路

等待约 2 分钟(Logpush 首次写入需初始化),逐步验证以下三处:

  1. R2:cdn-logs-raw/logs/ 目录下出现 .log.gz 文件
  2. Queue:parse-queue 的 Messages Processed 数量持续增长
  3. Worker 日志:通过 wrangler tail ctyun-logpush 查看实时日志
# 正常日志输出示例(LOG_LEVEL=info)
[INFO] Parsing: logs/20260328/xxx.log.gz
[INFO] Done: logs/20260328/xxx.log.gz | lines=73 batches=1 errors=0
[INFO] Sent 73 lines → HTTP 200 | processed/xxx.txt
发送返回 HTTP 401/403 → 检查 CTYUN_PRIVATE_KEY 是否正确
发送返回 HTTP 404 → 检查 CTYUN_URI_EDGE 路径是否正确
6
配置参考

wrangler.toml

name                = "ctyun-logpush"
main                = "src/index.js"
compatibility_date  = "2026-03-27"
compatibility_flags = ["nodejs_compat"]
account_id          = "<your-account-id>"

[[r2_buckets]]
binding     = "RAW_BUCKET"
bucket_name = "cdn-logs-raw"

[[queues.producers]]
binding = "PARSE_QUEUE"
queue   = "parse-queue"

[[queues.producers]]
binding = "SEND_QUEUE"
queue   = "send-queue"

[[queues.consumers]]
queue              = "parse-queue"
max_batch_size     = 5
max_batch_timeout  = 10
max_retries        = 2
dead_letter_queue  = "parse-dlq"

[[queues.consumers]]
queue              = "send-queue"
max_batch_size     = 50
max_batch_timeout  = 5
max_retries        = 5
dead_letter_queue  = "send-dlq"

[vars]
BATCH_SIZE = "1000"  # 每次 POST 的最大日志行数
LOG_LEVEL  = "info"  # debug | info | warn | error

workers_dev  = false
preview_urls = false

环境变量说明

变量名类型示例值说明
CTYUN_ENDPOINTSecret http://log.example.com:5580 日志接收服务器地址,不含 URI 路径,不含末尾斜杠
CTYUN_PRIVATE_KEYSecret YourKey@1234 MD5 鉴权私钥,由对接方提供
CTYUN_URI_EDGESecret /logpost/yourpath HTTP POST 目标 URI 路径,由对接方提供
BATCH_SIZEVar 1000 每批 POST 包含的日志行数
LOG_LEVELVar info Worker 日志级别。排查问题时改为 debug,生产恢复 info
Secret 通过 wrangler secret put <NAME> 一次性设置后持久保存在 Cloudflare 云端,任何情况下不得写入 wrangler.toml 或提交至代码仓库。

GitHub Actions CI/CD

仓库中包含 .github/workflows/deploy.yml。向 main 分支推送代码时,自动触发 wrangler deploy(仅部署代码和配置)。

CI/CD 所需 GitHub secret 仅一个:CLOUDFLARE_API_TOKEN。Worker secrets(CTYUN_ENDPOINTCTYUN_PRIVATE_KEYCTYUN_URI_EDGE)通过 wrangler secret put 独立管理。

7
日常运维

Dashboard 健康状态检查

检查项正常状态异常处理
R2 logs/ 目录每约 1 分钟有新 .gz 文件检查 Logpush Job 是否处于 Enabled 状态
parse-queue 积压接近 0持续积压 → 查看 Worker 日志排查解析错误
send-queue 积压接近 0持续积压 → 检查接收端服务器可用性
parse-dlqInactive(0 消息)有消息 → 检查对应 R2 原始文件是否损坏
send-dlqInactive(0 消息)有消息 → 检查接收端,修复后从 R2 手动补传
R2 processed/ 目录无文件积压文件积压 → Sender 异常,查看 Worker 日志

指标解读指南

信号含义处理方式
Backlog = 0 + Lag < 30s系统实时消化,运行健康正常,无需操作
Backlog 持续增长消费速度跟不上生产速度,真正的瓶颈信号检查 Worker 错误日志;必要时扩容
CPU Time Exceeded > 0单个 Logpush 文件过大,单次调用 CPU 超限持续监控频率;不要降低 max_batch_size
Messages Retried > 0接收端不稳定排查 5xx 子请求;确认接收端容量
DLQ > 0有数据丢失风险,最高优先级立即检查 DLQ 消息;从 R2 手动补传
send-queue 并发 > 200接近 250 并发上限提高 send-queue max_batch_size

配置变更说明

变更类型操作方式是否需要重新部署
轮换 CTYUN_ENDPOINT / CTYUN_PRIVATE_KEY / CTYUN_URI_EDGEwrangler secret put <NAME>否,立即生效
修改 BATCH_SIZE / LOG_LEVEL编辑 wrangler.tomlwrangler deploy
更新 Worker 代码(src/index.jsgit push → CI/CD 自动部署是(CI/CD 处理)