Widhian Bramantya

Implementing RPC (Remote Procedure Call) with RabbitMQ

Posted on September 9, 2025 by admin

Introduction

Sometimes one service needs to send a request to another service and wait for its reply. This pattern is called a Remote Procedure Call (RPC).

RabbitMQ can be used to build an RPC pattern. Although RabbitMQ is normally used for asynchronous messaging, a reply queue and a correlation ID are enough to support request/response workflows on top of it.

How RPC Works in RabbitMQ

The flow looks like this:

sequenceDiagram
  participant Client
  participant Broker as RabbitMQ
  participant Server

  Client->>Broker: Publish request (corr_id=123, reply_to=callback_queue)
  Broker->>Server: Deliver request
  Server-->>Broker: Publish response (corr_id=123, to=callback_queue)
  Broker-->>Client: Deliver response (corr_id=123)

Steps:

  1. Client sends a request message with two special properties:
    • reply_to: the name of the queue where the response should go.
    • correlation_id: a unique ID to match request and response.
  2. Server (worker) consumes the request queue, processes it, and sends a reply back to the reply_to queue with the same correlation_id.
  3. Client waits on the callback queue, matches the correlation_id, and returns the result.
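
The three steps above can be sketched without a broker at all. The following is a minimal in-memory simulation, assuming Go channels stand in for the request queue and the callback queue. The `message` type and the `server` and `call` functions are hypothetical names used for illustration only; they are not part of any RabbitMQ API.

```go
package main

import (
	"fmt"
	"strings"
)

// message mimics the two AMQP properties the RPC pattern relies on.
// It is a hypothetical type for illustration, not a RabbitMQ type.
type message struct {
	CorrelationID string
	ReplyTo       chan message // stands in for the name of the callback queue
	Body          string
}

// server consumes requests and publishes each reply to the request's
// ReplyTo channel, copying the CorrelationID unchanged (step 2).
func server(requests <-chan message) {
	for req := range requests {
		req.ReplyTo <- message{
			CorrelationID: req.CorrelationID,
			Body:          strings.ToUpper(req.Body),
		}
	}
}

// call sends one request (step 1) and waits on the callback channel until a
// reply with the matching CorrelationID arrives (step 3).
func call(requests chan<- message, callback chan message, corrID, body string) string {
	requests <- message{CorrelationID: corrID, ReplyTo: callback, Body: body}
	for reply := range callback {
		if reply.CorrelationID == corrID {
			return reply.Body
		}
		// A reply with another ID belongs to a different request; skip it.
	}
	return ""
}

func main() {
	requests := make(chan message)
	callback := make(chan message, 1)
	go server(requests)

	fmt.Println(call(requests, callback, "123", "ping")) // prints "PING"
}
```

The "work" here is just upper-casing the body; the point is that the reply finds its way back through `ReplyTo` and is accepted only when the correlation ID matches.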

Setting Up RPC in Go

We use the official RabbitMQ Go client:

go get github.com/rabbitmq/amqp091-go

Server (Worker)

The server receives requests, processes them, and sends back results.

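package main

import (
	"context"
	"log"
	"strconv"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

// fib returns the n-th Fibonacci number using naive recursion, which is
// fine for a demo. Non-positive input returns 0 instead of recursing forever.
func fib(n int) int {
	if n <= 0 {
		return 0
	}
	if n == 1 {
		return 1
	}
	return fib(n-1) + fib(n-2)
}

// failOnError aborts the program when a setup step fails.
func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %v", msg, err)
	}
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "failed to open a channel")
	defer ch.Close()

	// Arguments: name, durable, autoDelete, exclusive, noWait, args
	q, err := ch.QueueDeclare("rpc_queue", false, false, false, false, nil)
	failOnError(err, "failed to declare rpc_queue")

	// Arguments: queue, consumer, autoAck, exclusive, noLocal, noWait, args
	msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
	failOnError(err, "failed to register a consumer")

	log.Println(" [x] Awaiting RPC requests")
	for d := range msgs {
		n, err := strconv.Atoi(string(d.Body))
		if err != nil {
			log.Printf(" [!] Skipping invalid request %q: %v", d.Body, err)
			continue
		}
		log.Printf(" [.] fib(%d)", n)

		response := strconv.Itoa(fib(n))

		// Publish the result to the reply_to queue through the default exchange,
		// echoing the correlation_id so the client can match the reply.
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		err = ch.PublishWithContext(ctx, "", d.ReplyTo, false, false, amqp.Publishing{
			ContentType:   "text/plain",
			CorrelationId: d.CorrelationId,
			Body:          []byte(response),
		})
		cancel()
		if err != nil {
			log.Printf(" [!] Failed to publish reply: %v", err)
		}
	}
}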
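
One way to keep the worker easy to test is to separate the request handling from the broker code. The sketch below assumes a hypothetical `handleRequest` helper, which is not part of the original listing, that parses the message body and produces the reply text. It also swaps in an iterative `fibIter` (again an assumption, not the author's function), because the naive recursive version takes exponential time as n grows.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// fibIter computes the n-th Fibonacci number in linear time.
func fibIter(n int) int {
	a, b := 0, 1
	for i := 0; i < n; i++ {
		a, b = b, a+b
	}
	return a
}

// handleRequest turns a raw request body into the reply text.
// It rejects bodies that are not non-negative integers.
func handleRequest(body []byte) (string, error) {
	n, err := strconv.Atoi(strings.TrimSpace(string(body)))
	if err != nil {
		return "", fmt.Errorf("invalid request body %q: %w", body, err)
	}
	if n < 0 {
		return "", fmt.Errorf("n must be non-negative, got %d", n)
	}
	return strconv.Itoa(fibIter(n)), nil
}

func main() {
	reply, err := handleRequest([]byte("6"))
	fmt.Println(reply, err) // prints "8 <nil>"
}
```

Inside the consume loop, the worker could call such a helper with `d.Body` and publish the returned string, which keeps the Fibonacci logic testable without a running broker.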

Client

The client sends requests and waits for the response.

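package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

// failOnError aborts the program when a setup step fails.
func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %v", msg, err)
	}
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "failed to open a channel")
	defer ch.Close()

	// Declare an exclusive callback queue; the empty name lets the broker generate one.
	q, err := ch.QueueDeclare("", false, false, true, false, nil)
	failOnError(err, "failed to declare callback queue")

	msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
	failOnError(err, "failed to register a consumer")

	// Random ID used to match the reply to this request.
	corrId := fmt.Sprintf("%d", rand.Int())

	// Bound the whole call so the client does not wait forever.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Send request with reply_to and correlation_id
	n := "6"
	err = ch.PublishWithContext(ctx, "", "rpc_queue", false, false, amqp.Publishing{
		ContentType:   "text/plain",
		CorrelationId: corrId,
		ReplyTo:       q.Name,
		Body:          []byte(n),
	})
	failOnError(err, "failed to publish request")
	log.Printf(" [x] Requesting fib(%s)", n)

	// Wait for the reply with the matching correlation_id, or give up on timeout.
	for {
		select {
		case d, ok := <-msgs:
			if !ok {
				log.Println(" [!] Callback queue consumer closed")
				return
			}
			if d.CorrelationId == corrId {
				log.Printf(" [.] Got %s", d.Body)
				return
			}
		case <-ctx.Done():
			log.Println(" [!] Timed out waiting for reply")
			return
		}
	}
}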
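
The client above handles a single request. When several requests share one callback queue, one common approach is to keep a map from correlation_id to a waiting channel and route each reply to whoever registered that ID. The following is a sketch of that idea only, assuming hypothetical `pendingCalls`, `register`, and `resolve` names; wiring it to the deliveries returned by `ch.Consume` is left out.

```go
package main

import (
	"fmt"
	"sync"
)

// pendingCalls tracks requests that are still waiting for a reply.
// It is a hypothetical helper, not part of the amqp091-go library.
type pendingCalls struct {
	mu      sync.Mutex
	waiting map[string]chan []byte
}

func newPendingCalls() *pendingCalls {
	return &pendingCalls{waiting: make(map[string]chan []byte)}
}

// register records a correlation ID and returns the channel on which the
// matching reply body will be delivered.
func (p *pendingCalls) register(corrID string) <-chan []byte {
	c := make(chan []byte, 1) // buffered so resolve never blocks
	p.mu.Lock()
	p.waiting[corrID] = c
	p.mu.Unlock()
	return c
}

// resolve hands a reply to the caller waiting on corrID and forgets the ID.
// It reports false when no request with that ID is pending.
func (p *pendingCalls) resolve(corrID string, body []byte) bool {
	p.mu.Lock()
	c, ok := p.waiting[corrID]
	delete(p.waiting, corrID)
	p.mu.Unlock()
	if !ok {
		return false
	}
	c <- body
	return true
}

func main() {
	calls := newPendingCalls()
	first := calls.register("a")
	second := calls.register("b")

	// Replies may arrive in any order; the ID decides who receives them.
	calls.resolve("b", []byte("13"))
	calls.resolve("a", []byte("8"))

	fmt.Printf("a=%s b=%s\n", <-first, <-second) // prints "a=8 b=13"
}
```

In a real client, a single goroutine reading the callback queue would call `resolve(d.CorrelationId, d.Body)` for every delivery, and replies with unknown IDs would simply be dropped.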