#!/usr/bin/env python3
"""
Consolidated Drone Video Analytics Framework
Merges UAV-CodeAgents message-passing with MCP-based plugin architecture.
Architecture layers:
1. Domain: Core data structures for frames, evidence, and geospatial primitives.
2. Perception: Frame capture and enrichment pipelines.
3. Retrieval/Index: Evidence storage and hybrid search via MCP.
4. Reasoning: ReAct-based task decomposition and LLM orchestration.
5. Coordination: Message-passing for UAV/manager collaboration.
6. Validation: LLM-as-a-judge scoring pipeline.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Protocol, Tuple, runtime_checkable
import time
from abc import ABC, abstractmethod
# ============================================================================
# DOMAIN LAYER: Core data structures and primitives
# ============================================================================
@dataclass
class Frame:
    """One raw image captured by a UAV, plus when and where it was taken.

    Positional field order: ``image_id``, ``timestamp``, ``image_bytes``,
    ``geo`` and the optional ``alt``.
    """

    # Identifier for this captured image.
    image_id: str
    # Capture time as a float.
    timestamp: float
    # Raw image payload; this is what gets passed to the VLM / detector clients.
    image_bytes: bytes
    # Geographic position as a (latitude, longitude) pair.
    geo: Tuple[float, float]
    # Altitude, or None when not reported.
    alt: Optional[float] = None
@dataclass
class SemanticAnnotation:
    """A labelled region inside a frame (object detection or scene label)."""

    # Class or scene label text.
    label: str
    # Integer box corners ordered (x1, y1, x2, y2).
    bbox: Tuple[int, int, int, int]
    # Score reported for this label.
    confidence: float
    # Optional single (x, y) point for the label; None when not located.
    pixel_point: Optional[Tuple[int, int]] = None
@dataclass
class Waypoint:
    """A GPS waypoint for UAV navigation, with free-form metadata."""

    lat: float
    lon: float
    alt: float
    # Arbitrary extra attributes; each instance gets its own fresh dict.
    meta: Dict[str, Any] = field(default_factory=dict)
@dataclass
class FrameEvidence:
    """A frame enriched with semantic and spatial metadata.

    Produced by frame enrichment, stored by the indexer and wrapped by
    ``RetrievedEvidence`` when returned from search. Only the six identity
    fields are required; every enrichment field starts empty.
    """

    # --- identity / provenance (required, positional) ---
    video_id: str
    tour_id: str
    frame_id: str
    timestamp_ms: int  # timestamp in milliseconds, per the field name
    tour_order: int  # ordering index within the tour, per the field name
    image_uri: str
    # --- enrichment (optional; mutable defaults are per-instance) ---
    caption: str = ""
    objects: List[str] = field(default_factory=list)
    ocr_text: str = ""
    spatial_relations: List[str] = field(default_factory=list)
    scene_type: str = ""
    change_note: str = ""
    confidence: float = 0.0
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class RetrievedEvidence:
    """One ranked search hit: the evidence, its score and why it matched."""

    evidence: FrameEvidence
    score: float
    # Subquery that produced this hit, if recorded.
    matched_query: str = ""
    # Human-readable explanation of the ranking, if recorded.
    rank_reason: str = ""
@dataclass
class QueryPlan:
    """Result of decomposing a user query into subqueries and intents.

    The ``needs_*`` flags mark which kinds of reasoning the query calls for;
    by default only object-level reasoning is requested.
    """

    original_query: str
    subqueries: List[str]
    intents: List[str]
    needs_temporal: bool = False
    needs_spatial: bool = False
    needs_object: bool = True
# ============================================================================
# MCP PROTOCOL LAYER: Universal plugin interface
# ============================================================================
@dataclass
class MCPContext:
    """Request envelope handed to every MCP tool's ``invoke``.

    ``payload`` carries the tool-specific arguments; ``metadata`` is for
    anything else. Both default to a fresh empty dict per request.
    """

    request_id: str
    session_id: str
    agent_name: str
    tool_name: str
    action: str
    payload: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class MCPResult:
    """Response envelope returned by every MCP tool.

    ``ok`` signals success; on failure ``error`` holds a message.
    """

    ok: bool
    data: Dict[str, Any] = field(default_factory=dict)
    error: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
@runtime_checkable
class MCPTool(Protocol):
    """Structural interface every MCP-compliant tool satisfies.

    Any object exposing a ``name`` attribute and an ``invoke(context)``
    method matches, with or without inheriting from this class.
    """

    name: str

    def invoke(self, context: MCPContext) -> MCPResult:
        """Run the tool for ``context`` and return an ``MCPResult``."""
# ============================================================================
# PERCEPTION LAYER: Frame capture and enrichment
# ============================================================================
class VideoTourIngestor:
    """Interface for turning a video into timestamped keyframes.

    Every method is an extension point that raises ``NotImplementedError``
    until overridden.
    """

    def extract_frames(self, video_path: str) -> List[str]:
        """Return the paths of all frames extracted from ``video_path``."""
        raise NotImplementedError()

    def select_keyframes(self, frames: List[str]) -> List[str]:
        """Return the subset of ``frames`` chosen as keyframes."""
        raise NotImplementedError()

    def attach_timestamps(self, frames: List[str]) -> List[Tuple[str, int]]:
        """Pair each frame with its timestamp in milliseconds."""
        raise NotImplementedError()
class FrameEnrichmentService:
    """Interface for turning frame images into ``FrameEvidence`` records.

    Intended to add VLM captions, detected objects and spatial tags; both
    methods raise ``NotImplementedError`` until overridden.
    """

    def analyze_frame(self, image_path: str) -> FrameEvidence:
        """Analyse one image (caption, objects, OCR, scene type)."""
        raise NotImplementedError()

    def enrich_batch(self, images: List[str]) -> List[FrameEvidence]:
        """Enrich a batch of images, deduplicating along the way."""
        raise NotImplementedError()
class PerceptionService:
    """Thin adapter over injected VLM and object-detector clients.

    The clients are duck-typed. ``vlm_client`` is called as
    ``describe(image_bytes)`` and ``pixel_point(image_bytes, phrase)``.
    ``detector`` is called as ``run(image_bytes)`` and must yield mappings
    with ``"label"``, ``"bbox"`` and ``"score"`` keys.
    """

    def __init__(self, vlm_client: Any, detector: Any):
        self.vlm = vlm_client
        self.detector = detector

    def describe_scene(self, frame: Frame) -> str:
        """Return the VLM's natural-language description of ``frame``."""
        image = frame.image_bytes
        return self.vlm.describe(image)

    def detect_objects(self, frame: Frame) -> List[SemanticAnnotation]:
        """Run the detector on ``frame`` and wrap each hit as an annotation."""
        raw_hits = self.detector.run(frame.image_bytes)
        annotations: List[SemanticAnnotation] = []
        for hit in raw_hits:
            # The detector's "score" becomes the annotation's confidence.
            annotations.append(SemanticAnnotation(hit["label"], hit["bbox"], hit["score"]))
        return annotations

    def pixel_point(self, frame: Frame, phrase: str) -> Tuple[int, int]:
        """Ask the VLM where ``phrase`` appears in ``frame``."""
        image = frame.image_bytes
        return self.vlm.pixel_point(image, phrase)
