The following summarizes the workflow of a drone video sensing application: the drone video is uploaded to Azure AI Video Indexer, its key segments are selected and rendered into a highlight clip, and individual frames are extracted to Blob Storage, vectorized, and captioned with Azure AI Vision:
import http.client
import io
import os
import random
import string
import time
import urllib.parse
import uuid
from urllib.parse import urlparse

import cv2
import numpy as np
import requests
from django.conf import settings
from dotenv import load_dotenv

load_dotenv(override=True)

from azure.core.credentials import AzureKeyCredential
from azure.core.exceptions import HttpResponseError
from azure.identity import DefaultAzureCredential
from azure.ai.vision.imageanalysis import ImageAnalysisClient
from azure.ai.vision.imageanalysis.models import VisualFeatures, ImageAnalysisResult
from azure.storage.blob import BlobClient
from tenacity import retry, stop_after_attempt, wait_fixed
from pprint import pprint, pformat

# Azure AI Vision configuration pulled from Django settings
vision_api_key = settings.vision_api_key
vision_api_version = settings.vision_api_version
vision_region = settings.vision_region
vision_endpoint = settings.vision_endpoint
api_version = settings.api_version
model_version = settings.model_version
# Step 1: Get an access token
def get_access_token():
    url = f"{settings.video_indexer_endpoint}/auth/{settings.video_indexer_region}/Accounts/{settings.video_indexer_account_id}/AccessToken"
    headers = {
        "Ocp-Apim-Subscription-Key": settings.video_indexer_api_key
    }
    response = requests.get(url, headers=headers)
    return response.text.strip('"')
def trim_filename(filename: str, max_length: int = 255) -> str:
    # Separate base name and extension
    base, ext = os.path.splitext(filename)
    # Truncate base if total exceeds max_length
    allowed_base_length = max_length - len(ext)
    trimmed_base = base[:allowed_base_length]
    return trimmed_base + ext
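As a quick sketch (the file name is invented for illustration), the helper caps the name at 255 characters while preserving the extension:

long_name = "drone_survey_" + "x" * 300 + ".mp4"
print(len(trim_filename(long_name)))              # 255
print(trim_filename(long_name).endswith(".mp4"))  # True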
# Step 2: Upload video and start indexing
def upload_and_index_video(access_token, accountId, video_file_path, video_url=None):
    if video_url:
        # Derive the video name from the URL path
        parsed_url = urllib.parse.urlparse(video_url)
        video_path = parsed_url.path
        video_name = accountId + "-" + video_path.split('/', 2)[-1]
    else:
        # Derive the video name from the local file name
        video_name = accountId + "-" + trim_filename(os.path.basename(video_file_path))
    url = f"{settings.video_indexer_endpoint}/{settings.video_indexer_region}/Accounts/{settings.video_indexer_account_id}/Videos?name={video_name}&accessToken={access_token}&privacy=Private"
    if video_url:
        # Index directly from the URL; no file upload needed
        encoded_url = urllib.parse.quote(video_url, safe='')
        url += f"&videoUrl={encoded_url}"
        response = requests.post(url)
    else:
        # Upload the local video file as multipart form data
        with open(video_file_path, 'rb') as video_file:
            files = {'file': video_file}
            response = requests.post(url, files=files)
    return response.json()
# Step 3: Wait for indexing to complete and get insights
def get_video_insights(access_token, video_id):
    while True:
        url = f"{settings.video_indexer_endpoint}/{settings.video_indexer_region}/Accounts/{settings.video_indexer_account_id}/Videos/{video_id}/Index?accessToken={access_token}"
        response = requests.get(url)
        data = response.json()
        if "state" in data and data['state'] == 'Processed':
            return data
        print("Sleeping for ten seconds...")
        time.sleep(10)  # Wait 10 seconds before checking again
def get_uploaded_video_id(access_token, accountId, video_file_path, video_url=None):
    video_data = upload_and_index_video(access_token, accountId, video_file_path, video_url)
    video_id = video_data['id']
    return video_id
def get_insights_formatted(access_token, video_id):
    insights = get_video_insights(access_token, video_id)
    value = "Video highlights and key insights:\n"
    value += ("=" * 50) + "\n"
    # Extract highlights: keyframes, topics, and summarization
    if 'summarizedInsights' in insights:
        for theme in insights['summarizedInsights']['themes']:
            value += f"Theme: {theme['name']}\n"
            for highlight in theme['keyframes']:
                value += f"  Keyframe at {highlight['adjustedStart']} to {highlight['adjustedEnd']}\n"
                value += f"  Thumbnail: {highlight['thumbnailId']}\n"
                value += f"  Description: {highlight.get('description', 'No description')}\n"
    else:
        value += f"No summarization available. See full insights: {insights}"
    return value
A sample response from the upload call (indexing has just started, so `state` is still `Uploaded`):
{'accountId': '26ff36de-cac7-4bea-ad7a-abdf0d63c19c', 'id': 'lwxjba8wy3', 'partition': None, 'externalId': None, 'metadata': None, 'name': 'mainindexedvideo.mp4', 'description': None, 'created': '2025-06-25T03:54:44.3133333+00:00', 'lastModified': '2025-06-25T03:54:44.3133333+00:00', 'lastIndexed': '2025-06-25T03:54:44.3133333+00:00', 'privacyMode': 'Private', 'userName': 'Ravi Rajamani', 'isOwned': True, 'isBase': True, 'hasSourceVideoFile': True, 'state': 'Uploaded', 'moderationState': 'OK', 'reviewState': 'None', 'isSearchable': True, 'processingProgress': '1%', 'durationInSeconds': 0, 'thumbnailVideoId': 'lwxjba8wy3', 'thumbnailId': '00000000-0000-0000-0000-000000000000', 'searchMatches': [], 'indexingPreset': 'Default', 'streamingPreset': 'Default', 'sourceLanguage': 'en-US', 'sourceLanguages': ['en-US'], 'personModelId': '00000000-0000-0000-0000-000000000000'}
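For reference, a minimal sketch of pulling out the fields the rest of the workflow relies on (the literal values are trimmed from the sample response above):

video_data = {'id': 'lwxjba8wy3', 'state': 'Uploaded', 'processingProgress': '1%'}  # trimmed from the response above
video_id = video_data['id']
if video_data['state'] != 'Processed':
    print(f"Indexing still in progress: {video_data['processingProgress']}")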
def repeat_video_index(access_token, video_id):
    """Re-index a video by its ID and return the refreshed insights."""
    url = f"{settings.video_indexer_endpoint}/{settings.video_indexer_region}/Accounts/{settings.video_indexer_account_id}/Videos/{video_id}/ReIndex?accessToken={access_token}"
    response = requests.put(url)
    if response.status_code == 200:
        return get_video_insights(access_token, video_id)
    return None
def get_selected_segments(insights, threshold):
    selected_segments = []
    # Target duration: `threshold` percent of the indexed video
    indexed_duration = insights["summarizedInsights"]["duration"]["seconds"]
    reduced_duration = (threshold * indexed_duration) / 100
    for video in insights["videos"]:
        for shot in video["insights"]["shots"]:
            shot_id = shot["id"]
            for key_frame in shot["keyFrames"]:
                key_frame_id = key_frame["id"]
                start = key_frame["instances"][0]["start"]
                end = key_frame["instances"][0]["end"]
                # total_duration += float(end) - float(start)
                print(f"Clipping shot: {shot_id}, key_frame: {key_frame_id}, start: {start}, end: {end}")
                selected_segments += [(start, end)]
    # print(f"Total duration: {total_duration}")
    return selected_segments
def create_project(access_token, video_id, selected_segments):
    # Build the list of video ranges for the project from the selected segments
    video_ranges = []
    for start, end in selected_segments:
        intervals = {}
        intervalRange = {}
        intervals["videoId"] = video_id
        intervalRange["start"] = start
        intervalRange["end"] = end
        intervals["range"] = intervalRange
        video_ranges += [intervals]
    project_name = ''.join(random.choices(string.hexdigits, k=8))
    data = {
        "name": project_name,
        "videosRanges": video_ranges,
    }
    headers = {
        "Content-Type": "application/json"
    }
    url = f"{settings.video_indexer_endpoint}/{settings.video_indexer_region}/Accounts/{settings.video_indexer_account_id}/Projects?accessToken={access_token}"
    response = requests.post(url, json=data, headers=headers)
    if response.status_code == 200:
        return response.json()["id"]  # ID of the newly created project
    return None
def render_video(access_token, project_id):
    url = f"{settings.video_indexer_endpoint}/{settings.video_indexer_region}/Accounts/{settings.video_indexer_account_id}/Projects/{project_id}/render?sendCompletionEmail=false&accessToken={access_token}"
    headers = {
        "Content-Type": "application/json"
    }
    response = requests.post(url, headers=headers)
    if response.status_code == 202:
        # Rendering accepted; progress is tracked via the render operation endpoint
        return response
    return None
def get_render_operation(access_token, project_id):
    while True:
        url = f"{settings.video_indexer_endpoint}/{settings.video_indexer_region}/Accounts/{settings.video_indexer_account_id}/Projects/{project_id}/renderoperation?accessToken={access_token}"
        response = requests.get(url)
        data = response.json()
        if "state" in data and data['state'] == 'Succeeded':
            return data
        print("Sleeping for ten seconds before checking on rendering...")
        time.sleep(10)  # Wait 10 seconds before checking again
def download_rendered_file(access_token, project_id):
    url = f"{settings.video_indexer_endpoint}/{settings.video_indexer_region}/Accounts/{settings.video_indexer_account_id}/Projects/{project_id}/renderedfile/downloadurl?accessToken={access_token}"
    response = requests.get(url)
    if response.status_code == 200:
        data = response.json()
        if "downloadUrl" in data:
            return data["downloadUrl"]
    return None
def index_and_download_video(account_id=None, project_id=None, video_id=None, video_file_path=None, video_url=None):
    if not account_id:
        account_id = settings.video_indexer_default_account_id
    access_token = settings.video_indexer_access_token
    if not access_token:
        access_token = get_access_token()
    if not video_id and not video_file_path and not video_url:
        raise ValueError("Provide a video_id, a local video_file_path, or a video_url.")
    if not video_id:
        if video_url:
            video_id = get_uploaded_video_id(access_token, account_id, video_file_path, video_url=video_url)
        else:
            video_id = get_uploaded_video_id(access_token, account_id, video_file_path)
    insights = get_video_insights(access_token, video_id)
    selected_segments = get_selected_segments(insights, 10)
    project_id = create_project(access_token, video_id, selected_segments)
    render_response = render_video(access_token, project_id)
    status = get_render_operation(access_token, project_id)
    download_url = download_rendered_file(access_token, project_id)
    return download_url
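A usage sketch for the end-to-end Video Indexer path (the local file name below is a placeholder, not from the original code):

download_url = index_and_download_video(video_file_path="drone_flight_042.mp4")
print("Rendered highlight reel:", download_url)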
def get_image_blob_url(video_url, frame_number):
    # Parse the original video URL to get account, container, and path
    parsed = urlparse(video_url)
    path_parts = parsed.path.split('/')
    container = path_parts[1]
    blob_path = '/'.join(path_parts[2:])
    # Remove the file name from the blob path
    blob_dir = '/'.join(blob_path.split('/')[:-1])
    if blob_dir == "" or blob_dir is None:
        # Video sits at the container root, so keep the frames at the root too
        image_path = f"images/frame{frame_number}.jpg"
    else:
        image_path = f"{blob_dir}/images/frame{frame_number}.jpg"
    # Rebuild the base URL (without SAS token)
    base_url = f"{parsed.scheme}://{parsed.netloc}/{container}/{image_path}"
    # Add the SAS token if present
    sas_token = parsed.query
    image_url = f"{base_url}?{sas_token}" if sas_token else base_url
    return image_url
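For example, with a hypothetical SAS URL (storage account, container, and token are placeholders), the frame blob lands in an images/ folder next to the video:

sample_sas_url = "https://dronestore.blob.core.windows.net/footage/flights/run1.mp4?sv=2024-01-01&sig=abc"
print(get_image_blob_url(sample_sas_url, 3))
# https://dronestore.blob.core.windows.net/footage/flights/images/frame3.jpg?sv=2024-01-01&sig=abc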
def download_blob_to_stream(blob_client):
    download_stream = blob_client.download_blob()
    return io.BytesIO(download_stream.readall())
def extract_and_upload_frames(video_sas_url):
    # Set up blob client for video
    video_blob_client = BlobClient.from_blob_url(video_sas_url)
    # Download video to memory stream
    video_stream = download_blob_to_stream(video_blob_client)
    video_bytes = video_stream.getvalue()
    # cv2.VideoCapture cannot read from raw bytes, so spill to a temporary file
    video_temp = os.path.join(os.getcwd(), f"temp_{uuid.uuid4()}.mp4")
    with open(video_temp, 'wb') as f:
        f.write(video_bytes)
    vidcap = cv2.VideoCapture(video_temp)
    frame_number = 0
    success, frame = vidcap.read()
    while success:
        # Encode the frame as JPEG
        _, buffer = cv2.imencode('.jpg', frame)
        image_bytes = buffer.tobytes()
        # Keep a local copy of the frame
        image_path = f"frame{frame_number}.jpg"
        with open(image_path, 'wb') as f:
            f.write(image_bytes)
        # Generate image blob URL and upload the frame next to the video
        image_url = get_image_blob_url(video_sas_url, frame_number)
        image_blob_client = BlobClient.from_blob_url(image_url)
        image_blob_client.upload_blob(image_bytes, overwrite=True)
        print(f"Uploaded frame {frame_number} to {image_url}")
        frame_number += 1
        success, frame = vidcap.read()
    vidcap.release()
    os.remove(video_temp)  # Clean up the temporary video file
def vectorize_extracted_frames(video_sas_url):
    extract_and_upload_frames(video_sas_url)
    vision_credential = AzureKeyCredential(vision_api_key)
    analysis_client = ImageAnalysisClient(vision_endpoint, vision_credential)
    # Set up blob client for video
    video_blob_client = BlobClient.from_blob_url(video_sas_url)
    tuples = []
    frame_number = 0
    while True:
        # Generate image blob URL for the next frame
        image_url = get_image_blob_url(video_sas_url, frame_number)
        try:
            # Download the frame (also confirms the blob exists)
            image_blob_client = BlobClient.from_blob_url(image_url)
            image_stream = download_blob_to_stream(image_blob_client)
            vector = vectorize_image(image_url, vision_api_key, vision_region)
            # Zero-pad the embedding to 1536 dimensions
            vector = np.pad(vector, (0, 1536 - len(vector)), mode='constant')
            print(f"Vectorized frame: {frame_number}")
            description = analyze_image(analysis_client, image_url)
            print(f"Analyzed frame: {frame_number}")
            tuples += [(vector, description)]
            frame_number += 1
        except Exception:
            print(f"No such image: {image_url[74:80]}. Giving up...")
            break
    return tuples
# access_token = os.getenv("AZURE_VIDEO_INDEXER_ACCESS_TOKEN", get_access_token())
# video_sas_url=video_sas_url.strip('"')
# extract_and_upload_frames(video_sas_url)
# vision_credential = AzureKeyCredential(vision_api_key)
# analysis_client = ImageAnalysisClient(vision_endpoint, vision_credential)
@retry(stop=stop_after_attempt(5), wait=wait_fixed(60))
def vectorize_image(image_path, key, region):
    try:
        # API version and model version
        api_version = "2024-02-01"
        model_version = "2023-04-15"
        # Construct the request URL
        url = f"{vision_endpoint}/computervision/retrieval:vectorizeImage?api-version={api_version}&model-version={model_version}"
        headers = {
            "Content-Type": "application/json",
            "Ocp-Apim-Subscription-Key": key
        }
        # Set the payload with the SAS URL of the image
        payload = {"url": image_path}
        response = requests.post(url, headers=headers, json=payload)
        if response.status_code == 200:
            result = response.json()
            # The vector is in the 'vector' field of the response
            vector = result.get("vector")
            # print("Vector embedding:", vector)
            return vector
        print("Error:", response.status_code, response.text)
        raise Exception(f"Error vectorizing image {image_path[74:80]}")
    except (requests.exceptions.Timeout, http.client.HTTPException) as e:
        print(f"Timeout/Error for {image_path[74:80]}. Retrying...")
        raise
@retry(stop=stop_after_attempt(5), wait=wait_fixed(60))
def analyze_image(client, image_url):
    try:
        # Define all available visual features for analysis
        features = [
            VisualFeatures.TAGS,
            VisualFeatures.OBJECTS,
            VisualFeatures.CAPTION,
            VisualFeatures.DENSE_CAPTIONS,
            VisualFeatures.READ,
            VisualFeatures.SMART_CROPS,
            VisualFeatures.PEOPLE,
        ]
        # Analyze the image from the SAS URL
        result = client.analyze_from_url(
            image_url=image_url,
            visual_features=features,
            gender_neutral_caption=True)
        # Explicitly cast to ImageAnalysisResult (for clarity)
        result: ImageAnalysisResult = result
        captions = []
        captions += [f"{result.caption.text}" if result.caption is not None else "No Caption"]
        if result.dense_captions is not None:
            captions += [f"{caption.text}" for caption in result.dense_captions.list]
        result.description = ",".join(captions)
        description = pformat(result.__dict__, depth=4, compact=False)
        return description
    except HttpResponseError as e:
        print(f"Analysis failed for {image_url[74:80]}: {e}")
        raise
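Putting the frame pipeline together, a minimal driver might look like this (the SAS URL is a placeholder):

if __name__ == "__main__":
    # Placeholder SAS URL for a drone video already uploaded to Blob Storage
    video_sas_url = "https://<account>.blob.core.windows.net/<container>/flight.mp4?<sas>"
    frame_tuples = vectorize_extracted_frames(video_sas_url)
    print(f"Prepared {len(frame_tuples)} (vector, description) pairs")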