2022-04-10 15:20:13 +02:00

57 lines
1.4 KiB
Python

from pythonosc import udp_client
from pythonosc import osc_server
from pythonosc import dispatcher
from threading import Lock
# Module-level lock shared by every OscBroadcaster instance; temperature()
# holds it while sending so that temperature messages are serialized.
temp_mutex = Lock()


class OscBroadcaster:
    """Send OSC messages to a single UDP endpoint.

    Wraps a ``pythonosc`` ``SimpleUDPClient`` and remembers one default OSC
    address (``command_channel``) that :meth:`command` sends to.
    """

    def __init__(self, name: str, host: str, port: str, command_channel: str) -> None:
        """Create the UDP client for ``host``:``port``.

        Args:
            name: Label stored on the instance; not used when sending.
            host: Destination host handed to the UDP client.
            port: Destination port; converted with ``int()``, so a numeric
                string is accepted.
            command_channel: OSC address used by :meth:`command`.
        """
        self.name = name
        self.host = host
        self.port = int(port)
        self.cmd = command_channel
        self.client = udp_client.SimpleUDPClient(self.host, self.port)

    def utterance(self, utterance: str, channel: str) -> None:
        """Send ``utterance`` to the OSC address ``channel``."""
        self.client.send_message(channel, utterance)

    def command(self, command: str) -> None:
        """Send ``command`` to the command address given at construction."""
        self.client.send_message(self.cmd, command)

    def temperature(self, temp: float, channel: str) -> None:
        """Send the temperature value ``temp`` to the OSC address ``channel``.

        The send happens while holding the module-level ``temp_mutex``, so
        concurrent temperature sends (from any instance) run one at a time.
        """
        # The lock is only read here, so no ``global`` declaration is needed.
        with temp_mutex:
            self.client.send_message(channel, temp)
class OscReceiver:
    """OSC UDP server that routes incoming messages to optional callbacks.

    Builds a ``pythonosc`` ``Dispatcher`` and a ``ThreadingOSCUDPServer``
    bound to it. The server is only constructed here; nothing in this
    class starts serving.
    """

    def __init__(self, name: str, host: str, port: str, callback_fn_command=None, callback_fn_temp=None) -> None:
        """Register the callbacks and create the server.

        Args:
            name: Label stored on the instance.
            host: Accepted but currently ignored; the server always binds
                to ``127.0.0.1`` (see below).
            port: Port to listen on; converted with ``int()``, so a numeric
                string is accepted.
            callback_fn_command: Handler mapped to the ``/command`` address,
                or ``None`` to leave that address unmapped.
            callback_fn_temp: Handler mapped to the ``/temperature``
                address, or ``None`` to leave that address unmapped.
        """
        self.dispatcher = dispatcher.Dispatcher()
        if callback_fn_command is not None:
            self.dispatcher.map('/command', callback_fn_command)
        if callback_fn_temp is not None:
            # Each address gets its own handler: temperature messages go to
            # the temperature callback, not the command one.
            self.dispatcher.map('/temperature', callback_fn_temp)

        self.name = name
        # NOTE(review): the ``host`` argument is not used; the bind address
        # is fixed to loopback. Looks deliberate -- confirm before changing.
        self.host = "127.0.0.1"
        self.port = int(port)
        self.server = osc_server.ThreadingOSCUDPServer((self.host, self.port), self.dispatcher)
# # osc_udp_server(self.host, self.port, self.name)
# osc_udp_server("127.0.0.1", self.port, self.name)
# if callback_fn:
# osc_method('/', callback_fn)
# else:
# osc_method('/', temperature)