Thursday, July 17, 2025

The following summarizes the workflow of a drone video sensing application: 

import requests 

import time 

import os 

from django.conf import settings 

from dotenv import load_dotenv 

load_dotenv(override=True) 

import urllib.parse 

from azure.core.credentials import AzureKeyCredential 

from azure.core.exceptions import HttpResponseError 

from azure.identity import DefaultAzureCredential 

from azure.ai.vision.imageanalysis import ImageAnalysisClient 

from azure.ai.vision.imageanalysis.models import VisualFeatures, ImageAnalysisResult 

from tenacity import retry, stop_after_attempt, wait_fixed 

from pprint import pprint, pformat 

from dotenv import load_dotenv   

import json  

import http.client 

 

# Azure AI Vision configuration, pulled from Django settings.
vision_api_key = settings.vision_api_key
vision_api_version = settings.vision_api_version
vision_region = settings.vision_region
# BUG FIX: the assignment operator was missing here
# ("vision_endpointsettings.vision_endpoint"), which raised NameError at
# import time and left `vision_endpoint` undefined for the vision helpers.
vision_endpoint = settings.vision_endpoint
api_version = settings.api_version
model_version = settings.model_version

 

         

# Step 1: Get an access token 

def get_access_token():
    """Request a Video Indexer access token for the configured account.

    Returns the token text with any surrounding double quotes stripped.
    """
    auth_url = (
        f"{settings.video_indexer_endpoint}/auth/{settings.video_indexer_region}"
        f"/Accounts/{settings.video_indexer_account_id}/AccessToken"
    )
    auth_headers = {"Ocp-Apim-Subscription-Key": settings.video_indexer_api_key}
    resp = requests.get(auth_url, headers=auth_headers)
    return resp.text.strip('"')

     

 

 

def trim_filename(filename: str, max_length: int = 255) -> str:
    """Shorten *filename* so the whole name fits within *max_length* characters.

    The extension is always preserved intact; only the base name is truncated.
    """
    import os
    stem, extension = os.path.splitext(filename)
    # Budget left for the base name once the extension is accounted for.
    keep = max_length - len(extension)
    return stem[:keep] + extension

     

# Step 2: Upload video and start indexing 

def upload_and_index_video(access_token, accountId, video_file_path, video_url = None):
    """Upload a video to Video Indexer (from a local file or a URL) and start indexing.

    The video name is derived from the account id plus the file/URL basename.
    Returns the parsed JSON response from the Videos endpoint.
    """
    video_name = None
    # BUG FIX: the original tested the undefined name `video_uri`, which
    # raised NameError whenever a URL-based upload was attempted.
    if video_url:
        parsed_url = urllib.parse.urlparse(video_url)
        video_path = parsed_url.path
        video_name = accountId + "-" + video_path.split('/', 2)[-1]
    if video_file_path:
        # A local file path takes precedence over the URL-derived name.
        video_name = accountId + "-" + trim_filename(os.path.basename(video_file_path))
    url = f"{settings.video_indexer_endpoint}/{settings.video_indexer_region}/Accounts/{settings.video_indexer_account_id}/Videos?name={video_name}&accessToken={access_token}&privacy=Private"
    if video_url:
        encoded_url = urllib.parse.quote(video_url, safe='')
        # BUG FIX: the '&' separator was missing, producing the corrupted
        # query string "...privacy=PrivatevideoUrl=...".
        url += f"&videoUrl={encoded_url}"
        response = requests.post(url)
        return response.json()
    else:
        with open(video_file_path, 'rb') as video_file:
            files = {'file': video_file}
            response = requests.post(url, files=files)
            return response.json()

 

# Step 3: Wait for indexing to complete and get insights 

