Wednesday, June 30, 2021

Kusto queries in Azure public cloud


Query languages have become as popular as shell scripting for data scientists and analytics professionals. While SQL has been used universally for a wide variety of purposes, it has always been called out for its shortcomings relative to shell scripting features such as data pipelining and a wide range of native commands that can be applied in stages between an input and a desired output. Pipelining refers to the intermediary stages of processing where the output of one operator becomes the input to another. PowerShell scripting bridged this gap with a wide variety of cmdlets that can be used in pipelined execution. While PowerShell enjoys phenomenal support in terms of the library of commands from all Azure services, resources, and their managers, it is not always easy to use with all kinds of data, and it places no restrictions on the scope or intent of changes to the data. Kusto addresses this specifically by allowing only read operations on the data and requiring copies into new tables, rows, and columns for any edits. This is the hallmark of Kusto’s design: it separates read-only queries from the control commands that are used to create the structures that hold intermediary data.
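
A minimal sketch of that pipelined style, using the StormEvents sample table that ships with Azure Data Explorer (the filter values here are illustrative):

StormEvents
| where StartTime > ago(7d)
| summarize EventCount = count() by State
| top 5 by EventCount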

Kusto is popular with both Azure Monitor and Azure Data Explorer. A Kusto query is a read-only request to process data that returns results in plain text. It uses a data flow model that is remarkably like the slice-and-dice operators in shell commands. It can work with structured data with the help of tables, rows, and columns, but it is not restricted to schema-based entities and can be applied to unstructured data such as telemetry. A query consists of a sequence of statements delimited by semicolons and has at least one tabular query operator. The name of a table is sufficient to stream its rows to a pipe operator, which separates filtering into its own stage with the help of a SQL-like where clause. Sequences of where clauses can be chained to produce a more refined set of resulting rows. A query can be as short as a tabular query operator, a data source, and a transformation. Any use of new tables, rows, and columns requires control commands, which are differentiated from Kusto queries because they begin with a dot character. The separation of these control commands helps with the security of the overall data analysis routines: administrators will have less hesitation letting Kusto queries run on their data. Control commands also help to manage entities and discover their metadata. A sample control command is “.show tables”, which lists all the tables in the current database.
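
For example, the “.show” control command below is a management request, while the query that follows chains where clauses over the illustrative StormEvents table to refine the result set:

.show tables

StormEvents
| where State == "TEXAS"
| where DamageProperty > 0
| count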

One of the features of Kusto is its set of immediate-visualization chart operators that render the data in a variety of plots. While the scalar operators that summarize rows of data were already popular for their equivalence to shell commands, the ability to project tabular data and summarize it before pipelining to a chart operator makes Kusto even more data-friendly and popular with data scientists. These visualizations are not restricted to timecharts and can include multiple series, cycles, and data types. A join operator can combine two different data sources, making it more convenient to plot and visualize the results. Distributions and percentiles are easy to compute and often required for time-slice windows involving metrics. Results can be assigned to variables with the help of the let statement, and data from several databases can be virtualized behind a single Kusto query.
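
A sketch that combines these pieces, again against the illustrative StormEvents table: a let statement names a threshold, summarize buckets the rows into daily bins per state, and render draws the timechart:

let threshold = 10;
StormEvents
| where DamageProperty > threshold
| summarize EventCount = count() by bin(StartTime, 1d), State
| render timechart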


Tuesday, June 29, 2021

Writing a PowerShell cmdlet in C# for Azure Service Bus entities.

 

Problem statement: Azure Service Bus is a Microsoft public cloud resource that is the equivalent of message brokers such as RabbitMQ, ZeroMQ, and others that implement the Advanced Message Queuing Protocol (AMQP). The entities of the Azure Service Bus include queues and topics. A queue allows a producer and consumer to send and receive messages in first-in-first-out order. A topic allows the messages sent by one or more producers to be distributed to several subscriptions, where subscribers are registered and receive those messages. This article describes some of the issues encountered when writing a PowerShell cmdlet for migrating service bus entities and their messages to another service bus.

Description: PowerShell is the language of automation, especially for custom logic that has no support from the features or built-ins available directly from a resource. Usually, custom logic is kept to a minimum, but resource products focus on improving their offerings rather than their integration. Automation to migrate one service bus to another would ideally use the built-in geo-replication feature of Service Bus, but geo-replication has two limitations. First, it replicates only the structure and not the content of the Service Bus entities. Second, it requires the source and target Service Bus to be hosted in different geographic regions, which is quite unlike some of the other Azure resources. The latter can be overcome by replicating to another region and then replicating back to the original region, but that still does not solve the message replication, which is critical for the second instance to be like the first.

This calls for a custom program that enumerates the service bus entities from one instance to another, which is available from the SDK's NamespaceManager. The messages in these entities can be read with the help of the Software Development Kit (SDK) for Azure Service Bus. The trouble with these independent SDKs is that they require different versions of dependencies. One such dependency is the Windows.Powershell.5 reference assembly: the namespace manager has a specific requirement for assembly version 4.0.4.1, while the Azure Service Bus SDK requires much more recent versions. Changing the program to use the lowest common denominator among the versions is not a viable option because there is none.

The error encountered appears something like this:

Azure.Messaging.ServiceBus.ServiceBusException: Could not load file or assembly 'System.Runtime.CompilerServices.Unsafe, Version=4.0.4.1, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a' or one of its dependencies. The system cannot find the file specified. (ServiceCommunicationProblem) ---> System.IO.FileNotFoundException: Could not load file or assembly 'System.Runtime.CompilerServices.Unsafe, Version=4.0.4.1, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a' or one of its dependencies. The system cannot find the file specified.

A binding redirect issued in the application configuration to force runtime dependencies to bind to the higher version also does not mitigate this error.
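
For reference, a binding redirect of the kind attempted looks like the following App.config fragment (the version numbers are illustrative); in this case it did not resolve the load failure:

<configuration>
  <runtime>
    <assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
      <dependentAssembly>
        <assemblyIdentity name="System.Runtime.CompilerServices.Unsafe"
                          publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
        <!-- redirect all older requests to the newer assembly version -->
        <bindingRedirect oldVersion="0.0.0.0-5.0.0.0" newVersion="5.0.0.0" />
      </dependentAssembly>
    </assemblyBinding>
  </runtime>
</configuration>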

In addition, target frameworks were changed from netstandard2.1 to netcoreapp3.1 and finally to net48, which results in other kinds of assembly errors. These include:

FileNotFoundException: Could not load file or assembly 'Microsoft.Bcl.AsyncInterfaces, Version=1.0.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51' or one of its dependencies. The system cannot find the file specified. 

and System.AggregateException, HResult=0x80131500, Message=One or more errors occurred, Source=mscorlib.

Each of these framework options results in a lot of trial and error. The cmdlet also has dependencies other than the two mentioned above, and any change to the target framework cascades into version changes for those dependencies as well.

Conclusion: Registering the required assembly in the Global Assembly Cache via offline mechanisms gets past the error. Although it is not a great experience, it enables the automation to run.
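
As a sketch, the offline registration can be done with the gacutil developer tool from an elevated prompt (the package path below is only illustrative):

gacutil /i C:\packages\System.Runtime.CompilerServices.Unsafe.4.5.3\lib\net461\System.Runtime.CompilerServices.Unsafe.dll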


Monday, June 28, 2021

Writing project plan continued...

 

A summary of a five-point plan to tackle a new project is listed below:

Step 1: Definition:

This involves defining six elements:

1.      Objectives: use the Specific, Measurable, Achievable, Realistic and Time-bound (SMART) technique to set objectives with internal and external input.

2.      Scope: While it is tempting to define what’s in scope, it is usually easier to write down what is out of scope.

3.      Success criteria: determine what project success and failure mean as they pertain to the business

4.      Deliverables: list work items in detail, if possible

5.      Requirements: This is about what you need as well as what the stakeholders need.

6.      Schedule: A baseline schedule will have milestones and deadlines.

Step 2: Identify risks, assumptions, and constraints: If risk management cannot be planned ahead of the work items, designate a person to monitor for risks to the project, such as risks to time and cost.

Step 3: Organize the people for the project into customers, stakeholders, and accountable owners, with roles and responsibilities.

Step 4: List the project resources with a bill of materials including licenses, tools and any other external dependencies.

Step 5: Establish a project communications plan including a channel for team discussions, stakeholder discussions and customer discussions. Usually, these communications will elaborate on the preceding steps.

Conclusion: Defining the project, identifying risks, assembling teams, gathering resources, and providing communication channels will get a project off the ground. You will love it when a plan comes together.

Sunday, June 27, 2021

 Writing a Project plan for a new project:

Introduction: This article talks about the preparation needed to get started with a new software project that involves improvements to complex software systems. There are several considerations to be made. When the project starts, newly assigned resources might require ramp-up time. The mission and the execution plan for the improvements might lack clarity and might not even be summed up in a sentence. The relevant dependencies might not have been listed. The stakeholders might not all be in alignment with the agenda. The architectural enhancements for the delivery of the software features might not have been thought through. Many steps might need to be prioritized before the end goal is reached. This is a helpful introduction to the planning involved.

Description: One of the advantages of software development is that it is a beaten path, with plenty of templates and learnings available from the past. The objective may be new and unclear, but there are several tools available to put the plan in place. First among these are project management tools and the Gantt chart. This tool helps not only with the distribution of tasks and the alignment of tasks with the timeline, but also with determining cost as well as the critical path for the completion of milestones. The timeline can be put in perspective based on a pegged release date and worked backward, budgeting for tasks that can be squeezed in. When a new project begins, even the tasks are not enumerated sufficiently, which makes it difficult to put them on a chart or into sprint cycles with milestones. This can be improved by gathering the stakeholders and the resources in the same room for multiple discussions. These discussions empower both sides to present the facts and keep the milestones practical. These could be done internally as well as with biweekly stakeholder meetings where incremental deliverables are realized via two-week sprint planning, meetings, and demonstrations of experiments and prototypes. The earlier these conversations take place, the better the planning and the outcome for the team. New technologies require significant investments in learning, and convergence happens faster when there are introductions by subject matter experts from the stakeholders’ organizations. It will be more equitable for everyone when it happens in a conference room.

Another tool that helps with this kind of software development is a functional specification document, which has a well-known template for organizing thoughts. It starts with critical use cases that are articulated as stories involving a person who is trying to solve a problem. The narrative for the use case outlines both what is available to the user and what would have helped. The gamut of options and the impact of the requested feature become clearer with a well-written use case. It might involve a quantitative aspect of what needs to be done and elaborate on the kind of outcome that would best serve the user. When the objective becomes clearer this way, intermediate deliverables fall into place. There is no limit to the number or description of use cases as long as they make the end goal clearer. The means and mode of delivering the features are usually not that hard to envision once the goal is in place. The delivery of the features takes time, and the allotment of tasks to resources must consider internal and external factors that might alter the schedule.

One technique that helps with the deliverables is determining priority and severity. The former describes the relevance of each item based on feedback from the stakeholders. Usually, higher-priority items are in the critical path for the release and must be tackled first. A backlog can hold the list of items that needs to be burned down, while priority determines their selection into the sprints. Severity depends on the impact of the task: if the task touches data or configuration and has the potential to affect many users, it has a higher severity than others. It is somewhat easier to determine severity when the source code and architecture are well understood. Sound architecture can handle evolving business needs with the fewest changes, and determining the right design will help mitigate the high-severity tasks with ease. The design part of the software development lifecycle involves multiple perspectives, calculations, and validations. With a large team, it might be easier to distribute the design to virtual teams where individuals wear multiple or different hats, rather than organizing by components alone. A DevOps cadence can help improve the mode in which the features are developed.

Another technique that helps with removing surprises and distractions is handing out showcase user interfaces even before the actual functionality is delivered. This is a sound practice even for cases that do not involve software. Such dry runs, including emails to stakeholders, can significantly eliminate noise and refine expectations. Many tasks are done better when the communication is clear and involves the right audience. With more communication, it is possible to change course when things do not pan out. Mistakes occur easily, and corrective actions are easy to take when there is visibility.

Lastly, software development is about people and not just about process and practice. Changes, moderations, and selection of techniques depend largely on the intentions of those who benefit from them.


Saturday, June 26, 2021

 Zone Down failure simulation: 

Introduction: A public cloud enables geo-redundancy for resources by providing many geographical regions where resources can be allocated. A region comprises several availability zones, across which resources are allocated redundantly. If the resources fail in one zone, they are switched out with those from another zone. A zone may comprise several datacenters, each of which may be a stadium-sized center that provisions compute, storage, and networking. When the user requests a resource from the public cloud, it usually has 99.99% availability. When resources are provisioned for zone redundancy, their availability increases to 99.999%. Verifying that resources fail over to an alternate zone when one goes down is key to measuring that improvement in availability. This has been a manual exercise so far. This article explores options to automate the testing from a resource perspective.

Description: 

Though they sound like availability sets, AZs comprise datacenters with independent power, cooling, and networking, whereas availability sets are logical groupings of virtual machines. An AZ is a combination of both a fault domain and an update domain, so changes do not occur at the same time. Services that support availability zones fall under two categories: zonal services, where a resource is pinned to a specific zone, and zone-redundant services, where the platform replicates automatically across zones. Business continuity is established with a combination of zones and Azure region pairs.

The availability zones can be queried from the SDK, and they are mere numbers within a location. For example, az vm list-skus --location eastus2 --output table will list VM SKUs by region and zone. The zones are identified by numbers such as 1, 2, 3, and these mean nothing other than that the zones are distinct. The numbers don’t change for the lifetime of the zone, but they have no direct correlation to physical zone representations.

There are ways in which individual zone-resilient services can allow zone redundancy to be configured. 

When a service allows in-place migration of resources from zonal to zone-redundant, or allows changing the number of zones for the resources it provisions, simulating zone-down behavior is as straightforward as asking the service to reconfigure the resource by specifying exactly which zones to use. For example, a resource could start with [“1”, “2”, “3”] and, to simulate zone “3” going down, be reprovisioned with [“1”, “2”]. This in-place migration is not expected to cause any downtime for the resource because “1” and “2” remain part of the configuration. The re-provisioning can also revolve around the zones, requiring only a source and target zone pair, and since there are three zones, the resource can always be accessed from one zone or another.
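
As an illustrative sketch, for a resource type that exposes a zones array, the reduced zone set is just a property change in the ARM template (the resource type, name, and apiVersion here are only examples):

{
  "type": "Microsoft.Network/publicIPAddresses",
  "apiVersion": "2020-11-01",
  "name": "demo-ip",
  "location": "eastus2",
  "sku": { "name": "Standard" },
  "zones": [ "1", "2" ]
}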

Conclusion: Zone down can be simulated when there is adequate support from the services that provide the resource. 

Reference the earlier discussion on this topic: https://1drv.ms/w/s!Ashlm-Nw-wnWzhemFZTD0rT35pTS?e=kTGWox


Friday, June 25, 2021

 Learnings from Deployment technologies: 

Introduction: The following article summarizes some learnings from deployment technologies that take an organization’s software redistributable, package it, and deploy it so that the software may run in different environments and as separate instances. These technologies evolve over time and become narrowed to the environments they serve. For example, earlier there was WixSharp to build MSIs for installing and uninstalling applications, or a tarball on Linux for deploying binaries as an archive. Now there are far more involved technologies that deploy both on-premises and to the cloud. Some of the salient learnings from such technologies are included here, and this article continues from the previous discussion here.

Description: 

An introduction to WixSharp might be in order. It is a C# front end for writing Microsoft installers (MSI): it generates the WiX source files that describe the set of actions to be taken on a target computer for installing, upgrading, or rolling back software. The artifacts are compiled with an executable called candle, and they carry the rhyming .wxs file extension. WixSharp makes it easy to author the logic that lists all the dependencies of an application for deployment. It converts the C# code to a wxs file, which in turn is compiled to build the MSI. There are a wide variety of samples in the WixSharp toolkit, and some of them require very few lines for a wide variety of deployment-time actions. The appeal of such libraries is being able to get the task done sooner with a few lines of code; the earlier way of writing and editing the WXS file by hand was error-prone and tedious. This is probably one of the most important learnings: any language or framework that allows precise and succinct code or declarations is going to be a lot more popular than verbose ones.
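
A minimal sketch of that succinctness, patterned after the canonical WixSharp sample (the product name, GUID, and file paths are placeholders):

using System;
using WixSharp;

class Script
{
    static void Main()
    {
        // declare the product as a directory tree of files to install
        var project = new Project("MyProduct",
                          new Dir(@"%ProgramFiles%\My Company\My Product",
                              new File(@"Files\Bin\MyApp.exe")));

        // a stable GUID so that upgrades and rollbacks are recognized
        project.GUID = new Guid("6fe30b47-2577-43ad-9095-1861ba25889b");

        // emits the .wxs and drives the WiX toolchain to produce the MSI
        Compiler.BuildMsi(project);
    }
}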

The second learning is about the preference for declarative syntax over logic. It is very tempting to encapsulate all the deployment logic in one place as a procedure, but this tends to become monolithic and takes away all the benefits of out-of-band testing and validation of artifacts. It also demands developer attention as versions change. The declarative format, on the other hand, expands the number of artifacts into self-contained declarations that can be independently validated.

The third learning is about the reduction of custom logic. Extensive, organization-specific custom logic defeats the purpose of a general-purpose infrastructure that can bring automation, consistency, and framework-based improvements to deployment. Preventing custom logic also prevents hacks and makes the deployments more mainstream and less vulnerable to conflicts and issues. General-purpose logic helps with enhancements that serve new and multiple teams as opposed to a single customer.

 

 

Thursday, June 24, 2021

 

This is a continuation of the article originally written here.

This diagram depicts the organization of the deployment-as-a-service stack.

Some of the features for the above platform include:

·        best practices in deployment for all tenants
·        automatic migration for all cloud dependencies
·        Virtualization of deployment technologies and migrations from one set of artifacts to another
·        Asynchronous and background processing with continuous monitoring
·        Programmability options to work with various clients.
·        Analysis and Reporting dashboard
·        Ability to scope down deployments from cloud to on-premises.
·        Curated collection of recipes in automation
·        App-Store integration for allowing clients to opt into deployment stack via published and downloadable applications.
·        Support for browser-based request-response processing
·        Isolated and protected data for all tenants
·        Globally accessible and remote invocable deployment automations
·        Scalable number of client connections allowed for the same repository of artifacts.
·        Extensions and customizations for all tenants.
·        Full transparency in the form of logs and events
·        Continuous availability of Deployment stack
·        Ability to provide service-levels for clients.
·        Providing multiple language support including internationalization and globalization
·        Opt-in modernization of existing deployment dependencies
·        One-stop-shop for all deployment activities in the public cloud

Some of the challenges that would be encountered in this regard would include the following:

1) The scope and the environment for a deployment might differ between technologies, and a mapping must be formed so that declarations against one technology can be repurposed for another.

2) There are many external data stores where configurations may be kept.  File-system or source control is not the only source. Collecting and collocating all the configuration poses a significant task.

3) Artifacts might vary in syntax and semantics, and a one-to-one correspondence might not exist between two technologies. This would require some canonicalization, reconciliation, and prior translations to be set up.

4) Even if the same technology is used, the artifacts might have different scopes and levels of change, which may vary widely across deployments. Rolling one deployment to another environment or topology might require additional steps.

5) The recipes must be portable across infrastructure so they can vary independently. The more hardcoded literals a script uses, the tougher it becomes to move or repurpose the script for another environment.

6) There are several deployments where environment variables and transient data are used heavily. These must be eliminated in favor of declarative syntax.

Conclusion: This article framed deployment for a cloud software maker as a repetitive activity that requires continued investment over time. In such a case, outsourcing the deployment logic to a deployment-as-a-service offering provides opportunities to save costs and adopt best practices.

 

Wednesday, June 23, 2021

Problem statement: Public cloud computing infrastructure hosts customer workloads as well as the ever-increasing portfolio of services offered by the cloud to its customers. These services, from publishers both external and internal to the cloud, require deployments over the public cloud. The publishers write and maintain this deployment logic and bear the cost of keeping up with the improvements and features available for it. This article investigates the implementation platform for a global multi-tenant deployment-as-a-service offering from the public cloud.

Solution:  

Multi-tenancy and the software-as-a-service model are possible only with a cloud computing infrastructure. The deployment logic for a cloud service differs significantly from that for a desktop; a cloud expects more conformance than a desktop or enterprise deployment, justifying the need for a managed program. As cloud service developers struggle to keep up with the speed of software development for cloud-savvy clients, they face deployment as a necessary evil that draws effort away from their mission. Even when organizations pay the cost upfront in the first version released with a dedicated staff, they realize that the cloud is evolving at a pace that rivals their own release timeframes. Some may be able to keep up with the investments year after year, but for most, this is better outsourced so that they spend less time rewriting with newer deployment technologies or embracing enhancements to the cloud.

Cloud service developers have an incentive to join this program. They need not be declarative about the resources they use; they just need to define the policies. This is a significant shift from the paradigm that has cost them tremendously in all their deployments so far. Cloud resources are scalable and virtually limitless, but their efficient usage is often neglected by service developers, who tend to use the expedient solution or over-allocate resources. A managed program for deployment across these internal and external software makers not only passes savings on to the customers but also helps the cloud migrate quickly to better forms of development with little or no disruption to its customers.

Deployment logic is quite complicated involving considerations for install, upgrade, rollback, and cleanup of control resources as well as the storage, migration, protection, and replication of data. Fortunately, the bulk of the deployment logic involves cloud resources and a managed program from the cloud is best positioned to onboard the service to the cloud. Concerns addressed and considerations made in the cloud can now be offered in the form of a billable service that will articulate the savings passed on to the customer.  


Tuesday, June 22, 2021

 Recipes and their relevance to automation. 

The world of automation depends on commands and sequences that achieve a certain outcome or move a resource to a desired state. The infrastructure may be elastic, scalable, self-healing, and state-reconciling, but it is the collection of recipes that makes one automation system more popular than another. Let us take a closer look at these recipes. Logic and recipes are somewhat different: logic is defined as flows and represented by flowcharts, which suit programming languages, while recipes are more suited to scripting languages and cookie-cutter tasks. An automation engineer will demand consistency, easy identification, troubleshooting, and resolution. She will have little patience for debugging complex multi-branch workflows and will focus instead on dividing and conquering a flat list of steps to find the buggy step.

The scripts and programs have at least one thing in common: both are curated over time, so they become even more sought after as they get better at not only what they do but also what they can do. The joy of automation is the savings in time and cost when work is repeated without human intervention. Thousands of repetitions can be kicked off in no time, and the process will not only assign individual identifiers but also correctly maintain the mapping between the results and their respective sets of input variables. One such example is the commissioning of computing systems where the stack involves multiple layers. Automations have interesting by-products in the form of intermediary results, states, and configurations. They execute all their tasks local to a host, unlike services and functions that may even be serverless. Automations can easily be monitored for a predictable start, progress, and stop, and a controller leverages this for scheduling their executions, which are also called jobs.

Automations share certain characteristics with many other massive software products. They are usually written once and run many times over, and on different hosts. They are portable. They embrace shell-based invocations. They become a library of low-level commands and high-level scripts, or recipes, and their organization is determined by their reusability and the requirements of those who use them. Some hierarchy might be introduced for nomenclature, but the recipes remain a single-level collection even if they have groupings represented by prefixes and suffixes. This collection is often called a cookbook or runbook and includes shell-based invocations. The language of automation is PowerShell on Windows and Bash on Linux. Remote processing tools such as curl and data transformation to JSON have made the scripts even more powerful, as they delegate the intensive processing to the right resource. Another popular aspect of automation is pipelining: the output from a previous stage becomes the input to the next stage, and operators allow for transformation and analysis between stages. Automations have a ubiquitous affinity for state and configuration persistence in the form of a relational store, a high-performance message broker such as RabbitMQ, ZeroMQ, or MSMQ, a query and analysis stack, as well as a reporting and visualization stack. Even if their representations vary, automations stand out for their recipes.
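
A minimal PowerShell sketch of that pipelining, where each cmdlet streams its output into the next and the final stage transforms the result to JSON (the filter values are arbitrary):

# find the five busiest processes and emit them as JSON
Get-Process |
    Where-Object { $_.CPU -gt 100 } |
    Sort-Object CPU -Descending |
    Select-Object -First 5 Name, CPU |
    ConvertTo-Json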


Monday, June 21, 2021

This is a continuation of the earlier article that introduced ServiceBus replication:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Microsoft.ServiceBus;
using Azure.Messaging.ServiceBus;
using Microsoft.ServiceBus.Messaging;

namespace SBQueueSender
{
    /*
     * Program to complete structure and content replication of SB Entities from source namespace to destination namespace
     */
    class Program
    {
        private static string connectionString = "";
        private static string secondaryConnectionString = "";
        private static string sourceTopicOrQueueName = "";
        private static string sourceSubscriptionName = "";
        private static string destinationTopicName = "";
        private static string destinationSubscriptionName = "";
        private static string replicateStructure = "";
        private static string replicateContent = "";

        static async Task Main(string[] args)
        {
            IConfigurationBuilder builder = new ConfigurationBuilder().AddJsonFile("appsettings.json");
            IConfigurationRoot config = builder.Build();
            connectionString = config["primaryConnectionString"];
            secondaryConnectionString = config["secondaryConnectionString"];
            sourceTopicOrQueueName = config["sourceTopicName"];
            sourceSubscriptionName = config["sourceSubscriptionName"];
            destinationTopicName = config["destinationTopicName"];
            destinationSubscriptionName = config["destinationSubscriptionName"];
            replicateStructure = config["replicateStructure"];
            replicateContent = config["replicateContent"];

            if (string.IsNullOrWhiteSpace(connectionString) ||
                string.IsNullOrWhiteSpace(secondaryConnectionString) ||
                string.IsNullOrWhiteSpace(sourceTopicOrQueueName) ||
                string.IsNullOrWhiteSpace(sourceSubscriptionName) ||
                string.IsNullOrWhiteSpace(destinationTopicName) ||
                string.IsNullOrWhiteSpace(destinationSubscriptionName) ||
                string.IsNullOrWhiteSpace(replicateStructure) ||
                string.IsNullOrWhiteSpace(replicateContent))
            {
                Console.WriteLine("Please enter appropriate values in appsettings.json file. Exiting...");
                await Task.Delay(2000);
            }
            else
            {
                NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
                if (bool.TrueString.Equals(replicateStructure, StringComparison.OrdinalIgnoreCase))
                {
                    await ReplicateStructure(namespaceManager, secondaryConnectionString);
                }

                if (bool.TrueString.Equals(replicateContent, StringComparison.OrdinalIgnoreCase))
                {
                    await ReplicateContent(namespaceManager, secondaryConnectionString);
                }
            }
        }

        /// <summary>
        /// Replicates the structure of the SB namespace.
        /// </summary>
        /// <param name="namespaceManager"></param>
        /// <param name="destinationConnectionString"></param>
        /// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
        public static async Task ReplicateStructure(NamespaceManager namespaceManager, string destinationConnectionString)
        {
            var targetNamespaceManager = NamespaceManager.CreateFromConnectionString(destinationConnectionString);
            foreach (var queue in namespaceManager.GetQueues())
            {
                try
                {
                    Console.WriteLine(string.Format("Creating queue {0} ", queue.Path));
                    targetNamespaceManager.CreateQueue(queue);
                }
                catch (MessagingEntityAlreadyExistsException)
                {
                    Console.WriteLine(string.Format("Queue {0} already exists.", queue.Path));
                }
            }

            foreach (var topic in namespaceManager.GetTopics())
            {
                try
                {
                    Console.WriteLine(string.Format("Creating topic {0} ", topic.Path));
                    targetNamespaceManager.CreateTopic(topic);
                }
                catch (MessagingEntityAlreadyExistsException)
                {
                    Console.WriteLine(string.Format("Topic {0}  already exits...", topic.Path));
                }

                foreach (var subscription in namespaceManager.GetSubscriptions(topic.Path))
                {
                    try
                    {
                        Console.WriteLine(string.Format("Creating subscription {0} ", subscription.Name));
                        targetNamespaceManager.CreateSubscription(subscription);
                    }
                    catch (MessagingEntityAlreadyExistsException)
                    {
                        Console.WriteLine(string.Format("Subscription {0}  already exits...", subscription.Name));
                    }
                }
            }
        }

        /// <summary>
        /// Replicates the contents of the SB entities
        /// </summary>
        /// <param name="namespaceManager"></param>
        /// <param name="destinationConnectionString"></param>
        /// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
        public static async Task ReplicateContent(NamespaceManager namespaceManager, string destinationConnectionString)
        {
            await using var client = new ServiceBusClient(connectionString);
            foreach (var sbQueue in namespaceManager.GetQueues("messageCount Gt 0"))
            {
                sourceTopicOrQueueName = sbQueue.Path;
                ServiceBusReceiver receiver = client.CreateReceiver(sbQueue.Path);
                // drain the queue: receive until no message arrives within the timeout,
                // forwarding each message and completing it so it is not re-delivered
                ServiceBusReceivedMessage receivedMessage;
                while ((receivedMessage = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(30))) != null)
                {
                    await SendMessageToTopicAsync(destinationConnectionString, sbQueue.Path, receivedMessage);
                    await receiver.CompleteMessageAsync(receivedMessage);
                }
            }

            foreach (var sbTopic in namespaceManager.GetTopics("messageCount Gt 0"))
            {
                sourceTopicOrQueueName = sbTopic.Path;
                foreach (var sbSub in namespaceManager.GetSubscriptions(sbTopic.Path))
                {
                    sourceSubscriptionName = sbSub.Name;
                    Console.WriteLine("Listening on topic: {0} with subscription: {1}", sourceTopicOrQueueName, sourceSubscriptionName);
                    await ReceiveMessagesFromSubscriptionAsync(connectionString, sbTopic.Path, sbSub.Name);
                }
            }
        }

        private static async Task ReceiveMessagesFromSubscriptionAsync(string connectionString, string sourceTopicName, string sourceSubscriptionName)
        {
            try
            {
                await using (ServiceBusClient client = new ServiceBusClient(connectionString))
                {
                    // create a processor that we can use to process the messages
                    ServiceBusProcessor processor = client.CreateProcessor(sourceTopicName, sourceSubscriptionName, new ServiceBusProcessorOptions()
                    {
                        MaxConcurrentCalls = 1,
                        AutoCompleteMessages = false,
                    });

                    // add handler to process messages
                    processor.ProcessMessageAsync += MessageHandler;

                    // add handler to process any errors
                    processor.ProcessErrorAsync += ErrorHandler;

                    // start processing 
                    await processor.StartProcessingAsync();

                    Console.WriteLine("press any key to stop ...");
                    Console.ReadKey();

                    // stop processing 
                    Console.WriteLine("\nStopping the receiver...");
                    await processor.StopProcessingAsync();
                    Console.WriteLine("Stopped receiving messages");
                }
            }
            catch (Exception e)
            {
                Console.WriteLine("Error: ", e);
            }
        }

        private static async Task MessageHandler(ProcessMessageEventArgs args)
        {
            string id = args.Message.MessageId;
            Console.WriteLine($"Received: {id} from subscription: {sourceSubscriptionName}");

            await SendMessageToTopicAsync(secondaryConnectionString, sourceTopicOrQueueName, args.Message);
            Console.WriteLine($"Sent: {id} from subscription: {sourceSubscriptionName}");

            // complete the message so that it is removed from the subscription
            await args.CompleteMessageAsync(args.Message);
        }

        private static Task ErrorHandler(ProcessErrorEventArgs args)
        {
            Console.WriteLine(args.Exception.ToString());
            return Task.CompletedTask;
        }

        private static async Task SendMessageToTopicAsync(string connectionString, string destinationTopicName, ServiceBusReceivedMessage serviceBusReceivedMessage)
        {
            if (serviceBusReceivedMessage != null)
            {
                try
                {
                    // create a Service Bus client
                    await using (ServiceBusClient client = new ServiceBusClient(connectionString))
                    {
                        // create a sender for the topic
                        ServiceBusSender sender = client.CreateSender(destinationTopicName);
                        await sender.SendMessageAsync(new ServiceBusMessage(serviceBusReceivedMessage));
                        Console.WriteLine($"Sent a single message to the topic: {destinationTopicName}");
                    }
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error: ", e);
                }
            }
            else
            {
                Console.WriteLine("received null message.");
            }
        }

        private static async Task SendTestMessageToTopicAsync(string body)
        {
            // create a Service Bus client 
            try
            {
                await using (ServiceBusClient client = new ServiceBusClient(connectionString))
                {
                    // create a sender for the topic
                    ServiceBusSender sender = client.CreateSender(sourceTopicOrQueueName);
                    await sender.SendMessageAsync(new ServiceBusMessage(body));
                    Console.WriteLine($"Sent a single message to the topic: {sourceTopicOrQueueName}");
                }
            }
            catch (Exception e)
            {
                Console.WriteLine("Error: ", e);
            }
        }
    }
}

Sunday, June 20, 2021

This is a continuation of the earlier article that introduced ServiceBus replication:
 Reference:
// Sample program for data migration
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Microsoft.ServiceBus;
using Azure.Messaging.ServiceBus;
 
namespace SBQueueSender
{
    class Program
    {
        private static string connectionString = "";
        private static string secondaryConnectionString = "";
        private static string sourceTopicOrQueueName = "";
        private static string sourceSubscriptionName = "";
        private static string destinationTopicOrQueueName = "";
        private static string destinationSubscriptionName = "";
 
        static async Task Main(string[] args)
        {
            IConfigurationBuilder builder = new ConfigurationBuilder().AddJsonFile("appsettings.json");
            IConfigurationRoot config = builder.Build();
            connectionString = config["primaryConnectionString"];
            secondaryConnectionString = config["secondaryConnectionString"];
            sourceTopicOrQueueName = config["sourceTopicOrQueueName"];
            sourceSubscriptionName = config["sourceSubscriptionName"];
            destinationTopicOrQueueName = config["destinationTopicOrQueueName"];
            destinationSubscriptionName = config["destinationSubscriptionName"];
 
            if (string.IsNullOrWhiteSpace(connectionString) ||
                string.IsNullOrWhiteSpace(secondaryConnectionString) ||
                string.IsNullOrWhiteSpace(sourceTopicOrQueueName) ||
                string.IsNullOrWhiteSpace(sourceSubscriptionName) ||
                string.IsNullOrWhiteSpace(destinationTopicOrQueueName) ||
                string.IsNullOrWhiteSpace(destinationSubscriptionName))
            {
                Console.WriteLine("Please enter appropriate values in appsettings.json file. Exiting...");
                await Task.Delay(2000);
            }
            else
            {
                NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
                await using var client = new ServiceBusClient(connectionString);
                foreach (var sbQueue in namespaceManager.GetQueues("messageCount Gt 0"))
                {
                    sourceTopicOrQueueName = sbQueue.Path;
                    ServiceBusReceiver receiver = client.CreateReceiver(sbQueue.Path);
                    ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();
                    await SendMessageToTopicAsync(secondaryConnectionString, sbQueue.Path, receivedMessage);
                }
 
                foreach (var sbTopic in namespaceManager.GetTopics("messageCount Gt 0"))
                {
                    sourceTopicOrQueueName = sbTopic.Path;
                    foreach (var sbSub in namespaceManager.GetSubscriptions(sbTopic.Path))
                    {
                        sourceSubscriptionName = sbSub.Name;
                        Console.WriteLine("Listening on topic: {0} with subscription: {1}", sourceTopicOrQueueName, sourceSubscriptionName);
                        await ReceiveMessagesFromSubscriptionAsync(connectionString, sbTopic.Path, sbSub.Name);
                    }
                }
            }
        }
 
        private static async Task ReceiveMessagesFromSubscriptionAsync(string connectionString, string sourceTopicName, string sourceSubscriptionName)
        {
            await using (ServiceBusClient client = new ServiceBusClient(connectionString))
            {
                // create a processor that we can use to process the messages
                ServiceBusProcessor processor = client.CreateProcessor(sourceTopicName, sourceSubscriptionName, new ServiceBusProcessorOptions());
 
                // add handler to process messages
                processor.ProcessMessageAsync += MessageHandler;
 
                // add handler to process any errors
                processor.ProcessErrorAsync += ErrorHandler;
 
                // start processing 
                await processor.StartProcessingAsync();
 
                // note: processor.IsProcessing remains true until StopProcessingAsync is
                // called, so polling it in a loop here would never exit; block on input instead
 
                Console.WriteLine("Wait for a minute and then press any key for confirmation to end the processing");
                Console.ReadKey();
 
                // stop processing 
                Console.WriteLine("\nStopping the receiver...");
                await processor.StopProcessingAsync();
                Console.WriteLine("Stopped receiving messages");
            }
        }
 
        private static async Task MessageHandler(ProcessMessageEventArgs args)
        {
            string id = args.Message.MessageId;
            Console.WriteLine($"Received: {id} from subscription: {sourceSubscriptionName}");
 
            await SendMessageToTopicAsync(secondaryConnectionString, sourceTopicOrQueueName, args.Message);
            Console.WriteLine($"Sent: {id} from subscription: {sourceSubscriptionName}");
 
            // complete the message so that it is removed from the subscription
            await args.CompleteMessageAsync(args.Message);
        }
 
        private static Task ErrorHandler(ProcessErrorEventArgs args)
        {
            Console.WriteLine(args.Exception.ToString());
            return Task.CompletedTask;
        }
 
        private static async Task SendMessageToTopicAsync(string connectionString, string destinationTopicOrQueueName, ServiceBusReceivedMessage serviceBusReceivedMessage)
        {
            // create a Service Bus client 
            await using (ServiceBusClient client = new ServiceBusClient(connectionString))
            {
                // create a sender for the topic
                ServiceBusSender sender = client.CreateSender(destinationTopicOrQueueName);
                await sender.SendMessageAsync(new ServiceBusMessage(serviceBusReceivedMessage));
                Console.WriteLine($"Sent a single message to the topic: {destinationTopicOrQueueName}");
            }
        }
 
        private static async Task SendTestMessageToTopicAsync(string body)
        {
            // create a Service Bus client 
            await using (ServiceBusClient client = new ServiceBusClient(connectionString))
            {
                // create a sender for the topic
                ServiceBusSender sender = client.CreateSender(sourceTopicOrQueueName);
                await sender.SendMessageAsync(new ServiceBusMessage(body));
                Console.WriteLine($"Sent a single message to the topic: {sourceTopicOrQueueName}");
            }
        }
    }
}