C# Producer/Consumer Using RabbitMQ

So I've been learning about the different message queuing systems and discovered RabbitMQ, an open source and very popular message broker. I've created a very basic producer/consumer system that sends messages through a RabbitMQ queue.

Producer/consumer is pretty simple: you have two systems, a producer that writes to a queue and a consumer that reads from it. This is desirable because it splits responsibility and allows for scalability: the two sides only share the queue, so either one can be changed or scaled out without touching the other. The code below assumes you have a RabbitMQ instance installed on your local machine. You can get RabbitMQ here: https://www.rabbitmq.com/
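
Before bringing a broker into it, here is a minimal sketch of the same pattern inside a single process. It swaps RabbitMQ for an in-memory BlockingCollection<string>, so it only illustrates the shape of the pattern, not the RabbitMQ API. The names InMemoryDemo and Run are made up for this illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

namespace ProducerConsumerSketch
{
    // Hypothetical demo class; BlockingCollection stands in for the broker queue.
    public static class InMemoryDemo
    {
        // Runs one producer and one consumer over an in-memory queue and
        // returns the messages in the order the consumer received them.
        public static List<string> Run(int messageCount)
        {
            var received = new List<string>();
            using (var queue = new BlockingCollection<string>(boundedCapacity: 10))
            {
                var producer = new Thread(() =>
                {
                    for (var i = 1; i <= messageCount; i++)
                    {
                        // Blocks if the queue already holds 10 items.
                        queue.Add($"Hello World {i}!");
                    }
                    // Tell the consumer that no more messages are coming.
                    queue.CompleteAdding();
                });

                var consumer = new Thread(() =>
                {
                    // Blocks until an item is available; ends after CompleteAdding.
                    foreach (var message in queue.GetConsumingEnumerable())
                    {
                        received.Add(message);
                    }
                });

                producer.Start();
                consumer.Start();
                producer.Join();
                consumer.Join();
            }
            return received;
        }

        public static void Main()
        {
            foreach (var message in Run(3))
            {
                Console.WriteLine(" [x] Received {0}", message);
            }
        }
    }
}
```

The producer and consumer never call each other; they only share the queue. That is the property a broker like RabbitMQ extends across processes and machines.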

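The code is broken down into three parts: the main program, the producer, and the consumer. This is a basic example, so it's using old-school threading rather than the newer async/await style .NET prefers. It is just meant to show how to use RabbitMQ in C# and to demonstrate a producer/consumer example. You will also need to make sure your project has the RabbitMQ.Client NuGet package installed. The code uses the synchronous API (CreateModel, BasicPublish, BasicConsume) from the 6.x line of the client; as far as I know, version 7 moved to an async API, so pin a 6.x version if the code below doesn't compile.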

In a nutshell, the code starts two threads: a producer thread and a consumer thread. The producer writes to the queue and the consumer reads from it. I'm sure this code could be optimized further, and in production these would most likely be completely separate processes, but for demonstration we are using threads.
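
For comparison, here is a rough sketch of what the start-two-workers step might look like in the async/await style, with a CancellationToken so the loops can stop cleanly. RunLoopAsync is a hypothetical stand-in for the producer and consumer loops; it does not talk to RabbitMQ, and the three-second timeout is an arbitrary choice for the demo.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

namespace ProducerConsumerSketch
{
    public static class TaskBasedHost
    {
        // Hypothetical stand-in for a producer or consumer loop: it counts
        // iterations until cancellation is requested, then returns the count.
        public static async Task<int> RunLoopAsync(string name, TimeSpan interval, CancellationToken token)
        {
            var iterations = 0;
            while (!token.IsCancellationRequested)
            {
                iterations++;
                try
                {
                    // Asynchronous replacement for Thread.Sleep.
                    await Task.Delay(interval, token);
                }
                catch (OperationCanceledException)
                {
                    // Cancellation arrived during the delay; leave the loop.
                    break;
                }
            }
            Console.WriteLine("{0} stopped after {1} iterations", name, iterations);
            return iterations;
        }

        public static async Task Main()
        {
            // Cancel both loops automatically after three seconds.
            using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3)))
            {
                var producer = RunLoopAsync("Producer", TimeSpan.FromSeconds(1), cts.Token);
                var consumer = RunLoopAsync("Consumer", TimeSpan.FromSeconds(1), cts.Token);
                await Task.WhenAll(producer, consumer);
            }
        }
    }
}
```

The main difference from the thread version is that no thread sits blocked while a loop is waiting, and the program has a defined way to shut down.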

The main program:
 using System;  
 using System.Threading;  
 namespace ProducerConsumer  
 {  
   class Program  
   {  
     static void Main(string[] args)  
     {  
       Console.WriteLine("Starting Producer/Consumer Threads");  
       var producerThread = new Thread(new ThreadStart(RunProducer));  
       var consumerThread = new Thread(new ThreadStart(RunConsumer));  
       producerThread.Start();  
       consumerThread.Start();  
       // Both worker loops run until the process is killed, so Join blocks here.
       producerThread.Join();
       consumerThread.Join();
     }  
     public static void RunProducer()  
     {  
       var producer = new Producer();  
       producer.SendMessage();  
     }  
     public static void RunConsumer()  
     {  
       var consumer = new Consumer();  
       consumer.Receive();  
     }  
   }  
 }  


The Producer Code:
 using RabbitMQ.Client;  
 using System;  
 using System.Text;  
 using System.Threading;  
 namespace ProducerConsumer  
 {  
   internal class Producer  
   {  
     public void SendMessage()  
     {  
       var counter = 1;
       var factory = new ConnectionFactory() { HostName = "localhost" };
       // Open the connection and channel once and reuse them for every message.
       using (var connection = factory.CreateConnection())
       using (var channel = connection.CreateModel())
       {
         // Declaring is idempotent: the queue is created only if it is missing.
         channel.QueueDeclare(queue: "test-queue",
                    durable: true,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);
         while (true)
         {
           var message = $"Hello World {counter++}!";
           var body = Encoding.UTF8.GetBytes(message);
           // The default exchange ("") routes a message to the queue
           // whose name matches the routing key.
           channel.BasicPublish(exchange: "",
                      routingKey: "test-queue",
                      basicProperties: null,
                      body: body);
           Console.WriteLine(" [x] Sent {0}", message);
           Thread.Sleep(1000);
         }
       }
     }  
   }  
 }  


The Consumer Code:
 using RabbitMQ.Client;  
 using RabbitMQ.Client.Events;  
 using System;  
 using System.Text;  
 using System.Threading;  
 namespace ProducerConsumer  
 {  
   internal class Consumer  
   {  
     public void Receive()  
     {  
       var factory = new ConnectionFactory() { HostName = "localhost" };  
       using (var connection = factory.CreateConnection())  
       using (var channel = connection.CreateModel())  
       {  
         channel.QueueDeclare(queue: "test-queue",  
                    durable: true,  
                    exclusive: false,  
                    autoDelete: false,  
                    arguments: null);  
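         var consumer = new EventingBasicConsumer(channel);
         consumer.Received += Consumer_Received;
         // Register the consumer once; the client library then pushes each
         // delivery to Consumer_Received as it arrives.
         channel.BasicConsume(queue: "test-queue",
                    autoAck: true,
                    consumer: consumer);
         // Keep this thread, and with it the connection, alive.
         Thread.Sleep(Timeout.Infinite);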
       }  
     }  
     private void Consumer_Received(object sender, BasicDeliverEventArgs e)  
     {  
       var body = e.Body;  
       var message = Encoding.UTF8.GetString(body.ToArray());  
       Console.WriteLine(" [x] Received {0}", message);  
     }  
   }  
 }  
