The following code implements the workflow of a drone video-sensing application built on Azure Video Indexer and Azure AI Vision:
# Standard library
import http.client
import io
import os
import random
import string
import time
import urllib.parse
import uuid
from pprint import pprint, pformat
from urllib.parse import urlparse

# Third-party
import cv2
import numpy as np
import requests
from django.conf import settings
from dotenv import load_dotenv

# Load .env before the Azure SDK clients read credentials from the environment.
load_dotenv(override=True)

from azure.core.credentials import AzureKeyCredential
from azure.core.exceptions import HttpResponseError
from azure.identity import DefaultAzureCredential
from azure.ai.vision.imageanalysis import ImageAnalysisClient
from azure.ai.vision.imageanalysis.models import VisualFeatures, ImageAnalysisResult
from tenacity import retry, stop_after_attempt, wait_fixed
from dotenv import load_dotenv  # NOTE(review): duplicate of the import above, kept from the original
# Azure AI Vision configuration, sourced from Django settings.
# NOTE(review): lower-case settings attributes are unconventional for Django —
# confirm these names exist on the settings object.
vision_api_key = settings.vision_api_key
vision_api_version = settings.vision_api_version
vision_region = settings.vision_region
vision_endpoint =  settings.vision_endpoint
api_version = settings.api_version
model_version = settings.model_version
# Step 1: Get an access token
def get_access_token():
    """Request an access token for the configured Video Indexer account.

    Returns:
        str: the raw token with the surrounding JSON quotes stripped.

    NOTE(review): the source was garbled here (missing `def` line and the
    `headers` dict opening); reconstructed minimally from the surviving lines.
    """
    url = f"{settings.video_indexer_endpoint}/auth/{settings.video_indexer_region}/Accounts/{settings.video_indexer_account_id}/AccessToken"
    headers = {
        "Ocp-Apim-Subscription-Key": settings.video_indexer_api_key
    }
    response = requests.get(url, headers=headers)
    # The API returns the token as a JSON string literal ("eyJ...") — strip the quotes.
    return response.text.strip('"')
def trim_filename(filename: str, max_length: int = 255) -> str:
    """Shorten *filename* so the whole name fits within *max_length* characters.

    The extension is always preserved intact; only the base name is truncated.
    """
    base, ext = os.path.splitext(filename)
    # Budget left for the base once the extension is accounted for.
    keep = max_length - len(ext)
    return f"{base[:keep]}{ext}"
# Step 2: Upload video and start indexing
def upload_and_index_video(access_token, accountId, video_file_path, video_url = None):
    """Upload a video to Video Indexer (by URL or local file) and start indexing.

    Args:
        access_token: Video Indexer access token.
        accountId: account id, used to prefix the stored video name.
        video_file_path: local path, used when video_url is not given.
        video_url: optional public/SAS URL of the video to ingest.

    Returns:
        dict: the JSON response from the upload call.

    NOTE(review): the original was garbled (missing if/else lines and return);
    reconstructed from the surviving fragments.
    """
    if video_url:
        parsed_url = urllib.parse.urlparse(video_url)
        video_path = parsed_url.path
        video_name = accountId + "-" + video_path.split('/', 2)[-1]
    else:
        video_name = accountId + "-" + trim_filename(os.path.basename(video_file_path))
    url = f"{settings.video_indexer_endpoint}/{settings.video_indexer_region}/Accounts/{settings.video_indexer_account_id}/Videos?name={video_name}&accessToken={access_token}&privacy=Private"
    if video_url:
        encoded_url = urllib.parse.quote(video_url, safe='')
        # Bug fix: the original appended "videoUrl=..." without the '&' separator.
        url += f"&videoUrl={encoded_url}"
        response = requests.post(url)
    else:
        with open(video_file_path, 'rb') as video_file:
            files = {'file': video_file}
            response = requests.post(url, files=files)
    return response.json()
# Step 3: Wait for indexing to complete and get insights
def get_video_insights(access_token, video_id):
    """Poll the Index endpoint until the video state is 'Processed'.

    Returns:
        dict: the full insights JSON once processing completes.

    NOTE(review): reconstructed — the polling loop, JSON parse, and return
    were missing from the garbled source.
    """
    url = f"{settings.video_indexer_endpoint}/{settings.video_indexer_region}/Accounts/{settings.video_indexer_account_id}/Videos/{video_id}/Index?accessToken={access_token}"
    while True:
        response = requests.get(url)
        data = response.json()
        if data['state'] == 'Processed':
            return data
        time.sleep(10)  # Wait 10 seconds before checking again
