"""Drive a set of generative voices and broadcast their utterances over OSC.

The script loads a JSON configuration, builds one ``utterance.voice.Voice``
per configured voice, and then loops over a small state machine:

* ``METRIC``   - broadcast the current utterance while a worker thread
  generates candidates from other voices and ranks them by cosine distance
  to the current utterance's vector; the next utterance is picked from them.
* ``QUESTION`` - a random voice broadcasts a configured question and the
  voice named in the question generates the answer from a prompt.
* ``RANDOM``   - a random voice utters a text of random length.

SIGINT sets the module-level ``exit`` flag, which every loop polls.
"""
import argparse
import json
import logging
import random
import signal
import string
import sys
import threading
import time

import utterance.voice
import utterance.utils
import utterance.oscosc
import examine.metric

logging.basicConfig(level=logging.INFO)

# Defaults only: main() overwrites all of these from the
# 'utterance_configuration' section of the config file.
UTTERANCE_LEN = 64
NUM_METRIC_GEN = 75
NUM_SAMPLE_VOICES = 3
RANDOM_SEED_TIMER_MIN = 2
STATE_TRANSITION_TIMER_MIN = 10

broadcast = None    # OscBroadcaster, created in main()
metric = None       # examine.metric.Metric, created in main()
exit = False        # termination flag set by signal_terminate (shadows the builtin)
terminal = False    # echo utterances to stdout when True
debug = False
state = "METRIC"    # current state of the main loop

B_SKIP = []         # leading words that cause the beginning to be dropped
B_SWAPS = {}        # leading word -> replacement


def format_str(text) -> str:
    """Return *text* passed through ``utils.clean`` and then ``utils.format``."""
    cleaned = utterance.utils.clean(text)
    return utterance.utils.format(cleaned)


def tokenise_str(text):
    """Return the result of ``utterance.utils.tokenise`` for *text*."""
    return utterance.utils.tokenise(text)


def utter_one(v, temp=None, length=None) -> str:
    """Generate one utterance from voice *v* and return it formatted."""
    raw = v.utter_one(temp=temp, length=length)
    return format_str(raw)


def prompt_one(v, pinput: str, temp=None, length=None) -> str:
    """Generate one utterance from voice *v* for prompt *pinput*, formatted.

    *temp* is forwarded to ``v.prompt`` (it used to be dropped and replaced
    by ``None`` regardless of what the caller passed).
    """
    raw = v.prompt(pinput=pinput, temp=temp, length=length)
    return format_str(raw)


def utter_one_vectorise(v, temp=None, length=None):
    """Return ``(text, vector)`` for a fresh utterance of voice *v*.

    The vector comes from the module-level ``metric`` object.
    """
    text = utter_one(v, temp, length)
    return text, metric.vector(text)


def prompt_one_vectorise(v, pinput: str, temp=None, length=None):
    """Return ``(text, vector)`` for a prompted utterance of voice *v*."""
    text = prompt_one(v, pinput, temp, length)
    return text, metric.vector(text)


def utter_n_vectorise_distance(v, n, vec, temp=None, length=None):
    """Generate *n* utterances from *v* and measure each against *vec*.

    Returns a list of ``[distance, text, voice]`` entries, where distance is
    ``examine.metric.cos_dist(vec, vector_of_text)``.
    """
    results = []
    for raw in v.utter_n(n=n, temp=temp, length=length):
        text = format_str(raw)
        distance = examine.metric.cos_dist(vec, metric.vector(text))
        results.append([distance, text, v])
    return results


def terminal_utterance(utterance):
    """Print *utterance* without a trailing newline when terminal mode is on."""
    if terminal:
        print(utterance, end="")


def fix_ending(frags):
    """Return a copy of *frags* with the last fragment repaired.

    The last fragment is stripped of punctuation and handed to
    ``utils.fix_sentence``; if that yields nothing the fragment is dropped,
    otherwise it is replaced by the result. The (new) last fragment then
    goes through ``utils.fix_punctuation``. An empty input, or an input
    whose only fragment gets dropped, yields an empty list instead of
    raising IndexError.
    """
    if not frags:
        return []
    result = frags.copy()
    ending = result[-1].translate(str.maketrans('', '', string.punctuation))
    fixed = utterance.utils.fix_sentence(ending)
    if fixed is None or len(fixed) == 0:
        result = result[:-1]
    else:
        result[-1] = fixed
    if result:
        # Guard: dropping the only fragment leaves nothing to punctuate.
        result[-1] = utterance.utils.fix_punctuation(result[-1])
    print(result)
    return result