class EvidenceBuilder:
    """Interface for converting ``FrameEvidence`` into indexable forms.

    Both methods raise ``NotImplementedError`` until overridden.
    """

    def build_text_record(self, evidence: FrameEvidence) -> str:
        """Serialise ``evidence`` to text for dense-retrieval embedding."""
        raise NotImplementedError()

    def build_index_document(self, evidence: FrameEvidence) -> Dict[str, Any]:
        """Format ``evidence`` as a searchable document."""
        raise NotImplementedError()
# ============================================================================
# RETRIEVAL & INDEX LAYER: Evidence storage and hybrid search
# ============================================================================
class EvidenceIndexer:
    """Persist and index ``FrameEvidence`` records.

    Subclasses must implement :meth:`upsert`. :meth:`bulk_upsert` defaults to
    calling it once per record; override it when the backend supports a real
    batch write.
    """

    def upsert(self, evidence: FrameEvidence) -> None:
        """Store or update a single evidence record."""
        raise NotImplementedError

    def bulk_upsert(self, evidences: List[FrameEvidence]) -> None:
        """Store or update every record in ``evidences``, in order.

        The default delegates to :meth:`upsert`, so an indexer that only
        implements single-record writes still supports batches. An empty
        batch is a no-op. If ``upsert`` is not implemented, a non-empty batch
        raises ``NotImplementedError`` from it.
        """
        for evidence in evidences:
            self.upsert(evidence)
