system-design-bangla

RabbitMQ

এটি একটি জনপ্রিয় Message Broker যা Message Queue - implement করে থাকে।

Message Broker কি?

যে system - message routing, delivery, exchanges, queues, acknowledgements, retries ইত্যাদি manage করে তাই হচ্ছে Message Broker।

RabbitMQ এর components

rabbitmq-basic

Basic Code Example

Connection Manager

// rabbitmq/connection.js
const amqp = require("amqplib");

let connection = null;
let channel = null;

async function connectRabbitMQ() {
  try {
    connection = await amqp.connect(process.env.RABBITMQ_URL);

    connection.on("error", (err) => {
      console.error("RabbitMQ connection error:", err.message);
    });

    connection.on("close", () => {
      console.error("RabbitMQ connection closed. Reconnecting...");
      setTimeout(connectRabbitMQ, 5000);
    });

    channel = await connection.createChannel();

    console.log("✅ RabbitMQ connected");
  } catch (err) {
    console.error("❌ RabbitMQ connection failed:", err.message);
    setTimeout(connectRabbitMQ, 5000);
  }
}

function getChannel() {
  if (!channel) throw new Error("Channel not initialized");
  return channel;
}

module.exports = {
  connectRabbitMQ,
  getChannel,
};

Producer/Publisher

// rabbitmq/producer.js
const { getChannel } = require("./connection");

async function publish(queue, message) {
  const channel = getChannel();

  await channel.assertQueue(queue, {
    durable: true, // survives restart
  });

  channel.sendToQueue(
    queue,
    Buffer.from(JSON.stringify(message)),
    {
      persistent: true, // message saved to disk
    }
  );

  console.log("📤 Message sent:", message);
}

module.exports = { publish };

Consumer/Worker

// rabbitmq/consumer.js
const { getChannel } = require("./connection");

async function consume(queue, handler) {
  const channel = getChannel();

  await channel.assertQueue(queue, {
    durable: true,
  });

  // Fair dispatch (important for scaling)
  channel.prefetch(1);

  console.log(`📥 Waiting for messages in ${queue}`);

  channel.consume(queue, async (msg) => {
    if (!msg) return;

    try {
      const data = JSON.parse(msg.content.toString());

      await handler(data);

      channel.ack(msg); // success
    } catch (err) {
      console.error("❌ Error processing message:", err);

      // Reject and requeue (or send to DLQ in real systems)
      channel.nack(msg, false, true);
    }
  });
}

module.exports = { consume };

Queue Backpressure এবং Auto-scaling Workers

ধরুন peak time-এ আপনার সিস্টেমে প্রতি সেকেন্ডে ১০০০টা মেসেজ আসছে, কিন্তু আপনার consumer প্রতি সেকেন্ডে মাত্র ২০০টা প্রসেস করতে পারে। তাহলে প্রতি সেকেন্ডে ৮০০টা মেসেজ queue-এ জমা হতে থাকবে। Peak শেষ হওয়ার পরও এই backlog ক্লিয়ার করতে ৪০-৫০ মিনিট লাগবে (যদি কোনো নতুন মেসেজ না-ই আসে)।

এর উপর যদি consumer-এ error হয় এবং মেসেজ nack/requeue হয়, তাহলে queue আরও বড় হতে থাকবে।

ভুল চিন্তাভাবনা: “Just queue them all” — সব মেসেজ queue-তে ফেলে দিই, পরে দেখা যাবে।

সঠিক চিন্তাভাবনা: “Can our consumers catch up eventually?” — যদি queue সাইজ ক্রমাগত বাড়তেই থাকে এবং consumer কখনো catch up করতে না পারে, তাহলে সিস্টেম একসময় ভেঙে পড়বে। প্রশ্নটা হওয়া উচিত — আমাদের consumer-রা কি কখনো এই backlog সামলে উঠতে পারবে?

সমাধান: Dynamic Worker Scaling