def fix_beginning(frags):
    """Return a copy of *frags* with an unwanted leading word handled.

    If the first word (lower-cased, one leading punctuation char removed)
    is in ``B_SKIP``: when the first fragment has more than two words the
    word is removed and the rest re-capitalised; otherwise the whole first
    fragment is dropped and the following one is capitalised. If the first
    word is a key of ``B_SWAPS`` it is replaced by the mapped value.
    """
    if not frags:
        return []
    result = frags.copy()
    toks = result[0].split()
    if not toks:
        return result

    first = toks[0].lower()
    if first[0] in string.punctuation:
        first = first[1:]

    if first in B_SKIP:
        if len(toks) > 2:
            result[0] = " ".join(toks[1:]).capitalize() + "\n"
        else:
            # Drop the first fragment; tidy the next one if there is one.
            if len(result) > 1:
                following = result[1]
                if following.startswith(' '):
                    following = following[1:]
                result[1] = following.capitalize()
            return result[1:]
    elif toks[0] in B_SWAPS:
        # count=1: only the leading word is swapped. Without it every
        # occurrence of the token inside the fragment would be rewritten.
        result[0] = result[0].replace(toks[0], B_SWAPS[toks[0]], 1)
    return result


def broadcast_utterance(v, utterance):
    """Broadcast *utterance* for voice *v* fragment by fragment.

    The full text is first sent on ``v.calculate`` (so the server can
    calculate bounds in advance), the channel is reset with an empty
    string, and then the growing text is re-sent on ``v.channel`` after
    each fragment, pausing roughly one second per word. Returns early,
    without sending 'clear', if the exit flag is raised.
    """
    print(utterance)
    # Send all text to server to calculate bounds in advance
    broadcast.utterance(utterance, v.calculate)
    text = ""
    broadcast.utterance(text, v.channel)
    terminal_utterance(text)
    time.sleep(2)

    frags = v.fragments(utterance)
    frags = fix_beginning(frags)
    frags = fix_ending(frags)

    for frag in frags:
        terminal_utterance(frag)
        text += frag
        broadcast.utterance(text, v.channel)
        # One second per whitespace-separated word, short fragments get +1.
        sleep_time = len(frag.split())
        if sleep_time <= 2:
            sleep_time += 1
        time.sleep(sleep_time)
        if exit:
            return

    broadcast.command('clear')
    print("==========")
    time.sleep(3)


def find_candidates(v, uv_vec, voices, results):
    """Fill *results* with ranked candidate utterances from sampled voices.

    Samples up to ``NUM_SAMPLE_VOICES`` voices (capped at ``len(voices)`` so
    a small voice list no longer raises ValueError), skips *v* itself, and
    appends ``[distance, text, voice]`` entries to *results* in place. The
    list is finally sorted by distance, largest first. Intended to run in a
    worker thread; *results* is the output channel.
    """
    logging.info("LOOP::finding candidates")
    start = time.time()
    sample_size = min(NUM_SAMPLE_VOICES, len(voices))
    for candidate in random.sample(voices, sample_size):
        if exit:
            break
        if candidate == v:
            continue
        results.extend(
            utter_n_vectorise_distance(candidate, NUM_METRIC_GEN, uv_vec))
    results.sort(key=lambda entry: entry[0], reverse=True)
    lapse = time.time() - start
    logging.info(f"LOOP::done - {lapse} secs")


# def update():
#     global exit
#     while not exit:
#         try:
#             utterance.osc.update()
#         except Exception as e:
#             logging.error(e)
#             pass
#         time.sleep(0.2)


def signal_terminate(signum, frame):
    """Signal handler: raise the module-level exit flag."""
    global exit
    logging.warning("::SIGNAL TERMINATE::")
    exit = True


def _sleep_unless_exit(seconds) -> bool:
    """Sleep for *seconds* in 1 s steps; return True once exit is flagged."""
    elapsed = 0
    while elapsed < seconds:
        elapsed += 1
        time.sleep(1)
        if exit:
            return True
    return False


def _random_interval(minutes) -> int:
    """Return a random number of seconds between 60 and ``60 * minutes``.

    The upper bound is clamped to 60 so a configured value below one minute
    does not make ``random.randint`` raise on an empty range.
    """
    return random.randint(60, max(60, int(60 * minutes)))


