# Agent to detect vehicles in aerial drone images.
#!/usr/bin/python
# azure-ai-agents==1.0.0
# azure-ai-projects==1.0.0b11
# azure-ai-vision-imageanalysis==1.0.0
# azure-common==1.1.28
# azure-core==1.34.0
# azure-identity==1.22.0
# azure-search-documents==11.6.0b12
# azure-storage-blob==12.25.1
# azure_ai_services==0.1.0
from dotenv import load_dotenv
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from azure.ai.agents import AgentsClient
from azure.core.credentials import AzureKeyCredential
from azure.ai.projects import AIProjectClient
from typing import Any, Callable, Set, Dict, List, Optional
import os, time, sys
import torch
from azure.ai.agents import AgentsClient
from azure.ai.agents.models import (
FunctionTool,
ListSortOrder,
RequiredFunctionToolCall,
SubmitToolOutputsAction,
ToolOutput,
)
from user_functions import fetch_weather, user_functions
# Put the current working directory first on the module search path.
# NOTE(review): this executes after the `user_functions` import above, so it
# cannot influence that import -- confirm the intended ordering.
sys.path.insert(0, os.path.abspath("."))

# Values from a local .env file take precedence over the process environment.
load_dotenv(override=True)

# Required settings: a missing variable raises KeyError naming the variable.
project_endpoint = os.environ["AZURE_PROJECT_ENDPOINT"]
# NOTE(review): the key is required here but the clients below authenticate
# with DefaultAzureCredential -- confirm whether it is still needed.
project_api_key = os.environ["AZURE_PROJECT_API_KEY"]

# Optional settings with defaults.
agent_model = os.getenv("AZURE_AGENT_MODEL", "gpt-4o-mini")
agent_name = os.getenv("AZURE_VEHICLE_COUNT_AGENT_NAME", "vehicle-agent-in-a-team")
api_version = "2025-05-01-Preview"
agent_max_output_tokens = 10000

# SAS URLs may be stored wrapped in double quotes; strip them. Reading through
# os.environ (instead of os.getenv) makes an unset variable fail with a
# KeyError that names it, rather than an AttributeError on None.
object_uri = os.environ["AZURE_RED_CAR_2_SAS_URL"].strip('"')
scene_uri = os.environ["AZURE_QUERY_SAS_URI"].strip('"')

from azure.ai.projects import AIProjectClient
project_client = AIProjectClient(endpoint=project_endpoint, credential=DefaultAzureCredential())
agents_client = AgentsClient(
    endpoint=project_endpoint,
    credential=DefaultAzureCredential(),
)
def read_image_from_blob(sas_url, timeout=30):
    """Reads an image from Azure Blob Storage using its SAS URL.

    :param sas_url: URL of the blob, including whatever SAS token is needed
        to read it.
    :param timeout: Seconds to wait for the HTTP request before giving up.
    :return: The result of ``cv2.imdecode`` (colour mode) on the downloaded
        bytes, or ``None`` when the request fails, the status is not 200,
        or the response body is empty.
    """
    # NOTE(review): `requests`, `np` and `cv2` are not imported in the visible
    # part of this file -- confirm they are imported before this is called.
    try:
        # Without a timeout a stalled connection would block forever.
        response = requests.get(sas_url, timeout=timeout)
    except requests.RequestException as exc:
        # Report only the exception type: the message can embed the URL,
        # and the URL carries the SAS token.
        print(f"Failed to fetch image: {type(exc).__name__}")
        return None

    if response.status_code != 200:
        print(f"Failed to fetch image. Status code: {response.status_code}")
        return None
    if not response.content:
        # Nothing to decode; avoid handing an empty buffer to the decoder.
        return None

    image_array = np.asarray(bytearray(response.content), dtype=np.uint8)
    return cv2.imdecode(image_array, cv2.IMREAD_COLOR)