def get_video_insights(access_token, video_id): 
    """Poll the Video Indexer Index endpoint until the video reaches the
    'Processed' state, then return the full insights payload (a dict).

    NOTE(review): this function is redefined later in this file; at import
    time the later definition replaces this one.
    """
    url = f"{settings.video_indexer_endpoint}/{settings.video_indexer_region}/Accounts/{settings.video_indexer_account_id}/Videos/{video_id}/Index?accessToken={access_token}" 
    while True: 
        response = requests.get(url) 
        data = response.json() 
        # NOTE(review): raises KeyError if the response carries no 'state'
        # field (e.g. an API error payload); the later redefinition guards
        # against this with a membership test.
        if data['state'] == 'Processed': 
            return data 
        time.sleep(10)  # Wait 10 seconds before checking again 

 

# Step 4: Main workflow 

def get_uploaded_video_id(access_token, accountId, video_file_path, video_url = None):
    """Upload/index a video and return its Video Indexer id, or None when the
    upload response carries no 'id' field."""
    upload_result = upload_and_index_video(access_token, accountId, video_file_path, video_url)
    print(upload_result)
    return upload_result.get('id')

 

def get_insights_formatted(access_token, video_id):
    """Return a human-readable summary of a video's themes and keyframes.

    Waits for indexing to finish, then formats each summarized theme with its
    keyframe time ranges, thumbnail ids, and descriptions. Falls back to
    dumping the raw insights when no summarization is present.
    """
    insights = get_video_insights(access_token, video_id)
    value = "Video highlights and key insights:\n"
    value += ("=" * 50) + "\n"
    # Extract highlights: keyframes, topics, and summarization
    if 'summarizedInsights' in insights:
        for theme in insights['summarizedInsights']['themes']:
            # BUG FIX: the theme heading was missing its trailing newline, so
            # the first keyframe line ran into the theme name.
            value += f"Theme: {theme['name']}\n"
            for highlight in theme['keyframes']:
                value += f"  Keyframe at {highlight['adjustedStart']} to {highlight['adjustedEnd']}\n"
                value += f"  Thumbnail: {highlight['thumbnailId']}\n"
                value += f"  Description: {highlight.get('description', 'No description')}\n"
    else:
        value += f"No summarization available. See full insights: {insights}"
    return value

""" 

{'accountId': '26ff36de-cac7-4bea-ad7a-abdf0d63c19c', 'id': 'lwxjba8wy3', 'partition': None, 'externalId': None, 'metadata': None, 'name': 'mainindexedvideo.mp4', 'description': None, 'created': '2025-06-25T03:54:44.3133333+00:00', 'lastModified': '2025-06-25T03:54:44.3133333+00:00', 'lastIndexed': '2025-06-25T03:54:44.3133333+00:00', 'privacyMode': 'Private', 'userName': 'Ravi Rajamani', 'isOwned': True, 'isBase': True, 'hasSourceVideoFile': True, 'state': 'Uploaded', 'moderationState': 'OK', 'reviewState': 'None', 'isSearchable': True, 'processingProgress': '1%', 'durationInSeconds': 0, 'thumbnailVideoId': 'lwxjba8wy3', 'thumbnailId': '00000000-0000-0000-0000-000000000000', 'searchMatches': [], 'indexingPreset': 'Default', 'streamingPreset': 'Default', 'sourceLanguage': 'en-US', 'sourceLanguages': ['en-US'], 'personModelId': '00000000-0000-0000-0000-000000000000'} 

""" 

 

 

def repeat_video_index(access_token, video_id):
    """Trigger re-indexing of an already-uploaded video.

    Returns the raw ReIndex response on HTTP 200; otherwise falls back to
    polling for the video's current insights.
    """
    # BUG FIX: the original referenced the bare names video_indexer_endpoint/
    # video_indexer_region/video_indexer_account_id, which are never defined
    # in this module — every other helper reads them from Django settings.
    url = f"{settings.video_indexer_endpoint}/{settings.video_indexer_region}/Accounts/{settings.video_indexer_account_id}/Videos/{video_id}/ReIndex?accessToken={access_token}"
    response = requests.put(url)
    if response.status_code == 200:
        return response
    return get_video_insights(access_token, video_id)

     

