from typing import List, Dict, Hashable
import numpy as np
from pyrobopath.process import AgentModel
from pyrobopath.collision_detection import (
Trajectory,
TrajectoryPoint,
trajectory_collision_query,
)
from pyrobopath.scheduling import Interval
from .schedule import ContourEvent, ToolpathSchedule, MultiAgentToolpathSchedule
[docs]
def schedule_to_trajectory(
    schedule: ToolpathSchedule, t_start: float, t_end: float, default_state
) -> Trajectory:
    """Flatten the window ``[t_start, t_end]`` of a schedule into one trajectory.

    Each event that touches the window contributes its own trajectory, clipped
    to the window, and the clipped pieces are concatenated in iteration order.
    If the concatenated result does not start at ``t_start`` or end at
    ``t_end`` (or is empty), the missing endpoint is filled with
    ``schedule.get_state(...)`` evaluated at that time. An endpoint is only
    added when that call returns something other than ``None``.

    :param schedule: The schedule to be sliced
    :type schedule: ToolpathSchedule
    :param t_start: Start time of slice
    :type t_start: float
    :param t_end: End time of slice
    :type t_end: float
    :param default_state: Fallback state forwarded to ``schedule.get_state``
        for times at which the schedule has no known state
    :type default_state: np.ndarray
    :return: The trajectory inferred from the schedule
    :rtype: Trajectory
    """
    # Seed the result with a placeholder whose time is NaN. NaN never compares
    # equal to anything, so the duplicate check below can always look at
    # ``points[-1]`` and the first real piece is never mistaken for a duplicate.
    placeholder = TrajectoryPoint(np.empty(3), float("nan"))
    result = Trajectory()
    result.add_traj_point(placeholder)

    for event in schedule._events:
        if event.end < t_start:
            # event is over before the window opens
            continue
        if event.start >= t_end:
            # stop scanning; this presumes events are iterated in time order
            break

        clip_from = max(event.start, t_start)
        clip_to = min(event.end, t_end)
        piece = event.traj.slice(clip_from, clip_to)

        # contiguous events share a boundary sample; keep a single copy
        if piece[0].time == result.points[-1].time:
            piece.points.pop(0)
        result += piece

    # the placeholder has served its purpose
    result.points.pop(0)

    # Work out which endpoints still need to be filled from the schedule state.
    head_state = None
    tail_state = None
    if not result.n_points():
        # nothing fell inside the window: both endpoints come from the schedule
        head_state = schedule.get_state(t_start, default_state)
        tail_state = schedule.get_state(t_end, default_state)
    else:
        if result.start_time() > t_start:
            head_state = schedule.get_state(t_start, default_state)
        if result.end_time() < t_end:
            tail_state = schedule.get_state(t_end, default_state)

    if head_state is not None:
        result.insert_traj_point(0, TrajectoryPoint(head_state, t_start))
    if tail_state is not None:
        result.add_traj_point(TrajectoryPoint(tail_state, t_end))

    return result
[docs]
def _event_trajectory_in_window(event, window):
    """Return the part of ``event``'s trajectory that is kept for ``window``.

    The relation between the event and the window is classified with the
    event's Allen's-interval-algebra predicates, tested in a fixed order. If
    none of the predicates matches, an empty trajectory is returned.
    """
    # "meets": only the final sample of the event is kept
    if event.meets(window):
        boundary = Trajectory()
        boundary.add_traj_point(event.traj.points[-1])
        return boundary

    # "overlaps": trim from the window start to the event end
    if event.overlaps(window):
        return event.traj.slice(window.start, event.end)

    # "starts" / "during" / "finishes" / "equals": the event's trajectory is
    # used as-is (the same object, not a copy)
    if (
        event.starts(window)
        or event.during(window)
        or event.finishes(window)
        or event.equals(window)
    ):
        return event.traj

    # "finished_by" / "contains" / "started_by": trim to the whole window
    if event.finished_by(window) or event.contains(window) or event.started_by(window):
        return event.traj.slice(window.start, window.end)

    # "overlapped_by": trim from the event start to the window end
    if event.overlapped_by(window):
        return event.traj.slice(event.start, window.end)

    # "met_by": only the first sample of the event is kept
    if event.met_by(window):
        boundary = Trajectory()
        boundary.add_traj_point(event.traj.points[0])
        return boundary

    return Trajectory()


def schedule_to_trajectories(
    schedule: ToolpathSchedule, t_start: float, t_end: float
) -> List[Trajectory]:
    """Slice a toolpath schedule into a list of per-event trajectories.

    Events for which ``precedes`` holds against the window ``[t_start, t_end]``
    are skipped, and iteration stops at the first event for which
    ``preceded_by`` holds. Every other event yields exactly one entry in the
    returned list: its trajectory, trimmed so that it fits in the window.

    :param schedule: The schedule to be sliced
    :type schedule: ToolpathSchedule
    :param t_start: start time of slice
    :type t_start: float
    :param t_end: end time of slice
    :type t_end: float
    :return: The list of trajectories inferred from the schedule
    :rtype: List[Trajectory]
    """
    window = Interval(t_start, t_end)
    pieces = []
    for event in schedule._events:
        if event.precedes(window):
            # not in the window yet
            continue
        if event.preceded_by(window):
            # past the window; nothing further is examined
            break
        pieces.append(_event_trajectory_in_window(event, window))
    return pieces
