# NATURESPEAK-ML-UTTER/speak_broadcast.py
# 2022-04-10 15:20:13 +02:00
#
# 361 lines
# 8.9 KiB
# Python

import argparse, json, sys, time, random, logging, signal, threading
import utterance.voice
import utterance.utils
import utterance.oscosc
import examine.metric
logging.basicConfig(level=logging.INFO)

# --- Tunables (TODO: these should live in the config file) ---
# Generation length handed to each Voice; also the upper bound for the
# random length picked in the RANDOM state.
UTTERANCE_LEN = 64
# Number of utterances generated per candidate voice when ranking by distance.
NUM_METRIC_GEN = 75
# Number of voices sampled as candidates on each METRIC pass.
NUM_SAMPLE_VOICES = 3
# Upper bounds, in minutes, for the two background timers (lower bound is 60 s).
RANDOM_SEED_TIMER_MIN = 2
STATE_TRANSITION_TIMER_MIN = 10

# --- Shared runtime state, populated by main() ---
broadcast = metric = None       # OSC broadcaster / doc2vec metric
exit = terminal = debug = False  # shutdown flag / print-to-terminal / debug flag
state = "METRIC"                # current state of the main loop
def format_str(text) -> str:
    """Return *text* passed through ``utterance.utils.clean`` then ``utterance.utils.format``."""
    return utterance.utils.format(utterance.utils.clean(text))
def tokenise_str(text):
    """Return whatever ``utterance.utils.tokenise`` produces for *text*."""
    tokens = utterance.utils.tokenise(text)
    return tokens
def utter_one(v, temp=None, length=None) -> str:
    """Ask voice *v* for a single utterance and return it formatted.

    ``temp`` and ``length`` are forwarded unchanged to ``v.utter_one``.
    """
    return format_str(v.utter_one(temp=temp, length=length))
def prompt_one(v, pinput: str, temp=None, length=None) -> str:
    """Prompt voice *v* with *pinput* and return the formatted reply.

    Args:
        v: voice object exposing ``prompt(pinput=..., temp=..., length=...)``.
        pinput: prompt text handed to the voice.
        temp: value forwarded to ``v.prompt`` as ``temp``.
        length: value forwarded to ``v.prompt`` as ``length``.

    Returns:
        The voice's reply after ``format_str``.
    """
    # Forward the caller's temp; it used to be replaced by a hard-coded None,
    # which made the parameter a no-op (unlike utter_one).
    u = v.prompt(pinput=pinput, temp=temp, length=length)
    return format_str(u)
def utter_one_vectorise(v, temp=None, length=None):
    """Generate one formatted utterance from *v* and vectorise it.

    Returns:
        A ``(text, vector)`` pair, where the vector comes from the
        module-level ``metric.vector``.
    """
    text = utter_one(v, temp, length)
    return text, metric.vector(text)
def prompt_one_vectorise(v, pinput: str, temp=None, length=None):
    """Prompt *v* with *pinput* and vectorise the formatted reply.

    Returns:
        A ``(text, vector)`` pair, where the vector comes from the
        module-level ``metric.vector``.
    """
    text = prompt_one(v, pinput, temp, length)
    return text, metric.vector(text)
def utter_n_vectorise_distance(v, n, vec, temp=None, length=None):
    """Generate *n* utterances from *v* and score each against *vec*.

    Every generated text is formatted, vectorised with the module-level
    ``metric`` and compared to *vec* with ``examine.metric.cos_dist``.

    Returns:
        A list of ``[distance, formatted_text, v]`` entries, in generation
        order.
    """
    scored = []
    for raw in v.utter_n(n=n, temp=temp, length=length):
        cleaned = format_str(raw)
        distance = examine.metric.cos_dist(vec, metric.vector(cleaned))
        scored.append([distance, cleaned, v])
    return scored
def terminal_utterance(utterance):
    """Print *utterance* without a trailing newline when terminal output is enabled."""
    if not terminal:
        return
    print(utterance, end="")
def broadcast_utterance(v, utterance):
    """Broadcast *utterance* for voice *v* fragment by fragment.

    The full text is first sent on ``v.calculate``; the text is then revealed
    progressively on ``v.channel``, pausing after each fragment for a time
    based on its word count. Returns early, without sending the final
    ``clear`` command, if the module-level ``exit`` flag is set.
    """
    # Send all text to server to calculate bounds in advance
    broadcast.utterance(utterance, v.calculate)

    # Start from an empty display before revealing fragments.
    shown = ""
    broadcast.utterance(shown, v.channel)
    terminal_utterance(shown)
    time.sleep(2)

    for fragment in v.fragments(utterance):
        shown += fragment
        broadcast.utterance(shown, v.channel)
        terminal_utterance(fragment)

        # One second per whitespace-separated word; very short fragments
        # (two words or fewer) get one extra second.
        word_count = len(fragment.split())
        pause = word_count + 1 if word_count <= 2 else word_count
        time.sleep(pause)

        if exit:
            return

    broadcast.command('clear')
    print("==========")
    time.sleep(3)
