Thursday, April 24, 2025

 Leveraging a database of detected objects with standard query operators to build rich drone video sensing applications

We described the drone video sensing platform DFCS as comprising a vision processor, an analytics engine, and a drone router. The vision processor creates vectors for keypoints, each a tuple of a pixel position and a feature descriptor of the patch around that pixel, which is translated into world co-ordinates and time-lapse information for that location. While many questions can be answered directly with a search on this vector database, or with multimodal search over the selected frames, we also leverage RAG by creating a database of detected objects, which is very useful when searching alongside public reviews of those objects from the internet, such as reviews of parking spaces. The aim of treating this database of detected objects as a regular structured data source is that we can now leverage standard query operators to build rich UAV swarm sensing applications.

For example,

-- My Position

declare @myposition geography = geography::STGeomFromText('POINT(-0.2173896258649289 51.484376146936256)', 4326);

-- Get Embeddings from OpenAI

declare @e varbinary(8000);

exec dbo.get_embeddings

@model = 'text-embedding-3-small',

@text = 'a place to park a car on Thursday 1-3 pm GMT',

@embedding = @e output;

with cte as

(

select

e.review_id,

vector_distance('cosine', embedding, @e) as distance

from

dbo.review_embeddings e

)

select top(10)

b.id as business_id,

b.name,

r.id as review_id,

r.stars,

@myposition.STDistance(b.geo_location) as geo_distance,

1-e.distance as similarity

from

cte e

inner join

dbo.reviews r on e.review_id = r.id

inner join

dbo.business b on r.business_id = b.id

where

b.city = 'London'

and

@myposition.STDistance(b.geo_location) < 5000 -- 5 km

and

regexp_like(cast(b.categories as varchar(1000)), 'Parking|Street')

and

r.stars >= 4

 and

b.reviews > 30

and

json_value(b.custom_attributes, '$."metered"') = 'yes'

order by

distance

go

The above direct SQL query on the database, combined with built-in vector search, allows a traditional web application to be created. Alternatively, the application can query a chatbot with a system message such as “You are an AI assistant that helps people find parking. Give as many details as possible about each parking space such as price. Whenever you respond, please format your answer to make it readable including bullet points.” to define the AI's personality, tone and capabilities, and leverage the detected-objects database for Retrieval Augmented Generation.
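As a minimal sketch of that chatbot path, the snippet below passes the rows returned by the parking query above as grounding context to a chat model. The OpenAI client, the model name and the helper function are illustrative assumptions rather than part of DFCS.

from openai import OpenAI

client = OpenAI()  # assumes OPENAI_API_KEY is set in the environment

SYSTEM_MESSAGE = (
    "You are an AI assistant that helps people find parking. "
    "Give as many details as possible about each parking space such as price. "
    "Whenever you respond, please format your answer to make it readable including bullet points."
)

def answer_with_rag(question, rows):
    # rows: list of dicts produced by the SQL query (business name, stars, geo_distance, similarity, ...)
    context = "\n".join(str(row) for row in rows)
    response = client.chat.completions.create(
        model="gpt-4o-mini",  # placeholder model name
        messages=[
            {"role": "system", "content": SYSTEM_MESSAGE},
            {"role": "user", "content": f"{question}\n\nRetrieved parking candidates:\n{context}"},
        ],
    )
    return response.choices[0].message.content

Passing the retrieved rows in the user message is what grounds the chatbot's answer in the detected-objects database.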


Wednesday, April 23, 2025

 Waypoint selection strategies

The design, development, and testing of the waypoint-selection and trajectory-forming algorithm was discussed under the assumption that users provide a geographic region they are interested in observing. The region is divided into a grid of cells with a user-configurable cell size. Then, using information on the reachability of cells from one another, we create a graph with cells as nodes and adjacencies as edges. This helps us determine waypoints as the set of nodes selected in a topological sort from source to destination. One of the helper libraries for the implementation therefore involves the following graph objects.

class Vertex(object):

    def __init__(self, id, point):

        self.id = id

        self.point = point

        self.in_edges = []

        self.out_edges = []

    def _neighbors(self):

        n = {}

        for edge in self.in_edges:

            n[edge.src] = edge

        for edge in self.out_edges:

            n[edge.dst] = edge

        return n

    def neighbors(self):

        return self._neighbors().keys()

    def __repr__(self):

        return 'Vertex({}, {}, {} in {} out)'.format(self.id, self.point, len(self.in_edges), len(self.out_edges))

class Edge(object):

    def __init__(self, id, src, dst):

        self.id = id

        self.src = src

        self.dst = dst

    def bounds(self):

        return self.src.point.bounds().extend(self.dst.point)

    def segment(self):

        return geom.Segment(self.src.point, self.dst.point)

    def closest_pos(self, point):

        p = self.segment().project(point)

        return EdgePos(self, p.distance(self.src.point))

    def is_opposite(self, edge):

        return edge.src == self.dst and edge.dst == self.src

    def get_opposite_edge(self):

        for edge in self.dst.out_edges:

            if self.is_opposite(edge):

                return edge

        return None

    def is_adjacent(self, edge):

        return edge.src == self.src or edge.src == self.dst or edge.dst == self.src or edge.dst == self.dst

    def orig_id(self):

        if hasattr(self, 'orig_edge_id'):

            return self.orig_edge_id

        else:

            return self.id
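To illustrate how these graph objects could be used for waypoint selection, here is a minimal sketch, not part of the helper library above: a breadth-first search over the cell graph that returns the vertices on a shortest hop-count path from a source cell to a destination cell as the list of waypoints.

from collections import deque

def select_waypoints(source, destination):
    # Breadth-first search over Vertex objects using their neighbors();
    # returns the waypoint list from source to destination, or None if unreachable.
    parent = {source: None}
    queue = deque([source])
    while queue:
        vertex = queue.popleft()
        if vertex == destination:
            path = []
            while vertex is not None:
                path.append(vertex)
                vertex = parent[vertex]
            return list(reversed(path))
        for neighbor in vertex.neighbors():
            if neighbor not in parent:
                parent[neighbor] = vertex
                queue.append(neighbor)
    return None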


Tuesday, April 22, 2025

 SIFT feature extraction for drone imagery

SIFT, or Scale-Invariant Feature Transform, is a powerful algorithm used in computer vision for detecting, describing, and matching local features in images. SIFT is designed to identify features that remain consistent across changes in scale, rotation, and illumination. It is applied to drone imagery to compute keypoints in each video frame. A keypoint is a tuple of a pixel position and a feature descriptor that describes the image in a patch around that pixel - a vector representation of the local image region. SIFT matches features between images by comparing their descriptors using metrics like Euclidean distance. For every video frame, SIFT yields a set of keypoints.

The implementation to extract SIFT features is as follows:

import cv2

# SIFT is part of the main OpenCV module in recent versions (>= 4.4); fall back to the contrib build if needed
try:
    sift = cv2.SIFT_create()
except AttributeError:
    sift = cv2.xfeatures2d.SIFT_create()

def compute_one(im):

        return sift.detectAndCompute(im, None)

def compute_sift(frames):

        print('get sift features')

        sift_features = [(None, None) for _ in frames]

        for frame_idx, im in enumerate(frames):
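            # only compute SIFT on every third frame (and skip missing frames)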

            if im is None or frame_idx % 3 != 0:

                continue

            print('... sift {}/{}'.format(frame_idx, len(frames)))

            keypoints, descs = compute_one(im)

            sift_features[frame_idx] = (keypoints, descs)

        return sift_features
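Since SIFT matching compares descriptors with Euclidean distance, a minimal matching sketch is shown below. It uses OpenCV's brute-force matcher with Lowe's ratio test; the helper name, the frame indices and the 0.75 ratio are illustrative choices, not part of the pipeline above.

def match_frames(features_a, features_b, ratio=0.75):
    # features_a / features_b are (keypoints, descriptors) tuples as returned by compute_one;
    # returns the descriptor matches that pass the ratio test.
    _, descs_a = features_a
    _, descs_b = features_b
    if descs_a is None or descs_b is None:
        return []
    matcher = cv2.BFMatcher(cv2.NORM_L2)
    candidates = matcher.knnMatch(descs_a, descs_b, k=2)
    return [pair[0] for pair in candidates if len(pair) == 2 and pair[0].distance < ratio * pair[1].distance]

For example, match_frames(sift_features[0], sift_features[3]) would match keypoints between the first two sampled frames.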


Monday, April 21, 2025

 Multimodal image search

The following code snippet shows how multimodal search can be useful for searching images. The images are indexed and searched by vector embeddings, while the query is text-based.

from dotenv import load_dotenv,dotenv_values

import json

import os

import requests

import urllib.parse

import http.client

from tenacity import retry, stop_after_attempt, wait_fixed

from azure.core.credentials import AzureKeyCredential

from azure.identity import DefaultAzureCredential

from azure.search.documents import SearchClient

from azure.search.documents.indexes import SearchIndexClient

from azure.search.documents.models import (

    RawVectorQuery,

)

from azure.search.documents.indexes.models import (

    ExhaustiveKnnParameters,

    ExhaustiveKnnVectorSearchAlgorithmConfiguration,

    HnswParameters,

    HnswVectorSearchAlgorithmConfiguration,

    SimpleField,

    SearchField,

    SearchFieldDataType,

    SearchIndex,

    VectorSearch,

    VectorSearchAlgorithmKind,

    VectorSearchProfile,

)

from IPython.display import Image, display

load_dotenv()

service_endpoint = os.getenv("AZURE_SEARCH_SERVICE_ENDPOINT")

index_name = os.getenv("AZURE_SEARCH_INDEX_NAME")

api_version = os.getenv("AZURE_SEARCH_API_VERSION")

key = os.getenv("AZURE_SEARCH_ADMIN_KEY")

aiVisionApiKey = os.getenv("AZURE_AI_VISION_API_KEY")

aiVisionRegion = os.getenv("AZURE_AI_VISION_REGION")

aiVisionEndpoint = os.getenv("AZURE_AI_VISION_ENDPOINT")

credential = AzureKeyCredential(key)

search_client = SearchClient(endpoint=service_endpoint, index_name=index_name, credential=credential)

query_image_path = "images/PIC01.jpeg"

DIR_PATH = os.getcwd()  # assumed root folder containing the images/ directory used when displaying results

@retry(stop=stop_after_attempt(5), wait=wait_fixed(1))

def get_image_vector(image_path, key, region):

    headers = {

        'Ocp-Apim-Subscription-Key': key,

    }

    params = urllib.parse.urlencode({

        'model-version': '2023-04-15',

    })

    try:

        if image_path.startswith(('http://', 'https://')):

            headers['Content-Type'] = 'application/json'

            body = json.dumps({"url": image_path})

        else:

            headers['Content-Type'] = 'application/octet-stream'

            with open(image_path, "rb") as filehandler:

                image_data = filehandler.read()

                body = image_data

        conn = http.client.HTTPSConnection(f'{region}.api.cognitive.microsoft.com', timeout=3)

        conn.request("POST", "/computervision/retrieval:vectorizeImage?api-version=2023-04-01-preview&%s" % params, body, headers)

        response = conn.getresponse()

        data = json.load(response)

        conn.close()

        if response.status != 200:

            raise Exception(f"Error processing image {image_path}: {data.get('message', '')}")

        return data.get("vector")

    except (requests.exceptions.Timeout, http.client.HTTPException) as e:

        print(f"Timeout/Error for {image_path}. Retrying...")

        raise

vector_query = RawVectorQuery(vector=get_image_vector(query_image_path,

                                                      aiVisionApiKey,

                                                      aiVisionRegion),

                              k=3,

                              fields="image_vector")

def generate_embeddings(text, aiVisionEndpoint, aiVisionApiKey):

    url = f"{aiVisionEndpoint}/computervision/retrieval:vectorizeText"

    params = {

        "api-version": "2023-02-01-preview"

    }

    headers = {

        "Content-Type": "application/json",

        "Ocp-Apim-Subscription-Key": aiVisionApiKey

    }

    data = {

        "text": text

    }

    response = requests.post(url, params=params, headers=headers, json=data)

    if response.status_code == 200:

        embeddings = response.json()["vector"]

        return embeddings

    else:

        print(f"Error: {response.status_code} - {response.text}")

        return None

query = "farm"

vector_text = generate_embeddings(query, aiVisionEndpoint, aiVisionApiKey)

vector_query = RawVectorQuery(vector=vector_text,

                              k=3,

                              fields="image_vector")

# Perform vector search

results = search_client.search(

    search_text=query,

    vector_queries= [vector_query],

    select=["description"]

)

for result in results:

    print(f"{result['description']}")

    display(Image(DIR_PATH + "/images/" + result["description"]))

    print("\n")


Sunday, April 20, 2025

 Continuous indexing

Azure AI Search supports continuous indexing of documents, enabling near-real-time updates to the search index as new data is ingested. It can connect to various data sources, such as Azure Blob Storage, SQL databases, or Cosmos DB, to ingest documents continuously. Indexers are configured to monitor these sources for changes and update the search index accordingly. The indexer scans the data source for new, updated, or deleted documents. The time taken to index new documents depends on factors like the size of the data, the complexity of the schema, and the service tier. For large datasets, indexing may take longer, especially if the indexer is resource-starved. Once documents are indexed, they are available for querying; however, query latency can vary with the size of the index, query complexity, and service tier. The minimum interval between indexer runs is 5 minutes. If this pull from the data source is not fast enough, individual data items can be indexed by pushing them directly to the index with the index client. Both approaches are shown in the code samples below:

from azure.identity import DefaultAzureCredential

from azure.mgmt.search import SearchManagementClient

# Replace with your Azure credentials and configuration

subscription_id = ""

resource_group_name = ""

search_service_name = ""

blob_storage_account_name = ""

blob_container_name = ""

connection_string = ""

# Authenticate using DefaultAzureCredential

credential = DefaultAzureCredential()

# Initialize the Azure Search Management Client

search_client = SearchManagementClient(credential, subscription_id)

# Define the data source

data_source_name = "blob-data-source"

data_source_definition = {

  "type": "AzureBlob",

  "credentials": {

      "connectionString": connection_string

  },

  "container": {"name": blob_container_name}

}

# Create or update the data source in Azure Search

search_client.data_sources.create_or_update( resource_group_name=resource_group_name, search_service_name=search_service_name,

data_source_name=data_source_name,

data_source=data_source_definition )

# Define the index

index_name = "blob-index"

index_definition = {

  "fields": [

    {"name": "id", "type": "Edm.String", "key": True},

    {"name": "content", "type": "Edm.String"},

    {"name": "category", "type": "Edm.String"},

    {"name": "sourcefile", "type": "Edm.String"},

    {"name": "metadata_storage_name", "type": "Edm.String"} ] }

# Create or update the index

search_client.indexes.create_or_update(

resource_group_name=resource_group_name, search_service_name=search_service_name,

index_name=index_name,

index=index_definition )

# Define the indexer

indexer_name = "blob-indexer"

indexer_definition = {

  "dataSourceName": data_source_name,

  "targetIndexName": index_name,

  "schedule": {

      "interval": "PT5M"  # Run every 5 minutes

  }

}

# Create or update the indexer

search_client.indexers.create_or_update( resource_group_name=resource_group_name, search_service_name=search_service_name, indexer_name=indexer_name, indexer=indexer_definition )

print("Configured continuous indexing from Azure Blob Storage to Azure AI Search!")

# Replace with your Azure credentials and configuration

service_name = ""

admin_key = ""

from azure.search.documents import SearchClient

# Initialize the SearchIndexClient

endpoint = f"https://{service_name}.search.windows.net/"

credential = AzureKeyCredential(admin_key)

index_client = SearchIndexClient(endpoint=endpoint, credential=credential)

# Upload documents to the index

def index_document(filename):

     print(f"Indexing document '{filename}' into search index '{index_name}'")

     search_client = SearchClient(endpoint=endpoint, index_name=index_name, credential=credential)

     batch = []

     with open(filename, 'r') as fin:

          text = fin.read()

          # upload_documents expects dicts shaped like the index fields; this id derivation is illustrative
          batch += [{"id": filename.replace("/", "_").replace(".", "_"), "content": text, "sourcefile": filename}]

     if len(batch) > 0:

          results = search_client.upload_documents(documents=batch)

          succeeded = sum([1 for r in results if r.succeeded])

          print(f"\tIndexed {len(results)} documents, {succeeded} succeeded")

The default rate limit for adding documents to the index varies with service tier, replicas, and partitions. Higher service tiers have higher rate limits. Adding replicas increases query throughput, while adding partitions increases indexing throughput. Up to 1,000 documents can be sent in a single batch; batching optimizes throughput and reduces the likelihood of hitting rate limits.
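A minimal sketch of that batching guidance, assuming a documents list of dicts that match the index schema and the SearchClient shown earlier:

def upload_in_batches(search_client, documents, batch_size=1000):
    # Push documents in chunks of up to 1,000 to stay within batch limits and reduce rate-limit pressure.
    total_succeeded = 0
    for start in range(0, len(documents), batch_size):
        batch = documents[start:start + batch_size]
        results = search_client.upload_documents(documents=batch)
        total_succeeded += sum(1 for r in results if r.succeeded)
    print(f"Indexed {len(documents)} documents, {total_succeeded} succeeded")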


Saturday, April 19, 2025

 How does DFCS differ from the SkyQuery platform?

DFCS is a UAV-swarm-imagery-driven knowledge base and analytics stack, based entirely in the public cloud, that can be used to create a trajectory of waypoints from source to destination over a given landscape. The capability to store and query drone imagery for information that can feed a knowledge base for retrieval-augmented generation in AI applications is quite generic, and it shares many requirements with a wide variety of image-querying systems. Most notably, the SkyQuery platform has similar requirements: it must deal with a large dataset of images and provide contextual information for queries. SkyQuery is an aerial drone video sensing platform with a high-level programming language that makes it well suited to developing long-running sensing applications. SkyQuery performs fast video-frame alignment and detection of small objects, which works well for querying with its expressive domain-specific language in which programs specify sensing-analytics-routing loops. It also provides a library of analytical operators to encode these steps. By separating out workflows that can be written with these operators, it allows takeoff, waypoint following, and landing to be automated.

Therefore, both DFCS and SkyQuery provide computer vision pipelines and processors to convert drone video data into queryable representations, a way to contextualize queries, and an engine that provides responses fast enough to drive routing directives to the UAV swarm, all through programmable interfaces.

But the differences lie in the representations used for these datasets and the way they are queried. DFCS leverages AI and vector search while SkyQuery leverages language constructs. Even the image processors differ: they are multimodal in DFCS, while SkyQuery catalogues the output of SIFT feature extractors. The use of retrieval-augmented generation in queries makes the query results more meaningful for DFCS, while SkyQuery requires workflows to experiment with their own querying logic. In SkyQuery, objects are referred to with keypoints, comprising a pixel position and a feature descriptor, which are then formed into “stable groups”. DFCS, on the other hand, leverages vector search that works well with contextual information presented via spatial co-ordinates, progress along waypoints, and error corrections.

It could be said that DFCS focuses more on the flight path of the UAV swarm and provides error-correction feedback to keep the swarm on course to its destination. It bolsters this with information for humans as well as feedback loops for autonomous flights, and comes with telemetry pipelines that continuously indicate the manner and measure of progress along the trajectory.

By keeping the cataloguing, grouping, and querying of objects independent of the vector representations, DFCS facilitates working with third-party datastores, including those that were built to be product catalogs. This helps diversify the methods and means of querying for different purposes rather than being restricted to one form of language. DFCS is polyglot and provides a chatbot-like interface that leverages the state of the art in Retrieval Augmented Generation.

#codingexercise: https://1drv.ms/w/c/d609fb70e39b65c8/Echlm-Nw-wkggNb7JAEAAAABu53rpIuTS5AsMb3lNiM7SQ?e=u6kTma


Friday, April 18, 2025

 Telemetry pipelines

Collected and emitted telemetry data makes the ingestion and processing of sensor data independent of the input to the models used to predict the next orientation. This strategy leans on telemetry pipelines as an effective technology for solving data problems and turning expansive datasets into concise, actionable insights without losing information. Waypoints, trajectory, position along the trajectory, deviations, and error corrections are all that need to be maintained and tracked for the UAV swarm to negotiate obstacles and stay on course from source to destination. An intelligent telemetry pipeline demonstrates the following five-step approach to maximizing its value:

1. Noise filtering: This involves sifting through the data to spotlight the essentials.

2. Long-term data retention: This involves safeguarding valuable data for future use.

3. Event trimming: This tailors data for optimal analytics so that raw-data eccentricities do not dictate the charts and graphs.

4. Data condensation: This translates voluminous MELT data into focused metrics.

5. Operational efficiency boosting: This amplifies operating speed and reliability.

This approach is widely applicable across domains and is visible in many projects spanning Kaggle datasets, open source repositories such as those on GitHub, and many publications. Emitting to S3 or S3-compatible storage and calculating the number and size of emitted events indicates the reduction in size compared to the original data, and serves as a measure of the effectiveness of using telemetry instead of the actual data.
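A minimal sketch of that measurement, assuming boto3 and a placeholder bucket name: condensed telemetry events are serialized to JSON, written to S3-compatible storage, and their total size is compared against the size of the raw data they summarize.

import json
import boto3

s3 = boto3.client("s3")  # works against S3 or any S3-compatible endpoint via endpoint_url

def emit_events(events, raw_size_bytes, bucket="drone-telemetry", prefix="events/"):
    # Write each condensed telemetry event and report the size reduction versus the raw data.
    emitted_bytes = 0
    for i, event in enumerate(events):
        body = json.dumps(event).encode("utf-8")
        s3.put_object(Bucket=bucket, Key=f"{prefix}{i}.json", Body=body)
        emitted_bytes += len(body)
    reduction = 1 - (emitted_bytes / raw_size_bytes) if raw_size_bytes else 0
    print(f"emitted {len(events)} events, {emitted_bytes} bytes, reduction {reduction:.1%}")
    return emitted_bytes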

With the metrics emitted for drones, the first step of noise filtering involves removing duplicates, false positives, recurring notifications, and superfluous information while registering their frequency for future use. Dissecting data within specific windows, keeping unique events, and eliminating excessive repetitions can be offloaded to a dedupe processor, but this step is not limited to that: it strives to keep the data as precise and concise as possible without losing information, while remaining good enough for the same analytics.
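A minimal sketch of such a windowed dedupe step, assuming each event is a dict with a timestamp in seconds and a key field that identifies repeats: within each window only the first occurrence is kept, while the frequency of every (window, key) pair is registered for future use.

from collections import defaultdict

def dedupe_by_window(events, window_seconds=60):
    # Keep the first event per (window, key) and count how often each repeats.
    kept, frequency = [], defaultdict(int)
    seen = set()
    for event in sorted(events, key=lambda e: e["timestamp"]):
        bucket = (int(event["timestamp"] // window_seconds), event["key"])
        frequency[bucket] += 1
        if bucket not in seen:
            seen.add(bucket)
            kept.append(event)
    return kept, frequency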

Specific datasets and SIEM destinations are indispensable for future needs and for real-time data refinement requirements. So, leveraging cloud architecture patterns that write to multiple destinations while collecting data from multiple sources, such as a service bus, is a requisite for the second stage. This step can also implement filtering capabilities and journaling that ensure robustness and reliability without loss of fidelity.

The third step is a take on advanced telemetry management with the introduction of concepts like traffic-flow segregation, such as grouping and streamlining. It does involve parsing, but it improves overall performance, and deeper analysis is often better served by some transformations.

The fourth step, data condensation, builds on the concept of refinement that proactively prevents another data deluge, so that even high-volume streams remain manageable and meaningful. The value extends beyond volume reduction, as this approach also reduces data-processing overheads.

The fifth step is about managing the data and ensuring the speed and reliability of the operations that process it. With increasing ingestion rates, vectorization and search may lag. Agile, robust solutions that maximize the value derived from the data while keeping costs manageable are required here.

Data accumulation without purposeful action leads to stagnation, and efficient operations aid in streamlining and refining the data. Speed and reliability are a function of both.