Thursday, July 31, 2025

 The previous posts explained how to leverage the scale resolution of known vehicles to compute distances between landmarks given by their bounding boxes in aerial drone images, but that is not the only way to calculate distance. Because aerial drone images are effectively zoomed-in views of the same cities and urban environments covered by well-known mapping services, specifically in North America, they can be analyzed automatically for cultural, seasonal, economic and regional cues that associate the scene with a latitude and longitude. Even if the match is not exact, an approximation of the city over which the aerial image was shot can help narrow down the suburb in which the drone was moving. For example, the following code lets a machine determine which city the drone was flying over when a frame captured from its video is analyzed.

from geospyer import GeoSpy

import os

gemini_api_key = os.getenv("GEMINI_API_KEY").strip('"')

def get_nearest_latitude_longitude(image_path="frame23.jpg"):

    # Initialize GeoSpy with your Gemini API key

    geospy = GeoSpy(api_key=gemini_api_key)

    # Analyze the image

    result = geospy.locate(image_path=image_path)

    # Check for errors

    if "error" in result:

        print(f"Error: {result['error']}")

    else:

        # Extract location info

        if "locations" in result and result["locations"]:

            location = result["locations"][0]

            lat = location["coordinates"]["latitude"]

            lon = location["coordinates"]["longitude"]

            print(f"Estimated Coordinates: Latitude = {lat}, Longitude = {lon}")

            # Optional: Open in Google Maps

            # import webbrowser

            maps_url = f"https://www.google.com/maps?q={lat},{lon}"

            print(maps_url)

            #webbrowser.open(maps_url)

            return lat, lon

        else:

            print("No location data found.")

            return None, None

print(get_nearest_latitude_longitude())

 # output:

 # Estimated Coordinates: Latitude = 42.3736, Longitude = -71.1097

 # https://www.google.com/maps?q=42.3736,-71.1097

 # (42.3736, -71.1097)

And as with earlier capabilities, such modular functions can be easily included in the list of function tools to augment agentic retrieval on the analysis side of the aerial drone image processing pipeline.
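
For instance, here is a minimal sketch of registering this helper as a function tool, following the FunctionTool pattern used in the earlier posts below; the set name mirrors those posts and the exact wiring may differ in the actual pipeline:

from typing import Any, Callable, Set
from azure.ai.agents.models import FunctionTool

# Hypothetical registration: add the geolocation helper alongside the other image tools.
image_user_functions: Set[Callable[..., Any]] = {
    get_nearest_latitude_longitude,
}
functions = FunctionTool(functions=image_user_functions)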


Wednesday, July 30, 2025

 The following code sample explains how to leverage scale resolution to compute distances between objects given by their bounding boxes:

import numpy as np

def compute_pixel_distance(box1, box2):

    """Compute Euclidean distance between the centers of two bounding boxes"""

    x1_center = (box1[0] + box1[2]) / 2

    y1_center = (box1[1] + box1[3]) / 2

    x2_center = (box2[0] + box2[2]) / 2

    y2_center = (box2[1] + box2[3]) / 2

    return np.sqrt((x2_center - x1_center)**2 + (y2_center - y1_center)**2)

def estimate_scale(reference_box, vehicle_type):

    """Estimate feet per pixel using a reference vehicle box"""

    vehicle_lengths = {

        'motorcycle': 5,

        'car': 20,

        'truck': 40

    }

    pixel_length = reference_box[2] - reference_box[0] # width in pixels

    return vehicle_lengths[vehicle_type.lower()] / pixel_length

def estimate_actual_distance(landmark_box1, landmark_box2, reference_box, vehicle_type):

    pixel_dist = compute_pixel_distance(landmark_box1, landmark_box2)

    scale = estimate_scale(reference_box, vehicle_type)

    return pixel_dist * scale

# Example inputs:

landmark_box1 = (100, 200, 180, 280)

landmark_box2 = (400, 450, 480, 530)

reference_vehicle_box = (300, 300, 340, 340) # e.g., a car seen from side view

vehicle_type = 'car'

actual_distance_feet = estimate_actual_distance(landmark_box1, landmark_box2, reference_vehicle_box, vehicle_type)

print(f"Estimated actual distance between landmarks: {actual_distance_feet:.2f} feet")

This is a continuation of the previous article on agentic retrieval on the analysis side of the aerial drone image processing pipeline, using modular functions for specific insights into a scene.


Tuesday, July 29, 2025

 Location of objects detected in Aerial Drone Images.

Determining the distance and location of objects found via video/image insights requires neither continuous processing of every image in the feed nor the calculation of camera angles, orientation and direction of the drones. Most urban and populated areas have a common set of well-known vehicle categories, such as sedans and trucks, and their scale in an image can be used to calculate relative distances between points of interest such as landmarks. Therefore, this processing is not needed in the video processing pipeline and can be deferred until analysis and agentic retrieval of a query response. The drone's reference with regard to the image can be assumed to be constant for all images because the camera usually does not move, and even if it did, only the perpendicular to the earth passing through the drone is needed as a reference point in the image. As the objects and video/image insights are populated in the DroneWorld catalog, the distance information between selected pairs of objects, specifically important structures or landmarks, can be determined. Wrapping the distance calculation in a function that takes an image containing both objects makes it reusable for any such pair of objects.

import cv2
import numpy as np
import requests
import torch
from typing import Any, Callable, Optional, Set
from urllib.parse import urlparse
from azure.storage.blob import BlobClient

model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)

def read_image_from_blob(sas_url):

    """Reads an image from Azure Blob Storage using its SAS URL."""

    response = requests.get(sas_url)

    if response.status_code == 200:

        image_array = np.asarray(bytearray(response.content), dtype=np.uint8)

        image = cv2.imdecode(image_array, cv2.IMREAD_COLOR)

        return image

    else:

        # raise Exception(f"Failed to fetch image. Status code: {response.status_code}")

        return None

def detect_vehicles(frame):

    results = model(frame)

    # Keep only 'car', 'truck', 'bus', 'motorcycle' detections

    vehicle_labels = ['car', 'truck', 'bus', 'motorcycle']

    detections = results.pandas().xyxy[0]

    vehicles = detections[detections['name'].isin(vehicle_labels)]

    return vehicles

def get_image_output_url(scene_uri):

    # Parse the original video URL to get account, container, and path

    parsed = urlparse(scene_uri)

    path_parts = parsed.path.split('/')

    container = path_parts[1]

    blob_name = path_parts[-1].split('.')[0]

    blob_path = '/'.join(path_parts[2:])

    # Remove the file name from the blob path

    blob_dir = '/'.join(blob_path.split('/')[:-1])

    if blob_dir == "" or blob_dir == None:

        blob_dir = "output"

    # Create image path

    image_path = f"{blob_dir}/analyzed/{blob_name}withvehicles.jpg"

    # Rebuild the base URL (without SAS token)

    base_url = f"{parsed.scheme}://{parsed.netloc}/{container}/{image_path}"

    # Add the SAS token if present

    sas_token = parsed.query

    if sas_token:

        image_url = f"{base_url}?{sas_token}"

    else:

        image_url = base_url

    return image_url

def detect_vehicles_and_get_url(scene_uri: Optional[str] = None) -> str:

    if not scene_uri:

        return None

    frame = read_image_from_blob(scene_uri)

    if frame is None:

        return None

    vehicles = detect_vehicles(frame)

    print(vehicles)

    for _, v in vehicles.iterrows():

        x1, y1, x2, y2 = map(int, [v['xmin'], v['ymin'], v['xmax'], v['ymax']])

        cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 0), 2)

    _, buffer = cv2.imencode('.jpg', frame)

    image_bytes = buffer.tobytes()

    image_uri = get_image_output_url(scene_uri)

    image_blob_client = BlobClient.from_blob_url(image_uri)

    image_blob_client.upload_blob(image_bytes, overwrite=True)

    return image_uri

import numpy as np

# Average lengths in feet for each vehicle class

VEHICLE_LENGTHS = {

    'motorcycle': 5,

    'car': 20,

    'truck': 30,

    'bus': 60,

}

def calculate_pixel_per_foot(vehicles):

    """

    Estimate average pixel-per-foot using the detected vehicle bounding boxes and known mean real-world lengths.

    Returns average pixel-per-foot scale.

    """

    pixels_per_foot = []

    for _, v in vehicles.iterrows():

        label = v['name']

        box_length_pixels = abs(v['xmax'] - v['xmin'])

        real_length_feet = VEHICLE_LENGTHS.get(label)

        if box_length_pixels > 0 and real_length_feet:

            # Estimate scale for this vehicle

            pixels_per_foot.append(box_length_pixels / real_length_feet)

    if pixels_per_foot:

        return np.mean(pixels_per_foot)

    else:

        raise ValueError("No vehicles with known real size detected.")

def get_vehicle_center(vehicle_row):

    """Returns (x_center, y_center) of a bounding box."""

    x_center = (vehicle_row['xmin'] + vehicle_row['xmax']) / 2

    y_center = (vehicle_row['ymin'] + vehicle_row['ymax']) / 2

    return np.array([x_center, y_center])

def calculate_vehicle_distance(vehicles, idx1, idx2):

    """

    Calculate real-world distance between two vehicles given their indices in the vehicles DataFrame.

    """

    # Estimate scale: pixels per 1 foot

    ppf = calculate_pixel_per_foot(vehicles)

    # Get positions

    center1 = get_vehicle_center(vehicles.iloc[idx1])

    center2 = get_vehicle_center(vehicles.iloc[idx2])

    # Pixel distance

    pixel_dist = np.linalg.norm(center1 - center2)

    # Convert to feet

    dist_feet = pixel_dist / ppf

    return dist_feet

def calculate_span(frame, first, last):

    vehicles = detect_vehicles(frame)

    distance_feet = calculate_vehicle_distance(vehicles, first, last)

    return distance_feet

image_user_functions: Set[Callable[..., Any]] = {

    calculate_span

}


Monday, July 28, 2025

 This is a summary of the book “The Art of the Interesting: What We Miss in Our Pursuit of the Good Life and How to Cultivate It” written by Lorraine Besser and published by Balance in 2024. The author is a philosophy professor who suggests that people value meaningfulness and happiness in their lives but still seem to miss something. That missing piece is the “interesting,” which she corroborates with research, theories and stories. She contends that by being curious and embracing new ideas, we can turn even the most mundane moments into vibrant experiences and lead a richer life. Engaging in challenges and curiosities creates psychological richness. You might stop planning and start exploring even in your daily life. Pushing yourself beyond your comfort zone can be rewarding if you reframe challenges as adventures. Developing strong connections with others can enhance your life. 

Her book explores this premise by weaving together philosophical insights, psychological research, TV scenes, personal anecdotes, and historical stories. Besser argues that the pursuit of the "interesting" holds a unique promise—it doesn't rely on goals or rewards, but on the way our mind responds to challenge, complexity, and curiosity. This psychological richness is the third dimension of what she believes constitutes the “good life,” sitting alongside happiness and meaning like the overlooked triplet finally getting a voice. 

Early on, Besser presents a scene from Somebody Somewhere, where characters Sam and Joel embody this philosophical struggle. Sam indulges in pleasure through wine-fueled weekends, Joel commits to meaningful activities like volunteering, yet both reveal quiet discontent. Their stories become metaphors for how people often assume that either joy or purpose will suffice, only to realize that something essential is missing. That missing piece is not another achievement—it’s the unexpected conversation, the quirky moment, or the challenging book that breaks the monotony and adds texture to life. 

To showcase this idea in action, Besser turns to the story of Neal Cassady, the charismatic wanderer of Denver’s gritty 1930s. Raised amid poverty and chaos, Cassady didn’t search for stability—he embraced the unpredictability and intensity of his environment. His larger-than-life spirit attracted Jack Kerouac, who immortalized him in On the Road and sparked the Beat Generation. Cassady’s life wasn’t conventionally happy or meaningful, but it was undeniably interesting. Besser uses his story to illustrate how the cultivation of the interesting can ripple outward, transforming not just personal experience but even cultural movements. 

Throughout the book, Besser urges readers to abandon rigid plans and replace “pursuit mode” with exploration. She draws from mythology, particularly the tale of Sisyphus, to challenge our instinct to avoid discomfort. Sisyphus, condemned to push a boulder uphill for eternity, typically symbolizes futility. But Besser reframes this: Sisyphus might find adventure, depth, and internal growth in his endless task if he chooses to engage with it emotionally and mentally. That’s the essence of living interestingly—not by escaping difficulty, but by meeting it with open curiosity. 

The key, Besser says, is to tweak how we experience daily life. Novelty doesn’t require dramatic changes—it might emerge from choosing chopsticks over a spoon to eat dessert, or noticing a shift in light patterns during your commute. These small moments of surprise activate our mind and emotions, creating psychological stimulation that adds flavor to otherwise routine days. And it’s not the scale of novelty that matters—it’s our openness to see it. 

She also emphasizes relationships as gateways to interesting experiences. Besser reflects on her own life, particularly her second marriage, where her husband’s spontaneous way of living taught her how to loosen her grip on expectations. By welcoming uncertainty and living more vibrantly, she discovered unexpected richness. Friends and family, she notes, don’t just add emotional support—they bring perspectives, laughter, and new lenses that reshape how we experience reality. In her words, “Relationships bring together people with different experiences, different responsiveness, and different interests.” And that convergence often makes life more dynamic. 

The book also explores how to develop this mindset intentionally. Besser recommends being mindful, asking “why” more often, and letting go of the need to know everything. She advises cultivating curiosity in mundane routines—like turning a walk into a chance to spot something new, or letting a conversation meander instead of steering it to conclusions. This kind of openness allows creativity and introspection to flourish, encouraging a deeper engagement with even the smallest details of life. 

Importantly, Besser distinguishes the “zone of the interesting” from both comfort and danger zones. It’s a sweet spot: where novelty and challenge exist without tipping into anxiety. Exploring this zone means gently pushing your boundaries—whether it’s trying a new hobby, facing a complex project, or entering an unfamiliar social setting. By staying tuned in to emotional responses, you can learn to expand your comfort zone while avoiding burnout. 

Her advice isn’t a prescription. It’s more like an invitation to build a life that feels fuller and more meaningful by embracing what captivates and stimulates your mind. She believes that we all have the power to create interesting experiences—not by chasing them, but by choosing to see with curiosity, to engage with complexity, and to welcome the unexpected. 

In the end, The Art of the Interesting reads like a manifesto for wonder. Besser’s message is clear: you don’t have to change your job, move countries, or become someone else to live more fully. You simply have to shift your lens and let the rich tapestry of life unfold in all its messy, fascinating glory. Because ultimately, no one can take away your power to make life interesting—it's yours, always. 

 

Sunday, July 27, 2025

 

Indexes in an Azure AI Search resource can be shared between accounts and rolled over when they reach their limits. The following code shows how to automate that:

import requests
from azure.core.credentials import AzureKeyCredential
from azure.search.documents.indexes import SearchIndexClient
import os

project_endpoint = os.environ["AZURE_PROJECT_ENDPOINT"]
project_api_key = os.environ["AZURE_PROJECT_API_KEY"]
agent_model = os.getenv("AZURE_AGENT_MODEL", "gpt-4o-mini")
search_endpoint = os.environ["AZURE_SEARCH_SERVICE_ENDPOINT"]
api_version = os.getenv("AZURE_SEARCH_API_VERSION")
search_api_key = os.getenv("AZURE_SEARCH_ADMIN_KEY")
credential = AzureKeyCredential(search_api_key)
index_name = os.getenv("AZURE_SEARCH_INDEX_NAME", "index00")
azure_openai_endpoint = os.environ["AZURE_OPENAI_ENDPOINT"]
azure_openai_api_key = os.getenv("AZURE_OPENAI_API_KEY")
azure_openai_gpt_deployment = os.getenv("AZURE_OPENAI_GPT_DEPLOYMENT", "gpt-4o-mini")
azure_openai_gpt_model = os.getenv("AZURE_OPENAI_GPT_MODEL", "gpt-4o-mini")
azure_openai_embedding_deployment = os.getenv("AZURE_OPENAI_EMBEDDING_DEPLOYMENT", "text-embedding-ada-002")
azure_openai_embedding_model = os.getenv("AZURE_OPENAI_EMBEDDING_MODEL", "text-embedding-ada-002")
chat_agent_name = os.getenv("AZURE_CHAT_AGENT_NAME", "chat-agent-in-a-team")
search_agent_name = os.getenv("AZURE_SEARCH_AGENT_NAME", "search-agent-in-a-team")
api_version = "2025-05-01-Preview"
agent_max_output_tokens=10000
vectorizer_name = "vectorizer-1748574121417"
semantic_configuration_name = "mysemantic1"
vector_dimension_size=1536
vector_search_profile_name = "myExhaustiveKnnProfile1"
new_index_name = "index05"

from azure.search.documents.indexes.models import (
    SearchIndex, SimpleField, SearchableField, SearchField, SearchFieldDataType,
    SimpleField, SearchableField, VectorSearch, VectorSearchAlgorithmConfiguration, VectorSearchProfile,
    HnswParameters, ExhaustiveKnnParameters, VectorSearchAlgorithmMetric, HnswAlgorithmConfiguration, ExhaustiveKnnAlgorithmConfiguration,
    AzureOpenAIVectorizer, AzureOpenAIVectorizerParameters, VectorSearchAlgorithmKind,
    # AzureOpenAIParameters, VectorSearchVectorizer,
    SemanticSearch, SemanticConfiguration, SemanticPrioritizedFields, SemanticField, BM25SimilarityAlgorithm
)

# 1. Create the vectorizer
# vectorizer_url = f"{search_endpoint}/vectorizers/{vectorizer_name}?api-version=2023-11-01-preview"
# vectorizer_payload = {
    # "name": vectorizer_name,
    # "kind": "azureOpenAI",
    # "azureOpenAIParameters": {
        # "resourceUri": azure_openai_endpoint,
        # "deploymentId": azure_openai_embedding_deployment,
        # "modelName": azure_openai_embedding_model,
    # }
# }
# vectorizer_response = requests.put(vectorizer_url, json=vectorizer_payload, headers=headers)
# print("Vectorizer:", vectorizer_response.status_code, vectorizer_response.json())

vectorizer_config = AzureOpenAIVectorizer(
    vectorizer_name = vectorizer_name,
    parameters = AzureOpenAIVectorizerParameters(
        resource_url=azure_openai_endpoint,
        api_key = azure_openai_api_key,
        deployment_name=azure_openai_embedding_deployment,
        model_name=azure_openai_embedding_model
    )
)   
print(f"Vectorizer config '{vectorizer_name}' created for index.")

# 2. Create the vector search with the vectorizer
vector_search = VectorSearch(
    algorithms=[
        HnswAlgorithmConfiguration(
            parameters = HnswParameters(metric=VectorSearchAlgorithmMetric.COSINE, m=4, ef_construction=400, ef_search=1000),
            name="myHnsw1",
            kind=VectorSearchAlgorithmKind.HNSW,
        ),
        ExhaustiveKnnAlgorithmConfiguration(
            parameters = ExhaustiveKnnParameters(metric=VectorSearchAlgorithmMetric.COSINE),
            name="myExhaustiveKnn1",
            kind=VectorSearchAlgorithmKind.EXHAUSTIVE_KNN
        )
    ],
    profiles=[
        VectorSearchProfile(
            name="myHnswProfile1",
            algorithm_configuration_name="myHnsw1",
            vectorizer=vectorizer_name
        ),
        VectorSearchProfile(
            name=vector_search_profile_name,
            algorithm_configuration_name="MyExhaustiveKnn1",
            vectorizer=vectorizer_name
        )
    ],
    vectorizers = [vectorizer_config]
    # vectorizers=None  # Already created if using REST APIs for latest features
)
print(f"VectorSearch with '{vectorizer_name}' created for index.")

semantic_search = SemanticSearch(
    default_configuration_name=semantic_configuration_name,
    configurations=[
        SemanticConfiguration(
            name=semantic_configuration_name,
            prioritized_fields=SemanticPrioritizedFields(
                title_field=SemanticField(field_name="description"),
                prioritized_content_fields=[
                    SemanticField(field_name="id"),
                    SemanticField(field_name="description")
                ],
                prioritized_keywords_fields=[
                    SemanticField(field_name="id"),
                    SemanticField(field_name="description")
                ]
            ),
            ranking_order="BoostedRerankerScore",
            flighting_opt_in=False
        )
    ]
)

similarity_algorithm = BM25SimilarityAlgorithm()
print(f"Semantic configuration '{semantic_configuration_name}' created for index.")
fields = [
    SimpleField(name="id", type=SearchFieldDataType.String, key=True, retrievable=True, stored=True),
    SearchableField(name="accountid", type=SearchFieldDataType.String, searchable=True, filterable=True,
                    retrievable=True, stored=True, sortable=True, facetable=True),
    SearchableField(name="description", type=SearchFieldDataType.String, searchable=True, filterable=True,
                    retrievable=True, stored=True, sortable=True, facetable=True),
    SearchField(name="vector", type="SearchFieldDataType.Collection(Edm.Single)", searchable=True, retrievable=True,
                stored=True, vector_search_dimensions=vector_dimension_size, vector_search_profile_name=vector_search_profile_name),
    SearchableField(name="objects", type=SearchFieldDataType.String, analyzer_name="standard.lucene",
                    searchable=True, filterable=True, retrievable=True, stored=True,
                    sortable=True, facetable=True),
    SearchableField(name="tags", type=SearchFieldDataType.String, analyzer_name="standard.lucene",
                    searchable=True, filterable=True, retrievable=True, stored=True,
                    sortable=True, facetable=True),
    SearchableField(name="title", type=SearchFieldDataType.String, analyzer_name="standard.lucene",
                    searchable=True, filterable=True, retrievable=True, stored=True,
                    sortable=True, facetable=True),
]

# 3. create the index with fields, vectorizer and semantic configuration
index = SearchIndex(
    name=new_index_name,
    fields=fields,
    semantic_search=semantic_search,
    vector_search=vector_search,
    similarity=similarity_algorithm
)

index_client = SearchIndexClient(
    endpoint=search_endpoint,
    credential=AzureKeyCredential(search_api_key)
)

index_client.create_or_update_index(index)
print(f"Index '{new_index_name}' created with vector and semantic search.")
"""
Output:
Vectorizer config 'vectorizer-1748574121417' created for index.
vectorizer is not a known attribute of class <class 'azure.search.documents.indexes._generated.models._models_py3.VectorSearchProfile'> and will be ignored
vectorizer is not a known attribute of class <class 'azure.search.documents.indexes._generated.models._models_py3.VectorSearchProfile'> and will be ignored
VectorSearch with 'vectorizer-1748574121417' created for index.
prioritized_content_fields is not a known attribute of class <class 'azure.search.documents.indexes._generated.models._models_py3.SemanticPrioritizedFields'> and will be ignored
prioritized_keywords_fields is not a known attribute of class <class 'azure.search.documents.indexes._generated.models._models_py3.SemanticPrioritizedFields'> and will be ignored
Semantic configuration 'mysemantic1' created for index.
Index 'index05' created with vector and semantic search.
"""

 


Saturday, July 26, 2025

 Multitenancy on an index store of an Azure AI Search resource requires entries to be filtered by tenant/account, and the operation is different from executing standard SQL query operators. The expression used to filter the entries is based exclusively on OData syntax. Some examples might explain this:

#! /usr/bin/python

import json

import sys

import os

import requests

from azure.core.credentials import AzureKeyCredential

from azure.identity import DefaultAzureCredential

from azure.search.documents import SearchClient

from azure.search.documents.indexes import SearchIndexClient

sys.path.insert(0, os.path.abspath(".."))

from visionprocessor.vectorizer import vectorize_image

search_endpoint = os.getenv("AZURE_SEARCH_SERVICE_ENDPOINT")

index_name = os.getenv("AZURE_SEARCH_INDEX_NAME")

api_version = os.getenv("AZURE_SEARCH_API_VERSION")

search_api_key = os.getenv("AZURE_SEARCH_ADMIN_KEY")

vision_api_key = os.getenv("AZURE_AI_VISION_API_KEY")

vision_api_version = os.getenv("AZURE_AI_VISION_API_VERSION")

vision_region = os.getenv("AZURE_AI_VISION_REGION")

vision_endpoint = os.getenv("AZURE_AI_VISION_ENDPOINT")

credential = AzureKeyCredential(search_api_key)

search_client = SearchClient(endpoint=search_endpoint, index_name=index_name, credential=credential)

query_text = "green street crossing mark for bicycles"

odata_filter = "id eq '008333'"

odata_filter = "search.in(title, 'aerial', ' ')"

odata_filter = "search.in(tags, 'urban design')"

odata_filter = "tags eq 'building,urban design,car,house,land vehicle,vehicle,outdoor,city,aerial,truck'"

odata_filter = "search.ismatch('urban*', 'tags')"

#only-for-collection-fields odata_filter = "tags/any(g: search.in(g, 'urban', ' '))"

# and effect of alternate jargon

results = search_client.search(

    # query_type='simple',

    # search_text=query_text,

    select='id,description',

    filter=odata_filter,

    include_total_count=True,

    top=10)

print(repr(results))

if results:

    print(f"Number of results: {results.get_count()}")

    for result in results:

         if result:

            print(f"{result['id']}")

"""

<iterator object azure.core.paging.ItemPaged at 0x1df4252ee40>

Number of results: 1

015969

<iterator object azure.core.paging.ItemPaged at 0x227467dee40>

Number of results: 7893

000688

000735

000720

000766

000741

000771

000775

000820

000824

000791

"""

Please note that id is not a filterable attribute. An additional attribute, such as accountid, becomes necessary to do that.
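
In a multitenant setup, the tenant filter is typically combined with the query itself. Here is a minimal sketch, assuming the filterable accountid field from the index schema in the July 27 post; the account value is illustrative:

# Per-tenant filtering by accountid alongside the text query.
account_id = "acct-001"  # illustrative tenant/account value
odata_filter = f"accountid eq '{account_id}'"
results = search_client.search(
    search_text=query_text,
    filter=odata_filter,
    select='id,description',
    include_total_count=True,
    top=10)
for result in results:
    print(result['id'])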

Friday, July 25, 2025

Most drones don’t have radars. They merely have positions that they change based on fully autonomous decisions or on positions provided by a controller. In the former case, the waypoints and trajectory determine the flight path, and each drone independently tries to minimize the errors in its deviations from the flight path while aligning its path using the least-squares method. The selection of waypoints and the velocity and ETA at each waypoint are determined for each unit in a UAV swarm, with the ability to make up delays or adjust ETAs using conditional probability between the past and next waypoints while choosing a path of least resistance or conflict between the two. Usually, a formation, say a matrix, already spreads out the units, and its center of mass is used to calculate the formation's progress along the flight path. This article discusses a novel approach to minimize the conflicts and adhere to the path of least resistance.

For example, to transform between an “Abreast” and a “Diamond” formation, any technique must demonstrate efficiency in minimizing transformation distance and maintaining formation coherence. Similarly, to transform from a matrix formation to flying linearly under a bridge between its piers, any technique must demonstrate a consensus-based, pre-determined order.

The approach included here defines a drone’s formation state with six parameters: time, 3D positions, yaw angle (heading), and velocity. For a formation to be considered coherent, all drones must share the same heading and speed while maintaining relative positions—essential for realistic aerial maneuvers.

The transformation itself consists of two steps: location assignment and path programming. First, to determine which drone should move to which position in the new formation, the Hungarian algorithm, a centralized optimization method, is used; in its absence, the greatest common denominator of the volume between two waypoints determines the number of simultaneous paths to choose from, and the matrix model is used to assign each drone to the nearest path. If there is only one path and no centralized controller, the units use the Paxos algorithm to reach consensus on the linear order. This first step evaluates the cost of moving each drone to each new position by considering spatial displacement, heading change, and velocity difference, which ensures the assignment minimizes overall disruption and maneuvering effort.
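
As a rough illustration of that assignment step, the following sketch builds such a cost matrix and solves it with SciPy's implementation of the Hungarian algorithm; the state layout [x, y, z, yaw, speed] and the weights are illustrative assumptions rather than values from this article:

import numpy as np
from scipy.optimize import linear_sum_assignment

def assign_formation_slots(current_states, target_states, w_pos=1.0, w_yaw=10.0, w_vel=5.0):
    """Sketch: assign each drone to a slot in the new formation.
    Each state is a numpy array [x, y, z, yaw, speed]; the weights are illustrative."""
    n = len(current_states)
    cost = np.zeros((n, n))
    for i, s in enumerate(current_states):
        for j, t in enumerate(target_states):
            pos_cost = np.linalg.norm(s[:3] - t[:3])                     # spatial displacement
            yaw_cost = abs((s[3] - t[3] + np.pi) % (2 * np.pi) - np.pi)  # heading change
            vel_cost = abs(s[4] - t[4])                                  # velocity difference
            cost[i, j] = w_pos * pos_cost + w_yaw * yaw_cost + w_vel * vel_cost
    rows, cols = linear_sum_assignment(cost)  # Hungarian algorithm
    return dict(zip(rows, cols))              # drone index -> new-formation slot index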

Second, each drone calculates its own flight path to the newly assigned position using a Dubins path model, which generates the shortest possible route under a minimum turning radius constraint—a requirement for fixed-wing drones that can’t make sharp turns or hover. Positions alone do not guarantee compliance and the velocity adjustments for each unit must also be layered over the transition. The adjustment of velocities follows a Bayesian conditional probability along the associated path for the unit. This involves computing acceleration and deceleration phases to fine-tune the duration and dynamics of the transition with error corrections against deviations.
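
The velocity-adjustment step can likewise be sketched as bounding each speed change toward the average speed still required to meet the ETA at the next waypoint; the acceleration limit and time step below are assumptions, not values from this article:

def adjust_speed(remaining_distance_ft, current_speed_fps, time_to_eta_s, a_max=6.0, dt=0.1):
    """Sketch: nudge the unit's speed toward the average speed needed to arrive on time,
    limited by a maximum acceleration/deceleration per step (illustrative values)."""
    required_speed = remaining_distance_ft / max(time_to_eta_s, dt)
    delta = required_speed - current_speed_fps
    delta = max(-a_max * dt, min(a_max * dt, delta))  # bound the acceleration/deceleration per step
    return current_speed_fps + delta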

Overall, this provides a cohesive framework for in-flight drone formation reconfiguration that balances centralized planning with distributed execution. By encoding the physical constraints and states for each unit and classifying their adherence, outliers can be handled by rotating them with other units for a smooth overall progression of the formation, while environmental factors such as turbulence are overcome with error corrections.

 

Thursday, July 24, 2025

Agentic retrieval often requires functions to be invoked. This is how to do that:

def run_function_tools(query_text, account_id):

    project_client = AIProjectClient(endpoint=project_endpoint, credential=DefaultAzureCredential()) 

    agents_client = AgentsClient(

        endpoint=project_endpoint,

        credential=DefaultAzureCredential(),

    )

    from analyzer_functions import analyzer_functions, image_user_functions


    # Initialize function tool with user functions

    functions = FunctionTool(functions=image_user_functions)

    instructions = "You are an assistant that answers the question how many objects were found in an image when both are given by their image URI. You evaluate a function to do this by passing their uri to the function and respond with the count."

    query_text = f"How many objects given by its image URI {object_uri} are found in the image given by its image URI {scene_uri}?"

    with agents_client:

        # Create an agent and run user's request with function calls

        # agent = agents_client.get_agent(agent_id="asst_qyMFcz1BnU0BS0QUmhxAAyFk")

        # """

        agent = agents_client.create_agent(

            model=agent_model,

            name=fn_agent_name,

            instructions=instructions,

            tools=functions.definitions,

            tool_resources=functions.resources,

            top_p=1

        )

        # """

        #print(f"Created agent, ID: {agent.id}")


        thread = agents_client.threads.create()

        #print(f"Created thread, ID: {thread.id}")


        message = agents_client.messages.create(

            thread_id=thread.id,

            role="user",

            content=query_text,

        )

        #print(f"Created message, ID: {message.id}")


        run = agents_client.runs.create(thread_id=thread.id, agent_id=agent.id)

        #print(f"Created run, ID: {run.id}")


        while run.status in ["queued", "in_progress", "requires_action"]:

            time.sleep(1)

            run = agents_client.runs.get(thread_id=thread.id, run_id=run.id)


            if run.status == "requires_action" and isinstance(run.required_action, SubmitToolOutputsAction):

                tool_calls = run.required_action.submit_tool_outputs.tool_calls

                if not tool_calls:

                    print("No tool calls provided - cancelling run")

                    agents_client.runs.cancel(thread_id=thread.id, run_id=run.id)

                    break


                tool_outputs = []

                for tool_call in tool_calls:

                    if isinstance(tool_call, RequiredFunctionToolCall):

                        #print("Is an instance of RequiredFunctionToolCall")

                        try:

                            #print(f"Executing tool call: {tool_call}")

                            output = functions.execute(tool_call)

                            #print(output)

                            tool_outputs.append(

                                ToolOutput(

                                    tool_call_id=tool_call.id,

                                    output=output,

                                )

                            )

                        except Exception as e:

                            print(f"Error executing tool_call {tool_call.id}: {e}")

                    else:

                        print(f"{tool_call} skipped.")


                print(f"Tool outputs: {tool_outputs}")

                if tool_outputs:

                    agents_client.runs.submit_tool_outputs(thread_id=thread.id, run_id=run.id, tool_outputs=tool_outputs)

                else:

                    print(f"No tool output.")

            else:

                print(f"Waiting: {run}")


            print(f"Current run status: {run.status}")


        print(f"Run completed with status: {run.status} and details {run}")


        # Delete the agent when done

        # agents_client.delete_agent(agent.id)

        # print("Deleted agent")


        # Fetch and log all messages

        messages = agents_client.messages.list(thread_id=thread.id, order=ListSortOrder.ASCENDING)

        for msg in messages:

            if msg.text_messages:

                last_text = msg.text_messages[-1]

                print(f"{msg.role}: {last_text.text.value}")

                return last_text.text.value

        return None 

Wednesday, July 23, 2025

 The following explains the workflow of a drone video sensing application: the code orchestrates an end-to-end pipeline for processing drone-captured videos and transforming them into searchable, semantically rich insights using Azure's AI capabilities.

It starts by using Azure Video Indexer to:

• Authenticate via API keys and retrieve an access token.

• Upload a video either from a local file or from a URL.

• Trigger indexing to extract metadata like keyframes, themes, and highlights.

• Optionally, it can reindex a video and monitor the indexing status.

Once indexed, the workflow:

• Parses insights to extract meaningful segments.

• Creates a custom project around those segments.

• Renders a new video that highlights specific themes and keyframes.

• Downloads the rendered output for further processing.

With OpenCV, it:

• Downloads the video into memory.

• Extracts frames one by one.

• Optionally saves each frame locally or uploads it as an image blob to Azure Storage, structured by frame number and path conventions.

For every uploaded frame:

• Generates vector embeddings using the Azure AI Vision vectorization API.

• Performs semantic analysis to extract:

o Tags

o Captions

o Detected objects

o Smart crops

o Dense captions

o Text reading

o People detection

• The results are packaged into structured descriptions.

If enabled:

• It dives into dense captions from the image analysis.

• Extracts bounding boxes for individual objects.

• Clips regions of the image based on those boxes.

• Vectorizes the clipped image.

• Re-analyzes it semantically.

• Uploads object-level insights as new searchable entities.

Finally, every analyzed frame (and optionally, detected objects) is:

• Packed as a document with its ID, vector, bounding box, and description.

• Uploaded to Azure AI Search, making the insights retrievable via semantic queries.

This code is modular, retry-safe, and production-oriented, with detailed logging and fallback mechanisms. Overall, the workflow is orchestrated as follows:

def indexing_workflow(source_video_url, account_id = None):

    if not account_id:

        account_id = settings.video_indexer_default_account_id

    video_url = index_and_download_video(account_id = account_id, video_url = source_video_url)

    if not video_url:

        return

    extract_and_upload_frames(video_url)

    vector_descriptions = vectorize_extracted_frames(video_url)

    client = get_search_client()

    frame_number = 0

    for vector, description in vector_descriptions:

        source_sas_url = get_image_blob_url(video_url, frame_number)

        form_and_upload_document(client, account_id, frame_number, vector, description, source_sas_url, deep = False)

        frame_number += 1
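
As a companion to the workflow above, here is a minimal sketch of the OpenCV step that extracts frames and uploads them as image blobs; the frame stride is an assumption, and get_image_blob_url is the helper already referenced in indexing_workflow:

import cv2
from azure.storage.blob import BlobClient

def extract_and_upload_frames_sketch(video_sas_url, frame_stride=30):
    """Sketch: read the rendered video and upload every Nth frame as a JPEG blob."""
    capture = cv2.VideoCapture(video_sas_url)  # OpenCV can read from a SAS URL via its ffmpeg backend
    frame_number = 0
    while True:
        ok, frame = capture.read()
        if not ok:
            break
        if frame_number % frame_stride == 0:
            _, buffer = cv2.imencode('.jpg', frame)
            frame_url = get_image_blob_url(video_sas_url, frame_number)  # helper assumed from the pipeline above
            BlobClient.from_blob_url(frame_url).upload_blob(buffer.tobytes(), overwrite=True)
        frame_number += 1
    capture.release()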


Tuesday, July 22, 2025

 Agent to detect vehicles in aerial drone images: 

#!/usr/bin/python  

# azure-ai-agents==1.0.0  

# azure-ai-projects==1.0.0b11  

# azure-ai-vision-imageanalysis==1.0.0  

# azure-common==1.1.28  

# azure-core==1.34.0  

# azure-identity==1.22.0  

# azure-search-documents==11.6.0b12  

# azure-storage-blob==12.25.1  

# azure_ai_services==0.1.0  

from dotenv import load_dotenv  

from azure.identity import DefaultAzureCredential, get_bearer_token_provider  

from azure.ai.agents import AgentsClient  

from azure.core.credentials import AzureKeyCredential  

from azure.ai.projects import AIProjectClient  

from typing import Any, Callable, Set, Dict, List, Optional 

import os, time, sys

import requests

import numpy as np

import cv2

import torch

from urllib.parse import urlparse

from azure.storage.blob import BlobClient

from azure.ai.agents import AgentsClient 

from azure.ai.agents.models import ( 

    FunctionTool, 

    ListSortOrder, 

    RequiredFunctionToolCall, 

    SubmitToolOutputsAction, 

    ToolOutput, 

) 

from user_functions import fetch_weather, user_functions 

sys.path.insert(0, os.path.abspath(".")) 

load_dotenv(override=True)  

project_endpoint = os.environ["AZURE_PROJECT_ENDPOINT"]  

project_api_key = os.environ["AZURE_PROJECT_API_KEY"]  

agent_model = os.getenv("AZURE_AGENT_MODEL", "gpt-4o-mini")  

agent_name = os.getenv("AZURE_VEHICLE_COUNT_AGENT_NAME", "vehicle-agent-in-a-team") 

api_version = "2025-05-01-Preview"  

agent_max_output_tokens=10000  

object_uri = os.getenv("AZURE_RED_CAR_2_SAS_URL").strip('"') 

scene_uri = os.getenv("AZURE_QUERY_SAS_URI").strip('"')  

from azure.ai.projects import AIProjectClient  

project_client = AIProjectClient(endpoint=project_endpoint, credential=DefaultAzureCredential())  

agents_client = AgentsClient( 

    endpoint=project_endpoint, 

    credential=DefaultAzureCredential(), 

) 

 

def read_image_from_blob(sas_url): 

    """Reads an image from Azure Blob Storage using its SAS URL.""" 

    response = requests.get(sas_url) 

    if response.status_code == 200: 

        image_array = np.asarray(bytearray(response.content), dtype=np.uint8) 

        image = cv2.imdecode(image_array, cv2.IMREAD_COLOR) 

        return image 

    else: 

        # raise Exception(f"Failed to fetch image. Status code: {response.status_code}") 

        return None 

         

model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)

def detect_vehicles(frame):

    results = model(frame) 

    # Keep only 'car', 'truck', 'bus', 'motorcycle' detections 

    vehicle_labels = ['car', 'truck', 'bus', 'motorcycle'] 

    detections = results.pandas().xyxy[0] 

    vehicles = detections[detections['name'].isin(vehicle_labels)] 

    return vehicles 

 

def get_image_output_url(scene_uri): 

    # Parse the original video URL to get account, container, and path 

    parsed = urlparse(scene_uri) 

    path_parts = parsed.path.split('/') 

    container = path_parts[1] 

    blob_path = '/'.join(path_parts[2:]) 

    # Remove the file name from the blob path 

    blob_dir = '/'.join(blob_path.split('/')[:-1]) 

    if blob_dir == "" or blob_dir == None: 

        blob_dir = "output" 

    # Create image path 

    image_path = f"{blob_dir}/images/vehiclesframe.jpg" 

    # Rebuild the base URL (without SAS token) 

    base_url = f"{parsed.scheme}://{parsed.netloc}/{container}/{image_path}" 

    # Add the SAS token if present 

    sas_token = parsed.query 

    if sas_token: 

        image_url = f"{base_url}?{sas_token}" 

    else: 

        image_url = base_url 

    return image_url 

     

def detect_vehicles_from_uri(scene_uri: Optional[str] = None) -> str: 

    if not scene_uri: 

        return None 

    frame = read_image_from_blob(scene_uri) 

    if frame is None:

        return None 

    vehicles = detect_vehicles(frame) 

    print(vehicles) 

    for _, v in vehicles.iterrows():

        x1, y1, x2, y2 = map(int, [v['xmin'], v['ymin'], v['xmax'], v['ymax']])

        w, h = x2 - x1, y2 - y1

        cv2.rectangle(frame, (x1, y1), (x1 + w, y1 + h), (255, 0, 0), 2)

    _, buffer = cv2.imencode('.jpg', frame) 

    image_bytes = buffer.tobytes() 

    image_uri = get_image_output_url(scene_uri) 

    image_blob_client = BlobClient.from_blob_url(image_uri)

    image_blob_client.upload_blob(image_bytes, overwrite=True)  

    return image_uri 

     

image_user_functions: Set[Callable[..., Any]] = { 

    detect_vehicles_from_uri 

} 

 

# Initialize function tool with user functions 

functions = FunctionTool(functions=image_user_functions) 

instructions = "You are an assistant that answers the question how many vehicles were found in an image when the image is given by an image URI. You evaluate a function to do this by passing their uri to the function and respond with the count." 

query_text = f"How many vehicles are found in the image given by its image URI {scene_uri}?" 

with agents_client: 

    # Create an agent and run user's request with function calls 

    # agent = agents_client.get_agent(agent_id="asst_qyMFcz1BnU0BS0QUmhxAAyFk") 

    # """ 

    agent = agents_client.create_agent( 

        model=agent_model, 

        name=agent_name, 

        instructions=instructions, 

        tools=functions.definitions, 

        tool_resources=functions.resources, 

        top_p=1 

    ) 

    # """ 

    print(f"Created agent, ID: {agent.id}") 

 

    thread = agents_client.threads.create() 

    print(f"Created thread, ID: {thread.id}") 

 

    message = agents_client.messages.create( 

        thread_id=thread.id, 

        role="user", 

        content=query_text, 

    ) 

    print(f"Created message, ID: {message.id}") 

 

    run = agents_client.runs.create(thread_id=thread.id, agent_id=agent.id) 

    print(f"Created run, ID: {run.id}") 

 

    while run.status in ["queued", "in_progress", "requires_action"]: 

        time.sleep(1) 

        run = agents_client.runs.get(thread_id=thread.id, run_id=run.id) 

 

        if run.status == "requires_action" and isinstance(run.required_action, SubmitToolOutputsAction): 

            tool_calls = run.required_action.submit_tool_outputs.tool_calls 

            if not tool_calls: 

                print("No tool calls provided - cancelling run") 

                agents_client.runs.cancel(thread_id=thread.id, run_id=run.id) 

                break 

 

            tool_outputs = [] 

            for tool_call in tool_calls: 

                if isinstance(tool_call, RequiredFunctionToolCall): 

                    print("Is an instance of RequiredFunctionToolCall") 

                    try: 

                        print(f"Executing tool call: {tool_call}") 

                        output = functions.execute(tool_call) 

                        print(output) 

                        tool_outputs.append( 

                            ToolOutput( 

                                tool_call_id=tool_call.id, 

                                output=output, 

                            ) 

                        ) 

                    except Exception as e: 

                        print(f"Error executing tool_call {tool_call.id}: {e}") 

                else: 

                    print(f"{tool_call} skipped.") 

 

            print(f"Tool outputs: {tool_outputs}") 

            if tool_outputs: 

                agents_client.runs.submit_tool_outputs(thread_id=thread.id, run_id=run.id, tool_outputs=tool_outputs) 

            else: 

                print(f"No tool output.") 

        else: 

            print(f"Waiting: {run}") 

 

        print(f"Current run status: {run.status}") 

 

    print(f"Run completed with status: {run.status} and details {run}") 

 

    # Delete the agent when done 

    agents_client.delete_agent(agent.id) 

    print("Deleted agent") 

 

    # Fetch and log all messages 

    messages = agents_client.messages.list(thread_id=thread.id, order=ListSortOrder.ASCENDING) 

    for msg in messages: 

        if msg.text_messages: 

            last_text = msg.text_messages[-1] 

            print(f"{msg.role}: {last_text.text.value}")