def get_uploaded_video_id(access_token, accountId, video_file_path, video_url = None):
    """Upload a video (file or URL) and return the new video's id.

    Returns:
        str | None: the 'id' field of the upload response, or None if absent.

    NOTE(review): reconstructed — the guard and return were missing.
    """
    video_data = upload_and_index_video(access_token, accountId, video_file_path, video_url)
    if video_data and 'id' in video_data:
        video_id = video_data['id']
        return video_id
    return None
def get_insights_formatted(access_token, video_id):
    """Fetch insights for a video and format the highlights as readable text.

    Returns:
        str: themes, keyframes and descriptions when summarizedInsights exist,
        otherwise a fallback message with the raw insights.

    NOTE(review): reconstructed — the else branch and return were missing;
    also fixed the theme line, which lacked a trailing newline.
    """
    insights = get_video_insights(access_token, video_id)
    value = "Video highlights and key insights:\n"
    value += ("=" * 50) + "\n"
    # Extract highlights: keyframes, topics, and summarization
    if 'summarizedInsights' in insights:
        for theme in insights['summarizedInsights']['themes']:
            value += f"Theme: {theme['name']}\n"
            for highlight in theme['keyframes']:
                value += f"  Keyframe at {highlight['adjustedStart']} to {highlight['adjustedEnd']}\n"
                value += f"  Thumbnail: {highlight['thumbnailId']}\n"
                value += f"  Description: {highlight.get('description', 'No description')}\n"
    else:
        value += f"No summarization available. See full insights: {insights}"
    return value
# NOTE(review): sample JSON response from the Video Indexer upload call, pasted
# inline as a bare expression statement (it evaluates and is discarded at
# import time). Presumably kept for reference — consider removing or moving
# into documentation.
{'accountId': '26ff36de-cac7-4bea-ad7a-abdf0d63c19c', 'id': 'lwxjba8wy3', 'partition': None, 'externalId': None, 'metadata': None, 'name': 'mainindexedvideo.mp4', 'description': None, 'created': '2025-06-25T03:54:44.3133333+00:00', 'lastModified': '2025-06-25T03:54:44.3133333+00:00', 'lastIndexed': '2025-06-25T03:54:44.3133333+00:00', 'privacyMode': 'Private', 'userName': 'Ravi Rajamani', 'isOwned': True, 'isBase': True, 'hasSourceVideoFile': True, 'state': 'Uploaded', 'moderationState': 'OK', 'reviewState': 'None', 'isSearchable': True, 'processingProgress': '1%', 'durationInSeconds': 0, 'thumbnailVideoId': 'lwxjba8wy3', 'thumbnailId': '00000000-0000-0000-0000-000000000000', 'searchMatches': [], 'indexingPreset': 'Default', 'streamingPreset': 'Default', 'sourceLanguage': 'en-US', 'sourceLanguages': ['en-US'], 'personModelId': '00000000-0000-0000-0000-000000000000'}
def repeat_video_index(access_token, video_id):
    """Trigger a ReIndex of an existing video, then poll for its insights.

    NOTE(review): uses module-level video_indexer_* globals, unlike the
    settings.* style used earlier in this file — confirm they are defined.
    The original `if` body was missing (the return sat at the same indent),
    so the 200 check only logs; insights are fetched unconditionally.
    """
    url = f"{video_indexer_endpoint}/{video_indexer_region}/Accounts/{video_indexer_account_id}/Videos/{video_id}/ReIndex?accessToken={access_token}"
    response = requests.put(url)
    if response.status_code == 200:
        print("Re-indexing started successfully.")
    return get_video_insights(access_token, video_id)
def get_video_insights(access_token, video_id):
    """Poll the Index endpoint until the video state is 'Processed'.

    NOTE(review): this redefines the earlier get_video_insights in this file;
    being later, this definition wins at import time. Reconstructed — the
    polling loop, JSON parse, and return were missing from the garbled source.
    """
    url = f"{video_indexer_endpoint}/{video_indexer_region}/Accounts/{video_indexer_account_id}/Videos/{video_id}/Index?accessToken={access_token}"
    while True:
        response = requests.get(url)
        data = response.json()
        if "state" in data and data['state'] == 'Processed':
            return data
        print("Sleeping for ten seconds...")
        time.sleep(10)  # Wait 10 seconds before checking again
