from __future__ import annotations
from typing import List, Iterable, Hashable
import collections
[docs]
class Interval:
"""
An implementation of Allen's interval algebra
See: https://cse.unl.edu/~choueiry/Documents/Allen-CACM1983.pdf
"""
def __init__(self, start, end):
self.start = start
self.end = end
def offset(self, t):
self.start += t
self.end += t
[docs]
def precedes(self, other):
"""XXX YYY"""
return self.end < other.start
[docs]
def meets(self, other):
"""
XXXYYY
"""
return self.end == other.start
[docs]
def overlaps(self, other):
"""
| XXX
| YYY
"""
return (
self.start < other.start and self.end > other.start and self.end < other.end
)
[docs]
def starts(self, other):
"""
| XXX
| YYYYY
"""
return self.start == other.start and self.end < other.end
[docs]
def during(self, other):
"""
| XXX
| YYYYY
"""
return self.start > other.start and self.end < other.end
[docs]
def finishes(self, other):
"""
| XXX
| YYYYY
"""
return self.start > other.start and self.end == other.end
[docs]
def equals(self, other):
"""
| XXX
| YYY
"""
return self.start == other.start and self.end == other.end
[docs]
def finished_by(self, other):
"""
| XXXXX
| YYY
"""
return self.start < other.start and self.end == other.end
[docs]
def contains(self, other):
"""
| XXXXX
| YYY
"""
return self.start < other.start and self.end > other.end
[docs]
def started_by(self, other):
"""
| XXXXX
| YYY
"""
return self.start == other.start and self.end > other.end
[docs]
def overlapped_by(self, other):
"""
| XXX
| YYY
"""
return (
self.start > other.start and self.start < other.end and self.end > other.end
)
[docs]
def met_by(self, other):
"""
YYYXXX
"""
return self.start == other.end
[docs]
def preceded_by(self, other):
"""
YYY XXX
"""
return self.start > other.end
[docs]
class Event(Interval):
def __init__(self, start, end, data=None):
super(Event, self).__init__(start, end)
self.data = data
@property
def duration(self):
return self.end - self.start
[docs]
class Schedule:
def __init__(self):
self._events: List[Event] = []
self._start_time = float("inf")
self._end_time = float("-inf")
def add_event(self, event: Event):
self._start_time = min(self._start_time, event.start)
self._end_time = max(self._end_time, event.end)
self._events.append(event)
def add_events(self, events: List[Event]):
self._events.extend(events)
self._start_time = min(self._start_time, min(e.start for e in events))
self._end_time = max(self._end_time, max(e.end for e in events))
def start_time(self):
return self._start_time
def end_time(self):
return self._end_time
def duration(self):
return self.end_time() - self.start_time()
def offset(self, time):
for e in self._events:
e.offset(time)
self._start_time += time
self._end_time += time
def n_events(self):
return len(self._events)
[docs]
def slice(self, t_start, t_end) -> Schedule:
"""
Returns a new schedule with events that end at/after t_start and
start at/before t_end
{e | e.start <= t_end ∧ e.end >= t_start for all e in schedule}
Note: Events are not sliced. If an event has any time in [t_start, t_end],
the original event is included in its entirety.
"""
new_sched = Schedule()
new_sched._events = [self._events[i] for i in self.slice_ind(t_start, t_end)]
if not new_sched._events:
new_sched._start_time = t_start
new_sched._end_time = t_end
return new_sched
new_sched._start_time = min([e.start for e in new_sched._events])
new_sched._end_time = max([e.end for e in new_sched._events])
return new_sched
[docs]
def slice_ind(self, t_start, t_end) -> List[int]:
"""
Returns the indices of events that end at/after t_start and
start at/before t_end
{i | e[i].start <= t_end ∧ e[i].end >= t_start for all e in schedule}
Note: Events are not sliced. If an event has any time in [t_start, t_end],
the original event is included in its entirety.
"""
filter = lambda e: e.end >= t_start and e.start <= t_end
ind = [i for i, e in enumerate(self._events) if filter(e)]
return ind
@classmethod
def merge(cls, schedules):
merged = cls()
current_time = schedules[0].start_time
for s in schedules:
s.offset(current_time)
merged._events.extend(s._events)
current_time = s.end_time()
merged._start_time = min([e.start for e in merged._events])
merged._end_time = max([e.end for e in merged._events])
return merged
[docs]
class MultiAgentSchedule:
def __init__(self):
self.schedules = collections.defaultdict(Schedule)
self._start_time = 0.0
self._end_time = 0.0
def __getitem__(self, agent: Hashable):
return self.schedules[agent]
def add_agent(self, agent: Hashable):
self.schedules[agent] = Schedule()
def add_agents(self, agents: Iterable[Hashable]):
for agent in agents:
self.add_agent(agent)
def agents(self):
return self.schedules.keys()
def add_event(self, event: Event, agent):
end_time = event.start + event.duration
if event.start < self._start_time:
self._start_time = event.start
if end_time > self._end_time:
self._end_time = end_time
self.schedules[agent].add_event(event)
def add_events(self, events: List[Event], agent):
self.schedules[agent].add_events(events)
self._start_time = min(self._start_time, self.schedules[agent].start_time())
self._end_time = max(self._end_time, self.schedules[agent].end_time())
def add_schedule(self, schedule: Schedule, agent):
if schedule.start_time() < self._start_time:
self._start_time = schedule.start_time()
if schedule.end_time() > self._end_time:
self._end_time = schedule.end_time()
self.schedules[agent] = schedule
def start_time(self):
return self._start_time
def end_time(self):
return self._end_time
[docs]
def duration(self):
"""The duration of the combined schedule"""
return self.end_time() - self.start_time()
def offset(self, time):
for s in self.schedules.values():
s.offset(time)
self._start_time += time
self._end_time += time
[docs]
def n_agents(self):
"""The number of agents with schedules"""
return len(self.schedules)
[docs]
def n_events(self):
"""The number of events from all schedules"""
return sum([s.n_events() for s in self.schedules.values()])
[docs]
def first_started(self):
"""Returns the agent belonging to the schedule that finishes first"""
return min(self.schedules, key=lambda s: self.schedules[s].start_time())
[docs]
def last_started(self):
"""Returns the agent belonging to the schedule that finishes last"""
return max(self.schedules, key=lambda s: self.schedules[s].start_time())
[docs]
def first_finished(self):
"""Returns the agent belonging to the schedule that finishes first"""
return min(self.schedules, key=lambda s: self.schedules[s].end_time())
[docs]
def last_finished(self):
"""Returns the agent belonging to the schedule that finishes last"""
return max(self.schedules, key=lambda s: self.schedules[s].end_time())
[docs]
def slice(self, t_start, t_end) -> MultiAgentSchedule:
"""Returns a new multi-agent schedule with the schedules for each agent
filtered with events that end after t_start and start before t_end"""
new_mas = MultiAgentSchedule()
for agent, schedule in self.schedules.items():
new_mas.add_schedule(schedule.slice(t_start, t_end), agent)
if new_mas.n_events() == 0:
new_mas._start_time = t_start
new_mas._end_time = t_end
return new_mas
new_mas._start_time = min([s.start_time() for s in new_mas.schedules.values()])
new_mas._end_time = max([s.end_time() for s in new_mas.schedules.values()])
return new_mas
@classmethod
def merge(cls, schedules):
merged = cls()
current_time = schedules[0].start_time()
for mas in schedules:
mas.offset(current_time)
for agent in mas.agents():
if agent not in merged.schedules:
merged.add_agent(agent)
if mas[agent]._events:
merged[agent].add_events(mas[agent]._events)
current_time = mas.end_time()
merged._start_time = min(s._start_time for s in merged.schedules.values())
merged._end_time = max(s._end_time for s in merged.schedules.values())
return merged