Tuesday, April 23, 2024

 This is a continuation of previous articles on IaC shortcomings and resolutions. While IaC code can be used deterministically to repeatedly create, update, and delete cloud resources, there are some dependencies that are managed by the resources themselves and become a concern for the end user when they are not properly cleaned up. Take for instance, the load balancers that compute instances and clusters create when they are provisioned using the Azure Machine Learning Workspaces. These are automatically provisioned. The purpose of this load balancer is to manage traffic even when the compute instance or cluster is stopped. Each compute instance has one load balancer associated with it, and for every 50 nodes in a compute cluster, one standard load balancer is billed. The load balancer ensures that requests are distributed evenly across the available compute resources, improving performance and availability. Each load balancer is billed at approximately $0.33 per day. If we have multiple compute instances, each one will have its own load balancer. For compute clusters, the load balancer cost is based on the total number of nodes in the cluster. One way to avoid load balancer costs on stopped compute instances and clusters, is to delete the compute resources when they are not in use. The IaC can help with the delete of the resources but whether the action is automated or manual, it is contingent on the delete of the load balancers and when delete fails for reasons such as locks on load balancers, then the user is left with a troublesome situation.

An understanding of the load balancer might help put things in perspective especially when trying to find them to unlock or delete. Many cloud resources and Azure Batch services create load balancers and the ways to distinguish them vary from resource groups, tags, or properties. These load balancers play a crucial role in distributing network traffic evenly across multiple compute resources to optimize performance and ensure high availability, they use various algorithms such as round-robin, least connections, or source IP affinity, to distribute incoming traffic to the available compute resources. This helps in maintaining a balanced workload and preventing any single resource from being overwhelmed. They also contribute to high availability by continuously monitoring the health of the compute resources. If a resource becomes unhealthy or unresponsive, the load balancer automatically redirects traffic to other healthy resources. They can seamlessly handle an increase in traffic by automatically scaling up the number of compute resources. Azure Machine Learning Workspace load balancers can scale up or down based on predefined rules or metrics, ensuring that the resources can handle the workload efficiently. Load balancing rules determine how traffic should be distributed. Rules can be configured based on protocols, ports, or other attributes to ensure that the traffic is routed correctly. Load balancers continuously monitor the health of the compute resources by sending health probes to check their responsiveness. If a resource fails the health probe, it is marked as unhealthy, and traffic is redirected to other healthy resources. Azure Machine Learning Workspace supports both internal and public load balancers. Internal load balancers are used for internal traffic within a virtual network, while public load balancers handle traffic from the internet. They can be seamlessly integrated with other Azure services, such as virtual networks, virtual machines, and container services, to build scalable and highly available machine learning solutions. Overall, load balancers in Azure Machine Learning Workspace play a critical role in optimizing performance, ensuring high availability, and handling increased traffic by evenly distributing it across multiple compute resources.

Creating the compute with node public ip set to false and disabling local auth can prevent load balancers from being created but if endpoints are involved, the Azure Batch Service will create them. Load balancers, public ip addresses and associated dependencies are created in the resource group of the virtual network and not the resource group of the machine learning workspace. Finding the load balancers and taking appropriate action on them can allow the compute resources to be cleaned up. This can be done on an ad hoc basis or scheduled basis.

Monday, April 22, 2024

 This is a continuation of a previous article on IaC shortcomings and resolutions. With regard to Azure Machine Learning Workspace, here is a sample request and response:

1. Go to https://learn.microsoft.com/en-us/rest/api/azureml/compute/create-or-update?view=rest-azureml-2023-10-01&tabs=HTTP#code-try-0 and signin with your secondary account:


Specify the following:

PUT https://management.azure.com/subscriptions/<subscription-id>/resourceGroups/<resource-group-name>/providers/Microsoft.MachineLearningServices/workspaces/<ml-workspace-name>/computes/<compute-name>?api-version=2023-10-01