def detect_vehicles(frame):
    """Run the detector on a frame and keep only the vehicle detections.

    :param frame: Image passed unchanged to ``model``.
    :return: The rows of ``results.pandas().xyxy[0]`` whose ``name`` column
        is one of ``car``, ``truck``, ``bus`` or ``motorcycle``.
    """
    # NOTE(review): `model` is not defined in the visible part of this file;
    # the `.pandas().xyxy[0]` access suggests a YOLOv5-style hub model --
    # confirm where it is loaded.
    results = model(frame)
    # Keep only 'car', 'truck', 'bus', 'motorcycle' detections
    vehicle_labels = ['car', 'truck', 'bus', 'motorcycle']
    detections = results.pandas().xyxy[0]
    return detections[detections['name'].isin(vehicle_labels)]
def get_image_output_url(scene_uri):
    """Build the blob URL where the annotated image for a scene is written.

    The result points at ``<dir>/images/vehiclesframe.jpg`` in the same
    account and container as ``scene_uri``, where ``<dir>`` is the directory
    of the source blob, or ``output`` when the blob sits at the container
    root. The query string of ``scene_uri`` (the SAS token) is carried over.

    :param scene_uri: URL of the source blob, optionally with a SAS query.
    :return: URL of the output image blob.
    :raises ValueError: If ``scene_uri`` has no container segment in its path.
    """
    # Local import: the module does not import urlparse in its visible part.
    from urllib.parse import urlparse

    # Split the URL path into ['', container, dir..., file name].
    parsed = urlparse(scene_uri)
    path_parts = parsed.path.split('/')
    if len(path_parts) < 2 or not path_parts[1]:
        raise ValueError("scene_uri does not contain a container in its path")
    container = path_parts[1]

    # Everything between the container and the file name is the directory.
    blob_dir = '/'.join(path_parts[2:-1])
    if not blob_dir:
        blob_dir = "output"

    image_path = f"{blob_dir}/images/vehiclesframe.jpg"
    # Rebuild the base URL (without SAS token)
    base_url = f"{parsed.scheme}://{parsed.netloc}/{container}/{image_path}"

    # Add the SAS token if present
    sas_token = parsed.query
    return f"{base_url}?{sas_token}" if sas_token else base_url
def detect_vehicles_from_uri(scene_uri: Optional[str] = None) -> Optional[str]:
    """
    Detects vehicles in the image at the given URI, draws a box around each
    one, uploads the annotated image and returns the URL of that upload.

    :param scene_uri: URL (including any SAS token) of the image to analyse.
    :return: URL of the uploaded annotated image, or None when no URI was
        given, the image could not be read, or it could not be encoded.
    """
    # NOTE(review): the agent instructions in this file ask for a vehicle
    # count, but only the annotated image URL is returned (the detections are
    # just printed) -- confirm whether the count should be returned as well.
    # NOTE(review): `cv2` and `BlobClient` are not imported in the visible part
    # of this file -- confirm they are imported before this is called.
    if not scene_uri:
        return None

    frame = read_image_from_blob(scene_uri)
    # An image array has no single truth value, so compare against None.
    if frame is None:
        return None

    vehicles = detect_vehicles(frame)
    print(vehicles)

    # Draw one rectangle per detection, corner to corner.
    for _, v in vehicles.iterrows():
        x1, y1, x2, y2 = map(int, [v['xmin'], v['ymin'], v['xmax'], v['ymax']])
        cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 0), 2)

    # Encode once, after all boxes are drawn.
    encoded_ok, buffer = cv2.imencode('.jpg', frame)
    if not encoded_ok:
        return None
    image_bytes = buffer.tobytes()

    image_uri = get_image_output_url(scene_uri)
    image_blob_client = BlobClient.from_blob_url(image_uri)
    image_blob_client.upload_blob(image_bytes, overwrite=True)
    return image_uri
