Tiling of aerial drone survey area
Problem statement:
Most UAVs with a top-down camera record a video of their tour regardless of the flight path. That video can be split into contiguous aerial frames, often numbering in the hundreds even for a short flight. We need a software implementation that takes these images as input in timeline order and selects the subset that completes a tiling of the area surveyed by the drone tour. The output must be a selection of the original input. The flight path may be assumed to be rectangular with the lower-left corner as origin for simplicity, but none of the images have GPS information available. Optionally, the selection may be output sorted to start from the lower left of the surveyed area and move clockwise along the perimeter, though this step may be omitted from the implementation.
Solution:
The following implementation uses computer vision (feature matching + homography estimation) to determine spatial relationships between images, then solves a set-cover problem to select a near-minimal tiling subset. Five stages complete the pipeline:
Pipeline:
| Stage | What happens | Key algorithm |
| --- | --- | --- |
| 1. Feature matching | ORB keypoints are detected in each image; nearby frames (within `search_radius`) are matched with brute-force Hamming matching plus Lowe's ratio test, then a homography is estimated with RANSAC. | ORB [Rublee 2011], RANSAC [Fischler 1981] |
| 2. Global registration | A BFS traversal from the most-connected image propagates homographies so every frame is placed in a single mosaic coordinate system. No GPS needed. | Brown & Lowe (2007) panoramic stitching |
| 3. Coverage gridding | The mosaic bounding box is discretised into a grid. Each image's warped footprint is rasterised to determine which cells it covers. | OpenCV `fillConvexPoly` |
| 4. Greedy set cover | Iteratively selects the image covering the most uncovered cells until 100% coverage is reached. This is the classic greedy approximation (ln n + 1 factor). | Chvatal (1979) |
| 5. Spatial sort | Selected images are sorted clockwise starting from the lower-left corner using angular ordering around the centroid. | Centroid angular sweep |
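Stage 4 is the only stage that is not off-the-shelf OpenCV, so its core loop is worth sketching on toy data first. The `greedy_cover` helper below is a hypothetical standalone illustration, not part of the implementation that follows:

```python
def greedy_cover(universe, subsets):
    """Greedy set cover (Chvatal 1979): repeatedly take the subset that
    covers the most still-uncovered elements. The result is within a
    ln(n) + 1 factor of the optimal cover."""
    uncovered = set(universe)
    chosen = []
    while uncovered:
        # Pick the subset with the largest intersection with `uncovered`.
        best = max(subsets, key=lambda k: len(subsets[k] & uncovered))
        if not subsets[best] & uncovered:
            break  # remaining elements are not coverable by any subset
        chosen.append(best)
        uncovered -= subsets[best]
    return chosen

# Toy example: four overlapping "footprints" covering cells 0..9.
footprints = {
    "img0": {0, 1, 2, 3},
    "img1": {2, 3, 4, 5, 6},
    "img2": {5, 6, 7, 8, 9},
    "img3": {3, 4, 5},       # fully redundant once img1 and img2 are chosen
}
print(greedy_cover(range(10), footprints))  # → ['img1', 'img2', 'img0']
```

Note that the redundant `img3` is never selected; the real pipeline relies on exactly this behaviour to discard frames whose footprint is already covered.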
Usage:
# Install dependencies
pip install opencv-python-headless numpy
# Run (CLI)
python drone_tiling.py --input_dir ./my_drone_frames --output tiles.json
# Run (programmatic)
from drone_tiling import select_tiling_from_directory
selected = select_tiling_from_directory("./my_drone_frames")
print(selected)
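The registration step (stage 2) rests on one identity: if H_ab maps image A's pixels into image B's frame, and A is already placed in the mosaic by H_a, then B is placed by H_b = H_a @ inv(H_ab). A minimal sketch using pure-translation homographies (a simplifying assumption; real pairwise homographies estimated by RANSAC include rotation and perspective):

```python
import numpy as np

def translation_H(tx, ty):
    """3x3 homography representing a pure translation."""
    H = np.eye(3)
    H[0, 2], H[1, 2] = tx, ty
    return H

def apply_H(H, pt):
    """Apply a 3x3 homography to a 2D point via homogeneous coordinates."""
    v = H @ np.array([pt[0], pt[1], 1.0])
    return v[:2] / v[2]

# H_ab maps A's pixels into B's frame: B sits 80 px to the right of A,
# so a point at x=100 in A lands at x=20 in B.
H_ab = translation_H(-80.0, 0.0)
H_a = np.eye(3)                      # A is the mosaic reference frame
# The chain rule used by register_images to place a neighbour:
H_b = H_a @ np.linalg.inv(H_ab)
print(apply_H(H_b, (0, 0)))          # B's origin sits at (80, 0) in the mosaic
```

BFS from the reference image applies this rule edge by edge, so every connected frame ends up with a homography into one shared mosaic frame.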
Implementation:
"""
Drone Image Tiling Selector
============================
Selects a near-minimal subset of contiguous aerial drone images that
completely tiles (covers) the area surveyed during a drone flight.
Algorithm & Citations
---------------------
1. **Feature Detection & Matching**: ORB (Oriented FAST and Rotated BRIEF)
detector with brute-force Hamming-distance matching.
- Rublee, E., Rabaud, V., Konolige, K., & Bradski, G. (2011).
"ORB: An efficient alternative to SIFT or SURF."
IEEE International Conference on Computer Vision (ICCV), pp. 2564-2571.
DOI: 10.1109/ICCV.2011.6126544
- OpenCV documentation: https://docs.opencv.org/4.x/d1/d89/tutorial_py_orb.html
2. **Homography Estimation (RANSAC)**: Used to compute the projective
transformation between overlapping image pairs, which gives us the
relative spatial position of each image.
- Fischler, M. A., & Bolles, R. C. (1981).
"Random Sample Consensus: A Paradigm for Model Fitting with
Applications to Image Analysis and Automated Cartography."
Communications of the ACM, 24(6), 381-395.
- OpenCV documentation: https://docs.opencv.org/4.x/d9/dab/tutorial_homography.html
3. **Image Stitching / Registration Pipeline**: The pairwise registration
approach follows the methodology in:
- Brown, M., & Lowe, D. G. (2007).
"Automatic Panoramic Image Stitching using Invariant Features."
International Journal of Computer Vision, 74(1), 59-73.
DOI: 10.1007/s11263-006-0002-3
4. **Greedy Weighted Set Cover** for selecting the minimal tiling subset:
- Chvatal, V. (1979).
"A Greedy Heuristic for the Set-Covering Problem."
Mathematics of Operations Research, 4(3), 233-235.
- Vazirani, V. V. (2001). "Approximation Algorithms", Chapter 2.
Springer-Verlag. ISBN: 3-540-65367-8.
Dependencies
------------
pip install opencv-python-headless numpy
Usage
-----
python drone_tiling.py --input_dir ./drone_images --output tiles.json
"""
import os
import glob
import json
import argparse
import logging
from dataclasses import dataclass
from typing import List, Tuple, Dict, Optional, Set
import cv2
import numpy as np
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
@dataclass
class TilingConfig:
"""Tuneable parameters for the tiling pipeline."""
# ORB feature detector
orb_n_features: int = 3000
orb_scale_factor: float = 1.2
orb_n_levels: int = 8
# Feature matching
match_ratio_thresh: float = 0.75 # Lowe's ratio test threshold
min_good_matches: int = 30 # minimum inlier matches to
# consider a pair overlapping
# RANSAC homography
ransac_reproj_thresh: float = 5.0 # reprojection error in pixels
# Coverage grid resolution (number of cells along the longer axis)
grid_resolution: int = 100
# Overlap: minimum fraction of an image that must overlap with the
# already-covered area for the image to be considered redundant.
redundancy_overlap: float = 0.95
# Image scaling for speed (process at this fraction of original size)
work_scale: float = 0.5
# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------
@dataclass
class ImageRecord:
"""Metadata for a single drone image."""
index: int
filepath: str
filename: str
width: int = 0
height: int = 0
# Position of the image centre in the *mosaic* coordinate frame.
cx: float = 0.0
cy: float = 0.0
# 3x3 homography that maps this image into mosaic coordinates.
H: Optional[np.ndarray] = None
@dataclass
class PairwiseMatch:
"""Result of matching two images."""
idx_a: int
idx_b: int
n_inliers: int
H_ab: np.ndarray # homography from image A to image B
confidence: float = 0.0
# ---------------------------------------------------------------------------
# 1. Feature detection & pair-wise matching
# ---------------------------------------------------------------------------
class FeatureMatcher:
"""
Detects ORB features in every image and matches consecutive /
nearby frames to estimate pairwise homographies.
Reference
---------
Rublee et al., "ORB: An efficient alternative to SIFT or SURF", ICCV 2011.
OpenCV ORB docs: https://docs.opencv.org/4.x/d1/d89/tutorial_py_orb.html
"""
def __init__(self, config: TilingConfig):
self.cfg = config
self.orb = cv2.ORB_create(
nfeatures=config.orb_n_features,
scaleFactor=config.orb_scale_factor,
nlevels=config.orb_n_levels,
)
self.bf = cv2.BFMatcher(cv2.NORM_HAMMING)
@staticmethod
def _load_grey(path: str, scale: float) -> np.ndarray:
img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
if img is None:
raise FileNotFoundError(f"Cannot read image: {path}")
if scale != 1.0:
img = cv2.resize(img, None, fx=scale, fy=scale,
interpolation=cv2.INTER_AREA)
return img
def _detect(self, grey: np.ndarray):
kp, des = self.orb.detectAndCompute(grey, None)
return kp, des
def _match_pair(self, des_a, des_b, kp_a, kp_b) -> Optional[PairwiseMatch]:
"""
Match descriptors with Lowe's ratio test, then estimate a homography
with RANSAC.
Reference
---------
Lowe, D. G. (2004). "Distinctive Image Features from Scale-Invariant
Keypoints." IJCV, 60(2), 91-110.
Fischler & Bolles (1981). "Random Sample Consensus." CACM.
"""
if des_a is None or des_b is None:
return None
if len(des_a) < 2 or len(des_b) < 2:
return None
raw_matches = self.bf.knnMatch(des_a, des_b, k=2)
good = []
for m_pair in raw_matches:
if len(m_pair) < 2:
continue
m, n = m_pair
if m.distance < self.cfg.match_ratio_thresh * n.distance:
good.append(m)
if len(good) < self.cfg.min_good_matches:
return None
pts_a = np.float32([kp_a[m.queryIdx].pt for m in good]).reshape(-1, 1, 2)
pts_b = np.float32([kp_b[m.trainIdx].pt for m in good]).reshape(-1, 1, 2)
H, mask = cv2.findHomography(
pts_a, pts_b, cv2.RANSAC, self.cfg.ransac_reproj_thresh
)
if H is None:
return None
n_inliers = int(mask.sum())
if n_inliers < self.cfg.min_good_matches:
return None
confidence = n_inliers / (8.0 + 0.3 * len(good)) # Brown & Lowe 2007
return PairwiseMatch(
idx_a=-1, idx_b=-1,
n_inliers=n_inliers,
H_ab=H,
confidence=confidence,
)
def match_sequence(
self,
records: List[ImageRecord],
search_radius: int = 5,
) -> List[PairwiseMatch]:
"""
For every image, match against up to *search_radius* neighbours in
both directions (handles forward overlap AND lateral overlap from
adjacent flight strips).
"""
n = len(records)
scale = self.cfg.work_scale
features = []
for rec in records:
grey = self._load_grey(rec.filepath, scale)
rec.height, rec.width = grey.shape[:2]
kp, des = self._detect(grey)
features.append((kp, des))
logger.debug(" %s - %d keypoints", rec.filename, len(kp))
matches: List[PairwiseMatch] = []
tested: Set[Tuple[int, int]] = set()
for i in range(n):
for j in range(i + 1, min(i + search_radius + 1, n)):
if (i, j) in tested:
continue
tested.add((i, j))
kp_a, des_a = features[i]
kp_b, des_b = features[j]
pm = self._match_pair(des_a, des_b, kp_a, kp_b)
if pm is not None:
pm.idx_a = i
pm.idx_b = j
matches.append(pm)
logger.debug(
" match %s <-> %s inliers=%d conf=%.3f",
records[i].filename, records[j].filename,
pm.n_inliers, pm.confidence,
)
logger.info("Pairwise matching complete: %d valid pairs from %d images.",
len(matches), n)
return matches
# ---------------------------------------------------------------------------
# 2. Global registration - place every image in a common mosaic frame
# ---------------------------------------------------------------------------
def register_images(
records: List[ImageRecord],
matches: List[PairwiseMatch],
) -> List[ImageRecord]:
"""
Build a connected graph of images and propagate homographies via BFS
so that every image is mapped into the coordinate frame of a single
reference image (the one with the most connections).
Reference
---------
Brown, M. & Lowe, D. G. (2007). "Automatic Panoramic Image Stitching
using Invariant Features." IJCV 74(1), 59-73.
OpenCV Stitching: https://docs.opencv.org/4.x/d9/dab/tutorial_homography.html
"""
n = len(records)
adj: Dict[int, List[Tuple[int, np.ndarray, float]]] = {i: [] for i in range(n)}
for pm in matches:
a, b = pm.idx_a, pm.idx_b
adj[a].append((b, pm.H_ab, pm.confidence))
H_inv = np.linalg.inv(pm.H_ab)
adj[b].append((a, H_inv, pm.confidence))
ref = max(range(n), key=lambda i: len(adj[i]))
logger.info("Reference image: %s (index %d, %d neighbours)",
records[ref].filename, ref, len(adj[ref]))
records[ref].H = np.eye(3, dtype=np.float64)
visited = {ref}
queue = [ref]
while queue:
cur = queue.pop(0)
for nb, H_cur_to_nb, _conf in adj[cur]:
if nb in visited:
continue
H_nb_to_mosaic = records[cur].H @ np.linalg.inv(H_cur_to_nb)
records[nb].H = H_nb_to_mosaic
visited.add(nb)
queue.append(nb)
disconnected = [i for i in range(n) if records[i].H is None]
if disconnected:
logger.warning(
"%d images could not be registered (disconnected): %s",
len(disconnected),
[records[i].filename for i in disconnected],
)
for rec in records:
if rec.H is None:
continue
centre_local = np.array([[rec.width / 2, rec.height / 2]], dtype=np.float64)
centre_mosaic = cv2.perspectiveTransform(
centre_local.reshape(-1, 1, 2), rec.H
)
rec.cx, rec.cy = centre_mosaic[0, 0]
return records
# ---------------------------------------------------------------------------
# 3. Coverage grid & greedy set-cover selection
# ---------------------------------------------------------------------------
def _image_footprint(rec: ImageRecord) -> Optional[np.ndarray]:
"""Return the four mosaic-frame corners of *rec* as an (4,2) array."""
if rec.H is None:
return None
corners = np.float64([
[0, 0],
[rec.width, 0],
[rec.width, rec.height],
[0, rec.height],
]).reshape(-1, 1, 2)
warped = cv2.perspectiveTransform(corners, rec.H)
return warped.reshape(-1, 2)
def _build_coverage_grid(
records: List[ImageRecord],
resolution: int,
) -> Tuple[np.ndarray, float, float, float]:
"""Create a boolean grid representing the total survey area."""
all_corners = []
for rec in records:
fp = _image_footprint(rec)
if fp is not None:
all_corners.append(fp)
if not all_corners:
raise RuntimeError("No images could be registered - cannot build grid.")
all_pts = np.vstack(all_corners)
x_min, y_min = all_pts.min(axis=0)
x_max, y_max = all_pts.max(axis=0)
span = max(x_max - x_min, y_max - y_min)
cell_size = span / resolution
cols = int(np.ceil((x_max - x_min) / cell_size)) + 1
rows = int(np.ceil((y_max - y_min) / cell_size)) + 1
grid = np.zeros((rows, cols), dtype=bool)
return grid, cell_size, float(x_min), float(y_min)
def _rasterise_footprint(
corners: np.ndarray,
cell_size: float,
x_min: float,
y_min: float,
grid_shape: Tuple[int, int],
) -> Set[Tuple[int, int]]:
"""Return the set of grid cells covered by the quadrilateral *corners*."""
rows, cols = grid_shape
gc = ((corners - [x_min, y_min]) / cell_size).astype(np.int32)
mask = np.zeros((rows, cols), dtype=np.uint8)
cv2.fillConvexPoly(mask, gc, 1)
cells = set(zip(*np.where(mask > 0)))
return cells
def select_tiling_images(
records: List[ImageRecord],
config: TilingConfig,
) -> List[ImageRecord]:
"""
Greedy weighted set-cover: iteratively pick the image that covers the
most *uncovered* grid cells until the entire survey area is covered.
Reference
---------
Chvatal, V. (1979). "A Greedy Heuristic for the Set-Covering Problem."
Mathematics of Operations Research, 4(3), 233-235.
"""
grid, cell_size, x_min, y_min = _build_coverage_grid(
records, config.grid_resolution
)
grid_shape = grid.shape
universe: Set[Tuple[int, int]] = set()
image_cells: Dict[int, Set[Tuple[int, int]]] = {}
for rec in records:
fp = _image_footprint(rec)
if fp is None:
continue
cells = _rasterise_footprint(fp, cell_size, x_min, y_min, grid_shape)
image_cells[rec.index] = cells
universe |= cells
logger.info("Survey area: %d grid cells to cover.", len(universe))
uncovered = set(universe)
selected_indices: List[int] = []
used: Set[int] = set()
while uncovered:
best_idx = -1
best_gain = 0
for idx, cells in image_cells.items():
if idx in used:
continue
gain = len(cells & uncovered)
if gain > best_gain:
best_gain = gain
best_idx = idx
if best_idx == -1 or best_gain == 0:
logger.warning(
"Cannot cover %d remaining cells - possible registration gaps.",
len(uncovered),
)
break
selected_indices.append(best_idx)
used.add(best_idx)
uncovered -= image_cells[best_idx]
logger.debug(
" selected %s - covers %d new cells, %d remaining",
records[best_idx].filename, best_gain, len(uncovered),
)
logger.info("Selected %d / %d images for full tiling.",
len(selected_indices), len(records))
selected = [records[i] for i in selected_indices]
return selected
# ---------------------------------------------------------------------------
# 4. (Optional) Sort selected images - lower-left origin, clockwise
# ---------------------------------------------------------------------------
def sort_clockwise_from_lower_left(
selected: List[ImageRecord],
) -> List[ImageRecord]:
"""
Sort images starting from the lower-left corner and proceeding clockwise
along the convex-hull perimeter.
"""
if len(selected) <= 1:
return selected
centres = np.array([[r.cx, r.cy] for r in selected])
centroid = centres.mean(axis=0)
    dx = centres[:, 0] - centroid[0]
    dy = centres[:, 1] - centroid[1]
    # Angular position of each centre around the centroid; sorting these
    # traverses the ring of images in a consistent rotational order.
    angles = np.arctan2(dx, dy)
    order = np.argsort(angles)
    # Mosaic coordinates follow the image convention (y grows downward),
    # so the lower-left centre minimises x - y, not x + y.
    lower_left_idx = int(np.argmin(centres[:, 0] - centres[:, 1]))
start = int(np.where(order == lower_left_idx)[0][0])
order = np.roll(order, -start)
return [selected[i] for i in order]
# ---------------------------------------------------------------------------
# 5. Main pipeline
# ---------------------------------------------------------------------------
def load_image_records(input_dir: str) -> List[ImageRecord]:
"""Load image file paths from a directory, sorted by name."""
extensions = ("*.jpg", "*.jpeg", "*.png", "*.tif", "*.tiff", "*.bmp")
paths: List[str] = []
for ext in extensions:
paths.extend(glob.glob(os.path.join(input_dir, ext)))
paths.extend(glob.glob(os.path.join(input_dir, ext.upper())))
paths = sorted(set(paths))
if not paths:
raise FileNotFoundError(f"No image files found in {input_dir}")
records = [
ImageRecord(index=i, filepath=p, filename=os.path.basename(p))
for i, p in enumerate(paths)
]
logger.info("Loaded %d image paths from %s", len(records), input_dir)
return records
def select_tiling_from_directory(
input_dir: str,
config: Optional[TilingConfig] = None,
sort_result: bool = True,
) -> List[str]:
"""
End-to-end pipeline.
Parameters
----------
input_dir : str
Directory containing contiguous drone images.
config : TilingConfig, optional
Pipeline configuration; uses defaults when None.
sort_result : bool
If True, sort selected images clockwise from lower-left.
Returns
-------
list[str]
File paths of the selected tiling images.
"""
if config is None:
config = TilingConfig()
records = load_image_records(input_dir)
matcher = FeatureMatcher(config)
matches = matcher.match_sequence(records)
if not matches:
logger.warning("No pairwise matches found. Returning all images.")
return [r.filepath for r in records]
records = register_images(records, matches)
selected = select_tiling_images(records, config)
if sort_result:
selected = sort_clockwise_from_lower_left(selected)
return [rec.filepath for rec in selected]
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(
description="Select a minimal set of drone images that tile the surveyed area."
)
parser.add_argument(
"--input_dir", required=True,
help="Directory containing contiguous aerial drone images.",
)
parser.add_argument(
"--output", default=None,
help="Optional JSON file to write the selected file list to.",
)
parser.add_argument(
"--grid_resolution", type=int, default=100,
help="Grid resolution for coverage computation (default: 100).",
)
parser.add_argument(
"--work_scale", type=float, default=0.5,
help="Down-scale factor for images during processing (default: 0.5).",
)
parser.add_argument(
"--search_radius", type=int, default=5,
help="Match each image against this many neighbours (default: 5).",
)
parser.add_argument(
"--no_sort", action="store_true",
help="Skip clockwise spatial sorting of the result.",
)
args = parser.parse_args()
config = TilingConfig(
grid_resolution=args.grid_resolution,
work_scale=args.work_scale,
)
selected = select_tiling_from_directory(
args.input_dir, config, sort_result=not args.no_sort
)
print(f"\n{'='*60}")
print(f"Selected {len(selected)} images for tiling:")
print(f"{'='*60}")
for i, path in enumerate(selected, 1):
print(f" {i:>3d}. {os.path.basename(path)}")
if args.output:
with open(args.output, "w") as f:
json.dump({"selected_images": selected}, f, indent=2)
print(f"\nResults written to {args.output}")
if __name__ == "__main__":
main()
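The selection logic can be smoke-tested without any imagery. The standalone sketch below simulates a lawnmower flight as axis-aligned square footprints on a cell grid (an idealisation; the real pipeline derives footprints from estimated homographies) and runs the same greedy loop as select_tiling_images:

```python
def footprint_cells(cx, cy, half=0.5, cell=0.25):
    """Grid cells covered by a square footprint centred at (cx, cy)."""
    x0 = round((cx - half) / cell)
    y0 = round((cy - half) / cell)
    n = round(2 * half / cell)
    return {(x0 + i, y0 + j) for i in range(n) for j in range(n)}

# 4 strips of 8 frames; 0.5-unit forward step => 50% overlap between
# consecutive frames along each strip.
centres = [(0.5 + 0.5 * x, 0.5 + y) for y in range(4) for x in range(8)]
cells = [footprint_cells(cx, cy) for cx, cy in centres]
universe = set().union(*cells)

# Same greedy set-cover loop as select_tiling_images, on toy footprints.
uncovered, selected = set(universe), []
while uncovered:
    best = max(range(len(cells)), key=lambda i: len(cells[i] & uncovered))
    selected.append(best)
    uncovered -= cells[best]

print(f"{len(selected)} of {len(centres)} frames cover all {len(universe)} cells")
```

With 50% forward overlap the greedy pass should always return full coverage using noticeably fewer frames than the input sequence, which is the behaviour to expect from the real pipeline on a well-overlapped survey.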
Tiling of aerial drone survey area
Problem statement:
Most UAV with top-down camera provide a video of their tour regardless of their flight path. This video can be split into several contiguous aerial drone images which may often number in hundreds even for a short duration. We need a software implementation that can take these images as input in the sorted order of the timeline and select those images that can complete tiling of the area surveyed by drone tour. The output of the implementation must be a selection of the original input. Flight path could be assumed to be rectangular with the lower left as origin for simplicity but none of the images have gps information available. Additionally, the selection could be output in the sorted order to start from lower left of the area surveyed and move clockwise along the perimeter but this can be skipped from the implementation.
Solution:
The following implementation uses computer vision (feature matching + homography estimation) to determine spatial relationships between images and then solves a set-cover problem to select a minimal tiling subset. A set of five stages completes the pipeline towards the goal:
Pipeline:
Stage What happens Key algorithm
1. Feature matching ORB keypoints are detected in each image; nearby frames (within search_radius) are matched using brute-force Hamming + Lowe's ratio test, then a homography is estimated with RANSAC. ORB [Rublee 2011], RANSAC [Fischler 1981]
2. Global registration A BFS traversal from the most-connected image propagates homographies so every frame is placed in a single mosaic coordinate system. No GPS needed. Brown & Lowe (2007) panoramic stitching
3. Coverage gridding The mosaic bounding box is discretised into a grid. Each image's warped footprint is rasterised to determine which cells it covers. OpenCV fillConvexPoly
4. Greedy set cover Iteratively selects the image covering the most uncovered cells until 100% coverage is reached. This is the classic greedy approximation (ln n + 1 factor). Chvatal (1979)
5. Spatial sort Selected images are sorted clockwise starting from the lower-left corner using angular ordering around the centroid. Convex-hull sweep
Usage:
# Install dependencies
pip install opencv-python-headless numpy
# Run (CLI)
python drone_tiling.py --input_dir ./my_drone_frames --output tiles.json
# Run (programmatic)
from drone_tiling import select_tiling_from_directory
selected = select_tiling_from_directory("./my_drone_frames")
print(selected)
Implementation:
"""
Drone Image Tiling Selector
============================
Selects a minimal subset of contiguous aerial drone images that completely
tiles (covers) the area surveyed during a drone flight.
Algorithm & Citations
---------------------
1. **Feature Detection & Matching**: ORB (Oriented FAST and Rotated BRIEF)
detector with brute-force Hamming-distance matching.
- Rublee, E., Rabaud, V., Konolige, K., & Bradski, G. (2011).
"ORB: An efficient alternative to SIFT or SURF."
IEEE International Conference on Computer Vision (ICCV), pp. 2564-2571.
DOI: 10.1109/ICCV.2011.6126544
- OpenCV documentation: https://docs.opencv.org/4.x/d1/d89/tutorial_py_orb.html
2. **Homography Estimation (RANSAC)**: Used to compute the projective
transformation between overlapping image pairs, which gives us the
relative spatial position of each image.
- Fischler, M. A., & Bolles, R. C. (1981).
"Random Sample Consensus: A Paradigm for Model Fitting with
Applications to Image Analysis and Automated Cartography."
Communications of the ACM, 24(6), 381-395.
- OpenCV documentation: https://docs.opencv.org/4.x/d9/dab/tutorial_homography.html
3. **Image Stitching / Registration Pipeline**: The pairwise registration
approach follows the methodology in:
- Brown, M., & Lowe, D. G. (2007).
"Automatic Panoramic Image Stitching using Invariant Features."
International Journal of Computer Vision, 74(1), 59-73.
DOI: 10.1007/s11263-006-0002-3
4. **Greedy Weighted Set Cover** for selecting the minimal tiling subset:
- Chvatal, V. (1979).
"A Greedy Heuristic for the Set-Covering Problem."
Mathematics of Operations Research, 4(3), 233-235.
- Vazirani, V. V. (2001). "Approximation Algorithms", Chapter 2.
Springer-Verlag. ISBN: 3-540-65367-8.
Dependencies
------------
pip install opencv-python-headless numpy
Usage
-----
python drone_tiling.py --input_dir ./drone_images --output tiles.json
"""
import os
import glob
import json
import argparse
import logging
from dataclasses import dataclass, field
from typing import List, Tuple, Dict, Optional, Set
import cv2
import numpy as np
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
@dataclass
class TilingConfig:
"""Tuneable parameters for the tiling pipeline."""
# ORB feature detector
orb_n_features: int = 3000
orb_scale_factor: float = 1.2
orb_n_levels: int = 8
# Feature matching
match_ratio_thresh: float = 0.75 # Lowe's ratio test threshold
min_good_matches: int = 30 # minimum inlier matches to
# consider a pair overlapping
# RANSAC homography
ransac_reproj_thresh: float = 5.0 # reprojection error in pixels
# Coverage grid resolution (number of cells along the longer axis)
grid_resolution: int = 100
# Overlap: minimum fraction of an image that must overlap with the
# already-covered area for the image to be considered redundant.
redundancy_overlap: float = 0.95
# Image scaling for speed (process at this fraction of original size)
work_scale: float = 0.5
# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------
@dataclass
class ImageRecord:
"""Metadata for a single drone image."""
index: int
filepath: str
filename: str
width: int = 0
height: int = 0
# Position of the image centre in the *mosaic* coordinate frame.
cx: float = 0.0
cy: float = 0.0
# 3x3 homography that maps this image into mosaic coordinates.
H: Optional[np.ndarray] = None
@dataclass
class PairwiseMatch:
"""Result of matching two images."""
idx_a: int
idx_b: int
n_inliers: int
H_ab: np.ndarray # homography from image A to image B
confidence: float = 0.0
# ---------------------------------------------------------------------------
# 1. Feature detection & pair-wise matching
# ---------------------------------------------------------------------------
class FeatureMatcher:
"""
Detects ORB features in every image and matches consecutive /
nearby frames to estimate pairwise homographies.
Reference
---------
Rublee et al., "ORB: An efficient alternative to SIFT or SURF", ICCV 2011.
OpenCV ORB docs: https://docs.opencv.org/4.x/d1/d89/tutorial_py_orb.html
"""
def __init__(self, config: TilingConfig):
self.cfg = config
self.orb = cv2.ORB_create(
nfeatures=config.orb_n_features,
scaleFactor=config.orb_scale_factor,
nlevels=config.orb_n_levels,
)
self.bf = cv2.BFMatcher(cv2.NORM_HAMMING)
@staticmethod
def _load_grey(path: str, scale: float) -> np.ndarray:
img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
if img is None:
raise FileNotFoundError(f"Cannot read image: {path}")
if scale != 1.0:
img = cv2.resize(img, None, fx=scale, fy=scale,
interpolation=cv2.INTER_AREA)
return img
def _detect(self, grey: np.ndarray):
kp, des = self.orb.detectAndCompute(grey, None)
return kp, des
def _match_pair(self, des_a, des_b, kp_a, kp_b) -> Optional[PairwiseMatch]:
"""
Match descriptors with Lowe's ratio test, then estimate a homography
with RANSAC.
Reference
---------
Lowe, D. G. (2004). "Distinctive Image Features from Scale-Invariant
Keypoints." IJCV, 60(2), 91-110.
Fischler & Bolles (1981). "Random Sample Consensus." CACM.
"""
if des_a is None or des_b is None:
return None
if len(des_a) < 2 or len(des_b) < 2:
return None
raw_matches = self.bf.knnMatch(des_a, des_b, k=2)
good = []
for m_pair in raw_matches:
if len(m_pair) < 2:
continue
m, n = m_pair
if m.distance < self.cfg.match_ratio_thresh * n.distance:
good.append(m)
if len(good) < self.cfg.min_good_matches:
return None
pts_a = np.float32([kp_a[m.queryIdx].pt for m in good]).reshape(-1, 1, 2)
pts_b = np.float32([kp_b[m.trainIdx].pt for m in good]).reshape(-1, 1, 2)
H, mask = cv2.findHomography(
pts_a, pts_b, cv2.RANSAC, self.cfg.ransac_reproj_thresh
)
if H is None:
return None
n_inliers = int(mask.sum())
if n_inliers < self.cfg.min_good_matches:
return None
confidence = n_inliers / (8.0 + 0.3 * len(good)) # Brown & Lowe 2007
return PairwiseMatch(
idx_a=-1, idx_b=-1,
n_inliers=n_inliers,
H_ab=H,
confidence=confidence,
)
def match_sequence(
self,
records: List[ImageRecord],
search_radius: int = 5,
) -> List[PairwiseMatch]:
"""
For every image, match against up to *search_radius* neighbours in
both directions (handles forward overlap AND lateral overlap from
adjacent flight strips).
"""
n = len(records)
scale = self.cfg.work_scale
features = []
for rec in records:
grey = self._load_grey(rec.filepath, scale)
rec.height, rec.width = grey.shape[:2]
kp, des = self._detect(grey)
features.append((kp, des))
logger.debug(" %s - %d keypoints", rec.filename, len(kp))
matches: List[PairwiseMatch] = []
tested: Set[Tuple[int, int]] = set()
for i in range(n):
for j in range(i + 1, min(i + search_radius + 1, n)):
if (i, j) in tested:
continue
tested.add((i, j))
kp_a, des_a = features[i]
kp_b, des_b = features[j]
pm = self._match_pair(des_a, des_b, kp_a, kp_b)
if pm is not None:
pm.idx_a = i
pm.idx_b = j
matches.append(pm)
logger.debug(
" match %s <-> %s inliers=%d conf=%.3f",
records[i].filename, records[j].filename,
pm.n_inliers, pm.confidence,
)
logger.info("Pairwise matching complete: %d valid pairs from %d images.",
len(matches), n)
return matches
# ---------------------------------------------------------------------------
# 2. Global registration - place every image in a common mosaic frame
# ---------------------------------------------------------------------------
def register_images(
records: List[ImageRecord],
matches: List[PairwiseMatch],
) -> List[ImageRecord]:
"""
Build a connected graph of images and propagate homographies via BFS
so that every image is mapped into the coordinate frame of a single
reference image (the one with the most connections).
Reference
---------
Brown, M. & Lowe, D. G. (2007). "Automatic Panoramic Image Stitching
using Invariant Features." IJCV 74(1), 59-73.
OpenCV Stitching: https://docs.opencv.org/4.x/d9/dab/tutorial_homography.html
"""
n = len(records)
adj: Dict[int, List[Tuple[int, np.ndarray, float]]] = {i: [] for i in range(n)}
for pm in matches:
a, b = pm.idx_a, pm.idx_b
adj[a].append((b, pm.H_ab, pm.confidence))
H_inv = np.linalg.inv(pm.H_ab)
adj[b].append((a, H_inv, pm.confidence))
ref = max(range(n), key=lambda i: len(adj[i]))
logger.info("Reference image: %s (index %d, %d neighbours)",
records[ref].filename, ref, len(adj[ref]))
records[ref].H = np.eye(3, dtype=np.float64)
visited = {ref}
queue = [ref]
while queue:
cur = queue.pop(0)
for nb, H_cur_to_nb, _conf in adj[cur]:
if nb in visited:
continue
H_nb_to_mosaic = records[cur].H @ np.linalg.inv(H_cur_to_nb)
records[nb].H = H_nb_to_mosaic
visited.add(nb)
queue.append(nb)
disconnected = [i for i in range(n) if records[i].H is None]
if disconnected:
logger.warning(
"%d images could not be registered (disconnected): %s",
len(disconnected),
[records[i].filename for i in disconnected],
)
for rec in records:
if rec.H is None:
continue
centre_local = np.array([[rec.width / 2, rec.height / 2]], dtype=np.float64)
centre_mosaic = cv2.perspectiveTransform(
centre_local.reshape(-1, 1, 2), rec.H
)
rec.cx, rec.cy = centre_mosaic[0, 0]
return records
# ---------------------------------------------------------------------------
# 3. Coverage grid & greedy set-cover selection
# ---------------------------------------------------------------------------
def _image_footprint(rec: ImageRecord) -> Optional[np.ndarray]:
"""Return the four mosaic-frame corners of *rec* as an (4,2) array."""
if rec.H is None:
return None
corners = np.float64([
[0, 0],
[rec.width, 0],
[rec.width, rec.height],
[0, rec.height],
]).reshape(-1, 1, 2)
warped = cv2.perspectiveTransform(corners, rec.H)
return warped.reshape(-1, 2)
def _build_coverage_grid(
records: List[ImageRecord],
resolution: int,
) -> Tuple[np.ndarray, float, float, float]:
"""Create a boolean grid representing the total survey area."""
all_corners = []
for rec in records:
fp = _image_footprint(rec)
if fp is not None:
all_corners.append(fp)
if not all_corners:
raise RuntimeError("No images could be registered - cannot build grid.")
all_pts = np.vstack(all_corners)
x_min, y_min = all_pts.min(axis=0)
x_max, y_max = all_pts.max(axis=0)
span = max(x_max - x_min, y_max - y_min)
cell_size = span / resolution
cols = int(np.ceil((x_max - x_min) / cell_size)) + 1
rows = int(np.ceil((y_max - y_min) / cell_size)) + 1
grid = np.zeros((rows, cols), dtype=bool)
return grid, cell_size, float(x_min), float(y_min)
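The grid dimensioning above is plain arithmetic; a small worked example with assumed mosaic bounds (the numbers are invented for illustration, not from a real run):

```python
import numpy as np

# Assumed mosaic bounding box for the example.
x_min, x_max, y_min, y_max = 0.0, 2000.0, 0.0, 1000.0
resolution = 100

span = max(x_max - x_min, y_max - y_min)               # 2000.0 (longer axis)
cell_size = span / resolution                          # 20.0 mosaic units/cell
cols = int(np.ceil((x_max - x_min) / cell_size)) + 1   # 101 (the +1 pads the edge)
rows = int(np.ceil((y_max - y_min) / cell_size)) + 1   # 51
print(rows, cols)                                      # -> 51 101
```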
def _rasterise_footprint(
corners: np.ndarray,
cell_size: float,
x_min: float,
y_min: float,
grid_shape: Tuple[int, int],
) -> Set[Tuple[int, int]]:
    """Return the set of grid cells covered by the quadrilateral *corners*.

    Assumes the warped footprint is convex (a requirement of
    cv2.fillConvexPoly), which holds for near-nadir aerial views.
    """
rows, cols = grid_shape
gc = ((corners - [x_min, y_min]) / cell_size).astype(np.int32)
mask = np.zeros((rows, cols), dtype=np.uint8)
cv2.fillConvexPoly(mask, gc, 1)
cells = set(zip(*np.where(mask > 0)))
return cells
def select_tiling_images(
records: List[ImageRecord],
config: TilingConfig,
) -> List[ImageRecord]:
"""
Greedy weighted set-cover: iteratively pick the image that covers the
most *uncovered* grid cells until the entire survey area is covered.
Reference
---------
Chvatal, V. (1979). "A Greedy Heuristic for the Set-Covering Problem."
Mathematics of Operations Research, 4(3), 233-235.
"""
grid, cell_size, x_min, y_min = _build_coverage_grid(
records, config.grid_resolution
)
grid_shape = grid.shape
universe: Set[Tuple[int, int]] = set()
image_cells: Dict[int, Set[Tuple[int, int]]] = {}
for rec in records:
fp = _image_footprint(rec)
if fp is None:
continue
cells = _rasterise_footprint(fp, cell_size, x_min, y_min, grid_shape)
image_cells[rec.index] = cells
universe |= cells
logger.info("Survey area: %d grid cells to cover.", len(universe))
uncovered = set(universe)
selected_indices: List[int] = []
used: Set[int] = set()
while uncovered:
best_idx = -1
best_gain = 0
for idx, cells in image_cells.items():
if idx in used:
continue
gain = len(cells & uncovered)
if gain > best_gain:
best_gain = gain
best_idx = idx
if best_idx == -1 or best_gain == 0:
logger.warning(
"Cannot cover %d remaining cells - possible registration gaps.",
len(uncovered),
)
break
selected_indices.append(best_idx)
used.add(best_idx)
uncovered -= image_cells[best_idx]
logger.debug(
" selected %s - covers %d new cells, %d remaining",
records[best_idx].filename, best_gain, len(uncovered),
)
logger.info("Selected %d / %d images for full tiling.",
len(selected_indices), len(records))
selected = [records[i] for i in selected_indices]
return selected
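The same greedy rule on a toy instance (the universe and "footprint" cell sets are invented for illustration): each step picks the candidate covering the most still-uncovered elements, which is exactly what the loop above does over grid cells.

```python
# Toy universe of 10 cells and four hypothetical image footprints.
universe = set(range(10))
image_cells = {
    0: {0, 1, 2, 3, 4, 5},   # big central tile
    1: {4, 5, 6, 7},         # overlaps tile 0
    2: {7, 8, 9},            # right edge
    3: {1, 2},               # fully redundant once 0 is picked
}

uncovered, chosen = set(universe), []
while uncovered:
    # Pick the set with the largest marginal gain.
    idx, cells = max(image_cells.items(), key=lambda kv: len(kv[1] & uncovered))
    if not cells & uncovered:
        break                # remaining cells cannot be covered
    chosen.append(idx)
    uncovered -= cells
print(chosen)                # -> [0, 2, 1]  (tile 3 is never needed)
```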
# ---------------------------------------------------------------------------
# 4. (Optional) Sort selected images - lower-left origin, clockwise
# ---------------------------------------------------------------------------
def sort_clockwise_from_lower_left(
selected: List[ImageRecord],
) -> List[ImageRecord]:
    """
    Sort images starting from the lower-left corner and proceeding
    clockwise, by angular ordering of the image centres around their
    centroid (a convex-hull-style sweep).

    The mosaic frame follows the image convention (y grows downward),
    so "lower left" corresponds to minimum x and maximum y.
    """
    if len(selected) <= 1:
        return selected
    centres = np.array([[r.cx, r.cy] for r in selected])
    centroid = centres.mean(axis=0)
    dx = centres[:, 0] - centroid[0]
    dy = centres[:, 1] - centroid[1]
    # Negate dy so that ascending angle sweeps clockwise in the
    # y-down mosaic frame.
    angles = np.arctan2(dx, -dy)
    order = np.argsort(angles)
    # Lower-left centre: smallest x - y, i.e. small x and large y.
    lower_left_idx = int(np.argmin(centres[:, 0] - centres[:, 1]))
    start = int(np.where(order == lower_left_idx)[0][0])
    order = np.roll(order, -start)
    return [selected[i] for i in order]
# ---------------------------------------------------------------------------
# 5. Main pipeline
# ---------------------------------------------------------------------------
def load_image_records(input_dir: str) -> List[ImageRecord]:
"""Load image file paths from a directory, sorted by name."""
extensions = ("*.jpg", "*.jpeg", "*.png", "*.tif", "*.tiff", "*.bmp")
paths: List[str] = []
for ext in extensions:
paths.extend(glob.glob(os.path.join(input_dir, ext)))
paths.extend(glob.glob(os.path.join(input_dir, ext.upper())))
paths = sorted(set(paths))
if not paths:
raise FileNotFoundError(f"No image files found in {input_dir}")
records = [
ImageRecord(index=i, filepath=p, filename=os.path.basename(p))
for i, p in enumerate(paths)
]
logger.info("Loaded %d image paths from %s", len(records), input_dir)
return records
def select_tiling_from_directory(
input_dir: str,
config: Optional[TilingConfig] = None,
sort_result: bool = True,
) -> List[str]:
"""
End-to-end pipeline.
Parameters
----------
input_dir : str
Directory containing contiguous drone images.
config : TilingConfig, optional
Pipeline configuration; uses defaults when None.
sort_result : bool
If True, sort selected images clockwise from lower-left.
Returns
-------
list[str]
File paths of the selected tiling images.
"""
if config is None:
config = TilingConfig()
records = load_image_records(input_dir)
matcher = FeatureMatcher(config)
    # Forward a search_radius attached to the config (e.g. by the CLI);
    # otherwise fall back to the matcher's default of 5 neighbours.
    matches = matcher.match_sequence(
        records, search_radius=getattr(config, "search_radius", 5)
    )
if not matches:
logger.warning("No pairwise matches found. Returning all images.")
return [r.filepath for r in records]
records = register_images(records, matches)
selected = select_tiling_images(records, config)
if sort_result:
selected = sort_clockwise_from_lower_left(selected)
return [rec.filepath for rec in selected]
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(
description="Select a minimal set of drone images that tile the surveyed area."
)
parser.add_argument(
"--input_dir", required=True,
help="Directory containing contiguous aerial drone images.",
)
parser.add_argument(
"--output", default=None,
help="Optional JSON file to write the selected file list to.",
)
parser.add_argument(
"--grid_resolution", type=int, default=100,
help="Grid resolution for coverage computation (default: 100).",
)
parser.add_argument(
"--work_scale", type=float, default=0.5,
help="Down-scale factor for images during processing (default: 0.5).",
)
parser.add_argument(
"--search_radius", type=int, default=5,
help="Match each image against this many neighbours (default: 5).",
)
parser.add_argument(
"--no_sort", action="store_true",
help="Skip clockwise spatial sorting of the result.",
)
args = parser.parse_args()
    config = TilingConfig(
        grid_resolution=args.grid_resolution,
        work_scale=args.work_scale,
    )
    # TilingConfig has no search_radius field, so attach the CLI value
    # as an attribute for the pipeline to pick up.
    config.search_radius = args.search_radius
selected = select_tiling_from_directory(
args.input_dir, config, sort_result=not args.no_sort
)
print(f"\n{'='*60}")
print(f"Selected {len(selected)} images for tiling:")
print(f"{'='*60}")
for i, path in enumerate(selected, 1):
print(f" {i:>3d}. {os.path.basename(path)}")
if args.output:
with open(args.output, "w") as f:
json.dump({"selected_images": selected}, f, indent=2)
print(f"\nResults written to {args.output}")
if __name__ == "__main__":
main()