class HybridRetriever:
    """Interface for keyword + semantic search over indexed evidence.

    All methods raise ``NotImplementedError`` until overridden.
    """

    def retrieve(self, subquery: str, top_k: int = 5) -> List[RetrievedEvidence]:
        """Hybrid (keyword + vector) search; ``top_k`` presumably caps results."""
        raise NotImplementedError()

    def keyword_search(self, subquery: str) -> List[RetrievedEvidence]:
        """Keyword-only search."""
        raise NotImplementedError()

    def vector_search(self, subquery: str) -> List[RetrievedEvidence]:
        """Vector-embedding search."""
        raise NotImplementedError()
class RetrievalPlugin(MCPTool):
    """MCP tool that exposes a ``HybridRetriever`` to the ReAct loop.

    Payload keys: ``subquery`` (default ``""``) and ``top_k`` (default 5).
    On success the hits are returned under ``data["results"]``. Any
    exception is reported as ``MCPResult(ok=False, error=str(exc))``.
    """

    name = "retrieval"

    def __init__(self, retriever: HybridRetriever):
        self.retriever = retriever

    def invoke(self, context: MCPContext) -> MCPResult:
        """Run a retrieval for the subquery carried in ``context.payload``."""
        try:
            payload = context.payload
            hits = self.retriever.retrieve(
                subquery=payload.get("subquery", ""),
                top_k=payload.get("top_k", 5),
            )
            return MCPResult(ok=True, data={"results": hits})
        except Exception as exc:
            # Tool boundary: failures become error results, not exceptions.
            return MCPResult(ok=False, error=str(exc))
class TemporalReasoner:
    """Interface for reasoning over the time ordering of retrieved evidence.

    Both methods raise ``NotImplementedError`` until overridden.
    """

    def summarize_progression(self, evidences: List[RetrievedEvidence]) -> str:
        """Describe how the scenes in ``evidences`` changed over time."""
        raise NotImplementedError()

    def answer_change_query(self, evidences: List[RetrievedEvidence]) -> str:
        """Answer a "what changed?" question from ``evidences``."""
        raise NotImplementedError()
class SpatialReasoner:
    """Interface for reasoning over spatial relations in retrieved evidence."""

    def infer_relations(self, evidences: List[RetrievedEvidence]) -> str:
        """Infer relations such as inside / near / above; not implemented."""
        raise NotImplementedError()
# ============================================================================
# GEOSPATIAL PLUGIN: External geospatial services
# ============================================================================
class GeodnetPlugin(MCPTool):
    """MCP tool for geospatial lookups.

    Only the ``get_location`` action is supported. It currently returns a
    fixed placeholder coordinate; any other action yields an error result.
    """

    name = "geodnet"

    def invoke(self, context: MCPContext) -> MCPResult:
        """Dispatch on ``context.action``."""
        requested = context.action
        if requested != "get_location":
            return MCPResult(ok=False, error=f"Unsupported Geodnet action: {requested}")
        # Placeholder: in production, call actual geospatial service.
        return MCPResult(ok=True, data={"lat": 47.674, "lon": -122.121})
# ============================================================================
# REASONING LAYER: ReAct-based orchestration and synthesis
# ============================================================================
@dataclass
class ReActStep:
    """One thought / action / observation triple in the ReAct loop.

    Every field is optional so a step can be created before its action runs
    and completed with the observation afterwards.
    """

    thought: str = ""
    action_name: str = ""
    action_args: Dict[str, Any] = field(default_factory=dict)
    observation: Dict[str, Any] = field(default_factory=dict)
