Friday, May 23, 2025

 Building a reasoning model from an LLM can offer a great return on investment and involves just reinforcement learning. The fun part is that it works for both text-to-text LLMs such as GPT-3.5/4 and multi-modal LLMs such as GPT-4o. Reasoning models have a dedicated chain-of-thought phase in which they decompose and logically work through a problem before responding. Examples of reasoning models are o4 and Phi, but let us review how to build one for drone imagery models in drone sensing applications, where an agent can send a drone image to a fine-tuned model to get observations and then send those observations to a reasoning model to get a better analysis. In this example of building a reasoning model, we use the Group Relative Policy Optimization (GRPO) technique and the Azure AI Machine Learning platform because it helps with debugging, logging, profiling and metrics. GRPO is a reinforcement learning technique that compares multiple answers within a group, rewards the best-performing outputs, penalizes poor ones and applies careful updates to avoid sudden changes.

#! /usr/bin/python

# requisites:

#! pip install azure-core azure-ai-ml rich huggingface_hub

#! curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash

#! export HF_TOKEN="hf_xxxxxxxxx"

from aml_setup import setup

# Unpack the five handles returned by setup(). The client must be bound to
# `ml_client`: later cells call `ml_client.jobs` and `ml_client.models`.
ml_client, drone_mcqa_data, model, compute, environment = setup()

"""

# Sample multiple choice

Which direction has more population density?

A. North

B. East

C. West

D. South

The Ideal reasoning model response:

<think>

Start by rotating the current image to align with the true North relative to the drone camera. Find people in any of the four quadrants. Select the direction from the quadrant that improves the probability the most. If there are no winners among the quadrants, select the direction based on the direction of travel.

</think>

The presence of habitats such as building or outgoing traffic indicates the direction where population density is most.

Final Answer: C.

"""

# Only hand the trainer an evaluation split when evaluation is enabled in the
# training arguments; otherwise pass None.
if training_args.eval_strategy != "no":
    eval_split = dataset[script_args.dataset_test_split]
else:
    eval_split = None

# Build the GRPO trainer around the current policy model and the reward
# function(s), with PEFT settings derived from the model arguments.
trainer = GRPOTrainer(
    model=current_policy,
    reward_funcs=reward_function,
    train_dataset=dataset[script_args.dataset_train_split],
    args=training_args,
    peft_config=get_peft_config(model_args),
    processing_class=tokenizer,
    eval_dataset=eval_split,
    callbacks=[save_mlflow_callback],
)

trainer.train()

def format_reward(completions, **kwargs):
    """Score each completion on whether it follows the required output layout.

    A completion is well-formed when it starts with a ``<think>`` ...
    ``</think>`` block that is followed by an ``<answer>`` ... ``</answer>``
    block, with every tag on its own line and nothing after ``</answer>``
    other than an optional final newline.

    Args:
        completions (list): Model predictions. Each item is a sequence whose
            first element is a mapping with a ``"content"`` string.
        **kwargs: Accepted and ignored, so extra keyword arguments passed by
            the caller do not raise.

    Returns:
        list: One reward per completion, 1.0 for a well-formed completion and
        0.0 otherwise.
    """
    # Imported locally: the snippet this function lives in has no
    # module-level ``import re``.
    import re

    # re.MULTILINE is deliberately NOT used: with it, ``$`` also matches at the
    # end of the ``</answer>`` line, so arbitrary trailing text was rewarded.
    pattern = re.compile(
        r"^<think>\n.*?\n</think>\n<answer>\n.*?\n</answer>$", re.DOTALL
    )
    return [
        1.0 if pattern.match(completion[0]["content"]) else 0.0
        for completion in completions
    ]

"""

# The following are configurations to

# run vllm to generate samples

# there are two ways to do this: server and collocate

# collocate mode runs sampler and trainer on same GPU.

use_vllm: True

vllm_mode: "collocate"

vllm_gpu_memory_utilization: 0.25

vllm_tensor_parallel_size: 4

reward_funcs:

- accuracy

- format

# reward = 0.8*accuracy + 0.2*format

"""

from azure.ai.ml import command, Input, Output

from azure.ai.ml.entities import (

    ManagedOnlineEndpoint,

    ManagedOnlineDeployment,

    Model

)

from azure.ai.ml.constants import AssetTypes

# the following is a command job that takes the GRPO config, the DeepSpeed config, the dataset and the model parameters and kicks off a distributed job.

# Command line run by the training job. The ${{inputs.<key>}} and
# ${{outputs.<key>}} placeholders must match the keys of the job's inputs and
# outputs dictionaries; Azure ML substitutes the mounted paths for them.
command_str = """python train.py \
    --config grpo_trainer_config.yaml \
    --model_name_or_path ${{inputs.model_dir}} \
    --dataset_name ${{inputs.dataset}} \
    --output_dir ${{outputs.checkpoint_folder}} \
    --final_model_save_path ${{outputs.mlflow_model_folder}} \
    --deepspeed deepspeed_stage3_zero_config.json \
    --mlflow_task_type "chat-completion" """

# Interpolate the base model's name; previously the literal text "model.name"
# was passed because the f-string had no placeholder.
command_str += f'--base_model_name "{model.name}"'

