Source code for pyrobopath.toolpath_scheduling.toolpath_scheduler

from __future__ import annotations
from typing import Dict
from dataclasses import dataclass

from pyrobopath.toolpath import Toolpath, Contour
from pyrobopath.process import AgentModel, DependencyGraph

from .schedule import ContourEvent, MoveEvent, MultiAgentToolpathSchedule
from .toolpath_collision import events_cause_collision


[docs] @dataclass class PlanningOptions: retract_height: float = 50.0 collision_offset: float = 5.0 collision_gap_threshold: float = 1.0
[docs] class SchedulingContext: def __init__(self, agent_models: Dict[str, AgentModel], options: PlanningOptions): self.agent_models = agent_models self.options = options self.reset() def reset(self): self.start_times = dict.fromkeys(self.agent_models.keys(), 0.0) self.positions = dict() for agent in self.agent_models: self.positions[agent] = self.agent_models[agent].home_position def get_agents_with_start_time(self, time): min_time_agents = [a for (a, t) in self.start_times.items() if t == time] return min_time_agents def get_unique_start_times(self): return sorted(set(self.start_times.values())) def set_agent_start_time(self, agent, time): self.start_times[agent] = time def get_current_position(self, agent): return self.positions[agent]
[docs] class TaskManager: def __init__(self, toolpath: Toolpath, dg: DependencyGraph): self.contours = toolpath.contours self.dg = dg # task sets self.frontier = set() self.completed_tasks = set() self.in_progress: Dict[str, float] = dict() def add_inprogress(self, id, t_end): self.in_progress[id] = t_end def mark_inprogress_complete(self, time): complete = [k for (k, v) in self.in_progress.items() if time >= v] self.completed_tasks.update(complete) for c in complete: self.dg.mark_complete(c) self.in_progress.pop(c) def has_frontier(self): return bool(self.frontier) def get_available_tasks(self, *args): available = [n for n in self.frontier if self.dg.can_start(n)] return available
[docs] def build_event_chain( t_start, p_start, contour: Contour, agent, context: SchedulingContext ): # travel + approach event p_approach = contour.path[0].copy() p_approach[2] += context.options.retract_height path_travel = [p_start, p_approach, contour.path[0]] if (p_start == p_approach).all(): path_travel.pop(0) e_travel = MoveEvent( t_start, path_travel, context.agent_models[agent].travel_velocity ) # contour event e_contour = ContourEvent( e_travel.end, contour, context.agent_models[agent].velocity ) # depart + home events p_depart = contour.path[-1].copy() p_depart[2] += context.options.retract_height e_depart = MoveEvent( e_contour.end, [contour.path[-1], p_depart], context.agent_models[agent].travel_velocity, ) e_home = MoveEvent( e_depart.end, [p_depart, context.agent_models[agent].home_position], context.agent_models[agent].travel_velocity, ) return [e_travel, e_contour, e_depart, e_home]
[docs] class MultiAgentToolpathPlanner: def __init__(self, agent_models: Dict[str, AgentModel]): self._agent_models = agent_models def plan( self, toolpath: Toolpath, dg: DependencyGraph, options: PlanningOptions ) -> MultiAgentToolpathSchedule: self._validate_toolpath(toolpath) schedule = MultiAgentToolpathSchedule() schedule.add_agents(self._agent_models.keys()) context = SchedulingContext(self._agent_models, options) tm = TaskManager(toolpath, dg) tm.frontier.update(dg.roots()) while tm.has_frontier(): sorted_times = context.get_unique_start_times() time = sorted_times[0] min_time_agents = context.get_agents_with_start_time(time) tm.mark_inprogress_complete(time) for agent in min_time_agents: tools = self._agent_models[agent].capabilities available = tm.get_available_tasks() available = [c for c in available if tm.contours[c].tool in tools] if not available: if len(sorted_times) > 1: context.set_agent_start_time(agent, sorted_times[1]) continue # prioritize tasks with highest out-degree key = lambda n: dg._graph.out_degree(n) nodes = sorted(available, key=key, reverse=True) # type: ignore all_collide_flag = True for node in nodes: contour = tm.contours[node] p_start = schedule[agent].get_state( time, self._agent_models[agent].home_position ) events = build_event_chain(time, p_start, contour, agent, context) if events_cause_collision( events, agent, schedule, self._agent_models, options.collision_gap_threshold, ): continue # slice home event if overlap if schedule[agent].end_time() > events[0].start: prev_home_event = schedule[agent]._events.pop() if prev_home_event.start != events[0].start: sliced_home = self._slice_home_event( prev_home_event, events[0].start ) schedule.add_event(sliced_home, agent) schedule.add_events(events, agent) tm.add_inprogress(node, events[1].end) tm.frontier.remove(node) tm.frontier.update(dg._graph.successors(node)) context.positions[agent] = events[2].data[-1] context.set_agent_start_time(agent, events[2].end) all_collide_flag = False break if all_collide_flag: context.set_agent_start_time(agent, time + options.collision_offset) return schedule def _validate_toolpath(self, toolpath): required_tools = set(toolpath.tools()) provided_tools = set( [cap for a in self._agent_models.values() for cap in a.capabilities] ) if not required_tools.issubset(provided_tools): raise ValueError("Agents cannot provide all required capabilities") def _slice_home_event(self, home_event: MoveEvent, end_time: float): new_traj = home_event.traj.slice(home_event.start, end_time) path = [p.data for p in new_traj.points] return MoveEvent(home_event.start, path, home_event.velocity)