def get_video_insights(access_token, video_id):
    """Poll the Index endpoint until the video reports state 'Processed' and
    return the insights payload (a dict).

    Prints the raw payload every 10th poll for progress visibility. Note:
    loops indefinitely while the state is anything other than 'Processed'.
    """
    # BUG FIX: read endpoint/region/account id from settings — the bare
    # module-level names used previously are undefined in this module.
    url = f"{settings.video_indexer_endpoint}/{settings.video_indexer_region}/Accounts/{settings.video_indexer_account_id}/Videos/{video_id}/Index?accessToken={access_token}"
    count = 0
    while True:
        response = requests.get(url)
        data = response.json()
        # Guarded membership test: error payloads without 'state' keep polling.
        if "state" in data and data['state'] == 'Processed':
            return data
        count += 1
        if count % 10 == 0:
            print(data)
        print("Sleeping for ten seconds...")
        time.sleep(10)  # Wait 10 seconds before checking again

 

def get_selected_segments(insights, threshold):
    """Collect (start, end) time ranges for every keyframe of every shot.

    `threshold` is a percentage used to derive a target reduced duration;
    the value is computed (as in the original flow) but does not currently
    filter which segments are selected.
    """
    indexed_duration = insights["summarizedInsights"]["duration"]["seconds"]
    # Kept even though unused: accessing these keys preserves the original
    # KeyError behavior on malformed insights payloads.
    reduced_duration = (threshold * indexed_duration) / 100
    segments = []
    for video in insights["videos"]:
        for shot in video["insights"]["shots"]:
            for key_frame in shot["keyFrames"]:
                first_instance = key_frame["instances"][0]
                start = first_instance["start"]
                end = first_instance["end"]
                print(
                    f"Clipping shot: {shot['id']}, key_frame: {key_frame['id']}, "
                    f"start: {start}, end: {end}"
                )
                segments.append((start, end))
    return segments

 

def create_project(access_token, video_id, selected_segments):
    """Create a Video Indexer project covering the given segments of a video.

    `selected_segments` is an iterable of (start, end) pairs. Returns the new
    project's id on HTTP 200, or None when the API call fails.
    """
    import random
    import string
    video_ranges = []
    for start, end in selected_segments:
        video_ranges.append({
            "videoId": video_id,
            "range": {"start": start, "end": end},
        })
    # Random 8-char hex name keeps repeated runs from colliding.
    project_name = ''.join(random.choices(string.hexdigits, k=8))
    data = {
        "name": project_name,
        "videosRanges": video_ranges,
        "isSearchable": "false"
    }
    headers = {
        "Content-Type": "application/json"
    }
    # BUG FIX: read endpoint/region/account id from settings — the bare names
    # used previously are undefined in this module.
    url = f"{settings.video_indexer_endpoint}/{settings.video_indexer_region}/Accounts/{settings.video_indexer_account_id}/Projects?accessToken={access_token}"
    response = requests.post(url, json=data, headers=headers)
    print(response.content)
    if response.status_code == 200:
        return response.json()["id"]
    return None

         

def render_video(access_token, project_id):
    """Kick off server-side rendering of a project's selected video ranges.

    Returns the HTTP response when the service accepts the job (202), or
    None on any other status.
    """
    # BUG FIX: settings.* instead of undefined module-level globals.
    url = f"{settings.video_indexer_endpoint}/{settings.video_indexer_region}/Accounts/{settings.video_indexer_account_id}/Projects/{project_id}/render?sendCompletionEmail=false&accessToken={access_token}"
    headers = {
        "Content-Type": "application/json"
    }
    response = requests.post(url, headers=headers)
    print(response.content)
    # 202 Accepted: rendering proceeds asynchronously; poll via
    # get_render_operation.
    if response.status_code == 202:
        return response
    return None

             

