Source code for pyrobopath.toolpath_scheduling.depth_based_planners
from typing import Dict
from concurrent.futures import ProcessPoolExecutor
from pyrobopath.toolpath import Toolpath
from pyrobopath.process import AgentModel, DependencyGraph, stratify_digraph
from .schedule import MultiAgentToolpathSchedule
from .toolpath_scheduler import PlanningOptions, MultiAgentToolpathPlanner
[docs]
class DepthBasedSequentialPlanner:
def __init__(self, agent_models: Dict[str, AgentModel]):
self._base_planner = MultiAgentToolpathPlanner(agent_models)
def plan(
self, toolpath: Toolpath, dg: DependencyGraph, options: PlanningOptions
) -> MultiAgentToolpathSchedule:
dgs = stratify_digraph(dg)
results = []
for subgraph in dgs:
results.append(self._base_planner.plan(toolpath, subgraph, options))
return MultiAgentToolpathSchedule.merge(results)
[docs]
class DepthBasedParallelPlanner:
def __init__(self, agent_models: Dict[str, AgentModel]):
self._base_planner = MultiAgentToolpathPlanner(agent_models)
def _plan_layer(self, args):
return self._base_planner.plan(*args)
def plan(
self, toolpath: Toolpath, dg: DependencyGraph, options: PlanningOptions
) -> MultiAgentToolpathSchedule:
dgs = stratify_digraph(dg)
args = [(toolpath, subgraph, options) for subgraph in dgs]
results = []
with ProcessPoolExecutor() as executor:
results = list(executor.map(self._plan_layer, args))
return MultiAgentToolpathSchedule.merge(results)