Saturday, June 28, 2025

 The following code sample creates the frames from the succinct video generated with the sample in the previous post:

import requests

import os

import cv2

import io

import uuid

from urllib.parse import urlparse

from azure.storage.blob import BlobClient

from dotenv import load_dotenv

load_dotenv(override=True)

import time

# Azure Video Indexer settings, read from the environment; the second argument
# to os.getenv is the fallback used when the variable is not set.
video_indexer_endpoint = os.getenv("AZURE_VIDEO_INDEXER_URL", "https://api.videoindexer.ai")

video_indexer_region = os.getenv("AZURE_VIDEO_INDEXER_REGION", "eastus")

# No fallback is given for the account id, API key and access token: they are None when unset.
video_indexer_account_id = os.getenv("AZURE_VIDEO_INDEXER_ACCOUNT")

video_Indexer_api_key = os.getenv("AZURE_VIDEO_INDEXER_API_KEY")

video_indexer_project = os.getenv("AZURE_VIDEO_INDEXER_PROJECT", "wfmat1ysct")

video_file_path = os.getenv("AZURE_VIDEO_INPUT", "mainindexedvideo.mp4")

access_token = os.getenv("AZURE_VIDEO_INDEXER_ACCESS_TOKEN")

video_id = os.getenv("AZURE_VIDEO_ID", "lwxjba8wy3")

# os.getenv returns text, so the duration stays a string ("307"), not a number.
duration = os.getenv("AZURE_VIDEO_DURATION_IN_SECONDS", "307")

# SAS URL of the video blob to split into frames; None when the variable is unset.
video_sas_url=os.getenv("AZURE_VIDEO_SAS_URL")

# When True, extract_and_upload_frames writes frame<N>.jpg files locally instead of uploading them.
local_only = True

def get_image_blob_url(video_url, frame_number):
    """Build the blob URL under which one extracted frame is stored.

    The frame is placed in an ``images`` folder beside the video blob, or in
    ``output/images`` when the video sits directly in the container root.
    Any query string on ``video_url`` (for example a SAS token) is carried
    over to the returned URL unchanged.

    Args:
        video_url: URL of the video blob, shaped like
            ``scheme://host/<container>/<blob path>[?query]``.
        frame_number: value interpolated into the ``frame<N>.jpg`` file name.

    Returns:
        The URL of the frame image as a string.

    Raises:
        IndexError: if the URL path has no container segment.
    """
    parts = urlparse(video_url)

    # The path starts with "/", so segment 0 is empty and segment 1 is the container.
    segments = parts.path.split('/')
    container = segments[1]

    # Everything between the container and the file name is the blob folder.
    folder = '/'.join(segments[2:-1]) or "output"

    url = f"{parts.scheme}://{parts.netloc}/{container}/{folder}/images/frame{frame_number}.jpg"
    return f"{url}?{parts.query}" if parts.query else url

def download_blob_to_stream(blob_client):
    """Read the whole blob through ``blob_client`` and return it as an ``io.BytesIO``."""
    payload = blob_client.download_blob().readall()
    return io.BytesIO(payload)

def extract_and_upload_frames(video_sas_url):
    """Split the video behind a blob SAS URL into one JPEG per decoded frame.

    The video blob is downloaded fully into memory, written to a temporary
    ``temp_<uuid>.mp4`` file in the current working directory, and read frame
    by frame with OpenCV. Depending on the module-level ``local_only`` flag,
    each frame is either written to ``frame<N>.jpg`` in the current directory
    or uploaded to the URL computed by ``get_image_blob_url``.

    The capture handle and the temporary file are released in a ``finally``
    block, so they do not linger when the download, decoding or an upload
    raises part-way through.

    Args:
        video_sas_url: URL of the video blob, including any SAS query string.
    """
    video_blob_client = BlobClient.from_blob_url(video_sas_url)
    video_bytes = download_blob_to_stream(video_blob_client).getvalue()

    # VideoCapture is given a file path below, so the bytes are spilled to disk first.
    video_temp = os.path.join(os.getcwd(), f"temp_{uuid.uuid4()}.mp4")
    print(video_temp)

    vidcap = None
    try:
        with open(video_temp, 'wb') as f:
            f.write(video_bytes)

        vidcap = cv2.VideoCapture(video_temp)
        frame_number = 0
        while True:
            success, frame = vidcap.read()
            if not success:
                # No more frames could be read (end of video or unreadable file).
                break

            # Encode the frame as JPEG and take the raw bytes.
            _, buffer = cv2.imencode('.jpg', frame)
            image_bytes = buffer.tobytes()

            if local_only:
                image_path = f"frame{frame_number}.jpg"
                with open(image_path, 'wb') as f:
                    f.write(image_bytes)
            else:
                image_url = get_image_blob_url(video_sas_url, frame_number)
                image_blob_client = BlobClient.from_blob_url(image_url)
                image_blob_client.upload_blob(image_bytes, overwrite=True)
                print(f"Uploaded frame {frame_number} to {image_url}")

            frame_number += 1
    finally:
        # Release the capture before deleting the file it was opened on.
        if vidcap is not None:
            vidcap.release()
        if os.path.exists(video_temp):
            os.remove(video_temp)

# Stop with a clear message when the SAS URL is missing; without this check the
# strip() call below fails with an AttributeError on None.
if not video_sas_url:
    raise SystemExit("AZURE_VIDEO_SAS_URL is not set; cannot locate the video to extract frames from.")

# Drop double quotes that may surround the value.
video_sas_url = video_sas_url.strip('"')

print(video_sas_url)