def get_render_operation(access_token, project_id):
    """Poll a project's render operation until it reports 'Succeeded'.

    Returns the final status payload. Note: loops indefinitely while the
    state is anything other than 'Succeeded'.
    """
    # BUG FIX: settings.* instead of undefined module-level globals.
    url = f"{settings.video_indexer_endpoint}/{settings.video_indexer_region}/Accounts/{settings.video_indexer_account_id}/Projects/{project_id}/renderoperation?accessToken={access_token}"
    while True:
        response = requests.get(url)
        data = response.json()
        if "state" in data and data['state'] == 'Succeeded':
            return data
        print("Sleeping for ten seconds before checking on rendering...")
        time.sleep(10)  # Wait 10 seconds before checking again

 

def download_rendered_file(access_token, project_id):
    """Fetch the download URL for a project's rendered video.

    Returns the URL string, or None when the request fails or the response
    carries no 'downloadUrl' field.
    """
    # BUG FIX: settings.* instead of undefined module-level globals.
    url = f"{settings.video_indexer_endpoint}/{settings.video_indexer_region}/Accounts/{settings.video_indexer_account_id}/Projects/{project_id}/renderedfile/downloadurl?accessToken={access_token}"
    response = requests.get(url)
    if response.status_code == 200:
        print(response.content)
        data = response.json()
        if "downloadUrl" in data:
            return data["downloadUrl"]
    return None

 

def index_and_download_video(account_id = None, project_id = None, video_id = None, video_file_path = None, video_url = None):
    """End-to-end workflow: upload/index a video, select highlight segments,
    build and render a project, and return the rendered file's download URL.

    Any of account/project/video id may be supplied to resume a partially
    completed run. Returns None when there is nothing to process or any
    stage fails.
    """
    if not account_id:
        account_id = settings.video_indexer_default_account_id
    # Prefer a pre-provisioned token; fall back to requesting one.
    # (The original repeated this fallback twice; once is sufficient.)
    access_token = settings.video_indexer_access_token or get_access_token()
    # BUG FIX: the original tested the undefined name `uploaded_video_id`
    # (NameError); the intent is "nothing to work with" when no video id,
    # file path, or URL was supplied.
    if not video_id and not video_file_path and not video_url:
        return None
    if not video_id:
        # BUG FIX: the original passed the undefined name `accountId`.
        if video_file_path:
            video_id = get_uploaded_video_id(access_token, account_id, video_file_path)
        if video_url:
            video_id = get_uploaded_video_id(access_token, account_id, video_file_path, video_url=video_url)
    insights = get_video_insights(access_token, video_id)
    selected_segments = get_selected_segments(insights, 10)
    if not project_id:
        project_id = create_project(access_token, video_id, selected_segments)
    print(project_id)
    render_response = render_video(access_token, project_id)
    print(render_response)
    if render_response:
        status = get_render_operation(access_token, project_id)
        print(status)
        download_url = download_rendered_file(access_token, project_id)
        print(download_url)
        return download_url
    return None

     

# When True, extracted frames are written to the local working directory
# instead of being uploaded to Azure Blob Storage.
local_only = False

 

def get_image_blob_url(video_url, frame_number):
    """Derive the blob URL for an extracted frame image from a video's URL.

    The image is placed next to the video under `<dir>/images/frame<N>.jpg`
    (or `output/images/...` when the video sits at the container root), and
    any SAS token carried on the video URL is re-attached.
    """
    # BUG FIX: this module imports `urllib.parse`, not the bare name
    # `urlparse`, so the original call raised NameError.
    parsed = urllib.parse.urlparse(video_url)
    path_parts = parsed.path.split('/')
    container = path_parts[1]
    blob_path = '/'.join(path_parts[2:])
    # Remove the file name from the blob path
    blob_dir = '/'.join(blob_path.split('/')[:-1])
    if not blob_dir:
        blob_dir = "output"
    # Create image path
    image_path = f"{blob_dir}/images/frame{frame_number}.jpg"
    # Rebuild the base URL (without SAS token)
    base_url = f"{parsed.scheme}://{parsed.netloc}/{container}/{image_path}"
    # Re-attach the SAS token if the source URL carried one
    sas_token = parsed.query
    if sas_token:
        image_url = f"{base_url}?{sas_token}"
    else:
        image_url = base_url
    return image_url

 