@dataclass
class ReActState:
    """Mutable bookkeeping for a single ReAct run."""

    query: str
    # Steps taken so far, in order.
    steps: List[ReActStep] = field(default_factory=list)
    # Evidence accumulated from successful tool results.
    retrieved: List[RetrievedEvidence] = field(default_factory=list)
    # Upper bound on loop iterations.
    max_steps: int = 3
    # Set once the loop stops early (sufficient evidence or "finalize").
    done: bool = False
class QueryPlanner:
    """Interface for decomposing a user query into a ``QueryPlan``."""

    def plan(
        self, query: str, context: Optional[List[Dict[str, Any]]] = None
    ) -> QueryPlan:
        """Return subqueries plus detected intents for ``query``; not implemented."""
        raise NotImplementedError()
class ThoughtGenerator:
    """Interface that proposes the next thought and action in a ReAct run."""

    def next_step(
        self, state: ReActState, plan: QueryPlan
    ) -> Tuple[str, str, Dict[str, Any]]:
        """Return ``(thought, action_name, action_args)``; not implemented."""
        raise NotImplementedError()
class SufficiencyJudge:
    """Interface deciding whether gathered evidence is enough to answer."""

    def is_sufficient(self, state: ReActState, plan: QueryPlan) -> bool:
        """Return True when the ReAct loop should stop; not implemented."""
        raise NotImplementedError()
class GroundedAnswerSynthesizer:
    """Interface producing an answer grounded in retrieved evidence."""

    def answer(
        self,
        query: str,
        evidences: List[RetrievedEvidence],
        plan: QueryPlan,
    ) -> Dict[str, Any]:
        """Return an answer dict with citations and supporting evidence.

        Not implemented in this base class.
        """
        raise NotImplementedError()
class ToolRouter:
    """Dispatch ReAct action names to registered MCP tools."""

    def __init__(self, tools: Dict[str, MCPTool]) -> None:
        self.tools = tools

    def call(self, action_name: str, context: MCPContext) -> MCPResult:
        """Invoke the tool registered as ``action_name`` with ``context``.

        An unregistered name yields an ``ok=False`` result instead of raising.
        Exceptions raised by the tool itself propagate to the caller.
        """
        if action_name in self.tools:
            return self.tools[action_name].invoke(context)
        return MCPResult(ok=False, error=f"Unknown tool: {action_name}")
class FinalAnswerWriter:
    """Interface that formats the final answer response and its metadata."""

    def write(self, state: ReActState, plan: QueryPlan) -> Dict[str, Any]:
        """Return the structured answer for a finished run; not implemented."""
        raise NotImplementedError()