extract_and_upload_frames(video_sas_url)

Output:

Frame0: https://sadronevideo.blob.core.windows.net/vi-rendered-wfmat1ysct-0e4129//images/frame0.jpg?sp=racwdl&st=2025-06-27T16:57:39Z&se=2025-06-28T00:57:39Z&spr=https&sv=2024-11-04&sr=c&sig=6S7VNuigAZ%2BLq%2B5vaW5d4E8jmk3rcxQGabMeKngB1YM%3D

To

frame26: https://sadronevideo.blob.core.windows.net/vi-rendered-wfmat1ysct-0e4129//images/frame26.jpg?sp=racwdl&st=2025-06-27T16:57:39Z&se=2025-06-28T00:57:39Z&spr=https&sv=2024-11-04&sr=c&sig=6S7VNuigAZ%2BLq%2B5vaW5d4E8jmk3rcxQGabMeKngB1YM%3D


Friday, June 27, 2025

 This is a code sample for reducing the drone captured aerial video to a workable set for drone world analysis:

import requests

import os

from dotenv import load_dotenv

load_dotenv(override=True)

import time

# Azure Video Indexer settings, read from the environment; the second argument
# to os.getenv is the fallback used when the variable is not set.
video_indexer_endpoint = os.getenv("AZURE_VIDEO_INDEXER_URL", "https://api.videoindexer.ai")

video_indexer_region = os.getenv("AZURE_VIDEO_INDEXER_REGION", "eastus")

# No fallback is given for the account id, API key and access token: they are None when unset.
video_indexer_account_id = os.getenv("AZURE_VIDEO_INDEXER_ACCOUNT")

video_Indexer_api_key = os.getenv("AZURE_VIDEO_INDEXER_API_KEY")

# The main workflow reuses this project id and only creates a project when it is empty.
video_indexer_project = os.getenv("AZURE_VIDEO_INDEXER_PROJECT", "wfmat1ysct")

video_file_path = os.getenv("AZURE_VIDEO_INPUT", "mainindexedvideo.mp4")

access_token = os.getenv("AZURE_VIDEO_INDEXER_ACCESS_TOKEN")

video_id = os.getenv("AZURE_VIDEO_ID", "lwxjba8wy3")

# os.getenv returns text, so the duration stays a string ("307"), not a number.
duration = os.getenv("AZURE_VIDEO_DURATION_IN_SECONDS", "307")

def get_access_token():
    """Retrieve an access token for the Video Indexer API.

    The request is authenticated with the subscription key in
    ``video_Indexer_api_key``.

    Returns:
        The response body with surrounding double quotes removed.

    Raises:
        requests.HTTPError: if the service answers with an error status, so
            that an error payload is never handed back as if it were a token.
        requests.Timeout: if the service does not answer within 30 seconds.
    """
    url = f"{video_indexer_endpoint}/auth/{video_indexer_region}/Accounts/{video_indexer_account_id}/AccessToken"
    headers = {
        "Ocp-Apim-Subscription-Key": video_Indexer_api_key
    }
    response = requests.get(url, headers=headers, timeout=30)
    response.raise_for_status()
    return response.text.strip('"')

def repeat_video_index(access_token, video_id):

    """Ask Video Indexer to re-index a video by sending a PUT to its ReIndex endpoint.

    Returns the raw ``requests`` response when the PUT answers with status 200.
    For any other status it falls through to ``get_video_insights``, which polls
    the video's index until it reports the 'Processed' state and returns the
    parsed JSON.

    NOTE(review): the two paths return different types (response object vs.
    parsed JSON) — confirm which one callers expect.
    """

    url = f"{video_indexer_endpoint}/{video_indexer_region}/Accounts/{video_indexer_account_id}/Videos/{video_id}/ReIndex?accessToken={access_token}"

    response = requests.put(url)

    if response.status_code == 200:

        return response

    return get_video_insights(access_token, video_id)

def get_video_insights(access_token, video_id):
    """Poll the video's index every ten seconds until it is processed.

    Args:
        access_token: Video Indexer access token, sent as a query parameter.
        video_id: id of the video whose index is requested.

    Returns:
        The parsed JSON index once its ``state`` is 'Processed'.

    Raises:
        RuntimeError: if the index reports the 'Failed' state; without this
            check the loop would keep polling a video that can never finish.
    """
    url = f"{video_indexer_endpoint}/{video_indexer_region}/Accounts/{video_indexer_account_id}/Videos/{video_id}/Index?accessToken={access_token}"
    count = 0
    while True:
        response = requests.get(url)
        data = response.json()
        # Payloads without a "state" key (e.g. error bodies) are tolerated and retried.
        state = data.get("state") if isinstance(data, dict) else None
        if state == 'Processed':
            return data
        if state == 'Failed':
            raise RuntimeError(f"Indexing of video {video_id} failed: {data}")
        count += 1
        # Dump the full payload on every tenth attempt to show progress or errors.
        if count % 10 == 0:
            print(data)
        print("Sleeping for ten seconds...")
        time.sleep(10)  # Wait 10 seconds before checking again