def download_blob_to_stream(blob_client):
    """Download a blob's full contents into an in-memory BytesIO stream.

    Propagates whatever the storage SDK raises when the blob is missing —
    callers use that to detect the end of a frame sequence.
    """
    # BUG FIX: `io` is not imported in the visible portion of this file;
    # import it locally so the function is self-contained.
    import io
    download_stream = blob_client.download_blob()
    return io.BytesIO(download_stream.readall())

 

def extract_and_upload_frames(video_sas_url):
    """Download a video blob, split it into JPEG frames with OpenCV, and
    either save the frames locally (when the module flag `local_only` is
    True) or upload each one next to the video in blob storage.

    NOTE(review): `BlobClient`, `uuid`, and `cv2` are not imported in the
    visible portion of this file — confirm they are imported elsewhere.
    """
    # Set up blob client for video
    video_blob_client = BlobClient.from_blob_url(video_sas_url)
    # Download video to memory stream
    video_stream = download_blob_to_stream(video_blob_client)
    # Use OpenCV to read from memory
    video_bytes = video_stream.getvalue()
    # Use cv2 to read from bytes
    video_stream.seek(0)
    # cv2.VideoCapture needs a file path, so spill the bytes to a uniquely
    # named temp file in the current working directory.
    video_temp = os.path.join(os.getcwd(), f"temp_{uuid.uuid4()}.mp4")
    print(video_temp)
    with open(video_temp, 'wb') as f:
        f.write(video_bytes)
    vidcap = cv2.VideoCapture(video_temp)
    # Extract frames
    frame_number = 0
    while True:
        success, frame = vidcap.read()
        if not success:
            # read() returns False at end-of-stream (or on a decode error).
            break
        # Convert frame to bytes
        _, buffer = cv2.imencode('.jpg', frame)
        image_bytes = buffer.tobytes()
        if local_only:
            image_path = f"frame{frame_number}.jpg"
            with open(image_path, 'wb') as f:
                f.write(image_bytes)
        else:
            # Generate image blob URL
            image_url = get_image_blob_url(video_sas_url, frame_number)
            image_blob_client = BlobClient.from_blob_url(image_url)
            # Upload frame as image
            image_blob_client.upload_blob(image_bytes, overwrite=True)
            print(f"Uploaded frame {frame_number} to {image_url}")
        frame_number += 1
    # Clean up temp file
    vidcap.release()
    os.remove(video_temp)

     

def vectorize_extracted_frames(video_sas_url):
    """Extract a video's frames to blob storage, then vectorize and caption
    each frame with Azure AI Vision.

    Returns a list of (embedding, description) tuples, one per processed
    frame. Frames are probed sequentially; the first missing frame blob ends
    the loop.
    """
    extract_and_upload_frames(video_sas_url)
    vision_credential = AzureKeyCredential(vision_api_key)
    analysis_client = ImageAnalysisClient(vision_endpoint, vision_credential)
    frame_number = 0
    tuples = []
    while True:
        try:
            # Build the frame's blob URL and probe that the blob exists —
            # the download raises when it is missing, which ends the loop.
            image_url = get_image_blob_url(video_sas_url, frame_number)
            image_blob_client = BlobClient.from_blob_url(image_url)
            download_blob_to_stream(image_blob_client)
            vector = vectorize_image(image_url, vision_api_key, vision_region)
            if vector:
                # Pad the embedding out to 1536 dimensions.
                vector = np.pad(vector, (0, 1536 - len(vector)), mode='constant')
                print(f"Vectorized frame: {frame_number}")
            description = analyze_image(analysis_client, image_url)
            if description:
                print(f"Analyzed frame: {frame_number}")
                tuples += [(vector, description)]
        except Exception:
            print(f"No such image: {image_url[74:80]}. Giving up...")
            break
        # BUG FIX: a stray bare `raise` sat here in the original, which
        # re-raised with no active exception (RuntimeError) after the first
        # successful iteration, so the loop could never advance past frame 0.
        frame_number += 1
    return tuples

 