Queue-এর সাইজ (depth) মনিটর করে, সেই অনুযায়ী consumer/worker-এর সংখ্যা বাড়ানো বা কমানো (auto-scale)। যেমন:

এভাবে peak load-এও সিস্টেম backlog ক্লিয়ার করতে পারে, এবং normal time-এ অতিরিক্ত resource খরচ হয় না।

Message Queue এবং Worker Thread এর তফাৎ

Message Queue এবং Worker Thread নিয়ে অনেকের ভিতর confusion কাজ করে – দুটোই asynchronous processing এ ব্যবহৃত হয়।

Message Queue হচ্ছে একটা asynchronous communication mechanism, যা ভিন্ন প্রসেস বা সার্ভিসের মধ্যে decoupled ভাবে কাজ করে। একটি service (producer) message পাঠায় queue-এ, এবং অন্য service (consumer) পরে সেটি নিয়ে প্রসেস করে।

Worker Thread হচ্ছে একটি thread যেটি background এ task execute করে। এটি একই process এর ভিতরে কাজ করে।

Message Queue এবং Worker Thread একসাথে ব্যবহার করা যায়। সাধারণত Message Queue থেকে message বা job নিয়ে Worker Thread সেগুলো process করে। Process শেষ হলে worker acknowledgement (ACK) পাঠায়।

mq-wt

Real life Message Queue use-cases

Duplicate অর্ডার সমস্যা — RabbitMQ এবং Idempotency

একদিন সকালে John তাদের ওয়েবসাইটের orders টেবিল দেখছিলেন। হঠাৎ চোখে পড়ল — একই অর্ডার একাধিকবার ঢুকে গেছে Production Database-এ। ব্যাপারটা কী?

সমস্যার শুরু কোথায়?

John দেখলেন, payment module-এ RabbitMQ চলছে। আর সেখানেই লুকিয়ে ছিল আসল সমস্যা।

RabbitMQ (বা যেকোনো message queue) একটা গ্যারান্টি দেয় — at-least-once delivery। মানে, একটা message অন্তত একবার পৌঁছাবেই। কিন্তু এটা exactly-once না। অর্থাৎ, কোনো কোনো সময় একই message একাধিকবারও আসতে পারে।

কীভাবে ঘটল?

স্বাভাবিক flow ছিল এরকম —

কিন্তু ধরুন, insert হওয়ার পর ACK পাঠানোর আগেই consumer crash করল, অথবা network চলে গেল।

RabbitMQ তখন ভাবল — “ACK আসেনি, তার মানে message process হয়নি।” তাই সে আবার message পাঠিয়ে দিল।

আর consumer সেই একই কাজ আবার করল — আবার insert। ফলাফল? duplicate এন্ট্রি।

সমাধান কী?

যদি consumer ভুলে একই data দুইবার insert করতে চায়, Database নিজেই সেটা reject করে দেবে। সহজ, কিন্তু কার্যকর।

তবে শুধু এটুকুতেই কাজ শেষ না।

Unique constraint error টা ধরবে ঠিকই, কিন্তু application-এ exception উঠবে। তাই এই exception টাকে “failure” হিসেবে না দেখে “ইতিমধ্যে হয়ে গেছে” হিসেবে handle করতে হবে।

আরেকটু পরিষ্কার সমাধান হলো idempotency key — সাধারণত RabbitMQ-এর message_id ব্যবহার করা হয় এই কাজে।

প্রতিটা message-এর একটা unique ID থাকে। সেই ID টা আলাদা একটা জায়গায় রেখে দিন। পরের বার একই message আসলে আগেই চেক করুন — এই ID কি আগে দেখা গেছে?

যদি হ্যাঁ → ACK করুন, কিন্তু আর কিছু করবেন না

যদি না → Insert করুন, তারপর ACK করুন

duplicate-mq

Message Queue এবং Pub/Sub এর মধ্যে পার্থক্য কি?

