This is a continuation of the earlier article that introduced ServiceBus replication:
Reference:
// Sample program for data migration
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Microsoft.ServiceBus;
using Azure.Messaging.ServiceBus;
namespace SBQueueSender
{
class Program
{
private static string connectionString = "";
private static string secondaryConnectionString = "";
private static string sourceTopicOrQueueName = "";
private static string sourceSubscriptionName = "";
private static string destinationTopicOrQueueName = "";
private static string destinationSubscriptionName = "";
static async Task Main(string[] args)
{
IConfigurationBuilder builder = new ConfigurationBuilder().AddJsonFile("appsettings.json");
IConfigurationRoot config = builder.Build();
connectionString = config["primaryConnectionString"];
secondaryConnectionString = config["secondaryConnectionString"];
sourceTopicOrQueueName = config["sourceTopicOrQueueName"];
sourceSubscriptionName = config["sourceSubscriptionName"];
destinationTopicOrQueueName = config["destinationTopicOrQueueName"];
destinationSubscriptionName = config["destinationSubscriptionName"];
if (string.IsNullOrWhiteSpace(connectionString) ||
string.IsNullOrWhiteSpace(secondaryConnectionString) ||
string.IsNullOrWhiteSpace(sourceTopicOrQueueName) ||
string.IsNullOrWhiteSpace(sourceSubscriptionName) ||
string.IsNullOrWhiteSpace(destinationTopicOrQueueName) ||
string.IsNullOrWhiteSpace(destinationSubscriptionName))
{
Console.WriteLine("Please enter appropriate values in appsettings.json file. Exiting...");
await Task.Delay(2000);
}
else
{
NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
await using var client = new ServiceBusClient(connectionString);
foreach (var sbQueue in namespaceManager.GetQueues("messageCount Gt 0"))
{
sourceTopicOrQueueName = sbQueue.Path;
ServiceBusReceiver receiver = client.CreateReceiver(sbQueue.Path);
ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();
await SendMessageToTopicAsync(secondaryConnectionString, sbQueue.Path, receivedMessage);
}
foreach (var sbTopic in namespaceManager.GetTopics("messageCount Gt 0"))
{
sourceTopicOrQueueName = sbTopic.Path;
ServiceBusReceiver receiver = client.CreateReceiver(sbTopic.Path);
foreach (var sbSub in namespaceManager.GetSubscriptions(sbTopic.Path))
{
sourceSubscriptionName = sbSub.Name;
Console.WriteLine("Listening on topic: {0} with subscription: {1}", sourceTopicOrQueueName, sourceSubscriptionName);
await ReceiveMessagesFromSubscriptionAsync(connectionString, sbTopic.Path, sbSub.Name);
}
}
}
}
private static async Task ReceiveMessagesFromSubscriptionAsync(string connectionString, string sourceTopicName, string sourceSubscriptionName)
{
await using (ServiceBusClient client = new ServiceBusClient(connectionString))
{
// create a processor that we can use to process the messages
ServiceBusProcessor processor = client.CreateProcessor(sourceTopicName, sourceSubscriptionName, new ServiceBusProcessorOptions());
// add handler to process messages
processor.ProcessMessageAsync += MessageHandler;
// add handler to process any errors
processor.ProcessErrorAsync += ErrorHandler;
// start processing
await processor.StartProcessingAsync();
while (processor.IsProcessing)
{
System.Threading.Thread.Sleep(30000);
}
Console.WriteLine("Wait for a minute and then press any key for confirmation to end the processing");
Console.ReadKey();
// stop processing
Console.WriteLine("\nStopping the receiver...");
await processor.StopProcessingAsync();
Console.WriteLine("Stopped receiving messages");
}
}
private static async Task MessageHandler(ProcessMessageEventArgs args)
{
string id = args.Message.MessageId;
Console.WriteLine($"Received: {id} from subscription: {sourceSubscriptionName}");
await SendMessageToTopicAsync(secondaryConnectionString, sourceTopicOrQueueName, args.Message);
Console.WriteLine($"Sent: {id} from subscription: {sourceSubscriptionName}");
// complete the message. messages is deleted from the queue.
await args.CompleteMessageAsync(args.Message);
}
private static Task ErrorHandler(ProcessErrorEventArgs args)
{
Console.WriteLine(args.Exception.ToString());
return Task.CompletedTask;
}
private static async Task SendMessageToTopicAsync(string connectionString, string destinationTopicOrQueueName, ServiceBusReceivedMessage serviceBusReceivedMessage)
{
// create a Service Bus client
await using (ServiceBusClient client = new ServiceBusClient(connectionString))
{
// create a sender for the topic
ServiceBusSender sender = client.CreateSender(destinationTopicOrQueueName);
await sender.SendMessageAsync(new ServiceBusMessage(serviceBusReceivedMessage));
Console.WriteLine($"Sent a single message to the topic: {destinationTopicOrQueueName}");
}
}
private static async Task SendTestMessageToTopicAsync(string body)
{
// create a Service Bus client
await using (ServiceBusClient client = new ServiceBusClient(connectionString))
{
// create a sender for the topic
ServiceBusSender sender = client.CreateSender(sourceTopicOrQueueName);
await sender.SendMessageAsync(new ServiceBusMessage(body));
Console.WriteLine($"Sent a single message to the topic: {sourceTopicOrQueueName}");
}
}
}
}
No comments:
Post a Comment