Source code for pyrobopath.toolpath_scheduling.batched_planners

from typing import Dict
from concurrent.futures import ProcessPoolExecutor

from pyrobopath.toolpath import Toolpath
from pyrobopath.process import AgentModel, DependencyGraph, batch_digraph

from .schedule import MultiAgentToolpathSchedule
from .toolpath_scheduler import PlanningOptions, MultiAgentToolpathPlanner


[docs] class BatchedSequentialPlanner: def __init__(self, agent_models: Dict[str, AgentModel], batch_size: int): self._base_planner = MultiAgentToolpathPlanner(agent_models) self._batch_size = batch_size def plan( self, toolpath: Toolpath, dg: DependencyGraph, options: PlanningOptions ) -> MultiAgentToolpathSchedule: dgs = batch_digraph(dg, self._batch_size) results = [] for subgraph in dgs: results.append(self._base_planner.plan(toolpath, subgraph, options)) return MultiAgentToolpathSchedule.merge(results)
[docs] class BatchedParallelPlanner: def __init__(self, agent_models: Dict[str, AgentModel], batch_size: int): self._base_planner = MultiAgentToolpathPlanner(agent_models) self._batch_size = batch_size def _plan_batch(self, args): return self._base_planner.plan(*args) def plan( self, toolpath: Toolpath, dg: DependencyGraph, options: PlanningOptions ) -> MultiAgentToolpathSchedule: dgs = batch_digraph(dg, self._batch_size) args = [(toolpath, subgraph, options) for subgraph in dgs] results = [] with ProcessPoolExecutor() as executor: results = list(executor.map(self._plan_batch, args)) return MultiAgentToolpathSchedule.merge(results)