Best Practices
A. Message design
- Keep messages small (IDs, not big payloads).
- Add metadata: message_type, correlation_id, trace_id, created_at, tenant_id, x-retry.
- Version your schema: type=v2, or use a versioned header.
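A minimal publishing sketch of this, assuming the Go amqp091-go client and an open channel ch; the exchange, routing key, and the msgID/corrID/traceID values are illustrative placeholders that mirror the metadata list above.

err := ch.Publish("orders", "order.us.created", false, false, amqp.Publishing{
    Type:          "order.created.v2", // message_type with schema version
    MessageId:     msgID,              // idempotency key (e.g. a UUID you generate)
    CorrelationId: corrID,             // ties related messages together
    Timestamp:     time.Now(),         // created_at
    ContentType:   "application/json",
    Headers: amqp.Table{
        "trace_id":  traceID,
        "tenant_id": "tenant-42",
        "x-retry":   int32(0),
    },
    Body: []byte(`{"order_id":"123"}`), // just the ID, not the whole payload
})
if err != nil { /* handle */ }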
B. Idempotency & exactly-once myth
- At-least-once delivery is normal. Build idempotent consumers:
- Use an idempotency key (message ID) and a processed table/cache.
- Make state changes safe to repeat.
- Don’t rely on global ordering. If you need ordering per key, shard by that key so one shard processes in order.
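A sketch of an idempotent consumer, assuming msgs comes from ch.Consume with manual acks and handle is your own processing function; the in-memory map stands in for a real processed table or cache shared by all replicas.

processed := map[string]bool{} // use a DB table or cache in production

for d := range msgs {
    if processed[d.MessageId] {
        d.Ack(false) // duplicate delivery: already done, just ack
        continue
    }
    if err := handle(d.Body); err != nil {
        d.Nack(false, true) // redeliver (or route to a delay queue)
        continue
    }
    processed[d.MessageId] = true // record success together with the ack
    d.Ack(false)
}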
C. Throughput tuning
- Prefetch (QoS): start small (e.g., 10–50), then tune.
- Manual ack + batching (process N items then commit) when possible.
- Use multiple consumer replicas (horizontal scale).
- Reuse connections; open multiple channels per connection for concurrency.
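A sketch of that connection/channel pattern, assuming the amqp091-go client: one TCP connection is shared, and each worker gets its own channel with its own prefetch. The URL, queue name, worker count, and process function are placeholders.

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil { /* handle */ }

for i := 0; i < 4; i++ { // 4 workers sharing one connection
    ch, err := conn.Channel()
    if err != nil { /* handle */ }
    ch.Qos(20, 0, false) // per-channel in-flight limit
    msgs, _ := ch.Consume("work", "", false, false, false, false, nil)
    go func() {
        for d := range msgs {
            if err := process(d.Body); err != nil {
                d.Nack(false, true)
                continue
            }
            d.Ack(false)
        }
    }()
}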
D. Backpressure & protection
- Set queue length and message TTL limits.
- Use producer confirms to detect when the broker is slow or full.
- Auto-scale workers on queue depth / lag.
- Keep a max in-flight count per worker (prefetch + worker pool).
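A sketch of queue-level limits, assuming the same work queue as the snippets below; the numbers and the reject-publish overflow behaviour are assumptions you should tune for your workload.

_, err := ch.QueueDeclare("work", true, false, false, false, amqp.Table{
    "x-max-length":  int32(100000),    // cap queue depth
    "x-message-ttl": int32(60000),     // expire messages older than 60s
    "x-overflow":    "reject-publish", // push back on publishers instead of dropping the oldest
})
if err != nil { /* handle */ }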
E. Reliability settings
- Durable queues + persistent messages + manual ACKs.
- Use publisher confirms on the producer side.
- Add DLQs and alarms.
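A sketch of the DLQ wiring that goes with these settings. The names are assumptions; in a real declaration you would combine the dead-letter argument with the limits shown earlier in a single QueueDeclare.

ch.ExchangeDeclare("work.dlx", "fanout", true, false, false, false, nil)
ch.QueueDeclare("work.dlq", true, false, false, false, nil)
ch.QueueBind("work.dlq", "", "work.dlx", false, nil)

// The main queue dead-letters rejected/expired messages into work.dlx.
ch.QueueDeclare("work", true, false, false, false, amqp.Table{
    "x-dead-letter-exchange": "work.dlx",
})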
F. Observability
- Track queue depth, ACK/NACK, retry counts, consumer utilization, processing latency.
- Log correlation_id end-to-end.
- Create dashboards and alerts for spikes and DLQ growth.
G. Multi-tenancy & naming
- Use clear names: svc.work.v1, orders.events.v2.
- Consider per-tenant routing keys or vhosts if isolation is needed.
- Apply access control per vhost/user.
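A sketch of per-tenant routing keys; the key layout and exchange name are assumptions.

key := fmt.Sprintf("%s.order.created", tenantID) // e.g. "tenant-42.order.created"
err := ch.Publish("orders.events.v2", key, false, false, amqp.Publishing{
    ContentType: "application/json",
    Body:        []byte(`{"order_id":"123"}`),
})
if err != nil { /* handle */ }
// Each tenant's queue binds with "tenant-42.#" (or each tenant gets its own vhost).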
H. Deployment & HA notes
- Prefer quorum queues (for HA) instead of classic mirrored queues for new systems.
- Test node failure and network partitions.
- Keep definitions (exchanges/queues/bindings) in code or IaC so you can recreate them.
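A sketch of declaring a quorum queue in code, assuming the amqp091-go client; the queue name is illustrative. Keeping this declaration in code or IaC means the topology can be recreated from scratch.

_, err := ch.QueueDeclare("orders.work.v1", true, false, false, false, amqp.Table{
    "x-queue-type": "quorum", // replicated queue; preferred over classic mirroring
})
if err != nil { /* handle */ }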
Tiny Go snippets you can reuse
Producer confirms (safer publishing)
ch.Confirm(false)
acks := ch.NotifyPublish(make(chan amqp.Confirmation, 1))
err := ch.Publish("orders", "order.us.created", false, false, amqp.Publishing{
    DeliveryMode: amqp.Persistent,
    ContentType:  "application/json",
    Body:         []byte(`{"order_id":"123"}`),
})
if err != nil { /* handle */ }
conf := <-acks
if !conf.Ack { /* retry or alert */ }
Consumer with prefetch + manual ack
ch.Qos(20, 0, false) // allow 20 in-flight
msgs, _ := ch.Consume("work", "", false, false, false, false, nil)
for d := range msgs {
    if err := process(d.Body); err != nil {
        d.Nack(false, true) // retry later (or route to delay queue)
        continue
    }
    d.Ack(false)
}
Delay retry with TTL queue (idea)
main_queue --(fail)--> retry_exchange -> retry_queue (x-message-ttl = 10000)
retry_queue --(dead-letter to)--> main_exchange -> main_queue
Store an x-retry header; increase it on each attempt; when it hits the limit, Nack(requeue=false) to the DLQ.
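A sketch of declaring that retry topology, matching the names in the diagram above; the 10-second TTL is the example value and main_exchange is assumed to already exist.

ch.ExchangeDeclare("retry_exchange", "fanout", true, false, false, false, nil)
ch.QueueDeclare("retry_queue", true, false, false, false, amqp.Table{
    "x-message-ttl":          int32(10000),    // park failed messages for 10s
    "x-dead-letter-exchange": "main_exchange", // then send them back to the main flow
})
ch.QueueBind("retry_queue", "", "retry_exchange", false, nil)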