From d70f64383662f40644dcf12b3b0b90c3cd4ac45d Mon Sep 17 00:00:00 2001 From: NATURESPEAK Date: Sun, 10 Apr 2022 15:20:13 +0200 Subject: [PATCH] new osc, tempo, etc. --- speak_broadcast-backup.py | 359 ++++++++++++++++++++++++++++++++++++++ speak_broadcast.py | 262 ++++++++++++++++++++-------- utterance/oscosc.py | 56 ++++++ utterance/utils.py | 4 + utterance/voice.py | 29 +-- 5 files changed, 622 insertions(+), 88 deletions(-) create mode 100644 speak_broadcast-backup.py create mode 100644 utterance/oscosc.py diff --git a/speak_broadcast-backup.py b/speak_broadcast-backup.py new file mode 100644 index 0000000..2e82172 --- /dev/null +++ b/speak_broadcast-backup.py @@ -0,0 +1,359 @@ +import argparse, json, sys, time, random, logging, signal, threading +import utterance.voice +import utterance.utils +import utterance.osc +import examine.metric + +logging.basicConfig(level=logging.INFO) + +UTTERANCE_LEN = 64 #<--------------- these should be in config +NUM_METRIC_GEN = 75 +NUM_SAMPLE_VOICES = 3 +RANDOM_SEED_TIMER_MIN = 2 +STATE_TRANSITION_TIMER_MIN = 1.5 +broadcast = None +metric = None +exit = False +terminal = False +debug = False +state = "METRIC" + +def format_str(text) -> str: + t = utterance.utils.clean(text) + return utterance.utils.format(t) + +def tokenise_str(text): + return utterance.utils.tokenise(text) + +def utter_one(v, temp=None, length=None) -> str: + u = v.utter_one(temp=temp, length=length) + return format_str(u) + +def prompt_one(v, pinput: str, temp=None, length=None) -> str: + u = v.prompt(pinput=pinput, temp=None, length=length) + return format_str(u) + +def utter_one_vectorise(v, temp=None, length=None): + global metric + uv = utter_one(v, temp, length) + uv_vec = metric.vector(uv) + return uv, uv_vec + +def prompt_one_vectorise(v, pinput: str, temp=None, length=None): + global metric + uv = prompt_one(v, pinput, temp, length) + uv_vec = metric.vector(uv) + return uv, uv_vec + +def utter_n_vectorise_distance(v, n, vec, temp=None, length=None): + global metric + results = [] + texts = v.utter_n(n=n, temp=temp, length=length) + for t in texts: + t = format_str(t) + t_vec = metric.vector(t) + d = examine.metric.cos_dist(vec, t_vec) + results.append([d, t, v]) + return results + +def terminal_utterance(utterance): + if terminal: + print(utterance, end="") + +def broadcast_utterance(v, utterance): + + global broadcast, exit, debug + + # Send all text to server to calculate bounds in advance + broadcast.utterance(utterance, v.calculate) + + text = "" + broadcast.utterance(text, v.channel) + terminal_utterance(text) + time.sleep(2) + + frags = v.fragments(utterance) + + for f in frags: + text += f + broadcast.utterance(text, v.channel) + terminal_utterance(f) + + # sleep_time = 2 + # toks = tokenise_str(f) + toks = f.split() + sleep_time = len(toks) + if sleep_time <= 2: + sleep_time += 1 + time.sleep(sleep_time) + if exit: + return + + broadcast.command('clear') + print("==========") + + time.sleep(3) + +def find_candidates(v, uv_vec, voices, results): + logging.info(f"LOOP::finding candidates") + + start = time.time() + candidates = random.sample(voices, NUM_SAMPLE_VOICES) + for c in candidates: + if exit: + break + if c == v: + continue + results += utter_n_vectorise_distance(c, NUM_METRIC_GEN, uv_vec) + + results.sort(key=lambda t: t[0], reverse=True) + + lapse = time.time() - start + logging.info(f"LOOP::done - {lapse} secs") + +def update(): + global exit + while not exit: + try: + utterance.osc.update() + except Exception as e: + logging.error(e) + pass + + time.sleep(0.2) + +def signal_terminate(signum, frame): + global exit + logging.warning("::SIGNAL TERMINATE::") + exit = True + +def main() -> int: + + global broadcast, metric, terminal, debug, state + + p = argparse.ArgumentParser() + p.add_argument("-c", "--config", type=str, default="voice.config.json", help="configuratin file") + p.add_argument("-i", "--iterations", type=int, default=10, help="number of iterations") + p.add_argument("-t", "--terminal", action='store_true', help="print to terminal") + args = p.parse_args() + + logging.info(f"INIT::loading config file - {args.config}") + + with open(args.config) as f: + conf = json.load(f) + + logging.info(conf) + + terminal = args.terminal + + #--------------------# + # VOICES + #--------------------# + logging.info(f"INIT::creating voices") + + voices = [] + for v in conf['voices']: + model = v['model'] + voice = utterance.voice.Voice(name=v["name"].upper(), model=model['model_dir'], tokenizer=model['tokeniser_file'], temp=float(model["temperature"]), length=UTTERANCE_LEN) + voice.set_channel(v['osc_channel']['root'], v['osc_channel']['utterance']) + voice.set_calculate(v['osc_channel']['root'], v['osc_channel']['calculate']) + voice.set_temperature(v['osc_channel']['root'], v['osc_channel']['temperature']) + voices.append(voice) + + #--------------------# + # QUESTION + #--------------------# + logging.info(f"INIT::setting up question") + questions = conf['questions'] + questions_array = questions.copy() + random.shuffle(questions_array) + + + #--------------------# + # NET + #--------------------# + logging.info(f"INIT::setting up OSC") + + utterance.osc.start_osc() + + broadcast = utterance.osc.OscBroadcaster(name="osc_broadcast", host=conf['host_voicemachine'], port=conf['port_voicemachine'], command_channel=conf['command_osc_channel']) + + def receiver_cb(temp, name): + global debug + if type(temp) == str and temp == "DEBUG": + debug = name + logging.info(f"DEBUG MODE: {debug}") + else: # temperature + for v in voices: + if v.name == name: + print(f'{name} - {temp}') + v.temp = temp + # broadcast.temperature(temp, v.temperature) # <-- doesn works because deadlocks in osc_process... + + receiver = utterance.osc.OscReceiver(name="osc_receiver", host=conf['host_machinespeak'], port=conf['port_machinespeak'], callback_fn=receiver_cb) + + + #--------------------# + # METRIC + #--------------------# + logging.info(f"INIT::loading doc2vec metrics") + metric = examine.metric.Metric(model_input='data/models/doc2vec.model') + + #--------------------# + # RANDOM + #--------------------# + + def random_seed(seconds): + global t_random_seed, exit + i = 0 + while i < seconds: + i += 1 + time.sleep(1) + if exit: + return + logging.info("RANDOM::SEEDING RANDOM") + random.seed(time.time()) + if not exit: + t_random_seed = threading.Thread(target=random_seed, args=(random.randint(60, 60 * RANDOM_SEED_TIMER_MIN), )) + t_random_seed.start() + + t_random_seed = threading.Thread(target=random_seed, args=(random.randint(60, 60 * RANDOM_SEED_TIMER_MIN), )) + t_random_seed.start() + + #--------------------# + # STATE TRANSITION + #--------------------# + + # STATES = ["METRIC", "QUESTION", "RANDOM"] + + + def state_transition(seconds): + global t_state_transition, exit, state + i = 0 + while i < seconds: + i += 1 + time.sleep(1) + if exit: + return + logging.info("STATE::STATE TRANSITION") + + state = "QUESTION" + + if not exit: + t_state_transition = threading.Thread(target=state_transition, args=(random.randint(60, 60 * STATE_TRANSITION_TIMER_MIN), )) + t_state_transition.start() + + t_state_transition = threading.Thread(target=state_transition, args=(random.randint(60, 60 * STATE_TRANSITION_TIMER_MIN), )) + t_state_transition.start() + + #--------------------# + # A + #--------------------# + logging.info(f"INIT::generating first utterance") + + v = random.choice(voices) + uv, uv_vec = utter_one_vectorise(v) + + # -- this only updates OSC -- + # -- might not need this in production + t_update = threading.Thread(target=update) + t_update.start() + + + while not exit: + + if state == "METRIC": + + logging.info(f"- state METRIC") + + results = [] + t = threading.Thread(target=find_candidates, args=[v, uv_vec, voices, results]) + t.start() + + logging.info(f"METRIC::broadcasting {v.name}") + broadcast_utterance(v, uv) + + t.join() + + # ok here we need to randomise maybe...?! + # ([d, t, v]) + + choice = results[0] + # makse sure we don't say the same thing over and over again + for r in results: + v = r[2] + u = r[1] + if v.select(u): + choice = r + break + else: + logging.info(f"METRIC::reduncancy {v.name}") + + v = choice[2] + uv = choice[1] + uv_vec = metric.vector(uv) + logging.info(f"METRIC::next {v.name}") + + elif state == "QUESTION": + + logging.info(f"- state QUESTION") + + if len(questions_array) <= 0: + questions_array = questions.copy() + random.shuffle(questions_array) + + # random question + q = questions_array.pop(0) + + # random voice + v = random.choice(voices) + + # random voice asks random question + + logging.info(f"QUESTION::{v.name} : {q['question']}") + + broadcast.utterance(q['question'], v.calculate) + broadcast.utterance(q['question'], v.channel) + + time.sleep(15) + + # answer + v = [e for e in voices if e.name == q['voice']] + if len(v) == 1: + v = v[0] + logging.info(f"QUESTION::answer - {v.name}") + uv, uv_vec = prompt_one_vectorise(v, q['prompt']) + v.remember(uv) + # broadcast_utterance(v, uv) <-- this is broadcasted as part of METRIC STATE + + state = "METRIC" + + + elif state == "RANDOM": + + logging.info(f"- state RANDOM") + + v = random.choice(voices) + l = random.randint(5, UTTERANCE_LEN) + uv, uv_vec = utter_one_vectorise(v, length=l) + broadcast_utterance(v, uv) + + + t_update.join() + + logging.info(f"TERMINATE::terminating OSC") + utterance.osc.terminate_osc() + + # if t_random_seed: + logging.info(f"TERMINATE::random seed") + t_random_seed.join() + + logging.info(f"TERMINATE::state transition") + t_state_transition.join() + + logging.info(f"FIN") + + +if __name__ == '__main__': + signal.signal(signal.SIGINT, signal_terminate) + sys.exit(main()) \ No newline at end of file diff --git a/speak_broadcast.py b/speak_broadcast.py index 21ac3e5..729e812 100644 --- a/speak_broadcast.py +++ b/speak_broadcast.py @@ -1,39 +1,54 @@ import argparse, json, sys, time, random, logging, signal, threading import utterance.voice import utterance.utils -import utterance.osc +import utterance.oscosc import examine.metric logging.basicConfig(level=logging.INFO) -UTTERANCE_LEN = 16 #<--------------- these should be in config +UTTERANCE_LEN = 64 #<--------------- these should be in config NUM_METRIC_GEN = 75 NUM_SAMPLE_VOICES = 3 RANDOM_SEED_TIMER_MIN = 2 +STATE_TRANSITION_TIMER_MIN = 10 broadcast = None metric = None exit = False terminal = False debug = False +state = "METRIC" def format_str(text) -> str: t = utterance.utils.clean(text) return utterance.utils.format(t) -def utter_one(v) -> str: - u = v.utter_one() +def tokenise_str(text): + return utterance.utils.tokenise(text) + +def utter_one(v, temp=None, length=None) -> str: + u = v.utter_one(temp=temp, length=length) return format_str(u) -def utter_one_vectorise(v): +def prompt_one(v, pinput: str, temp=None, length=None) -> str: + u = v.prompt(pinput=pinput, temp=None, length=length) + return format_str(u) + +def utter_one_vectorise(v, temp=None, length=None): global metric - uv = utter_one(v) + uv = utter_one(v, temp, length) uv_vec = metric.vector(uv) return uv, uv_vec -def utter_n_vectorise_distance(v, n, vec): +def prompt_one_vectorise(v, pinput: str, temp=None, length=None): + global metric + uv = prompt_one(v, pinput, temp, length) + uv_vec = metric.vector(uv) + return uv, uv_vec + +def utter_n_vectorise_distance(v, n, vec, temp=None, length=None): global metric results = [] - texts = v.utter_n(n=n) + texts = v.utter_n(n=n, temp=temp, length=length) for t in texts: t = format_str(t) t_vec = metric.vector(t) @@ -49,13 +64,6 @@ def broadcast_utterance(v, utterance): global broadcast, exit, debug - # if debug: - # temp = format(v.temp, '.2f') - # text = f"{v.name.upper()}: {temp}\n" - # else: - # text = f"" - - # Send all text to server to calculate bounds in advance broadcast.utterance(utterance, v.calculate) @@ -70,14 +78,21 @@ def broadcast_utterance(v, utterance): text += f broadcast.utterance(text, v.channel) terminal_utterance(f) - time.sleep(2) + + # sleep_time = 2 + # toks = tokenise_str(f) + toks = f.split() + sleep_time = len(toks) + if sleep_time <= 2: + sleep_time += 1 + time.sleep(sleep_time) if exit: return broadcast.command('clear') print("==========") - time.sleep(2) + time.sleep(3) def find_candidates(v, uv_vec, voices, results): logging.info(f"LOOP::finding candidates") @@ -96,16 +111,16 @@ def find_candidates(v, uv_vec, voices, results): lapse = time.time() - start logging.info(f"LOOP::done - {lapse} secs") -def update(): - global exit - while not exit: - try: - utterance.osc.update() - except Exception as e: - logging.error(e) - pass +# def update(): +# global exit +# while not exit: +# try: +# utterance.osc.update() +# except Exception as e: +# logging.error(e) +# pass - time.sleep(0.2) +# time.sleep(0.2) def signal_terminate(signum, frame): global exit @@ -114,7 +129,7 @@ def signal_terminate(signum, frame): def main() -> int: - global broadcast, metric, terminal, debug + global broadcast, metric, terminal, debug, state p = argparse.ArgumentParser() p.add_argument("-c", "--config", type=str, default="voice.config.json", help="configuratin file") @@ -139,36 +154,45 @@ def main() -> int: voices = [] for v in conf['voices']: model = v['model'] - voice = utterance.voice.Voice(name=v["name"].upper(), model=model['model_dir'], tokenizer=model['tokeniser_file'], temp=float(model["temperature"]), lenght=UTTERANCE_LEN) + voice = utterance.voice.Voice(name=v["name"].upper(), model=model['model_dir'], tokenizer=model['tokeniser_file'], temp=float(model["temperature"]), length=UTTERANCE_LEN) voice.set_channel(v['osc_channel']['root'], v['osc_channel']['utterance']) voice.set_calculate(v['osc_channel']['root'], v['osc_channel']['calculate']) voice.set_temperature(v['osc_channel']['root'], v['osc_channel']['temperature']) voices.append(voice) + #--------------------# + # QUESTION + #--------------------# + logging.info(f"INIT::setting up question") + questions = conf['questions'] + questions_array = questions.copy() + random.shuffle(questions_array) + #--------------------# # NET #--------------------# logging.info(f"INIT::setting up OSC") - utterance.osc.start_osc() + broadcast = utterance.oscosc.OscBroadcaster(name="osc_broadcast", host=conf['host_voicemachine'], port=conf['port_voicemachine'], command_channel=conf['command_osc_channel']) - broadcast = utterance.osc.OscBroadcaster(name="osc_broadcast", host=conf['host_voicemachine'], port=conf['port_voicemachine'], command_channel=conf['command_osc_channel']) + # def receiver_cb_temp(unused_addr, args, temp, name): + # for v in voices: + # if v.name == name: + # print(f'{name} - {temp}') + # v.temp = temp + # # broadcast.temperature(temp, v.temperature) # <-- doesn works because deadlocks in osc_process... - def receiver_cb(temp, name): - global debug - if type(temp) == str and temp == "DEBUG": - debug = name - logging.info(f"DEBUG MODE: {debug}") - else: # temperature - for v in voices: - if v.name == name: - print(f'{name} - {temp}') - v.temp = temp - # broadcast.temperature(temp, v.temperature) # <-- doesn works because deadlocks in osc_process... + # def receiver_cb_command(unused_addr, args, cmd): + # global debug + # debug = name + # logging.info(f"DEBUG MODE: {debug}") + - receiver = utterance.osc.OscReceiver(name="osc_receiver", host=conf['host_machinespeak'], port=conf['port_machinespeak'], callback_fn=receiver_cb) + # receiver = utterance.oscosc.OscReceiver(name="osc_receiver", host=conf['host_machinespeak'], port=conf['port_machinespeak'], callback_fn_command=receiver_cb_command, callback_fn_temp=receiver_cb_temp) + # t_osc_receiver = threading.Thread(target=receiver.server.serve_forever) + # t_osc_receiver.start() #--------------------# # METRIC @@ -180,17 +204,49 @@ def main() -> int: # RANDOM #--------------------# - def random_seed(): - global t_random_seed - logging.info("INIT::SEEDING RANDOM") + def random_seed(seconds): + global t_random_seed, exit + i = 0 + while i < seconds: + i += 1 + time.sleep(1) + if exit: + return + logging.info("RANDOM::SEEDING RANDOM") random.seed(time.time()) if not exit: - t_random_seed = threading.Timer(random.randint(60, 60 * RANDOM_SEED_TIMER_MIN), random_seed) + t_random_seed = threading.Thread(target=random_seed, args=(random.randint(60, 60 * RANDOM_SEED_TIMER_MIN), )) t_random_seed.start() - t_random_seed = threading.Timer(random.randint(60, 60 * RANDOM_SEED_TIMER_MIN), random_seed) + t_random_seed = threading.Thread(target=random_seed, args=(random.randint(60, 60 * RANDOM_SEED_TIMER_MIN), )) t_random_seed.start() + #--------------------# + # STATE TRANSITION + #--------------------# + + # STATES = ["METRIC", "QUESTION", "RANDOM"] + + + def state_transition(seconds): + global t_state_transition, exit, state + i = 0 + while i < seconds: + i += 1 + time.sleep(1) + if exit: + return + logging.info("STATE::STATE TRANSITION") + + state = "QUESTION" + + if not exit: + t_state_transition = threading.Thread(target=state_transition, args=(random.randint(60, 60 * STATE_TRANSITION_TIMER_MIN), )) + t_state_transition.start() + + t_state_transition = threading.Thread(target=state_transition, args=(random.randint(60, 60 * STATE_TRANSITION_TIMER_MIN), )) + t_state_transition.start() + #--------------------# # A #--------------------# @@ -199,50 +255,104 @@ def main() -> int: v = random.choice(voices) uv, uv_vec = utter_one_vectorise(v) - t_update = threading.Thread(target=update) - t_update.start() + # -- this only updates OSC -- + # -- might not need this in production + # t_update = threading.Thread(target=update) + # t_update.start() + while not exit: - results = [] - t = threading.Thread(target=find_candidates, args=[v, uv_vec, voices, results]) - t.start() + if state == "METRIC": - logging.info(f"LOOP::broadcasting {v.name}") - broadcast_utterance(v, uv) - - t.join() + logging.info(f"- state METRIC") - # ok here we need to randomise maybe...?! - # ([d, t, v]) + results = [] + t = threading.Thread(target=find_candidates, args=[v, uv_vec, voices, results]) + t.start() + + logging.info(f"METRIC::broadcasting {v.name}") + broadcast_utterance(v, uv) + + t.join() + + # ok here we need to randomise maybe...?! + # ([d, t, v]) + + choice = results[0] + # makse sure we don't say the same thing over and over again + for r in results: + v = r[2] + u = r[1] + if v.select(u): + choice = r + break + else: + logging.info(f"METRIC::reduncancy {v.name}") + + v = choice[2] + uv = choice[1] + uv_vec = metric.vector(uv) + logging.info(f"METRIC::next {v.name}") + + elif state == "QUESTION": + + logging.info(f"- state QUESTION") + + if len(questions_array) <= 0: + questions_array = questions.copy() + random.shuffle(questions_array) + + # random question + q = questions_array.pop(0) + + # random voice + v = random.choice(voices) + + # random voice asks random question + + logging.info(f"QUESTION::{v.name} : {q['question']}") + + broadcast.utterance(q['question'], v.calculate) + broadcast.utterance(q['question'], v.channel) + + time.sleep(15) + + # answer + v = [e for e in voices if e.name == q['voice']] + if len(v) == 1: + v = v[0] + logging.info(f"QUESTION::answer - {v.name}") + uv, uv_vec = prompt_one_vectorise(v, q['prompt']) + v.remember(uv) + # broadcast_utterance(v, uv) <-- this is broadcasted as part of METRIC STATE + + state = "METRIC" - choice = results[0] - # makse sure we don't say the same thing over and over again - for r in results: - v = r[2] - u = r[1] - if v.select(u): - choice = r - break - else: - logging.info(f"LOOP::reduncancy {v.name}") + elif state == "RANDOM": - v = choice[2] - uv = choice[1] - uv_vec = metric.vector(uv) - logging.info(f"LOOP::next {v.name}") + logging.info(f"- state RANDOM") - t_update.join() + v = random.choice(voices) + l = random.randint(5, UTTERANCE_LEN) + uv, uv_vec = utter_one_vectorise(v, length=l) + broadcast_utterance(v, uv) - logging.info(f"TERMINATE::terminating OSC") - utterance.osc.terminate_osc() + + # t_update.join() + + # logging.info(f"TERMINATE::terminating OSC") + # t_osc_receiver.stop() + # t_osc_receiver.join() # if t_random_seed: logging.info(f"TERMINATE::random seed") - t_random_seed.cancel() t_random_seed.join() + logging.info(f"TERMINATE::state transition") + t_state_transition.join() + logging.info(f"FIN") diff --git a/utterance/oscosc.py b/utterance/oscosc.py new file mode 100644 index 0000000..617a248 --- /dev/null +++ b/utterance/oscosc.py @@ -0,0 +1,56 @@ + +from pythonosc import udp_client +from pythonosc import osc_server +from pythonosc import dispatcher +from threading import Lock + +temp_mutex = Lock() + +class OscBroadcaster: + + def __init__(self, name: str, host: str, port: str, command_channel: str): + self.name = name + self.host = host + self.port = int(port) + self.cmd = command_channel + self.client = udp_client.SimpleUDPClient(self.host, self.port) + + def utterance(self, utterance: str, channel: str): + self.client.send_message(channel, utterance) + + def command(self, command: str): + self.client.send_message(self.cmd, command) + + def temperature(self, temp: float, channel: str): + global temp_mutex + with temp_mutex: + self.client.send_message(self.cmd, command) + + +class OscReceiver: + + def __init__(self, name: str, host: str, port: str, callback_fn_command=None, callback_fn_temp=None): + + self.dispatcher = dispatcher.Dispatcher() + if callback_fn_command != None: + self.dispatcher.map('/command', callback_fn_command) + if callback_fn_temp != None: + self.dispatcher.map('/temperature', callback_fn_command) + + self.name = name + self.host = "127.0.0.1" + self.port = int(port) + self.server = osc_server.ThreadingOSCUDPServer((self.host, self.port), self.dispatcher) + + + + + # # osc_udp_server(self.host, self.port, self.name) + # osc_udp_server("127.0.0.1", self.port, self.name) + # if callback_fn: + # osc_method('/', callback_fn) + # else: + # osc_method('/', temperature) + + + diff --git a/utterance/utils.py b/utterance/utils.py index 354411b..c5089b8 100644 --- a/utterance/utils.py +++ b/utterance/utils.py @@ -1,4 +1,5 @@ import string, regex +from gensim.utils import tokenize def clean(text: str) -> str: @@ -48,6 +49,9 @@ def fragments(utterance: str): return frags +def tokenise(utterance: str): + return list(tokenize(utterance, lower=True)) + diff --git a/utterance/voice.py b/utterance/voice.py index f448464..a42ffae 100644 --- a/utterance/voice.py +++ b/utterance/voice.py @@ -7,29 +7,32 @@ UTTERANCE_MEMORY_MIN_DIST = 0.2 class Voice: - def __init__(self, name: str, model: str, tokenizer: str, temp: int, lenght: int): + def __init__(self, name: str, model: str, tokenizer: str, temp: int, length: int): self.name = name self.temp = temp - self.lenght = lenght + self.length = length self.v = aitextgen(model_folder=model, tokenizer_file=tokenizer) self.utterances = [] - def utter_n(self, n: int, temp: float = None, lenght: int = None): + def utter_n(self, n: int, temp: float = None, length: int = None): t = self.temp if temp == None else temp - l = self.lenght if lenght == None else lenght - return self.v.generate(n=n, max_lenght=l, temperature=t, seed=int(time.time()), return_as_list=True) + l = self.length if length == None else length + return self.v.generate(n=n, max_length=l, temperature=t, seed=int(time.time()), return_as_list=True) - def utter_one(self, temp: int = None, lenght: float = None) -> str: + def utter_one(self, temp: float = None, length: int = None) -> str: t = self.temp if temp == None else temp - l = self.lenght if lenght == None else lenght - return self.utter_n(n=1, temp=temp, lenght=lenght)[0] + l = self.length if length == None else length + return self.utter_n(n=1, temp=temp, length=length)[0] + + def prompt(self, pinput: str, temp: float = None, length: int = None) -> str: + t = self.temp if temp == None else temp + l = self.length if length == None else length + return self.v.generate(prompt=pinput, n=1, max_length=l, temperature=t, seed=int(time.time()), return_as_list=True)[0] def select(self, utterance: str) -> bool: # fuction making sure to not say the same thing - print("select;") - toks = set(gensim.utils.tokenize(utterance)) i = 0 @@ -41,13 +44,15 @@ class Voice: self.utterances.append(toks) - print(len(self.utterances)) - if len(self.utterances) > UTTERANCE_MEMORY_LEN: self.utterances.pop(0) return True + def remember(self, utterance: str): + toks = set(gensim.utils.tokenize(utterance)) + self.utterances.append(toks) + def set_channel(self, root: str, endpoint: str): self.channel = root + endpoint