এটি একটি জনপ্রিয় Message Broker যা Message Queue - implement করে থাকে।
Message Broker কি?
যে system - message routing, delivery, exchanges, queues, acknowledgements, retries ইত্যাদি manage করে তাই হচ্ছে Message Broker।
Producer: Producer যার কাজ হচ্ছে message pass করা।
Exchange: producer থেকে message নিয়ে routing rule অনুযায়ী queue-তে পাঠায়।
Queue: যেখানে message store থাকে, যতক্ষণ না consumer তা receive করে।
Broker: এটি মেসেজের ডিস্ট্রিবিউটর। এর কাজ শুধু মেসেজ সঠিক জায়গায় পৌঁছানো নিশ্চিত করা।
Consumer: যে queue থেকে message পড়ে। প্রসেসিং করার মূল দায়িত্ব consumer এর।
Acknowledgement: consumer message process এর পর ack (acknowledgement) পাঠায়। ack না পেলে broker ধরে নেয় message properly process হয়নি। তখন message requeue/retry হতে পারে।
Durability: আপনি যখন একটি কিউ (Queue) তৈরি করেন, তখন তাকে durable: true হিসেবে ডিক্লেয়ার করতে হয়। যার ফলে Broker restart দিলেও Queue টিকে থাকবে। ধরুন, আপনার একটি ই-কমার্স সাইট আছে। কাস্টমার অর্ডার করার পর আপনি একটি মেসেজ পাঠালেন। হঠাৎ করে RabbitMQ সার্ভারে পাওয়ার disconnect হলো। এখন durable: true থাকলে, সার্ভার চালু হওয়ার পর RabbitMQ ডিস্ক থেকে মেসেজগুলো reload করবে এবং কনজিউমার সেগুলো প্রসেস করতে পারবে।
Prefetch: Prefetch হলো একটি কন্ট্রোল মেকানিজম, যা নির্ধারণ করে একজন Consumer-এর কাছে একসাথে কতগুলো মেসেজ পাঠানো হবে।
// rabbitmq/connection.js
const amqp = require("amqplib");
let connection = null;
let channel = null;
async function connectRabbitMQ() {
try {
connection = await amqp.connect(process.env.RABBITMQ_URL);
connection.on("error", (err) => {
console.error("RabbitMQ connection error:", err.message);
});
connection.on("close", () => {
console.error("RabbitMQ connection closed. Reconnecting...");
setTimeout(connectRabbitMQ, 5000);
});
channel = await connection.createChannel();
console.log("✅ RabbitMQ connected");
} catch (err) {
console.error("❌ RabbitMQ connection failed:", err.message);
setTimeout(connectRabbitMQ, 5000);
}
}
function getChannel() {
if (!channel) throw new Error("Channel not initialized");
return channel;
}
module.exports = {
connectRabbitMQ,
getChannel,
};
// rabbitmq/producer.js
const { getChannel } = require("./connection");
async function publish(queue, message) {
const channel = getChannel();
await channel.assertQueue(queue, {
durable: true, // survives restart
});
channel.sendToQueue(
queue,
Buffer.from(JSON.stringify(message)),
{
persistent: true, // message saved to disk
}
);
console.log("📤 Message sent:", message);
}
module.exports = { publish };
// rabbitmq/consumer.js
const { getChannel } = require("./connection");
async function consume(queue, handler) {
const channel = getChannel();
await channel.assertQueue(queue, {
durable: true,
});
// Fair dispatch (important for scaling)
channel.prefetch(1);
console.log(`📥 Waiting for messages in ${queue}`);
channel.consume(queue, async (msg) => {
if (!msg) return;
try {
const data = JSON.parse(msg.content.toString());
await handler(data);
channel.ack(msg); // success
} catch (err) {
console.error("❌ Error processing message:", err);
// Reject and requeue (or send to DLQ in real systems)
channel.nack(msg, false, true);
}
});
}
module.exports = { consume };
ধরুন peak time-এ আপনার সিস্টেমে প্রতি সেকেন্ডে ১০০০টা মেসেজ আসছে, কিন্তু আপনার consumer প্রতি সেকেন্ডে মাত্র ২০০টা প্রসেস করতে পারে। তাহলে প্রতি সেকেন্ডে ৮০০টা মেসেজ queue-এ জমা হতে থাকবে। Peak শেষ হওয়ার পরও এই backlog ক্লিয়ার করতে ৪০-৫০ মিনিট লাগবে (যদি কোনো নতুন মেসেজ না-ই আসে)।
এর উপর যদি consumer-এ error হয় এবং মেসেজ nack/requeue হয়, তাহলে queue আরও বড় হতে থাকবে।
ভুল চিন্তাভাবনা: “Just queue them all” — সব মেসেজ queue-তে ফেলে দিই, পরে দেখা যাবে।
সঠিক চিন্তাভাবনা: “Can our consumers catch up eventually?” — যদি queue সাইজ ক্রমাগত বাড়তেই থাকে এবং consumer কখনো catch up করতে না পারে, তাহলে সিস্টেম একসময় ভেঙে পড়বে। প্রশ্নটা হওয়া উচিত — আমাদের consumer-রা কি কখনো এই backlog সামলে উঠতে পারবে?
সমাধান: Dynamic Worker Scaling
Queue-এর সাইজ (depth) মনিটর করে, সেই অনুযায়ী consumer/worker-এর সংখ্যা বাড়ানো বা কমানো (auto-scale)। যেমন:
এভাবে peak load-এও সিস্টেম backlog ক্লিয়ার করতে পারে, এবং normal time-এ অতিরিক্ত resource খরচ হয় না।
Message Queue এবং Worker Thread নিয়ে অনেকের ভিতর confusion কাজ করে – দুটোই asynchronous processing এ ব্যবহৃত হয়।
Message Queue হচ্ছে একটা asynchronous communication mechanism, যা ভিন্ন প্রসেস বা সার্ভিসের মধ্যে decoupled ভাবে কাজ করে। একটি service (producer) message পাঠায় queue-এ, এবং অন্য service (consumer) পরে সেটি নিয়ে প্রসেস করে।
Worker Thread হচ্ছে একটি thread যেটি background এ task execute করে। এটি একই process এর ভিতরে কাজ করে।
Message Queue এবং Worker Thread একসাথে ব্যবহার করা যায়। সাধারণত Message Queue থেকে message বা job নিয়ে Worker Thread সেগুলো process করে। Process শেষ হলে worker acknowledgement (ACK) পাঠায়।
একদিন সকালে John তাদের ওয়েবসাইটের orders টেবিল দেখছিলেন। হঠাৎ চোখে পড়ল — একই অর্ডার একাধিকবার ঢুকে গেছে Production Database-এ। ব্যাপারটা কী?
সমস্যার শুরু কোথায়?
John দেখলেন, payment module-এ RabbitMQ চলছে। আর সেখানেই লুকিয়ে ছিল আসল সমস্যা।
RabbitMQ (বা যেকোনো message queue) একটা গ্যারান্টি দেয় — at-least-once delivery। মানে, একটা message অন্তত একবার পৌঁছাবেই। কিন্তু এটা exactly-once না। অর্থাৎ, কোনো কোনো সময় একই message একাধিকবারও আসতে পারে।
কীভাবে ঘটল?
স্বাভাবিক flow ছিল এরকম —
কিন্তু ধরুন, insert হওয়ার পর ACK পাঠানোর আগেই consumer crash করল, অথবা network চলে গেল।
RabbitMQ তখন ভাবল — “ACK আসেনি, তার মানে message process হয়নি।” তাই সে আবার message পাঠিয়ে দিল।
আর consumer সেই একই কাজ আবার করল — আবার insert। ফলাফল? duplicate এন্ট্রি।
সমাধান কী?
যদি consumer ভুলে একই data দুইবার insert করতে চায়, Database নিজেই সেটা reject করে দেবে। সহজ, কিন্তু কার্যকর।
তবে শুধু এটুকুতেই কাজ শেষ না।
Unique constraint error টা ধরবে ঠিকই, কিন্তু application-এ exception উঠবে। তাই এই exception টাকে “failure” হিসেবে না দেখে “ইতিমধ্যে হয়ে গেছে” হিসেবে handle করতে হবে।
আরেকটু পরিষ্কার সমাধান হলো idempotency key — সাধারণত RabbitMQ-এর message_id ব্যবহার করা হয় এই কাজে।
প্রতিটা message-এর একটা unique ID থাকে। সেই ID টা আলাদা একটা জায়গায় রেখে দিন। পরের বার একই message আসলে আগেই চেক করুন — এই ID কি আগে দেখা গেছে?
যদি হ্যাঁ → ACK করুন, কিন্তু আর কিছু করবেন না
যদি না → Insert করুন, তারপর ACK করুন
ধরেন আপনি একটা রেস্টুরেন্টের কিচেনে কাজ করছেন। অর্ডার আসে, একটা তালিকায় জমা হয়, এবং যে রাঁধুনি ফ্রি সে অর্ডারটা তুলে নেয়। এক অর্ডার, এক রাঁধুনি।
// Producer — কাজ পাঠাচ্ছে
queue.send("resize_image", {"file": "photo.jpg", "size": "800x600"})
// Consumer A (একজনই এটা process করবে)
task = queue.receive() # "resize_image" পেয়ে কাজ করল
// Queue থেকে message delete হয়ে গেল
মূল বৈশিষ্ট্য: message একবার consume হলে চলে যায়। Consumer B বা C এটা আর দেখবে না। Load balancing নিজেই হয় — ৩টা consumer থাকলে ৩টা task parallel-এ চলবে।
এটি একপ্রকারের রেডিও ব্রডকাস্ট। একজন বলছে, যতজন শুনছে সবাই সেটা পাচ্ছে। কেউ শুনলো বা না শুনলো সেটা broadcaster-এর দায় নয়।
// Publisher — event পাঠাচ্ছে
topic.publish("order_placed", {"order_id": 42, "total": 1500})
// Subscriber 1 — Email Service
def on_order(event):
send_confirmation_email(event["order_id"]) # নিজের কাজ করছে
// Subscriber 2 — Analytics Service
def on_order(event):
track_revenue(event["total"]) # সেও একই event পেয়েছে
// দুজনই একই message পেয়েছে, স্বাধীনভাবে কাজ করেছে
Message Queue বেছে নেবো যখন:
Pub/Sub বেছে নেবো যখন:
RabbitMQ এর Broker কে Smart এবং Consumer কে Dumb বলা হয়। কারণ কী?
RabbitMQ broker অনেক কাজ করে:
Consumer সাধারণত শুধু message receive করে এবং process করে। সেজন্য RabbitMQ এর Broker কে Smart এবং Consumer কে Dumb বলা হয়।
Apache Kafka এর Consumer কে Smart এবং Broker কে Dumb বলা হয়। কারণ কী?
Kafka broker-এর কাজ তুলনামূলকভাবে সহজ:
Business logic এবং processing strategy consumer-এর দায়িত্ব।
আপনার মূল সমস্যা হলো:
“এই কাজটা কে করবে?”
অর্থাৎ আপনি work distribution বা task processing করতে চান।
ভালো use cases,
উদাহরণ,
User একটি ছবি upload করল।
Upload API
|
v
RabbitMQ
|
v
Image Processing Worker
Worker image process করল, কাজ শেষ, message-ও শেষ।
এখানে replay বা historical event store করার দরকার নেই।
আপনার মূল সমস্যা হলো:
“কি কি ঘটনা ঘটছে, সেগুলো সবাইকে জানাতে হবে এবং ভবিষ্যতের জন্য সংরক্ষণ করতে হবে।”
ভালো use cases,
উদাহরণ,
User order করল।
Order Service
|
v
Order Topic (Kafka)
|
-----------------------------
| | |
Billing Analytics Notification
একই event অনেক service consume করতে পারে।
পরে নতুন Fraud Detection Service যোগ হলেও পুরোনো event replay করে শুরু করতে পারবে।