def get_selected_segments(insights, threshold):
    """Collect the (start, end) range of every key frame in the indexed video.

    Only the ``videos`` section of the index is read; the first instance of
    each key frame supplies the range. The function no longer looks up
    ``summarizedInsights`` for a duration that was computed but never used,
    so an index without that section does not raise ``KeyError`` any more.

    Args:
        insights: parsed Video Indexer index, read as
            ``insights["videos"][i]["insights"]["shots"][j]["keyFrames"][k]["instances"][0]``.
        threshold: accepted for interface compatibility; it is currently not
            applied to the selection.

    Returns:
        A list of ``(start, end)`` tuples in traversal order, with the values
        exactly as they appear in the index.
    """
    selected_segments = []
    for video in insights["videos"]:
        for shot in video["insights"]["shots"]:
            shot_id = shot["id"]
            for key_frame in shot["keyFrames"]:
                key_frame_id = key_frame["id"]
                first_instance = key_frame["instances"][0]
                start = first_instance["start"]
                end = first_instance["end"]
                print(f"Clipping shot: {shot_id}, key_frame: {key_frame_id}, start: {start}, end: {end}")
                selected_segments.append((start, end))
    return selected_segments

def create_project(access_token, video_id, selected_segments):
    """Create a Video Indexer project made of the given ranges of one video.

    The project gets a random eight-character name drawn from hex digits.

    Args:
        access_token: Video Indexer access token, sent as a query parameter.
        video_id: id of the video every range refers to.
        selected_segments: iterable of ``(start, end)`` pairs.

    Returns:
        The ``id`` field of the JSON response when the POST answers 200,
        otherwise ``None``.
    """
    import random
    import string

    # One {"videoId", "range"} entry per selected segment.
    video_ranges = [
        {"videoId": video_id, "range": {"start": start, "end": end}}
        for start, end in selected_segments
    ]
    payload = {
        "name": ''.join(random.choices(string.hexdigits, k=8)),
        "videosRanges": video_ranges,
        "isSearchable": "false"
    }
    url = f"{video_indexer_endpoint}/{video_indexer_region}/Accounts/{video_indexer_account_id}/Projects?accessToken={access_token}"
    response = requests.post(url, json=payload, headers={"Content-Type": "application/json"})
    print(response.content)

    if response.status_code != 200:
        return None
    return response.json()["id"]

def render_video(access_token, project_id):
    """Request rendering of a project.

    Posts to the project's ``render`` endpoint with ``sendCompletionEmail=false``
    and prints the raw response body.

    Returns:
        The ``requests`` response when the status is 202, otherwise ``None``.
    """
    url = f"{video_indexer_endpoint}/{video_indexer_region}/Accounts/{video_indexer_account_id}/Projects/{project_id}/render?sendCompletionEmail=false&accessToken={access_token}"
    response = requests.post(url, headers={"Content-Type": "application/json"})
    print(response.content)
    return response if response.status_code == 202 else None

def get_render_operation(access_token, project_id):
    """Poll the project's render operation every ten seconds until it succeeds.

    Returns:
        The parsed JSON of the render operation once its ``state`` is 'Succeeded'.

    NOTE(review): any other state, or a payload without ``state``, is polled
    again indefinitely — confirm whether a terminal failure state should stop
    the loop.
    """
    url = f"{video_indexer_endpoint}/{video_indexer_region}/Accounts/{video_indexer_account_id}/Projects/{project_id}/renderoperation?accessToken={access_token}"
    while True:
        payload = requests.get(url).json()
        if not ("state" in payload and payload['state'] == 'Succeeded'):
            print("Sleeping for ten seconds before checking on rendering...")
            time.sleep(10)  # Wait 10 seconds before checking again
            continue
        return payload

def download_rendered_file(access_token, project_id):
    """Fetch the download URL of the project's rendered file.

    Returns:
        The ``downloadUrl`` value of the JSON response, or ``None`` when the
        request does not answer 200 or the response has no such key.
    """
    url = f"{video_indexer_endpoint}/{video_indexer_region}/Accounts/{video_indexer_account_id}/Projects/{project_id}/renderedfile/downloadurl?accessToken={access_token}"
    response = requests.get(url)
    if response.status_code != 200:
        return None

    print(response.content)
    payload = response.json()
    return payload["downloadUrl"] if "downloadUrl" in payload else None

# Main workflow
# The access token comes from the environment here; the call below can be
# enabled to request one from the API instead.
# access_token = get_access_token()

insights = get_video_insights(access_token, video_id)
selected_segments = get_selected_segments(insights, 10)

# Reuse the configured project; create one only when none is configured.
project_id = video_indexer_project or create_project(access_token, video_id, selected_segments)
print(project_id)

if project_id:
    render_response = render_video(access_token, project_id)
    print(render_response)

    # Wait for the render and fetch the download URL only if rendering was accepted.
    if render_response:
        status = get_render_operation(access_token, project_id)
        print(status)

        download_url = download_rendered_file(access_token, project_id)
        print(download_url)

