This is a continuation of the
earlier article that introduced ServiceBus replication:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Microsoft.ServiceBus;
using Azure.Messaging.ServiceBus;
using Microsoft.ServiceBus.Messaging;
namespace SBQueueSender
{
/*
* Program to complete structure and content replication of SB Entities from source namespace to destination namespace
*/
class Program
{
private static string connectionString = "";
private static string secondaryConnectionString = "";
private static string sourceTopicOrQueueName = "";
private static string sourceSubscriptionName = "";
private static string destinationTopicName = "";
private static string destinationSubscriptionName = "";
private static string replicateStructure = "";
private static string replicateContent = "";
static async Task Main(string[] args)
{
IConfigurationBuilder builder = new ConfigurationBuilder().AddJsonFile("appsettings.json");
IConfigurationRoot config = builder.Build();
connectionString = config["primaryConnectionString"];
secondaryConnectionString = config["secondaryConnectionString"];
sourceTopicOrQueueName = config["sourceTopicName"];
sourceSubscriptionName = config["sourceSubscriptionName"];
destinationTopicName = config["destinationTopicName"];
destinationSubscriptionName = config["destinationSubscriptionName"];
replicateStructure = config["replicateStructure"];
replicateContent = config["replicateContent"];
if (string.IsNullOrWhiteSpace(connectionString) ||
string.IsNullOrWhiteSpace(secondaryConnectionString) ||
string.IsNullOrWhiteSpace(sourceTopicOrQueueName) ||
string.IsNullOrWhiteSpace(sourceSubscriptionName) ||
string.IsNullOrWhiteSpace(destinationTopicName) ||
string.IsNullOrWhiteSpace(destinationSubscriptionName) ||
string.IsNullOrWhiteSpace(replicateStructure) ||
string.IsNullOrWhiteSpace(replicateContent))
{
Console.WriteLine("Please enter appropriate values in appsettings.json file. Exiting...");
await Task.Delay(2000);
}
else
{
NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
if (Boolean.TrueString.CompareTo(replicateStructure) == 0)
{
await ReplicateStructure(namespaceManager, secondaryConnectionString);
}
if (Boolean.TrueString.CompareTo(replicateContent) == 0)
{
await ReplicateContent(namespaceManager, secondaryConnectionString);
}
}
}
/// <summary>
/// Replicates the structure of the SB namespace.
/// </summary>
/// <param name="namespaceManager"></param>
/// <param name="destinationConnectionString"></param>
/// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
public static async Task ReplicateStructure(NamespaceManager namespaceManager, string destinationConnectionString)
{
var targetNamespaceManager = NamespaceManager.CreateFromConnectionString(destinationConnectionString);
foreach (var queue in namespaceManager.GetQueues())
{
try
{
Console.WriteLine(string.Format("Creating queue {0} ", queue.Path));
targetNamespaceManager.CreateQueue(queue);
}
catch (MessagingEntityAlreadyExistsException)
{
Console.WriteLine(string.Format("Queue {0} already exists.", queue.Path));
}
}
foreach (var topic in namespaceManager.GetTopics())
{
try
{
Console.WriteLine(string.Format("Creating topic {0} ", topic.Path));
targetNamespaceManager.CreateTopic(topic);
}
catch (MessagingEntityAlreadyExistsException)
{
Console.WriteLine(string.Format("Topic {0} already exits...", topic.Path));
}
foreach (var subscription in namespaceManager.GetSubscriptions(topic.Path))
{
try
{
Console.WriteLine(string.Format("Creating subscription {0} ", subscription.Name));
targetNamespaceManager.CreateSubscription(subscription);
}
catch (MessagingEntityAlreadyExistsException)
{
Console.WriteLine(string.Format("Subscription {0} already exits...", subscription.Name));
}
}
}
}
/// <summary>
/// Replicates the contents of the SB entities
/// </summary>
/// <param name="namespaceManager"></param>
/// <param name="destinationConnectionString"></param>
/// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
public static async Task ReplicateContent(NamespaceManager namespaceManager, string destinationConnectionString)
{
await using var client = new ServiceBusClient(connectionString);
foreach (var sbQueue in namespaceManager.GetQueues("messageCount Gt 0"))
{
sourceTopicOrQueueName = sbQueue.Path;
ServiceBusReceiver receiver = client.CreateReceiver(sbQueue.Path);
ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync(new TimeSpan(0,0,30));
await SendMessageToTopicAsync(destinationConnectionString, sbQueue.Path, receivedMessage);
}
foreach (var sbTopic in namespaceManager.GetTopics("messageCount Gt 0"))
{
sourceTopicOrQueueName = sbTopic.Path;
ServiceBusReceiver receiver = client.CreateReceiver(sbTopic.Path);
foreach (var sbSub in namespaceManager.GetSubscriptions(sbTopic.Path))
{
sourceSubscriptionName = sbSub.Name;
Console.WriteLine("Listening on topic: {0} with subscription: {1}", sourceTopicOrQueueName, sourceSubscriptionName);
await ReceiveMessagesFromSubscriptionAsync(namespaceManager, sbTopic.Path, connectionString, sbTopic.Path, sbSub.Name);
}
}
}
private static async Task ReceiveMessagesFromSubscriptionAsync(NamespaceManager namespaceManager, string sbTopicPath, string connectionString, string sourceTopicName, string sourceSubscriptionName)
{
try
{
await using (ServiceBusClient client = new ServiceBusClient(connectionString))
{
// create a processor that we can use to process the messages
ServiceBusProcessor processor = client.CreateProcessor(sourceTopicName, sourceSubscriptionName, new ServiceBusProcessorOptions()
{
MaxConcurrentCalls = 1,
AutoCompleteMessages = false,
});
// add handler to process messages
processor.ProcessMessageAsync += MessageHandler;
// add handler to process any errors
processor.ProcessErrorAsync += ErrorHandler;
// start processing
await processor.StartProcessingAsync();
Console.WriteLine("press any key to stop ...");
Console.ReadKey();
// stop processing
Console.WriteLine("\nStopping the receiver...");
await processor.StopProcessingAsync();
Console.WriteLine("Stopped receiving messages");
}
}
catch (Exception e)
{
Console.WriteLine("Error: ", e);
}
}
private static async Task MessageHandler(ProcessMessageEventArgs args)
{
string id = args.Message.MessageId;
Console.WriteLine($"Received: {id} from subscription: {sourceSubscriptionName}");
await SendMessageToTopicAsync(secondaryConnectionString, sourceTopicOrQueueName, args.Message);
Console.WriteLine($"Sent: {id} from subscription: {sourceSubscriptionName}");
// complete the message. messages is deleted from the queue.
await args.CompleteMessageAsync(args.Message);
}
private static Task ErrorHandler(ProcessErrorEventArgs args)
{
Console.WriteLine(args.Exception.ToString());
return Task.CompletedTask;
}
private static async Task SendMessageToTopicAsync(string connectionString, string destinationTopicName, ServiceBusReceivedMessage serviceBusReceivedMessage)
{
if (serviceBusReceivedMessage != null)
{
try
{
// create a Service Bus client
await using (ServiceBusClient client = new ServiceBusClient(connectionString))
{
// create a sender for the topic
ServiceBusSender sender = client.CreateSender(destinationTopicName);
await sender.SendMessageAsync(new ServiceBusMessage(serviceBusReceivedMessage));
Console.WriteLine($"Sent a single message to the topic: {destinationTopicName}");
}
}
catch (Exception e)
{
Console.WriteLine("Error: ", e);
}
}
else
{
Console.WriteLine("received null message.");
}
}
private static async Task SendTestMessageToTopicAsync(string body)
{
// create a Service Bus client
try
{
await using (ServiceBusClient client = new ServiceBusClient(connectionString))
{
// create a sender for the topic
ServiceBusSender sender = client.CreateSender(sourceTopicOrQueueName);
await sender.SendMessageAsync(new ServiceBusMessage(body));
Console.WriteLine($"Sent a single message to the topic: {sourceTopicOrQueueName}");
}
}
catch (Exception e)
{
Console.WriteLine("Error: ", e);
}
}
}
}