Authorization: Bearer <automatically created access token>

Content-type: application/json

{

  "properties": {

    "properties": {

      "vmSize": "STANDARD_DS11_V2",

      "subnet": {

        "id": "/subscriptions/<subscription-id>/resourceGroups/<rg-vnet-name>/providers/Microsoft.Network/virtualNetworks/<vnet-name>/subnets/<subnet-name>"

      },

      "applicationSharingPolicy": "Shared",

      "computeInstanceAuthorizationType": "personal",

      "enableNodePublicIp": false,

      "disableLocalAuth": true,

      "location": "centralus",

      "scaleSettings": {

        "maxNodeCount": 1,

        "minNodeCount": 0,

        "nodeIdleTimeBeforeScaleDown": "PT60M"

      }

    },

    "computeType": "AmlCompute",

    "disableLocalAuth": true

  },

  "location": "centralus",

  "disableLocalAuth": true

}



2. Check the response code to match as shown:

Response Code: 201

azure-asyncoperation: https://management.azure.com/subscriptions/<subscription-id>/providers/Microsoft.MachineLearningServices/locations/centralus/computeOperationsStatus/f6dcbe07-99cf-4bf7-aa71-0fdcfc542941?api-version=2023-10-01&service=new

cache-control: no-cache

content-length: 1483

content-type: application/json; charset=utf-8

date: Sat, 20 Apr 2024 02:28:50 GMT

expires: -1

pragma: no-cache

request-context: appId=cid-v1:2d2e8e63-272e-4b3c-8598-4ee570a0e70d

strict-transport-security: max-age=31536000; includeSubDomains

x-aml-cluster: vienna-centralus-02

x-content-type-options: nosniff

x-ms-correlation-request-id: f15d6510-5d21-426a-98e5-aa800322da83

x-ms-ratelimit-remaining-subscription-writes: 1199

x-ms-request-id: f15d6510-5d21-426a-98e5-aa800322da83

x-ms-response-type: standard

x-ms-routing-request-id: NORTHCENTRALUS:20240420T022850Z:f15d6510-5d21-426a-98e5-aa800322da83

x-request-time: 0.257


Sunday, April 21, 2024

 Given clock hands positions for different points of time as pairs A[I][0] and A[I][1] where the order of the hands does not matter but their angle enclosed, count the number of pairs of points of time where the angles are the same

    public static int[] getClockHandsDelta(int[][] A) {

        int[] angles = new int[A.length];

        for (int i = 0; i < A.length; i++){

            angles[i] = Math.max(A[i][0], A[i][1]) - Math.min(A[i][0],A[i][1]);

        }

        return angles;

    }

    public static int NChooseK(int n, int k)

    {

        if (k < 0 || k > n || n == 0) return 0;

        if ( k == 0 || k == n) return 1;

        return Factorial(n) / (Factorial(n-k) * Factorial(k));

    }

 

    public static int Factorial(int n) {

        if (n <= 1) return 1;

        return n * Factorial(n-1);

    }


    public static int countPairsWithIdenticalAnglesDelta(int[] angles){

        Arrays.sort(angles);

        int count = 1;

        int result = 0;

        for (int i = 1; i < angles.length; i++) {

            if (angles[i] == angles[i-1]) {

                count += 1;

            } else {

                if (count > 0) {

                    result += NChooseK(count, 2);

                }

                count = 1;

            }

        }

        if (count > 0) {

            result += NChooseK(count, 2);

            count = 0;

        }

        return result;

    }


        int [][] A = new int[5][2];

         A[0][0] = 1;    A[0][1] = 2;

         A[1][0] = 2;    A[1][1] = 4;

         A[2][0] = 4;    A[2][1] = 3;

         A[3][0] = 2;    A[3][1] = 3;

         A[4][0] = 1;    A[4][1] = 3;

 1 2 1 1 2 

1 1 1 2 2 

4