"""

{'state': 'Succeeded', 'result': {'videoRanges': [{'videoId': 'lwxjba8wy3', 'range': {'start': '0:00:02.7806641', 'end': '0:00:02.8139974'}}, {'videoId': 'lwxjba8wy3', 'range': {'start': '0:00:30.1820313', 'end': '0:00:30.2153646'}}, {'videoId': 'lwxjba8wy3', 'range': {'start': '0:00:48.25', 'end': '0:00:48.2840495'}}, {'videoId': 'lwxjba8wy3', 'range': {'start': '0:00:54.5133464', 'end': '0:00:54.5466797'}}, {'videoId': 'lwxjba8wy3', 'range': {'start': '0:00:56.5806641', 'end': '0:00:56.6139974'}}, {'videoId': 'lwxjba8wy3', 'range': {'start': '0:01:15.8330078', 'end': '0:01:15.8663411'}}, {'videoId': 'lwxjba8wy3', 'range': {'start': '0:01:24.333724', 'end': '0:01:24.3670573'}}, {'videoId': 'lwxjba8wy3', 'range': {'start': '0:01:35.2317057', 'end': '0:01:35.2650391'}}, {'videoId': 'lwxjba8wy3', 'range': {'start': '0:01:41.280013', 'end': '0:01:41.3270182'}}, {'videoId': 'lwxjba8wy3', 'range': {'start': '0:01:50.4290365', 'end': '0:01:50.4623698'}}, {'videoId': 'lwxjba8wy3', 'range': {'start': '0:01:51.5296875', 'end': '0:01:51.5630208'}}, {'videoId': 'lwxjba8wy3', 'range': {'start': '0:01:56.8983724', 'end': '0:01:56.9317057'}}, {'videoId': 'lwxjba8wy3', 'range': {'start': '0:02:07.4166667', 'end': '0:02:07.45'}}, {'videoId': 'lwxjba8wy3', 'range': {'start': '0:02:21.9847005', 'end': '0:02:22.0180339'}}, {'videoId': 'lwxjba8wy3', 'range': {'start': '0:02:39.0670573', 'end': '0:02:39.1139974'}}, {'videoId': 'lwxjba8wy3', 'range': {'start': '0:02:49.449349', 'end': '0:02:49.4826823'}}, {'videoId': 'lwxjba8wy3', 'range': {'start': '0:02:58.1507161', 'end': '0:02:58.1840495'}}, {'videoId': 'lwxjba8wy3', 'range': {'start': '0:03:08.3180339', 'end': '0:03:08.3513672'}}, {'videoId': 'lwxjba8wy3', 'range': {'start': '0:03:20.4186849', 'end': '0:03:20.4520182'}}, {'videoId': 'lwxjba8wy3', 'range': {'start': '0:03:24.8433594', 'end': '0:03:24.8766927'}}, {'videoId': 'lwxjba8wy3', 'range': {'start': '0:03:27.025', 'end': '0:03:27.0583333'}}, {'videoId': 'lwxjba8wy3', 'range': 
{'start': '0:03:58.1970052', 'end': '0:03:58.2303385'}}, {'videoId': 'lwxjba8wy3', 'range': {'start': '0:04:13.4656901', 'end': '0:04:13.4990234'}}, {'videoId': 'lwxjba8wy3', 'range': {'start': '0:04:26.5160156', 'end': '0:04:26.549349'}}, {'videoId': 'lwxjba8wy3', 'range': {'start': '0:04:40.05', 'end': '0:04:40.0833333'}}, {'videoId': 'lwxjba8wy3', 'range': {'start': '0:04:49.764388', 'end': '0:04:49.7977214'}}, {'videoId': 'lwxjba8wy3', 'range': {'start': '0:04:58.364388', 'end': '0:04:58.3977214'}}]}, 'error': None}

b'{"downloadUrl":"https://sadronevideo.blob.core.windows.net/vi-rendered-wfmat1ysct-0e4129/wfmat1ysct_rendered.mp4?skoid=c855c9a9-c3b8-449d-b876-75304f769177&sktid=1f4c33e1-e960-43bf-a135-6db8b82b6885&skt=2025-06-25T03%3A54%3A43Z&ske=2025-07-02T03%3A54%3A43Z&sks=b&skv=2021-10-04&sv=2021-10-04&st=2025-06-27T03%3A34%3A35Z&se=2025-06-27T04%3A39%3A35Z&sr=b&sp=r&scid=942959fd-c90a-4aab-8cd8-3ab7881559b8&sig=GkzcpywilXA74afVr%2BaUtcF08Tu%2Fz5X4cS0jpGeudIA%3D"}'

Result:

https://sadronevideo.blob.core.windows.net/vi-rendered-wfmat1ysct-0e4129/wfmat1ysct_rendered.mp4?skoid=c855c9a9-c3b8-449d-b876-75304f769177&sktid=1f4c33e1-e960-43bf-a135-6db8b82b6885&skt=2025-06-25T03%3A54%3A43Z&ske=2025-07-02T03%3A54%3A43Z&sks=b&skv=2021-10-04&sv=2021-10-04&st=2025-06-27T03%3A34%3A35Z&se=2025-06-27T04%3A39%3A35Z&sr=b&sp=r&scid=942959fd-c90a-4aab-8cd8-3ab7881559b8&sig=GkzcpywilXA74afVr%2BaUtcF08Tu%2Fz5X4cS0jpGeudIA%3D

"""

#Codingexercise:

https://1drv.ms/w/c/d609fb70e39b65c8/Echlm-Nw-wkggNb7JAEAAAABu53rpIuTS5AsMb3lNiM7SQ?e=fx6G9w



Thursday, June 26, 2025

 These are some performance improvement considerations for drone sensing applications when querying aerial flyover images from drones as studied from the explanations of the case study in previous articles. The following table outlines some of the comparisons made in terms of precision and recall.

For example, with the query “red car”, a query response limit of 50 images, vector dimensions of 1536 to enable embedding models, and a baseline precision and recall from multimodal search, the table outlines the relative improvements for various features:

| Multimodal search on images in vector store | Semantic configuration added on id, description fields on vector store schema | Query options of semantic ranker and vector search on vector store | One-shot RAG with chat interface on LLM | Agentic Retrieval with embeddings model and gpt-4o for query decomposition |
| --- | --- | --- | --- | --- |
| Precision of 75% | Precision of 80% | Precision of 75% | Precision of 75% | Precision of 90% |
| Recall of 50% | Recall of 50% | Recall of 70% | Recall of 70% | Recall of 80% |

From the comparisons, it seems the ranking of the images plays a significant role in precision, but the variety of images recalled significantly improves with query rewrites.