class ReActController:
    """Orchestrate the ReAct loop: plan, think, act, observe, judge, answer."""

    def __init__(
        self,
        planner: QueryPlanner,
        router: ToolRouter,
        thinker: ThoughtGenerator,
        judge: SufficiencyJudge,
        synthesizer: GroundedAnswerSynthesizer,
        writer: FinalAnswerWriter,
        max_steps: int = 3,
    ) -> None:
        """Wire the collaborators used by :meth:`run`.

        Args:
            max_steps: maximum think/act iterations per run. Defaults to 3,
                the value that was previously hard-coded in ``run``.
        """
        self.planner = planner
        self.router = router
        self.thinker = thinker
        self.judge = judge
        self.synthesizer = synthesizer
        self.writer = writer
        self.max_steps = max_steps

    def run(
        self,
        query: str,
        session_id: str,
        context: Optional[List[Dict[str, Any]]] = None,
    ) -> Dict[str, Any]:
        """Execute the ReAct loop for ``query`` and return the final answer.

        Each iteration first asks the judge whether the evidence is already
        sufficient, then asks the thinker for the next action. The action
        ``"finalize"`` ends the loop. Any other action is routed to the
        matching MCP tool, and its ``data["results"]`` (on success) is
        accumulated as evidence.

        Args:
            query: the user question.
            session_id: embedded in every per-step MCP request id.
            context: optional prior conversation passed to the planner.

        Returns:
            A copy of the dict produced by ``writer.write(state, plan)``. It
            also carries the synthesizer's output under
            ``"synthesized_answer"``, unless the writer already set that key.
        """
        plan = self.planner.plan(query, context=context)
        state = ReActState(query=query, max_steps=self.max_steps)
        for step_idx in range(state.max_steps):
            if self.judge.is_sufficient(state, plan):
                state.done = True
                break
            thought, action_name, action_args = self.thinker.next_step(state, plan)
            step = ReActStep(
                thought=thought, action_name=action_name, action_args=action_args
            )
            if action_name == "finalize":
                state.steps.append(step)
                state.done = True
                break
            ctx = MCPContext(
                request_id=f"req-{session_id}-{step_idx}",
                session_id=session_id,
                agent_name="drone_react_agent",
                tool_name=action_name,
                action=action_name,
                payload=action_args,
            )
            result = self.router.call(action_name, ctx)
            step.observation = {
                "ok": result.ok,
                "data": result.data,
                "error": result.error,
            }
            state.steps.append(step)
            # Accumulate retrieved evidence from successful tool calls.
            if result.ok and "results" in result.data:
                state.retrieved.extend(result.data["results"])
        # Synthesize the final answer grounded in retrieved evidence. The
        # result used to be computed and then dropped. It is now surfaced to
        # the caller without overriding anything the writer produced.
        answer = self.synthesizer.answer(query, state.retrieved, plan)
        response = dict(self.writer.write(state, plan))
        response.setdefault("synthesized_answer", answer)
        return response
# ============================================================================
# VALIDATION LAYER: LLM-as-a-judge scoring
# ============================================================================
@dataclass
class JudgeScore:
    """Multi-dimensional quality score emitted by the LLM judge.

    ``overall`` is the value compared against the validation threshold.
    ``issues`` defaults to a fresh empty list per instance.
    """

    groundedness: float
    completeness: float
    temporal_consistency: float
    spatial_consistency: float
    instruction_following: float
    overall: float
    rationale: str
    issues: List[str] = field(default_factory=list)
@dataclass
class JudgeInput:
    """Everything the judge needs to score one answer."""

    query: str
    answer: Dict[str, Any]
    retrieved: List[RetrievedEvidence]
    plan: QueryPlan
    steps: List[ReActStep]
    # Format the judge is expected to respond in.
    expected_format: str = "json"
class JudgePromptBuilder:
    """Interface that renders a ``JudgeInput`` into an evaluation prompt."""

    def build(self, judge_input: JudgeInput) -> str:
        """Return the prompt containing query, answer and evidence; not implemented."""
        raise NotImplementedError()
class LLMJudgeClient:
    """Interface to a remote or local LLM judge service."""

    def score(self, prompt: str) -> str:
        """Send ``prompt`` to the judge and return its raw reply; not implemented."""
        raise NotImplementedError()