Saturday, April 20, 2024

 This is a continuation of previous articles on IaC shortcomings and resolutions. No infrastructure is useful without considerations for usability. As with the earlier example of using Azure Machine Learning workspace to train models using Snowflake data source, some consideration must be given to allow connections to data source and importing data. We cited resolving versions between Spark, Scala and Snowflake libraries within the kernel to allow data to be imported into a dataframe for use with SQL and this could be difficult for end-users if they were to locate the jars and download themselves. While the infrastructure could provide pre-configured kernels such as Almond kernel with appropriate jars such as for Scala, some samples might ease the task for datascientists wrangling with Snowflake data on existing workspaces.

For example, they could stage their action in multiple steps with pulling the data from snowflake and then loading it into a dataframe.

This is a sample code to do so:

from pyspark.sql import SparkSession

from pyspark.sql.types import StructType, StructField, StringType


spark = SparkSession.builder.appName("BytesToDataFrame").getOrCreate()


# Sample raw bytes (replace with your actual data from Snowflake using snowflake-connector cursor)

# https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-example

# either using 

# a)               df = pd.DataFrame(cursor.fetchall())

# or

# b)               df = cursor.fetch_pandas_all()

# or

# c)


raw_bytes = [b'\xba\xed\x85\x8e\x91\xd4\xc7\xb0', b'\xba\xed\x85\x8e\x91\xd4\xc7\xb1']

schema = StructType([StructField("id", StringType(), True)])

rdd = spark.sparkContext.parallelize(raw_bytes)

df = spark.createDataFrame(rdd, schema=schema)

df.show()


In this example, the data is retrieved first with a cursor and then loaded into a dataframe.


Friday, April 19, 2024

 

This is a continuation of previous articles on allowing Spark/Scala/Snowflake code to execute on Azure Machine Learning Compute. The built-in Jupyter kernel of “Azure ML – Python 3.8” does not have pyspark and we discussed the choices of downloading version compatible jars as well as alternative code to get data from Snowflake.

In this article, we will review the steps to set up a Jupyter notebook for Snowpark Scala. An “Almond” kernel can be used to setup Scala and coursier can be used to install the Almond Kernel to specify a supported version of Scala. The Almond Kernel has a prerequisite that a Java Virtual Machine needs to be installed on the system. This can be done by installing AdoptOpenJDK version 8. Then Almond can be fetched with coursier, by downloading its release and running the executable with the command line parameters to install Almond and Scala. Coursier is a Scala application that makes it easy to manage artifacts. It can setup the Scala development environment by downloading and caching artifacts from the web.

The Jupyter notebook for Snowpark can then be configured by defining a variable for the path and directory for classes generated by the Scala REPL and creating it. The Scala REPL generates classes for the Scala code that user writes. This process of configuring the compiler is not complete without adding the directory created earlier as a dependency of the REPL interpreter. Next we create a new session in SnowPark with an example as below:

import $ivy.`com.snowflake:snowpark:1.12.0`

import com.snowflake.snowpark._

import com.snowflake.snowpark.functions._

val session = Session.builder.configs(Map(

    "URL" -> "https://<account_identifier>.snowflakecomputing.com",

    "USER" -> "<username>",

    "PASSWORD" -> "<password>",

    "ROLE" -> "<role_name>",

    "WAREHOUSE" -> "<warehouse_name>",

    "DB" -> "<database_name>",

    "SCHEMA" -> "<schema_name>"

)).create

session.addDependency(replClassPath)

and then the Ammonite Kernel classes can be added for the code.

The session can be used to run a SQL query and populate a dataframe which can then be used independent of the data source.

Previous articles: IaCResolutionsPart107.docx

Thursday, April 18, 2024

 