def find_candidates(v, uv_vec, voices, results):
    """Collect scored follow-up utterances from a random sample of voices.

    Intended to run in a worker thread: output is delivered by mutating
    *results* in place rather than by return value.

    Args:
        v: the voice currently speaking; it is skipped if sampled.
        uv_vec: vector of the current utterance, used as the distance reference.
        voices: all available voices.
        results: list extended in place with ``[distance, text, voice]``
            entries and then sorted by distance, largest first.
    """
    logging.info("LOOP::finding candidates")
    start = time.time()

    # random.sample raises ValueError when asked for more items than the
    # population holds, so clamp the sample size to the voices we have.
    sample_size = min(NUM_SAMPLE_VOICES, len(voices))
    candidates = random.sample(voices, sample_size)

    for c in candidates:
        if exit:
            break
        if c == v:
            continue
        results.extend(utter_n_vectorise_distance(c, NUM_METRIC_GEN, uv_vec))

    results.sort(key=lambda t: t[0], reverse=True)

    lapse = time.time() - start
    logging.info(f"LOOP::done - {lapse} secs")
# def update():
# global exit
# while not exit:
# try:
# utterance.osc.update()
# except Exception as e:
# logging.error(e)
# pass
# time.sleep(0.2)
def signal_terminate(signum, frame):
    """Signal handler: log the request and raise the module-level ``exit`` flag.

    ``signum`` and ``frame`` are the standard handler arguments and are unused.
    """
    global exit

    logging.warning("::SIGNAL TERMINATE::")
    exit = True