# The callables exposed to the agent as tools.
image_user_functions: Set[Callable[..., Any]] = {detect_vehicles_from_uri}

# Wrap the callables so their definitions can be attached to the agent.
functions = FunctionTool(functions=image_user_functions)

# System instructions for the agent.
instructions = (
    "You are an assistant that answers the question how many vehicles were found "
    "in an image when the image is given by an image URI. "
    "You evaluate a function to do this by passing their uri to the function "
    "and respond with the count."
)

# The user question, with the scene image URI embedded.
query_text = (
    "How many vehicles are found in the image given by its image URI "
    f"{scene_uri}?"
)
with agents_client:
    # Create an agent and run user's request with function calls
    agent = agents_client.create_agent(
        model=agent_model,
        name=agent_name,
        instructions=instructions,
        tools=functions.definitions,
        tool_resources=functions.resources,
        top_p=1,
    )
    print(f"Created agent, ID: {agent.id}")

    # From here on the agent exists on the service; the finally clause makes
    # sure it is deleted even when a later step raises.
    try:
        thread = agents_client.threads.create()
        print(f"Created thread, ID: {thread.id}")

        message = agents_client.messages.create(
            thread_id=thread.id,
            role="user",
            content=query_text,
        )
        print(f"Created message, ID: {message.id}")

        run = agents_client.runs.create(thread_id=thread.id, agent_id=agent.id)
        print(f"Created run, ID: {run.id}")

        # Poll once per second until the run leaves its active states.
        while run.status in ["queued", "in_progress", "requires_action"]:
            time.sleep(1)
            run = agents_client.runs.get(thread_id=thread.id, run_id=run.id)

            if run.status == "requires_action" and isinstance(run.required_action, SubmitToolOutputsAction):
                tool_calls = run.required_action.submit_tool_outputs.tool_calls
                if not tool_calls:
                    print("No tool calls provided - cancelling run")
                    agents_client.runs.cancel(thread_id=thread.id, run_id=run.id)
                    break

                tool_outputs = []
                for tool_call in tool_calls:
                    if not isinstance(tool_call, RequiredFunctionToolCall):
                        print(f"{tool_call} skipped.")
                        continue

                    print("Is an instance of RequiredFunctionToolCall")
                    print(f"Executing tool call: {tool_call}")
                    try:
                        output = functions.execute(tool_call)
                    except Exception as e:
                        # Report the failure back as the tool's output so the
                        # run can continue instead of waiting for an answer
                        # that never arrives.
                        print(f"Error executing tool_call {tool_call.id}: {e}")
                        output = f"Error executing tool: {e}"
                    print(output)

                    # Tool output is sent as text; the tool may return None.
                    if output is None:
                        output = "No result returned by the tool."
                    tool_outputs.append(
                        ToolOutput(
                            tool_call_id=tool_call.id,
                            output=str(output),
                        )
                    )

                print(f"Tool outputs: {tool_outputs}")
                if tool_outputs:
                    agents_client.runs.submit_tool_outputs(
                        thread_id=thread.id, run_id=run.id, tool_outputs=tool_outputs
                    )
                else:
                    # Nothing can be submitted, so the run would stay in
                    # requires_action; cancel it rather than keep polling.
                    print("No tool output.")
                    agents_client.runs.cancel(thread_id=thread.id, run_id=run.id)
                    break
            else:
                print(f"Waiting: {run}")

            print(f"Current run status: {run.status}")

        print(f"Run completed with status: {run.status} and details {run}")
    finally:
        # Delete the agent when done
        agents_client.delete_agent(agent.id)
        print("Deleted agent")

    # Fetch and log all messages
    messages = agents_client.messages.list(thread_id=thread.id, order=ListSortOrder.ASCENDING)
    for msg in messages:
        if msg.text_messages:
            last_text = msg.text_messages[-1]
            print(f"{msg.role}: {last_text.text.value}")
# No comments:
# Post a Comment