Monday, April 21, 2025

 Multimodal image search

The following code snippet shows how multimodal search can be useful for finding images. The images are indexed and searched by their vector embeddings, while the query can be either another image or plain text; the final search below uses a text query.

from dotenv import load_dotenv

import http.client

import json

import os

import urllib.parse

import requests

from tenacity import retry, stop_after_attempt, wait_fixed

from azure.core.credentials import AzureKeyCredential

from azure.identity import DefaultAzureCredential

from azure.search.documents import SearchClient

from azure.search.documents.indexes import SearchIndexClient

from azure.search.documents.models import (

    RawVectorQuery,

)

from azure.search.documents.indexes.models import (

    ExhaustiveKnnParameters,

    ExhaustiveKnnVectorSearchAlgorithmConfiguration,

    HnswParameters,

    HnswVectorSearchAlgorithmConfiguration,

    SimpleField,

    SearchField,

    SearchFieldDataType,

    SearchIndex,

    VectorSearch,

    VectorSearchAlgorithmKind,

    VectorSearchProfile,

)

from IPython.display import Image, display

load_dotenv()

service_endpoint = os.getenv("AZURE_SEARCH_SERVICE_ENDPOINT")

index_name = os.getenv("AZURE_SEARCH_INDEX_NAME")

api_version = os.getenv("AZURE_SEARCH_API_VERSION")

key = os.getenv("AZURE_SEARCH_ADMIN_KEY")

aiVisionApiKey = os.getenv("AZURE_AI_VISION_API_KEY")

aiVisionRegion = os.getenv("AZURE_AI_VISION_REGION")

aiVisionEndpoint = os.getenv("AZURE_AI_VISION_ENDPOINT")

credential = AzureKeyCredential(key)

search_client = SearchClient(endpoint=service_endpoint, index_name=index_name, credential=credential)

query_image_path = "images/PIC01.jpeg"

@retry(stop=stop_after_attempt(5), wait=wait_fixed(1))

def get_image_vector(image_path, key, region):

    headers = {

        'Ocp-Apim-Subscription-Key': key,

    }

    params = urllib.parse.urlencode({

        'model-version': '2023-04-15',

    })

    try:

        if image_path.startswith(('http://', 'https://')):

            headers['Content-Type'] = 'application/json'

            body = json.dumps({"url": image_path})

        else:

            headers['Content-Type'] = 'application/octet-stream'

            with open(image_path, "rb") as filehandler:

                image_data = filehandler.read()

                body = image_data

        conn = http.client.HTTPSConnection(f'{region}.api.cognitive.microsoft.com', timeout=3)

        conn.request("POST", "/computervision/retrieval:vectorizeImage?api-version=2023-04-01-preview&%s" % params, body, headers)

        response = conn.getresponse()

        data = json.load(response)

        conn.close()

        if response.status != 200:

            raise Exception(f"Error processing image {image_path}: {data.get('message', '')}")

        return data.get("vector")

    # http.client reports timeouts as OSError subclasses, not as requests exceptions
    except (OSError, http.client.HTTPException) as e:

        print(f"Timeout/Error for {image_path}: {e}. Retrying...")

        raise

# Option 1: query by image (this vector query is replaced by the text-based one below)

vector_query = RawVectorQuery(vector=get_image_vector(query_image_path,

                                                      aiVisionApiKey,

                                                      aiVisionRegion),

                              k=3,

                              fields="image_vector")

def generate_embeddings(text, aiVisionEndpoint, aiVisionApiKey):

    url = f"{aiVisionEndpoint}/computervision/retrieval:vectorizeText"

    params = {

        "api-version": "2023-02-01-preview"

    }

    headers = {

        "Content-Type": "application/json",

        "Ocp-Apim-Subscription-Key": aiVisionApiKey

    }

    data = {

        "text": text

    }

    response = requests.post(url, params=params, headers=headers, json=data)

    if response.status_code == 200:

        embeddings = response.json()["vector"]

        return embeddings

    else:

        print(f"Error: {response.status_code} - {response.text}")

        return None

query = "farm"

vector_text = generate_embeddings(query, aiVisionEndpoint, aiVisionApiKey)

vector_query = RawVectorQuery(vector=vector_text,

                              k=3,

                              fields="image_vector")

# Perform the search: the text query is sent alongside the vector query

results = search_client.search(

    search_text=query,

    vector_queries= [vector_query],

    select=["description"]

)

for result in results:

    print(f"{result['description']}")

    # Assumes the images are stored in ./images relative to the working directory

    display(Image(filename=os.path.join("images", result["description"])))

    print("\n")
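The ranking that a vector query performs can be pictured with a small, self-contained sketch. It assumes cosine similarity as the metric (the metric actually used depends on how the index is configured) and uses made-up three-dimensional vectors in place of real embeddings; the function names `cosine_similarity` and `top_k` are illustrative and not part of any SDK.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(query_vector, image_vectors, k=3):
    """Return the k (name, score) pairs most similar to the query vector."""
    scored = [
        (name, cosine_similarity(query_vector, vector))
        for name, vector in image_vectors.items()
    ]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


# Hypothetical toy vectors standing in for real image embeddings
toy_image_vectors = {
    "barn.jpeg": [0.9, 0.1, 0.0],
    "city.jpeg": [0.0, 0.2, 0.9],
    "tractor.jpeg": [0.8, 0.3, 0.1],
}
# Pretend this is the embedding of the text query "farm"
toy_query_vector = [1.0, 0.2, 0.0]

nearest = top_k(toy_query_vector, toy_image_vectors, k=2)
print([name for name, _ in nearest])
```

Because the text and the images are embedded into the same vector space, a text query can retrieve images without any caption or tag being indexed.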


Sunday, April 20, 2025

 Continuous indexing

Azure AI Search supports continuous indexing of documents, so the search index is updated in near real time as new data is ingested. It can connect to various data sources, such as Azure Blob Storage, SQL databases, or Cosmos DB, and ingest documents from them continuously. Indexers are configured to monitor these sources and update the search index accordingly: on each run the indexer scans the data source for new, updated, or deleted documents. The time taken to index new documents depends on factors such as the size of the data, the complexity of the schema, and the service tier. For large datasets, indexing may take longer, especially if the indexer is starved of resources. Once documents are indexed, they are available for querying, although query latency varies with the size of the index, query complexity, and service tier. The minimum interval between scheduled indexer runs is 5 minutes. If pulling from the data source on this schedule is not fast enough, individual items can be pushed directly into the index using the search client. Both approaches are shown in the code samples below:

from datetime import timedelta

from azure.identity import DefaultAzureCredential
from azure.search.documents.indexes import SearchIndexClient, SearchIndexerClient
from azure.search.documents.indexes.models import (
    IndexingSchedule,
    SearchableField,
    SearchFieldDataType,
    SearchIndex,
    SearchIndexer,
    SearchIndexerDataContainer,
    SearchIndexerDataSourceConnection,
    SimpleField,
)

# Replace with your Azure configuration
search_service_name = ""
blob_container_name = ""
connection_string = ""  # connection string of the storage account

# Authenticate using DefaultAzureCredential
# (the search service must have role-based access control enabled)
credential = DefaultAzureCredential()
endpoint = f"https://{search_service_name}.search.windows.net/"

# Data sources, indexes and indexers are data-plane objects, so they are managed with
# SearchIndexClient and SearchIndexerClient rather than with SearchManagementClient
index_client = SearchIndexClient(endpoint=endpoint, credential=credential)
indexer_client = SearchIndexerClient(endpoint=endpoint, credential=credential)

# Define the data source
data_source_name = "blob-data-source"
data_source = SearchIndexerDataSourceConnection(
    name=data_source_name,
    type="azureblob",
    connection_string=connection_string,
    container=SearchIndexerDataContainer(name=blob_container_name),
)

# Create or update the data source in Azure AI Search
indexer_client.create_or_update_data_source_connection(data_source)

# Define the index
index_name = "blob-index"
index = SearchIndex(
    name=index_name,
    fields=[
        SimpleField(name="id", type=SearchFieldDataType.String, key=True),
        SearchableField(name="content", type=SearchFieldDataType.String),
        SearchableField(name="category", type=SearchFieldDataType.String),
        SearchableField(name="sourcefile", type=SearchFieldDataType.String),
        SearchableField(name="metadata_storage_name", type=SearchFieldDataType.String),
    ],
)

# Create or update the index
index_client.create_or_update_index(index)

# Define the indexer
indexer_name = "blob-indexer"
indexer = SearchIndexer(
    name=indexer_name,
    data_source_name=data_source_name,
    target_index_name=index_name,
    schedule=IndexingSchedule(interval=timedelta(minutes=5)),  # run every 5 minutes (PT5M)
)

# Create or update the indexer
indexer_client.create_or_update_indexer(indexer)

print("Configured continuous indexing from Azure Blob Storage to Azure AI Search!")

import os
from base64 import urlsafe_b64encode

from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient

# Replace with your Azure credentials and configuration
service_name = ""
admin_key = ""

# Initialize a SearchClient for pushing documents into the index defined above
endpoint = f"https://{service_name}.search.windows.net/"
credential = AzureKeyCredential(admin_key)
search_client = SearchClient(endpoint=endpoint, index_name=index_name, credential=credential)

# Upload a document to the index
def index_document(filename):
    print(f"Indexing document '{filename}' into search index '{index_name}'")
    with open(filename, "r") as fin:
        text = fin.read()
    # Document keys may only contain letters, digits, '_', '-' and '=',
    # so the file name is encoded before it is used as the key
    doc_id = urlsafe_b64encode(filename.encode("utf-8")).decode("ascii")
    batch = [{
        "id": doc_id,
        "content": text,
        "sourcefile": filename,
        "metadata_storage_name": os.path.basename(filename),
    }]
    results = search_client.upload_documents(documents=batch)
    succeeded = sum(1 for r in results if r.succeeded)
    print(f"\tIndexed {len(results)} documents, {succeeded} succeeded")

The rate at which documents can be added to the index varies with the service tier and with the number of replicas and partitions. Higher service tiers have higher limits. Adding replicas increases query throughput, while adding partitions increases indexing throughput. Up to 1,000 documents can be sent in a single batch, and batching optimizes throughput and reduces the likelihood of hitting rate limits.
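A minimal sketch of the batching described above follows. The helper names `batched` and `upload_in_batches` are illustrative, and the `upload` argument is an assumed callable that takes a list of documents and returns one success flag per document, so the sketch can be run without a search service.

```python
def batched(documents, batch_size=1000):
    """Yield consecutive slices of at most batch_size documents."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(documents), batch_size):
        yield documents[start:start + batch_size]


def upload_in_batches(upload, documents, batch_size=1000):
    """Send the documents batch by batch and return how many succeeded.

    `upload` is a hypothetical callable: list of documents in,
    list of booleans (one per document) out.
    """
    succeeded = 0
    for batch in batched(documents, batch_size):
        succeeded += sum(1 for ok in upload(batch) if ok)
    return succeeded


# Stand-in uploader that accepts everything, for demonstration only
docs = [{"id": str(i), "content": f"document {i}"} for i in range(2500)]
print([len(batch) for batch in batched(docs)])
print(upload_in_batches(lambda batch: [True] * len(batch), docs))
```

With the SDK, the callable could plausibly wrap the client, for example `lambda batch: [r.succeeded for r in search_client.upload_documents(documents=batch)]`.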


Saturday, April 19, 2025

 How does DFCS differ from the SkyQuery platform?

DFCS is a UAV swarm imagery driven knowledge base and analytics stack, based entirely in the public cloud, that can be used to create a trajectory of waypoints from source to destination over a given landscape. The capability to store and query drone imagery, and to build from it a knowledge base for retrieval-augmented generation in AI applications, is quite generic and shares many requirements with a wide variety of image querying systems. Most notably, the SkyQuery platform has similar requirements: it must deal with a large dataset of images and provide contextual information on queries. SkyQuery is an aerial drone video sensing platform with a high-level programming language that makes it well suited to developing long-running sensing applications. It performs fast video frame alignment and detection of small objects, which works well for querying with its expressive domain-specific language, in which programs specify sensing-analytics-routing loops. It also provides a library of analytical operators to encode these steps. By separating out the workflows that can be written using these operators, it allows takeoff, waypoint following and landing to be automated.

Therefore, both DFCS and SkyQuery provide computer vision pipelines and processors that convert drone video data into queryable representations, a way to contextualize queries, and an engine that responds fast enough to provide routing directives to a UAV swarm, all through programmable interfaces.

The differences lie in the representations used for these datasets and in the way they are queried. DFCS leverages AI and vector search, while SkyQuery leverages language constructs. Even the image processors are multimodal for DFCS, while SkyQuery catalogues the output of SIFT feature extractors. The use of retrieval-augmented generation makes query results more meaningful for DFCS, while SkyQuery requires workflows to experiment with their own querying logic. In SkyQuery, objects are referred to by keypoints, each comprising a pixel position and a feature descriptor, which are then formed into “stable groups”. DFCS, on the other hand, leverages vector search that works well with contextual information presented via spatial coordinates, progress along waypoints and error corrections.

It could be said that DFCS focuses more on the flight path of the UAV swarm and provides error-correction feedback to keep the swarm on course to its destination. It bolsters this with information for humans as well as feedback loops for autonomous flights, and comes with telemetry pipelines that continuously indicate the manner and measure of progress along the trajectory.
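The idea of measuring progress along the trajectory and the deviation that needs correcting can be illustrated with a small sketch. It assumes planar x/y coordinates in metres and a single straight leg between two waypoints; the function name and the coordinate convention are hypothetical, and the post does not say how DFCS itself computes these quantities.

```python
import math


def progress_and_deviation(position, start, end):
    """Project a position onto the leg start -> end.

    Returns (progress, deviation): progress is the fraction of the leg
    completed, clamped to [0, 1]; deviation is the distance in metres
    from the position to the nearest point on the leg.
    """
    sx, sy = start
    ex, ey = end
    px, py = position
    dx, dy = ex - sx, ey - sy
    length_sq = dx * dx + dy * dy
    if length_sq == 0:
        raise ValueError("start and end waypoints must differ")
    # Scalar projection of the position onto the leg, as a fraction of its length
    t = ((px - sx) * dx + (py - sy) * dy) / length_sq
    t = max(0.0, min(1.0, t))
    closest_x, closest_y = sx + t * dx, sy + t * dy
    return t, math.hypot(px - closest_x, py - closest_y)


# A drone halfway along a 10 m leg, drifted 3 m to the side
progress, deviation = progress_and_deviation((5.0, 3.0), (0.0, 0.0), (10.0, 0.0))
print(progress, deviation)
```

A feedback loop could emit these two numbers as telemetry and steer back toward the leg whenever the deviation exceeds a chosen threshold.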

By keeping the cataloguing, grouping and querying of objects independent of the vector representations, DFCS facilitates working with third-party datastores, including those that were built as product catalogs. This helps to diversify the methods and means of querying for different purposes, without being restricted to a single query language. DFCS is polyglot and provides a chatbot-like interface that leverages the state of the art in retrieval-augmented generation.

#codingexercise: https://1drv.ms/w/c/d609fb70e39b65c8/Echlm-Nw-wkggNb7JAEAAAABu53rpIuTS5AsMb3lNiM7SQ?e=u6kTma


Friday, April 18, 2025

Telemetry pipelines

Collecting and emitting telemetry data makes the ingestion and processing of sensor data independent of the input to the models used to predict the next orientation. This strategy leans on telemetry pipelines as an effective technology to solve data problems and turn expansive datasets into concise, actionable insights without losing information. Waypoints, trajectory, position on the trajectory, deviations and error corrections are all that need to be maintained and tracked for the UAV swarm to negotiate obstacles and stay on course from the source to the destination. An intelligent telemetry pipeline follows this five-step approach to maximize its value:

1. Noise filtering: This involves sifting through data to spotlight the essentials.

2. Long-term data retention: This involves safeguarding valuable data for future use.

3. Event trimming: This tailors data for optimal analytics so that the raw data does not dictate eccentricities in the charts and graphs.

4. Data condensation: This translates voluminous MELT (metrics, events, logs and traces) data into focused metrics.

5. Operational efficiency boosting: This amplifies operating speed and reliability.

This approach is widely applicable across domains and is visible in many projects spanning Kaggle datasets, open-source repositories such as those on GitHub, and many publications. Emitting to S3 or S3-compatible storage and counting the number and size of emitted events shows the reduction in size compared to the original data, and serves as a measure of how effective it is to use telemetry instead of the actual data.

With the metrics emitted for drones, the first step of noise filtering involves removing duplicates, false positives, recurring notifications and superfluous information while registering their frequency for future use. Dissecting data within specific windows, keeping unique events and eliminating excessive repetitions can be offloaded to a dedupe processor, but this step is not limited to that: it strives to keep the data as precise and concise as possible without losing information, so that it remains good enough for the same analytics.
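The windowed de-duplication described above might look like the following sketch. The event shape (`ts`, `drone_id`, `type`) and the ten-second window are assumptions made for illustration; a real dedupe processor would define its own keys and windowing.

```python
def dedupe_window(events, key_fields=("drone_id", "type")):
    """Keep the first event per key and record how often the key occurred."""
    unique = {}
    for event in events:
        key = tuple(event.get(field) for field in key_fields)
        if key in unique:
            unique[key]["count"] += 1
        else:
            unique[key] = {**event, "count": 1}
    return list(unique.values())


def dedupe_stream(events, window_seconds=10, key_fields=("drone_id", "type")):
    """Split events into fixed time windows and de-duplicate each window."""
    windows = {}
    for event in events:
        bucket = int(event["ts"] // window_seconds)
        windows.setdefault(bucket, []).append(event)
    filtered = []
    for bucket in sorted(windows):
        filtered.extend(dedupe_window(windows[bucket], key_fields))
    return filtered


# Hypothetical telemetry events; ts is seconds since the start of the flight
events = [
    {"ts": 0, "drone_id": "d1", "type": "low_battery"},
    {"ts": 2, "drone_id": "d1", "type": "low_battery"},
    {"ts": 3, "drone_id": "d2", "type": "obstacle"},
    {"ts": 12, "drone_id": "d1", "type": "low_battery"},
]
filtered = dedupe_stream(events)
print([(e["drone_id"], e["type"], e["count"]) for e in filtered])
```

Keeping the `count` alongside the surviving event is what preserves the frequency information while the repetitions themselves are dropped.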

Specific datasets and SIEM data are indispensable for future needs and for real-time data refinement. So, leveraging cloud architecture patterns that collect data from multiple sources, such as a service bus, and write to multiple destinations is a requisite for the second step. This step could also implement filtering capabilities and journaling to ensure robustness and reliability without loss of fidelity.

The third step is a take on advanced telemetry management, introducing concepts such as traffic flow segregation through grouping and streamlining. It does involve parsing, but it improves overall performance. Deeper analysis is often better with some transformations.

The fourth step, data condensation, builds on the concept of refinement and proactively prevents another data deluge, so that even streams remain manageable and meaningful. The value extends beyond volume reduction, as this approach also reduces data processing overheads.
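As a rough illustration of condensation, the sketch below rolls raw per-sample deviation readings up into one metric record per time window. The field names (`ts`, `deviation_m`) and the sixty-second window are assumptions, not something the pipeline prescribes.

```python
from statistics import mean


def condense(samples, window_seconds=60):
    """Summarize raw samples as count, mean and max deviation per window."""
    buckets = {}
    for sample in samples:
        bucket = int(sample["ts"] // window_seconds)
        buckets.setdefault(bucket, []).append(sample["deviation_m"])
    return [
        {
            "window_start": bucket * window_seconds,
            "count": len(values),
            "mean_deviation_m": mean(values),
            "max_deviation_m": max(values),
        }
        for bucket, values in sorted(buckets.items())
    ]


# Hypothetical raw readings: seconds since take-off and deviation in metres
samples = [
    {"ts": 0, "deviation_m": 1.0},
    {"ts": 30, "deviation_m": 3.0},
    {"ts": 61, "deviation_m": 2.0},
]
metrics = condense(samples)
print(metrics)
```

Three raw readings become two metric records here; at realistic sampling rates the reduction is far larger, which is where the lower processing overhead comes from.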

The fifth step is about managing the data and ensuring the speed and reliability of operations that process this data. With increasing ingestion rates, vectorization and search may lag. Agile robust solutions that maximize the value derived from their data while making costs manageable are required here.

Data accumulation without purposeful action leads to stagnation, and efficient operations aid in streamlining and refining data. Speed and reliability are a function of both.


Thursday, April 17, 2025

 Always pertinent:

Problem: determine if a graph has cycles:

import java.util.*;

class Ideone
{
    // Every edge is given weight -1, so any directed cycle reachable from the source
    // is a negative cycle, which the final pass of Bellman-Ford detects.
    private static final int EDGE_WEIGHT = -1;

    public static void main (String[] args) throws java.lang.Exception
    {
        // Adjacency matrix over vertices A..E; m[i][j] == 1 is a directed edge i -> j.
        // Java zero-initializes int arrays, so only the edges need to be set.
        int[][] m = new int[5][5];
        m[0][1] = 1;
        m[0][2] = 1;
        m[1][0] = 1;
        m[1][3] = 1;
        m[2][0] = 1;
        m[2][3] = 1;
        m[3][1] = 1;
        m[3][2] = 1;
        m[3][4] = 1;
        m[4][3] = 1;
        var vertices = InitializeSingleSource(m, 0);
        // A Map<String, String> keeps only one edge per source vertex,
        // so the edges are collected in a list instead.
        var edges = new ArrayList<String[]>();
        for (int i = 0; i < m.length; i++) {
            for (int j = 0; j < m[i].length; j++) {
                if (m[i][j] == 1) {
                    edges.add(new String[] { name(i), name(j) });
                }
            }
        }
        System.out.println(hasCyclesByBellmanFord(vertices, edges));
    }

    // Maps 0 -> "A", 1 -> "B", and so on.
    private static String name(int i) {
        return String.valueOf((char) ('A' + i));
    }

    private static List<Vertex> InitializeSingleSource(int[][] m, int start) {
        var vertices = new ArrayList<Vertex>();
        for (int i = 0; i < m.length; i++) {
            var v = new Vertex();
            v.id = name(i);
            v.d = (i == start) ? 0 : Integer.MAX_VALUE;
            v.parent = null;
            vertices.add(v);
        }
        return vertices;
    }

    private static Vertex getVertex(List<Vertex> vertices, String id) {
        for (int i = 0; i < vertices.size(); i++) {
            if (vertices.get(i).id.equals(id)) {
                return vertices.get(i);
            }
        }
        return null;
    }

    // Sample graph: A <-> B, A <-> C, B <-> D, C <-> D, D <-> E (each pair in both directions)
    private static boolean hasCyclesByBellmanFord(List<Vertex> vertices, List<String[]> edges) {
        // Relax every edge |V| - 1 times.
        for (int i = 0; i < vertices.size() - 1; i++) {
            for (var edge : edges) {
                relax(getVertex(vertices, edge[0]), getVertex(vertices, edge[1]));
            }
        }
        // If any edge can still be relaxed, a cycle is reachable from the source.
        for (var edge : edges) {
            var u = getVertex(vertices, edge[0]);
            var v = getVertex(vertices, edge[1]);
            if (u != null && v != null && u.d != Integer.MAX_VALUE && v.d > u.d + EDGE_WEIGHT) {
                return true;
            }
        }
        return false;
    }

    private static void relax(Vertex u, Vertex v) {
        if (u == null || v == null) { return; }
        if (u.d == Integer.MAX_VALUE) { return; } // u has not been reached yet
        if (v.d > u.d + EDGE_WEIGHT) {
            v.d = u.d + EDGE_WEIGHT;
            v.parent = u;
        }
    }

    // Static so that it can be instantiated from the static methods above.
    static class Vertex {
        public String id;
        public Vertex parent;
        public int d;
    }
}
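The Java program above approaches the problem through Bellman-Ford. For comparison, a more common way to detect a cycle in a directed graph is a depth-first search that marks each vertex as unvisited, in progress, or finished; meeting an in-progress vertex again means a back edge and therefore a cycle. The sketch below is a swapped-in alternative rather than a translation of the Java code, and the adjacency-list format and function name are assumptions. It is recursive, so very deep graphs would need an iterative version.

```python
def has_cycle(graph):
    """Return True if the directed graph (dict: vertex -> list of successors) has a cycle."""
    WHITE, GREY, BLACK = 0, 1, 2  # unvisited, in progress, finished
    colour = {vertex: WHITE for vertex in graph}

    def visit(u):
        colour[u] = GREY
        for v in graph.get(u, ()):
            state = colour.get(v, WHITE)
            if state == GREY:
                return True  # back edge to a vertex still on the current path
            if state == WHITE and visit(v):
                return True
        colour[u] = BLACK
        return False

    return any(colour[vertex] == WHITE and visit(vertex) for vertex in list(graph))


# The graph from the Java example, with every pair connected in both directions
cyclic = {
    "A": ["B", "C"],
    "B": ["A", "D"],
    "C": ["A", "D"],
    "D": ["B", "C", "E"],
    "E": ["D"],
}
# The same shape with each edge kept in one direction only
acyclic = {"A": ["B", "C"], "B": ["D"], "C": ["D"], "D": ["E"], "E": []}

print(has_cycle(cyclic), has_cycle(acyclic))
```

Note that when an undirected graph is encoded as pairs of opposite directed edges, as in the sample data, every edge already forms a two-vertex cycle, so a directed-cycle test reports a cycle as soon as the graph has any edge.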


Wednesday, April 16, 2025

 Secure-by-design

The boundary between infrastructure and application engineering is one where security concerns play out differently. Application engineering focuses on architecture with boundaries such that some of the resources are considered to be within the trust boundary. Infrastructure engineering implements security with network perimeter protection and a defense-in-depth strategy, so that even internal or hidden-from-the-world resources enjoy a certain level of protection. Neither side can deny the need to upskill and leverage the tools available. Developers must be trained in secure coding, and infrastructure engineers must hold them to it. Proficiency in using approved tools goes hand in hand with establishing and maintaining effective oversight and administration.

With a sprawling digital landscape of resources, tools, frameworks and platforms used to host data and run code, organizations often find it hard to benchmark their security against industry standards. “Embracing security and resilience by design” is indeed a challenge, and progress toward meeting it must be tracked. CISA has published pivotal guidelines on the subject. One frequently used technique is Secure Code Warrior’s “Trust Score” technology, which opens a new frontier of actionable security insights and benchmarking.

Cybersecurity is a discipline, and it is dynamic. While it was a $2 billion industry in the 1990s dominated by transaction systems, it is now over $2 trillion with an insatiable demand for products, services and AI applications. Virtually every company writes code in some way. Organizations have grappled with bringing security upfront into the SDLC amid cultural resistance and disagreements, while AppSec professionals are depleted by a high rate of burnout. Movements like “shift left” have attempted to correct this, but accountability continues to be a sticking point at all levels and scopes. A case in point is the defect introduced by CrowdStrike that affected the airline industry. Oversight and management of software development processes must ensure that Secure-by-Design is front of mind and achievable for each deployment.

Some of the tenets include “Provide secure defaults for developers”, where the default route during software development is a “paved road” or “well-lit path”, and “Foster a software developer workforce that understands security”, by training developers on best practices and including security education in the skill set that is hired for. Developers need to be enabled through continuous, precise learning pathways and tools that suit their tech stack, so that they can share the responsibility for security.


Tuesday, April 15, 2025

This is a summary of the book titled “The Yellow Pad”, written by Robert E. Rubin and published by Penguin Press in 2023. The author, a former Goldman Sachs executive and US Secretary of the Treasury, discusses how to navigate difficult, controversial decisions, which he has learned requires adhering to specific principles. He also found that one must always recognize the unpredictable human element, since no event ever unfolds exactly as planned. He brings his knowledge and experience to this framework of principles. A prisoner’s insights made a lasting impact on him. In his work, he learned that risk management demands acknowledging the remotest possibilities. Mental toughness enables strong leaders to overcome bumps in the road, and great leaders are curious, authentic, and true to their beliefs. A retrospective view provides clarity and fosters more effective decision-making.

He was impressed by the forthrightness of prisoners about their crimes and the importance of pause, assessment, and weighing the possible repercussions of their actions. He believed that making informed decisions amid intense upheaval requires special skill and discipline. Rubin emphasized the importance of understanding one's emotional biases and regulating them during risky crunch times. He also compared the late 20th century to the time when Vice President Al Gore warned of the dangers of global warming. Rubin and Henry Paulson, former treasury secretary under President George W. Bush, approached the Securities and Exchange Commission to advocate for financial establishments to openly acknowledge the possible costs of global warming. They believed that if more leaders around the world had thought about the issue like Al Gore, the world today would be a safer place.

Mental toughness is crucial for strong leaders to overcome challenges and maintain confidence in their decision-making abilities. They are resilient and embrace optimism, which is important within organizations. Successful leaders are known for their resilience, especially in response to public criticism. They are also known for their energetic curiosity, which leads them to take a skeptical approach and search for answers beyond obvious conclusions. Authenticity is a character trait that serves leaders well, as it allows them to explore the world around them. Being true to oneself requires consistency, even when it means disagreeing with others' opinions. Traditional management often overlooks the human element, as seen in the case of Lawrence Bossidy's management philosophy. Rubin prefers to focus on people's individuality and uniqueness, rather than specific rules and detailed lists of do's and don'ts. Despite their skepticism, leaders like Rubin can be talented and perceptive, making them valuable assets in their organizations.

Good executives prioritize the best interests of their organizations, ignoring personal feelings and fostering empathetic and patient decision-making. They credit their employees for department successes, accept blame when things go wrong, and are open to feedback from everyone. Organizational culture influences employee success, and leaders must avoid deviating from their foundational values. Success accrues upward, reflecting well on the leader. Analyzing past actions helps make informed decisions moving forward. In some cases, carefully considered decisions can still generate negative results. Intellectual openness creates an environment where people can work with leaders to make the best decisions. However, organizations often assign blame, leaving employees without valuable input. Chastising, blaming, or unfairly punishing people for making honest mistakes can fuel an unhealthy culture. Rubin seldom states his negative judgments of anyone's actions in public, as long as the people who made poor decisions undertake unflinching reviews of how their actions led to unsatisfactory outcomes.