When the query is vectorized and the descriptions of the objects detected in the images are part of the semantic configuration, the recall is healthy enough to suit many drone sensing applications, and providing a chat-like interface to retrieve images only from the drone world proves sufficient. But the real gain in improvement happens with agentic retrieval, when the responses to the queries from the drone sensing applications are merged and re-ranked. Many of the images retrieved across various approaches had red cars in them, and some displayed the images with the greatest number of red cars from aerial shots as the first few results even when the size of the object in the image was less than 5% of the overall aerial image size in terms of pixels.

Caching of responses, so that the store does not get hit for query re-use, certainly improves performance as well as cost. Re-indexing operations are not counted in the comparisons above because they were completed prior to the comparisons. Re-indexing can be avoided if we set up the vectorizer with the OpenAI embedding models on the algorithms used with the vector search, and the dimensions of the vectors during upsert agree with those needed by the embeddings model.

Token usage increases linearly with agentic retrieval, as each agent leverages an LLM for its task and toolset. Costs can be kept low by limiting token usage and by reducing the response size.

This case study clearly shows the suitability of agentic retrieval for drone sensing applications.



Wednesday, June 25, 2025

 The following is a sample for video indexing using Azure libraries:

import requests

import time

import os

# Settings are read from the environment (the same variable names as the other
# Video Indexer samples) so that the API key does not have to be written into
# the script; the original placeholder values remain as the defaults.
AZURE_VIDEO_INDEXER_API_URL = os.getenv("AZURE_VIDEO_INDEXER_URL", "https://api.videoindexer.ai")

AZURE_LOCATION = os.getenv("AZURE_VIDEO_INDEXER_REGION", "westus2")  # e.g., "westus2"

AZURE_ACCOUNT_ID = os.getenv("AZURE_VIDEO_INDEXER_ACCOUNT", "your-account-id")

AZURE_API_KEY = os.getenv("AZURE_VIDEO_INDEXER_API_KEY", "your-api-key")

VIDEO_FILE_PATH = os.getenv("AZURE_VIDEO_INPUT", "path/to/your/video.mp4")

# Step 1: Get an access token

def get_access_token():
    """Retrieve an access token for the Video Indexer API.

    The request is authenticated with the subscription key in ``AZURE_API_KEY``.

    Returns:
        The response body with surrounding double quotes removed.

    Raises:
        requests.HTTPError: if the service answers with an error status, so
            that an error payload is never handed back as if it were a token.
        requests.Timeout: if the service does not answer within 30 seconds.
    """
    # NOTE(review): the token is requested without any permission-related query
    # parameter — confirm that it is sufficient for the upload call that follows.
    url = f"{AZURE_VIDEO_INDEXER_API_URL}/auth/{AZURE_LOCATION}/Accounts/{AZURE_ACCOUNT_ID}/AccessToken"
    headers = {
        "Ocp-Apim-Subscription-Key": AZURE_API_KEY
    }
    response = requests.get(url, headers=headers, timeout=30)
    response.raise_for_status()
    return response.text.strip('"')

# Step 2: Upload video and start indexing

def upload_and_index_video(video_file_path, access_token):
    """Upload a local video file to Video Indexer as a private video.

    The video is named after the file's base name. The query parameters are
    passed through ``params`` so that ``requests`` URL-encodes them; a file
    name containing spaces, ``&`` or ``#`` would otherwise corrupt the query
    string.

    Args:
        video_file_path: path of the video file to upload.
        access_token: Video Indexer access token.

    Returns:
        The parsed JSON body of the upload response.
    """
    video_name = os.path.basename(video_file_path)
    url = f"{AZURE_VIDEO_INDEXER_API_URL}/{AZURE_LOCATION}/Accounts/{AZURE_ACCOUNT_ID}/Videos"
    params = {
        "name": video_name,
        "accessToken": access_token,
        "privacy": "Private",
    }
    # The file stays open only for the duration of the multipart upload.
    with open(video_file_path, 'rb') as video_file:
        response = requests.post(url, params=params, files={'file': video_file})
    return response.json()

# Step 3: Wait for indexing to complete and get insights

def get_video_insights(access_token, video_id):
    """Poll the video's index every ten seconds until it is processed.

    Args:
        access_token: Video Indexer access token, sent as a query parameter.
        video_id: id of the video whose index is requested.

    Returns:
        The parsed JSON index once its ``state`` is 'Processed'.

    Raises:
        requests.HTTPError: if a poll answers with an error status (previously
            this surfaced as a ``KeyError`` on the missing ``state`` key).
        RuntimeError: if the index reports the 'Failed' state; without this
            check the loop would keep polling a video that can never finish.
    """
    url = f"{AZURE_VIDEO_INDEXER_API_URL}/{AZURE_LOCATION}/Accounts/{AZURE_ACCOUNT_ID}/Videos/{video_id}/Index?accessToken={access_token}"
    while True:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        data = response.json()
        state = data.get('state') if isinstance(data, dict) else None
        if state == 'Processed':
            return data
        if state == 'Failed':
            raise RuntimeError(f"Indexing of video {video_id} failed: {data}")
        time.sleep(10)  # Wait 10 seconds before checking again

# Step 4: Main workflow

access_token = get_access_token()
video_data = upload_and_index_video(VIDEO_FILE_PATH, access_token)
video_id = video_data['id']
insights = get_video_insights(access_token, video_id)

print("Video highlights and key insights:")
print("=" * 50)