def main() -> int:
    """Run the broadcast loop until the module-level ``exit`` flag is set.

    Loads the JSON config named by ``--config``, builds one ``Voice`` per
    entry of ``conf['voices']``, creates the OSC broadcaster and the doc2vec
    metric, starts two background timer threads (periodic re-seeding of
    ``random`` and periodic switch to the QUESTION state) and then drives a
    small state machine (METRIC / QUESTION / RANDOM).

    Note: ``--iterations`` is parsed but not used by this function.

    Returns:
        0 once the loop has stopped and both timer threads have been joined.
    """
    global broadcast, metric, terminal, debug, state

    p = argparse.ArgumentParser()
    p.add_argument("-c", "--config", type=str, default="voice.config.json", help="configuratin file")
    p.add_argument("-i", "--iterations", type=int, default=10, help="number of iterations")
    p.add_argument("-t", "--terminal", action='store_true', help="print to terminal")
    args = p.parse_args()

    logging.info(f"INIT::loading config file - {args.config}")
    with open(args.config) as f:
        conf = json.load(f)
    logging.info(conf)

    terminal = args.terminal

    #--------------------#
    # VOICES
    #--------------------#
    logging.info("INIT::creating voices")
    voices = []
    for vconf in conf['voices']:
        model = vconf['model']
        osc = vconf['osc_channel']
        voice = utterance.voice.Voice(
            name=vconf["name"].upper(),
            model=model['model_dir'],
            tokenizer=model['tokeniser_file'],
            temp=float(model["temperature"]),
            length=UTTERANCE_LEN,
        )
        voice.set_channel(osc['root'], osc['utterance'])
        voice.set_calculate(osc['root'], osc['calculate'])
        voice.set_temperature(osc['root'], osc['temperature'])
        voices.append(voice)

    #--------------------#
    # QUESTION
    #--------------------#
    logging.info("INIT::setting up question")
    questions = conf['questions']
    # Working copy that is consumed one question at a time and refilled
    # (reshuffled) once exhausted.
    questions_array = questions.copy()
    random.shuffle(questions_array)

    #--------------------#
    # NET
    #--------------------#
    logging.info("INIT::setting up OSC")
    broadcast = utterance.oscosc.OscBroadcaster(
        name="osc_broadcast",
        host=conf['host_voicemachine'],
        port=conf['port_voicemachine'],
        command_channel=conf['command_osc_channel'],
    )

    # def receiver_cb_temp(unused_addr, args, temp, name):
    #     for v in voices:
    #         if v.name == name:
    #             print(f'{name} - {temp}')
    #             v.temp = temp
    #             # broadcast.temperature(temp, v.temperature) # <-- doesn works because deadlocks in osc_process...
    # def receiver_cb_command(unused_addr, args, cmd):
    #     global debug
    #     debug = name
    #     logging.info(f"DEBUG MODE: {debug}")
    # receiver = utterance.oscosc.OscReceiver(name="osc_receiver", host=conf['host_machinespeak'], port=conf['port_machinespeak'], callback_fn_command=receiver_cb_command, callback_fn_temp=receiver_cb_temp)
    # t_osc_receiver = threading.Thread(target=receiver.server.serve_forever)
    # t_osc_receiver.start()

    #--------------------#
    # METRIC
    #--------------------#
    logging.info("INIT::loading doc2vec metrics")
    metric = examine.metric.Metric(model_input='data/models/doc2vec.model')

    #--------------------#
    # RANDOM
    #--------------------#
    def random_seed():
        """Re-seed ``random`` every 60..60*RANDOM_SEED_TIMER_MIN seconds until shutdown."""
        # A single looping thread replaces the former chain of self-respawning
        # threads, so the handle joined at shutdown is the thread that is
        # actually running.
        while not exit:
            seconds = random.randint(60, 60 * RANDOM_SEED_TIMER_MIN)
            # Sleep in 1 s slices so a shutdown request is noticed promptly.
            for _ in range(seconds):
                time.sleep(1)
                if exit:
                    return
            logging.info("RANDOM::SEEDING RANDOM")
            random.seed(time.time())

    t_random_seed = threading.Thread(target=random_seed)
    t_random_seed.start()

    #--------------------#
    # STATE TRANSITION
    #--------------------#
    # STATES = ["METRIC", "QUESTION", "RANDOM"]
    def state_transition():
        """Switch the main loop to QUESTION every 60..60*STATE_TRANSITION_TIMER_MIN seconds."""
        global state
        while not exit:
            seconds = random.randint(60, 60 * STATE_TRANSITION_TIMER_MIN)
            for _ in range(seconds):
                time.sleep(1)
                if exit:
                    return
            logging.info("STATE::STATE TRANSITION")
            state = "QUESTION"

    t_state_transition = threading.Thread(target=state_transition)
    t_state_transition.start()

    #--------------------#
    # A
    #--------------------#
    logging.info("INIT::generating first utterance")
    v = random.choice(voices)
    uv, uv_vec = utter_one_vectorise(v)

    # -- this only updates OSC --
    # -- might not need this in production
    # t_update = threading.Thread(target=update)
    # t_update.start()

    while not exit:

        if state == "METRIC":
            logging.info("- state METRIC")

            # Look for the next utterance in the background while the current
            # one is being broadcast.
            results = []
            t = threading.Thread(target=find_candidates, args=[v, uv_vec, voices, results])
            t.start()

            logging.info(f"METRIC::broadcasting {v.name}")
            broadcast_utterance(v, uv)

            t.join()

            # The search yields nothing when shutdown was requested mid-search
            # (or when the worker failed); indexing results[0] would raise.
            if not results:
                if not exit:
                    logging.warning("METRIC::no candidates found")
                continue

            # ok here we need to randomise maybe...?!
            # each entry is [distance, text, voice]
            choice = results[0]

            # make sure we don't say the same thing over and over again:
            # take the first candidate its voice accepts via select(),
            # falling back to the top-ranked one if none is accepted.
            for r in results:
                cand_voice, cand_text = r[2], r[1]
                if cand_voice.select(cand_text):
                    choice = r
                    break
                logging.info(f"METRIC::reduncancy {cand_voice.name}")

            v = choice[2]
            uv = choice[1]
            uv_vec = metric.vector(uv)

            logging.info(f"METRIC::next {v.name}")

        elif state == "QUESTION":
            logging.info("- state QUESTION")

            if not questions_array:
                questions_array = questions.copy()
                random.shuffle(questions_array)

            # random question
            q = questions_array.pop(0)

            # random voice asks random question
            asker = random.choice(voices)
            logging.info(f"QUESTION::{asker.name} : {q['question']}")
            broadcast.utterance(q['question'], asker.calculate)
            broadcast.utterance(q['question'], asker.channel)

            time.sleep(15)

            # answer
            # Only replace the current speaker/utterance when exactly one
            # voice matches; otherwise keep the previous (v, uv, uv_vec) so the
            # METRIC state still has a valid voice to work with.
            answerers = [e for e in voices if e.name == q['voice']]
            if len(answerers) == 1:
                v = answerers[0]
                logging.info(f"QUESTION::answer - {v.name}")
                uv, uv_vec = prompt_one_vectorise(v, q['prompt'])
                v.remember(uv)
                # broadcast_utterance(v, uv) <-- this is broadcasted as part of METRIC STATE
            else:
                logging.warning(f"QUESTION::no unique voice named {q['voice']}")

            state = "METRIC"

        elif state == "RANDOM":
            logging.info("- state RANDOM")

            v = random.choice(voices)
            l = random.randint(5, UTTERANCE_LEN)
            uv, uv_vec = utter_one_vectorise(v, length=l)

            broadcast_utterance(v, uv)

    # t_update.join()

    # logging.info(f"TERMINATE::terminating OSC")
    # t_osc_receiver.stop()
    # t_osc_receiver.join()

    logging.info("TERMINATE::random seed")
    t_random_seed.join()

    logging.info("TERMINATE::state transition")
    t_state_transition.join()

    logging.info("FIN")

    # Explicit success status to match the declared return type.
    return 0
if __name__ == '__main__':
    # Ctrl-C raises the shared shutdown flag instead of KeyboardInterrupt, so
    # the main loop and its helper threads can wind down on their own.
    signal.signal(signal.SIGINT, signal_terminate)
    status = main()
    sys.exit(status)