Friday, May 15, 2026

 Drone Survey Area reconstitution:

Problem statement:

Aerial drone images extracted from a drone video are sufficient to reconstitute the survey area with image selection to create a mosaic that fully covers the survey area. This method does away with the knowledge of flight path of the drone. Write a python implementation that places selections from the input on the tiles in a grid to increase the likelihood of match with the overall survey area.

Solution:

The following is a visual survey approximation, not a georeferenced orthomosaic. Without GPS/EXIF or camera poses from the previous example, the script cannot know the true ground positions, so the grid is an informed montage rather than a mathematically correct map.

Usage:

pip install pyodm

docker run -p 3000:3000 opendronemap/nodeodm --test

Code:

#! /usr/bin/python

import cv2

import numpy as np

from pathlib import Path

import math

def detect_road_like_mask(img):

    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    gray = cv2.GaussianBlur(gray, (5, 5), 0)

    edges = cv2.Canny(gray, 40, 120)

    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (7, 7))

    closed = cv2.morphologyEx(edges, cv2.MORPH_CLOSE, kernel, iterations=2)

    dilated = cv2.dilate(closed, kernel, iterations=1)

    return (dilated > 0).astype(np.uint8) * 255

def skeletonize(mask):

    mask = (mask > 0).astype(np.uint8)

    skel = np.zeros_like(mask)

    element = cv2.getStructuringElement(cv2.MORPH_CROSS, (3, 3))

    temp = mask.copy()

    while True:

        eroded = cv2.erode(temp, element)

        opened = cv2.dilate(eroded, element)

        temp2 = cv2.subtract(temp, opened)

        skel = cv2.bitwise_or(skel, temp2)

        temp = eroded.copy()

        if cv2.countNonZero(temp) == 0:

            break

    return skel

def border_signature(skel):

    h, w = skel.shape

    return (

        skel[0, :], # top

        skel[-1, :], # bottom

        skel[:, 0], # left

        skel[:, -1], # right

    )

def border_similarity(a, b):

    if a.shape != b.shape:

        return 0

    return np.sum((a > 0) & (b > 0))

def compute_pairwise_border_scores(skeletons):

    N = len(skeletons)

    borders = [border_signature(s) for s in skeletons]

    scores = {}

    for i in range(N):

        for j in range(N):

            if i == j:

                continue

            scores[(i, j)] = {

                "up": border_similarity(borders[i][0], borders[j][1]),

                "down": border_similarity(borders[i][1], borders[j][0]),

                "left": border_similarity(borders[i][2], borders[j][3]),

                "right": border_similarity(borders[i][3], borders[j][2]),

            }

    return scores

def filter_redundant_frames(skeletons, overlap_threshold=0.75):

    N = len(skeletons)

    keep = [True] * N

    for i in range(N):

        if not keep[i]:

            continue

        si = skeletons[i]

        if si is None or si.size == 0:

            keep[i] = False

            continue

        si = si > 0

        for j in range(i + 1, N):

            if not keep[j]:

                continue

            sj = skeletons[j]

            if sj is None or sj.size == 0:

                keep[j] = False

                continue

            sj = sj > 0

            inter = np.sum(si & sj)

            union = np.sum(si | sj)

            if union == 0:

                continue

            iou = inter / union

            if iou > overlap_threshold:

                keep[j] = False

    return keep

def solve_directional_grid(N, scores, min_adj_score=20, direction_bias=1.5):

    G = int(math.ceil(math.sqrt(N)))

    grid = [[None for _ in range(G)] for _ in range(G)]

    used = set()

    grid[0][0] = 0

    used.add(0)

    for r in range(G):

        for c in range(G):

            if r == 0 and c == 0:

                continue

            best_tile = None

            best_score = -1

            for t in range(N):

                if t in used:

                    continue

                score = 0

                if r > 0 and grid[r - 1][c] is not None:

                    above = grid[r - 1][c]

                    vertical_score = scores.get((above, t), {}).get("down", 0)

                    score += vertical_score * direction_bias

                if c > 0 and grid[r][c - 1] is not None:

                    left = grid[r][c - 1]

                    horizontal_score = scores.get((left, t), {}).get("right", 0)

                    score += horizontal_score * direction_bias

                if score > best_score:

                    best_score = score

                    best_tile = t

            if best_score < min_adj_score:

                grid[r][c] = None

            else:

                grid[r][c] = best_tile

                used.add(best_tile)

            if len(used) == N:

                return grid

    return grid

def build_grid_mosaic(images, grid):

    H, W = images[0][1].shape[:2]

    G = len(grid)

    canvas = np.zeros((G * H, G * W, 3), dtype=np.uint8)

    for r in range(G):

        for c in range(G):

            idx = grid[r][c]

            if idx is None:

                continue

            name, img = images[idx]

            y0, y1 = r * H, (r + 1) * H

            x0, x1 = c * W, (c + 1) * W

            canvas[y0:y1, x0:x1] = img

    return canvas

def mosaic_street_grid(folder, out_path="grid_mosaic.jpg"):

    folder = Path(folder)

    images = []

    for p in sorted(folder.iterdir()):

        if p.suffix.lower() in [".jpg", ".jpeg", ".png"]:

            img = cv2.imread(str(p))

            images.append((p.name, img))

    if not images:

        raise RuntimeError("No images found")

    # normalize all images to the size of the first one

    base_h, base_w = images[0][1].shape[:2]

    norm_images = []

    for name, img in images:

        h, w = img.shape[:2]

        if (h, w) != (base_h, base_w):

            img = cv2.resize(img, (base_w, base_h), interpolation=cv2.INTER_AREA)

        norm_images.append((name, img))

    images = norm_images

    skeletons = []

    for name, img in images:

        road_mask = detect_road_like_mask(img)

        skel = skeletonize(road_mask)

        skeletons.append(skel)

        cv2.imwrite(str(folder / f"temp-road-{name}"), road_mask)

        cv2.imwrite(str(folder / f"temp-skel-{name}"), skel)

    valid_images = []

    valid_skeletons = []

    for (name, img), skel in zip(images, skeletons):

        if skel is None:

            print(f"[WARN] Skeleton for {name} is None — skipping")

            continue

        if skel.size == 0:

            print(f"[WARN] Skeleton for {name} is empty — skipping")

            continue

        if len(skel.shape) != 2:

            print(f"[WARN] Skeleton for {name} has invalid shape {skel.shape} — skipping")

            continue

        valid_images.append((name, img))

        valid_skeletons.append(skel)

    images = valid_images

    skeletons = valid_skeletons

    if len(skeletons) == 0:

        raise RuntimeError("All skeletons were invalid — nothing to process.")

    keep_mask = filter_redundant_frames(skeletons)

    images = [img for img, k in zip(images, keep_mask) if k]

    skeletons = [sk for sk, k in zip(skeletons, keep_mask) if k]

    scores = compute_pairwise_border_scores(skeletons)

    grid = solve_directional_grid(len(images), scores)

    mosaic = build_grid_mosaic(images, grid)

    cv2.imwrite(out_path, mosaic)

    return mosaic

if __name__ == "__main__":

    mosaic_street_grid(".", "street_grid_mosaic.jpg")


No comments:

Post a Comment