# Extract highlights: keyframes, topics, and summarization.
# The condition uses plain ASCII quotes and looks 'themes' up inside
# insights['summarizedInsights']; the earlier form was not valid Python.
# NOTE(review): the 'themes' / 'keyframes' / 'adjustedStart' keys are taken on
# trust from this sample — confirm them against a real index payload.
if 'summarizedInsights' in insights and 'themes' in insights['summarizedInsights']:
    for theme in insights['summarizedInsights']['themes']:
        print(f"Theme: {theme['name']}")
        for highlight in theme['keyframes']:
            print(f" Keyframe at {highlight['adjustedStart']} to {highlight['adjustedEnd']}")
            print(f" Thumbnail: {highlight['thumbnailId']}")
            print(f" Description: {highlight.get('description', 'No description')}")
else:
    print("No summarization available. See full insights:", insights)

Indexed Video:

{'accountId': '26ff36de-cac7-4bea-ad7a-abdf0d63c19c', 'id': 'lwxjba8wy3', 'partition': None, 'externalId': None, 'metadata': None, 'name': 'mainindexedvideo.mp4', 'description': None, 'created': '2025-06-25T03:54:44.3133333+00:00', 'lastModified': '2025-06-25T03:54:44.3133333+00:00', 'lastIndexed': '2025-06-25T03:54:44.3133333+00:00', 'privacyMode': 'Private', 'userName': 'Ravi Rajamani', 'isOwned': True, 'isBase': True, 'hasSourceVideoFile': True, 'state': 'Uploaded', 'moderationState': 'OK', 'reviewState': 'None', 'isSearchable': True, 'processingProgress': '1%', 'durationInSeconds': 0, 'thumbnailVideoId': 'lwxjba8wy3', 'thumbnailId': '00000000-0000-0000-0000-000000000000', 'searchMatches': [], 'indexingPreset': 'Default', 'streamingPreset': 'Default', 'sourceLanguage': 'en-US', 'sourceLanguages': ['en-US'], 'personModelId': '00000000-0000-0000-0000-000000000000'}

#Codingexercise: https://1drv.ms/w/c/d609fb70e39b65c8/EYMCYvb9NRtOtcJwdXRDUi0BVzUEyGL-Rz2NKFaKj6KLgA?e=JZuZkm

Tuesday, June 24, 2025

 The use of embedded models is an approach different from the traditional error corrections to trajectory and ideal formation structure of a matrix to morph to different structures to maximize throughput along the trajectory. Other well-known approaches include:

1. Task Assignment Algorithms: These determine which drone goes to which position in the next formation.

a. Hungarian Algorithm: Often used to minimize total travel distance during formation transitions.

b. Dynamic Task Assignment: Adjusts assignments in real time to account for deviations or potential collisions.

2. Trajectory Planning Algorithms: These generate smooth, collision-free paths for each drone.

a. Dubins Path Planning: Ensures drones follow feasible curved paths, especially for fixed-wing drones.

b. Bezier or B-spline Curves: Used for smooth interpolation between waypoints.

c. Artificial Potential Fields: Create repulsive forces to avoid collisions while guiding drones to targets.

3. Formation Transformation Models

a. Six-Tuple State Coherence (STSC): Ensures all drones maintain consistent position, heading, and speed during transitions.

b. DFCA (Drone Formation Change Algorithm): Combines centralized assignment with distributed path planning to maintain formation integrity.

4. Swarm Coordination and Consensus Algorithms

a. Consensus-Based Control: Ensures all drones agree on shared parameters like velocity and heading.

b. Flocking Algorithms: Inspired by bird flocks, these maintain cohesion and spacing dynamically.

5. Optimization Techniques

a. Particle Swarm Optimization (PSO) and Genetic Algorithms: Sometimes used to optimize formation layouts or transitions under constraints.

6. Simulation and Animation Tools: Commercial systems often include 3D animation interfaces to design and preview formations before deployment, ensuring feasibility and visual appeal.


Monday, June 23, 2025

 A previous post described agentic retrieval against an Azure AI Search knowledge base, but it is especially beneficial to augment the data store with structured data in a relational table for all the objects detected in the drone world from aerial images:

1. Build a table for the detected objects as follows:

import mysql.connector

from mysql.connector import errorcode

# Connection parameters

# Keyword arguments for mysql.connector.connect; every value here is a
# placeholder to be replaced with the real server details.
config = dict(
    user='your_username',
    password='your_password',
    host='your_server.mysql.database.azure.com',
    database='droneworld',
    ssl_ca='/path/to/BaltimoreCyberTrustRoot.crt.pem',
    ssl_verify_cert=True,
)

# SQL to create the table

# DDL for the table of objects detected in the drone images.
# `location` is declared NOT NULL because MySQL requires every column of a
# SPATIAL index to be NOT NULL; with a nullable column the statement is rejected.
create_table_sql = """

CREATE TABLE IF NOT EXISTS drone_assets (

    name VARCHAR(255) NOT NULL,

    description TEXT,

    tags TEXT,

    boundingbox TEXT NOT NULL,

    sourcefile VARCHAR(255) NOT NULL,

    location POINT NOT NULL,

    created DATETIME,

    modified DATETIME,

    state VARCHAR(100),

    SPATIAL INDEX(location)

);

"""

# Both handles start as None so that the cleanup below can test them directly,
# rather than probing locals() for names that may or may not have been bound.
cnx = None
cursor = None

try:
    cnx = mysql.connector.connect(**config)
    cursor = cnx.cursor()
    cursor.execute(create_table_sql)
    print("Table 'drone_assets' created successfully (if it didn't exist).")

