The previous articles described position and velocity adjustments for drone formation to transform from one phase to another. While those methods allow individual units to transition to the new formation, it does not detect conflicts. So the conflict detection strategy must be layered on the position, velocity and heading determination. This is especially true for fixed wing drones that do not have hover and yawing capabilities
An example for this is now illustrated below:
import heapq
from collections import defaultdict, namedtuple
Constraint = namedtuple('Constraint', ['agent', 'position', 'time'])
class AStarPlanner:
def __init__(self, grid):
self.grid = grid
self.rows = len(grid)
self.cols = len(grid[0])
def neighbors(self, pos):
r, c = pos
for dr, dc in [(0,1), (1,0), (0,-1), (-1,0), (0,0)]: # 4 directions + wait
nr, nc = r + dr, c + dc
if 0 <= nr < self.rows and 0 <= nc < self.cols and self.grid[nr][nc] != 1:
yield (nr, nc)
def heuristic(self, a, b):
return abs(a[0] - b[0]) + abs(a[1] - b[1])
def plan(self, start, goal, constraints):
open_set = [(0 + self.heuristic(start, goal), 0, start, [start])]
seen = set()
constraint_dict = defaultdict(set)
for c in constraints:
constraint_dict[(c.position, c.time)].add(c.agent)
while open_set:
f, g, current, path = heapq.heappop(open_set)
if (current, g) in seen:
continue
seen.add((current, g))
if current == goal and g >= max(t for _, t in constraint_dict.keys() if _[0] == goal or _[1] == goal or not constraint_dict else 0):
return path
for neighbor in self.neighbors(current):
if any(c.agent == 'any' or c.agent == agent for c in constraint_dict.get((neighbor, g+1), set())):
continue
heapq.heappush(open_set, (g + 1 + self.heuristic(neighbor, goal), g + 1, neighbor, path + [neighbor]))
return None # No path found
class CBS:
def __init__(self, grid, starts, goals):
self.grid = grid
self.starts = starts
self.goals = goals
self.num_agents = len(starts)
def detect_conflict(self, paths):
max_t = max(len(p) for p in paths)
for t in range(max_t):
positions = {}
for i, path in enumerate(paths):
pos = path[t] if t < len(path) else path[-1]
if pos in positions:
return {'time': t, 'a1': positions[pos], 'a2': i, 'pos': pos}
positions[pos] = i
return None
def search(self):
root = {'paths': [], 'constraints': []}
astar = AStarPlanner(self.grid)
for i in range(self.num_agents):
path = astar.plan(self.starts[i], self.goals[i], [])
if not path:
return None
root['paths'].append(path)
open_set = [root]
while open_set:
node = open_set.pop(0)
conflict = self.detect_conflict(node['paths'])
if not conflict:
return node['paths']
for agent in [conflict['a1'], conflict['a2']]:
new_constraints = node['constraints'] + [Constraint(agent, conflict['pos'], conflict['time'])]
new_paths = list(node['paths'])
path = astar.plan(self.starts[agent], self.goals[agent], new_constraints)
if not path:
continue
new_paths[agent] = path
open_set.append({'constraints': new_constraints, 'paths': new_paths})
return None
#codingexercise: CodingExercise-07-05-2025.docx