Saturday, July 5, 2025

 The previous articles described position and velocity adjustments for drone formation to transform from one phase to another. While those methods allow individual units to transition to the new formation it does not detect conflicts. So the conflict detection strategy must be layered on the position, velocity and heading determination. This is especially true for fixed wing drones that do not have hover and yawing capabilities  

An example for this is now illustrated below: 

import heapq 

from collections import defaultdict, namedtuple 

 

Constraint = namedtuple('Constraint', ['agent', 'position', 'time']) 

 

class AStarPlanner: 

    def __init__(self, grid): 

        self.grid = grid 

        self.rows = len(grid) 

        self.cols = len(grid[0]) 

     

    def neighbors(self, pos): 

        r, c = pos 

        for dr, dc in [(0,1), (1,0), (0,-1), (-1,0), (0,0)]:  # 4 directions + wait 

            nr, nc = r + dr, c + dc 

            if 0 <= nr < self.rows and 0 <= nc < self.cols and self.grid[nr][nc] != 1: 

                yield (nr, nc) 

 

    def heuristic(self, a, b): 

        return abs(a[0] - b[0]) + abs(a[1] - b[1]) 

 

    def plan(self, start, goal, constraints): 

        open_set = [(0 + self.heuristic(start, goal), 0, start, [start])] 

        seen = set() 

        constraint_dict = defaultdict(set) 

        for c in constraints: 

            constraint_dict[(c.position, c.time)].add(c.agent) 

 

        while open_set: 

            f, g, current, path = heapq.heappop(open_set) 

            if (current, g) in seen: 

                continue 

            seen.add((current, g)) 

 

            if current == goal and g >= max(t for _, t in constraint_dict.keys() if _[0] == goal or _[1] == goal or not constraint_dict else 0): 

                return path 

             

            for neighbor in self.neighbors(current): 

                if any(c.agent == 'any' or c.agent == agent for c in constraint_dict.get((neighbor, g+1), set())): 

                    continue 

                heapq.heappush(open_set, (g + 1 + self.heuristic(neighbor, goal), g + 1, neighbor, path + [neighbor])) 

        return None  # No path found 

 

class CBS: 

    def __init__(self, grid, starts, goals): 

        self.grid = grid 

        self.starts = starts 

        self.goals = goals 

        self.num_agents = len(starts) 

 

    def detect_conflict(self, paths): 

        max_t = max(len(p) for p in paths) 

        for t in range(max_t): 

            positions = {} 

            for i, path in enumerate(paths): 

                pos = path[t] if t < len(path) else path[-1] 

                if pos in positions: 

                    return {'time': t, 'a1': positions[pos], 'a2': i, 'pos': pos} 

                positions[pos] = i 

        return None 

 

    def search(self): 

        root = {'paths': [], 'constraints': []} 

        astar = AStarPlanner(self.grid) 

         

        for i in range(self.num_agents): 

            path = astar.plan(self.starts[i], self.goals[i], []) 

            if not path: 

                return None 

            root['paths'].append(path) 

         

        open_set = [root] 

        while open_set: 

            node = open_set.pop(0) 

            conflict = self.detect_conflict(node['paths']) 

            if not conflict: 

                return node['paths'] 

 

            for agent in [conflict['a1'], conflict['a2']]: 

                new_constraints = node['constraints'] + [Constraint(agent, conflict['pos'], conflict['time'])] 

                new_paths = list(node['paths']) 

                path = astar.plan(self.starts[agent], self.goals[agent], new_constraints) 

                if not path: 

                    continue 

                new_paths[agent] = path 

                open_set.append({'constraints': new_constraints, 'paths': new_paths}) 

        return None