# Inputs consumed by the command through ${{inputs.<key>}}.
job_input = {
    "model_dir": Input(
        path=model.path,
        type=AssetTypes.CUSTOM_MODEL,
    ),
    # The command references ${{inputs.dataset}}, so the input must exist.
    # NOTE(review): assumes drone_mcqa_data (returned by setup()) exposes
    # `.path` and refers to a folder — confirm the asset type.
    "dataset": Input(
        path=drone_mcqa_data.path,
        type=AssetTypes.URI_FOLDER,
    ),
}

# Outputs written by the command through ${{outputs.<key>}}; kept separate from
# the inputs because command() takes them as the distinct `outputs` argument.
job_output = {
    "mlflow_model_folder": Output(
        type=AssetTypes.CUSTOM_MODEL,
        mode="rw_mount",
    ),
    "checkpoint_folder": Output(
        type=AssetTypes.URI_FOLDER,
        mode="rw_mount",
    ),
}  # notice checkpoints are saved in a separate folder.

# Distributed training job definition: 2 instances with 8 PyTorch processes
# each, running `command_str` from the ./src folder.
job = command(
    code="./src",
    inputs=job_input,
    command=command_str,
    environment=environment,
    compute=compute.name,
    instance_count=2,
    outputs=job_output,
    distribution={
        "type": "PyTorch",
        "process_count_per_instance": 8,
    },
    experiment_name="drone-images-reasoning-training-jobs",
    display_name="drone-images-reasoning-train-batchsize-16",
    properties={
        "_azureml.LogTrainingMetricsToAzMon": "true",
    },
    # Profiler / telemetry switches passed to the job as environment variables.
    environment_variables={
        "KINETO_USE_DAEMON": "1",
        "ENABLE_AZUREML_TRAINING_PROFILER": "true",
        "AZUREML_PROFILER_WAIT_DURATION_SECOND": "2",
        "AZUREML_PROFILER_RUN_DURATION_MILLISECOND": "500",
        "AZUREML_COMMON_RUNTIME_USE_APPINSIGHTS_CAPABILITY": "true",
    },
)

# the following is for training

# Submit the job definition to the workspace and keep the returned job object.
train_job = ml_client.jobs.create_or_update(job)

# Bare expression: shows the job when run as the last line of a notebook cell;
# it has no effect in a plain script.
train_job

# the following registers the model which is required to deploy it to an endpoint.

# URI of the job's `mlflow_model_folder` output.
# NOTE(review): presumably the job must have finished before this output can
# be registered — confirm and wait for completion if running as a script.
model_output_path = f"azureml://jobs/{train_job.name}/outputs/mlflow_model_folder"

run_model = Model(

    path=model_output_path,

    name="grpo-reasoning-model",

    description=f"Model created from run {train_job.name}.",

    type=AssetTypes.MLFLOW_MODEL

)

ft_model = ml_client.models.create_or_update(run_model)

# NOTE(review): `online_endpoint_name` is not defined in this snippet and
# ManagedOnlineEndpoint is imported but never used; the endpoint presumably
# has to be created before this deployment — confirm.
deployment = ManagedOnlineDeployment(

    name="grpo-rft-model-deployment",

    endpoint_name=online_endpoint_name,

    model=ft_model,

    instance_type="Standard_ND96amsr_A100_v4",

    instance_count=1

)

# Starts the deployment; the value returned by the call is discarded here.
ml_client.begin_create_or_update(deployment)


Thursday, May 22, 2025

 A previous post talked about writing SQL queries to create embedding and perform vector search over the shredded description in JSON format from drone image analysis output along with associated vectors and then using the built-in operators to query the objects associated with the vectors. This article talks about creating an agent in Azure AI search with consolidated vector search from local vectors and those in the SQL database and where the agent acts as a wrapper for an LLM deployed to Azure Open AI. The LLM is used to send queries to an agentic retrieval pipeline.

from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import (
    AzureOpenAIVectorizerParameters,
    KnowledgeAgent,
    KnowledgeAgentAzureOpenAIModel,
    KnowledgeAgentRequestLimits,
    KnowledgeAgentTargetIndex,
)

# Knowledge agent definition: the index it retrieves from, the Azure OpenAI
# deployment it uses, and a cap on the size of its output.
agent = KnowledgeAgent(
    name=agent_name,
    target_indexes=[
        KnowledgeAgentTargetIndex(
            index_name=index_name,
            default_include_reference_source_data=True,
            default_reranker_threshold=2.5,
        )
    ],
    models=[
        KnowledgeAgentAzureOpenAIModel(
            # AzureOpenAIVectorizerParameters was used without being imported.
            azure_open_ai_parameters=AzureOpenAIVectorizerParameters(
                resource_url=azure_openai_endpoint,
                deployment_name=azure_openai_gpt_deployment,
                model_name=azure_openai_gpt_model,
            )
        )
    ],
    request_limits=KnowledgeAgentRequestLimits(
        max_output_size=agent_max_output_tokens
    ),
)

# Create the agent on the search service, or update it if it already exists.
index_client = SearchIndexClient(endpoint=endpoint, credential=credential)
index_client.create_or_update_agent(agent)

And with constants such as

AZURE_OPENAI_ENDPOINT=https://<openai-resource-name>.openai.azure.com

