Source code for sc3nb.sc_objects.node

"""Implements Node and subclasses Synth and Group."""

import logging
import warnings
from abc import ABC, abstractmethod
from enum import Enum, unique
from functools import reduce
from operator import iconcat
from threading import Event, RLock
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    List,
    NamedTuple,
    Optional,
    Sequence,
    Tuple,
    Union,
)

import sc3nb
from sc3nb.osc.osc_communication import OSCCommunicationError, OSCMessage
from sc3nb.sc_objects.synthdef import SynthDef

if TYPE_CHECKING:
    from sc3nb.sc_objects.bus import Bus
    from sc3nb.sc_objects.server import SCServer
    from sc3nb.sclang import SynthArgument

[docs] _LOGGER = logging.getLogger(__name__)
@unique
[docs] class GroupReply(str, Enum): """Replies of Group Commands"""
[docs] QUERY_TREE_REPLY = "/g_queryTree.reply"
@unique
[docs] class GroupCommand(str, Enum): """OSC Commands for Groups"""
[docs] QUERY_TREE = "/g_queryTree"
[docs] DUMP_TREE = "/g_dumpTree"
[docs] DEEP_FREE = "/g_deepFree"
[docs] FREE_ALL = "/g_freeAll"
[docs] TAIL = "/g_tail"
[docs] HEAD = "/g_head"
[docs] G_NEW = "/g_new"
[docs] P_NEW = "/p_new"
@unique
[docs] class SynthCommand(str, Enum): """OSC Commands for Synths"""
[docs] NEW = "/s_new"
[docs] S_GET = "/s_get"
[docs] S_GETN = "/s_getn"
@unique
[docs] class NodeReply(str, Enum): """Replies of Node Commands"""
[docs] INFO = "/n_info"
@unique
[docs] class NodeCommand(str, Enum): """OSC Commands for Nodes"""
[docs] ORDER = "/n_order"
[docs] TRACE = "/n_trace"
[docs] QUERY = "/n_query"
[docs] MAP = "/n_map"
[docs] MAPN = "/n_mapn"
[docs] MAPA = "/n_mapa"
[docs] MAPAN = "/n_mapan"
[docs] FILL = "/n_fill"
[docs] SET = "/n_set"
[docs] SETN = "/n_setn"
[docs] RUN = "/n_run"
[docs] FREE = "/n_free"
@unique
[docs] class AddAction(Enum): """AddAction of SuperCollider nodes. This Enum contains the codes for the different ways to add a node. """
[docs] TO_HEAD = 0 # (the default) add at the head of the group specified by target
[docs] TO_TAIL = 1 # add at the tail of the group specified by target
[docs] BEFORE = 2 # add immediately before target in its server's node order
[docs] AFTER = 3 # add immediately after target in its server's node order
[docs] REPLACE = 4 # replace target and take its place in its server's node order
# Note: A Synth is not a valid target for \addToHead and \addToTail.
[docs] class SynthInfo(NamedTuple): """Information about the Synth from /n_info"""
[docs] nodeid: int
[docs] group: int
[docs] prev_nodeid: int
[docs] next_nodeid: int
[docs] class GroupInfo(NamedTuple): """Information about the Group from /n_info"""
[docs] nodeid: int
[docs] group: int
[docs] prev_nodeid: int
[docs] next_nodeid: int
[docs] head: int
[docs] tail: int
[docs] class Node(ABC): """Representation of a Node on SuperCollider.""" def __new__( cls, *args: Any, nodeid: Optional[int] = None, server: Optional["SCServer"] = None, **kwargs: Any, ) -> "Node": if nodeid is not None: if server is None: server = sc3nb.SC.get_default().server try: node = server.nodes[nodeid] if node is not None: if node.freed and not node.is_playing: _LOGGER.debug( "Removing %s(%s) from %s", type(node).__name__, node.nodeid, repr(server), ) del server.nodes[nodeid] # check here if nodes types are compatible elif isinstance(node, cls): _LOGGER.debug( "Returned %s(%s) from %s", type(node).__name__, node.nodeid, repr(server), ) return node else: raise RuntimeError( f"Tried to get {node} from {server}" f" as {cls.__name__} but type is {type(node).__name__}" ) except KeyError: pass _LOGGER.debug("%s(%s) not in Server (%s)", cls.__name__, nodeid, server) return super().__new__(cls) @abstractmethod def __init__( self, *, nodeid: Optional[int] = None, add_action: Optional[Union[AddAction, int]] = None, target: Optional[Union["Node", int]] = None, server: Optional["SCServer"] = None, ) -> None: """Create a new Node Parameters ---------- nodeid : int or None This Nodes node id or None add_action : AddAction or corresponding int, optional This Nodes AddAction when created in Server, by default None target : Node or int or None, optional This Nodes AddActions target, by default None server : SCServer, optional The Server for this Node, by default use the SC default server """ self._server = server or sc3nb.SC.get_default().server if nodeid in self._server.nodes: raise RuntimeError("The __init__ of Node should not be called twice") self._state_lock = RLock() self._free_event = Event() self._on_free_callback = None self._started = False self._freed = False self._nodeid = ( nodeid if nodeid is not None else self._server.node_ids.allocate(1)[0] ) self._group = None self._target_id = Node._get_nodeid(target) if target is not None else None if add_action is not None: self._add_action = AddAction(add_action) else: self._add_action = AddAction.TO_HEAD self._set_node_attrs(target, add_action) # only with node watcher self._is_playing = None self._is_running = None # this is state that we cannot really be sure of self._current_controls = {} _LOGGER.debug( "Adding new %s(%s) to %s", type(self).__name__, self._nodeid, self._server ) self._server.nodes[self._nodeid] = self @abstractmethod
[docs] def new( self, *args, add_action: Optional[Union[AddAction, int]] = None, target: Optional[Union["Node", int]] = None, return_msg: bool = False, **kwargs, ) -> Union["Node", OSCMessage]: """Create a new Node Parameters ---------- add_action : AddAction or int, optional Where the Node should be added, by default AddAction.TO_HEAD (0) target : Node or int, optional AddAction target, if None it will be the default group of the server """ with self._state_lock: self._started = True self._freed = False self._free_event.clear() self._is_playing = None self._is_running = None self._set_node_attrs(target=target, add_action=add_action)
[docs] def _get_status_repr(self) -> str: status = "" if self.started and not self.is_playing: status = "s" if self.is_playing: running_symbol = "~" if self.is_running else "-" status = running_symbol if self.freed: status = "f" return status
[docs] def _set_node_attrs( self, target: Optional[Union["Node", int]] = None, add_action: Optional[Union[AddAction, int]] = None, ) -> None: """Derive Node group from addaction and target Parameters ---------- target : int or Node Target nodeid or Target Node of this Node's AddAction add_action : AddAction AddAction of this Node, default AddAction.TO_HEAD (0) """ with self._state_lock: _LOGGER.debug( "Node attrs before setting: nodeid %s, group %s, add_action %s, target %s", self._nodeid, self._group, self._add_action, self._target_id, ) # get target id if target is not None: self._target_id = Node._get_nodeid(target) else: if self._target_id is None: self._target_id = self.server.default_group.nodeid # get add action if add_action is None: self._add_action = AddAction.TO_HEAD elif isinstance(add_action, AddAction): self._add_action = add_action else: self._add_action = AddAction(add_action) # derive group if self._add_action in [AddAction.TO_HEAD, AddAction.TO_TAIL]: self._group = self._target_id elif self._group is None: # AddAction BEFORE, AFTER or REPLACE if isinstance(target, Node): self._group = target.group elif target in self._server.nodes: target_node = self._server.nodes[self._target_id] if target_node: self._group = target_node.group else: _LOGGER.warning( "Could not derive group of Node, assuming default group" ) self._group = self._server.default_group.nodeid _LOGGER.debug( "Node attrs after setting: nodeid %s, group %s, add_action %s, target %s", self._nodeid, self._group, self._add_action, self._target_id, )
@property def nodeid(self) -> int: """Identifier of node.""" return self._nodeid @property def group(self) -> Optional[int]: """Identifier of this nodes group.""" return self._group @property def server(self) -> "SCServer": """The server on which this node is located.""" return self._server @property def is_playing(self) -> Optional[bool]: """True if this node is playing. None if unkown.""" return self._is_playing @property def is_running(self) -> Optional[bool]: """True if this node is running. None if unkown.""" return self._is_running @property def freed(self) -> bool: """True if free was called on this node. This is reseted when receiving a /n_go notification """ return self._freed @property def started(self) -> bool: """True if new was called on this node. This is reseted when receiving a /n_end notification """ return self._started
[docs] def free(self, return_msg: bool = False) -> Union["Node", OSCMessage]: """Free the node with /n_free. This will set is_running and is_playing to false. Even when the message is returned to mimic the behavior of the SuperCollider Node See https://doc.sccode.org/Classes/Node.html#-freeMsg Returns ------- Node or OSCMessage self for chaining or OSCMessage when return_msg=True """ with self._state_lock: self._freed = True msg = OSCMessage(NodeCommand.FREE, [self.nodeid]) if return_msg: return msg else: self.server.send(msg, bundle=True) return self
[docs] def run( self, on: bool = True, return_msg: bool = False ) -> Union["Node", OSCMessage]: """Turn node on or off with /n_run. Parameters ---------- on : bool True for on, False for off, by default True Returns ------- Node or OSCMessage self for chaining or OSCMessage when return_msg=True """ msg = OSCMessage(NodeCommand.RUN, [self.nodeid, 1 if on else 0]) if return_msg: return msg else: self.server.send(msg, bundle=True) return self
[docs] def set( self, argument: Union[str, Dict, List], *values: Any, return_msg: bool = False, ) -> Union["Node", OSCMessage]: """Set a control value(s) of the node with n_set. Parameters ---------- argument : str | dict | list if string: name of control argument if dict: dict with argument, value pairs if list: use list as message content value : any, optional only used if argument is string, by default None Examples -------- >>> synth.set("freq", 400) >>> synth.set({"dur": 1, "freq": 400}) >>> synth.set(["dur", 1, "freq", 400]) """ # update cached current_control values with self._state_lock: msg_params: List[Any] = [self.nodeid] if isinstance(argument, dict): for arg, val in argument.items(): msg_params.append(arg) msg_params.append(val) self._update_control(arg, val) elif isinstance(argument, list): for arg_idx, arg in enumerate(argument): if isinstance(arg, str): self._update_control(arg, argument[arg_idx + 1]) msg_params.extend(argument) else: if len(values) == 1: self._update_control(argument, values[0]) else: self._update_control(argument, values) msg_params.extend([argument] + list(values)) msg = OSCMessage(NodeCommand.SET, msg_params) if return_msg: return msg else: self.server.send(msg, bundle=True) return self
[docs] def _update_control(self, control: str, value: Any) -> None: with self._state_lock: try: val = object.__getattribute__(self, control) except AttributeError: pass else: warnings.warn( f"attribute {control}={val} is deleted and recognized as Node Parameter now" ) delattr(self, control) if not control.startswith("t_"): self._current_controls[control] = value
[docs] def _update_controls(self, controls: Optional[Dict[str, Any]] = None) -> None: with self._state_lock: if controls is not None: for arg, val in controls.items(): self._update_control(arg, val)
[docs] def fill( self, control: Union[str, int], num_controls: int, value: Any, return_msg: bool = False, ) -> Union["Node", OSCMessage]: """Fill ranges of control values with n_fill. Parameters ---------- control : int or string control index or name num_controls : int number of control values to fill value : float or int value to set return_msg : bool, optional If True return msg else send it directly, by default False Returns ------- OSCMessage if return_msg else self """ msg = OSCMessage(NodeCommand.FILL, [self.nodeid, control, num_controls, value]) if return_msg: return msg else: self.server.send(msg, bundle=True) return self
[docs] def map( self, control: Union[str, int], bus: "Bus", return_msg: bool = False ) -> Union["Node", OSCMessage]: """Map a node's control to read from a bus using /n_map or /n_mapa. Parameters ---------- control : int or string control index or name bus : Bus control/audio bus return_msg : bool, optional If True return msg else send it directly, by default False Returns ------- OSCMessage if return_msg else self """ msg_params = [self.nodeid, control, bus.idxs[0]] if bus.num_channels > 1: map_command = NodeCommand.MAPAN if bus.is_audio_bus() else NodeCommand.MAPN msg_params.append(bus.num_channels) else: map_command = NodeCommand.MAPA if bus.is_audio_bus() else NodeCommand.MAP msg = OSCMessage(map_command, msg_params) if return_msg: return msg else: self.server.send(msg, bundle=True) return self
[docs] def release( self, release_time: Optional[float] = None, return_msg: bool = False ) -> Union["Node", OSCMessage]: """Set gate as specified. https://doc.sccode.org/Classes/Node.html#-release Parameters ---------- release_time : float, optional amount of time in seconds during which the node will release. If set to a value <= 0, the synth will release immediately. If None using its Envs normal release stage(s) return_msg : bool, optional If True return msg else send it directly, by default False Returns ------- OSCMessage if return_msg else self """ if release_time is not None: release_time = 1 if release_time <= 0 else -1 * (release_time + 1) else: release_time = 0 msg = OSCMessage(NodeCommand.SET, [self.nodeid, "gate", release_time]) if return_msg: return msg else: self.server.send(msg, bundle=True) return self
[docs] def query(self) -> Union[SynthInfo, GroupInfo]: """Sends an n_query message to the server. The answer is send to all clients who have registered via the /notify command. Content of answer: node ID the node's parent group ID previous node ID, -1 if no previous node. next node ID, -1 if no next node. 1 if the node is a group, 0 if it is a synth if the node is a group: ID of the head node, -1 if there is no head node. ID of the tail node, -1 if there is no tail node. Returns ------- SynthInfo or GroupInfo n_info answer. See above for content description """ msg = OSCMessage(NodeCommand.QUERY, [self.nodeid]) result = self.server.send(msg, bundle=False) return self._parse_info(*result)
[docs] def trace(self, return_msg: bool = False) -> Union["Node", OSCMessage]: """Trace a node. Print out values of the inputs and outputs for one control period. If node is a group then print the node IDs and names of each node. Parameters ---------- return_msg : bool, optional If True return msg else send it directly, by default False Returns ------- Node or OSCMessage if return_msg else self """ msg = OSCMessage(NodeCommand.TRACE, [self.nodeid]) if return_msg: return msg else: self.server.send(msg, bundle=True) return self
[docs] def move( self, add_action: AddAction, another_node: "Node", return_msg: bool = False ) -> Union["Node", OSCMessage]: """Move this node Parameters ---------- add_action : AddAction [TO_HEAD, TO_TAIL, AFTER, BEFORE] What add action should be done. another_node : Node The node which is the target of the add action return_msg : bool, optional If True return msg else send it directly, by default False Returns ------- Node or OSCMessage if return_msg this will be the OSCMessage, else self Raises ------ ValueError If a wrong AddAction was provided """ if add_action == AddAction.REPLACE: raise ValueError( "add_action needs to be in [TO_HEAD, TO_TAIL, AFTER, BEFORE]" ) msg = OSCMessage( NodeCommand.ORDER, [add_action.value, another_node.nodeid, self.nodeid] ) if return_msg: return msg else: self.server.send(msg, bundle=True) return self
[docs] def register(self): """Register to be watched.""" raise NotImplementedError("Currently all nodes are being watched on default")
[docs] def unregister(self): """Unregister to stop being watched.""" raise NotImplementedError("Currently all nodes are being watched on default")
[docs] def on_free(self, func): """Callback that is executed when this Synth is freed""" self._on_free_callback = func
[docs] def wait(self, timeout: Optional[float] = None) -> None: """Wait until this Node is freed Raises ------ TimeoutError If timeout was provided and wait timed out. """ # TODO check if self._server is in bundling mode, # This should probably fail if used inside of Bundler if not self._free_event.wait(timeout=timeout): raise TimeoutError("Timed out waiting for synth.")
[docs] def _parse_info( self, nodeid: int, group: int, prev_nodeid: int, next_nodeid: int, *rest: Sequence[int], ) -> Union[SynthInfo, GroupInfo]: assert ( self.nodeid == nodeid ), "nodeids does not match self.nodeid={self.nodeid} != {nodeid}" if len(rest) == 1 and rest[0] == 0: # node is synth return SynthInfo._make([nodeid, group, prev_nodeid, next_nodeid]) else: _, head, tail = rest return GroupInfo._make( [nodeid, group, prev_nodeid, next_nodeid, head, tail] )
[docs] def _handle_notification(self, kind: str, info) -> None: with self._state_lock: if kind == "/n_go": self._is_playing = True self._is_running = True self._started = True self._freed = False elif kind == "/n_end": self._is_playing = False self._is_running = False self._started = False self._freed = True self._group = None self._free_event.set() if self._on_free_callback is not None: self._on_free_callback() elif kind == "/n_on": self._is_running = True self._freed = False elif kind == "/n_off": self._is_running = False self._started = False elif kind == "/n_move": node_info = self._parse_info(*info) self._group = node_info.group else: raise ValueError(f"Illegal notification kind '{kind}' {info}") _LOGGER.debug("Handled %s notification: %s", kind, info)
[docs] def __eq__(self, other): return self.nodeid == other.nodeid
@staticmethod
[docs] def _get_nodeid(value: Union["Node", int]) -> int: """Get the corresponding node id Parameters ---------- value : Node or int If a Node is provided it will get its nodeid If a int is provided it will be returned Returns ------- int nodeid Raises ------ ValueError When neither Node or int was provided """ if isinstance(value, Node): nodeid = value.nodeid elif isinstance(value, int): nodeid = value else: raise ValueError("Could not get a node id") return nodeid
[docs] class Synth(Node): """Representation of a Synth on SuperCollider.""" def __init__( self, name: Optional[str] = None, controls: Dict[str, Any] = None, *, nodeid: Optional[int] = None, new: bool = True, add_action: Optional[Union[AddAction, int]] = None, target: Optional[Union["Node", int]] = None, server: Optional["SCServer"] = None, ) -> None: """Create a Python representation of a SuperCollider synth. Parameters ---------- name : str, optional name of the synth to be created, by default "default" controls : dict, optional synth control arguments, by default None nodeid : int, optional ID of the node in SuperCollider, by default sc3nb will create one. Can be set to an existing id to create a Python instance of a running Node. new : bool, optional True if synth should be created on the server, by default True Should be False if creating an instance of a running Node. add_action : AddAction or int, optional Where the Synth should be added, by default AddAction.TO_HEAD (0) target : Node or int, optional AddAction target, if None it will be the default group of the server server : SCServer sc3nb SCServer instance Raises ------ ValueError Raised when synth can't be found via SynthDescLib.global Examples -------- >>> scn.Synth(sc, "s1", {"dur": 1, "freq": 400}) """ self._server = server or sc3nb.SC.get_default().server if nodeid in self._server.nodes: self._update_synth_state(name=name, controls=controls) if new: self.new(controls=controls, add_action=add_action, target=target) return # attention: this must be the first line. see __setattr__, __getattr__ self._initialized = False super().__init__( nodeid=nodeid, add_action=add_action, target=target, server=server, ) with self._state_lock: self._name = name or "default" self._synth_desc = SynthDef.get_description(self._name) if controls is None: controls = {} self._current_controls = controls self._freed = False self._started = False # attention: this must be after every attribute is set self._initialized = True if new: self.new( controls=self._current_controls, add_action=self._add_action, target=self._target_id, )
[docs] def _update_synth_state(self, name: Optional[str], controls: Optional[dict]): _LOGGER.debug("Update Synth(%s)", self.nodeid) with self._state_lock: if name is not None: self._name = name self._synth_desc = SynthDef.get_description(name) self._update_controls(controls)
@property def synth_desc(self) -> Optional[Dict[str, "SynthArgument"]]: """A Description of this Synths arguments""" with self._state_lock: if self._synth_desc is None: self._synth_desc = SynthDef.get_description(self._name) return self._synth_desc @property def name(self) -> str: """This Synths SynthDef name.""" return self._name @property def current_controls(self) -> Dict[str, Any]: """This Synth currently cached control arguments.""" return self._current_controls
[docs] def new( self, controls: Optional[dict] = None, add_action: Optional[Union[AddAction, int]] = None, target: Optional[Union[Node, int]] = None, *, return_msg: bool = False, ) -> Union["Synth", OSCMessage]: """Creates the synth on the server with s_new. Attention: Here you create an identical synth! Same nodeID etc. - This will fail if there is already this nodeID on the SuperCollider server! """ with self._state_lock: super().new(target=target, add_action=add_action) self._update_controls(controls) flatten_args = reduce(iconcat, self._current_controls.items(), []) msg = OSCMessage( SynthCommand.NEW, [self._name, self.nodeid, self._add_action.value, self._target_id] + flatten_args, ) if return_msg: return msg else: self.server.send(msg, bundle=True, await_reply=False) return self
[docs] def get(self, control: str) -> Any: """Get a Synth argument This will request the value from scsynth with /s_get(n). Parameters ---------- control : str name of the Synth control argument """ with self._state_lock: if ( self._synth_desc is not None ): # change from synth_desc to self._current_controls try: default_value = self._synth_desc[control].default except KeyError as error: raise ValueError( f"argument '{control}' not in synth_desc" f"{self._synth_desc.keys()}" ) from error else: default_value = None # if we know they type of the argument and its list we use s_getn if default_value is not None and isinstance(default_value, list): command = SynthCommand.S_GETN msg_params = [self.nodeid, control, len(default_value)] else: command = SynthCommand.S_GET msg_params = [self.nodeid, control] msg = OSCMessage(command, msg_params) try: reply = self.server.send(msg, bundle=False) except OSCCommunicationError as osc_error: if command in self.server.fails: fail = self.server.fails.msg_queues[command].get(timeout=0) if f"Node {self.nodeid} not found" in fail: raise RuntimeError( f"Node {self.nodeid} cannot be found" ) from osc_error raise else: if default_value is not None and isinstance(default_value, list): nodeid, name, _, *values = reply ret_val = list(values) else: # default s_get nodeid, name, ret_val = reply if self.nodeid == nodeid and name == control: self.current_controls[control] = ret_val return ret_val else: raise RuntimeError("Received msg with wrong node id")
[docs] def seti(self, *args): """Set part of an arrayed control.""" raise NotImplementedError()
[docs] def __getattr__(self, name): # python will try obj.__getattribute__(name) before this if self._initialized: with self._state_lock: if ( name in self._current_controls or self._synth_desc and name in self._synth_desc ): return self.get(name) raise AttributeError( f"'{type(self).__name__}' object has no attribute '{name}'" )
[docs] def __setattr__(self, name, value): # First try regular attribute access. # This is done similiar in pandas NDFrame. try: object.__getattribute__(self, name) return object.__setattr__(self, name, value) except AttributeError: pass # First time the _initialized and _server is not set # and then it is false until Synth instance is done with __init__ if name in ["_server", "_initialized"] or not self._initialized: return super().__setattr__(name, value) # if initialized try setting current controls with self._state_lock: if name in self._current_controls or ( self._synth_desc and name in self._synth_desc ): return self.set(name, value) warnings.warn( f"Setting '{name}' as python attribute and not as Synth Parameter. " "SynthDesc is unknown. sclang must be running and knowing this SynthDef " "Use set method when using Synths without SynthDesc to set Synth Parameters." ) super().__setattr__(name, value)
[docs] def __repr__(self) -> str: status = self._get_status_repr() return ( f"<Synth({self.nodeid}) '{self._name}' {status} {self._current_controls}>" )
[docs] class Group(Node): """Representation of a Group on SuperCollider.""" def __init__( self, *, nodeid: Optional[int] = None, new: bool = True, parallel: bool = False, add_action: AddAction = AddAction.TO_HEAD, target: Optional[Union[Node, int]] = None, server: Optional["SCServer"] = None, ) -> None: """Create a Python representation of a SuperCollider group. Parameters ---------- nodeid : int, optional ID of the node in SuperCollider, by default sc3nb will create one. Can be set to an existing id to create a Python instance of a running Node. new : bool, optional True if synth should be created on the server, by default True Should be False if creating an instance of a running Node. parallel : bool, optional If True create a parallel group, by default False add_action : AddAction or int, optional Where the Group should be added, by default AddAction.TO_HEAD (0) target : Node or int, optional AddAction target, if None it will be the default group of the server server : SCServer, optional Server instance where this Group is located, by default use the SC default server """ self._server = server or sc3nb.SC.get_default().server if nodeid in self._server.nodes: if new: self.new(add_action=add_action, target=target) return super().__init__( nodeid=nodeid, add_action=add_action, target=target, server=server, ) with self._state_lock: self._parallel = parallel self._children = [] if new: self.new(add_action=self._add_action, target=self._target_id)
[docs] def _update_group_state( self, children: Optional[Sequence[Node]] = None, ) -> None: _LOGGER.debug("Update Group(%s)", self.nodeid) with self._state_lock: if children is not None: self._children = children
[docs] def new( self, add_action=AddAction.TO_HEAD, target=None, *, parallel=None, return_msg=False, ) -> Union["Group", OSCMessage]: """Creates the synth on the server with g_new / p_new. Attention: Here you create an identical group! Same nodeID etc. - This will fail if there is already this nodeID on the SuperCollider server! Parameters ---------- add_action : AddAction or int, optional where the group should be added, by default AddAction.TO_HEAD (0) target : Node or int, optional add action target, by default 1 parallel : bool, optional If True use p_new, by default False return_msg : bool, optional If ture return the OSCMessage instead of sending it, by default False Returns ------- Group self """ with self._state_lock: super().new(target=target, add_action=add_action) if parallel is not None: self._parallel = parallel new_command = GroupCommand.P_NEW if self._parallel else GroupCommand.G_NEW msg = OSCMessage( new_command, [self.nodeid, self._add_action.value, self._target_id] ) if return_msg: return msg else: self.server.send(msg, bundle=True, await_reply=False) return self
@property def children(self) -> Sequence[Node]: """Return this groups children as currently known Returns ------- Sequence[Node] Sequence of child Nodes (Synths or Groups) """ return self._children
[docs] def move_node_to_head(self, node, return_msg=False): """Move node to this groups head with g_head. Parameters ---------- node : Node node to move return_msg : bool, optional If True return msg else send it directly, by default False Returns ------- Group self """ msg = OSCMessage(GroupCommand.HEAD, [self.nodeid, node.nodeid]) self.server.send(msg, bundle=True) if return_msg: return msg else: self.server.send(msg, bundle=True) return self
[docs] def move_node_to_tail(self, node, return_msg=False): """Move node to this groups tail with g_tail. Parameters ---------- node : Node node to move return_msg : bool, optional If True return msg else send it directly, by default False Returns ------- Group self """ msg = OSCMessage(GroupCommand.TAIL, [self.nodeid, node.nodeid]) if return_msg: return msg else: self.server.send(msg, bundle=True) return self
[docs] def free_all(self, return_msg=False): """Frees all nodes in the group with g_freeAll. Parameters ---------- return_msg : bool, optional If True return msg else send it directly, by default False Returns ------- OSCMessage if return_msg else self """ self._children = [] msg = OSCMessage(GroupCommand.FREE_ALL, [self.nodeid]) if return_msg: return msg else: self.server.send(msg, bundle=True) return self
[docs] def deep_free(self, return_msg=False): """Free all synths in this group and its sub-groups with g_deepFree. Sub-groups are not freed. Parameters ---------- return_msg : bool, optional If True return msg else send it directly, by default False Returns ------- OSCMessage if return_msg else self """ with self._state_lock: self._children = [c for c in self._children if isinstance(c, Group)] msg = OSCMessage(GroupCommand.DEEP_FREE, [self.nodeid]) if return_msg: return msg else: self.server.send(msg, bundle=True) return self
[docs] def dump_tree(self, post_controls=True, return_msg=False): """Posts a representation of this group's node subtree with g_dumpTree. Parameters ---------- post_controls : bool, optional True for control values, by default False return_msg : bool, optional If True return msg else send it directly, by default False Returns ------- OSCMessage if return_msg else self """ msg = OSCMessage( GroupCommand.DUMP_TREE, [self.nodeid, 1 if post_controls else 0] ) if return_msg: return msg else: self.server.send(msg, bundle=True) return self
[docs] def query_tree(self, include_controls=False) -> "Group": """Send a g_queryTree message for this group. See https://doc.sccode.org/Reference/Server-Command-Reference.html#/g_queryTree for details. Parameters ---------- include_controls : bool, optional True for control values, by default False Returns ------- tuple /g_queryTree.reply """ msg = OSCMessage( GroupCommand.QUERY_TREE, [self.nodeid, 1 if include_controls else 0] ) _, *nodes_info = self.server.send(msg) NodeTree( info=nodes_info, root_nodeid=self.nodeid, controls_included=include_controls, start=0, server=self.server, ) return self
[docs] def _repr_pretty_(self, printer, cylce): status = self._get_status_repr() if cylce: printer.text(f"Group({self.nodeid}) {status}>") else: printer.text(f"Group({self.nodeid}) {status} {self._current_controls}") with printer.group(2, " children=[", "]"): if self._children: printer.breakable() for idx, child in enumerate(self._children): if idx: printer.text(",") printer.breakable() printer.pretty(child)
[docs] def __repr__(self) -> str: status = self._get_status_repr() return f"<Group({self.nodeid}) {status} {self._current_controls} children={self.children}>"
[docs] class NodeTree: """Node Tree is a class for parsing /g_queryTree.reply""" def __init__( self, info: Sequence[Any], root_nodeid: int, controls_included: bool, start: int = 0, server: Optional["SCServer"] = None, ) -> None: self.controls_included = controls_included self.root_nodeid = root_nodeid parsed, self.root = NodeTree.parse_nodes(info, controls_included, start, server) assert len(info) == parsed, "Mismatch in nodes info length and parsed info" @staticmethod
[docs] def parse_nodes( info: Sequence[Any], controls_included: bool = True, start: int = 0, server: Optional["SCServer"] = None, ) -> Tuple[int, Node]: """Parse Nodes from reply of the /g_queryTree cmd of scsynth. This reads the /g_queryTree.reply and creates the corresponding Nodes in Python. See https://doc.sccode.org/Reference/Server-Command-Reference.html#/g_queryTree Parameters ---------- controls_included : bool If True the current control (arg) values for synths will be included start : int starting position of the parsing, used for recursion, default 0 info : Sequence[Any] /g_queryTree.reply to be parsed. Returns ------- Tuple[int, Node] postion where the parsing ended, resulting Node """ pos = start + 2 nodeid, num_children = info[start:pos] if num_children < 0: # -1 children ==> synth symbol = info[pos:][0] pos += 1 num_controls = None controls = None if controls_included: num_controls = info[pos:][0] pos += 1 controls_size = 2 * num_controls controls_info = info[pos:][:controls_size] controls = dict(zip(controls_info[::2], controls_info[1::2])) pos += controls_size return ( pos, Synth( name=symbol, controls=controls, nodeid=nodeid, new=False, server=server, ), ) # num_children >= 0 ==> group children = [] to_parse = num_children while to_parse > 0: # is group pos, node = NodeTree.parse_nodes(info, controls_included, pos, server) node._group = nodeid children.append(node) to_parse -= 1 group = Group(nodeid=nodeid, new=False, server=server) group._update_group_state(children=children) return pos, group
[docs] def _repr_pretty_(self, printer, cylce): if cylce: printer.text(f"NodeTree root={self.root_nodeid}") else: printer.text("NodeTree root=") printer.pretty(self.root)