Micro-batching versus streaming for aerial drone video analytics
Azure AI Vision enables near real-time video analysis by extracting meaningful insights from live video streams. The process involves capturing frames from a video source, selecting which frames to analyze, submitting them to the API, and consuming the returned results. There are three progressively sophisticated approaches to implementing this workflow.
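For orientation, here is a minimal sketch of submitting a single captured frame to Azure AI Vision using the azure-ai-vision-imageanalysis Python package; the VISION_ENDPOINT and VISION_KEY environment variables and the analyze_frame helper are placeholders introduced for this example, not part of the original sample.

# Minimal sketch: submit one OpenCV frame to Azure AI Vision
# (assumes the azure-ai-vision-imageanalysis package; endpoint/key are placeholders)
import os
import cv2
from azure.ai.vision.imageanalysis import ImageAnalysisClient
from azure.ai.vision.imageanalysis.models import VisualFeatures
from azure.core.credentials import AzureKeyCredential

client = ImageAnalysisClient(
    endpoint=os.environ["VISION_ENDPOINT"],
    credential=AzureKeyCredential(os.environ["VISION_KEY"]))

def analyze_frame(frame):
    # Encode the BGR frame as JPEG bytes before sending it to the service
    ok, encoded = cv2.imencode(".jpg", frame)
    result = client.analyze(
        image_data=encoded.tobytes(),
        visual_features=[VisualFeatures.OBJECTS, VisualFeatures.CAPTION])
    return result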
The simplest method uses an infinite loop that grabs a frame, analyzes it, and processes the result. While straightforward, this approach is limited by the latency of cloud-based API calls, which can slow down frame acquisition. To improve performance, the second method introduces parallelism by launching each analysis task asynchronously. This allows frame grabbing to continue uninterrupted but introduces challenges like out-of-order results and potential thread safety issues.
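As a rough sketch, the two variants could look like the following, reusing the hypothetical analyze_frame helper above and OpenCV's VideoCapture; this is illustrative rather than the library's own sample.

# Variant 1: simple infinite loop; frame acquisition waits on every API call
import cv2
from concurrent.futures import ThreadPoolExecutor

cap = cv2.VideoCapture(0)

def run_simple_loop():
    while cap.isOpened():
        ok, frame = cap.read()
        if not ok:
            break
        result = analyze_frame(frame)   # hypothetical helper; blocks until the API returns
        print(result)

# Variant 2: launch each analysis asynchronously so grabbing never waits on the network;
# results may complete out of order, so the callback must be thread-safe
def run_parallel_loop():
    with ThreadPoolExecutor(max_workers=4) as pool:
        while cap.isOpened():
            ok, frame = cap.read()
            if not ok:
                break
            future = pool.submit(analyze_frame, frame)
            future.add_done_callback(lambda f: print(f.result()))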
The most robust solution is a producer-consumer system. Here, a producer thread captures frames and queues analysis tasks, while a consumer thread processes results sequentially. This design ensures orderly result handling and maximizes frame throughput without blocking operations. The pattern draws inspiration from TCP ring-buffer processing and socket programming with overlapped I/O.
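A minimal sketch of that producer-consumer arrangement, again assuming the hypothetical analyze_frame helper: because the producer enqueues futures in submission order and the consumer blocks on each one in turn, results come out in frame order even though the analyses run in parallel.

# Producer-consumer sketch: the producer grabs frames and queues analysis futures,
# the consumer dequeues them in submission order so results stay ordered.
import queue
import threading
import cv2
from concurrent.futures import ThreadPoolExecutor

cap = cv2.VideoCapture(0)
pending = queue.Queue(maxsize=8)       # bounded queue, akin to a ring buffer
pool = ThreadPoolExecutor(max_workers=4)

def producer():
    while cap.isOpened():
        ok, frame = cap.read()
        if not ok:
            break
        # Submit immediately; a full queue blocks here, providing backpressure
        pending.put(pool.submit(analyze_frame, frame))   # hypothetical helper
    pending.put(None)                  # sentinel: no more work

def consumer():
    while True:
        future = pending.get()
        if future is None:
            break
        result = future.result()       # blocks until this frame's analysis finishes
        print(result)

threading.Thread(target=producer, daemon=True).start()
consumer()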
To help developers get started, Microsoft provides a sample library called FrameGrabber, which simplifies integration with Azure AI services. It supports event-driven programming, allowing developers to respond to new frames and analysis results efficiently. The sample Python application below demonstrates how to use pygrabber for vehicle tracking in aerial drone feeds.
This hybrid approach, combining client-side preprocessing with cloud-based analysis, offers flexibility and scalability for building intelligent video applications: the bulk of the work stays on the analytics side rather than in video preprocessing or in model training, testing, and revision cycles. Developers can also build interactive experiences for querying aerial drone images using agentic retrieval.
While many purpose-specific aerial drone video sensing applications do require custom models, we believe a video sensing analytical platform removes much of the overhead and repetitive work found in home-grown and DIY solutions, shifting the complexity from vectorizing to analytics. It also keeps the door open to alternative or complementary techniques to image frame grabbing and processing, such as video indexing, thereby avoiding the high cost of repetitive work on the video preprocessing side.
Extending the concept of picking what to process from frame selection to video indexing makes it clearer that specific analyses can be performed with high accuracy and high performance when videos are micro-batched rather than split into live-feed frames, because micro-batching reduces the working set. Only when continuous tracking is required does the per-frame streaming approach justify its cost. In most cases, a drone world catalog built incrementally from the iterative micro-batches suffices to answer general questions, including those relevant to vehicle tracking.
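To make the micro-batching idea concrete, here is a minimal sketch that samples frames from an indexed video (the file name is a placeholder), processes them in small batches, and folds detections into an incrementally built catalog; it reuses the detect_vehicles helper defined in the sample application below.

# Micro-batching sketch: sample frames from an indexed video, process them in
# small batches, and fold detections into an incrementally built catalog.
import cv2

def micro_batches(video_path, batch_size=16, stride=30):
    # Take every `stride`-th frame and yield them in batches of `batch_size`
    cap = cv2.VideoCapture(video_path)
    batch, index = [], 0
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        if index % stride == 0:
            batch.append((index, frame))
            if len(batch) == batch_size:
                yield batch
                batch = []
        index += 1
    if batch:
        yield batch
    cap.release()

catalog = []   # incrementally built "drone world catalog" of detections
for batch in micro_batches("drone_survey.mp4"):       # placeholder file name
    for frame_index, frame in batch:
        vehicles = detect_vehicles(frame)              # helper from the sample below
        for _, det in vehicles.iterrows():
            catalog.append({"frame": frame_index, "label": det["name"],
                            "box": (det["xmin"], det["ymin"], det["xmax"], det["ymax"])})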
A sample application demonstrating the frame-grabbing, detection, and tracking workflow:
# Step 1: grab frames from live or indexed video
from pygrabber.dshow_graph import FilterGraph
import cv2

# Initialize the frame grabber device (typically, 0 means the first available camera)
graph = FilterGraph()
camera_index = 0
graph.add_video_input_device(camera_index)

# Keep the most recent frame so the processing loop in Step 4 can read it
latest_frame = None

# Callback invoked by the sample grabber with the captured image buffer
def get_camera_frame(image_buffer):
    # pygrabber delivers a BGR numpy array, which OpenCV can use directly
    global latest_frame
    latest_frame = image_buffer
    return image_buffer

graph.add_sample_grabber(get_camera_frame)
graph.add_null_render()
graph.prepare_preview_graph()
graph.run()
# Step 2: detect objects with a pretrained YOLOv5 model
import torch

model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)

def detect_vehicles(frame):
    results = model(frame)
    # Keep only 'car', 'truck', 'bus', 'motorcycle' detections
    vehicle_labels = ['car', 'truck', 'bus', 'motorcycle']
    detections = results.pandas().xyxy[0]
    vehicles = detections[detections['name'].isin(vehicle_labels)]
    return vehicles
# Step 3: implement trackers (MultiTracker requires opencv-contrib-python;
# older OpenCV builds expose it as cv2.MultiTracker_create at the top level)
trackers = cv2.legacy.MultiTracker_create()

def update_trackers(frame):
    ret, boxes = trackers.update(frame)
    return boxes

def initialize_trackers(frame, vehicles):
    for _, det in vehicles.iterrows():
        bbox = tuple(det[['xmin', 'ymin', 'xmax', 'ymax']])
        # Detections are (x1, y1, x2, y2); OpenCV boxes are (x, y, w, h)
        x1, y1, x2, y2 = map(int, bbox)
        w = x2 - x1
        h = y2 - y1
        tracker = cv2.legacy.TrackerCSRT_create()
        trackers.add(tracker, frame, (x1, y1, w, h))
# Step 4: visualize tracking results
while True:
    # Ask the grabber for the next frame; the callback in Step 1 stores it in latest_frame
    graph.grab_frame()
    frame = latest_frame
    if frame is None:
        continue
    vehicles = detect_vehicles(frame)
    # getObjects() returns the boxes currently being tracked
    if len(trackers.getObjects()) == 0:
        initialize_trackers(frame, vehicles)
    else:
        boxes = update_trackers(frame)
        for i, box in enumerate(boxes):
            x, y, w, h = [int(v) for v in box]
            cv2.rectangle(frame, (x, y), (x + w, y + h), (255, 0, 0), 2)
            cv2.putText(frame, f'Vehicle {i+1}', (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
    cv2.imshow('Vehicle Tracking', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

graph.stop()
cv2.destroyAllWindows()