Wednesday, July 9, 2025

 The previous posts explained how to detect and count instances of objects in a scene with the help of the HDBSCAN clustering algorithm. This article explains how to delegate that logic to an agent so that the agent can be brought in to answer users' specific "how many" questions.

#!/usr/bin/python

# azure-ai-agents==1.0.0

# azure-ai-projects==1.0.0b11

# azure-ai-vision-imageanalysis==1.0.0

# azure-common==1.1.28

# azure-core==1.34.0

# azure-identity==1.22.0

# azure-search-documents==11.6.0b12

# azure-storage-blob==12.25.1

# azure_ai_services==0.1.0

from dotenv import load_dotenv

from azure.identity import DefaultAzureCredential, get_bearer_token_provider

from azure.ai.agents import AgentsClient

from azure.core.credentials import AzureKeyCredential

from azure.ai.projects import AIProjectClient

from azure.ai.agents.models import AzureAISearchTool, AzureAISearchQueryType, MessageRole, ListSortOrder

import os

load_dotenv(override=True)

project_endpoint = os.environ["AZURE_PROJECT_ENDPOINT"]

project_api_key = os.environ["AZURE_PROJECT_API_KEY"]

agent_model = os.getenv("AZURE_AGENT_MODEL", "gpt-4o-mini")

search_endpoint = os.environ["AZURE_SEARCH_SERVICE_ENDPOINT"]

api_version = os.getenv("AZURE_SEARCH_API_VERSION")

search_api_key = os.getenv("AZURE_SEARCH_ADMIN_KEY")

credential = AzureKeyCredential(search_api_key)

token_provider = get_bearer_token_provider(DefaultAzureCredential(), "https://search.azure.com/.default")

index_name = os.getenv("AZURE_SEARCH_INDEX_NAME", "index00")

azure_openai_endpoint = os.environ["AZURE_OPENAI_ENDPOINT"]

azure_openai_api_key = os.getenv("AZURE_OPENAI_API_KEY")

azure_openai_gpt_deployment = os.getenv("AZURE_OPENAI_GPT_DEPLOYMENT", "gpt-4o-mini")

azure_openai_gpt_model = os.getenv("AZURE_OPENAI_GPT_MODEL", "gpt-4o-mini")

azure_openai_embedding_deployment = os.getenv("AZURE_OPENAI_EMBEDDING_DEPLOYMENT", "text-embedding-ada-002")

azure_openai_embedding_model = os.getenv("AZURE_OPENAI_EMBEDDING_MODEL", "text-embedding-ada-002")

chat_agent_name = os.getenv("AZURE_CHAT_AGENT_NAME", "chat-agent-in-a-team")

search_agent_name = os.getenv("AZURE_SEARCH_AGENT_NAME", "sceneobject-agent-in-a-team")

search_connection_id = os.environ["AI_AZURE_AI_CONNECTION_ID"] # connection ID of the Azure AI Search connection in the AI Foundry project

api_version = "2025-05-01-Preview"

agent_max_output_tokens=10000

object_uri = os.getenv("AZURE_RED_CAR_2_SAS_URL").strip('"')

scene_uri = os.getenv("AZURE_QUERY_SAS_URI").strip('"')

from azure.search.documents.indexes.models import KnowledgeAgent, KnowledgeAgentAzureOpenAIModel, KnowledgeAgentTargetIndex, KnowledgeAgentRequestLimits, AzureOpenAIVectorizerParameters

from azure.search.documents.indexes import SearchIndexClient

from azure.ai.projects import AIProjectClient

project_client = AIProjectClient(endpoint=project_endpoint, credential=DefaultAzureCredential())

instructions = """

You are an AI assistant that answers questions specifically about how many objects are detected in an image when both the object and image are given as image urls.

Your response must be a count of the objects in the image or 0 if you can't find any. If you encounter errors or exceptions, you must respond with "I don't know".

"""

messages = [

    {

        "role":"system",

        "content": instructions

    }

]

search_tool = AzureAISearchTool(

    index_connection_id=search_connection_id,

    index_name=index_name,

    query_type=AzureAISearchQueryType.VECTOR_SEMANTIC_HYBRID,

    filter="", # Optional filter expression

    top_k=5 # Number of results to return

)

agent = None

for existing_agent in list(project_client.agents.list_agents()):

    if existing_agent.name == search_agent_name:

        print(existing_agent.id)

        agent = existing_agent

if agent is None:

    agent = project_client.agents.create_agent(

        model=azure_openai_gpt_model,

        # deployment=azure_openai_gpt_deployment,

        name=search_agent_name,

        instructions=instructions,

        tools=search_tool.definitions,

        tool_resources=search_tool.resources,

        top_p=1

    )

# agent = project_client.agents.get_agent("asst_lsH8uwS4hrg4v1lRpXm6sdtR")

print(f"AI agent '{search_agent_name}' created or retrieved successfully:{agent}")

from azure.ai.agents.models import FunctionTool, ToolSet, ListSortOrder

from azure.search.documents.agent import KnowledgeAgentRetrievalClient

from azure.search.documents.agent.models import KnowledgeAgentRetrievalRequest, KnowledgeAgentMessage, KnowledgeAgentMessageTextContent, KnowledgeAgentIndexParams

query_text = f"How many {object_uri} can be found in {image_uri}?"

messages.append({

    "role": "user",

    "content": query_text

    #"How many parking lots are empty when compared to all the parking lots?"

})

thread = project_client.agents.threads.create()

retrieval_results = {}

def agentic_retrieval(scene_uri, object_uri) -> str:
    # count_multiple_matches is the HDBSCAN-based counting routine from the earlier post
    # (shown further below); it is assumed here to be importable from a local module
    # (dbscan.py) and to accept the scene and object SAS URLs as arguments.
    from dbscan import count_multiple_matches
    return str(count_multiple_matches(scene_uri, object_uri))

# https://learn.microsoft.com/en-us/azure/ai-services/agents/how-to/tools/function-calling

functions = FunctionTool({ agentic_retrieval })

toolset = ToolSet()

toolset.add(functions)

toolset.add(search_tool)

project_client.agents.enable_auto_function_calls(toolset)

from azure.ai.agents.models import AgentsNamedToolChoice, AgentsNamedToolChoiceType, FunctionName

message = project_client.agents.messages.create(

    thread_id=thread.id,

    role="user",

    content = query_text

    # "How many red cars can be found near a building with a roof that has a circular structure?"

    # content= "How many parking lots are empty when compared to all the parking lots?"

)

run = project_client.agents.runs.create_and_process(

    thread_id=thread.id,

    agent_id=agent.id,

    tool_choice=AgentsNamedToolChoice(type=AgentsNamedToolChoiceType.FUNCTION, function=FunctionName(name="agentic_retrieval")),

    toolset=toolset)

if run.status == "failed":

    raise RuntimeError(f"Run failed: {run.last_error}")

output = project_client.agents.messages.get_last_message_text_by_role(thread_id=thread.id, role="assistant").text.value

print("Agent response:", output.replace(".", "\n"))

import json

retrieval_result = retrieval_results.get(message.id)

if retrieval_result is None:

    raise RuntimeError(f"No retrieval results found for message {message.id}")

print("Retrieval activity")

print(json.dumps([activity.as_dict() for activity in retrieval_result.activity], indent=2))

print("Retrieval results")

print(json.dumps([reference.as_dict() for reference in retrieval_result.references], indent=2))

Tuesday, July 8, 2025

 This is a summary of the book “Design for All Learners: Create Accessible and Inclusive Learning Experiences,” written by Sarah Mercier and published by the Association for Talent Development in 2025.

The author brings together voices from across the learning and development world to advocate for a future where education is genuinely inclusive. Much like how the Americans with Disabilities Act reimagined physical spaces, this book calls on content creators and educators to reshape digital learning environments so that no one is left behind, regardless of ability or circumstance. 

Central to the book’s philosophy is the concept of universal design—a proactive approach that ensures learning experiences are usable by the widest range of people. It’s not just about accommodating individuals with permanent disabilities; it’s about designing with flexibility and empathy so that even temporary setbacks—like a sprained wrist that makes mouse usage difficult—don’t become barriers. The principles guiding universal design include adaptability, clarity, perceptibility, and minimal effort, all of which contribute to making content accessible, intuitive, and inclusive. 

But the book goes further than frameworks. It challenges designers to recognize and dismantle their biases. Assumptions about who benefits from learning content—like assuming visually impaired individuals wouldn’t be interested in flight training—limit potential. Mercier and her contributors urge creators to use tools like empathy mapping to understand diverse learner needs and break down those unexamined barriers. After all, learning has value beyond job relevance—it can empower, entertain, and inspire. 

To guide this reimagining of inclusive learning, the book recommends evaluating design choices through multiple thoughtful “lenses.” For instance, it cautions against excessive animation, which could trigger seizures or vertigo, and stresses the importance of closed captions and readable fonts. It calls for color palettes that don’t alienate those with color blindness or sensory sensitivities and highlights the importance of interface elements that are well-spaced and keyboard-navigable. From layout to structure, every design element should be reconsidered for inclusivity. 

One chapter zeroes in on the transformative role of captions. While originally designed to support people with hearing impairments, captions now benefit all kinds of learners—from those navigating noisy environments to Gen Z binge-watchers who prefer them turned on by default. Their widespread use in media platforms sets a precedent that learning experiences must follow, not as a courtesy but as a standard. 

Remote learning, too, is a key frontier. It unlocks flexibility and reach, especially for learners with disabilities. As contributor Karen Hyder illustrates, offering options in font size, contrast, audio delivery, or input methods makes courses more inviting and effective. She builds learner personas to guide her process, creating content that works whether a student uses a screen reader, captions, or keyboard-only navigation. 

Finally, Mercier reminds readers that accessibility isn’t a destination—it’s a journey guided by progress, not perfection. Missteps are inevitable, but they’re part of the process. Advocates like Meryl Evans champion a calm, constructive communication model (TEACH) to push for change, emphasizing education, empathy, and continuous effort. 

This book is a rallying call: design with intention, iterate with empathy, and build learning spaces that truly welcome everyone. 


Monday, July 7, 2025

 The following samples are ways of detecting and counting objects in images that do not require custom models to be trained and can be applied after image processing and vectorizing. It is preferable to use agentic search in which agents are versed in one of these techniques.

#! /usr/bin/python

import requests

import cv2

import numpy as np

from sklearn.cluster import DBSCAN

from sklearn.preprocessing import normalize

import hdbscan

import matplotlib.pyplot as plt

from io import BytesIO

from azure.core.credentials import AzureKeyCredential

from azure.ai.vision.imageanalysis import ImageAnalysisClient

from azure.ai.vision.imageanalysis.models import VisualFeatures

import os

match_threshold = 0.5

min_number_of_cluster_members = 2

# Azure Vision credentials

vision_endpoint=os.getenv("AZURE_AI_VISION_ENDPOINT")

vision_api_key = os.getenv("AZURE_AI_VISION_API_KEY")

object_uri = os.getenv("AZURE_RED_CAR_2_SAS_URL").strip('"')

scene_uri = os.getenv("AZURE_QUERY_SAS_URI").strip('"')

# Step 1: Download images from SAS URLs

def download_image(url):

    response = requests.get(url)

    image_array = np.frombuffer(response.content, np.uint8)

    return cv2.imdecode(image_array, cv2.IMREAD_COLOR)

# Step 2: Use OpenCV template matching to find object occurrences

def count_object_occurrences(scene, template, threshold=match_threshold):

    scene_gray = cv2.cvtColor(scene, cv2.COLOR_BGR2GRAY)

    template_gray = cv2.cvtColor(template, cv2.COLOR_BGR2GRAY)

    result = cv2.matchTemplate(scene_gray, template_gray, cv2.TM_CCOEFF_NORMED)

    locations = np.where(result >= threshold)

    w, h = template_gray.shape[::-1]

    rects = [[pt[0], pt[1], pt[0] + w, pt[1] + h] for pt in zip(*locations[::-1])]

    rects, _ = cv2.groupRectangles(rects, groupThreshold=1, eps=0.5)

    return len(rects)

# Step 3: Count matches

def count_matches():

    scene_img = download_image(scene_uri)

    object_img = download_image(object_uri)

    count = count_object_occurrences(scene_img, object_img)

    return count

# print(f"Detected {count_matches()} occurrences of the object.")

#Output: Detected 1 occurrences of the object.

# Load image from SAS URL

def load_image_from_sas(url):

    response = requests.get(url)

    image_array = np.frombuffer(response.content, np.uint8)

    return cv2.imdecode(image_array, cv2.IMREAD_COLOR)

def keypoints_and_descriptors(scene_img, object_img):

    orb = cv2.ORB_create(nfeatures=1000)

    kp1, des1 = orb.detectAndCompute(object_img, None)

    kp2, des2 = orb.detectAndCompute(scene_img, None)

    if des1 is None or des2 is None:

        return None, None, None, None

    return kp1, des1, kp2, des2

# Feature detection and matching

def get_matched_keypoints(scene_img, object_img):

    kp1,des1,kp2,des2 = keypoints_and_descriptors(scene_img, object_img)

    if des1 is None or des2 is None:

        return []

    matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)

    matches = matcher.match(des1, des2)

    matches = sorted(matches, key=lambda x: x.distance)

    matched_pts = np.float32([kp2[m.trainIdx].pt for m in matches])

    print(f"matched_pts={matched_pts}")

    return matched_pts

 # Extract matched descriptors using ORB

def get_matched_descriptors(scene_img, object_img):

    kp1,des1,kp2,des2 = keypoints_and_descriptors(scene_img, object_img)

    if des1 is None or des2 is None:

        return np.array([])

    matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)

    matches = matcher.match(des1, des2)

    matches = sorted(matches, key=lambda x: x.distance)

    matched_descriptors = np.array([des2[m.trainIdx] for m in matches])

    return matched_descriptors

# Cluster matched keypoints using DBSCAN

def cluster_keypoints(points, eps=30, min_samples=min_number_of_cluster_members):

    if len(points) == 0:

        return []

    clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(points)

    labels = clustering.labels_

    return labels

# Cluster keypoints using HDBSCAN

def cluster_keypoints_hdbscan(points, min_cluster_size=min_number_of_cluster_members):

    if len(points) == 0:

        return np.array([])

    clusterer = hdbscan.HDBSCAN(min_cluster_size=min_cluster_size)

    labels = clusterer.fit_predict(points)

    # if len(labels) > 0:

    # plot_clusters(matched_points, labels)

    return labels

# Cluster descriptors using cosine similarity

def cluster_by_similarity(descriptors, min_cluster_size=min_number_of_cluster_members):

    if len(descriptors) == 0:

        return np.array([])

    # Normalize for cosine similarity

    descriptors = normalize(descriptors, norm='l2')

    clusterer = hdbscan.HDBSCAN(

        min_cluster_size=min_cluster_size,

        metric='euclidean', # Euclidean on normalized vectors ≈ cosine similarity

        cluster_selection_method='eom'

    )

    labels = clusterer.fit_predict(descriptors)

    return labels

# Optional: visualize clusters

def plot_clusters(points, labels):

    plt.figure(figsize=(8, 6))

    for label in set(labels):

        mask = labels == label

        color = 'gray' if label == -1 else None

        plt.scatter(points[mask, 0], points[mask, 1], label=f"Cluster {label}", alpha=0.6, s=30, c=color)

    plt.title("HDBSCAN Clusters of Matched Keypoints")

    plt.xlabel("X")

    plt.ylabel("Y")

    plt.legend()

    plt.show()

def count_multiple_matches():

    scene_img = load_image_from_sas(scene_uri)

    object_img = load_image_from_sas(object_uri)

    # matched_points = get_matched_keypoints(scene_img, object_img)

    # labels = cluster_keypoints_hdbscan(matched_points)

    descriptors = get_matched_descriptors(scene_img, object_img)

    labels = cluster_by_similarity(descriptors)

    print(f"len of labels={len(labels)} and labels={labels}")

    # Count valid clusters (excluding noise label -1)

    count = len(set(labels)) - (1 if -1 in labels else 0)

    return count

print(f"Estimated object instances: {count_multiple_matches()}")

# for dbscan

#Output: Estimated object instances: 3

# for hdbscan based on similarity

# len of labels=24 and labels=[ 1 -1 -1 1 -1 1 0 -1 -1 -1 1 -1 -1 -1 -1 -1 -1 -1 -1 -1 0 -1 1 -1]

# Estimated object instances: 2
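The agent post above calls count_multiple_matches with the scene and object SAS URLs as arguments, whereas the version here reads them from environment variables. A minimal parameterized variant is sketched below; the module name dbscan.py and the type hints are assumptions, not part of the original sample:

# e.g., saved as dbscan.py so an agent function tool can do `from dbscan import count_multiple_matches`
def count_multiple_matches(scene_sas_url: str, object_sas_url: str) -> int:
    # Count object instances by clustering matched ORB descriptors with HDBSCAN
    scene_img = load_image_from_sas(scene_sas_url)
    object_img = load_image_from_sas(object_sas_url)
    descriptors = get_matched_descriptors(scene_img, object_img)
    labels = cluster_by_similarity(descriptors)
    # Exclude HDBSCAN noise points, which are labeled -1
    return len(set(labels)) - (1 if -1 in labels else 0)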


Sunday, July 6, 2025

 The following is one of the techniques to detect objects in images: template matching with OpenCV.

#! /usr/bin/python

import requests

import cv2

import numpy as np

from sklearn.cluster import DBSCAN

from sklearn.preprocessing import normalize

import hdbscan

import matplotlib.pyplot as plt

from io import BytesIO

from azure.core.credentials import AzureKeyCredential

from azure.ai.vision.imageanalysis import ImageAnalysisClient

from azure.ai.vision.imageanalysis.models import VisualFeatures

import os

match_threshold = 0.5

min_number_of_cluster_members = 2

object_uri = os.getenv("AZURE_RED_CAR_2_SAS_URL").strip('"')

scene_uri = os.getenv("AZURE_QUERY_SAS_URI").strip('"')


# Step 1: Download images from SAS URLs

def download_image(url):

    response = requests.get(url)

    image_array = np.frombuffer(response.content, np.uint8)

    return cv2.imdecode(image_array, cv2.IMREAD_COLOR)



# Step 2: Use OpenCV template matching to find object occurrences

def count_object_occurrences(scene, template, threshold=match_threshold):

    scene_gray = cv2.cvtColor(scene, cv2.COLOR_BGR2GRAY)

    template_gray = cv2.cvtColor(template, cv2.COLOR_BGR2GRAY)

    result = cv2.matchTemplate(scene_gray, template_gray, cv2.TM_CCOEFF_NORMED)

    locations = np.where(result >= threshold)

    w, h = template_gray.shape[::-1]

    rects = [[pt[0], pt[1], pt[0] + w, pt[1] + h] for pt in zip(*locations[::-1])]

    rects, _ = cv2.groupRectangles(rects, groupThreshold=1, eps=0.5)

    return len(rects)


# Step 3: Count matches

def count_matches():

    scene_img = download_image(scene_uri)

    object_img = download_image(object_uri)

    count = count_object_occurrences(scene_img, object_img)

    return count

# print(f"Detected {count_matches()} occurrences of the object.")

#Output: Detected 1 occurrences of the object.


#Codingexercise: codingexercise-07-06-2025.pdf

https://1drv.ms/b/c/d609fb70e39b65c8/EQti1RQIDMpNrxM0jjPPYzkBI0PX5cBV-eUmQR_Js0nsBQ?e=sssW8n

Saturday, July 5, 2025

 The previous articles described position and velocity adjustments for a drone formation to transform from one phase to another. While those methods allow individual units to transition to the new formation, they do not detect conflicts. So a conflict detection strategy must be layered on top of the position, velocity, and heading determination. This is especially true for fixed-wing drones that do not have hover and yaw capabilities.

An example of this, using conflict-based search (CBS) over an A* low-level planner, is illustrated below:

import heapq 

from collections import defaultdict, namedtuple 

 

Constraint = namedtuple('Constraint', ['agent', 'position', 'time']) 

 

class AStarPlanner: 

    def __init__(self, grid): 

        self.grid = grid 

        self.rows = len(grid) 

        self.cols = len(grid[0]) 

     

    def neighbors(self, pos): 

        r, c = pos 

        for dr, dc in [(0,1), (1,0), (0,-1), (-1,0), (0,0)]:  # 4 directions + wait 

            nr, nc = r + dr, c + dc 

            if 0 <= nr < self.rows and 0 <= nc < self.cols and self.grid[nr][nc] != 1: 

                yield (nr, nc) 

 

    def heuristic(self, a, b): 

        return abs(a[0] - b[0]) + abs(a[1] - b[1]) 

 

    def plan(self, start, goal, constraints, agent_id=None):

        open_set = [(0 + self.heuristic(start, goal), 0, start, [start])] 

        seen = set() 

        constraint_dict = defaultdict(set) 

        for c in constraints: 

            constraint_dict[(c.position, c.time)].add(c.agent) 

 

        while open_set: 

            f, g, current, path = heapq.heappop(open_set) 

            if (current, g) in seen: 

                continue 

            seen.add((current, g)) 

 

            # Only stop at the goal once any constraint placed on the goal cell has expired
            goal_constraint_times = [t for (pos, t) in constraint_dict.keys() if pos == goal]
            if current == goal and g >= (max(goal_constraint_times) if goal_constraint_times else 0):
                return path

             

            for neighbor in self.neighbors(current): 

                if any(c.agent == 'any' or c.agent == agent_id for c in constraint_dict.get((neighbor, g+1), set())):

                    continue 

                heapq.heappush(open_set, (g + 1 + self.heuristic(neighbor, goal), g + 1, neighbor, path + [neighbor])) 

        return None  # No path found 

 

class CBS: 

    def __init__(self, grid, starts, goals): 

        self.grid = grid 

        self.starts = starts 

        self.goals = goals 

        self.num_agents = len(starts) 

 

    def detect_conflict(self, paths): 

        max_t = max(len(p) for p in paths) 

        for t in range(max_t): 

            positions = {} 

            for i, path in enumerate(paths): 

                pos = path[t] if t < len(path) else path[-1] 

                if pos in positions: 

                    return {'time': t, 'a1': positions[pos], 'a2': i, 'pos': pos} 

                positions[pos] = i 

        return None 

 

    def search(self): 

        root = {'paths': [], 'constraints': []} 

        astar = AStarPlanner(self.grid) 

         

        for i in range(self.num_agents): 

            path = astar.plan(self.starts[i], self.goals[i], [], agent_id=i)

            if not path: 

                return None 

            root['paths'].append(path) 

         

        open_set = [root] 

        while open_set: 

            node = open_set.pop(0) 

            conflict = self.detect_conflict(node['paths']) 

            if not conflict: 

                return node['paths'] 

 

            for agent in [conflict['a1'], conflict['a2']]: 

                new_constraints = node['constraints'] + [Constraint(agent, conflict['pos'], conflict['time'])] 

                new_paths = list(node['paths']) 

                path = astar.plan(self.starts[agent], self.goals[agent], new_constraints, agent_id=agent)

                if not path: 

                    continue 

                new_paths[agent] = path 

                open_set.append({'constraints': new_constraints, 'paths': new_paths}) 

        return None
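
A minimal usage sketch follows; the grid, start positions, and goal positions are illustrative values, not part of the original example:

if __name__ == "__main__":
    # 0 = free cell, 1 = obstacle; two drones swap opposite corners around the blocked cell
    grid = [
        [0, 0, 0, 0],
        [0, 1, 0, 0],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
    ]
    starts = [(0, 0), (3, 3)]
    goals = [(3, 3), (0, 0)]
    solver = CBS(grid, starts, goals)
    paths = solver.search()
    for i, path in enumerate(paths or []):
        print(f"Drone {i}: {path}")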