def get_selected_segments(insights, threshold):
    """Collect (start, end) time ranges for every key frame of every shot.

    Args:
        insights: Video Indexer insights JSON (needs summarizedInsights.duration
            and videos[].insights.shots[].keyFrames[].instances[]).
        threshold: target percentage of the indexed duration to keep.

    Returns:
        list[tuple]: (start, end) pairs in insight order.

    NOTE(review): reconstructed — the accumulator init, shot_id binding and
    return were missing. reduced_duration is computed but not yet used to cap
    the selection — presumably the intended behavior; confirm.
    """
    selected_segments = []
    indexed_duration = insights["summarizedInsights"]["duration"]["seconds"]
    reduced_duration = (threshold * indexed_duration) / 100
    for video in insights["videos"]:
        for shot in video["insights"]["shots"]:
            shot_id = shot.get("id")
            for key_frame in shot["keyFrames"]:
                key_frame_id = key_frame["id"]
                start = key_frame["instances"][0]["start"]
                end = key_frame["instances"][0]["end"]
                print(f"Clipping shot: {shot_id}, key_frame: {key_frame_id}, start: {start}, end: {end}")
                selected_segments += [(start, end)]
    return selected_segments
def create_project(access_token, video_id, selected_segments):
    """Create a Video Indexer project covering the selected segments.

    Returns:
        str | None: the new project's id on HTTP 200, else None.

    NOTE(review): reconstructed — the list/dict initializations, payload,
    headers, and return were missing from the garbled source.
    """
    video_ranges = []
    for start, end in selected_segments:
        # Build fresh dicts per segment so the ranges stay independent.
        intervalRange = {"start": start, "end": end}
        intervals = {"videoId": video_id, "range": intervalRange}
        video_ranges += [intervals]
    # Random 8-char hex name for the project.
    project_name = ''.join(random.choices(string.hexdigits, k=8))
    data = {
        "name": project_name,
        "videosRanges": video_ranges,
    }
    headers = {
        "Content-Type": "application/json"
    }
    url = f"{video_indexer_endpoint}/{video_indexer_region}/Accounts/{video_indexer_account_id}/Projects?accessToken={access_token}"
    response = requests.post(url, json=data, headers=headers)
    if response.status_code == 200:
        return response.json().get("id")
    print("Project creation failed:", response.status_code, response.text)
    return None
def render_video(access_token, project_id):
    """Start asynchronous rendering of a project's video.

    Returns:
        dict | None: response JSON when the request is accepted, else None.

    NOTE(review): reconstructed — the headers literal and return were missing.
    """
    url = f"{video_indexer_endpoint}/{video_indexer_region}/Accounts/{video_indexer_account_id}/Projects/{project_id}/render?sendCompletionEmail=false&accessToken={access_token}"
    headers = {
        "Content-Type": "application/json"
    }
    response = requests.post(url, headers=headers)
    if response.status_code == 202:  # 202 Accepted: rendering has started
        return response.json()
    print("Render request failed:", response.status_code, response.text)
    return None
def get_render_operation(access_token, project_id):
    """Poll the render operation until its state is 'Succeeded'.

    Returns:
        dict: the final render-operation status JSON.

    NOTE(review): reconstructed — the polling loop, JSON parse, and return
    were missing from the garbled source.
    """
    url = f"{video_indexer_endpoint}/{video_indexer_region}/Accounts/{video_indexer_account_id}/Projects/{project_id}/renderoperation?accessToken={access_token}"
    while True:
        response = requests.get(url)
        data = response.json()
        if "state" in data and data['state'] == 'Succeeded':
            return data
        print("Sleeping for ten seconds before checking on rendering...")
        time.sleep(10)  # Wait 10 seconds before checking again
def download_rendered_file(access_token, project_id):
    """Fetch the download URL of a project's rendered video.

    Returns:
        str | None: the downloadUrl on success, else None.

    NOTE(review): reconstructed — the JSON parse and fallthrough return were
    missing from the garbled source.
    """
    url = f"{video_indexer_endpoint}/{video_indexer_region}/Accounts/{video_indexer_account_id}/Projects/{project_id}/renderedfile/downloadurl?accessToken={access_token}"
    response = requests.get(url)
    if response.status_code == 200:
        data = response.json()
        if "downloadUrl" in data:
            return data["downloadUrl"]
    return None
