Source code for pyrobopath.collision_detection.trajectory_collision_detection

from __future__ import annotations
from typing import List
import numpy as np

from pyrobopath.tools.types import NDArray
from .collision_model import (
    CollisionGroup,
    CollisionModel,
)
from .trajectory import Trajectory


[docs] def continuous_collide( model1: CollisionModel, trans1_final: NDArray, model2: CollisionModel, trans2_final: NDArray, threshold, ) -> bool: start1 = model1.translation.copy() start2 = model2.translation.copy() dir1 = trans1_final - start1 dir2 = trans2_final - start2 len1 = np.linalg.norm(dir1) len2 = np.linalg.norm(dir2) n = int(np.ceil(max(len1, len2) / threshold)) collision_result = False for s in np.linspace(0.0, 1.0, n): model1.translation = start1 + dir1 * s model2.translation = start2 + dir2 * s if model1.in_collision(model2): collision_result = True break # reset model position model1.translation = start1 model2.translation = start2 return collision_result
class _ConcurrentSegmentIterator: """An iterator to loop over concurrent sections of trajectory segments.""" def __init__(self, trajs: List[Trajectory]): unique_times = set() for t in trajs: times = [p.time for p in t] unique_times = unique_times.union(times) self.unique_times = sorted(list(unique_times)) self.trajs = trajs self.idx = 0 def __iter__(self): self.idx = 0 return self def __next__(self): if self.idx + 1 == len(self.unique_times): raise StopIteration t0 = self.unique_times[self.idx] t1 = self.unique_times[self.idx + 1] slices = [t.slice(t0, t1) for t in self.trajs] self.idx += 1 return slices
[docs] def trajectory_collision_query( model1: CollisionModel, traj1: Trajectory, model2: CollisionModel, traj2: Trajectory, threshold: float, ) -> bool: """Determine if two models collide along trajectories""" for traj_pair in _ConcurrentSegmentIterator([traj1, traj2]): model1.translation = traj_pair[0][0].data # first point model2.translation = traj_pair[1][0].data p1_final = traj_pair[0][-1].data # last point p2_final = traj_pair[1][-1].data if continuous_collide(model1, p1_final, model2, p2_final, threshold): return True return False
class _TrajectoryStateInterpolator(object): """Interpolate a trajectory by stepping the state from start_time to the latest time in the trajectory.""" def __init__(self, traj: Trajectory, delta_t, start_time=0.0): self.traj = traj self.delta_t = delta_t self.start_time = start_time self.reset() def reset(self): self.current_start_point = self.traj[0] self.current_end_point = self.traj[1] self.time = self.start_time self.segment_idx = 1 self.complete = False def step_state(self): # return last state after reaching final time if self.complete: return self.traj[-1] self.time += self.delta_t # return start state before reaching initial time if self.current_start_point.time > self.time: return self.current_start_point # update current segment if self.current_end_point.time <= self.time: self.segment_idx += 1 if self.segment_idx + 1 > len(self.traj.points): self.complete = True return self.traj[-1] self.current_start_point = self.current_end_point self.current_end_point = self.traj[self.segment_idx] elapsed = self.time - self.current_start_point.time s = elapsed / (self.current_end_point.time - self.current_start_point.time) state = self.current_start_point.interp(self.current_end_point, s) return state def interp_state(self, t): """Find the trajectory state at time t""" # use interp functions in trajectory points pass
[docs] def check_trajectory_collision( group: CollisionGroup, trajectories: List[Trajectory], threshold: float ) -> bool: start_time, end_time = 0.0, 0.0 for traj in trajectories: start_time = min(start_time, traj.start_time()) end_time = max(end_time, traj.end_time()) # find trajectory with the fastest velocity # this trajectory defines the time step to ensure distance 'threshold' max_vel_idx = np.argmin([t.distance() / t.elapsed() for t in trajectories]) n_steps = trajectories[max_vel_idx].distance() / threshold delta_t = trajectories[max_vel_idx].elapsed() / n_steps traj_interps = [ _TrajectoryStateInterpolator(t, delta_t, start_time) for t in trajectories ] completed = False while not completed: completed = True for model_id, traj_interp in enumerate(traj_interps): completed = traj_interp.complete and completed # step trajectory state = traj_interp.step_state() group.models[model_id].translation = state.data if group.in_collision(): return True return False