AZURE_OPENAI_GPT_DEPLOYMENT=gpt-4o-mini

AZURE_SEARCH_ENDPOINT=https://<search-resource-name>.search.windows.net

AZURE_SEARCH_INDEX_NAME=agentic-retrieval-drone-images

And its usage as follows:

from azure.search.documents.agent import KnowledgeAgentRetrievalClient
from azure.search.documents.agent.models import (
    KnowledgeAgentIndexParams,
    KnowledgeAgentMessage,
    KnowledgeAgentMessageTextContent,
    KnowledgeAgentRetrievalRequest,
)

agent_client = KnowledgeAgentRetrievalClient(
    endpoint=AZURE_SEARCH_ENDPOINT,
    agent_name=AZURE_SEARCH_AGENT,
    credential=azure_credential,
)

# NOTE(review): `messages` is assumed to be an existing list of
# {"role": ..., "content": ...} dicts (e.g. seeded with a system prompt) — it
# is not created in this snippet; confirm.
messages.append({
    "role": "user",
    "content": """
How do the landmarks detailed in the object detection output compare in proximity to those found near high population density?
""",
})

# Send every non-system message to the agent and retrieve grounding data from
# the target index.
retrieval_result = agent_client.knowledge_retrieval.retrieve(
    retrieval_request=KnowledgeAgentRetrievalRequest(
        messages=[
            KnowledgeAgentMessage(
                role=msg["role"],
                content=[KnowledgeAgentMessageTextContent(text=msg["content"])],
            )
            for msg in messages
            if msg["role"] != "system"
        ],
        target_index_params=[
            KnowledgeAgentIndexParams(
                index_name=index_name,
                reranker_threshold=3,
                include_reference_source_data=True,
            )
        ],
    )
)

# Record the agent's answer in the conversation history.
messages.append({
    "role": "assistant",
    "content": retrieval_result.response[0].content[0].text,
})


Wednesday, May 21, 2025

 A previous article [1] talked about vectorizing and analyzing drone images to populate a vector index with an id, description and vector schema, where the description was a JSON object complete with the results of the object detection analysis. This article customizes the search to answer questions posed by drone sensing applications by leveraging a SQL Server database to store the shredded description JSON with associated vectors and then using the built-in operators to query the objects associated with the vectors.

-- Recreate the embeddings table from scratch (any existing table is dropped).
DROP TABLE IF EXISTS Production.ImageDescriptionEmbeddings;

GO

-- Each row stores an embedding vector alongside the image, description, model
-- and culture identifiers it belongs to.
CREATE TABLE Production.ImageDescriptionEmbeddings

(

  -- Surrogate key generated by the database.
  ImageDescEmbeddingID INT IDENTITY NOT NULL PRIMARY KEY CLUSTERED,

  ImageID INT NOT NULL,

  ImageDescriptionID INT NOT NULL,

  ImageModelID INT NOT NULL,

  CultureID nchar(6) NOT NULL,

  -- 1024-dimensional embedding; nullable.
  Embedding vector(1024)

);

-- Populate the table with the shredded json, written as a query but can also be used with a script and python odbc to insert as shown in references:
-- The column list is explicit so the IDENTITY column is skipped by name
-- rather than by position.
INSERT INTO Production.ImageDescriptionEmbeddings
    (ImageID, ImageDescriptionID, ImageModelID, CultureID, Embedding)
SELECT p.ImageID, img.ImageDescriptionID, img.ImageModelID, img.CultureID,
       -- AI_GENERATE_EMBEDDINGS takes "<source> USE MODEL <model>".
       AI_GENERATE_EMBEDDINGS(id.Description USE MODEL MyOllamaEmbeddingModel)
FROM Production.ImageModelImageDescriptionCulture img
JOIN Production.Image p
  ON img.ImageModelID = p.ImageModelID
JOIN Production.ImageDescription id
  ON id.ImageDescriptionID = img.ImageDescriptionID
ORDER BY p.ImageID;
GO

-- NOTE(review): the original statement had no key column list, which is not
-- valid; the four identifier columns below are assumed to form the alternate
-- key — confirm.
CREATE UNIQUE NONCLUSTERED INDEX [IX_ImageDescriptionEmbeddings_AlternateKey]
ON [Production].[ImageDescriptionEmbeddings]
(
    [ImageID] ASC,
    [ImageModelID] ASC,
    [ImageDescriptionID] ASC,
    [CultureID] ASC
);
GO

-- and this is how we vector_search
EXEC find_relevant_objects_vector_search
    @prompt = N'Show me buildings with outdoor installations and fixtures',
    @stock = 100,
    @top = 20;
GO