class JudgeEvaluator:
    """Combine a prompt builder and an LLM client to produce a ``JudgeScore``."""

    # Numeric JudgeScore fields read from a JSON judge reply, in declaration order.
    _SCORE_FIELDS = (
        "groundedness",
        "completeness",
        "temporal_consistency",
        "spatial_consistency",
        "instruction_following",
        "overall",
    )

    def __init__(
        self, prompt_builder: JudgePromptBuilder, client: LLMJudgeClient
    ) -> None:
        self.prompt_builder = prompt_builder
        self.client = client

    def evaluate(self, judge_input: JudgeInput) -> JudgeScore:
        """Prompt the LLM judge for ``judge_input`` and parse its reply.

        If the reply is a bare JSON object, its numeric fields
        (``groundedness`` ... ``overall``), ``rationale`` (string) and
        ``issues`` (list or string) are copied into the score. Missing or
        non-numeric scores become 0.0.

        Any other reply (non-JSON, or JSON that is not an object) yields the
        previous behaviour: all scores 0.0, the raw reply as the rationale and
        no issues.
        """
        prompt = self.prompt_builder.build(judge_input)
        raw = self.client.score(prompt)
        return self._parse_raw(raw)

    @classmethod
    def _parse_raw(cls, raw: str) -> JudgeScore:
        """Turn the judge's raw reply into a JudgeScore (see ``evaluate``)."""
        import json

        try:
            parsed = json.loads(raw)
        except (TypeError, ValueError):  # JSONDecodeError subclasses ValueError
            parsed = None
        if not isinstance(parsed, dict):
            return JudgeScore(
                groundedness=0.0,
                completeness=0.0,
                temporal_consistency=0.0,
                spatial_consistency=0.0,
                instruction_following=0.0,
                overall=0.0,
                rationale=raw,
                issues=[],
            )
        scores = {name: cls._to_score(parsed.get(name)) for name in cls._SCORE_FIELDS}
        rationale = parsed.get("rationale")
        return JudgeScore(
            **scores,
            rationale=rationale if isinstance(rationale, str) else raw,
            issues=cls._to_issues(parsed.get("issues")),
        )

    @staticmethod
    def _to_score(value: Any) -> float:
        """Coerce one score to a finite float; anything unusable becomes 0.0."""
        import math

        # bool is an int subclass; a True/False score is treated as malformed.
        if isinstance(value, bool):
            return 0.0
        try:
            number = float(value)
        except (TypeError, ValueError):
            return 0.0
        # NaN/inf would make threshold comparisons meaningless.
        return number if math.isfinite(number) else 0.0

    @staticmethod
    def _to_issues(value: Any) -> List[str]:
        """Normalise the judge's ``issues`` entry to a list of strings."""
        if isinstance(value, str):
            return [value]
        if isinstance(value, list):
            return [str(item) for item in value]
        return []
class ValidationPlugin(MCPTool):
    """MCP tool exposing LLM-judge validation to the ReAct loop.

    Reads ``query``, ``answer``, ``retrieved``, ``plan`` and ``steps`` from the
    request payload, scores them with the injected evaluator, and returns
    ``{"score": JudgeScore, "passed": bool}``. Any exception is reported as
    ``MCPResult(ok=False, error=str(exc))``.
    """

    name = "validation"

    def __init__(self, evaluator: JudgeEvaluator, threshold: float = 0.75) -> None:
        """Store the evaluator and the pass threshold.

        Args:
            evaluator: produces a JudgeScore for a JudgeInput.
            threshold: minimum ``overall`` score that counts as a pass.
                Defaults to 0.75, the value previously hard-coded here and
                ValidationPipeline's default. It is configurable so both
                validation paths can be tuned together.
        """
        self.evaluator = evaluator
        self.threshold = threshold

    def invoke(self, context: MCPContext) -> MCPResult:
        """Score the answer described by ``context.payload``."""
        try:
            payload = context.payload
            judge_input = JudgeInput(
                query=payload.get("query", ""),
                answer=payload.get("answer", {}),
                retrieved=payload.get("retrieved", []),
                # NOTE(review): a missing "plan" yields None even though
                # JudgeInput.plan is typed QueryPlan.
                plan=payload.get("plan"),
                steps=payload.get("steps", []),
            )
            score = self.evaluator.evaluate(judge_input)
            return MCPResult(
                ok=True,
                data={
                    "score": score,
                    "passed": score.overall >= self.threshold,
                },
            )
        except Exception as e:
            # Tool boundary: failures become error results, not exceptions.
            return MCPResult(ok=False, error=str(e))
@dataclass
class ValidationResult:
    """Outcome of one validation run."""

    passed: bool
    score: JudgeScore
    # Whether the caller is advised to retry.
    retry_recommended: bool = False
