Friday, May 8, 2026

 Tiling of aerial drone survey area

Problem statement:

Most UAVs with a top-down camera record a video of their tour regardless of the flight path. This video can be split into many contiguous aerial frames, often numbering in the hundreds even for a short flight. We need a software implementation that takes these images as input in timeline order and selects the subset that completes a tiling of the area surveyed by the drone tour. The output of the implementation must be a selection of the original input. For simplicity, the flight path can be assumed to be rectangular with the lower-left corner as origin, but none of the images carry GPS information. Additionally, the selection may be output in sorted order, starting from the lower left of the surveyed area and moving clockwise along the perimeter, but this step may be skipped from the implementation.

Solution:

The following implementation uses computer vision (feature matching plus homography estimation) to determine the spatial relationships between images, and then solves a set-cover problem to select a minimal tiling subset. Five stages complete the pipeline:

Pipeline:

1. Feature matching: ORB keypoints are detected in each image; nearby frames (within search_radius) are matched with brute-force Hamming matching plus Lowe's ratio test, and a homography is then estimated with RANSAC. (ORB [Rublee 2011], RANSAC [Fischler 1981])

2. Global registration: A BFS traversal from the most-connected image propagates homographies so that every frame is placed in a single mosaic coordinate system; no GPS is needed. (Brown & Lowe 2007, panoramic stitching)

3. Coverage gridding: The mosaic bounding box is discretised into a grid, and each image's warped footprint is rasterised to determine which cells it covers. (OpenCV fillConvexPoly)

4. Greedy set cover: The image covering the most uncovered cells is selected iteratively until 100% coverage is reached; this is the classic greedy approximation with its ln(n) + 1 factor. (Chvatal 1979)

5. Spatial sort: Selected images are sorted clockwise starting from the lower-left corner using angular ordering around the centroid. (Convex-hull sweep)
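The greedy rule in stage 4 can be illustrated with a minimal, self-contained sketch (hypothetical cell sets, not the pipeline's real coverage grid):

```python
# Minimal greedy set-cover sketch (hypothetical data, not the real pipeline).
# Each "image" covers a set of grid cells; repeatedly pick the image that
# covers the most still-uncovered cells (Chvatal's ln(n) + 1 approximation).

def greedy_set_cover(universe, image_cells):
    uncovered = set(universe)
    selected = []
    while uncovered:
        # pick the image with the largest gain over the uncovered cells
        best = max(image_cells, key=lambda i: len(image_cells[i] & uncovered))
        if not image_cells[best] & uncovered:
            break  # remaining cells cannot be covered by any image
        selected.append(best)
        uncovered -= image_cells[best]
    return selected

cells = {
    0: {(0, 0), (0, 1), (1, 0)},
    1: {(1, 0), (1, 1)},
    2: {(0, 1), (1, 1)},
}
universe = set().union(*cells.values())
print(greedy_set_cover(universe, cells))  # -> [0, 1]
```

Image 0 is picked first (3 new cells), after which image 1 closes the last gap; image 2 adds nothing new and is left out, which is exactly the redundancy-pruning behaviour the pipeline relies on.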

Usage:

# Install dependencies

pip install opencv-python-headless numpy

# Run (CLI)

python drone_tiling.py --input_dir ./my_drone_frames --output tiles.json

# Run (programmatic)

from drone_tiling import select_tiling_from_directory

selected = select_tiling_from_directory("./my_drone_frames")

print(selected)
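Before diving into the listing, the core idea of stage 2 (chaining pairwise homographies into one mosaic frame) can be sketched with hypothetical translation-only homographies; H_ref_to_b and the centre coordinates below are made-up illustrative values:

```python
import numpy as np

# Translation-only homographies keep the arithmetic easy to follow.
def T(tx, ty):
    return np.array([[1.0, 0.0, tx], [0.0, 1.0, ty], [0.0, 0.0, 1.0]])

H_ref = np.eye(3)            # the reference image defines the mosaic frame
H_ref_to_b = T(100.0, 0.0)   # hypothetical: maps reference pixels into image B

# Place B in the mosaic frame, mirroring the update in register_images():
H_b = H_ref @ np.linalg.inv(H_ref_to_b)

centre_b = H_b @ np.array([320.0, 240.0, 1.0])   # B's centre, homogeneous
print((centre_b[:2] / centre_b[2]).tolist())     # -> [220.0, 240.0]
```

Because H_ref_to_b shifts reference pixels 100 px to the right, image B sits 100 px to the left of the reference in mosaic coordinates, so its centre lands at (220, 240).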

Implementation:

"""

Drone Image Tiling Selector

============================

Selects a minimal subset of contiguous aerial drone images that completely

tiles (covers) the area surveyed during a drone flight.

Algorithm & Citations

---------------------

1. **Feature Detection & Matching**: ORB (Oriented FAST and Rotated BRIEF)

   detector with brute-force Hamming-distance matching.

   - Rublee, E., Rabaud, V., Konolige, K., & Bradski, G. (2011).

     "ORB: An efficient alternative to SIFT or SURF."

     IEEE International Conference on Computer Vision (ICCV), pp. 2564-2571.

     DOI: 10.1109/ICCV.2011.6126544

   - OpenCV documentation: https://docs.opencv.org/4.x/d1/d89/tutorial_py_orb.html

2. **Homography Estimation (RANSAC)**: Used to compute the projective

   transformation between overlapping image pairs, which gives us the

   relative spatial position of each image.

   - Fischler, M. A., & Bolles, R. C. (1981).

     "Random Sample Consensus: A Paradigm for Model Fitting with

     Applications to Image Analysis and Automated Cartography."

     Communications of the ACM, 24(6), 381-395.

   - OpenCV documentation: https://docs.opencv.org/4.x/d9/dab/tutorial_homography.html

3. **Image Stitching / Registration Pipeline**: The pairwise registration

   approach follows the methodology in:

   - Brown, M., & Lowe, D. G. (2007).

     "Automatic Panoramic Image Stitching using Invariant Features."

     International Journal of Computer Vision, 74(1), 59-73.

     DOI: 10.1007/s11263-006-0002-3

4. **Greedy Weighted Set Cover** for selecting the minimal tiling subset:

   - Chvatal, V. (1979).

     "A Greedy Heuristic for the Set-Covering Problem."

     Mathematics of Operations Research, 4(3), 233-235.

   - Vazirani, V. V. (2001). "Approximation Algorithms", Chapter 2.

     Springer-Verlag. ISBN: 3-540-65367-8.

Dependencies

------------

    pip install opencv-python-headless numpy

Usage

-----

    python drone_tiling.py --input_dir ./drone_images --output tiles.json

"""

import os

import glob

import json

import argparse

import logging

from dataclasses import dataclass

from typing import List, Tuple, Dict, Optional, Set

import cv2

import numpy as np

logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------

# Configuration

# ---------------------------------------------------------------------------

@dataclass

class TilingConfig:

    """Tuneable parameters for the tiling pipeline."""

    # ORB feature detector

    orb_n_features: int = 3000

    orb_scale_factor: float = 1.2

    orb_n_levels: int = 8

    # Feature matching

    match_ratio_thresh: float = 0.75 # Lowe's ratio test threshold

    min_good_matches: int = 30  # minimum inlier matches to consider a pair overlapping

    # RANSAC homography

    ransac_reproj_thresh: float = 5.0 # reprojection error in pixels

    # Coverage grid resolution (number of cells along the longer axis)

    grid_resolution: int = 100

    # Overlap: minimum fraction of an image that must overlap with the

    # already-covered area for the image to be considered redundant.

    redundancy_overlap: float = 0.95

    # Image scaling for speed (process at this fraction of original size)

    work_scale: float = 0.5

# ---------------------------------------------------------------------------

# Data structures

# ---------------------------------------------------------------------------

@dataclass

class ImageRecord:

    """Metadata for a single drone image."""

    index: int

    filepath: str

    filename: str

    width: int = 0

    height: int = 0

    # Position of the image centre in the *mosaic* coordinate frame.

    cx: float = 0.0

    cy: float = 0.0

    # 3x3 homography that maps this image into mosaic coordinates.

    H: Optional[np.ndarray] = None

@dataclass

class PairwiseMatch:

    """Result of matching two images."""

    idx_a: int

    idx_b: int

    n_inliers: int

    H_ab: np.ndarray # homography from image A to image B

    confidence: float = 0.0

# ---------------------------------------------------------------------------

# 1. Feature detection & pair-wise matching

# ---------------------------------------------------------------------------

class FeatureMatcher:

    """

    Detects ORB features in every image and matches consecutive /

    nearby frames to estimate pairwise homographies.

    Reference

    ---------

    Rublee et al., "ORB: An efficient alternative to SIFT or SURF", ICCV 2011.

    OpenCV ORB docs: https://docs.opencv.org/4.x/d1/d89/tutorial_py_orb.html

    """

    def __init__(self, config: TilingConfig):

        self.cfg = config

        self.orb = cv2.ORB_create(

            nfeatures=config.orb_n_features,

            scaleFactor=config.orb_scale_factor,

            nlevels=config.orb_n_levels,

        )

        self.bf = cv2.BFMatcher(cv2.NORM_HAMMING)

    @staticmethod

    def _load_grey(path: str, scale: float) -> np.ndarray:

        img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)

        if img is None:

            raise FileNotFoundError(f"Cannot read image: {path}")

        if scale != 1.0:

            img = cv2.resize(img, None, fx=scale, fy=scale,

                             interpolation=cv2.INTER_AREA)

        return img

    def _detect(self, grey: np.ndarray):

        kp, des = self.orb.detectAndCompute(grey, None)

        return kp, des

    def _match_pair(self, des_a, des_b, kp_a, kp_b) -> Optional[PairwiseMatch]:

        """

        Match descriptors with Lowe's ratio test, then estimate a homography

        with RANSAC.

        Reference

        ---------

        Lowe, D. G. (2004). "Distinctive Image Features from Scale-Invariant

        Keypoints." IJCV, 60(2), 91-110.

        Fischler & Bolles (1981). "Random Sample Consensus." CACM.

        """

        if des_a is None or des_b is None:

            return None

        if len(des_a) < 2 or len(des_b) < 2:

            return None

        raw_matches = self.bf.knnMatch(des_a, des_b, k=2)

        good = []

        for m_pair in raw_matches:

            if len(m_pair) < 2:

                continue

            m, n = m_pair

            if m.distance < self.cfg.match_ratio_thresh * n.distance:

                good.append(m)

        if len(good) < self.cfg.min_good_matches:

            return None

        pts_a = np.float32([kp_a[m.queryIdx].pt for m in good]).reshape(-1, 1, 2)

        pts_b = np.float32([kp_b[m.trainIdx].pt for m in good]).reshape(-1, 1, 2)

        H, mask = cv2.findHomography(

            pts_a, pts_b, cv2.RANSAC, self.cfg.ransac_reproj_thresh

        )

        if H is None:

            return None

        n_inliers = int(mask.sum())

        if n_inliers < self.cfg.min_good_matches:

            return None

        confidence = n_inliers / (8.0 + 0.3 * len(good)) # Brown & Lowe 2007

        return PairwiseMatch(

            idx_a=-1, idx_b=-1,

            n_inliers=n_inliers,

            H_ab=H,

            confidence=confidence,

        )

    def match_sequence(

        self,

        records: List[ImageRecord],

        search_radius: int = 5,

    ) -> List[PairwiseMatch]:

        """

        For every image, match against up to *search_radius* neighbours in

        both directions (handles forward overlap AND lateral overlap from

        adjacent flight strips).

        """

        n = len(records)

        scale = self.cfg.work_scale

        features = []

        for rec in records:

            grey = self._load_grey(rec.filepath, scale)

            rec.height, rec.width = grey.shape[:2]

            kp, des = self._detect(grey)

            features.append((kp, des))

            logger.debug(" %s - %d keypoints", rec.filename, len(kp))

        matches: List[PairwiseMatch] = []

        tested: Set[Tuple[int, int]] = set()

        for i in range(n):

            for j in range(i + 1, min(i + search_radius + 1, n)):

                if (i, j) in tested:

                    continue

                tested.add((i, j))

                kp_a, des_a = features[i]

                kp_b, des_b = features[j]

                pm = self._match_pair(des_a, des_b, kp_a, kp_b)

                if pm is not None:

                    pm.idx_a = i

                    pm.idx_b = j

                    matches.append(pm)

                    logger.debug(

                        " match %s <-> %s inliers=%d conf=%.3f",

                        records[i].filename, records[j].filename,

                        pm.n_inliers, pm.confidence,

                    )

        logger.info("Pairwise matching complete: %d valid pairs from %d images.",

                     len(matches), n)

        return matches

# ---------------------------------------------------------------------------

# 2. Global registration - place every image in a common mosaic frame

# ---------------------------------------------------------------------------

def register_images(

    records: List[ImageRecord],

    matches: List[PairwiseMatch],

) -> List[ImageRecord]:

    """

    Build a connected graph of images and propagate homographies via BFS

    so that every image is mapped into the coordinate frame of a single

    reference image (the one with the most connections).

    Reference

    ---------

    Brown, M. & Lowe, D. G. (2007). "Automatic Panoramic Image Stitching

    using Invariant Features." IJCV 74(1), 59-73.

    OpenCV Stitching: https://docs.opencv.org/4.x/d9/dab/tutorial_homography.html

    """

    n = len(records)

    adj: Dict[int, List[Tuple[int, np.ndarray, float]]] = {i: [] for i in range(n)}

    for pm in matches:

        a, b = pm.idx_a, pm.idx_b

        adj[a].append((b, pm.H_ab, pm.confidence))

        H_inv = np.linalg.inv(pm.H_ab)

        adj[b].append((a, H_inv, pm.confidence))

    ref = max(range(n), key=lambda i: len(adj[i]))

    logger.info("Reference image: %s (index %d, %d neighbours)",

                records[ref].filename, ref, len(adj[ref]))

    records[ref].H = np.eye(3, dtype=np.float64)

    visited = {ref}

    queue = [ref]

    while queue:

        cur = queue.pop(0)

        for nb, H_cur_to_nb, _conf in adj[cur]:

            if nb in visited:

                continue

            H_nb_to_mosaic = records[cur].H @ np.linalg.inv(H_cur_to_nb)

            records[nb].H = H_nb_to_mosaic

            visited.add(nb)

            queue.append(nb)

    disconnected = [i for i in range(n) if records[i].H is None]

    if disconnected:

        logger.warning(

            "%d images could not be registered (disconnected): %s",

            len(disconnected),

            [records[i].filename for i in disconnected],

        )

    for rec in records:

        if rec.H is None:

            continue

        centre_local = np.array([[rec.width / 2, rec.height / 2]], dtype=np.float64)

        centre_mosaic = cv2.perspectiveTransform(

            centre_local.reshape(-1, 1, 2), rec.H

        )

        rec.cx, rec.cy = centre_mosaic[0, 0]

    return records

# ---------------------------------------------------------------------------

# 3. Coverage grid & greedy set-cover selection

# ---------------------------------------------------------------------------

def _image_footprint(rec: ImageRecord) -> Optional[np.ndarray]:

    """Return the four mosaic-frame corners of *rec* as an (4,2) array."""

    if rec.H is None:

        return None

    corners = np.float64([

        [0, 0],

        [rec.width, 0],

        [rec.width, rec.height],

        [0, rec.height],

    ]).reshape(-1, 1, 2)

    warped = cv2.perspectiveTransform(corners, rec.H)

    return warped.reshape(-1, 2)

def _build_coverage_grid(

    records: List[ImageRecord],

    resolution: int,

) -> Tuple[np.ndarray, float, float, float]:

    """Create a boolean grid representing the total survey area."""

    all_corners = []

    for rec in records:

        fp = _image_footprint(rec)

        if fp is not None:

            all_corners.append(fp)

    if not all_corners:

        raise RuntimeError("No images could be registered - cannot build grid.")

    all_pts = np.vstack(all_corners)

    x_min, y_min = all_pts.min(axis=0)

    x_max, y_max = all_pts.max(axis=0)

    span = max(x_max - x_min, y_max - y_min)

    cell_size = span / resolution

    cols = int(np.ceil((x_max - x_min) / cell_size)) + 1

    rows = int(np.ceil((y_max - y_min) / cell_size)) + 1

    grid = np.zeros((rows, cols), dtype=bool)

    return grid, cell_size, float(x_min), float(y_min)

def _rasterise_footprint(

    corners: np.ndarray,

    cell_size: float,

    x_min: float,

    y_min: float,

    grid_shape: Tuple[int, int],

) -> Set[Tuple[int, int]]:

    """Return the set of grid cells covered by the quadrilateral *corners*."""

    rows, cols = grid_shape

    gc = ((corners - [x_min, y_min]) / cell_size).astype(np.int32)

    mask = np.zeros((rows, cols), dtype=np.uint8)

    cv2.fillConvexPoly(mask, gc, 1)

    cells = set(zip(*np.where(mask > 0)))

    return cells

def select_tiling_images(

    records: List[ImageRecord],

    config: TilingConfig,

) -> List[ImageRecord]:

    """

    Greedy weighted set-cover: iteratively pick the image that covers the

    most *uncovered* grid cells until the entire survey area is covered.

    Reference

    ---------

    Chvatal, V. (1979). "A Greedy Heuristic for the Set-Covering Problem."

    Mathematics of Operations Research, 4(3), 233-235.

    """

    grid, cell_size, x_min, y_min = _build_coverage_grid(

        records, config.grid_resolution

    )

    grid_shape = grid.shape

    universe: Set[Tuple[int, int]] = set()

    image_cells: Dict[int, Set[Tuple[int, int]]] = {}

    for rec in records:

        fp = _image_footprint(rec)

        if fp is None:

            continue

        cells = _rasterise_footprint(fp, cell_size, x_min, y_min, grid_shape)

        image_cells[rec.index] = cells

        universe |= cells

    logger.info("Survey area: %d grid cells to cover.", len(universe))

    uncovered = set(universe)

    selected_indices: List[int] = []

    used: Set[int] = set()

    while uncovered:

        best_idx = -1

        best_gain = 0

        for idx, cells in image_cells.items():

            if idx in used:

                continue

            gain = len(cells & uncovered)

            if gain > best_gain:

                best_gain = gain

                best_idx = idx

        if best_idx == -1 or best_gain == 0:

            logger.warning(

                "Cannot cover %d remaining cells - possible registration gaps.",

                len(uncovered),

            )

            break

        selected_indices.append(best_idx)

        used.add(best_idx)

        uncovered -= image_cells[best_idx]

        logger.debug(

            " selected %s - covers %d new cells, %d remaining",

            records[best_idx].filename, best_gain, len(uncovered),

        )

    logger.info("Selected %d / %d images for full tiling.",

                len(selected_indices), len(records))

    selected = [records[i] for i in selected_indices]

    return selected

# ---------------------------------------------------------------------------

# 4. (Optional) Sort selected images - lower-left origin, clockwise

# ---------------------------------------------------------------------------

def sort_clockwise_from_lower_left(

    selected: List[ImageRecord],

) -> List[ImageRecord]:

    """

    Sort images starting from the lower-left corner and proceeding clockwise

    along the convex-hull perimeter.

    """

    if len(selected) <= 1:

        return selected

    centres = np.array([[r.cx, r.cy] for r in selected])

    centroid = centres.mean(axis=0)

    dx = centres[:, 0] - centroid[0]

    dy = centres[:, 1] - centroid[1]

    angles = np.arctan2(dx, dy)

    order = np.argsort(angles)

    lower_left_idx = int(
        # smallest x - y: the left-most / bottom-most centre in image
        # (y-down) coordinates
        np.argmin(centres[:, 0] - centres[:, 1])
    )

    start = int(np.where(order == lower_left_idx)[0][0])

    order = np.roll(order, -start)

    return [selected[i] for i in order]

# ---------------------------------------------------------------------------

# 5. Main pipeline

# ---------------------------------------------------------------------------

def load_image_records(input_dir: str) -> List[ImageRecord]:

    """Load image file paths from a directory, sorted by name."""

    extensions = ("*.jpg", "*.jpeg", "*.png", "*.tif", "*.tiff", "*.bmp")

    paths: List[str] = []

    for ext in extensions:

        paths.extend(glob.glob(os.path.join(input_dir, ext)))

        paths.extend(glob.glob(os.path.join(input_dir, ext.upper())))

    paths = sorted(set(paths))

    if not paths:

        raise FileNotFoundError(f"No image files found in {input_dir}")

    records = [

        ImageRecord(index=i, filepath=p, filename=os.path.basename(p))

        for i, p in enumerate(paths)

    ]

    logger.info("Loaded %d image paths from %s", len(records), input_dir)

    return records

def select_tiling_from_directory(

    input_dir: str,

    config: Optional[TilingConfig] = None,

    sort_result: bool = True,

) -> List[str]:

    """

    End-to-end pipeline.

    Parameters

    ----------

    input_dir : str

        Directory containing contiguous drone images.

    config : TilingConfig, optional

        Pipeline configuration; uses defaults when None.

    sort_result : bool

        If True, sort selected images clockwise from lower-left.

    Returns

    -------

    list[str]

        File paths of the selected tiling images.

    """

    if config is None:

        config = TilingConfig()

    records = load_image_records(input_dir)

    matcher = FeatureMatcher(config)

    matches = matcher.match_sequence(records)

    if not matches:

        logger.warning("No pairwise matches found. Returning all images.")

        return [r.filepath for r in records]

    records = register_images(records, matches)

    selected = select_tiling_images(records, config)

    if sort_result:

        selected = sort_clockwise_from_lower_left(selected)

    return [rec.filepath for rec in selected]

# ---------------------------------------------------------------------------

# CLI

# ---------------------------------------------------------------------------

def main():

    parser = argparse.ArgumentParser(

        description="Select a minimal set of drone images that tile the surveyed area."

    )

    parser.add_argument(

        "--input_dir", required=True,

        help="Directory containing contiguous aerial drone images.",

    )

    parser.add_argument(

        "--output", default=None,

        help="Optional JSON file to write the selected file list to.",

    )

    parser.add_argument(

        "--grid_resolution", type=int, default=100,

        help="Grid resolution for coverage computation (default: 100).",

    )

    parser.add_argument(

        "--work_scale", type=float, default=0.5,

        help="Down-scale factor for images during processing (default: 0.5).",

    )

    parser.add_argument(

        "--search_radius", type=int, default=5,

        help="Match each image against this many neighbours (default: 5).",

    )

    parser.add_argument(

        "--no_sort", action="store_true",

        help="Skip clockwise spatial sorting of the result.",

    )

    args = parser.parse_args()

    config = TilingConfig(

        grid_resolution=args.grid_resolution,

        work_scale=args.work_scale,

    )

    selected = select_tiling_from_directory(

        args.input_dir, config, sort_result=not args.no_sort

    )

    print(f"\n{'='*60}")

    print(f"Selected {len(selected)} images for tiling:")

    print(f"{'='*60}")

    for i, path in enumerate(selected, 1):

        print(f" {i:>3d}. {os.path.basename(path)}")

    if args.output:

        with open(args.output, "w") as f:

            json.dump({"selected_images": selected}, f, indent=2)

        print(f"\nResults written to {args.output}")

if __name__ == "__main__":

    main()
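The angular ordering behind stage 5 can be sanity-checked in isolation. A small sketch with synthetic centre points (illustrative values; whether the sweep looks clockwise on screen depends on whether the mosaic's y axis points up or down):

```python
import numpy as np

# Synthetic image centres on a unit square (illustrative only).
centres = np.array([[0.0, 0.0], [1.0, 0.0], [1.0, 1.0], [0.0, 1.0]])
centroid = centres.mean(axis=0)
dx = centres[:, 0] - centroid[0]
dy = centres[:, 1] - centroid[1]

# Same angular sweep around the centroid as sort_clockwise_from_lower_left.
order = np.argsort(np.arctan2(dx, dy))
print(order.tolist())  # -> [0, 3, 2, 1]
```

With y pointing up, the sweep (0,0) -> (0,1) -> (1,1) -> (1,0) is clockwise; the pipeline then rolls this ordering so that it starts at the lower-left element.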

Tiling of aerial drone survey area

Problem statement:

Most UAV with top-down camera provide a video of their tour regardless of their flight path. This video can be split into several contiguous aerial drone images which may often number in hundreds even for a short duration. We need a software implementation that can take these images as input in the sorted order of the timeline and select those images that can complete tiling of the area surveyed by drone tour. The output of the implementation must be a selection of the original input. Flight path could be assumed to be rectangular with the lower left as origin for simplicity but none of the images have gps information available. Additionally, the selection could be output in the sorted order to start from lower left of the area surveyed and move clockwise along the perimeter but this can be skipped from the implementation.

Solution:

The following implementation uses computer vision (feature matching + homography estimation) to determine spatial relationships between images and then solves a set-cover problem to select a minimal tiling subset. A set of five stages completes the pipeline towards the goal:

Pipeline:

Stage What happens Key algorithm

1. Feature matching ORB keypoints are detected in each image; nearby frames (within search_radius) are matched using brute-force Hamming + Lowe's ratio test, then a homography is estimated with RANSAC. ORB [Rublee 2011], RANSAC [Fischler 1981]

2. Global registration A BFS traversal from the most-connected image propagates homographies so every frame is placed in a single mosaic coordinate system. No GPS needed. Brown & Lowe (2007) panoramic stitching

3. Coverage gridding The mosaic bounding box is discretised into a grid. Each image's warped footprint is rasterised to determine which cells it covers. OpenCV fillConvexPoly

4. Greedy set cover Iteratively selects the image covering the most uncovered cells until 100% coverage is reached. This is the classic greedy approximation (ln n + 1 factor). Chvatal (1979)

5. Spatial sort Selected images are sorted clockwise starting from the lower-left corner using angular ordering around the centroid. Convex-hull sweep

Usage:

# Install dependencies

pip install opencv-python-headless numpy

# Run (CLI)

python drone_tiling.py --input_dir ./my_drone_frames --output tiles.json

# Run (programmatic)

from drone_tiling import select_tiling_from_directory

selected = select_tiling_from_directory("./my_drone_frames")

print(selected)

Implementation:

"""

Drone Image Tiling Selector

============================

Selects a minimal subset of contiguous aerial drone images that completely

tiles (covers) the area surveyed during a drone flight.

Algorithm & Citations

---------------------

1. **Feature Detection & Matching**: ORB (Oriented FAST and Rotated BRIEF)

   detector with brute-force Hamming-distance matching.

   - Rublee, E., Rabaud, V., Konolige, K., & Bradski, G. (2011).

     "ORB: An efficient alternative to SIFT or SURF."

     IEEE International Conference on Computer Vision (ICCV), pp. 2564-2571.

     DOI: 10.1109/ICCV.2011.6126544

   - OpenCV documentation: https://docs.opencv.org/4.x/d1/d89/tutorial_py_orb.html

2. **Homography Estimation (RANSAC)**: Used to compute the projective

   transformation between overlapping image pairs, which gives us the

   relative spatial position of each image.

   - Fischler, M. A., & Bolles, R. C. (1981).

     "Random Sample Consensus: A Paradigm for Model Fitting with

     Applications to Image Analysis and Automated Cartography."

     Communications of the ACM, 24(6), 381-395.

   - OpenCV documentation: https://docs.opencv.org/4.x/d9/dab/tutorial_homography.html

3. **Image Stitching / Registration Pipeline**: The pairwise registration

   approach follows the methodology in:

   - Brown, M., & Lowe, D. G. (2007).

     "Automatic Panoramic Image Stitching using Invariant Features."

     International Journal of Computer Vision, 74(1), 59-73.

     DOI: 10.1007/s11263-006-0002-3

4. **Greedy Weighted Set Cover** for selecting the minimal tiling subset:

   - Chvatal, V. (1979).

     "A Greedy Heuristic for the Set-Covering Problem."

     Mathematics of Operations Research, 4(3), 233-235.

   - Vazirani, V. V. (2001). "Approximation Algorithms", Chapter 2.

     Springer-Verlag. ISBN: 3-540-65367-8.

Dependencies

------------

    pip install opencv-python-headless numpy

Usage

-----

    python drone_tiling.py --input_dir ./drone_images --output tiles.json

"""

import os

import glob

import json

import argparse

import logging

from dataclasses import dataclass, field

from typing import List, Tuple, Dict, Optional, Set

import cv2

import numpy as np

logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------

# Configuration

# ---------------------------------------------------------------------------

@dataclass

class TilingConfig:

    """Tuneable parameters for the tiling pipeline."""

    # ORB feature detector

    orb_n_features: int = 3000

    orb_scale_factor: float = 1.2

    orb_n_levels: int = 8

    # Feature matching

    match_ratio_thresh: float = 0.75 # Lowe's ratio test threshold

    min_good_matches: int = 30 # minimum inlier matches to

                                              # consider a pair overlapping

    # RANSAC homography

    ransac_reproj_thresh: float = 5.0 # reprojection error in pixels

    # Coverage grid resolution (number of cells along the longer axis)

    grid_resolution: int = 100

    # Overlap: minimum fraction of an image that must overlap with the

    # already-covered area for the image to be considered redundant.

    redundancy_overlap: float = 0.95

    # Image scaling for speed (process at this fraction of original size)

    work_scale: float = 0.5

# ---------------------------------------------------------------------------

# Data structures

# ---------------------------------------------------------------------------

@dataclass

class ImageRecord:

    """Metadata for a single drone image."""

    index: int

    filepath: str

    filename: str

    width: int = 0

    height: int = 0

    # Position of the image centre in the *mosaic* coordinate frame.

    cx: float = 0.0

    cy: float = 0.0

    # 3x3 homography that maps this image into mosaic coordinates.

    H: Optional[np.ndarray] = None

@dataclass

class PairwiseMatch:

    """Result of matching two images."""

    idx_a: int

    idx_b: int

    n_inliers: int

    H_ab: np.ndarray # homography from image A to image B

    confidence: float = 0.0

# ---------------------------------------------------------------------------

# 1. Feature detection & pair-wise matching

# ---------------------------------------------------------------------------

class FeatureMatcher:

    """

    Detects ORB features in every image and matches consecutive /

    nearby frames to estimate pairwise homographies.

    Reference

    ---------

    Rublee et al., "ORB: An efficient alternative to SIFT or SURF", ICCV 2011.

    OpenCV ORB docs: https://docs.opencv.org/4.x/d1/d89/tutorial_py_orb.html

    """

    def __init__(self, config: TilingConfig):

        self.cfg = config

        self.orb = cv2.ORB_create(

            nfeatures=config.orb_n_features,

            scaleFactor=config.orb_scale_factor,

            nlevels=config.orb_n_levels,

        )

        self.bf = cv2.BFMatcher(cv2.NORM_HAMMING)

    @staticmethod

    def _load_grey(path: str, scale: float) -> np.ndarray:

        img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)

        if img is None:

            raise FileNotFoundError(f"Cannot read image: {path}")

        if scale != 1.0:

            img = cv2.resize(img, None, fx=scale, fy=scale,

                             interpolation=cv2.INTER_AREA)

        return img

    def _detect(self, grey: np.ndarray):

        kp, des = self.orb.detectAndCompute(grey, None)

        return kp, des

    def _match_pair(self, des_a, des_b, kp_a, kp_b) -> Optional[PairwiseMatch]:

        """

        Match descriptors with Lowe's ratio test, then estimate a homography

        with RANSAC.

        Reference

        ---------

        Lowe, D. G. (2004). "Distinctive Image Features from Scale-Invariant

        Keypoints." IJCV, 60(2), 91-110.

        Fischler & Bolles (1981). "Random Sample Consensus." CACM.

        """

        if des_a is None or des_b is None:

            return None

        if len(des_a) < 2 or len(des_b) < 2:

            return None

        raw_matches = self.bf.knnMatch(des_a, des_b, k=2)

        good = []

        for m_pair in raw_matches:

            if len(m_pair) < 2:

                continue

            m, n = m_pair

            if m.distance < self.cfg.match_ratio_thresh * n.distance:

                good.append(m)

        if len(good) < self.cfg.min_good_matches:

            return None

        pts_a = np.float32([kp_a[m.queryIdx].pt for m in good]).reshape(-1, 1, 2)

        pts_b = np.float32([kp_b[m.trainIdx].pt for m in good]).reshape(-1, 1, 2)

        H, mask = cv2.findHomography(

            pts_a, pts_b, cv2.RANSAC, self.cfg.ransac_reproj_thresh

        )

        if H is None:

            return None

        n_inliers = int(mask.sum())

        if n_inliers < self.cfg.min_good_matches:

            return None

        confidence = n_inliers / (8.0 + 0.3 * len(good)) # Brown & Lowe 2007

        return PairwiseMatch(

            idx_a=-1, idx_b=-1,

            n_inliers=n_inliers,

            H_ab=H,

            confidence=confidence,

        )

    def match_sequence(

        self,

        records: List[ImageRecord],

        search_radius: int = 5,

    ) -> List[PairwiseMatch]:

        """

        For every image, match against up to *search_radius* neighbours in

        both directions (handles forward overlap AND lateral overlap from

        adjacent flight strips).

        """

        n = len(records)

        scale = self.cfg.work_scale

        features = []

        for rec in records:

            grey = self._load_grey(rec.filepath, scale)

            rec.height, rec.width = grey.shape[:2]

            kp, des = self._detect(grey)

            features.append((kp, des))

            logger.debug(" %s - %d keypoints", rec.filename, len(kp))

        matches: List[PairwiseMatch] = []

        for i in range(n):

            for j in range(i + 1, min(i + search_radius + 1, n)):

                kp_a, des_a = features[i]

                kp_b, des_b = features[j]

                pm = self._match_pair(des_a, des_b, kp_a, kp_b)

                if pm is not None:

                    pm.idx_a = i

                    pm.idx_b = j

                    matches.append(pm)

                    logger.debug(

                        " match %s <-> %s inliers=%d conf=%.3f",

                        records[i].filename, records[j].filename,

                        pm.n_inliers, pm.confidence,

                    )

        logger.info("Pairwise matching complete: %d valid pairs from %d images.",

                     len(matches), n)

        return matches
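Stripped of OpenCV types, the ratio test inside `_match_pair` reduces to a one-line filter. A minimal stdlib-only sketch with fabricated (best, second-best) Hamming distances — the numbers are illustrative, not from real descriptors:

```python
def ratio_test(pairs, ratio=0.75):
    """Keep a candidate match only when its best distance clearly beats the runner-up."""
    return [i for i, (d1, d2) in enumerate(pairs) if d1 < ratio * d2]

# Fabricated (best, second-best) distances for four candidate matches.
pairs = [(20, 60), (45, 50), (10, 80), (30, 35)]
print(ratio_test(pairs))  # ambiguous matches 1 and 3 are rejected -> [0, 2]
```

Matches whose two nearest neighbours are nearly equidistant are ambiguous and dropped, which is what keeps the inlier ratio high for the subsequent RANSAC step.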

# ---------------------------------------------------------------------------

# 2. Global registration - place every image in a common mosaic frame

# ---------------------------------------------------------------------------

def register_images(

    records: List[ImageRecord],

    matches: List[PairwiseMatch],

) -> List[ImageRecord]:

    """

    Build a connected graph of images and propagate homographies via BFS

    so that every image is mapped into the coordinate frame of a single

    reference image (the one with the most connections).

    Reference

    ---------

    Brown, M. & Lowe, D. G. (2007). "Automatic Panoramic Image Stitching

    using Invariant Features." IJCV 74(1), 59-73.

    OpenCV Stitching: https://docs.opencv.org/4.x/d9/dab/tutorial_homography.html

    """

    n = len(records)

    adj: Dict[int, List[Tuple[int, np.ndarray, float]]] = {i: [] for i in range(n)}

    for pm in matches:

        a, b = pm.idx_a, pm.idx_b

        adj[a].append((b, pm.H_ab, pm.confidence))

        H_inv = np.linalg.inv(pm.H_ab)

        adj[b].append((a, H_inv, pm.confidence))

    ref = max(range(n), key=lambda i: len(adj[i]))

    logger.info("Reference image: %s (index %d, %d neighbours)",

                records[ref].filename, ref, len(adj[ref]))

    records[ref].H = np.eye(3, dtype=np.float64)

    visited = {ref}

    queue = [ref]

    while queue:

        cur = queue.pop(0)  # FIFO pop keeps this a breadth-first traversal

        for nb, H_cur_to_nb, _conf in adj[cur]:

            if nb in visited:

                continue

            H_nb_to_mosaic = records[cur].H @ np.linalg.inv(H_cur_to_nb)

            records[nb].H = H_nb_to_mosaic

            visited.add(nb)

            queue.append(nb)

    disconnected = [i for i in range(n) if records[i].H is None]

    if disconnected:

        logger.warning(

            "%d images could not be registered (disconnected): %s",

            len(disconnected),

            [records[i].filename for i in disconnected],

        )

    for rec in records:

        if rec.H is None:

            continue

        centre_local = np.array([[rec.width / 2, rec.height / 2]], dtype=np.float64)

        centre_mosaic = cv2.perspectiveTransform(

            centre_local.reshape(-1, 1, 2), rec.H

        )

        rec.cx, rec.cy = centre_mosaic[0, 0]

    return records
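The propagation rule `H_nb = H_cur @ inv(H_cur_to_nb)` used in the BFS above can be verified on pure translations; a small numpy sketch (the frames and the 100-px offset are made up for illustration):

```python
import numpy as np

def translation(tx, ty):
    """3x3 homography translating points by (tx, ty)."""
    H = np.eye(3)
    H[0, 2], H[1, 2] = tx, ty
    return H

H_a = np.eye(3)                  # frame A doubles as the mosaic reference
H_ab = translation(-100.0, 0.0)  # A -> B: B's origin lies 100 px right of A's
# Propagation rule from register_images:
H_b = H_a @ np.linalg.inv(H_ab)
# B's origin (0, 0) should land at x = 100 in the mosaic frame.
p = H_b @ np.array([0.0, 0.0, 1.0])
print(p[:2] / p[2])
```

The same chaining works for arbitrary projective transforms because homographies compose by matrix multiplication; the homogeneous divide only matters when reading coordinates back out.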

# ---------------------------------------------------------------------------

# 3. Coverage grid & greedy set-cover selection

# ---------------------------------------------------------------------------

def _image_footprint(rec: ImageRecord) -> Optional[np.ndarray]:

    """Return the four mosaic-frame corners of *rec* as an (4,2) array."""

    if rec.H is None:

        return None

    corners = np.float64([

        [0, 0],

        [rec.width, 0],

        [rec.width, rec.height],

        [0, rec.height],

    ]).reshape(-1, 1, 2)

    warped = cv2.perspectiveTransform(corners, rec.H)

    return warped.reshape(-1, 2)

def _build_coverage_grid(

    records: List[ImageRecord],

    resolution: int,

) -> Tuple[np.ndarray, float, float, float]:

    """Create a boolean grid representing the total survey area."""

    all_corners = []

    for rec in records:

        fp = _image_footprint(rec)

        if fp is not None:

            all_corners.append(fp)

    if not all_corners:

        raise RuntimeError("No images could be registered - cannot build grid.")

    all_pts = np.vstack(all_corners)

    x_min, y_min = all_pts.min(axis=0)

    x_max, y_max = all_pts.max(axis=0)

    span = max(x_max - x_min, y_max - y_min)

    cell_size = span / resolution

    cols = int(np.ceil((x_max - x_min) / cell_size)) + 1

    rows = int(np.ceil((y_max - y_min) / cell_size)) + 1

    grid = np.zeros((rows, cols), dtype=bool)

    return grid, cell_size, float(x_min), float(y_min)

def _rasterise_footprint(

    corners: np.ndarray,

    cell_size: float,

    x_min: float,

    y_min: float,

    grid_shape: Tuple[int, int],

) -> Set[Tuple[int, int]]:

    """Return the set of grid cells covered by the quadrilateral *corners*."""

    rows, cols = grid_shape

    gc = ((corners - [x_min, y_min]) / cell_size).astype(np.int32)

    mask = np.zeros((rows, cols), dtype=np.uint8)

    cv2.fillConvexPoly(mask, gc, 1)

    cells = set(zip(*np.where(mask > 0)))

    return cells
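For an axis-aligned footprint, the `(corners - origin) / cell_size` bookkeeping can be checked without OpenCV. A simplified stdlib sketch of the world-to-cell mapping (`fillConvexPoly` handles the general, rotated quadrilateral; the rectangle and grid here are invented for illustration):

```python
import math

def covered_cells(x0, y0, x1, y1, cell_size, x_min, y_min):
    """(row, col) grid cells overlapped by an axis-aligned rectangle."""
    c0 = math.floor((x0 - x_min) / cell_size)
    c1 = math.floor((x1 - x_min) / cell_size)
    r0 = math.floor((y0 - y_min) / cell_size)
    r1 = math.floor((y1 - y_min) / cell_size)
    return {(r, c) for r in range(r0, r1 + 1) for c in range(c0, c1 + 1)}

# A 25x25 rectangle on a 10-unit grid touches a 3x3 block of cells.
print(len(covered_cells(0, 0, 25, 25, 10, 0, 0)))  # -> 9
```

Cells are identified by (row, col) tuples, matching the `np.where`-derived sets used by the selection step.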

def select_tiling_images(

    records: List[ImageRecord],

    config: TilingConfig,

) -> List[ImageRecord]:

    """

    Greedy weighted set-cover: iteratively pick the image that covers the

    most *uncovered* grid cells until the entire survey area is covered.

    Reference

    ---------

    Chvatal, V. (1979). "A Greedy Heuristic for the Set-Covering Problem."

    Mathematics of Operations Research, 4(3), 233-235.

    """

    grid, cell_size, x_min, y_min = _build_coverage_grid(

        records, config.grid_resolution

    )

    grid_shape = grid.shape

    universe: Set[Tuple[int, int]] = set()

    image_cells: Dict[int, Set[Tuple[int, int]]] = {}

    for rec in records:

        fp = _image_footprint(rec)

        if fp is None:

            continue

        cells = _rasterise_footprint(fp, cell_size, x_min, y_min, grid_shape)

        image_cells[rec.index] = cells

        universe |= cells

    logger.info("Survey area: %d grid cells to cover.", len(universe))

    uncovered = set(universe)

    selected_indices: List[int] = []

    used: Set[int] = set()

    while uncovered:

        best_idx = -1

        best_gain = 0

        for idx, cells in image_cells.items():

            if idx in used:

                continue

            gain = len(cells & uncovered)

            if gain > best_gain:

                best_gain = gain

                best_idx = idx

        if best_idx == -1 or best_gain == 0:

            logger.warning(

                "Cannot cover %d remaining cells - possible registration gaps.",

                len(uncovered),

            )

            break

        selected_indices.append(best_idx)

        used.add(best_idx)

        uncovered -= image_cells[best_idx]

        logger.debug(

            " selected %s - covers %d new cells, %d remaining",

            records[best_idx].filename, best_gain, len(uncovered),

        )

    logger.info("Selected %d / %d images for full tiling.",

                len(selected_indices), len(records))

    selected = [records[i] for i in selected_indices]  # rec.index equals list position

    return selected
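Detached from images and grids, the selection loop above is exactly Chvatal's greedy heuristic. A toy, stdlib-only version (the set names and contents are invented for illustration):

```python
def greedy_set_cover(universe, subsets):
    """Repeatedly pick the subset covering the most still-uncovered elements."""
    uncovered, chosen = set(universe), []
    while uncovered:
        best = max(subsets, key=lambda name: len(subsets[name] & uncovered))
        if not subsets[best] & uncovered:
            break  # remaining elements cannot be covered
        chosen.append(best)
        uncovered -= subsets[best]
    return chosen

subsets = {"A": {1, 2, 3, 4}, "B": {3, 4, 5}, "C": {5, 6}, "D": {1, 6}}
print(greedy_set_cover({1, 2, 3, 4, 5, 6}, subsets))  # -> ['A', 'C']
```

The greedy choice is within a ln n + 1 factor of the optimum (Chvatal 1979), which is why the pipeline's selection is small but not guaranteed minimal.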

# ---------------------------------------------------------------------------

# 4. (Optional) Sort selected images - lower-left origin, clockwise

# ---------------------------------------------------------------------------

def sort_clockwise_from_lower_left(

    selected: List[ImageRecord],

) -> List[ImageRecord]:

    """

    Sort images starting from the lower-left corner and proceeding clockwise

    along the convex-hull perimeter.

    """

    if len(selected) <= 1:

        return selected

    centres = np.array([[r.cx, r.cy] for r in selected])

    centroid = centres.mean(axis=0)

    dx = centres[:, 0] - centroid[0]

    dy = centres[:, 1] - centroid[1]

    angles = np.arctan2(dx, dy)

    order = np.argsort(angles)

    lower_left_idx = int(

        np.argmin(centres[:, 0] - centres[:, 1])  # min x, max y: lower-left in image coords

    )

    start = int(np.where(order == lower_left_idx)[0][0])

    order = np.roll(order, -start)

    return [selected[i] for i in order]
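The angular ordering can be sanity-checked on four synthetic centres; a stdlib-only mirror of the same arithmetic (remember the mosaic frame uses image coordinates, with y growing downward):

```python
import math

def order_from_lower_left(points):
    """Index order from the centroid angular sort, rolled to the lower-left point."""
    n = len(points)
    cx = sum(p[0] for p in points) / n
    cy = sum(p[1] for p in points) / n
    order = sorted(range(n), key=lambda i: math.atan2(points[i][0] - cx,
                                                      points[i][1] - cy))
    start = min(range(n), key=lambda i: points[i][0] - points[i][1])  # lower-left, y-down
    s = order.index(start)
    return order[s:] + order[:s]

# Unit-square centres; index 3 = (0, 1) is lower-left in image coordinates.
print(order_from_lower_left([(0, 0), (1, 0), (1, 1), (0, 1)]))  # -> [3, 2, 1, 0]
```

Ties and near-collinear centres can perturb this ordering; a convex-hull walk would be more robust if the perimeter sequence matters downstream.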

# ---------------------------------------------------------------------------

# 5. Main pipeline

# ---------------------------------------------------------------------------

def load_image_records(input_dir: str) -> List[ImageRecord]:

    """Load image file paths from a directory, sorted by name."""

    extensions = ("*.jpg", "*.jpeg", "*.png", "*.tif", "*.tiff", "*.bmp")

    paths: List[str] = []

    for ext in extensions:

        paths.extend(glob.glob(os.path.join(input_dir, ext)))

        paths.extend(glob.glob(os.path.join(input_dir, ext.upper())))

    paths = sorted(set(paths))

    if not paths:

        raise FileNotFoundError(f"No image files found in {input_dir}")

    records = [

        ImageRecord(index=i, filepath=p, filename=os.path.basename(p))

        for i, p in enumerate(paths)

    ]

    logger.info("Loaded %d image paths from %s", len(records), input_dir)

    return records

def select_tiling_from_directory(

    input_dir: str,

    config: Optional[TilingConfig] = None,

    sort_result: bool = True,

    search_radius: int = 5,

) -> List[str]:

    """

    End-to-end pipeline.

    Parameters

    ----------

    input_dir : str

        Directory containing contiguous drone images.

    config : TilingConfig, optional

        Pipeline configuration; uses defaults when None.

    sort_result : bool

        If True, sort selected images clockwise from lower-left.

    search_radius : int

        Number of subsequent frames each image is matched against.

    Returns

    -------

    list[str]

        File paths of the selected tiling images.

    """

    if config is None:

        config = TilingConfig()

    records = load_image_records(input_dir)

    matcher = FeatureMatcher(config)

    matches = matcher.match_sequence(records, search_radius=search_radius)

    if not matches:

        logger.warning("No pairwise matches found. Returning all images.")

        return [r.filepath for r in records]

    records = register_images(records, matches)

    selected = select_tiling_images(records, config)

    if sort_result:

        selected = sort_clockwise_from_lower_left(selected)

    return [rec.filepath for rec in selected]

# ---------------------------------------------------------------------------

# CLI

# ---------------------------------------------------------------------------

def main():

    parser = argparse.ArgumentParser(

        description="Select a minimal set of drone images that tile the surveyed area."

    )

    parser.add_argument(

        "--input_dir", required=True,

        help="Directory containing contiguous aerial drone images.",

    )

    parser.add_argument(

        "--output", default=None,

        help="Optional JSON file to write the selected file list to.",

    )

    parser.add_argument(

        "--grid_resolution", type=int, default=100,

        help="Grid resolution for coverage computation (default: 100).",

    )

    parser.add_argument(

        "--work_scale", type=float, default=0.5,

        help="Down-scale factor for images during processing (default: 0.5).",

    )

    parser.add_argument(

        "--search_radius", type=int, default=5,

        help="Match each image against this many neighbours (default: 5).",

    )

    parser.add_argument(

        "--no_sort", action="store_true",

        help="Skip clockwise spatial sorting of the result.",

    )

    args = parser.parse_args()

    config = TilingConfig(

        grid_resolution=args.grid_resolution,

        work_scale=args.work_scale,

    )

    selected = select_tiling_from_directory(

        args.input_dir, config, sort_result=not args.no_sort,

        search_radius=args.search_radius,

    )

    print(f"\n{'='*60}")

    print(f"Selected {len(selected)} images for tiling:")

    print(f"{'='*60}")

    for i, path in enumerate(selected, 1):

        print(f" {i:>3d}. {os.path.basename(path)}")

    if args.output:

        with open(args.output, "w") as f:

            json.dump({"selected_images": selected}, f, indent=2)

        print(f"\nResults written to {args.output}")

if __name__ == "__main__":

    main()

Tiling of aerial drone survey area

Problem statement:

Most UAV with top-down camera provide a video of their tour regardless of their flight path. This video can be split into several contiguous aerial drone images which may often number in hundreds even for a short duration. We need a software implementation that can take these images as input in the sorted order of the timeline and select those images that can complete tiling of the area surveyed by drone tour. The output of the implementation must be a selection of the original input. Flight path could be assumed to be rectangular with the lower left as origin for simplicity but none of the images have gps information available. Additionally, the selection could be output in the sorted order to start from lower left of the area surveyed and move clockwise along the perimeter but this can be skipped from the implementation.

Solution:

The following implementation uses computer vision (feature matching + homography estimation) to determine spatial relationships between images and then solves a set-cover problem to select a minimal tiling subset. A set of five stages completes the pipeline towards the goal:

Pipeline:

Stage What happens Key algorithm

1. Feature matching ORB keypoints are detected in each image; nearby frames (within search_radius) are matched using brute-force Hamming + Lowe's ratio test, then a homography is estimated with RANSAC. ORB [Rublee 2011], RANSAC [Fischler 1981]

2. Global registration A BFS traversal from the most-connected image propagates homographies so every frame is placed in a single mosaic coordinate system. No GPS needed. Brown & Lowe (2007) panoramic stitching

3. Coverage gridding The mosaic bounding box is discretised into a grid. Each image's warped footprint is rasterised to determine which cells it covers. OpenCV fillConvexPoly

4. Greedy set cover Iteratively selects the image covering the most uncovered cells until 100% coverage is reached. This is the classic greedy approximation (ln n + 1 factor). Chvatal (1979)

5. Spatial sort Selected images are sorted clockwise starting from the lower-left corner using angular ordering around the centroid. Convex-hull sweep

Usage:

# Install dependencies

pip install opencv-python-headless numpy

# Run (CLI)

python drone_tiling.py --input_dir ./my_drone_frames --output tiles.json

# Run (programmatic)

from drone_tiling import select_tiling_from_directory

selected = select_tiling_from_directory("./my_drone_frames")

print(selected)

Implementation:

"""

Drone Image Tiling Selector

============================

Selects a minimal subset of contiguous aerial drone images that completely

tiles (covers) the area surveyed during a drone flight.

Algorithm & Citations

---------------------

1. **Feature Detection & Matching**: ORB (Oriented FAST and Rotated BRIEF)

   detector with brute-force Hamming-distance matching.

   - Rublee, E., Rabaud, V., Konolige, K., & Bradski, G. (2011).

     "ORB: An efficient alternative to SIFT or SURF."

     IEEE International Conference on Computer Vision (ICCV), pp. 2564-2571.

     DOI: 10.1109/ICCV.2011.6126544

   - OpenCV documentation: https://docs.opencv.org/4.x/d1/d89/tutorial_py_orb.html

2. **Homography Estimation (RANSAC)**: Used to compute the projective

   transformation between overlapping image pairs, which gives us the

   relative spatial position of each image.

   - Fischler, M. A., & Bolles, R. C. (1981).

     "Random Sample Consensus: A Paradigm for Model Fitting with

     Applications to Image Analysis and Automated Cartography."

     Communications of the ACM, 24(6), 381-395.

   - OpenCV documentation: https://docs.opencv.org/4.x/d9/dab/tutorial_homography.html

3. **Image Stitching / Registration Pipeline**: The pairwise registration

   approach follows the methodology in:

   - Brown, M., & Lowe, D. G. (2007).

     "Automatic Panoramic Image Stitching using Invariant Features."

     International Journal of Computer Vision, 74(1), 59-73.

     DOI: 10.1007/s11263-006-0002-3

4. **Greedy Weighted Set Cover** for selecting the minimal tiling subset:

   - Chvatal, V. (1979).

     "A Greedy Heuristic for the Set-Covering Problem."

     Mathematics of Operations Research, 4(3), 233-235.

   - Vazirani, V. V. (2001). "Approximation Algorithms", Chapter 2.

     Springer-Verlag. ISBN: 3-540-65367-8.

Dependencies

------------

    pip install opencv-python-headless numpy

Usage

-----

    python drone_tiling.py --input_dir ./drone_images --output tiles.json

"""

import os

import glob

import json

import argparse

import logging

from dataclasses import dataclass, field

from typing import List, Tuple, Dict, Optional, Set

import cv2

import numpy as np

logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------

# Configuration

# ---------------------------------------------------------------------------

@dataclass

class TilingConfig:

    """Tuneable parameters for the tiling pipeline."""

    # ORB feature detector

    orb_n_features: int = 3000

    orb_scale_factor: float = 1.2

    orb_n_levels: int = 8

    # Feature matching

    match_ratio_thresh: float = 0.75 # Lowe's ratio test threshold

    min_good_matches: int = 30 # minimum inlier matches to

                                              # consider a pair overlapping

    # RANSAC homography

    ransac_reproj_thresh: float = 5.0 # reprojection error in pixels

    # Coverage grid resolution (number of cells along the longer axis)

    grid_resolution: int = 100

    # Overlap: minimum fraction of an image that must overlap with the

    # already-covered area for the image to be considered redundant.

    redundancy_overlap: float = 0.95

    # Image scaling for speed (process at this fraction of original size)

    work_scale: float = 0.5

# ---------------------------------------------------------------------------

# Data structures

# ---------------------------------------------------------------------------

@dataclass

class ImageRecord:

    """Metadata for a single drone image."""

    index: int

    filepath: str

    filename: str

    width: int = 0

    height: int = 0

    # Position of the image centre in the *mosaic* coordinate frame.

    cx: float = 0.0

    cy: float = 0.0

    # 3x3 homography that maps this image into mosaic coordinates.

    H: Optional[np.ndarray] = None

@dataclass

class PairwiseMatch:

    """Result of matching two images."""

    idx_a: int

    idx_b: int

    n_inliers: int

    H_ab: np.ndarray # homography from image A to image B

    confidence: float = 0.0

# ---------------------------------------------------------------------------

# 1. Feature detection & pair-wise matching

# ---------------------------------------------------------------------------

class FeatureMatcher:

    """

    Detects ORB features in every image and matches consecutive /

    nearby frames to estimate pairwise homographies.

    Reference

    ---------

    Rublee et al., "ORB: An efficient alternative to SIFT or SURF", ICCV 2011.

    OpenCV ORB docs: https://docs.opencv.org/4.x/d1/d89/tutorial_py_orb.html

    """

    def __init__(self, config: TilingConfig):

        self.cfg = config

        self.orb = cv2.ORB_create(

            nfeatures=config.orb_n_features,

            scaleFactor=config.orb_scale_factor,

            nlevels=config.orb_n_levels,

        )

        self.bf = cv2.BFMatcher(cv2.NORM_HAMMING)

    @staticmethod

    def _load_grey(path: str, scale: float) -> np.ndarray:

        img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)

        if img is None:

            raise FileNotFoundError(f"Cannot read image: {path}")

        if scale != 1.0:

            img = cv2.resize(img, None, fx=scale, fy=scale,

                             interpolation=cv2.INTER_AREA)

        return img

    def _detect(self, grey: np.ndarray):

        kp, des = self.orb.detectAndCompute(grey, None)

        return kp, des

    def _match_pair(self, des_a, des_b, kp_a, kp_b) -> Optional[PairwiseMatch]:

        """

        Match descriptors with Lowe's ratio test, then estimate a homography

        with RANSAC.

        Reference

        ---------

        Lowe, D. G. (2004). "Distinctive Image Features from Scale-Invariant

        Keypoints." IJCV, 60(2), 91-110.

        Fischler & Bolles (1981). "Random Sample Consensus." CACM.

        """

        if des_a is None or des_b is None:

            return None

        if len(des_a) < 2 or len(des_b) < 2:

            return None

        raw_matches = self.bf.knnMatch(des_a, des_b, k=2)

        good = []

        for m_pair in raw_matches:

            if len(m_pair) < 2:

                continue

            m, n = m_pair

            if m.distance < self.cfg.match_ratio_thresh * n.distance:

                good.append(m)

        if len(good) < self.cfg.min_good_matches:

            return None

        pts_a = np.float32([kp_a[m.queryIdx].pt for m in good]).reshape(-1, 1, 2)

        pts_b = np.float32([kp_b[m.trainIdx].pt for m in good]).reshape(-1, 1, 2)

        H, mask = cv2.findHomography(

            pts_a, pts_b, cv2.RANSAC, self.cfg.ransac_reproj_thresh

        )

        if H is None:

            return None

        n_inliers = int(mask.sum())

        if n_inliers < self.cfg.min_good_matches:

            return None

        confidence = n_inliers / (8.0 + 0.3 * len(good)) # Brown & Lowe 2007

        return PairwiseMatch(

            idx_a=-1, idx_b=-1,

            n_inliers=n_inliers,

            H_ab=H,

            confidence=confidence,

        )

    def match_sequence(

        self,

        records: List[ImageRecord],

        search_radius: int = 5,

    ) -> List[PairwiseMatch]:

        """

        For every image, match against up to *search_radius* neighbours in

        both directions (handles forward overlap AND lateral overlap from

        adjacent flight strips).

        """

        n = len(records)

        scale = self.cfg.work_scale

        features = []

        for rec in records:

            grey = self._load_grey(rec.filepath, scale)

            rec.height, rec.width = grey.shape[:2]

            kp, des = self._detect(grey)

            features.append((kp, des))

            logger.debug(" %s - %d keypoints", rec.filename, len(kp))

        matches: List[PairwiseMatch] = []

        tested: Set[Tuple[int, int]] = set()

        for i in range(n):

            for j in range(i + 1, min(i + search_radius + 1, n)):

                if (i, j) in tested:

                    continue

                tested.add((i, j))

                kp_a, des_a = features[i]

                kp_b, des_b = features[j]

                pm = self._match_pair(des_a, des_b, kp_a, kp_b)

                if pm is not None:

                    pm.idx_a = i

                    pm.idx_b = j

                    matches.append(pm)

                    logger.debug(

                        " match %s <-> %s inliers=%d conf=%.3f",

                        records[i].filename, records[j].filename,

                        pm.n_inliers, pm.confidence,

                    )

        logger.info("Pairwise matching complete: %d valid pairs from %d images.",

                     len(matches), n)

        return matches

# ---------------------------------------------------------------------------

# 2. Global registration - place every image in a common mosaic frame

# ---------------------------------------------------------------------------

def register_images(

    records: List[ImageRecord],

    matches: List[PairwiseMatch],

) -> List[ImageRecord]:

    """

    Build a connected graph of images and propagate homographies via BFS

    so that every image is mapped into the coordinate frame of a single

    reference image (the one with the most connections).

    Reference

    ---------

    Brown, M. & Lowe, D. G. (2007). "Automatic Panoramic Image Stitching

    using Invariant Features." IJCV 74(1), 59-73.

    OpenCV Stitching: https://docs.opencv.org/4.x/d9/dab/tutorial_homography.html

    """

    n = len(records)

    adj: Dict[int, List[Tuple[int, np.ndarray, float]]] = {i: [] for i in range(n)}

    for pm in matches:

        a, b = pm.idx_a, pm.idx_b

        adj[a].append((b, pm.H_ab, pm.confidence))

        H_inv = np.linalg.inv(pm.H_ab)

        adj[b].append((a, H_inv, pm.confidence))

    ref = max(range(n), key=lambda i: len(adj[i]))

    logger.info("Reference image: %s (index %d, %d neighbours)",

                records[ref].filename, ref, len(adj[ref]))

    records[ref].H = np.eye(3, dtype=np.float64)

    visited = {ref}

    queue = [ref]

    while queue:

        cur = queue.pop(0)

        for nb, H_cur_to_nb, _conf in adj[cur]:

            if nb in visited:

                continue

            H_nb_to_mosaic = records[cur].H @ np.linalg.inv(H_cur_to_nb)

            records[nb].H = H_nb_to_mosaic

            visited.add(nb)

            queue.append(nb)

    disconnected = [i for i in range(n) if records[i].H is None]

    if disconnected:

        logger.warning(

            "%d images could not be registered (disconnected): %s",

            len(disconnected),

            [records[i].filename for i in disconnected],

        )

    for rec in records:

        if rec.H is None:

            continue

        centre_local = np.array([[rec.width / 2, rec.height / 2]], dtype=np.float64)

        centre_mosaic = cv2.perspectiveTransform(

            centre_local.reshape(-1, 1, 2), rec.H

        )

        rec.cx, rec.cy = centre_mosaic[0, 0]

    return records

# ---------------------------------------------------------------------------

# 3. Coverage grid & greedy set-cover selection

# ---------------------------------------------------------------------------

def _image_footprint(rec: ImageRecord) -> Optional[np.ndarray]:

    """Return the four mosaic-frame corners of *rec* as an (4,2) array."""

    if rec.H is None:

        return None

    corners = np.float64([

        [0, 0],

        [rec.width, 0],

        [rec.width, rec.height],

        [0, rec.height],

    ]).reshape(-1, 1, 2)

    warped = cv2.perspectiveTransform(corners, rec.H)

    return warped.reshape(-1, 2)

def _build_coverage_grid(

    records: List[ImageRecord],

    resolution: int,

) -> Tuple[np.ndarray, float, float, float]:

    """Create a boolean grid representing the total survey area."""

    all_corners = []

    for rec in records:

        fp = _image_footprint(rec)

        if fp is not None:

            all_corners.append(fp)

    if not all_corners:

        raise RuntimeError("No images could be registered - cannot build grid.")

    all_pts = np.vstack(all_corners)

    x_min, y_min = all_pts.min(axis=0)

    x_max, y_max = all_pts.max(axis=0)

    span = max(x_max - x_min, y_max - y_min)

    cell_size = span / resolution

    cols = int(np.ceil((x_max - x_min) / cell_size)) + 1

    rows = int(np.ceil((y_max - y_min) / cell_size)) + 1

    grid = np.zeros((rows, cols), dtype=bool)

    return grid, cell_size, float(x_min), float(y_min)

def _rasterise_footprint(

    corners: np.ndarray,

    cell_size: float,

    x_min: float,

    y_min: float,

    grid_shape: Tuple[int, int],

) -> Set[Tuple[int, int]]:

    """Return the set of grid cells covered by the quadrilateral *corners*."""

    rows, cols = grid_shape

    gc = ((corners - [x_min, y_min]) / cell_size).astype(np.int32)

    mask = np.zeros((rows, cols), dtype=np.uint8)

    cv2.fillConvexPoly(mask, gc, 1)

    cells = set(zip(*np.where(mask > 0)))

    return cells

def select_tiling_images(

    records: List[ImageRecord],

    config: TilingConfig,

) -> List[ImageRecord]:

    """

    Greedy weighted set-cover: iteratively pick the image that covers the

    most *uncovered* grid cells until the entire survey area is covered.

    Reference

    ---------

    Chvatal, V. (1979). "A Greedy Heuristic for the Set-Covering Problem."

    Mathematics of Operations Research, 4(3), 233-235.

    """

    grid, cell_size, x_min, y_min = _build_coverage_grid(

        records, config.grid_resolution

    )

    grid_shape = grid.shape

    universe: Set[Tuple[int, int]] = set()

    image_cells: Dict[int, Set[Tuple[int, int]]] = {}

    for rec in records:

        fp = _image_footprint(rec)

        if fp is None:

            continue

        cells = _rasterise_footprint(fp, cell_size, x_min, y_min, grid_shape)

        image_cells[rec.index] = cells

        universe |= cells

    logger.info("Survey area: %d grid cells to cover.", len(universe))

    uncovered = set(universe)

    selected_indices: List[int] = []

    used: Set[int] = set()

    while uncovered:

        best_idx = -1

        best_gain = 0

        for idx, cells in image_cells.items():

            if idx in used:

                continue

            gain = len(cells & uncovered)

            if gain > best_gain:

                best_gain = gain

                best_idx = idx

        if best_idx == -1 or best_gain == 0:

            logger.warning(

                "Cannot cover %d remaining cells - possible registration gaps.",

                len(uncovered),

            )

            break

        selected_indices.append(best_idx)

        used.add(best_idx)

        uncovered -= image_cells[best_idx]

        logger.debug(

            " selected %s - covers %d new cells, %d remaining",

            records[best_idx].filename, best_gain, len(uncovered),

        )

    logger.info("Selected %d / %d images for full tiling.",

                len(selected_indices), len(records))

    selected = [records[i] for i in selected_indices]

    return selected

# ---------------------------------------------------------------------------

# 4. (Optional) Sort selected images - lower-left origin, clockwise

# ---------------------------------------------------------------------------

def sort_clockwise_from_lower_left(

    selected: List[ImageRecord],

) -> List[ImageRecord]:

    """

    Sort images starting from the lower-left corner and proceeding clockwise

    along the convex-hull perimeter.

    """

    if len(selected) <= 1:

        return selected

    centres = np.array([[r.cx, r.cy] for r in selected])

    centroid = centres.mean(axis=0)

    dx = centres[:, 0] - centroid[0]

    dy = centres[:, 1] - centroid[1]

    angles = np.arctan2(dx, dy)

    order = np.argsort(angles)

    lower_left_idx = int(

        np.argmin(centres[:, 0] + centres[:, 1])  # nearest to lower-left corner

    )

    start = int(np.where(order == lower_left_idx)[0][0])

    order = np.roll(order, -start)

    return [selected[i] for i in order]
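
The ordering can be checked on four synthetic centre points at the corners of a unit square (lower-left origin, y-axis up, matching the assumption above); only numpy is needed:

```python
import numpy as np

# Toy check of the clockwise ordering on four synthetic image centres
# placed at the corners of a unit square (lower-left origin, y-axis up).
centres = np.array([[0, 0], [1, 0], [1, 1], [0, 1]], dtype=float)
centroid = centres.mean(axis=0)
dx = centres[:, 0] - centroid[0]
dy = centres[:, 1] - centroid[1]
order = np.argsort(np.arctan2(dx, dy))            # clockwise sweep from "12 o'clock"
lower_left = int(np.argmin(centres.sum(axis=1)))  # centre nearest (0, 0)
start = int(np.where(order == lower_left)[0][0])
order = np.roll(order, -start)                    # rotate so lower-left comes first
print(order.tolist())  # -> [0, 3, 2, 1]: (0,0) -> (0,1) -> (1,1) -> (1,0)
```

Note the argument order `np.arctan2(dx, dy)` rather than the usual `(dy, dx)`: this measures the angle clockwise from the positive y-axis, which is what makes an ascending sort a clockwise traversal.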

# ---------------------------------------------------------------------------

# 5. Main pipeline

# ---------------------------------------------------------------------------

def load_image_records(input_dir: str) -> List[ImageRecord]:

    """Load image file paths from a directory, sorted by name."""

    extensions = ("*.jpg", "*.jpeg", "*.png", "*.tif", "*.tiff", "*.bmp")

    paths: List[str] = []

    for ext in extensions:

        paths.extend(glob.glob(os.path.join(input_dir, ext)))

        paths.extend(glob.glob(os.path.join(input_dir, ext.upper())))

    paths = sorted(set(paths))

    if not paths:

        raise FileNotFoundError(f"No image files found in {input_dir}")

    records = [

        ImageRecord(index=i, filepath=p, filename=os.path.basename(p))

        for i, p in enumerate(paths)

    ]

    logger.info("Loaded %d image paths from %s", len(records), input_dir)

    return records
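
A quick standard-library-only check of the extension matching also shows why the `sorted(set(paths))` step matters: on case-insensitive filesystems, globbing both the lower- and upper-case patterns can return the same path twice.

```python
import glob
import os
import tempfile

# Sketch of the extension-matching and de-duplication logic above,
# exercised against a throwaway directory.
with tempfile.TemporaryDirectory() as d:
    for name in ("b.png", "a.jpg", "notes.txt"):
        open(os.path.join(d, name), "w").close()
    paths = []
    for ext in ("*.jpg", "*.png"):
        paths.extend(glob.glob(os.path.join(d, ext)))
        paths.extend(glob.glob(os.path.join(d, ext.upper())))
    # set() drops duplicates produced on case-insensitive filesystems
    names = sorted(os.path.basename(p) for p in set(paths))
print(names)  # -> ['a.jpg', 'b.png']; notes.txt is ignored
```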

def select_tiling_from_directory(

    input_dir: str,

    config: Optional[TilingConfig] = None,

    sort_result: bool = True,

) -> List[str]:

    """

    End-to-end pipeline.

    Parameters

    ----------

    input_dir : str

        Directory containing contiguous drone images.

    config : TilingConfig, optional

        Pipeline configuration; uses defaults when None.

    sort_result : bool

        If True, sort selected images clockwise from lower-left.

    Returns

    -------

    list[str]

        File paths of the selected tiling images.

    """

    if config is None:

        config = TilingConfig()

    records = load_image_records(input_dir)

    matcher = FeatureMatcher(config)

    matches = matcher.match_sequence(records)

    if not matches:

        logger.warning("No pairwise matches found. Returning all images.")

        return [r.filepath for r in records]

    records = register_images(records, matches)

    selected = select_tiling_images(records, config)

    if sort_result:

        selected = sort_clockwise_from_lower_left(selected)

    return [rec.filepath for rec in selected]

# ---------------------------------------------------------------------------

# CLI

# ---------------------------------------------------------------------------

def main():

    parser = argparse.ArgumentParser(

        description="Select a minimal set of drone images that tile the surveyed area."

    )

    parser.add_argument(

        "--input_dir", required=True,

        help="Directory containing contiguous aerial drone images.",

    )

    parser.add_argument(

        "--output", default=None,

        help="Optional JSON file to write the selected file list to.",

    )

    parser.add_argument(

        "--grid_resolution", type=int, default=100,

        help="Grid resolution for coverage computation (default: 100).",

    )

    parser.add_argument(

        "--work_scale", type=float, default=0.5,

        help="Down-scale factor for images during processing (default: 0.5).",

    )

    parser.add_argument(

        "--search_radius", type=int, default=5,

        help="Match each image against this many neighbours (default: 5).",

    )

    parser.add_argument(

        "--no_sort", action="store_true",

        help="Skip clockwise spatial sorting of the result.",

    )

    args = parser.parse_args()

    config = TilingConfig(

        grid_resolution=args.grid_resolution,

        work_scale=args.work_scale,

    )

    selected = select_tiling_from_directory(

        args.input_dir, config, sort_result=not args.no_sort

    )

    print(f"\n{'='*60}")

    print(f"Selected {len(selected)} images for tiling:")

    print(f"{'='*60}")

    for i, path in enumerate(selected, 1):

        print(f" {i:>3d}. {os.path.basename(path)}")

    if args.output:

        with open(args.output, "w") as f:

            json.dump({"selected_images": selected}, f, indent=2)

        print(f"\nResults written to {args.output}")

if __name__ == "__main__":

    main()
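
The `--output` JSON uses a single top-level key, so downstream tooling can consume the selection with the standard library alone (the filenames below are hypothetical):

```python
import io
import json

# Round-trip the JSON schema written by the CLI's --output option
# (filenames are hypothetical placeholders).
payload = {"selected_images": ["frame_0001.jpg", "frame_0042.jpg"]}
buf = io.StringIO()
json.dump(payload, buf, indent=2)
buf.seek(0)
selected = json.load(buf)["selected_images"]
print(selected)  # -> ['frame_0001.jpg', 'frame_0042.jpg']
```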