This is a continuation of previous articles on IaC shortcomings and resolutions. No infrastructure is useful without considerations for usability. As with the earlier example of using Azure Machine Learning workspace to train models using Snowflake data source, some consideration must be given to allow connections to data source and importing data. We cited resolving versions between Spark, Scala and Snowflake libraries within the kernel to allow data to be imported into a dataframe for use with SQL and this could be difficult for end-users if they were to locate the jars and download themselves.

One way to resolve this would be to use a different coding style as shown below:

import snowflake.connector

conn = snowflake.connector.connect(

    account=’<snowflake_account>’,

    host='<account>.east-us-2.azure.snowflakecomputing.com',

    user='<login>',

    private_key=<bytes-to-private_key>,

    role='<data-scientist-role>',

    warehouse='<name-of-warehouse>',

    database='<demo_db>',

    schema='<demo_table>'

)

 

cursor = conn.cursor()

cursor.execute('select ID from <schema> limit 10')

rows = cursor.fetchall()

for row in rows:

    print(row)

cursor.close()

conn.close()

 

When compared to the following code:

spark = (

    SparkSession.builder \

    .appName('SnowflakeSample') \

    .config("spark.jars","/anaconda/envs/azureml_py38/lib/python3.8/site-packages/pyspark/jars/snowflake-jdbc-3.12.2.jar,/anaconda/envs/azureml_py38/lib/python3.8/site-packages/pyspark/jars/snowflake-ingest-sdk-0.9.6.jar,/anaconda/envs/azureml_py38/lib/python3.8/site-packages/pyspark/jars/spark-snowflake_2.13-2.11.3-spark_3.3.jar,/anaconda/envs/azureml_py38/lib/python3.8/site-packages/pyspark/jars/scala-library-2.12.19.jar,/anaconda/envs/azureml_py38/lib/python3.8/site-packages/pyspark/jars/hadoop-azure-3.2.1.jar,/anaconda/envs/azureml_py38/lib/python3.8/site-packages/pyspark/jars/azure-storage-7.0.0.jar")

    .config(conf = conf) \

    .getOrCreate()

)

print(spark.version)

 

sfOptions = {

            "sfUser" : "<login>",

            "sfURL" : "<account>.east-us-2.azure.snowflakecomputing.com",

            "sfRole" : "<data-scientist-role>",

            "sfWarehouse" : "<warehouse>",

            "sfDatabase" : "<demo_database>",

            "sfSchema" : "<demo_table>",

            "pem_private_key" : <private-key-bytes>

            }

 

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

query = 'select ID from <schema> limit 10'

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \

    .options(**sfOptions) \

    .option("query",query) \

    .load()

 

It becomes clear that the spark.read.format can be difficult to run without the proper jars passed in the config.

Therefore, samples are important to be provided with infrastructure.

Previous articles: IaCResolutionsPart106.docx

Wednesday, April 17, 2024

 Given a wire grid of size N * N with N-1 horizontal edges and N-1 vertical edges along the X and Y axis respectively, and a wire burning out every instant as per the given order using three matrices A, B, C such that the wire that burns is 

(A[T], B[T] + 1), if C[T] = 0 or

(A[T] + 1, B[T]), if C[T] = 1

