Monday, June 21, 2021

This is a continuation of the earlier article that introduced Azure Service Bus replication. The complete program is listed below:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Microsoft.ServiceBus;
using Azure.Messaging.ServiceBus;
using Microsoft.ServiceBus.Messaging;

namespace SBQueueSender
{
    /*
     * Program to complete structure and content replication of SB Entities from source namespace to destination namespace
     */
    class Program
    {
        private static string connectionString = "";
        private static string secondaryConnectionString = "";
        private static string sourceTopicOrQueueName = "";
        private static string sourceSubscriptionName = "";
        private static string destinationTopicName = "";
        private static string destinationSubscriptionName = "";
        private static string replicateStructure = "";
        private static string replicateContent = "";

        static async Task Main(string[] args)
        {
            IConfigurationBuilder builder = new ConfigurationBuilder().AddJsonFile("appsettings.json");
            IConfigurationRoot config = builder.Build();
            connectionString = config["primaryConnectionString"];
            secondaryConnectionString = config["secondaryConnectionString"];
            sourceTopicOrQueueName = config["sourceTopicName"];
            sourceSubscriptionName = config["sourceSubscriptionName"];
            destinationTopicName = config["destinationTopicName"];
            destinationSubscriptionName = config["destinationSubscriptionName"];
            replicateStructure = config["replicateStructure"];
            replicateContent = config["replicateContent"];

            if (string.IsNullOrWhiteSpace(connectionString) ||
                string.IsNullOrWhiteSpace(secondaryConnectionString) ||
                string.IsNullOrWhiteSpace(sourceTopicOrQueueName) ||
                string.IsNullOrWhiteSpace(sourceSubscriptionName) ||
                string.IsNullOrWhiteSpace(destinationTopicName) ||
                string.IsNullOrWhiteSpace(destinationSubscriptionName) ||
                string.IsNullOrWhiteSpace(replicateStructure) ||
                string.IsNullOrWhiteSpace(replicateContent))
            {
                Console.WriteLine("Please enter appropriate values in appsettings.json file. Exiting...");
                await Task.Delay(2000);
            }
            else
            {
                NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
                if (Boolean.TrueString.CompareTo(replicateStructure) == 0)
                {
                    await ReplicateStructure(namespaceManager, secondaryConnectionString);
                }

                if (Boolean.TrueString.CompareTo(replicateContent) == 0)
                {
                    await ReplicateContent(namespaceManager, secondaryConnectionString);
                }
            }
        }

        /// <summary>
        /// Replicates the structure of the SB namespace.
        /// </summary>
        /// <param name="namespaceManager"></param>
        /// <param name="destinationConnectionString"></param>
        /// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
        public static async Task ReplicateStructure(NamespaceManager namespaceManager, string destinationConnectionString)
        {
            var targetNamespaceManager = NamespaceManager.CreateFromConnectionString(destinationConnectionString);
            foreach (var queue in namespaceManager.GetQueues())
            {
                try
                {
                    Console.WriteLine(string.Format("Creating queue {0} ", queue.Path));
                    targetNamespaceManager.CreateQueue(queue);
                }
                catch (MessagingEntityAlreadyExistsException)
                {
                    Console.WriteLine(string.Format("Queue {0} already exists.", queue.Path));
                }
            }

            foreach (var topic in namespaceManager.GetTopics())
            {
                try
                {
                    Console.WriteLine(string.Format("Creating topic {0} ", topic.Path));
                    targetNamespaceManager.CreateTopic(topic);
                }
                catch (MessagingEntityAlreadyExistsException)
                {
                    Console.WriteLine(string.Format("Topic {0}  already exits...", topic.Path));
                }

                foreach (var subscription in namespaceManager.GetSubscriptions(topic.Path))
                {
                    try
                    {
                        Console.WriteLine(string.Format("Creating subscription {0} ", subscription.Name));
                        targetNamespaceManager.CreateSubscription(subscription);
                    }
                    catch (MessagingEntityAlreadyExistsException)
                    {
                        Console.WriteLine(string.Format("Subscription {0}  already exits...", subscription.Name));
                    }
                }
            }
        }