class ValidationPipeline:
    """Score an answer with the LLM judge and apply a pass threshold."""

    def __init__(self, evaluator: JudgeEvaluator, threshold: float = 0.75) -> None:
        self.evaluator = evaluator
        self.threshold = threshold

    def validate(
        self,
        query: str,
        answer: Dict[str, Any],
        retrieved: List[RetrievedEvidence],
        plan: QueryPlan,
        steps: List[ReActStep],
    ) -> ValidationResult:
        """Judge ``answer`` and report whether it clears the threshold.

        A retry is recommended exactly when the answer does not pass, that
        is, when ``overall`` is below ``threshold``.
        """
        verdict = self.evaluator.evaluate(
            JudgeInput(
                query=query,
                answer=answer,
                retrieved=retrieved,
                plan=plan,
                steps=steps,
            )
        )
        meets_bar = verdict.overall >= self.threshold
        return ValidationResult(
            passed=meets_bar,
            score=verdict,
            retry_recommended=not meets_bar,
        )
# ============================================================================
# COORDINATION LAYER: Async message-passing orchestration
# ============================================================================
class MessageBus:
    """Minimal synchronous publish/subscribe bus for agent coordination.

    Handlers run in subscription order, directly inside :meth:`publish`.
    """

    def __init__(self) -> None:
        self.subscribers: Dict[str, List[Callable[[Dict[str, Any]], None]]] = {}

    def subscribe(self, topic: str, handler: Callable[[Dict[str, Any]], None]) -> None:
        """Register ``handler`` to receive every message published on ``topic``."""
        self.subscribers.setdefault(topic, []).append(handler)

    def publish(self, topic: str, message: Dict[str, Any]) -> None:
        """Deliver ``message`` to every handler subscribed to ``topic``.

        Delivery uses a snapshot of the handlers taken when publishing
        starts. A handler that subscribes to the same topic during delivery
        therefore first receives the *next* message. Iterating the live list
        instead would call it for the current message too, and could loop
        indefinitely if handlers keep subscribing.
        """
        for handler in tuple(self.subscribers.get(topic, ())):
            handler(message)
class UAVAgent:
    """Single drone agent: reacts to task assignments, runs perception, and
    publishes the result on ``"uav.observation"``.

    Subscribes to ``"task.assign"`` on construction.
    """

    def __init__(
        self, id: str, perception: PerceptionService, bus: MessageBus
    ) -> None:
        self.id = id
        self.perception = perception
        self.bus = bus
        self.bus.subscribe("task.assign", self.on_task)

    def on_task(self, msg: Dict[str, Any]) -> None:
        """Handle a task assignment; messages without a truthy ``"plan"`` are ignored.

        Capture is simulated: the frame carries empty image bytes and a
        ``(0.0, 0.0)`` position.
        """
        plan = msg.get("plan")
        if not plan:
            return
        # Read the clock once. Calling time.time() twice made the timestamp
        # embedded in image_id disagree with the frame's timestamp field.
        captured_at = time.time()
        frame = Frame(
            image_id=f"frame-{self.id}-{captured_at}",
            timestamp=captured_at,
            image_bytes=b"",  # Placeholder
            geo=(0.0, 0.0),
        )
        # Run perception on the captured frame.
        annotations = self.perception.detect_objects(frame)
        caption = self.perception.describe_scene(frame)
        # Publish observations back to the manager.
        self.bus.publish(
            "uav.observation",
            {
                "uav_id": self.id,
                "frame_id": frame.image_id,
                "timestamp": frame.timestamp,
                "annotations": annotations,
                "caption": caption,
            },
        )
