Serverless log pipeline · Cloudflare Workers + R2 + Queues · CDN Partner Log Integration
Automatically transform Cloudflare edge access logs from JSON (Logpush format) into a custom CDN partner log format, and deliver them to a remote log ingestion endpoint in near real-time.
100% Cloudflare-native: Logpush + R2 + Queues + Workers. Zero servers, zero infrastructure maintenance, auto-scaling by design.
Target: 145-field \u0001-delimited plaintext format. Body compressed with gzip. Authentication via MD5-signed URLs.
Approximately 70 seconds from request to log delivery. Typically well within partner SLA requirements.
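As a concrete illustration of the wire format described above, the sketch below joins a short, invented field list with the \u0001 (SOH) delimiter and gzips the body the way a sender might. Field values and count are illustrative only; the real format carries 145 fields per line.

```javascript
// Illustrative only: field values are invented and truncated (the real
// format has 145 fields). Unavailable fields are sent as "-".
const fields = ['1711600000', 'GET', 'example.com', '200', '-'];
const line = fields.join('\u0001');            // SOH-delimited record

// Gzip a batch body with the Web Streams API (available in Workers):
async function gzipBody(text) {
  const compressed = new Blob([text]).stream()
    .pipeThrough(new CompressionStream('gzip'));
  return new Response(compressed).arrayBuffer();
}
```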
| Service | Role | Location in Dashboard |
|---|---|---|
| Logpush | Exports edge HTTP request logs to R2 (~1 min batches, gzip-compressed ndjson) | Domain → Analytics & Logs → Logpush |
| R2 Object Storage | Stores raw log files and processed batch files (temporary) | Dashboard → R2 Object Storage |
| R2 Event Notifications | Triggers a Queue message on every new object created in R2 | R2 Bucket → Settings → Event Notifications |
| Queues | Decouples parse and send stages; guarantees at-least-once delivery with automatic retry | Dashboard → Workers & Pages → Queues |
| Workers | Two logical workers in one script: Parser and Sender | Deployed via wrangler deploy or GitHub Actions CI/CD |
Raw log files are processed via DecompressionStream + ReadableStream. The Worker never buffers the full file in memory — it decompresses and transforms line-by-line, keeping memory usage constant regardless of file size. This is essential given the 128 MB Worker memory limit.
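A minimal sketch of that streaming approach, assuming a helper shaped like the one below (`lines` is an illustrative name, not the actual `src/index.js` code): decompress the gzip stream and yield complete lines as they arrive, buffering only the trailing partial line.

```javascript
// Minimal sketch of the constant-memory pipeline: decompress a gzip
// ReadableStream and yield complete lines without ever buffering the
// whole file in memory.
async function* lines(gzipStream) {
  const text = gzipStream
    .pipeThrough(new DecompressionStream('gzip'))
    .pipeThrough(new TextDecoderStream());
  let buf = '';
  for await (const chunk of text) {
    buf += chunk;
    const parts = buf.split('\n');
    buf = parts.pop();               // keep the trailing partial line
    for (const p of parts) if (p) yield p;
  }
  if (buf) yield buf;                // flush the final unterminated line
}
```

Memory stays bounded by the size of one chunk plus one line, regardless of how large the source object is.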
parse-queue handles R2 → transform. send-queue handles R2 → HTTP POST. The separation allows independent retry policies: parse failures retry 2×, send failures retry 5×. Both queues have dedicated dead-letter queues (DLQs) for observability and manual recovery.
Queues use at-least-once semantics. The Sender handles duplicate delivery gracefully: if the R2 temporary file has already been deleted (indicating a prior successful send), the message is silently acknowledged without re-sending.
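The guard can be sketched as follows. The bucket and send function are injected here so the logic is testable in isolation; the actual Worker uses its R2 binding and fetch directly, and `deliverOnce` is an illustrative name.

```javascript
// Hedged sketch of the duplicate-delivery guard: the R2 object is the
// "not yet delivered" marker. A missing object means a prior attempt
// already sent and deleted it, so the message is acked without re-sending.
async function deliverOnce(bucket, key, send) {
  const obj = await bucket.get(key);
  if (obj === null) return 'duplicate';   // prior send already succeeded
  await send(await obj.text());           // POST batch to partner endpoint
  await bucket.delete(key);               // delete only after confirmed send
  return 'sent';
}
```

Deleting only after a successful send is what makes at-least-once delivery safe: a crash between send and delete causes at most one harmless retry, never data loss.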
The parser auto-detects EdgeStartTimestamp in three formats: Unix seconds (integer), Unix milliseconds (integer), and RFC 3339 string. For consistency, configuring the Logpush job with timestamp_format = unix is recommended.
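The detection logic can be sketched like this (`toUnixMillis` is an illustrative name; the threshold heuristic is an assumption about how such detection is typically done):

```javascript
// Normalize EdgeStartTimestamp to Unix milliseconds. Handles the three
// shapes described above: Unix seconds, Unix milliseconds, RFC 3339.
// Heuristic: numeric values above ~1e12 are already in milliseconds.
function toUnixMillis(ts) {
  if (typeof ts === 'string' && Number.isNaN(Number(ts))) {
    return Date.parse(ts);            // RFC 3339 string
  }
  const n = Number(ts);
  return n > 1e12 ? n : n * 1000;     // ms passthrough vs seconds → ms
}
```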
The target format requires the CDN node country (not client country). EdgeColoCode (IATA airport code, e.g. NRT = Tokyo) is mapped to ISO 3166-1 alpha-2 country code via a built-in lookup table covering 200+ airports globally. If the IATA code is not in the table, defaults to CN.
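A tiny excerpt of that lookup, for illustration (the built-in table covers 200+ airports; the four entries here are examples, not the full table):

```javascript
// IATA colo code → ISO 3166-1 alpha-2 country code (excerpt).
const COLO_COUNTRY = { NRT: 'JP', LAX: 'US', FRA: 'DE', SIN: 'SG' };

// Unknown codes fall back to 'CN', per the behavior described above.
const coloCountry = (iata) => COLO_COUNTRY[iata] ?? 'CN';
```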
Workers' SubtleCrypto API does not support MD5. A pure-JS RFC 1321 implementation is included in src/index.js. The auth URL signature format is: MD5({uri}-{ts}-{rand}-{privateKey}).
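A sketch of the signed-URL construction. The Worker bundles its own pure-JS MD5 (since SubtleCrypto lacks it); node:crypto stands in here for brevity. The signature input order comes from the format above, but the query parameter names (ts, rand, sign) are assumptions for illustration.

```javascript
import { createHash } from 'node:crypto';

// Build the auth URL: sign = MD5("{uri}-{ts}-{rand}-{privateKey}").
// Parameter names in the query string are illustrative assumptions.
function signUrl(endpoint, uri, privateKey, ts, rand) {
  const sign = createHash('md5')
    .update(`${uri}-${ts}-${rand}-${privateKey}`)
    .digest('hex');
  return `${endpoint}${uri}?ts=${ts}&rand=${rand}&sign=${sign}`;
}
```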
The http_requests dataset does not expose a dedicated Content-Length field. Field #21 is populated from ResponseHeaders['content-length'] when Logpush Custom Fields are configured via API (see Step 6b in Deployment). If not configured or the header is absent, the field returns -.
content-length is optional. Without it, field #21 will consistently output -, which is the standard placeholder for unavailable fields in this format.

| Failure Scenario | Behavior | Recovery |
|---|---|---|
| Remote endpoint temporarily unavailable | send-queue retries up to 5× with exponential backoff (~15 min window) | Automatic; R2 file preserved until delivery confirmed |
| Malformed log record in source file | Per-line error caught; line skipped; batch continues | Error count logged; file-level processing succeeds |
| Parser crash on corrupt .gz file | parse-queue retries 2×; moves to parse-dlq | Manual inspection of DLQ message + raw R2 file |
| Duplicate Queue delivery | R2 file not found → silent ack, no re-send | Automatic; no data duplication |
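The per-line error isolation in the table above can be sketched as follows (`transformBatch` is an illustrative name, not the actual implementation):

```javascript
// A malformed record is counted and skipped; the rest of the batch
// still transforms, so file-level processing succeeds.
function transformBatch(rawLines, transform) {
  const out = [];
  let errors = 0;
  for (const raw of rawLines) {
    try {
      out.push(transform(JSON.parse(raw)));
    } catch {
      errors++;                 // bad line: count it and keep going
    }
  }
  return { out, errors };
}
```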
Measured: 2026-03-30, 6-hour window (11:00–17:00 GMT+8) · HTTP traffic: 13.31M requests · Data transfer: 70.39 GB
| Metric | Observed | Assessment |
|---|---|---|
| Worker invocations (6h) | 2k | Matches Logpush batch count |
| Errors (internal) | 0 | Clean |
| Exceeded CPU Time Limits | 3 | Occurred at peak; monitor if increasing |
| Exceeded Memory | 0 | Streaming design effective |
| Median CPU Time | 3.52 ms | Far below 30s limit |
| P90 CPU Time | 355 ms | Normal for large file batches |
| P99 CPU Time | 3.17s | Within 30s limit |
| Wall Time P50 / P99 | 1.13s / 18.97s | Within limits including network wait |
| 5xx subrequests (to partner) | 36 / 15k | 0.24% — partner-side intermittent; auto-retried |
| Metric | Observed | Assessment |
|---|---|---|
| Messages ingested (6h) | 355 | Matches Logpush file count |
| Messages acknowledged | 347 (97.7%) | Healthy |
| Messages retried | 0 | No parse failures |
| Realtime backlog | 0 | Real-time consumption |
| Average backlog (avg/peak) | 5.47 / 13 | Light |
| Consumer lag time (avg/peak) | 15.13s / ~250s | Peak lag at traffic burst; acceptable |
| Consumer concurrency (avg/peak) | 1.37 / 14 | Auto-scaled effectively |
| DLQ messages | 0 | No data loss |
| Metric | Observed | Assessment |
|---|---|---|
| Messages ingested (6h) | 14.9k | ~14.9M log lines delivered |
| Messages acknowledged | 15.45k | Including prior backlog clearance |
| Messages retried | 0 | No send failures escalated to queue |
| Realtime backlog | 0 | Real-time consumption |
| Average backlog (avg/peak) | 32.6 / 106 | Moderate at peak; within capacity |
| Consumer lag time (avg/peak) | 5.62s / ~55s | Acceptable |
| Consumer concurrency (avg/peak) | 22.07 / ~220 | Near 250 concurrency limit at peak |
| DLQ messages | 0 | No data loss |
| Dimension | Current Load | 5× Traffic | 10× Traffic |
|---|---|---|---|
| Worker CPU | ~5% of limit | Safe | P99 near limit on large files |
| Worker Memory | 0 overruns | Safe (streaming design) | Safe |
| parse-queue lag | avg 15s, peak 250s | Peak lag ~5–10 min | Sustained backlog risk |
| send-queue concurrency | avg 22, peak 220 | Near 250 limit at peak | Exceeds 250 limit |
| Partner endpoint | 0.24% 5xx | Monitor 5xx rate | Likely bottleneck |
| Priority | Metric | Alert Threshold |
|---|---|---|
| ★★★ | DLQ message count (both queues) | > 0 — immediate action |
| ★★★ | Realtime Backlog (sustained) | > 100 messages for > 5 min |
| ★★★ | Exceeded CPU Time Limits | > 10 per hour |
| ★★ | Consumer Lag Time (parse-queue) | Sustained > 5 minutes |
| ★★ | 5xx subrequests rate | > 0.5% of total |
| ★★ | send-queue Consumer Concurrency | Sustained > 200 |
Prerequisites: the Wrangler CLI (npm install -g wrangler) and a Cloudflare API Token scoped to the target account with the following permissions: Workers Scripts:Edit, Workers R2 Storage:Edit, and Workers Queue:Edit (use the Edit Cloudflare Workers template and manually add the R2 and Queue permissions).

Create the R2 bucket: Dashboard → R2 Object Storage → Create bucket · Bucket name: cdn-logs-raw

Create the queues: Dashboard → Workers & Pages → Queues → Create queue
Create all four queues (exact names required):
| Queue Name | Purpose |
|---|---|
| parse-queue | Triggers Parser Worker on new R2 files |
| send-queue | Triggers Sender Worker on processed batches |
| parse-dlq | Dead-letter queue for parse failures |
| send-dlq | Dead-letter queue for send failures |
R2 → cdn-logs-raw → Settings → Event Notifications → Add notification
Event type: object-create · Destination queue: parse-queue · Prefix: logs/ · Suffix: .gz

From the project directory, set encrypted secrets via Wrangler:
```bash
# Log ingestion endpoint (base URL, no trailing slash)
wrangler secret put CTYUN_ENDPOINT

# Authentication private key
wrangler secret put CTYUN_PRIVATE_KEY

# Target URI path (e.g. /logpost/yourpath)
wrangler secret put CTYUN_URI_EDGE

# Verify
wrangler secret list
```
Secrets are set via wrangler secret put directly. They persist in Cloudflare and do not need to be re-uploaded on each deployment.

Deploy the Worker: wrangler deploy — or push to the main branch; the workflow handles wrangler deploy automatically.

Create the Logpush job: Dashboard → [Target Zone] → Analytics & Logs → Logpush → Create a Logpush job
Destination: R2 bucket cdn-logs-raw · Path prefix: logs/{DATE}/ · Sampling rate: 1 (100%)

By default, field #21 (sent_http_content_length) falls back to EdgeResponseBodyBytes. To capture the true Content-Length response header value, configure Logpush Custom Fields via API. This is a one-time zone-level configuration.
```bash
# List zone rulesets and find the http_log_custom_fields phase
curl "https://api.cloudflare.com/client/v4/zones/$ZONE_ID/rulesets" \
  --request GET \
  --header "Authorization: Bearer $CLOUDFLARE_API_TOKEN" \
  | jq '.result[] | select(.phase == "http_log_custom_fields") | {id, phase}'
```
If a ruleset is returned, note its id as RULESET_ID and skip to Step C. If no result, proceed to Step B.
```bash
curl "https://api.cloudflare.com/client/v4/zones/$ZONE_ID/rulesets" \
  --request POST \
  --header "Authorization: Bearer $CLOUDFLARE_API_TOKEN" \
  --json '{
    "name": "Zone-level phase entry point",
    "kind": "zone",
    "description": "Custom log fields for Logpush",
    "phase": "http_log_custom_fields"
  }' | jq '.result.id'
```
Note the returned id as RULESET_ID.
Use raw_response_fields to capture the original content-length value from the origin before any CF transformations.
```bash
curl "https://api.cloudflare.com/client/v4/zones/$ZONE_ID/rulesets/$RULESET_ID" \
  --request PUT \
  --header "Authorization: Bearer $CLOUDFLARE_API_TOKEN" \
  --json '{
    "rules": [
      {
        "action": "log_custom_field",
        "expression": "true",
        "description": "Capture content-length response header for Logpush",
        "action_parameters": {
          "raw_response_fields": [
            { "name": "content-length" }
          ]
        }
      }
    ]
  }'
```
```bash
# Get your Logpush job ID first
curl "https://api.cloudflare.com/client/v4/zones/$ZONE_ID/logpush/jobs" \
  --request GET \
  --header "Authorization: Bearer $CLOUDFLARE_API_TOKEN" \
  | jq '.result[] | {id, name, dataset}'

# Update the job to include the ResponseHeaders field
curl "https://api.cloudflare.com/client/v4/zones/$ZONE_ID/logpush/jobs/$JOB_ID" \
  --request PUT \
  --header "Authorization: Bearer $CLOUDFLARE_API_TOKEN" \
  --json '{
    "output_options": {
      "field_names": [
        "CacheCacheStatus","ClientCountry","ClientIP","ClientRequestBytes",
        "ClientRequestHost","ClientRequestMethod","ClientRequestProtocol",
        "ClientRequestReferer","ClientRequestScheme","ClientRequestURI",
        "ClientRequestUserAgent","ClientSSLProtocol","ClientSrcPort",
        "EdgeColoCode","EdgeResponseBodyBytes","EdgeResponseBytes",
        "EdgeResponseContentType","EdgeResponseStatus","EdgeServerIP",
        "EdgeStartTimestamp","EdgeTimeToFirstByteMs","OriginIP",
        "OriginRequestHeaderSendDurationMs","OriginResponseDurationMs",
        "OriginResponseHeaderReceiveDurationMs","OriginResponseStatus",
        "OriginTLSHandshakeDurationMs","ParentRayID","RayID",
        "ResponseHeaders"
      ],
      "timestamp_format": "unix"
    }
  }'
```
After this change, ResponseHeaders in each log line will contain {"content-length": "12345"} and the Worker will automatically use it for field #21. Note that some responses omit Content-Length and use chunked transfer encoding instead; in those cases field #21 will output -.

Allow ~2 minutes for Logpush to initialize and write the first batch, then verify:

- cdn-logs-raw/logs/ contains .log.gz files
- parse-queue → Messages Processed counter is incrementing
- wrangler tail ctyun-logpush shows:

```
# Expected log output (LOG_LEVEL=info)
[INFO] Parsing: logs/20260328/xxx.log.gz
[INFO] Done: logs/20260328/xxx.log.gz | lines=73 batches=1 errors=0
[INFO] Sent 73 lines → HTTP 200 | processed/xxx.txt
```
```toml
name = "ctyun-logpush"
main = "src/index.js"
compatibility_date = "2026-03-27"
compatibility_flags = ["nodejs_compat"]
account_id = "<your-account-id>"
workers_dev = false
preview_urls = false

[[r2_buckets]]
binding = "RAW_BUCKET"
bucket_name = "cdn-logs-raw"

[[queues.producers]]
binding = "PARSE_QUEUE"
queue = "parse-queue"

[[queues.producers]]
binding = "SEND_QUEUE"
queue = "send-queue"

[[queues.consumers]]
queue = "parse-queue"
max_batch_size = 5
max_batch_timeout = 10
max_retries = 2
dead_letter_queue = "parse-dlq"

[[queues.consumers]]
queue = "send-queue"
max_batch_size = 50
max_batch_timeout = 5
max_retries = 5
dead_letter_queue = "send-dlq"

[vars]
BATCH_SIZE = "1000" # Lines per HTTP POST batch
LOG_LEVEL = "info"  # debug | info | warn | error
```

Note: workers_dev and preview_urls are top-level keys and must appear before the first table, otherwise TOML would assign them to [vars].
| Name | Type | Example | Description |
|---|---|---|---|
| CTYUN_ENDPOINT | Secret | http://log.example.com:5580 | Log ingestion server base URL (no trailing slash, no URI path) |
| CTYUN_PRIVATE_KEY | Secret | YourKey@1234 | MD5 authentication private key (provided by log partner) |
| CTYUN_URI_EDGE | Secret | /logpost/yourpath | Target URI path for HTTP POST (provided by log partner) |
| BATCH_SIZE | Var | 1000 | Number of log lines per POST batch. |
| LOG_LEVEL | Var | info | Worker log verbosity. Use debug for troubleshooting; revert to info in production. |
Secrets are set via wrangler secret put <NAME> and persist in Cloudflare. They are never stored in wrangler.toml or committed to source control.

The repository includes .github/workflows/deploy.yml. Any push to main triggers an automatic wrangler deploy (code and configuration only).
Required GitHub secret for CI/CD: CLOUDFLARE_API_TOKEN only. Worker secrets (CTYUN_ENDPOINT, CTYUN_PRIVATE_KEY, CTYUN_URI_EDGE) are managed separately via wrangler secret put.
| Indicator | Healthy State | Action if Abnormal |
|---|---|---|
| R2 logs/ | New .gz files appearing every ~1 min | Check Logpush job is Enabled |
| parse-queue backlog | Near 0 at all times | Sustained backlog → check Worker logs for parse errors |
| send-queue backlog | Near 0 at all times | Sustained backlog → check remote endpoint availability |
| parse-dlq | Inactive (0 messages) | Messages present → inspect raw .gz file for corruption |
| send-dlq | Inactive (0 messages) | Messages present → check endpoint, manually reprocess from R2 |
| R2 processed/ | No file accumulation | Files accumulating → Sender not consuming; check Worker logs |
| Signal | Meaning | Action |
|---|---|---|
| Backlog = 0 + Lag < 30s | System consuming in real time | Healthy — no action needed |
| Backlog growing continuously | Consumption falling behind production — genuine bottleneck | Check Worker errors; scale if needed |
| Exceeded CPU Time Limits > 0 | Individual Logpush file too large for single Worker invocation | Monitor frequency; do NOT reduce max_batch_size |
| Messages Retried > 0 | Partner endpoint instability | Investigate 5xx subrequests; confirm endpoint capacity |
| DLQ > 0 | Data loss risk — highest priority | Inspect DLQ messages immediately; manually reprocess from R2 |
| send-queue Concurrency > 200 | Near the 250 concurrent consumer limit | Increase max_batch_size for send-queue |
| Change Type | Method | Redeploy Required? |
|---|---|---|
| Rotate CTYUN_ENDPOINT / CTYUN_PRIVATE_KEY / CTYUN_URI_EDGE | wrangler secret put <NAME> | No — effective immediately |
| Change BATCH_SIZE / LOG_LEVEL | Edit wrangler.toml → wrangler deploy | Yes |
| Update Worker code (src/index.js) | git push → CI/CD auto-deploys | Yes (handled by CI/CD) |