Skip to content

Commit 70037dc

Browse files
committed
feat: initial draft implementation of OSC address pattern matching
Signed-off-by: Christopher Arndt <chris@chrisarndt.de>
1 parent 57de07d commit 70037dc

File tree

1 file changed

+182
-0
lines changed

1 file changed

+182
-0
lines changed

uosc/dispatch.py

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
"""OSC Address Spaces and Address Pattern Matching.
2+
3+
Implements the following parts of the OSC 1.0 specification:
4+
5+
* OSC Address Spaces and OSC Addresses
6+
* OSC Message Dispatching and Pattern Matching
7+
8+
See the ``_demo()`` function below for a usage example.
9+
10+
**Note:** Path-traversing wildcards (``//``) as specified by the OSC 1.1
11+
"specification" paper are **not** supported.
12+
13+
"""
14+
15+
import re
16+
from fnmatch import filter as fnfilter
17+
18+
19+
ALLOWED_ADDRESS_CHARS = re.compile(r'^[0-9a-zA-Z!"$%&' + "\\\\'" + r"()+.:;<=>@^_`|~-]+\Z")
20+
TYPETAGS_ANY = "*"
21+
22+
23+
def expand_curly_braces(s, offset=0):
24+
expansions = [s]
25+
while True:
26+
new_expansions = []
27+
28+
for sn in expansions:
29+
start = sn.find("{")
30+
31+
if start == -1:
32+
return expansions
33+
34+
end = sn.find("}", start + 1)
35+
36+
if end == -1:
37+
raise ValueError("Unmatched opening curly brace.")
38+
39+
items = [
40+
item.strip()
41+
for item in sn[start + 1:end].split(",")
42+
if ALLOWED_ADDRESS_CHARS.match(item.strip())
43+
]
44+
new_expansions.extend([(sn[:start] + item.strip() + sn[end + 1:]) for item in items])
45+
expansions = new_expansions
46+
47+
48+
class OSCAddressContainer(dict):
49+
def __init__(self, name, parent=None):
50+
super().__init__()
51+
self.name = name
52+
self.parent = parent
53+
54+
def add_container(self, name):
55+
self[name] = OSCAddressContainer(name, parent=self)
56+
57+
def add_method(self, callable_, address, typetags=TYPETAGS_ANY):
58+
name = address.split("/")[-1]
59+
self[name] = OSCMethod(address, callable_, typetags=typetags, parent=self)
60+
61+
def getroot(self):
62+
node = self
63+
64+
while node.parent:
65+
node = node.parent
66+
67+
return node
68+
69+
def register_method(self, callable_, address, typetags=TYPETAGS_ANY):
70+
assert address.startswith("/")
71+
_, *parts, leaf = address.split("/")
72+
# Is an empty string for the address leaf part allowed, e.g. "/" or "/status/"?
73+
# No empty address parts allowed:
74+
assert all(parts)
75+
# all address parts must be printable ASCII strings
76+
# minus explicitly dis-allowed chars
77+
assert all(ALLOWED_ADDRESS_CHARS.match(part) for part in parts)
78+
79+
node = self.getroot()
80+
81+
for name in parts:
82+
if name not in node:
83+
node.add_container(name)
84+
node = node[name]
85+
86+
node.add_method(callable_, address, typetags=typetags)
87+
88+
def match(self, pattern, typetags=None, glob_matching=True, brace_expansion=True):
89+
assert pattern.startswith("/")
90+
_, *parts, leaf = pattern.split("/")
91+
assert all(parts) # no empty address pattern parts allowed
92+
93+
results = []
94+
to_check = [self.getroot()]
95+
96+
while parts:
97+
ptn = parts.pop(0)
98+
99+
branches = []
100+
for node in to_check:
101+
if glob_matching:
102+
branches.extend(
103+
self._check_branch(node, ptn, OSCAddressContainer, brace_expansion)
104+
)
105+
elif ptn in node:
106+
branches = [node[ptn]]
107+
108+
to_check = branches
109+
110+
for branch in to_check:
111+
if glob_matching:
112+
results.extend(
113+
[
114+
method
115+
for method in self._check_branch(branch, leaf, OSCMethod)
116+
if typetags is None or method.typetags in (TYPETAGS_ANY, typetags)
117+
]
118+
)
119+
elif leaf in branch:
120+
results.append(branch[leaf])
121+
122+
return results
123+
124+
@staticmethod
125+
def _check_branch(node, ptn, nodetype, brace_expansion=True):
126+
patterns = [ptn]
127+
128+
if brace_expansion:
129+
try:
130+
patterns = expand_curly_braces(ptn)
131+
except ValueError:
132+
pass
133+
134+
for ptn in patterns:
135+
for name in fnfilter(node.keys(), ptn):
136+
child = node[name]
137+
if isinstance(child, nodetype):
138+
yield child
139+
140+
141+
class OSCMethod:
142+
def __init__(self, name, callable_, typetags=TYPETAGS_ANY, parent=None):
143+
self.name = name
144+
self.callable_ = callable_
145+
self.typetags = typetags
146+
self.parent = parent
147+
148+
def __call__(self, *args, **kwargs):
149+
return self.callable_(*args, **kwargs)
150+
151+
def __repr__(self):
152+
return f"<OSCMethod '{self.name}', {self.typetags}>"
153+
154+
155+
_root = None
156+
157+
158+
def get_default_root():
159+
global _root
160+
if _root is None:
161+
_root = OSCAddressContainer("/")
162+
return _root
163+
164+
165+
def _demo():
166+
def fn(*args):
167+
pass
168+
169+
import sys
170+
171+
root = get_default_root()
172+
root.register_method(fn, "/ops/math/add", "ii")
173+
root.register_method(fn, "/ops/math/sum", TYPETAGS_ANY)
174+
root.register_method(fn, "/ops/string/add", "ii")
175+
root.register_method(fn, "/ops/array/add", "ii")
176+
root.register_method(fn, "/ops/math/sub", "ii")
177+
178+
print(root.match(*sys.argv[1:]))
179+
180+
181+
if __name__ == "__main__":
182+
_demo()

0 commit comments

Comments
 (0)