class AirspaceManagerAgent:
    """Central manager: answers user requests and collects UAV observations.

    A request is answered by the injected ReAct controller and then scored
    by the injected validation pipeline. Every message published on
    ``"uav.observation"`` is appended to ``self.observations``.
    """

    def __init__(
        self,
        reasoning_controller: ReActController,
        bus: MessageBus,
        validator: ValidationPipeline,
    ) -> None:
        self.reasoning = reasoning_controller
        self.bus = bus
        self.validator = validator
        # Create the observation store before registering the handler that
        # appends to it. A bus may deliver (or replay) a message as soon as a
        # handler is subscribed, and on_observation would then fail with
        # AttributeError.
        self.observations: List[Dict[str, Any]] = []
        self.bus.subscribe("uav.observation", self.on_observation)

    def handle_request(
        self, user_prompt: str, region: Optional[Any] = None
    ) -> Dict[str, Any]:
        """Answer ``user_prompt`` with the ReAct controller, then validate it.

        Args:
            user_prompt: natural-language question.
            region: accepted for API compatibility; currently unused.

        Returns:
            ``{"answer": <controller output>, "validation": <ValidationResult>}``.
        """
        # A new session id per request, derived from the wall clock.
        answer = self.reasoning.run(
            query=user_prompt,
            session_id=f"session-{time.time()}",
        )
        # TODO: Extract state from reasoning controller for validation.
        # The controller does not expose its plan, evidence or steps, so
        # validation currently runs against placeholders: no evidence, no
        # steps, and a single-subquery "general" plan.
        retrieved: List[RetrievedEvidence] = []
        plan = QueryPlan(
            original_query=user_prompt, subqueries=[user_prompt], intents=["general"]
        )
        steps: List[ReActStep] = []
        validation = self.validator.validate(
            query=user_prompt,
            answer=answer,
            retrieved=retrieved,
            plan=plan,
            steps=steps,
        )
        return {
            "answer": answer,
            "validation": validation,
        }

    def on_observation(self, msg: Dict[str, Any]) -> None:
        """Record an observation published by a UAV."""
        self.observations.append(msg)
# ============================================================================
# INTEGRATED SYSTEM: End-to-end drone video analytics
# ============================================================================
class DroneVideoSystem:
    """Top-level facade over ReAct reasoning, validation and the airspace manager."""

    def __init__(
        self,
        react_controller: ReActController,
        validator: ValidationPipeline,
        manager: AirspaceManagerAgent,
    ) -> None:
        self.react = react_controller
        self.validator = validator
        self.manager = manager

    def answer_and_validate(
        self,
        query: str,
        session_id: str,
        context: Optional[List[Dict[str, Any]]] = None,
    ) -> Dict[str, Any]:
        """Answer and validate ``query`` by delegating to the manager.

        NOTE(review): ``session_id`` and ``context`` are accepted but not
        forwarded. ``AirspaceManagerAgent.handle_request`` receives only the
        query and generates its own session id, so neither argument currently
        has any effect.
        """
        manager = self.manager
        return manager.handle_request(query)
# ============================================================================
# EXAMPLE WIRING (Placeholder)
# ============================================================================
def build_system() -> DroneVideoSystem:
    """Assemble and wire every component into a ``DroneVideoSystem``.

    Not implemented yet: it always raises, because the storage, VLM, LLM,
    detector and vector-DB clients do not exist. The intended wiring is
    sketched below. Note that the judge evaluator must be built before the
    tool registry, which needs it for ``ValidationPlugin``::

        perception = PerceptionService(vlm_client, detector)
        evaluator = JudgeEvaluator(JudgePromptBuilder(), LLMJudgeClient())
        validator = ValidationPipeline(evaluator)
        router = ToolRouter({
            "retrieval": RetrievalPlugin(retriever),
            "geodnet": GeodnetPlugin(),
            "validation": ValidationPlugin(evaluator),
        })
        react_controller = ReActController(
            planner=QueryPlanner(),
            router=router,
            thinker=ThoughtGenerator(),
            judge=SufficiencyJudge(),
            synthesizer=GroundedAnswerSynthesizer(),
            writer=FinalAnswerWriter(),
        )
        bus = MessageBus()
        uav = UAVAgent("uav-1", perception, bus)
        manager = AirspaceManagerAgent(react_controller, bus, validator)
        return DroneVideoSystem(react_controller, validator, manager)

    Raises:
        NotImplementedError: always, until real client implementations exist.
    """
    missing = (
        "build_system() requires real implementations of storage, "
        "VLM, LLM, detector, and vector DB clients."
    )
    raise NotImplementedError(missing)
if __name__ == "__main__":
    # Demo entry point, disabled until build_system() has real components:
    #
    #     system = build_system()
    #     result = system.answer_and_validate(
    #         query="Are there vehicles parked near the airport?",
    #         session_id="test-session-1",
    #     )
    #     print(result)
    ...
# No comments:
# Post a Comment