Source code for sc3nb.timed_queue

"""Classes to run register functions at certain timepoints and run asynchronously"""

import threading
import time
from typing import Any, Callable, Iterable, NoReturn, Union

import numpy as np

import sc3nb
from sc3nb.osc.osc_communication import Bundler, OSCCommunication, OSCMessage


[docs] class Event: """Stores a timestamp, function and arguments for that function. Long running functions can be wrapped inside an own thread Parameters ---------- timestamp : float Time event should be executed function : Callable[..., None] Function to be executed args : Iterable[Any] Arguments for function spawn : bool, optional if True, create new thread for function, by default False """ def __init__( self, timestamp: float, function: Callable[..., None], args: Iterable[Any], spawn: bool = False, ) -> None: if spawn: thread = threading.Thread(target=function, args=args) function = thread.start args = () self.timestamp = timestamp self.function = function self.args = args
[docs] def execute(self) -> None: """Executes function""" self.function(*self.args)
[docs] def __eq__(self, other): return self.timestamp == other.timestamp
[docs] def __lt__(self, other): return self.timestamp < other.timestamp
[docs] def __le__(self, other): return self.timestamp <= other.timestamp
[docs] def __repr__(self): return "%s: %s" % (self.timestamp, self.function.__name__)
[docs] class TimedQueue: """Accumulates events as timestamps and functions. Executes given functions according to the timestamps Parameters ---------- relative_time : bool, optional If True, use relative time, by default False thread_sleep_time : float, optional Sleep time in seconds for worker thread, by default 0.001 drop_time_threshold : float, optional Threshold for execution time of events in seconds. If this is exceeded the event will be dropped, by default 0.5 """ def __init__( self, relative_time: bool = False, thread_sleep_time: float = 0.001, drop_time_threshold: float = 0.5, ) -> None: self.drop_time_thr = drop_time_threshold self.start = time.time() if relative_time else 0 self.onset_idx = np.empty((0, 2)) self.event_list = [] self.close_event = threading.Event() self.lock = threading.Lock() self.thread = threading.Thread( target=self.__worker, args=(thread_sleep_time, self.close_event) ) # , daemon=True) self.thread.start()
[docs] def close(self) -> None: """Closes event processing without waiting for pending events""" self.close_event.set() self.thread.join()
[docs] def join(self) -> None: """Closes event processing after waiting for pending events""" self.complete() self.close_event.set() self.thread.join()
[docs] def complete(self) -> None: """Blocks until all pending events have completed""" while self.event_list: time.sleep(0.01)
[docs] def put( self, timestamp: float, function: Callable[..., None], args: Iterable[Any] = (), spawn: bool = False, ) -> None: """Adds event to queue Parameters ---------- timestamp : float Time (POSIX) when event should be executed function : Callable[..., None] Function to be executed args : Iterable[Any], optional Arguments to be passed to function, by default () spawn : bool, optional if True, create new sub-thread for function, by default False Raises ------ TypeError raised if function is not callable """ if not callable(function): raise TypeError("function argument cannot be called") if not isinstance(args, tuple): args = (args,) new_event = Event(timestamp, function, args, spawn) with self.lock: self.event_list.append(new_event) evlen = len(self.event_list) if not self.onset_idx.any(): idx = 0 else: idx = np.searchsorted(self.onset_idx[:, 0], timestamp) self.onset_idx = np.insert( self.onset_idx, idx, [timestamp, evlen - 1], axis=0 )
[docs] def get(self) -> Event: """Get latest event from queue and remove event Returns ------- Event Latest event """ event = self.peek() self.pop() return event
[docs] def peek(self) -> Event: """Look up latest event from queue Returns ------- Event Latest event """ with self.lock: return self.event_list[int(self.onset_idx[0][1])]
[docs] def empty(self) -> bool: """Checks if queue is empty Returns ------- bool True if queue if empty """ with self.lock: return bool(self.event_list)
[docs] def pop(self) -> None: """Removes latest event from queue""" with self.lock: event_idx = int(self.onset_idx[0][1]) self.onset_idx = self.onset_idx[1:] # remove 1 from all idcs after popped event self.onset_idx[:, 1][self.onset_idx[:, 1] > event_idx] -= 1 del self.event_list[event_idx]
[docs] def __worker(self, sleep_time: float, close_event: threading.Event) -> NoReturn: """Worker function to process events""" while True: if close_event.is_set(): break if self.event_list: event = self.peek() if event.timestamp <= time.time() - self.start: # execute only if not too old if event.timestamp > time.time() - self.start - self.drop_time_thr: event.execute() self.pop() # sleep_time = event_list[0].timestamp - (time.time() - self.start) - 0.001 time.sleep(sleep_time)
[docs] def __repr__(self): return f"<TimedQueue {self.event_list.__repr__()}>"
[docs] def elapse(self, time_delta: float) -> None: """Add time delta to the current queue time. Parameters ---------- time_delta : float Additional time """ self.start += time_delta
[docs] class TimedQueueSC(TimedQueue): """Timed queue with OSC communication. Parameters ---------- server : OSCCommunication, optional OSC server to handle the bundlers and messsages, by default None relative_time : bool, optional If True, use relative time, by default False thread_sleep_time : float, optional Sleep time in seconds for worker thread, by default 0.001 """ def __init__( self, server: OSCCommunication = None, relative_time: bool = False, thread_sleep_time: float = 0.001, ): super().__init__(relative_time, thread_sleep_time) self.server = server or sc3nb.SC.get_default().server
[docs] def put_bundler(self, onset: float, bundler: Bundler) -> None: """Add a Bundler to queue Parameters ---------- onset : float Sending timetag of the Bundler bundler : Bundler Bundler that will be sent """ self.put(onset, bundler.send)
[docs] def put_msg( self, onset: float, msg: Union[OSCMessage, str], msg_params: Iterable[Any] ) -> None: """Add a message to queue Parameters ---------- onset : float Sending timetag of the message msg : Union[OSCMessage, str] OSCMessage or OSC address msg_params : Iterable[Any] If msg is str, this will be the parameters of the created OSCMessage """ if isinstance(msg, str): self.put(onset, self.server.msg, args=(msg, msg_params)) else: self.put(onset, self.server.send, args=(msg,))