def index_and_download_video(account_id = None, project_id = None, video_id = None, video_file_path = None, video_url = None):
    """End-to-end orchestration: upload/index, select highlights, render, download.

    Returns:
        str | None: the download URL for the rendered highlight video.

    NOTE(review): heavily reconstructed — most conditionals were missing, and
    the original referenced undefined names (uploaded_video_id, accountId).
    Verify the token-refresh and fallback logic against the intended workflow.
    """
    if not account_id:
        account_id = settings.video_indexer_default_account_id
    access_token = settings.video_indexer_access_token
    if not access_token:
        access_token = get_access_token()
    if not video_id and not video_file_path and not video_url:
        raise ValueError("Provide a video_id, a local video_file_path, or a video_url.")
    if not video_id:
        if video_url:
            video_id = get_uploaded_video_id(access_token, account_id, video_file_path, video_url=video_url)
        else:
            video_id = get_uploaded_video_id(access_token, account_id, video_file_path)
    insights = get_video_insights(access_token, video_id)
    # Keep roughly 10% of the indexed duration as highlights.
    selected_segments = get_selected_segments(insights, 10)
    if not project_id:
        project_id = create_project(access_token, video_id, selected_segments)
    render_response = render_video(access_token, project_id)
    status = get_render_operation(access_token, project_id)
    download_url = download_rendered_file(access_token, project_id)
    return download_url
def get_image_blob_url(video_url, frame_number):
    """Derive the blob URL for an extracted frame image from the video's URL.

    Frames are stored under an "images/" folder next to the video blob, named
    frame{frame_number}.jpg. Any SAS token (query string) on the video URL is
    carried over to the image URL.

    Returns:
        str: the frame image URL (with SAS token when present).

    NOTE(review): reconstructed — the empty-directory branch body, SAS-token
    extraction, and return were missing from the garbled source.
    """
    # Parse the original video URL to get account, container, and path
    parsed = urlparse(video_url)
    path_parts = parsed.path.split('/')
    container = path_parts[1]
    blob_path = '/'.join(path_parts[2:])
    # Remove the file name from the blob path
    blob_dir = '/'.join(blob_path.split('/')[:-1])
    if blob_dir:
        image_path = f"{blob_dir}/images/frame{frame_number}.jpg"
    else:
        # Video sits at the container root — no directory prefix.
        image_path = f"images/frame{frame_number}.jpg"
    # Rebuild the base URL (without SAS token)
    base_url = f"{parsed.scheme}://{parsed.netloc}/{container}/{image_path}"
    # Re-attach the SAS token if the video URL carried one
    sas_token = parsed.query
    if sas_token:
        image_url = f"{base_url}?{sas_token}"
    else:
        image_url = base_url
    return image_url
def download_blob_to_stream(blob_client):
    """Download a blob fully into memory and return it as a seekable BytesIO."""
    payload = blob_client.download_blob().readall()
    return io.BytesIO(payload)
def extract_and_upload_frames(video_sas_url):
    """Download a video blob, split it into JPEG frames, upload each frame.

    Frames are uploaded next to the video under images/frame{n}.jpg (see
    get_image_blob_url) and also written locally for inspection.

    NOTE(review): reconstructed — the read loop, frame counter, local write,
    and temp-file cleanup were missing from the garbled source.
    """
    # Set up blob client for video
    video_blob_client = BlobClient.from_blob_url(video_sas_url)
    # Download video to memory stream
    video_stream = download_blob_to_stream(video_blob_client)
    video_bytes = video_stream.getvalue()
    # OpenCV needs a file path, so spill the bytes to a unique temp file.
    video_temp = os.path.join(os.getcwd(), f"temp_{uuid.uuid4()}.mp4")
    with open(video_temp, 'wb') as f:
        f.write(video_bytes)
    try:
        vidcap = cv2.VideoCapture(video_temp)
        frame_number = 0
        while True:
            success, frame = vidcap.read()
            if not success:
                break  # end of video
            _, buffer = cv2.imencode('.jpg', frame)
            image_bytes = buffer.tobytes()
            # The original also saved each frame locally — presumably for
            # debugging; preserved here.
            image_path = f"frame{frame_number}.jpg"
            with open(image_path, 'wb') as frame_file:
                frame_file.write(image_bytes)
            # Generate image blob URL and upload the frame
            image_url = get_image_blob_url(video_sas_url, frame_number)
            image_blob_client = BlobClient.from_blob_url(image_url)
            image_blob_client.upload_blob(image_bytes, overwrite=True)
            print(f"Uploaded frame {frame_number} to {image_url}")
            frame_number += 1
        vidcap.release()
    finally:
        # Always remove the temp video file, even on failure.
        os.remove(video_temp)
