Sunday, June 20, 2021

This is a continuation of the earlier article that introduced Azure Service Bus replication:
 Reference:
// Sample program for data migration
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Microsoft.ServiceBus;
using Azure.Messaging.ServiceBus;
 
namespace SBQueueSender
{
    class Program
    {
        private static string connectionString = "";
        private static string secondaryConnectionString = "";
        private static string sourceTopicOrQueueName = "";
        private static string sourceSubscriptionName = "";
        private static string destinationTopicOrQueueName = "";
        private static string destinationSubscriptionName = "";
 
        static async Task Main(string[] args)
        {
            IConfigurationBuilder builder = new ConfigurationBuilder().AddJsonFile("appsettings.json");
            IConfigurationRoot config = builder.Build();
            connectionString = config["primaryConnectionString"];
            secondaryConnectionString = config["secondaryConnectionString"];
            sourceTopicOrQueueName = config["sourceTopicOrQueueName"];
            sourceSubscriptionName = config["sourceSubscriptionName"];
            destinationTopicOrQueueName = config["destinationTopicOrQueueName"];
            destinationSubscriptionName = config["destinationSubscriptionName"];
 
            if (string.IsNullOrWhiteSpace(connectionString) ||
                string.IsNullOrWhiteSpace(secondaryConnectionString) ||
                string.IsNullOrWhiteSpace(sourceTopicOrQueueName) ||
                string.IsNullOrWhiteSpace(sourceSubscriptionName) ||
                string.IsNullOrWhiteSpace(destinationTopicOrQueueName) ||
                string.IsNullOrWhiteSpace(destinationSubscriptionName))
            {
                Console.WriteLine("Please enter appropriate values in appsettings.json file. Exiting...");
                await Task.Delay(2000);
            }
            else
            {
                NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
                await using var client = new ServiceBusClient(connectionString);
                foreach (var sbQueue in namespaceManager.GetQueues("messageCount Gt 0"))
                {
                    sourceTopicOrQueueName = sbQueue.Path;
                    ServiceBusReceiver receiver = client.CreateReceiver(sbQueue.Path);
                    ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();
                    await SendMessageToTopicAsync(secondaryConnectionString, sbQueue.Path, receivedMessage);
                }
 
                foreach (var sbTopic in namespaceManager.GetTopics("messageCount Gt 0"))
                {
                    sourceTopicOrQueueName = sbTopic.Path;
                    ServiceBusReceiver receiver = client.CreateReceiver(sbTopic.Path);
                    foreach (var sbSub in namespaceManager.GetSubscriptions(sbTopic.Path))
                    {
                        sourceSubscriptionName = sbSub.Name;
                        Console.WriteLine("Listening on topic: {0} with subscription: {1}", sourceTopicOrQueueName, sourceSubscriptionName);
                        await ReceiveMessagesFromSubscriptionAsync(connectionString, sbTopic.Path, sbSub.Name);
                    }
                }
            }
        }
 
        private static async Task ReceiveMessagesFromSubscriptionAsync(string connectionString, string sourceTopicName, string sourceSubscriptionName)
        {
            await using (ServiceBusClient client = new ServiceBusClient(connectionString))
            {
                // create a processor that we can use to process the messages
                ServiceBusProcessor processor = client.CreateProcessor(sourceTopicName, sourceSubscriptionName, new ServiceBusProcessorOptions());
 
                // add handler to process messages
                processor.ProcessMessageAsync += MessageHandler;
 
                // add handler to process any errors
                processor.ProcessErrorAsync += ErrorHandler;
 
                // start processing 
                await processor.StartProcessingAsync();
 
                while (processor.IsProcessing)
                {
                    System.Threading.Thread.Sleep(30000);
                }
 
                Console.WriteLine("Wait for a minute and then press any key for confirmation to end the processing");
                Console.ReadKey();
 
                // stop processing 
                Console.WriteLine("\nStopping the receiver...");
                await processor.StopProcessingAsync();
                Console.WriteLine("Stopped receiving messages");
            }
        }
 
        private static async Task MessageHandler(ProcessMessageEventArgs args)
        {
            string id = args.Message.MessageId;
            Console.WriteLine($"Received: {id} from subscription: {sourceSubscriptionName}");
 
            await SendMessageToTopicAsync(secondaryConnectionString, sourceTopicOrQueueName, args.Message);
            Console.WriteLine($"Sent: {id} from subscription: {sourceSubscriptionName}");
 
            // complete the message. messages is deleted from the queue. 
            await args.CompleteMessageAsync(args.Message);
        }
 
        private static Task ErrorHandler(ProcessErrorEventArgs args)
        {
            Console.WriteLine(args.Exception.ToString());
            return Task.CompletedTask;
        }
 
        private static async Task SendMessageToTopicAsync(string connectionString, string destinationTopicOrQueueName, ServiceBusReceivedMessage serviceBusReceivedMessage)
        {
            // create a Service Bus client 
            await using (ServiceBusClient client = new ServiceBusClient(connectionString))
            {
                // create a sender for the topic
                ServiceBusSender sender = client.CreateSender(destinationTopicOrQueueName);
                await sender.SendMessageAsync(new ServiceBusMessage(serviceBusReceivedMessage));
                Console.WriteLine($"Sent a single message to the topic: {destinationTopicOrQueueName}");
            }
        }
 
        private static async Task SendTestMessageToTopicAsync(string body)
        {
            // create a Service Bus client 
            await using (ServiceBusClient client = new ServiceBusClient(connectionString))
            {
                // create a sender for the topic
                ServiceBusSender sender = client.CreateSender(sourceTopicOrQueueName);
                await sender.SendMessageAsync(new ServiceBusMessage(body));
                Console.WriteLine($"Sent a single message to the topic: {sourceTopicOrQueueName}");
            }
        }
    }
}

No comments:

Post a Comment