except mysql.connector.Error as err:
    # Give a specific hint for the two most common failures; print anything else as is.
    if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
        print("Access denied: Check your username or password.")
    elif err.errno == errorcode.ER_BAD_DB_ERROR:
        print("Database does not exist.")
    else:
        print(f"Error: {err}")

finally:
    # Close whatever was opened, whether or not the statement succeeded.
    if cursor is not None:
        cursor.close()
    if cnx is not None and cnx.is_connected():
        cnx.close()

2. Integrate the table with the AI search; the code uploading the document vector can also upsert to the database.

3. Execute SQL queries with structured query operators and leverage location and timestamp for filtering as appropriate.

#Codingexercise: https://1drv.ms/w/c/d609fb70e39b65c8/Echlm-Nw-wkggNYlIwEAAAABD8nSsN--hM7kfA-W_mzuWw?e=zPuj9l 


Sunday, June 22, 2025

 The following is a sample to retrieve drone images using agentic retrieval:

#!/usr/bin/python

# azure-ai-agents==1.0.0

# azure-ai-projects==1.0.0b11

# azure-ai-vision-imageanalysis==1.0.0

# azure-common==1.1.28

# azure-core==1.34.0

# azure-identity==1.22.0

# azure-search-documents==11.6.0b12

# azure-storage-blob==12.25.1

# azure_ai_services==0.1.0

from dotenv import load_dotenv

from azure.identity import DefaultAzureCredential, get_bearer_token_provider

from azure.ai.agents import AgentsClient

from azure.core.credentials import AzureKeyCredential

from azure.ai.projects import AIProjectClient

from azure.ai.agents.models import AzureAISearchTool, AzureAISearchQueryType, MessageRole, ListSortOrder

import os

load_dotenv(override=True)

# Settings read with os.environ[...] are required (a missing variable raises
# KeyError); settings read with os.getenv fall back to the given default, or to
# None when no default is given.
project_endpoint = os.environ["AZURE_PROJECT_ENDPOINT"]

project_api_key = os.environ["AZURE_PROJECT_API_KEY"]

agent_model = os.getenv("AZURE_AGENT_MODEL", "gpt-4o-mini")

search_endpoint = os.environ["AZURE_SEARCH_SERVICE_ENDPOINT"]

# NOTE(review): this value is replaced by the hard-coded api_version assigned
# further down, so AZURE_SEARCH_API_VERSION currently has no effect.
api_version = os.getenv("AZURE_SEARCH_API_VERSION")

search_api_key = os.getenv("AZURE_SEARCH_ADMIN_KEY")

# Key-based credential for the search service, built from the admin key above.
credential = AzureKeyCredential(search_api_key)

# NOTE(review): token_provider is not referenced in the visible part of this sample.
token_provider = get_bearer_token_provider(DefaultAzureCredential(), "https://search.azure.com/.default")

index_name = os.getenv("AZURE_SEARCH_INDEX_NAME", "index00")

azure_openai_endpoint = os.environ["AZURE_OPENAI_ENDPOINT"]

azure_openai_api_key = os.getenv("AZURE_OPENAI_API_KEY")

azure_openai_gpt_deployment = os.getenv("AZURE_OPENAI_GPT_DEPLOYMENT", "gpt-4o-mini")

azure_openai_gpt_model = os.getenv("AZURE_OPENAI_GPT_MODEL", "gpt-4o-mini")

azure_openai_embedding_deployment = os.getenv("AZURE_OPENAI_EMBEDDING_DEPLOYMENT", "text-embedding-ada-002")

azure_openai_embedding_model = os.getenv("AZURE_OPENAI_EMBEDDING_MODEL", "text-embedding-ada-002")

chat_agent_name = os.getenv("AZURE_CHAT_AGENT_NAME", "chat-agent-in-a-team")

# The same name is used to look up or create the agent and to address the knowledge agent.
search_agent_name = os.getenv("AZURE_SEARCH_AGENT_NAME", "search-agent-in-a-team")

# Resource id of the project's connection to the search service, passed to AzureAISearchTool.
search_connection_id = os.getenv("AI_AZURE_AI_CONNECTION_ID","/subscriptions/656e67c6-f810-4ea6-8b89-636dd0b6774c/resourceGroups/rg-ctl-2/providers/Microsoft.CognitiveServices/accounts/found-vision-1/projects/droneimage/connections/srchvision01")

# Overrides the environment-derived api_version assigned earlier.
api_version = "2025-05-01-Preview"

agent_max_output_tokens=10000

from azure.search.documents.indexes.models import KnowledgeAgent, KnowledgeAgentAzureOpenAIModel, KnowledgeAgentTargetIndex, KnowledgeAgentRequestLimits, AzureOpenAIVectorizerParameters

from azure.search.documents.indexes import SearchIndexClient

from azure.ai.projects import AIProjectClient

project_client = AIProjectClient(endpoint=project_endpoint, credential=DefaultAzureCredential())

# The prompt names the index through index_name so that it always matches the
# index the search tool and the retrieval request are pointed at, instead of a
# hard-coded name that can drift from the configured one.
instructions = f"""

You are an AI assistant that answers questions about the stored and indexed drone images and objects in search index {index_name}.

The data source is an Azure AI Search resource where the schema has JSON description field, a vector field and an id field and this id field must be cited in your answer.

If you do not find a match for the query, respond with "I don't know", otherwise cite references with the value of the id field.

"""

# Conversation history; it starts with the system prompt.
messages = [
    {
        "role":"system",
        "content": instructions
    }
]