Message Queue — কাজের ধরন

ধরেন আপনি একটা রেস্টুরেন্টের কিচেনে কাজ করছেন। অর্ডার আসে, একটা তালিকায় জমা হয়, এবং যে রাঁধুনি ফ্রি সে অর্ডারটা তুলে নেয়। এক অর্ডার, এক রাঁধুনি।

// Producer — কাজ পাঠাচ্ছে
queue.send("resize_image", {"file": "photo.jpg", "size": "800x600"})

// Consumer A (একজনই এটা process করবে)
task = queue.receive()  # "resize_image" পেয়ে কাজ করল

// Queue থেকে message delete হয়ে গেল

মূল বৈশিষ্ট্য: message একবার consume হলে চলে যায়। Consumer B বা C এটা আর দেখবে না। Load balancing নিজেই হয় — ৩টা consumer থাকলে ৩টা task parallel-এ চলবে।

Pub/Sub — কাজের ধরন

এটি একপ্রকারের রেডিও ব্রডকাস্ট। একজন বলছে, যতজন শুনছে সবাই সেটা পাচ্ছে। কেউ শুনলো বা না শুনলো সেটা broadcaster-এর দায় নয়।

// Publisher — event পাঠাচ্ছে
topic.publish("order_placed", {"order_id": 42, "total": 1500})

// Subscriber 1 — Email Service
def on_order(event):
  send_confirmation_email(event["order_id"])  # নিজের কাজ করছে

// Subscriber 2 — Analytics Service
def on_order(event):
  track_revenue(event["total"])  # সেও একই event পেয়েছে

// দুজনই একই message পেয়েছে, স্বাধীনভাবে কাজ করেছে

কোনটা কখন ব্যবহার করবো?

Message Queue বেছে নেবো যখন:

Pub/Sub বেছে নেবো যখন:

RabbitMQ এবং Apache Kafka এর মধ্যে মূল পার্থক্য কি?

RabbitMQ কীভাবে কাজ করে

rabbit-1

Apache Kafka কীভাবে কাজ করে

kafka-1

RabbitMQ এর Broker কে Smart এবং Consumer কে Dumb বলা হয়। কারণ কী?

RabbitMQ broker অনেক কাজ করে:

Consumer সাধারণত শুধু message receive করে এবং process করে। সেজন্য RabbitMQ এর Broker কে Smart এবং Consumer কে Dumb বলা হয়।

Apache Kafka এর Consumer কে Smart এবং Broker কে Dumb বলা হয়। কারণ কী?

Kafka broker-এর কাজ তুলনামূলকভাবে সহজ:

Business logic এবং processing strategy consumer-এর দায়িত্ব।

RabbitMQ ব্যবহার করুন যখন

আপনার মূল সমস্যা হলো:

“এই কাজটা কে করবে?”

অর্থাৎ আপনি work distribution বা task processing করতে চান।

ভালো use cases,

উদাহরণ,

User একটি ছবি upload করল।

Upload API
  |
  v
RabbitMQ
  |
  v
Image Processing Worker

Worker image process করল, কাজ শেষ, message-ও শেষ।

এখানে replay বা historical event store করার দরকার নেই।

Kafka ব্যবহার করুন যখন

আপনার মূল সমস্যা হলো:

“কি কি ঘটনা ঘটছে, সেগুলো সবাইকে জানাতে হবে এবং ভবিষ্যতের জন্য সংরক্ষণ করতে হবে।”

ভালো use cases,

উদাহরণ,

User order করল।

Order Service
      |
      v
Order Topic (Kafka)
      |
-----------------------------
|            |             |
Billing   Analytics   Notification

একই event অনেক service consume করতে পারে।

পরে নতুন Fraud Detection Service যোগ হলেও পুরোনো event replay করে শুরু করতে পারবে।

গুরুত্বপূর্ণ প্রশ্নগুলো