system-design-bangla

Kafka

Kafka খুবই জনপ্রিয় Message Queue। High-performance data pipelines, streaming analytics, data integration, এবং mission-critical applications গুলোর জন্য Kafka ব্যবহার করা হয়।

Kafka এর components

Cluster

Broker

Topic

Topic এর ভিতর Data Partitioning হয়ে থাকে। প্রতিটি Partition এ মূলত Set of Messages বিদ্যমান থাকে। প্রতিটি message identifie করা হয় partition এর offset দ্বারা। Messages গুলো partition এর শেষে যুক্ত হয় এবং অন্য দিক থেকে send করা হয়।

Partition

প্রতিটি মেসেজ একটি Dedicated Partition এ চলে যাবে।

Consumer

ছবিতে দুটি Partition এবং একটি Consumer আছে, Kafka এর আর্কিটেকচার অনুযায়ী একধিক Partition থেকে ডেটা consume করতে পারবে।

Consumer

এখানে দুটি Partition এবং দুটি Consumer আছে, Kafka এই দুটি Partition এবং দুটি Consumer কে সমানভাবে ডিস্ট্রিবিউট (Auto Balancing) করে দিবে।

Consumer

যেহেতু ১টি Partition কে কেবল ১টি Consumer নিতে পারবে সেজন্য consumer 3 কিছু consume করতে পারবে না।

zookeeper

RabbitMQ

এটি একটি জনপ্রিয় Message Broker যা Message Queue - implement করে থাকে।

Message Broker কি?

যে system - message routing, delivery, exchanges, queues, acknowledgements, retries ইত্যাদি manage করে তাই হচ্ছে Message Broker।

RabbitMQ এর components

rabbitmq-basic

Basic Code Example

Connection Manager

// rabbitmq/connection.js
const amqp = require("amqplib");

let connection = null;
let channel = null;

async function connectRabbitMQ() {
  try {
    connection = await amqp.connect(process.env.RABBITMQ_URL);

    connection.on("error", (err) => {
      console.error("RabbitMQ connection error:", err.message);
    });

    connection.on("close", () => {
      console.error("RabbitMQ connection closed. Reconnecting...");
      setTimeout(connectRabbitMQ, 5000);
    });

    channel = await connection.createChannel();

    console.log("✅ RabbitMQ connected");
  } catch (err) {
    console.error("❌ RabbitMQ connection failed:", err.message);
    setTimeout(connectRabbitMQ, 5000);
  }
}

function getChannel() {
  if (!channel) throw new Error("Channel not initialized");
  return channel;
}

module.exports = {
  connectRabbitMQ,
  getChannel,
};

Producer/Publisher

// rabbitmq/producer.js
const { getChannel } = require("./connection");

async function publish(queue, message) {
  const channel = getChannel();

  await channel.assertQueue(queue, {
    durable: true, // survives restart
  });

  channel.sendToQueue(
    queue,
    Buffer.from(JSON.stringify(message)),
    {
      persistent: true, // message saved to disk
    }
  );

  console.log("📤 Message sent:", message);
}

module.exports = { publish };

Consumer/Worker

// rabbitmq/consumer.js
const { getChannel } = require("./connection");

async function consume(queue, handler) {
  const channel = getChannel();

  await channel.assertQueue(queue, {
    durable: true,
  });

  // Fair dispatch (important for scaling)
  channel.prefetch(1);

  console.log(`📥 Waiting for messages in ${queue}`);

  channel.consume(queue, async (msg) => {
    if (!msg) return;

    try {
      const data = JSON.parse(msg.content.toString());

      await handler(data);

      channel.ack(msg); // success
    } catch (err) {
      console.error("❌ Error processing message:", err);

      // Reject and requeue (or send to DLQ in real systems)
      channel.nack(msg, false, true);
    }
  });
}

module.exports = { consume };

Message Queue এবং Worker Thread এর তফাৎ

Message Queue এবং Worker Thread নিয়ে অনেকের ভিতর confusion কাজ করে – দুটোই asynchronous processing এ ব্যবহৃত হয়।

Message Queue হচ্ছে একটা asynchronous communication mechanism, যা ভিন্ন প্রসেস বা সার্ভিসের মধ্যে decoupled ভাবে কাজ করে। একটি service (producer) message পাঠায় queue-এ, এবং অন্য service (consumer) পরে সেটি নিয়ে প্রসেস করে।

Worker Thread হচ্ছে একটি thread যেটি background এ task execute করে। এটি একই process এর ভিতরে কাজ করে।

Message Queue এবং Worker Thread একসাথে ব্যবহার করা যায়। সাধারণত Message Queue থেকে message বা job নিয়ে Worker Thread সেগুলো process করে। Process শেষ হলে worker acknowledgement (ACK) পাঠায়।

mq-wt

Real life Message Queue use-cases

Duplicate অর্ডার সমস্যা — RabbitMQ এবং Idempotency

একদিন সকালে John তাদের ওয়েবসাইটের orders টেবিল দেখছিলেন। হঠাৎ চোখে পড়ল — একই অর্ডার একাধিকবার ঢুকে গেছে Production Database-এ। ব্যাপারটা কী?

সমস্যার শুরু কোথায়?

John দেখলেন, payment module-এ RabbitMQ চলছে। আর সেখানেই লুকিয়ে ছিল আসল সমস্যা।

RabbitMQ (বা যেকোনো message queue) একটা গ্যারান্টি দেয় — at-least-once delivery। মানে, একটা message অন্তত একবার পৌঁছাবেই। কিন্তু এটা exactly-once না। অর্থাৎ, কোনো কোনো সময় একই message একাধিকবারও আসতে পারে।

কীভাবে ঘটল?

স্বাভাবিক flow ছিল এরকম —

কিন্তু ধরুন, insert হওয়ার পর ACK পাঠানোর আগেই consumer crash করল, অথবা network চলে গেল।

RabbitMQ তখন ভাবল — “ACK আসেনি, তার মানে message process হয়নি।” তাই সে আবার message পাঠিয়ে দিল।

আর consumer সেই একই কাজ আবার করল — আবার insert। ফলাফল? duplicate এন্ট্রি।

সমাধান কী?

যদি consumer ভুলে একই data দুইবার insert করতে চায়, Database নিজেই সেটা reject করে দেবে। সহজ, কিন্তু কার্যকর।

তবে শুধু এটুকুতেই কাজ শেষ না।

Unique constraint error টা ধরবে ঠিকই, কিন্তু application-এ exception উঠবে। তাই এই exception টাকে “failure” হিসেবে না দেখে “ইতিমধ্যে হয়ে গেছে” হিসেবে handle করতে হবে।

আরেকটু পরিষ্কার সমাধান হলো idempotency key — সাধারণত RabbitMQ-এর message_id ব্যবহার করা হয় এই কাজে।

প্রতিটা message-এর একটা unique ID থাকে। সেই ID টা আলাদা একটা জায়গায় রেখে দিন। পরের বার একই message আসলে আগেই চেক করুন — এই ID কি আগে দেখা গেছে?

যদি হ্যাঁ → ACK করুন, কিন্তু আর কিছু করবেন না

যদি না → Insert করুন, তারপর ACK করুন

গুরুত্বপূর্ণ প্রশ্নগুলো