-- Credential used to authenticate to the embeddings endpoint with the
-- server's managed identity.
CREATE DATABASE SCOPED CREDENTIAL [https://droneimagesopenai.openai.azure.com]
WITH IDENTITY = 'Managed Identity', SECRET = '{"resourceid": "https://cognitiveservices.azure.com"}';
GO

-- Register the external embedding model (the WITH options belong to CREATE,
-- not DROP).
-- NOTE(review): the CREDENTIAL named here is not the one created above, and
-- the INSERT earlier references MyOllamaEmbeddingModel — confirm which
-- credential and model name are intended.
CREATE EXTERNAL MODEL MyOpenAICompactEmbeddingModel
WITH (
     LOCATION = 'https://droneimages.centralus.cloudapp.azure.com/v1/embeddings',
     API_FORMAT = 'OpenAI',
     MODEL_TYPE = EMBEDDINGS,
     MODEL = 'nvidia/nv-embedqa-e5-v5-query',
     CREDENTIAL = [https://droneimages.centralus.cloudapp.azure.com]
);
GO

Reference:

1. Image processing: https://1drv.ms/w/c/d609fb70e39b65c8/EWWmJNjERq1GqNkvyTYN1XkB8KxwYX4TpqI8IDbPZY59Ag?e=y4Odxs

2. previous articles: https://1drv.ms/w/c/d609fb70e39b65c8/EZQisMsUJqRHglAF6Mb2S7UB9qaUe2zd8uAs6qygmCXPYw?e=AYWB9r

3. Script to populate the image description embeddings table:

import requests

# Azure AI Search configuration
SEARCH_ENDPOINT = "https://<your-search-service>.search.windows.net"
SEARCH_API_KEY = "<your-api-key>"
INDEX_NAME = "index00"


def fetch_descriptions(
    search_endpoint=SEARCH_ENDPOINT,
    api_key=SEARCH_API_KEY,
    index_name=INDEX_NAME,
    timeout=30,
):
    """Return the 'description' field of the documents matched by ``search=*``.

    The original snippet used ``return`` at module level, which is a syntax
    error; the logic now lives in a function.

    Args:
        search_endpoint: Base URL of the search service.
        api_key: Key sent in the ``api-key`` header.
        index_name: Name of the index to query.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        list: The ``description`` value of every document in the response's
        ``value`` array, or an empty list when the status code is not 200.
    """
    # API headers
    headers = {
        "Content-Type": "application/json",
        "api-key": api_key,
    }

    # Query to fetch all entries and show only 'description'
    # NOTE(review): a single request presumably returns only the first page of
    # matches — confirm whether paging is needed for the full index.
    query_url = f"{search_endpoint}/indexes/{index_name}/docs?api-version=2023-07-01&search=*"
    response = requests.get(query_url, headers=headers, timeout=timeout)

    # Process response
    if response.status_code != 200:
        print(f"Error: {response.status_code}, {response.text}")
        return []

    results = response.json()
    return [doc["description"] for doc in results.get("value", [])]

4. Script to populate the table:

import pyodbc

server = 'tcp:<your_server.database.windows.net>'
database = 'DroneImagesDB'
username = '<your_username>'
password = '<your_password>'
port = '<your_port_number>'

cnxn = pyodbc.connect('DRIVER={ODBC Driver 18 for SQL Server};SERVER='+server+','+port+';DATABASE='+database+';ENCRYPT=yes;TrustServerCertificate=yes;UID='+username+';PWD='+ password)
cursor = cnxn.cursor()

print ('Inserting a new row into table')

#Insert Query
tsql = "INSERT INTO Production.ImageDescriptionEmbeddings (ImageID, ImageDescriptionID, ImageModelID, CultureID, Embedding) VALUES (?,?,?,?,?);"

# The statement text must be the first argument to execute(); the original
# call passed only the parameter values.
# NOTE(review): `id`, `description`, `vector` and generate_sha256_digest are
# not defined in this snippet, and the values do not obviously line up with
# the column types (ImageDescriptionID and ImageModelID are INT, CultureID is
# nchar(6) but receives 'United States') — confirm the intended mapping.
with cursor.execute(tsql, id, description, generate_sha256_digest(description), 'United States', vector):
    print ('Successfully Inserted!')


Tuesday, May 20, 2025

The following code illustrates object detection and image search:
#! /usr/bin/python

#from azure.ai.vision import VisionClient
from azure.core.credentials import AzureKeyCredential
from azure.core.rest import HttpRequest, HttpResponse
from azure.core.exceptions import HttpResponseError
from azure.identity import DefaultAzureCredential
from azure.search.documents import SearchClient
from azure.ai.vision.imageanalysis import ImageAnalysisClient
from azure.ai.vision.imageanalysis.models import VisualFeatures, ImageAnalysisResult
from tenacity import retry, stop_after_attempt, wait_fixed
from pprint import pprint, pformat
from dotenv import load_dotenv  
import json  
import requests
import http.client, urllib.parse
import os

# Load settings from a .env file into the process environment before any
# os.getenv() call below reads them.
load_dotenv()  
search_endpoint = os.getenv("AZURE_SEARCH_SERVICE_ENDPOINT")  
index_name = os.getenv("AZURE_SEARCH_INDEX_NAME")
search_api_version = os.getenv("AZURE_SEARCH_API_VERSION")
search_api_key = os.getenv("AZURE_SEARCH_ADMIN_KEY")  
vision_api_key = os.getenv("AZURE_AI_VISION_API_KEY")
vision_api_version = os.getenv("AZURE_AI_VISION_API_VERSION")
vision_region = os.getenv("AZURE_AI_VISION_REGION")
vision_endpoint = os.getenv("AZURE_AI_VISION_ENDPOINT")
# NOTE(review): `credential` is not used by the two clients created below,
# which both authenticate with key credentials.
credential = DefaultAzureCredential()
#search_credential = AzureKeyCredential(search_api_key)
vision_credential = AzureKeyCredential(vision_api_key)

# Initialize Azure clients
#vision_client = VisionClient(endpoint=vision_endpoint, credential=AzureKeyCredential(vision_api_key))
search_client = SearchClient(endpoint=search_endpoint, index_name=index_name, credential=AzureKeyCredential(search_api_key))
analysis_client = ImageAnalysisClient(vision_endpoint, vision_credential)

# Define SAS URL template
# The template is later filled with .format(file=<name>), so it must contain a
# `{file}` placeholder.
sas_template = os.getenv("AZURE_SA_CONTAINER_SASURL")

# Process images in batches of 10
batch_size = 10
# Number of the first image to process.
initial_start = 2040
total_images = 17853 # Adjust this as needed


@retry(stop=stop_after_attempt(5), wait=wait_fixed(60))
def vectorize_image(image_path, key, region, timeout=60):
    """Return the embedding vector for the image at ``image_path``.

    Posts the image URL to the ``retrieval:vectorizeImage`` operation under
    the module-level ``vision_endpoint``. Any exception raised here is retried
    by the decorator: up to 5 attempts, 60 seconds apart.

    Args:
        image_path: URL of the image to vectorize (sent as the ``url`` field).
        key: Subscription key sent in ``Ocp-Apim-Subscription-Key``.
        region: Unused; kept so existing callers keep working.
        timeout: Seconds to wait for the HTTP response. Without a timeout the
            request can block forever and the ``Timeout`` handler below could
            never run.

    Returns:
        The ``vector`` field of the JSON response (``None`` if it is absent).

    Raises:
        RuntimeError: The service answered with a status other than 200.
        requests.exceptions.Timeout, http.client.HTTPException: Re-raised
            after logging so the retry decorator can try again.
    """
    # API version and model version
    api_version = "2024-02-01"
    model_version = "2023-04-15"

    # Construct the request URL
    url = f"{vision_endpoint}/computervision/retrieval:vectorizeImage?api-version={api_version}&model-version={model_version}"

    headers = {
        "Content-Type": "application/json",
        "Ocp-Apim-Subscription-Key": key
    }

    # Payload carries the SAS URL of the image
    payload = {
        "url": image_path
    }

    try:
        response = requests.post(url, headers=headers, json=payload, timeout=timeout)
    except (requests.exceptions.Timeout, http.client.HTTPException):
        # image_path[74:80] is presumably the six-digit file name inside the
        # SAS URL — it depends on the URL layout; confirm.
        print(f"Timeout/Error for {image_path[74:80]}. Retrying...")
        raise

    if response.status_code != 200:
        print("Error:", response.status_code, response.text)
        raise RuntimeError(f"Error vectorizing image {image_path[74:80]}")

    # The vector is in the 'vector' field of the response
    return response.json().get("vector")

@retry(stop=stop_after_attempt(5), wait=wait_fixed(60))
def analyze_image(client, image_url):
    """Analyze an image by URL and return a formatted dump of the result.

    Requests caption, tags, objects, read (text), smart crops, dense captions
    and people features, joins the caption texts into ``result.description``,
    and pretty-prints the result's ``__dict__``.

    Args:
        client: Object exposing ``analyze_from_url`` (an ImageAnalysisClient
            in this script).
        image_url: URL of the image to analyze.

    Returns:
        str: ``pformat`` of the result's attributes, or ``"No description"``
        when the client returns ``None``.

    Raises:
        HttpResponseError: Re-raised after printing so the retry decorator
            can try again.
    """
    # Define all available visual features for analysis
    features = [
        VisualFeatures.CAPTION,
        VisualFeatures.TAGS,
        VisualFeatures.OBJECTS,
        VisualFeatures.READ,
        VisualFeatures.SMART_CROPS,
        VisualFeatures.DENSE_CAPTIONS,
        VisualFeatures.PEOPLE
    ]

    try:
        # Analyze the image from the SAS URL
        result = client.analyze_from_url(
            image_url=image_url,
            visual_features=features,
            gender_neutral_caption=True)
    except HttpResponseError as e:
        print(str(e))
        raise

    if result is None:
        return "No description"

    captions = [f"{result.caption.text}" if result.caption is not None else "No Caption"]
    # Check for None BEFORE touching `.list`: the original comprehension
    # dereferenced dense_captions first and only then applied its filter.
    if result.dense_captions is not None:
        captions.extend(f"{caption.text}" for caption in result.dense_captions.list)

    # Enhance result
    result.description = ",".join(captions)
    return pformat(result.__dict__, depth=4, compact=False)


# Running count of documents the search service reported as indexed.
total_uploaded = 0

for batch_start in range(initial_start, total_images + 1, batch_size):
    documents = []

    # Vectorize up to `batch_size` images at a time. batch_end is exclusive,
    # so the last image of the batch is batch_end - 1.
    batch_end = min(batch_start + batch_size, total_images + 1)
    last_image = batch_end - 1
    for i in range(batch_start, batch_end):
        file_name = f"{i:06}"
        blob_url = sas_template.format(file=file_name)

        try:
            vector = vectorize_image(blob_url, vision_api_key, "eastus")
            if vector is not None:
                description = analyze_image(analysis_client, blob_url)
                documents.append(
                    {"id": file_name, "description": description, "image_vector": vector}
                )
        except Exception as e:
            # Best effort: a failed image is logged and skipped so the rest of
            # the batch still gets processed.
            print(f"Error processing {file_name}.jpg: {e}")

    # Upload batch to Azure AI Search
    if documents:
        try:
            results = search_client.upload_documents(documents=documents)
            # Each result reports per-document success; count only those.
            succeeded = sum(1 for item in results if item.succeeded)
            total_uploaded += succeeded
            print(f"Uploaded {succeeded} of {len(documents)} images {batch_start} to {last_image} to {index_name}.")
        except Exception as e:
            print(f"Error {e} uploading {len(documents)} images {batch_start} to {last_image} to {index_name}.")

# Report what was actually indexed instead of claiming success unconditionally.
print(f"{total_uploaded} vectorized images added to {index_name}.")

Monday, May 19, 2025

 This is a summary of the book titled “How I Built This” written by Guy Raz and published by Pan Books in 2020. It dives into the stories of successful entrepreneurs to show readers how to navigate and overcome the challenges of starting their own business. It draws from the author's experiences interviewing founders as a podcast host. His key takeaways: Start with an idea that resonates with your personal passion and solves real problems. What’s dangerous is not necessarily scary and vice versa. Solid research must be paired with creativity and vision. Getting funding from professional investors must be the last step in the long-drawn process of fundraising. Consider competition and location to successfully launch a new business. Get others to talk about your product. Distance yourself from difficult situations to get a better perspective. If your business cannot survive without you long-term, you have failed. Partnership is much like marriage. You must work on it. Selling your business is not about losing control but gaining contentment.

A great entrepreneurial idea should be built on personal passion and solve real problems. Entrepreneurs need an open mind to see opportunities and must weigh the risks and viability of their business idea. Starting a business can be dangerous, but it can be less so if you have a backup plan. Many successful entrepreneurs didn't immediately quit their day jobs to start their ventures, as having a skill to fall back on can make the first step easier.

Pairing solid research with creativity and vision allows entrepreneurs to find innovative solutions. However, they must research their markets and be wise about what data to consider. For example, Jen Rubio and Steph Korey, who had no prior affiliation with luggage manufacturing, found that understanding people's packing habits and asking open-ended questions about travel scenarios inspired their luggage brand, Away.

Partnership is crucial for successful business ventures, as it can balance weaknesses, add a different perspective, and offer support when things get difficult. For example, management consultant Jim Koch started Samuel Adams Boston Lager with his assistant Rhonda Kallman, who had mastery of practical skills he lacked.

Getting funding from professional investors is often the last step in the fundraising process for start-ups. Entrepreneurs often seek resources from family, friends, and acquaintances to scale up their business. A good story that explains why people should care about your product, sets you apart from competitors, and explains your solution to a problem people may not realize they have is essential.

Competition and location are crucial for successful business launch. Bigger players in the market can put up obstacles to new competitors, so it's easier to enter through a "side door" and establish your product before they notice you. Location can also have a significant effect on how your business develops.

Creating buzz around your product, even pre-launch, can be more effective than talking about it yourself. Instagram founders Kevin Systrom and Mike Krieger invited journalists and designers to use their app before it launched, knowing that they cared about good images.

Securing funding from professional investors, attracting support from family and friends, and considering competition and location are essential for a successful business.

Business founders often face obstacles that seem insurmountable, such as legal battles or product disputes. Jeni Britton Bauer, founder of Jeni's Splendid Ice Creams, faced a crisis when listeria was found in her ice cream, but chose to face it with honesty and transparency. She and her employees fixed the problem, explaining the process and hiring a quality control leader. Thanks to this approach, customers stuck with Jeni's Splendid Ice Creams and even left encouraging notes on the closed shops.

Leadership professor Ronald Heifetz suggests that entrepreneurs should step away from the day-to-day of running their business and onto the balcony to discern patterns within the difficult situation. If a business cannot survive without the founder, it has failed. Entrepreneurs should learn to entrust their business to the people they work with, sharing their purpose and values with their staff. A mission-focused business culture will attract employees who understand the company's goals and set the tone for the future.

Collaboration with a partner is essential for creating a successful business, but it can also be a stumbling block. Unresolved issues and conflicts can lead to internal dysfunction and suboptimal business performance. Managing professional differences and clashing roles can be challenging, especially in friendships. Considering your business as a child and yourself as a parent can help manage interpersonal work. Selling your business can be a difficult decision, but it can also lead to contentment and a better understanding of the company and its employees.


Sunday, May 18, 2025

 Steps to setup airflow with SSO on Azure Kubernetes Service with Active Directory identity and Kubernetes RBAC.

1. Login to the AKS:

az login

az account set --subscription <subscription-id>

az aks get-credentials --resource-group <resource-group> --name <cluster-name>

kubelogin convert-kubeconfig -l azurecli

2. Install nginx-ingress-controller

nginx-ingress-controller:

helm repo add bitnami https://charts.bitnami.com/bitnami

helm repo update

helm install my-ingress oci://registry-1.docker.io/bitnamicharts/nginx-ingress-controller --namespace ingress-nginx --create-namespace

3. Verify installation: check that public ip is provided to the type: LoadBalancer as highlighted below:

kubectl get services --all-namespaces

NAMESPACE NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE

default kubernetes ClusterIP 10.0.0.1 <none> 443/TCP 3d19h

ingress-nginx my-ingress-nginx-ingress-controller LoadBalancer 10.0.45.167 52.230.235.159 80:32303/TCP,443:30457/TCP 2d19h

ingress-nginx my-ingress-nginx-ingress-controller-default-backend ClusterIP 10.0.17.7 <none> 80/TCP 2d19h

kube-system ama-metrics-ksm ClusterIP 10.0.212.159 <none> 8080/TCP 3d19h

kube-system ama-metrics-operator-targets ClusterIP 10.0.216.158 <none> 80/TCP 3d19h

kube-system azure-wi-webhook-webhook-service ClusterIP 10.0.23.89 <none> 443/TCP 3d19h

kube-system kube-dns ClusterIP 10.0.0.10 <none> 53/UDP,53/TCP 3d19h

kube-system metrics-server ClusterIP 10.0.167.74 <none> 443/TCP 3d19h

kube-system network-observability ClusterIP 10.0.63.23 <none> 10093/TCP 3d19h

4. Install airflow:

helm repo add apache-airflow https://airflow.apache.org

helm upgrade --install airflow apache-airflow/airflow --namespace airflow --create-namespace

5. Verify airflow: check airflow-webserver service has an ip address

kubectl get pods -n airflow

kubectl get services --all-namespaces

NAMESPACE NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE

airflow airflow-postgresql ClusterIP 10.0.190.39 <none> 5432/TCP 2d19h

airflow airflow-postgresql-hl ClusterIP None <none> 5432/TCP 2d19h

airflow airflow-redis ClusterIP 10.0.198.77 <none> 6379/TCP 2d19h

airflow airflow-statsd ClusterIP 10.0.235.127 <none> 9125/UDP,9102/TCP 2d19h

airflow airflow-triggerer ClusterIP None <none> 8794/TCP 2d19h

airflow airflow-webserver ClusterIP 10.0.174.37 <none> 8080/TCP 2d19h

airflow airflow-worker ClusterIP None <none> 8793/TCP 2d19h

6. Install nginx:

kubectl create -f ingress.yaml

Ingress.yaml:

7. Verify nginx:

8. Setup SSO:

9. Create a Service account:

kubectl create serviceaccount airflow -n airflow

10. Annotate it:

kubectl annotate serviceaccount airflow \

  azure.workload.identity/client-id=<client-id> \

  azure.workload.identity/tenant-id=<tenant-id> \

  -n airflow

11. Modify the enterprise application behind the client_id with the following:

a. Set up redirect_uri with the following:

i. https://<public-ip>/oauth_provider/azure

ii. http://localhost:8080/oauth_provider/azure

iii. https://52.230.235.159/auth-response

iv. https://localhost:8080/auth-response

v. https://<public-ip>/oauth-authorized/azure

vi. http://localhost:8080/oauth-authorized/azure

vii. https://<public-ip>/login/callback

viii. http://localhost:8080/login/callback

ix. And auth for api with https://<public-ip>/api/auth/callback

x. https://<public-ip>/api/auth/callback

b. Setup token configuration for

i. id to include email,family_name, given_name, preferred_username, upn

ii. Group claim to set SAMAccountName for id and access tokens

c. Api permissions for Microsoft.Graph email and user.read

d. Setup app roles for airflow_nonprod_admin, airflow_nonprod_dev/op, airflow_nonprod_viewer

12. Apply the environment variables to the deployment with

kubectl set env deployment/airflow-webserver AAD_TENANT_ID=<your-tenant-id> -n airflow

kubectl set env deployment/airflow-webserver AAD_CLIENT_ID=<your-client-id> -n airflow

kubectl set env deployment/airflow-webserver AAD_CLIENT_SECRET=<your-client-secret> -n airflow

kubectl set env deployment/airflow-webserver OAUTH_PROVIDERS="[{\n'name':'azure',\n'token_key':'access_token',\n'icon':'fa-windows',\n'remote_app': {\n'api_base_url': 'https://login.microsoftonline.com/{}'.format(os.getenv('AAD_TENANT_ID')),\n'request_token_url': None,\n'request_token_params': {\n'scope': 'openid email profile'\n},\n'access_token_url': 'https://login.microsoftonline.com/{}/oauth2/v2.0/token'.format(os.getenv('AAD_TENANT_ID')),\n'access_token_params': {\n'scope': 'openid email profile'\n},\n'authorize_url': 'https://login.microsoftonline.com/{}/oauth2/v2.0/authorize'.format(os.getenv('AAD_TENANT_ID')),\n'authorize_params': {\n'scope': 'openid email profile'\n},\n'client_id': os.getenv('AAD_CLIENT_ID'),\n'client_secret': os.getenv('AAD_CLIENT_SECRET'),\n'jwks_uri': 'https://login.microsoftonline.com/common/discovery/v2.0/keys',\n'redirect_uri': 'https://52.230.235.159/oauth-authorized/azure'\n}\n}]" -n airflow

13. Modify the webserver_config.py by uncommenting the sections for AUTH_TYPE and OAUTH_PROVIDER or uploading the attached webserver_config.py

14. Restart and test the airflow user interface

kubectl rollout restart deployment airflow-webserver -n airflow


Saturday, May 17, 2025

 Problem: determine if a graph has cycles:

import java.util.*;

import java.lang.*;

import java.io.*;

/**
 * Determines whether a directed graph contains a cycle that is reachable from
 * a chosen source vertex, using the Bellman-Ford negative-cycle test.
 *
 * Every edge is given the weight -1. Any directed cycle then has a negative
 * total weight, so an edge that can still be relaxed after |V| - 1 passes
 * proves a reachable cycle exists. (With the +1 weights used previously the
 * final check could never succeed and the answer was always false.)
 */
class Ideone
{
    /** Weight applied to every edge; negative so that every cycle is a negative cycle. */
    private static final int EDGE_WEIGHT = -1;

    public static void main (String[] args) throws java.lang.Exception
    {
        // Adjacency matrix for vertices A..E; Java zero-initializes int arrays.
        // It is only used to decide how many vertices to create.
        int[][] m = new int[5][5];
        m[0][1] = 1;
        m[0][2] = 1;
        m[1][0] = 1;
        m[1][3] = 1;
        m[2][0] = 1;
        m[2][3] = 1;
        m[3][1] = 1;
        m[3][2] = 1;
        m[3][4] = 1;
        m[4][3] = 1;

        var vertices = InitializeSingleSource(m, 0);

        // A vertex can have several outgoing edges, so each source maps to a
        // list of targets; a Map<String, String> silently kept only the last
        // edge put for each source.
        var edges = new LinkedHashMap<String, List<String>>();
        addEdge(edges, "A", "B");
        addEdge(edges, "A", "C");
        addEdge(edges, "B", "A");
        addEdge(edges, "B", "D");
        addEdge(edges, "C", "A");
        addEdge(edges, "C", "D");
        addEdge(edges, "D", "B");
        addEdge(edges, "D", "C");
        addEdge(edges, "D", "E");
        addEdge(edges, "E", "D");

        System.out.println(hasCyclesByBellmanFord(vertices, edges));
    }

    /** Appends the directed edge from -> to to the adjacency map. */
    private static void addEdge(Map<String, List<String>> edgeMap, String from, String to) {
        edgeMap.computeIfAbsent(from, key -> new ArrayList<>()).add(to);
    }

    /**
     * Creates one vertex per matrix row, named "A", "B", ... in row order.
     * The start vertex gets distance 0; all others start at
     * Integer.MAX_VALUE, which marks them as not yet reached.
     */
    private static List<Vertex> InitializeSingleSource(int[][] m, int start) {
        var vertices = new ArrayList<Vertex>();
        for (int i = 0; i < m.length; i++) {
            var v = new Vertex();
            v.id = String.valueOf((char) ('A' + i));
            v.d = (i == start) ? 0 : Integer.MAX_VALUE;
            v.parent = null;
            vertices.add(v);
        }
        return vertices;
    }

    /** Returns the vertex with the given id, or null when there is none. */
    private static Vertex getVertex(List<Vertex> vertices, String id) {
        for (Vertex candidate : vertices) {
            if (candidate.id.equals(id)) {
                return candidate;
            }
        }
        return null;
    }

    // A ->C <-D ->E
    // ->B->

    /**
     * Runs |V| - 1 relaxation passes over every edge, then reports whether
     * any edge can still be relaxed.
     *
     * @param vertices vertices as initialized by InitializeSingleSource
     * @param edgeMap  adjacency map: source id to list of target ids
     * @return true when a cycle is reachable from the source vertex; cycles
     *         the source cannot reach are not detected
     */
    private static boolean hasCyclesByBellmanFord(List<Vertex> vertices, Map<String, List<String>> edgeMap) {
        for (int pass = 1; pass < vertices.size(); pass++) {
            for (var entry : edgeMap.entrySet()) {
                var u = getVertex(vertices, entry.getKey());
                for (String targetId : entry.getValue()) {
                    relax(u, getVertex(vertices, targetId));
                }
            }
        }

        for (var entry : edgeMap.entrySet()) {
            var u = getVertex(vertices, entry.getKey());
            // An unreached source vertex cannot witness a reachable cycle.
            if (u == null || u.d == Integer.MAX_VALUE) {
                continue;
            }
            for (String targetId : entry.getValue()) {
                var v = getVertex(vertices, targetId);
                if (v != null && v.d > u.d + EDGE_WEIGHT) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Lowers v's distance when going through u is shorter. Edges leaving an
     * unreached vertex are skipped so that vertex is not treated as reachable.
     */
    private static void relax(Vertex u, Vertex v) {
        if (u == null || v == null) { return; }
        if (u.d == Integer.MAX_VALUE) { return; }
        if (v.d > u.d + EDGE_WEIGHT) {
            v.d = u.d + EDGE_WEIGHT;
            v.parent = u;
        }
    }

    /** Graph vertex with its tentative distance and predecessor. Static so it can be created from static methods. */
    static class Vertex {
        public String id;
        public Vertex parent;
        public int d;
    }
}