[docs]
def event_causes_collision(
    event: ContourEvent,
    agent: Hashable,
    schedule: MultiAgentToolpathSchedule,
    agent_models: Dict[Hashable, AgentModel],
    threshold: float,
):
    """Check whether adding ``event`` for ``agent`` collides with other agents.

    The event is placed in a fresh single-event schedule and converted to a
    trajectory spanning ``event.start`` to
    ``max(schedule.end_time(), event.end)``. Each other agent's schedule is
    converted over the same window and the two trajectories are passed to
    ``trajectory_collision_query``. The window runs to the end of the schedule
    because adding an event at an earlier time can cause collisions later on.
    Times with no scheduled motion use the agent's ``home_position``.

    :param event: The event to be added (with Event.data = Contour)
    :type event: ContourEvent
    :param agent: The agent for the event
    :type agent: Hashable
    :param schedule: A schedule that is assumed collision-free
    :type schedule: MultiAgentToolpathSchedule
    :param agent_models: Context info about the system
    :type agent_models: Dict[Hashable, AgentModel]
    :param threshold: The maximum collision checking step distance (in units
        equivalent to points in `event`)
    :type threshold: float
    :return: True if event causes collision, False else
    :rtype: bool
    """
    horizon = max(schedule.end_time(), event.end)

    # trajectory of the acting agent if it executed only the new event
    candidate = ToolpathSchedule()
    candidate.add_event(event)
    own_model = agent_models[agent]
    own_traj = schedule_to_trajectory(
        candidate, event.start, horizon, own_model.home_position
    )

    for other, other_schedule in schedule.schedules.items():
        if other == agent:
            # the acting agent's own schedule is not checked against itself
            continue

        other_model = agent_models[other]
        other_traj = schedule_to_trajectory(
            other_schedule, event.start, horizon, other_model.home_position
        )
        if trajectory_collision_query(
            own_model.collision_model,
            own_traj,
            other_model.collision_model,
            other_traj,
            threshold,
        ):
            return True

    return False
[docs]
def concurrent_trajectory_pairs(tr1: List[Trajectory], tr2: List[Trajectory]):
    """Pair up the time-overlapping portions of two trajectory lists.

    Both lists are swept with one cursor each. Whenever the current
    trajectories share a time span (touching endpoints count), both are sliced
    to that shared span and the two slices are stored as a pair. The cursor
    whose trajectory ends first is then advanced; on a tie the second list's
    cursor advances. Each step advances exactly one cursor, so the sweep takes
    at most ``len(tr1) + len(tr2)`` steps.

    :param tr1: The first list of trajectories
    :type tr1: List[Trajectory]
    :param tr2: The second list of trajectories
    :type tr2: List[Trajectory]
    :return: A list of concurrent trajectory pairs
    :rtype: List[Tuple[Trajectory, Trajectory]]
    """
    pairs = []
    idx1, idx2 = 0, 0
    count1, count2 = len(tr1), len(tr2)

    while idx1 < count1 and idx2 < count2:
        first, second = tr1[idx1], tr2[idx2]
        first_start, first_end = first.start_time(), first.end_time()
        second_start, second_end = second.start_time(), second.end_time()

        # closed-interval overlap test
        overlapping = first_start <= second_end and second_start <= first_end
        if overlapping:
            shared_start = max(first_start, second_start)
            shared_end = min(first_end, second_end)
            pairs.append(
                (
                    first.slice(shared_start, shared_end),
                    second.slice(shared_start, shared_end),
                )
            )

        # move past whichever trajectory finishes first
        if first_end < second_end:
            idx1 += 1
        else:
            idx2 += 1

    return pairs
[docs]
def events_cause_collision(
    events: List[ContourEvent],
    agent: Hashable,
    schedule: MultiAgentToolpathSchedule,
    agent_models: Dict[Hashable, AgentModel],
    threshold: float,
) -> bool:
    """Determines if adding the 'events' to 'schedule' will cause a collision
    in the resulting trajectory.

    The events are placed in a fresh schedule and sliced into per-event
    trajectories over the window from the earliest event start to the latest
    event end. For every other agent, that agent's schedule is sliced over the
    same window and only the time-concurrent trajectory pairs are passed to
    ``trajectory_collision_query``. For timesteps with no event, agents are
    assumed to be in collision-free positions.

    An empty ``events`` list cannot introduce a collision and returns False.

    :param events: The events to be added
    :type events: List[ContourEvent]
    :param agent: The agent for the events
    :type agent: Hashable
    :param schedule: A schedule that is assumed collision-free
    :type schedule: MultiAgentToolpathSchedule
    :param agent_models: Context info about the system
    :type agent_models: Dict[Hashable, AgentModel]
    :param threshold: The maximum collision checking step distance
    :type threshold: float
    :return: True if the events cause a collision, False else
    :rtype: bool
    """
    # Nothing to add means nothing can collide. This guard also keeps min()
    # and max() below from raising ValueError on an empty sequence.
    if not events:
        return False

    st = min(e.start for e in events)
    et = max(e.end for e in events)

    new_sched = ToolpathSchedule()
    for event in events:
        new_sched.add_event(event)
    event_trajs = schedule_to_trajectories(new_sched, st, et)

    for a, s in schedule.schedules.items():
        if a == agent:
            # the acting agent's own schedule is not checked against itself
            continue

        trajs = schedule_to_trajectories(s, st, et)
        # only portions that overlap in time can collide
        for own_part, other_part in concurrent_trajectory_pairs(event_trajs, trajs):
            if trajectory_collision_query(
                agent_models[agent].collision_model,
                own_part,
                agent_models[a].collision_model,
                other_part,
                threshold,
            ):
                return True

    return False