def main() -> int:
    """Load the configuration, set everything up and run the state loop.

    Returns 0 after a clean shutdown (the value is passed to ``sys.exit``).
    """
    global broadcast, metric, terminal, debug, state, UTTERANCE_LEN, \
        NUM_METRIC_GEN, NUM_SAMPLE_VOICES, RANDOM_SEED_TIMER_MIN, \
        STATE_TRANSITION_TIMER_MIN, B_SKIP, B_SWAPS

    p = argparse.ArgumentParser()
    p.add_argument("-c", "--config", type=str, default="voice.config.json", help="configuratin file")
    p.add_argument("-i", "--iterations", type=int, default=10, help="number of iterations")
    p.add_argument("-t", "--terminal", action='store_true', help="print to terminal")
    args = p.parse_args()

    logging.info(f"INIT::loading config file - {args.config}")
    with open(args.config) as f:
        conf = json.load(f)
    logging.info(conf)

    terminal = args.terminal

    #--------------------#
    #    CONFIGS
    #--------------------#
    u_conf = conf['utterance_configuration']
    UTTERANCE_LEN = u_conf['UTTERANCE_LEN']
    NUM_METRIC_GEN = u_conf['NUM_METRIC_GEN']
    NUM_SAMPLE_VOICES = u_conf['NUM_SAMPLE_VOICES']
    RANDOM_SEED_TIMER_MIN = u_conf['RANDOM_SEED_TIMER_MIN']
    STATE_TRANSITION_TIMER_MIN = u_conf['STATE_TRANSITION_TIMER_MIN']
    B_SKIP = u_conf['b_skip']
    B_SWAPS = u_conf['b_swaps']

    #--------------------#
    #    VOICES
    #--------------------#
    logging.info("INIT::creating voices")
    voices = []
    for v_conf in conf['voices']:
        model = v_conf['model']
        channels = v_conf['osc_channel']
        voice = utterance.voice.Voice(name=v_conf["name"].upper(),
                                      model=model['model_dir'],
                                      tokenizer=model['tokeniser_file'],
                                      temp=float(model["temperature"]),
                                      length=UTTERANCE_LEN)
        voice.set_channel(channels['root'], channels['utterance'])
        voice.set_calculate(channels['root'], channels['calculate'])
        voice.set_temperature(channels['root'], channels['temperature'])
        voices.append(voice)

    #--------------------#
    #    QUESTION
    #--------------------#
    logging.info("INIT::setting up question")
    questions = conf['questions']
    questions_array = questions.copy()
    random.shuffle(questions_array)

    #--------------------#
    #    NET
    #--------------------#
    logging.info("INIT::setting up OSC")
    broadcast = utterance.oscosc.OscBroadcaster(name="osc_broadcast",
                                                host=conf['host_voicemachine'],
                                                port=conf['port_voicemachine'],
                                                command_channel=conf['command_osc_channel'])

    # def receiver_cb_temp(unused_addr, args, temp, name):
    #     for v in voices:
    #         if v.name == name:
    #             print(f'{name} - {temp}')
    #             v.temp = temp
    #             # broadcast.temperature(temp, v.temperature)  # <-- doesn't work because it deadlocks in osc_process...

    # def receiver_cb_command(unused_addr, args, cmd):
    #     global debug
    #     debug = name
    #     logging.info(f"DEBUG MODE: {debug}")

    # receiver = utterance.oscosc.OscReceiver(name="osc_receiver", host=conf['host_machinespeak'], port=conf['port_machinespeak'], callback_fn_command=receiver_cb_command, callback_fn_temp=receiver_cb_temp)
    # t_osc_receiver = threading.Thread(target=receiver.server.serve_forever)
    # t_osc_receiver.start()

    #--------------------#
    #    METRIC
    #--------------------#
    logging.info("INIT::loading doc2vec metrics")
    metric = examine.metric.Metric(model_input='data/models/doc2vec.model')

    #--------------------#
    #    RANDOM
    #--------------------#
    def random_seed(seconds):
        """Re-seed ``random`` after *seconds*, then again at random intervals.

        Runs as one long-lived thread (instead of re-spawning itself) so the
        handle main() joins at shutdown is always the live thread.
        """
        while True:
            if _sleep_unless_exit(seconds):
                return
            logging.info("RANDOM::SEEDING RANDOM")
            random.seed(time.time())
            if exit:
                return
            seconds = _random_interval(RANDOM_SEED_TIMER_MIN)

    t_random_seed = threading.Thread(
        target=random_seed,
        args=(_random_interval(RANDOM_SEED_TIMER_MIN), ))
    t_random_seed.start()

    #--------------------#
    #    STATE TRANSITION
    #--------------------#
    # STATES = ["METRIC", "QUESTION", "RANDOM"]
    def state_transition(seconds):
        """Switch the main loop to QUESTION after *seconds*, then repeatedly.

        Single looping thread, for the same reason as ``random_seed``.
        """
        global state
        while True:
            if _sleep_unless_exit(seconds):
                return
            logging.info("STATE::STATE TRANSITION")
            state = "QUESTION"
            if exit:
                return
            seconds = _random_interval(STATE_TRANSITION_TIMER_MIN)

    t_state_transition = threading.Thread(
        target=state_transition,
        args=(_random_interval(STATE_TRANSITION_TIMER_MIN), ))
    t_state_transition.start()

    #--------------------#
    #    A
    #--------------------#
    logging.info("INIT::generating first utterance")
    v = random.choice(voices)
    uv, uv_vec = utter_one_vectorise(v)

    # -- this only updates OSC --
    # -- might not need this in production
    # t_update = threading.Thread(target=update)
    # t_update.start()

    while not exit:

        if state == "METRIC":
            logging.info("- state METRIC")
            results = []
            t = threading.Thread(target=find_candidates,
                                 args=[v, uv_vec, voices, results])
            t.start()
            logging.info(f"METRIC::broadcasting {v.name}")
            try:
                broadcast_utterance(v, uv)
            except Exception:
                # Best effort: a failed broadcast must not stop the loop,
                # but keep the traceback in the log.
                logging.exception("METRIC::broadcast failed")
            t.join()

            if not results:
                # No candidates: either we are shutting down (the worker
                # bailed out early) or no other voice was sampled.
                if exit:
                    break
                logging.warning("METRIC::no candidates - generating a fresh utterance")
                v = random.choice(voices)
                uv, uv_vec = utter_one_vectorise(v)
                continue

            # ok here we need to randomise maybe...?!
            # entries are [distance, text, voice]
            choice = results[0]
            # make sure we don't say the same thing over and over again
            for entry in results:
                cand_voice, cand_text = entry[2], entry[1]
                if cand_voice.select(cand_text):
                    choice = entry
                    break
                logging.info(f"METRIC::reduncancy {cand_voice.name}")

            v = choice[2]
            uv = choice[1]
            uv_vec = metric.vector(uv)
            logging.info(f"METRIC::next {v.name}")

        elif state == "QUESTION":
            logging.info("- state QUESTION")
            if len(questions_array) <= 0:
                questions_array = questions.copy()
                random.shuffle(questions_array)

            if not questions_array:
                # Nothing configured to ask; fall straight back to METRIC.
                logging.warning("QUESTION::no questions configured")
                state = "METRIC"
                continue

            # random question
            q = questions_array.pop(0)
            # random voice asks random question
            asker = random.choice(voices)
            logging.info(f"QUESTION::{asker.name} : {q['question']}")
            broadcast.utterance(q['question'], asker.calculate)
            broadcast.utterance(q['question'], asker.channel)
            time.sleep(15)

            # answer - only when exactly one voice carries the requested
            # name; otherwise v/uv are left untouched so the next METRIC
            # pass still has a valid voice (v used to become a list here).
            answerers = [e for e in voices if e.name == q['voice']]
            if len(answerers) == 1:
                v = answerers[0]
                logging.info(f"QUESTION::answer - {v.name}")
                uv, uv_vec = prompt_one_vectorise(v, q['prompt'])
                v.remember(uv)
                # broadcast_utterance(v, uv) <-- this is broadcast as part of METRIC STATE
            else:
                logging.warning(f"QUESTION::no unique voice named {q['voice']}")
            state = "METRIC"

        elif state == "RANDOM":
            logging.info("- state RANDOM")
            v = random.choice(voices)
            # max(): keep randint's range valid if UTTERANCE_LEN < 5.
            length = random.randint(5, max(5, UTTERANCE_LEN))
            uv, uv_vec = utter_one_vectorise(v, length=length)
            broadcast_utterance(v, uv)

    # t_update.join()

    # logging.info(f"TERMINATE::terminating OSC")
    # t_osc_receiver.stop()
    # t_osc_receiver.join()

    logging.info("TERMINATE::random seed")
    t_random_seed.join()

    logging.info("TERMINATE::state transition")
    t_state_transition.join()

    logging.info("FIN")
    return 0


if __name__ == '__main__':
    signal.signal(signal.SIGINT, signal_terminate)
    sys.exit(main())