        /// <summary>
        /// Replicates the contents of the SB entities
        /// </summary>
        /// <param name="namespaceManager"></param>
        /// <param name="destinationConnectionString"></param>
        /// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
        public static async Task ReplicateContent(NamespaceManager namespaceManager, string destinationConnectionString)
        {
            await using var client = new ServiceBusClient(connectionString);
            foreach (var sbQueue in namespaceManager.GetQueues("messageCount Gt 0"))
            {
                sourceTopicOrQueueName = sbQueue.Path;
                ServiceBusReceiver receiver = client.CreateReceiver(sbQueue.Path);
                ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync(new TimeSpan(0,0,30));
                await SendMessageToTopicAsync(destinationConnectionString, sbQueue.Path, receivedMessage);
            }

            foreach (var sbTopic in namespaceManager.GetTopics("messageCount Gt 0"))
            {
                sourceTopicOrQueueName = sbTopic.Path;
                ServiceBusReceiver receiver = client.CreateReceiver(sbTopic.Path);
                foreach (var sbSub in namespaceManager.GetSubscriptions(sbTopic.Path))
                {
                    sourceSubscriptionName = sbSub.Name;
                    Console.WriteLine("Listening on topic: {0} with subscription: {1}", sourceTopicOrQueueName, sourceSubscriptionName);
                    await ReceiveMessagesFromSubscriptionAsync(namespaceManager, sbTopic.Path, connectionString, sbTopic.Path, sbSub.Name);
                }
            }
        }

        private static async Task ReceiveMessagesFromSubscriptionAsync(NamespaceManager namespaceManager, string sbTopicPath, string connectionString, string sourceTopicName, string sourceSubscriptionName)
        {
            try
            {
                await using (ServiceBusClient client = new ServiceBusClient(connectionString))
                {
                    // create a processor that we can use to process the messages
                    ServiceBusProcessor processor = client.CreateProcessor(sourceTopicName, sourceSubscriptionName, new ServiceBusProcessorOptions()
                    {
                        MaxConcurrentCalls = 1,
                        AutoCompleteMessages = false,
                    });

                    // add handler to process messages
                    processor.ProcessMessageAsync += MessageHandler;

                    // add handler to process any errors
                    processor.ProcessErrorAsync += ErrorHandler;

                    // start processing 
                    await processor.StartProcessingAsync();

                    Console.WriteLine("press any key to stop ...");
                    Console.ReadKey();

                    // stop processing 
                    Console.WriteLine("\nStopping the receiver...");
                    await processor.StopProcessingAsync();
                    Console.WriteLine("Stopped receiving messages");
                }
            }
            catch (Exception e)
            {
                Console.WriteLine("Error: ", e);
            }
        }

        private static async Task MessageHandler(ProcessMessageEventArgs args)
        {
            string id = args.Message.MessageId;
            Console.WriteLine($"Received: {id} from subscription: {sourceSubscriptionName}");

            await SendMessageToTopicAsync(secondaryConnectionString, sourceTopicOrQueueName, args.Message);
            Console.WriteLine($"Sent: {id} from subscription: {sourceSubscriptionName}");

            // complete the message. messages is deleted from the queue.
            await args.CompleteMessageAsync(args.Message);
        }

        private static Task ErrorHandler(ProcessErrorEventArgs args)
        {
            Console.WriteLine(args.Exception.ToString());
            return Task.CompletedTask;
        }

        private static async Task SendMessageToTopicAsync(string connectionString, string destinationTopicName, ServiceBusReceivedMessage serviceBusReceivedMessage)
        {
            if (serviceBusReceivedMessage != null)
            {
                try
                {
                    // create a Service Bus client
                    await using (ServiceBusClient client = new ServiceBusClient(connectionString))
                    {
                        // create a sender for the topic
                        ServiceBusSender sender = client.CreateSender(destinationTopicName);
                        await sender.SendMessageAsync(new ServiceBusMessage(serviceBusReceivedMessage));
                        Console.WriteLine($"Sent a single message to the topic: {destinationTopicName}");
                    }
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error: ", e);
                }
            }
            else
            {
                Console.WriteLine("received null message.");
            }
        }

        private static async Task SendTestMessageToTopicAsync(string body)
        {
            // create a Service Bus client 
            try
            {
                await using (ServiceBusClient client = new ServiceBusClient(connectionString))
                {
                    // create a sender for the topic
                    ServiceBusSender sender = client.CreateSender(sourceTopicOrQueueName);
                    await sender.SendMessageAsync(new ServiceBusMessage(body));
                    Console.WriteLine($"Sent a single message to the topic: {sourceTopicOrQueueName}");
                }
            }
            catch (Exception e)
            {
                Console.WriteLine("Error: ", e);
            }
        }
    }
}

No comments:

Post a Comment