Determine the instant after which the circuit is broken 

     public static boolean checkConnections(int[] h, int[] v, int N) {

        boolean[][] visited = new boolean[N][N];

        dfs(h, v, visited,0,0);

        return visited[N-1][N-1];

    }

    public static void dfs(int[]h, int[]v, boolean[][] visited, int i, int j) {

        int N = visited.length;

        if (i < N && j < N && i>= 0 && j >= 0 && !visited[i][j]) {

            visited[i][j] = true;

            if (v[i * (N-1) + j] == 1) {

                dfs(h, v, visited, i, j+1);

            }

            if (h[i * (N-1) + j] == 1) {

                dfs(h, v, visited, i+1, j);

            }

            if (i > 0 && h[(i-1)*(N-1) + j] == 1) {

                dfs(h,v, visited, i-1, j);

            }

            if (j > 0 && h[(i * (N-1) + (j-1))] == 1) {

                dfs(h,v, visited, i, j-1);

            }

        }

    }

    public static int burnout(int N, int[] A, int[] B, int[] C) {

        int[] h = new int[N*N];

        int[] v = new int[N*N];

        for (int i = 0; i < N*N; i++) { h[i] = 1; v[i] = 1; }

        for (int i = 0; i < N; i++) {

            h[(i * (N)) + N - 1] = 0;

            v[(N-1) * (N) + i] = 0;

        }

        System.out.println(printArray(h));

        System.out.println(printArray(v));

        for (int i = 0; i < A.length; i++) {

            if (C[i] == 0) {

                v[A[i] * (N-1) + B[i]] = 0;

            } else {

                h[A[i] * (N-1) + B[i]] = 0;

            }

            if (!checkConnections(h,v, N)) {

                return i+1;

            }

        }

        return -1;

    }

        int[] A = new int[9];

        int[] B = new int[9];

        int[] C = new int[9];

        A[0] = 0;    B [0] = 0;    C[0] = 0;

        A[1] = 1;    B [1] = 1;    C[1] = 1;

        A[2] = 1;    B [2] = 1;    C[2] = 0;

        A[3] = 2;    B [3] = 1;    C[3] = 0;

        A[4] = 3;    B [4] = 2;    C[4] = 0;

        A[5] = 2;    B [5] = 2;    C[5] = 1;

        A[6] = 1;    B [6] = 3;    C[6] = 1;

        A[7] = 0;    B [7] = 1;    C[7] = 0;

        A[8] = 0;    B [8] = 0;    C[8] = 1;

        System.out.println(burnout(9, A, B, C));

1 1 1 1 1 1 1 1 0 1 1 1 1 1 1 1 1 0 1 1 1 1 1 1 1 1 0 1 1 1 1 1 1 1 1 0 1 1 1 1 1 1 1 1 0 1 1 1 1 1 1 1 1 0 1 1 1 1 1 1 1 1 0 1 1 1 1 1 1 1 1 0 1 1 1 1 1 1 1 1 0 

1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0 0 0 0 0 0 0 0 

8

Alternatively,

    public static boolean burnWiresAtT(int N, int[] A, int[] B, int[] C, int t) {

        int[] h = new int[N*N];

        int[] v = new int[N*N];

        for (int i = 0; i < N*N; i++) { h[i] = 1; v[i] = 1; }

        for (int i = 0; i < N; i++) {

            h[(i * (N)) + N - 1] = 0;

            v[(N-1) * (N) + i] = 0;

        }

        System.out.println(printArray(h));

        System.out.println(printArray(v));

        for (int i = 0; i < t; i++) {

            if (C[i] == 0) {

                v[A[i] * (N-1) + B[i]] = 0;

            } else {

                h[A[i] * (N-1) + B[i]] = 0;

            }

        }

        return checkConnections(h, v, N);

    }

    public static int binarySearch(int N, int[] A, int[] B, int[] C, int start, int end) {

        if (start == end) {

            if (!burnWiresAtT(N, A, B, C, end)){

                return end;

            }

            return  -1;

        } else {

            int mid = (start + end)/2;

            if (burnWiresAtT(N, A, B, C, mid)) {

                return binarySearch(N, A, B, C, mid + 1, end);

            } else {

                return binarySearch(N, A, B, C, start, mid);

            }

        }

    }

1 1 1 1 1 1 1 1 0 1 1 1 1 1 1 1 1 0 1 1 1 1 1 1 1 1 0 1 1 1 1 1 1 1 1 0 1 1 1 1 1 1 1 1 0 1 1 1 1 1 1 1 1 0 1 1 1 1 1 1 1 1 0 1 1 1 1 1 1 1 1 0 1 1 1 1 1 1 1 1 0 

1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0 0 0 0 0 0 0 0 

8