# access_token = os.getenv("AZURE_VIDEO_INDEXER_ACCESS_TOKEN", get_access_token()) 

# video_sas_url=video_sas_url.strip('"') 

# print(video_sas_url) 

# extract_and_upload_frames(video_sas_url) 

# vision_credential = AzureKeyCredential(vision_api_key) 

# analysis_client = ImageAnalysisClient(vision_endpoint, vision_credential) 

 

@retry(stop=stop_after_attempt(5), wait=wait_fixed(60))
def vectorize_image(image_path, key, region):
    """Embed an image via the Azure AI Vision retrieval:vectorizeImage API.

    `image_path` is a (SAS) URL reachable by the service; `key` is the
    subscription key. `region` is accepted for interface compatibility but
    is not used by the request. Returns the embedding (list of floats) on
    success; raises on failure and is retried up to 5 times, 60s apart.
    """
    try:
        # API and model versions pinned for the vectorizeImage operation.
        api_version = "2024-02-01"
        model_version = "2023-04-15"

        # Construct the request URL
        url = f"{vision_endpoint}/computervision/retrieval:vectorizeImage?api-version={api_version}&model-version={model_version}"

        # Set headers
        headers = {
            "Content-Type": "application/json",
            "Ocp-Apim-Subscription-Key": key
        }

        # The image is passed by URL, not uploaded in the request body.
        payload = {
            "url": image_path
        }

        # Make the POST request
        response = requests.post(url, headers=headers, json=payload)

        # Check the response
        if response.status_code == 200:
            result = response.json()
            # The vector is in the 'vector' field of the response
            return result.get("vector")
        # BUG FIX: removed the dead `vector = [0.0] * 1024` assignment that
        # was immediately discarded by the raise below.
        print("Error:", response.status_code, response.text)
        raise Exception(f"Error vectorizing image {image_path[74:80]}")

    except (requests.exceptions.Timeout, http.client.HTTPException):
        print(f"Timeout/Error for {image_path[74:80]}. Retrying...")
        raise

 

@retry(stop=stop_after_attempt(5), wait=wait_fixed(60))
def analyze_image(client, image_url):
    """Run full Azure AI Vision analysis on an image URL.

    Returns a pretty-printed string of the analysis result (with a
    comma-joined `description` of all captions attached), or
    "No description" when the service returns nothing. Retried up to 5
    times, 60s apart, on HttpResponseError.
    """
    try:
        # Request every available visual feature for analysis.
        features = [
            VisualFeatures.CAPTION,
            VisualFeatures.TAGS,
            VisualFeatures.OBJECTS,
            VisualFeatures.READ,
            VisualFeatures.SMART_CROPS,
            VisualFeatures.DENSE_CAPTIONS,
            VisualFeatures.PEOPLE
        ]

        # Analyze the image from the SAS URL
        result: ImageAnalysisResult = client.analyze_from_url(
            image_url=image_url,
            visual_features=features,
            gender_neutral_caption=True,
        )
        if result is not None:
            captions = []
            captions += [f"{result.caption.text}" if result.caption is not None else "No Caption"]
            # BUG FIX: the original comprehension evaluated
            # `result.dense_captions.list` before its `is not None` guard,
            # so it raised AttributeError whenever dense captions were absent.
            if result.dense_captions is not None:
                captions += [f"{caption.text}" for caption in result.dense_captions.list]
            # Enhance result with a flat, comma-joined description field.
            result.description = ",".join(captions)
            description = pformat(result.__dict__, depth=4, compact=False)
            return description
    except HttpResponseError as e:
        print(str(e))
        raise
    return "No description"

No comments:

Post a Comment