def vectorize_extracted_frames(video_sas_url):
    """Extract/upload frames, then vectorize and caption each frame image.

    Returns:
        list[tuple]: (vector, description) per frame, in frame order.

    NOTE(review): heavily reconstructed — the frame loop, accumulator, and
    termination condition were missing. The loop stops at the first frame
    whose blob cannot be fetched (matching the original "Giving up" message);
    confirm that is the intended stop condition.
    """
    extract_and_upload_frames(video_sas_url)
    vision_credential = AzureKeyCredential(vision_api_key)
    analysis_client = ImageAnalysisClient(vision_endpoint, vision_credential)
    tuples = []
    frame_number = 0
    while True:
        # Generate image blob URL for the next frame
        image_url = get_image_blob_url(video_sas_url, frame_number)
        try:
            image_blob_client = BlobClient.from_blob_url(image_url)
            image_stream = download_blob_to_stream(image_blob_client)
        except Exception:
            # Slicing [74:80] presumably isolates the frame name within the URL.
            print(f"No such image: {image_url[74:80]}. Giving up...")
            break
        vector = vectorize_image(image_url, vision_api_key, vision_region)
        if vector is not None:
            # Pad to the fixed 1536-dim embedding size expected downstream.
            if len(vector) < 1536:
                vector = np.pad(vector, (0, 1536 - len(vector)), mode='constant')
            print(f"Vectorized frame: {frame_number}")
        description = analyze_image(analysis_client, image_url)
        if description:
            print(f"Analyzed frame: {frame_number}")
        tuples += [(vector, description)]
        frame_number += 1
    return tuples
# access_token = os.getenv("AZURE_VIDEO_INDEXER_ACCESS_TOKEN", get_access_token()) 
# video_sas_url=video_sas_url.strip('"') 
# extract_and_upload_frames(video_sas_url) 
# vision_credential = AzureKeyCredential(vision_api_key) 
# analysis_client = ImageAnalysisClient(vision_endpoint, vision_credential) 
@retry(stop=stop_after_attempt(5), wait=wait_fixed(60))
def vectorize_image(image_path, key, region):
    """Call the Azure AI Vision image-retrieval API to embed an image URL.

    Args:
        image_path: SAS URL of the image blob.
        key: Vision API subscription key.
        region: kept for interface compatibility; not used in the URL.

    Returns:
        list | None: the embedding vector from the response.

    Raises:
        Exception: on non-200 responses, so tenacity retries.

    NOTE(review): reconstructed — the try:, headers/payload literals, JSON
    parse, and returns were missing from the garbled source.
    """
    try:
        # API version and model version
        api_version = "2024-02-01"
        model_version = "2023-04-15"
        # Construct the request URL
        url = f"{vision_endpoint}/computervision/retrieval:vectorizeImage?api-version={api_version}&model-version={model_version}"
        headers = {
            "Content-Type": "application/json",
            "Ocp-Apim-Subscription-Key": key
        }
        # Set the payload with the SAS URL
        payload = {"url": image_path}
        response = requests.post(url, headers=headers, json=payload)
        if response.status_code == 200:
            result = response.json()
            # The vector is in the 'vector' field of the response
            vector = result.get("vector")
            return vector
        print("Error:", response.status_code, response.text)
        raise Exception(f"Error vectorizing image {image_path[74:80]}")
    except (requests.exceptions.Timeout, http.client.HTTPException) as e:
        print(f"Timeout/Error for {image_path[74:80]}. Retrying...")
        # Re-raise so the @retry decorator actually retries.
        raise
@retry(stop=stop_after_attempt(5), wait=wait_fixed(60)) 
def analyze_image(client, image_url): 
        # Define all available visual features for analysis 
            VisualFeatures.SMART_CROPS, 
            VisualFeatures.DENSE_CAPTIONS, 
        # Analyze the image from the SAS URL 
        result = client.analyze_from_url( 
            visual_features=features, 
            gender_neutral_caption=True        ) 
        # Explicitly cast to ImageAnalysisResult (for clarity) 
        result: ImageAnalysisResult = result 
            captions += [ f"{result.caption.text}" if result.caption is not None else "No Caption"] 
            captions += [ f"{caption.text}" for caption in result.dense_captions.list if result.dense_captions is not None] 
            result.description = ",".join(captions) 
            description =  pformat(result.__dict__, depth=4, compact=False) 
    except HttpResponseError as e: