Friday, July 18, 2025

 In Strategic: The Skill to Set Direction, Create Advantage, and Achieve Executive Excellence, Rich Horwath delivers a compelling guide for leaders aiming to forge clear, differentiated strategies and drive lasting success. At its core, the book emphasizes that strategy is not just a plan—it’s a deliberate set of choices about where to play and how to win, rooted in clarity and trade-offs. Horwath argues that imitation, particularly competing on price or replicating a rival’s approach, is a weak substitute for true strategic thinking. Instead, he champions distinctiveness—identifying and nurturing what sets your organization apart.

He introduces the GOST framework (goals, objectives, strategies, and tactics), clarifying how high-level aspirations translate into specific, actionable plans. A goal is broad, an objective is precise, a strategy is conceptual, and a tactic is tangible. His Rule of Touch offers a practical litmus test: if you can physically touch it, it's a tactic, not a strategy.

Horwath critiques the widespread reliance on annual resource reallocation, highlighting that continuous and agile reassessment is key to performance. Through research with major organizations, he shows that timely resource shifts fuel growth and innovation. Concentration, not diversification, is where greatness often lies. Drawing on examples like Apple, Google, and Amazon, he reveals how focusing on core strengths rather than spreading thin leads to market leadership.

The book also explores strategic pitfalls—indecisiveness, risk aversion, and failure to innovate—arguing that the worst mistake isn’t making the wrong choice, but making none at all. Emotional intelligence emerges as another pillar of leadership: self-awareness and relationship management boost communication, resilience, and overall effectiveness.

Horwath pays close attention to organizational culture, suggesting that purpose-driven declarations (mission, vision, values) should steer behaviors and decisions. Toxic influences—such as persistent negativity or blame—must be rooted out to preserve strategic integrity. Effective culture, much like strategy, doesn’t happen by accident; it must be designed, reinforced, and protected.

On planning, he urges leaders to balance short-term tactics with long-term vision. Despite widespread reliance on one-year plans, imagining your company years down the road enhances adaptability and foresight. Talent management plays a key role here: hiring based on past behavior, not just experience, ensures stronger team performance.

Finally, Horwath encourages an open-minded, question-driven approach to innovation. Strategic options shouldn’t be treated as either/or propositions; instead, elements from different paths can be fused. He champions collaborative environments while warning against unproductive meetings and the myth of multitasking, advocating for structured, focused sessions and monotasking for clarity and impact.

Through vivid examples, thoughtful frameworks, and sharp insights, Horwath guides readers to build businesses that are strategically sound, culturally vibrant, and constantly evolving. His message is clear: strategic excellence isn’t reserved for the few—it’s a skill that can be cultivated, executed, and mastered.

#codingexercise: https://1drv.ms/w/c/d609fb70e39b65c8/EffYN3c80HpIg4GMHe5H--EBQLfEIZId0sDkkyfLjD8bwA?e=wrlieq


Thursday, July 17, 2025

The following code summarizes the workflow of a drone video sensing application: it uploads drone footage to Azure AI Video Indexer, polls for insights, selects keyframe segments, renders a highlight project, and extracts and vectorizes individual frames with Azure AI Vision.

import requests
import time
import os
import io
import uuid
import json
import http.client
import urllib.parse
from urllib.parse import urlparse

import cv2
import numpy as np
from django.conf import settings
from dotenv import load_dotenv
from azure.core.credentials import AzureKeyCredential
from azure.core.exceptions import HttpResponseError
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobClient
from azure.ai.vision.imageanalysis import ImageAnalysisClient
from azure.ai.vision.imageanalysis.models import VisualFeatures, ImageAnalysisResult
from tenacity import retry, stop_after_attempt, wait_fixed
from pprint import pprint, pformat

load_dotenv(override=True)

 

vision_api_key = settings.vision_api_key
vision_api_version = settings.vision_api_version
vision_region = settings.vision_region
vision_endpoint = settings.vision_endpoint
api_version = settings.api_version
model_version = settings.model_version

# Module-level aliases used by the helper functions below
video_indexer_endpoint = settings.video_indexer_endpoint
video_indexer_region = settings.video_indexer_region
video_indexer_account_id = settings.video_indexer_account_id

 

         

# Step 1: Get an access token 

def get_access_token(): 

    url = f"{settings.video_indexer_endpoint}/auth/{settings.video_indexer_region}/Accounts/{settings.video_indexer_account_id}/AccessToken" 

    headers = { 

        "Ocp-Apim-Subscription-Key": settings.video_indexer_api_key 

    } 

    response = requests.get(url, headers=headers) 

    return response.text.strip('"') 

     

 

 

def trim_filename(filename: str, max_length: int = 255) -> str:
    # Separate base name and extension, then truncate the base
    # so that base + extension fits within max_length
    base, ext = os.path.splitext(filename)
    allowed_base_length = max_length - len(ext)
    trimmed_base = base[:allowed_base_length]
    return trimmed_base + ext
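As a quick offline check, the truncation logic behaves as follows (a self-contained restatement of `trim_filename`):

```python
import os

def trim_filename(filename: str, max_length: int = 255) -> str:
    # Keep the extension intact, truncate the base name to fit
    base, ext = os.path.splitext(filename)
    return base[:max_length - len(ext)] + ext

long_name = "a" * 300 + ".mp4"
trimmed = trim_filename(long_name)
print(len(trimmed))  # → 255
print(trimmed.endswith(".mp4"))  # → True
```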

     

# Step 2: Upload video and start indexing 

def upload_and_index_video(access_token, accountId, video_file_path, video_url=None):
    video_name = None
    if video_url:
        parsed_url = urllib.parse.urlparse(video_url)
        video_path = parsed_url.path
        video_name = accountId + "-" + video_path.split('/', 2)[-1]
    if video_file_path:
        video_name = accountId + "-" + trim_filename(os.path.basename(video_file_path))
    url = f"{settings.video_indexer_endpoint}/{settings.video_indexer_region}/Accounts/{settings.video_indexer_account_id}/Videos?name={video_name}&accessToken={access_token}&privacy=Private"
    if video_url:
        encoded_url = urllib.parse.quote(video_url, safe='')
        url += f"&videoUrl={encoded_url}"
        response = requests.post(url)
        return response.json()
    else:
        with open(video_file_path, 'rb') as video_file:
            files = {'file': video_file}
            response = requests.post(url, files=files)
            return response.json()

 

# Step 3: Wait for indexing to complete and get insights
# (implemented by get_video_insights further below, next to the re-index helper)

 

# Step 4: Main workflow 

def get_uploaded_video_id(access_token, accountId, video_file_path, video_url = None): 

    video_data = upload_and_index_video(access_token, accountId, video_file_path, video_url) 

    print(video_data) 

    if 'id' in video_data: 

        video_id = video_data['id'] 

        return video_id 

    return None 

 

def get_insights_formatted(access_token, video_id): 

    insights = get_video_insights(access_token, video_id) 

    value = "Video highlights and key insights:\n" 

    value += ("=" * 50) + "\n" 

    # Extract highlights: keyframes, topics, and summarization 

    if 'summarizedInsights' in insights: 

        for theme in insights['summarizedInsights']['themes']: 

            value += f"Theme: {theme['name']}\n"

            for highlight in theme['keyframes']: 

                value += f"  Keyframe at {highlight['adjustedStart']} to {highlight['adjustedEnd']}\n" 

                value += f"  Thumbnail: {highlight['thumbnailId']}\n" 

                value += f"  Description: {highlight.get('description', 'No description')}\n" 

    else: 

        value += f"No summarization available. See full insights: {insights}" 

    return value 

""" 

{'accountId': '26ff36de-cac7-4bea-ad7a-abdf0d63c19c', 'id': 'lwxjba8wy3', 'partition': None, 'externalId': None, 'metadata': None, 'name': 'mainindexedvideo.mp4', 'description': None, 'created': '2025-06-25T03:54:44.3133333+00:00', 'lastModified': '2025-06-25T03:54:44.3133333+00:00', 'lastIndexed': '2025-06-25T03:54:44.3133333+00:00', 'privacyMode': 'Private', 'userName': 'Ravi Rajamani', 'isOwned': True, 'isBase': True, 'hasSourceVideoFile': True, 'state': 'Uploaded', 'moderationState': 'OK', 'reviewState': 'None', 'isSearchable': True, 'processingProgress': '1%', 'durationInSeconds': 0, 'thumbnailVideoId': 'lwxjba8wy3', 'thumbnailId': '00000000-0000-0000-0000-000000000000', 'searchMatches': [], 'indexingPreset': 'Default', 'streamingPreset': 'Default', 'sourceLanguage': 'en-US', 'sourceLanguages': ['en-US'], 'personModelId': '00000000-0000-0000-0000-000000000000'} 

""" 

 

 

def repeat_video_index(access_token, video_id):
    """Trigger re-indexing of a video by its ID, then poll for the refreshed insights."""
    url = f"{video_indexer_endpoint}/{video_indexer_region}/Accounts/{video_indexer_account_id}/Videos/{video_id}/ReIndex?accessToken={access_token}"
    response = requests.put(url)
    if response.status_code != 200:
        return response
    return get_video_insights(access_token, video_id)

     

def get_video_insights(access_token, video_id): 

    url = f"{video_indexer_endpoint}/{video_indexer_region}/Accounts/{video_indexer_account_id}/Videos/{video_id}/Index?accessToken={access_token}" 

    count = 0 

    while True: 

        response = requests.get(url) 

        data = response.json() 

        if "state" in data and data['state'] == 'Processed': 

            return data 

        count += 1
        if count % 10 == 0:

            print(data) 

        print("Sleeping for ten seconds...") 

        time.sleep(10)  # Wait 10 seconds before checking again 
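The busy-wait loop above can be factored into a generic polling helper with a bounded number of attempts; a minimal sketch with a stand-in fetch function (`poll_until` and the fake state sequence are illustrative, not part of the Video Indexer API):

```python
import time

def poll_until(fetch, is_done, interval=10, max_attempts=60):
    # Generic polling loop: call fetch() until is_done(result) or attempts run out
    for _ in range(max_attempts):
        result = fetch()
        if is_done(result):
            return result
        time.sleep(interval)
    raise TimeoutError("polling gave up")

# Stand-in fetch that reports 'Processed' on the third call
states = iter(["Uploaded", "Processing", "Processed"])
result = poll_until(lambda: {"state": next(states)},
                    lambda d: d["state"] == "Processed", interval=0)
print(result)  # → {'state': 'Processed'}
```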

 

def get_selected_segments(insights, threshold):
    indexed_duration = insights["summarizedInsights"]["duration"]["seconds"]
    reduced_duration = (threshold * indexed_duration) / 100
    selected_segments = []
    for video in insights["videos"]:
        for shot in video["insights"]["shots"]:
            shot_id = shot["id"]
            for key_frame in shot["keyFrames"]:
                key_frame_id = key_frame["id"]
                start = key_frame["instances"][0]["start"]
                end = key_frame["instances"][0]["end"]
                print(f"Clipping shot: {shot_id}, key_frame: {key_frame_id}, start: {start}, end: {end}")
                selected_segments += [(start, end)]
    return selected_segments
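The traversal can be exercised offline with a minimal, made-up insights payload (the duration bookkeeping is omitted here; the field names follow the function above):

```python
def get_selected_segments(insights):
    # Collect (start, end) for the first instance of every keyframe in every shot
    selected = []
    for video in insights["videos"]:
        for shot in video["insights"]["shots"]:
            for key_frame in shot["keyFrames"]:
                inst = key_frame["instances"][0]
                selected += [(inst["start"], inst["end"])]
    return selected

fake_insights = {
    "videos": [{"insights": {"shots": [
        {"id": 1, "keyFrames": [
            {"id": 1, "instances": [{"start": "0:00:01", "end": "0:00:02"}]},
            {"id": 2, "instances": [{"start": "0:00:05", "end": "0:00:06"}]},
        ]}
    ]}}]
}
print(get_selected_segments(fake_insights))
# → [('0:00:01', '0:00:02'), ('0:00:05', '0:00:06')]
```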

 

def create_project(access_token, video_id, selected_segments):
    import random
    import string
    video_ranges = []
    for start, end in selected_segments:
        intervals = {
            "videoId": video_id,
            "range": {"start": start, "end": end}
        }
        video_ranges += [intervals]
    project_name = ''.join(random.choices(string.hexdigits, k=8))
    data = {
        "name": project_name,
        "videosRanges": video_ranges,
        "isSearchable": "false"
    }
    headers = {"Content-Type": "application/json"}
    url = f"{video_indexer_endpoint}/{video_indexer_region}/Accounts/{video_indexer_account_id}/Projects?accessToken={access_token}"
    response = requests.post(url, json=data, headers=headers)
    print(response.content)
    if response.status_code == 200:
        data = response.json()
        return data["id"]
    return None

         

def render_video(access_token, project_id):
    url = f"{video_indexer_endpoint}/{video_indexer_region}/Accounts/{video_indexer_account_id}/Projects/{project_id}/render?sendCompletionEmail=false&accessToken={access_token}"
    headers = {"Content-Type": "application/json"}
    response = requests.post(url, headers=headers)
    print(response.content)
    if response.status_code == 202:
        return response
    return None

             

def get_render_operation(access_token, project_id): 

    url = f"{video_indexer_endpoint}/{video_indexer_region}/Accounts/{video_indexer_account_id}/Projects/{project_id}/renderoperation?accessToken={access_token}" 

    while True: 

        response = requests.get(url) 

        data = response.json() 

        if "state" in data and data['state'] == 'Succeeded': 

            return data 

        print("Sleeping for ten seconds before checking on rendering...") 

        time.sleep(10)  # Wait 10 seconds before checking again         

 

def download_rendered_file(access_token, project_id): 

    url = f"{video_indexer_endpoint}/{video_indexer_region}/Accounts/{video_indexer_account_id}/Projects/{project_id}/renderedfile/downloadurl?accessToken={access_token}" 

    response = requests.get(url) 

    if response.status_code == 200: 

        print(response.content) 

        data = response.json() 

        if "downloadUrl" in data: 

            return data["downloadUrl"] 

    return None 

 

def index_and_download_video(account_id=None, project_id=None, video_id=None, video_file_path=None, video_url=None):
    if not account_id:
        account_id = settings.video_indexer_default_account_id
    # Main workflow
    access_token = settings.video_indexer_access_token
    if not access_token:
        access_token = get_access_token()
    if not video_id and not video_file_path and not video_url:
        return None
    if not video_id:
        if video_file_path:
            video_id = get_uploaded_video_id(access_token, account_id, video_file_path)
        if video_url:
            video_id = get_uploaded_video_id(access_token, account_id, video_file_path, video_url=video_url)
    insights = get_video_insights(access_token, video_id)
    selected_segments = get_selected_segments(insights, 10)
    if not project_id:
        project_id = create_project(access_token, video_id, selected_segments)
    print(project_id)
    render_response = render_video(access_token, project_id)
    print(render_response)
    if render_response:
        status = get_render_operation(access_token, project_id)
        print(status)
        download_url = download_rendered_file(access_token, project_id)
        print(download_url)
        return download_url
    return None

     

local_only = False 

 

def get_image_blob_url(video_url, frame_number): 

    # Parse the original video URL to get account, container, and path 

    parsed = urlparse(video_url) 

    path_parts = parsed.path.split('/') 

    container = path_parts[1] 

    blob_path = '/'.join(path_parts[2:]) 

    # Remove the file name from the blob path 

    blob_dir = '/'.join(blob_path.split('/')[:-1]) 

    if blob_dir == "" or blob_dir is None:

        blob_dir = "output" 

    # Create image path 

    image_path = f"{blob_dir}/images/frame{frame_number}.jpg" 

    # Rebuild the base URL (without SAS token) 

    base_url = f"{parsed.scheme}://{parsed.netloc}/{container}/{image_path}" 

    # Add the SAS token if present 

    sas_token = parsed.query 

    if sas_token: 

        image_url = f"{base_url}?{sas_token}" 

    else: 

        image_url = base_url 

    return image_url 
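The URL rewrite can be verified offline with a placeholder SAS URL (the account name and token below are made up):

```python
from urllib.parse import urlparse

def get_image_blob_url(video_url, frame_number):
    # Same rewrite as above: swap the video blob name for an images/frameN.jpg path
    parsed = urlparse(video_url)
    path_parts = parsed.path.split('/')
    container = path_parts[1]
    blob_path = '/'.join(path_parts[2:])
    blob_dir = '/'.join(blob_path.split('/')[:-1]) or "output"
    image_path = f"{blob_dir}/images/frame{frame_number}.jpg"
    base_url = f"{parsed.scheme}://{parsed.netloc}/{container}/{image_path}"
    return f"{base_url}?{parsed.query}" if parsed.query else base_url

url = "https://acct.blob.core.windows.net/videos/drone/flight1.mp4?sv=fake"
print(get_image_blob_url(url, 7))
# → https://acct.blob.core.windows.net/videos/drone/images/frame7.jpg?sv=fake
```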

 

def download_blob_to_stream(blob_client): 

    download_stream = blob_client.download_blob() 

    return io.BytesIO(download_stream.readall()) 

 

def extract_and_upload_frames(video_sas_url): 

    # Set up blob client for video 

    video_blob_client = BlobClient.from_blob_url(video_sas_url) 

    # Download video to memory stream 

    video_stream = download_blob_to_stream(video_blob_client) 

    # Use OpenCV to read from memory 

    video_bytes = video_stream.getvalue() 

    # Use cv2 to read from bytes 

    video_stream.seek(0) 

    video_temp = os.path.join(os.getcwd(), f"temp_{uuid.uuid4()}.mp4") 

    print(video_temp) 

    with open(video_temp, 'wb') as f: 

        f.write(video_bytes) 

    vidcap = cv2.VideoCapture(video_temp) 

    # Extract frames 

    frame_number = 0 

    while True: 

        success, frame = vidcap.read() 

        if not success: 

            break 

        # Convert frame to bytes 

        _, buffer = cv2.imencode('.jpg', frame) 

        image_bytes = buffer.tobytes() 

        if local_only: 

            image_path = f"frame{frame_number}.jpg" 

            with open(image_path, 'wb') as f: 

                f.write(image_bytes) 

        else: 

            # Generate image blob URL 

            image_url = get_image_blob_url(video_sas_url, frame_number) 

            image_blob_client = BlobClient.from_blob_url(image_url) 

            # Upload frame as image 

            image_blob_client.upload_blob(image_bytes, overwrite=True) 

            print(f"Uploaded frame {frame_number} to {image_url}") 

        frame_number += 1 

    # Clean up temp file 

    vidcap.release() 

    os.remove(video_temp) 

     

def vectorize_extracted_frames(video_sas_url): 

    extract_and_upload_frames(video_sas_url) 

    vision_credential = AzureKeyCredential(vision_api_key) 

    analysis_client = ImageAnalysisClient(vision_endpoint, vision_credential) 

    # Set up blob client for video 

    video_blob_client = BlobClient.from_blob_url(video_sas_url) 

    # Extract frames 

    frame_number = 0 

    tuples = [] 

    while True: 

        try: 

            # Generate image blob URL 

            image_url = get_image_blob_url(video_sas_url, frame_number) 

            image_blob_client = BlobClient.from_blob_url(image_url) 

            image_stream = download_blob_to_stream(image_blob_client) 

            vector = vectorize_image(image_url, vision_api_key, vision_region) 

            if vector: 

                vector = np.pad(vector, (0, 1536 - len(vector)), mode='constant') 

                print(f"Vectorized frame: {frame_number}") 

            description = analyze_image(analysis_client, image_url) 

            if description: 

                print(f"Analyzed frame: {frame_number}") 

                tuples += [(vector, description)] 

        except Exception as e:
            print(f"No such image: {image_url[74:80]}. Giving up...")
            break
        frame_number += 1

    return tuples   
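The padding step inside the loop widens a shorter vision embedding to 1536 dimensions with trailing zeros; a minimal standalone illustration (the 1024-length stand-in vector is illustrative of a vectorizeImage result):

```python
import numpy as np

vec = np.ones(1024)  # stand-in for a 1024-dim embedding from the vision API
padded = np.pad(vec, (0, 1536 - len(vec)), mode='constant')
print(padded.shape)         # → (1536,)
print(padded[1024:].sum())  # → 0.0
```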

 

# access_token = os.getenv("AZURE_VIDEO_INDEXER_ACCESS_TOKEN", get_access_token()) 

# video_sas_url=video_sas_url.strip('"') 

# print(video_sas_url) 

# extract_and_upload_frames(video_sas_url) 

# vision_credential = AzureKeyCredential(vision_api_key) 

# analysis_client = ImageAnalysisClient(vision_endpoint, vision_credential) 

 

@retry(stop=stop_after_attempt(5), wait=wait_fixed(60)) 

def vectorize_image(image_path, key, region): 

    try: 

        # API version and model version 

        api_version = "2024-02-01" 

        model_version = "2023-04-15" 

 

        # Construct the request URL 

        url = f"{vision_endpoint}/computervision/retrieval:vectorizeImage?api-version={api_version}&model-version={model_version}" 

 

        # Set headers 

        headers = { 

            "Content-Type": "application/json", 

            "Ocp-Apim-Subscription-Key": key 

        } 

 

        # Set the payload with the SAS URL 

        payload = { 

            "url": image_path 

        } 

 

        # Make the POST request 

        response = requests.post(url, headers=headers, json=payload) 

 

        # Check the response 

        if response.status_code == 200: 

            result = response.json() 

            # The vector is in the 'vector' field of the response 

            vector = result.get("vector") 

             

            # print("Vector embedding:", vector) 

            return vector 

        else:
            print("Error:", response.status_code, response.text)
            raise Exception(f"Error vectorizing image {image_path[74:80]}")

 

    except (requests.exceptions.Timeout, http.client.HTTPException) as e: 

        print(f"Timeout/Error for {image_path[74:80]}. Retrying...") 

        raise 

 

@retry(stop=stop_after_attempt(5), wait=wait_fixed(60)) 

def analyze_image(client, image_url): 

    try: 

        # Define all available visual features for analysis 

        features = [ 

            VisualFeatures.CAPTION, 

            VisualFeatures.TAGS, 

            VisualFeatures.OBJECTS, 

            VisualFeatures.READ, 

            VisualFeatures.SMART_CROPS, 

            VisualFeatures.DENSE_CAPTIONS, 

            VisualFeatures.PEOPLE 

        ] 

         

        # Analyze the image from the SAS URL 

        result = client.analyze_from_url(
            image_url=image_url,
            visual_features=features,
            gender_neutral_caption=True
        )

        # Explicitly cast to ImageAnalysisResult (for clarity) 

        result: ImageAnalysisResult = result 

        if result is not None:
            captions = []
            captions += [f"{result.caption.text}" if result.caption is not None else "No Caption"]
            if result.dense_captions is not None:
                captions += [f"{caption.text}" for caption in result.dense_captions.list]
            # Enhance result
            result.description = ",".join(captions)
            description = pformat(result.__dict__, depth=4, compact=False)
            return description

    except HttpResponseError as e: 

        print(str(e)) 

        raise 

    return "No description"