Introduction
Sometimes one service needs to send a request to another service and wait for the reply. This pattern is called a Remote Procedure Call (RPC).
RabbitMQ can be used to build the RPC pattern. Although RabbitMQ is normally used for asynchronous messaging, with the right setup it can also support request/response workflows.
How RPC Works in RabbitMQ
The flow looks like this:
sequenceDiagram
    participant Client
    participant Broker as RabbitMQ
    participant Server
    Client->>Broker: Publish request (corr_id=123, reply_to=callback_queue)
    Broker->>Server: Deliver request
    Server-->>Broker: Publish response (corr_id=123, to=callback_queue)
    Broker-->>Client: Deliver response (corr_id=123)
Steps:
- The client sends a request message with two special properties:
  - reply_to: the name of the queue where the response should go.
  - correlation_id: a unique ID used to match a response to its request.
- The server (worker) consumes from the request queue, processes the message, and publishes a reply to the reply_to queue with the same correlation_id.
- The client waits on the callback queue, matches the correlation_id, and returns the result.
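The three steps above can be tried out without a broker. The following is a minimal in-memory sketch, not RabbitMQ code: the `message` and `broker` types and the `serveOne` and `call` helpers are hypothetical stand-ins invented for illustration, with Go channels playing the role of queues and a toy "double the number" operation standing in for real work.

```go
package main

import (
	"fmt"
	"strconv"
)

// message mimics the AMQP properties that matter for RPC.
// Hypothetical in-memory type, not part of amqp091-go.
type message struct {
	ReplyTo       string
	CorrelationID string
	Body          string
}

// broker is a toy stand-in for RabbitMQ: named queues backed by buffered channels.
type broker map[string]chan message

// serveOne handles a single request: it reads from the request queue, doubles
// the number in the body, and replies to ReplyTo with the same CorrelationID.
func serveOne(b broker, requestQueue string) {
	req := <-b[requestQueue]
	result := "error: not a number"
	if n, err := strconv.Atoi(req.Body); err == nil {
		result = strconv.Itoa(n * 2)
	}
	b[req.ReplyTo] <- message{CorrelationID: req.CorrelationID, Body: result}
}

// call publishes a request and blocks until a reply with a matching
// CorrelationID arrives on the callback queue. Other replies are skipped.
func call(b broker, requestQueue, callbackQueue, corrID, body string) string {
	b[requestQueue] <- message{ReplyTo: callbackQueue, CorrelationID: corrID, Body: body}
	for reply := range b[callbackQueue] {
		if reply.CorrelationID == corrID {
			return reply.Body
		}
	}
	return ""
}

func main() {
	b := broker{
		"rpc_queue":      make(chan message, 8),
		"callback_queue": make(chan message, 8),
	}
	// A stale reply left over from some earlier request; call should skip it.
	b["callback_queue"] <- message{CorrelationID: "old", Body: "stale"}

	go serveOne(b, "rpc_queue")
	fmt.Println(call(b, "rpc_queue", "callback_queue", "123", "21"))
}
```

The stale message in `main` shows why the correlation ID matters: the callback queue may hold replies that do not belong to the current request, and the client has to ignore them.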
Setting Up RPC in Go
We use the official RabbitMQ Go client:
go get github.com/rabbitmq/amqp091-go
Server (Worker)
The server receives requests, processes them, and sends back results.
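package main

import (
	"context"
	"log"
	"strconv"

	amqp "github.com/rabbitmq/amqp091-go"
)

// fib returns the n-th Fibonacci number (naive recursion, fine for small n).
func fib(n int) int {
	if n <= 0 {
		return 0
	}
	if n == 1 {
		return 1
	}
	return fib(n-1) + fib(n-2)
}

// failOnError stops the program when a setup step fails.
func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %v", msg, err)
	}
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare("rpc_queue", false, false, false, false, nil)
	failOnError(err, "failed to declare the request queue")

	// autoAck is true, so a request is lost if the worker crashes mid-processing.
	msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
	failOnError(err, "failed to register a consumer")

	log.Println(" [x] Awaiting RPC requests")
	for d := range msgs {
		n, err := strconv.Atoi(string(d.Body))
		if err != nil {
			log.Printf(" [!] Skipping invalid request %q: %v", d.Body, err)
			continue
		}
		log.Printf(" [.] fib(%d)", n)
		response := strconv.Itoa(fib(n))

		// Publish the result to the reply_to queue, echoing the correlation ID.
		err = ch.PublishWithContext(context.Background(), "", d.ReplyTo, false, false, amqp.Publishing{
			ContentType:   "text/plain",
			CorrelationId: d.CorrelationId,
			Body:          []byte(response),
		})
		if err != nil {
			log.Printf(" [!] Failed to publish reply: %v", err)
		}
	}
}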
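The "processes them" step can be kept separate from the AMQP plumbing, which makes it easy to test without a broker. The sketch below is one possible shape, not part of the original server: `handleRequest` and `fibIter` are hypothetical names, the iterative Fibonacci replaces the naive recursion so a large `n` does not stall the worker, and the upper bound of 90 is an assumption chosen so the result fits in a 64-bit `int`.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// fibIter computes the n-th Fibonacci number iteratively in O(n) steps.
func fibIter(n int) int {
	a, b := 0, 1
	for i := 0; i < n; i++ {
		a, b = b, a+b
	}
	return a
}

// handleRequest is a hypothetical helper: it parses the request body,
// rejects invalid input, and returns the text to send back as the reply.
func handleRequest(body []byte) (string, error) {
	n, err := strconv.Atoi(strings.TrimSpace(string(body)))
	if err != nil {
		return "", fmt.Errorf("request body is not an integer: %w", err)
	}
	// Assumed limit: fib(90) still fits in a 64-bit int.
	if n < 0 || n > 90 {
		return "", fmt.Errorf("n must be between 0 and 90, got %d", n)
	}
	return strconv.Itoa(fibIter(n)), nil
}

func main() {
	for _, body := range []string{"6", "abc", "-1"} {
		reply, err := handleRequest([]byte(body))
		fmt.Printf("%q -> reply=%q err=%v\n", body, reply, err)
	}
}
```

In the consume loop, the server could call such a helper with `d.Body` and publish either the result or an error message to `d.ReplyTo`, so the client is not left waiting when a request is malformed.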
Client
The client sends requests and waits for the response.
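package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

// failOnError stops the program when a setup step fails.
func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %v", msg, err)
	}
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "failed to open a channel")
	defer ch.Close()

	// Declare an exclusive callback queue; the broker picks its name.
	q, err := ch.QueueDeclare("", false, false, true, false, nil)
	failOnError(err, "failed to declare the callback queue")

	msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
	failOnError(err, "failed to register a consumer")

	corrId := fmt.Sprintf("%d", rand.Int())

	// Give up if no reply arrives in time (5 seconds is an arbitrary choice).
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Send the request with reply_to and correlation_id set.
	n := "6"
	err = ch.PublishWithContext(ctx, "", "rpc_queue", false, false, amqp.Publishing{
		ContentType:   "text/plain",
		CorrelationId: corrId,
		ReplyTo:       q.Name,
		Body:          []byte(n),
	})
	failOnError(err, "failed to publish the request")
	log.Printf(" [x] Requesting fib(%s)", n)

	// Wait for the reply whose correlation ID matches the request.
	for {
		select {
		case d, ok := <-msgs:
			if !ok {
				log.Panic("callback consumer was closed")
			}
			if d.CorrelationId == corrId {
				log.Printf(" [.] Got %s", d.Body)
				return
			}
		case <-ctx.Done():
			log.Printf(" [!] No reply: %v", ctx.Err())
			return
		}
	}
}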
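The client above handles one request at a time. If several requests share the same callback queue, the matching step can be generalized to a table of pending calls keyed by correlation ID. The following is a minimal in-memory sketch of that idea, assuming nothing about the broker: `reply`, `pendingCalls`, and its methods are hypothetical names, and in a real client `resolve` would be fed from the deliveries of the callback queue.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// reply is a hypothetical stand-in for the delivery fields used here.
type reply struct {
	CorrelationID string
	Body          string
}

// pendingCalls maps correlation IDs to the channel of the caller waiting on them.
type pendingCalls struct {
	mu      sync.Mutex
	waiters map[string]chan string
}

func newPendingCalls() *pendingCalls {
	return &pendingCalls{waiters: make(map[string]chan string)}
}

// register records an in-flight call and returns the channel its reply will arrive on.
func (p *pendingCalls) register(corrID string) <-chan string {
	ch := make(chan string, 1) // buffered so resolve never blocks
	p.mu.Lock()
	p.waiters[corrID] = ch
	p.mu.Unlock()
	return ch
}

// resolve hands a reply to the matching waiter. It returns false when the
// correlation ID is unknown, in which case the reply is dropped.
func (p *pendingCalls) resolve(r reply) bool {
	p.mu.Lock()
	ch, ok := p.waiters[r.CorrelationID]
	delete(p.waiters, r.CorrelationID)
	p.mu.Unlock()
	if ok {
		ch <- r.Body
	}
	return ok
}

// wait blocks until the reply arrives or the timeout expires.
func (p *pendingCalls) wait(corrID string, ch <-chan string, timeout time.Duration) (string, error) {
	select {
	case body := <-ch:
		return body, nil
	case <-time.After(timeout):
		// Forget the call so a late reply is treated as unknown.
		p.mu.Lock()
		delete(p.waiters, corrID)
		p.mu.Unlock()
		return "", errors.New("rpc: timed out waiting for reply " + corrID)
	}
}

func main() {
	p := newPendingCalls()
	a := p.register("a")
	b := p.register("b")

	// Replies may arrive in any order; an unknown ID is ignored.
	p.resolve(reply{CorrelationID: "b", Body: "13"})
	p.resolve(reply{CorrelationID: "zzz", Body: "?"})
	p.resolve(reply{CorrelationID: "a", Body: "8"})

	ra, _ := p.wait("a", a, time.Second)
	rb, _ := p.wait("b", b, time.Second)
	fmt.Println(ra, rb)
}
```

Removing the entry on timeout is a design choice: it keeps the table from growing when a server never answers, at the cost of discarding replies that arrive late.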