# Azure AI Search tool handed to the agent: it targets index_name through the
# project connection identified by search_connection_id.
search_tool = AzureAISearchTool(

    index_connection_id=search_connection_id,

    index_name=index_name,

    query_type=AzureAISearchQueryType.VECTOR_SEMANTIC_HYBRID,

    filter="", # Optional filter expression

    top_k=5 # Number of results to return

)

# Reuse an existing agent with the configured name when there is one. The scan
# does not stop at the first match, so the last matching agent wins and the id
# of every match is printed.
agent = None
for existing_agent in project_client.agents.list_agents():
    if existing_agent.name == search_agent_name:
        print(existing_agent.id)
        agent = existing_agent

# Identity check: "no agent found" must not depend on how the SDK object
# implements equality.
if agent is None:
    agent = project_client.agents.create_agent(
        model=azure_openai_gpt_model,
        # deployment=azure_openai_gpt_deployment,
        name=search_agent_name,
        instructions=instructions,
        tools=search_tool.definitions,
        tool_resources=search_tool.resources,
        top_p=1
    )

# agent = project_client.agents.get_agent("asst_lsH8uwS4hrg4v1lRpXm6sdtR")

print(f"AI agent '{search_agent_name}' created or retrieved successfully:{agent}")

from azure.ai.agents.models import FunctionTool, ToolSet, ListSortOrder

from azure.search.documents.agent import KnowledgeAgentRetrievalClient

from azure.search.documents.agent.models import KnowledgeAgentRetrievalRequest, KnowledgeAgentMessage, KnowledgeAgentMessageTextContent, KnowledgeAgentIndexParams

# Client for the knowledge agent that performs the agentic retrieval.
agent_client = KnowledgeAgentRetrievalClient(endpoint=search_endpoint, agent_name=search_agent_name, credential=credential)

# Alternative question:
# query_text = "How many parking lots are empty when compared to all the parking lots?"
query_text = "How many red cars can be found near the building with a roof that has a circular structure?"

# Record the question in the local conversation history.
messages.append({"role": "user", "content": query_text})

thread = project_client.agents.threads.create()

# Retrieval results, keyed by the id of the thread message they were produced for.
retrieval_results = {}

def agentic_retrieval() -> str:
    """Searches drone images about objects detected and their facts.

    The returned string is in a JSON format that contains the reference id.
    Be sure to use the same format in your agent's response.
    You must refer to references by id number.
    """
    # The description above is kept as a docstring rather than as comments so
    # that it is available at runtime. NOTE(review): FunctionTool presumably
    # derives the tool description from the docstring — confirm.

    # Take the last 5 messages in the conversation (newest first), then reverse
    # them so the most recent message is last.
    recent_messages = list(project_client.agents.messages.list(thread.id, limit=5, order=ListSortOrder.DESCENDING))
    recent_messages.reverse()

    # NOTE(review): msg["content"] is passed as the text of each retrieval
    # message — confirm that thread messages expose plain text under that key.
    retrieval_result = agent_client.retrieve(
        retrieval_request=KnowledgeAgentRetrievalRequest(
            messages=[KnowledgeAgentMessage(role=msg["role"], content=[KnowledgeAgentMessageTextContent(text=msg["content"])]) for msg in recent_messages if msg["role"] != "system"],
            target_index_params=[KnowledgeAgentIndexParams(index_name=index_name, reranker_threshold=2.5, include_reference_source_data=True)] # add filter_add_on here
        )
    )

    # Associate the retrieval results with the last message in the conversation
    last_message = recent_messages[-1]
    retrieval_results[last_message.id] = retrieval_result

    # Return the grounding response to the agent
    return retrieval_result.response[0].content[0].text

# https://learn.microsoft.com/en-us/azure/ai-services/agents/how-to/tools/function-calling

# Expose agentic_retrieval to the agent as a function tool, next to the search tool.
functions = FunctionTool({ agentic_retrieval })

toolset = ToolSet()

toolset.add(functions)

toolset.add(search_tool)

# Let the client run the functions in the toolset itself when the agent calls them.
project_client.agents.enable_auto_function_calls(toolset)

from azure.ai.agents.models import AgentsNamedToolChoice, AgentsNamedToolChoiceType, FunctionName

# Post the question to the thread as a user message.
message = project_client.agents.messages.create(

    thread_id=thread.id,

    role="user",

    content = query_text

    # "How many red cars can be found near a building with a roof that has a circular structure?"

    # content= "How many parking lots are empty when compared to all the parking lots?"

)

# Run the agent on the thread; tool_choice names the agentic_retrieval function.
run = project_client.agents.runs.create_and_process(

    thread_id=thread.id,

    agent_id=agent.id,

    tool_choice=AgentsNamedToolChoice(type=AgentsNamedToolChoiceType.FUNCTION, function=FunctionName(name="agentic_retrieval")),

    toolset=toolset)

# Stop here when the run did not complete.
if run.status == "failed":

    raise RuntimeError(f"Run failed: {run.last_error}")

# Text of the assistant's last message in the thread.
output = project_client.agents.messages.get_last_message_text_by_role(thread_id=thread.id, role="assistant").text.value

# Every period in the answer is replaced by a line break for display.
print("Agent response:", output.replace(".", "\n"))

import json

# agentic_retrieval stores its result under the id of the last thread message;
# it is looked up here with the id of the user message created for the query.
retrieval_result = retrieval_results.get(message.id)

if retrieval_result is None:

    raise RuntimeError(f"No retrieval results found for message {message.id}")

print("Retrieval activity")

print(json.dumps([activity.as_dict() for activity in retrieval_result.activity], indent=2))

print("Retrieval results")

print(json.dumps([reference.as_dict() for reference in retrieval_result.references], indent=2))