new osc, tempo, etc.

This commit is contained in:
NATURESPEAK 2022-04-10 15:20:13 +02:00
parent 5857da6e00
commit d70f643836
5 changed files with 622 additions and 88 deletions

359
speak_broadcast-backup.py Normal file
View File

@ -0,0 +1,359 @@
import argparse, json, sys, time, random, logging, signal, threading
import utterance.voice
import utterance.utils
import utterance.osc
import examine.metric
logging.basicConfig(level=logging.INFO)
UTTERANCE_LEN = 64 #<--------------- these should be in config
NUM_METRIC_GEN = 75
NUM_SAMPLE_VOICES = 3
RANDOM_SEED_TIMER_MIN = 2
STATE_TRANSITION_TIMER_MIN = 1.5
broadcast = None
metric = None
exit = False
terminal = False
debug = False
state = "METRIC"
def format_str(text) -> str:
t = utterance.utils.clean(text)
return utterance.utils.format(t)
def tokenise_str(text):
return utterance.utils.tokenise(text)
def utter_one(v, temp=None, length=None) -> str:
u = v.utter_one(temp=temp, length=length)
return format_str(u)
def prompt_one(v, pinput: str, temp=None, length=None) -> str:
u = v.prompt(pinput=pinput, temp=None, length=length)
return format_str(u)
def utter_one_vectorise(v, temp=None, length=None):
global metric
uv = utter_one(v, temp, length)
uv_vec = metric.vector(uv)
return uv, uv_vec
def prompt_one_vectorise(v, pinput: str, temp=None, length=None):
global metric
uv = prompt_one(v, pinput, temp, length)
uv_vec = metric.vector(uv)
return uv, uv_vec
def utter_n_vectorise_distance(v, n, vec, temp=None, length=None):
global metric
results = []
texts = v.utter_n(n=n, temp=temp, length=length)
for t in texts:
t = format_str(t)
t_vec = metric.vector(t)
d = examine.metric.cos_dist(vec, t_vec)
results.append([d, t, v])
return results
def terminal_utterance(utterance):
if terminal:
print(utterance, end="")
def broadcast_utterance(v, utterance):
global broadcast, exit, debug
# Send all text to server to calculate bounds in advance
broadcast.utterance(utterance, v.calculate)
text = ""
broadcast.utterance(text, v.channel)
terminal_utterance(text)
time.sleep(2)
frags = v.fragments(utterance)
for f in frags:
text += f
broadcast.utterance(text, v.channel)
terminal_utterance(f)
# sleep_time = 2
# toks = tokenise_str(f)
toks = f.split()
sleep_time = len(toks)
if sleep_time <= 2:
sleep_time += 1
time.sleep(sleep_time)
if exit:
return
broadcast.command('clear')
print("==========")
time.sleep(3)
def find_candidates(v, uv_vec, voices, results):
logging.info(f"LOOP::finding candidates")
start = time.time()
candidates = random.sample(voices, NUM_SAMPLE_VOICES)
for c in candidates:
if exit:
break
if c == v:
continue
results += utter_n_vectorise_distance(c, NUM_METRIC_GEN, uv_vec)
results.sort(key=lambda t: t[0], reverse=True)
lapse = time.time() - start
logging.info(f"LOOP::done - {lapse} secs")
def update():
global exit
while not exit:
try:
utterance.osc.update()
except Exception as e:
logging.error(e)
pass
time.sleep(0.2)
def signal_terminate(signum, frame):
global exit
logging.warning("::SIGNAL TERMINATE::")
exit = True
def main() -> int:
global broadcast, metric, terminal, debug, state
p = argparse.ArgumentParser()
p.add_argument("-c", "--config", type=str, default="voice.config.json", help="configuratin file")
p.add_argument("-i", "--iterations", type=int, default=10, help="number of iterations")
p.add_argument("-t", "--terminal", action='store_true', help="print to terminal")
args = p.parse_args()
logging.info(f"INIT::loading config file - {args.config}")
with open(args.config) as f:
conf = json.load(f)
logging.info(conf)
terminal = args.terminal
#--------------------#
# VOICES
#--------------------#
logging.info(f"INIT::creating voices")
voices = []
for v in conf['voices']:
model = v['model']
voice = utterance.voice.Voice(name=v["name"].upper(), model=model['model_dir'], tokenizer=model['tokeniser_file'], temp=float(model["temperature"]), length=UTTERANCE_LEN)
voice.set_channel(v['osc_channel']['root'], v['osc_channel']['utterance'])
voice.set_calculate(v['osc_channel']['root'], v['osc_channel']['calculate'])
voice.set_temperature(v['osc_channel']['root'], v['osc_channel']['temperature'])
voices.append(voice)
#--------------------#
# QUESTION
#--------------------#
logging.info(f"INIT::setting up question")
questions = conf['questions']
questions_array = questions.copy()
random.shuffle(questions_array)
#--------------------#
# NET
#--------------------#
logging.info(f"INIT::setting up OSC")
utterance.osc.start_osc()
broadcast = utterance.osc.OscBroadcaster(name="osc_broadcast", host=conf['host_voicemachine'], port=conf['port_voicemachine'], command_channel=conf['command_osc_channel'])
def receiver_cb(temp, name):
global debug
if type(temp) == str and temp == "DEBUG":
debug = name
logging.info(f"DEBUG MODE: {debug}")
else: # temperature
for v in voices:
if v.name == name:
print(f'{name} - {temp}')
v.temp = temp
# broadcast.temperature(temp, v.temperature) # <-- doesn works because deadlocks in osc_process...
receiver = utterance.osc.OscReceiver(name="osc_receiver", host=conf['host_machinespeak'], port=conf['port_machinespeak'], callback_fn=receiver_cb)
#--------------------#
# METRIC
#--------------------#
logging.info(f"INIT::loading doc2vec metrics")
metric = examine.metric.Metric(model_input='data/models/doc2vec.model')
#--------------------#
# RANDOM
#--------------------#
def random_seed(seconds):
global t_random_seed, exit
i = 0
while i < seconds:
i += 1
time.sleep(1)
if exit:
return
logging.info("RANDOM::SEEDING RANDOM")
random.seed(time.time())
if not exit:
t_random_seed = threading.Thread(target=random_seed, args=(random.randint(60, 60 * RANDOM_SEED_TIMER_MIN), ))
t_random_seed.start()
t_random_seed = threading.Thread(target=random_seed, args=(random.randint(60, 60 * RANDOM_SEED_TIMER_MIN), ))
t_random_seed.start()
#--------------------#
# STATE TRANSITION
#--------------------#
# STATES = ["METRIC", "QUESTION", "RANDOM"]
def state_transition(seconds):
global t_state_transition, exit, state
i = 0
while i < seconds:
i += 1
time.sleep(1)
if exit:
return
logging.info("STATE::STATE TRANSITION")
state = "QUESTION"
if not exit:
t_state_transition = threading.Thread(target=state_transition, args=(random.randint(60, 60 * STATE_TRANSITION_TIMER_MIN), ))
t_state_transition.start()
t_state_transition = threading.Thread(target=state_transition, args=(random.randint(60, 60 * STATE_TRANSITION_TIMER_MIN), ))
t_state_transition.start()
#--------------------#
# A
#--------------------#
logging.info(f"INIT::generating first utterance")
v = random.choice(voices)
uv, uv_vec = utter_one_vectorise(v)
# -- this only updates OSC --
# -- might not need this in production
t_update = threading.Thread(target=update)
t_update.start()
while not exit:
if state == "METRIC":
logging.info(f"- state METRIC")
results = []
t = threading.Thread(target=find_candidates, args=[v, uv_vec, voices, results])
t.start()
logging.info(f"METRIC::broadcasting {v.name}")
broadcast_utterance(v, uv)
t.join()
# ok here we need to randomise maybe...?!
# ([d, t, v])
choice = results[0]
# makse sure we don't say the same thing over and over again
for r in results:
v = r[2]
u = r[1]
if v.select(u):
choice = r
break
else:
logging.info(f"METRIC::reduncancy {v.name}")
v = choice[2]
uv = choice[1]
uv_vec = metric.vector(uv)
logging.info(f"METRIC::next {v.name}")
elif state == "QUESTION":
logging.info(f"- state QUESTION")
if len(questions_array) <= 0:
questions_array = questions.copy()
random.shuffle(questions_array)
# random question
q = questions_array.pop(0)
# random voice
v = random.choice(voices)
# random voice asks random question
logging.info(f"QUESTION::{v.name} : {q['question']}")
broadcast.utterance(q['question'], v.calculate)
broadcast.utterance(q['question'], v.channel)
time.sleep(15)
# answer
v = [e for e in voices if e.name == q['voice']]
if len(v) == 1:
v = v[0]
logging.info(f"QUESTION::answer - {v.name}")
uv, uv_vec = prompt_one_vectorise(v, q['prompt'])
v.remember(uv)
# broadcast_utterance(v, uv) <-- this is broadcasted as part of METRIC STATE
state = "METRIC"
elif state == "RANDOM":
logging.info(f"- state RANDOM")
v = random.choice(voices)
l = random.randint(5, UTTERANCE_LEN)
uv, uv_vec = utter_one_vectorise(v, length=l)
broadcast_utterance(v, uv)
t_update.join()
logging.info(f"TERMINATE::terminating OSC")
utterance.osc.terminate_osc()
# if t_random_seed:
logging.info(f"TERMINATE::random seed")
t_random_seed.join()
logging.info(f"TERMINATE::state transition")
t_state_transition.join()
logging.info(f"FIN")
if __name__ == '__main__':
signal.signal(signal.SIGINT, signal_terminate)
sys.exit(main())

View File

@ -1,39 +1,54 @@
import argparse, json, sys, time, random, logging, signal, threading import argparse, json, sys, time, random, logging, signal, threading
import utterance.voice import utterance.voice
import utterance.utils import utterance.utils
import utterance.osc import utterance.oscosc
import examine.metric import examine.metric
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
UTTERANCE_LEN = 16 #<--------------- these should be in config UTTERANCE_LEN = 64 #<--------------- these should be in config
NUM_METRIC_GEN = 75 NUM_METRIC_GEN = 75
NUM_SAMPLE_VOICES = 3 NUM_SAMPLE_VOICES = 3
RANDOM_SEED_TIMER_MIN = 2 RANDOM_SEED_TIMER_MIN = 2
STATE_TRANSITION_TIMER_MIN = 10
broadcast = None broadcast = None
metric = None metric = None
exit = False exit = False
terminal = False terminal = False
debug = False debug = False
state = "METRIC"
def format_str(text) -> str: def format_str(text) -> str:
t = utterance.utils.clean(text) t = utterance.utils.clean(text)
return utterance.utils.format(t) return utterance.utils.format(t)
def utter_one(v) -> str: def tokenise_str(text):
u = v.utter_one() return utterance.utils.tokenise(text)
def utter_one(v, temp=None, length=None) -> str:
u = v.utter_one(temp=temp, length=length)
return format_str(u) return format_str(u)
def utter_one_vectorise(v): def prompt_one(v, pinput: str, temp=None, length=None) -> str:
u = v.prompt(pinput=pinput, temp=None, length=length)
return format_str(u)
def utter_one_vectorise(v, temp=None, length=None):
global metric global metric
uv = utter_one(v) uv = utter_one(v, temp, length)
uv_vec = metric.vector(uv) uv_vec = metric.vector(uv)
return uv, uv_vec return uv, uv_vec
def utter_n_vectorise_distance(v, n, vec): def prompt_one_vectorise(v, pinput: str, temp=None, length=None):
global metric
uv = prompt_one(v, pinput, temp, length)
uv_vec = metric.vector(uv)
return uv, uv_vec
def utter_n_vectorise_distance(v, n, vec, temp=None, length=None):
global metric global metric
results = [] results = []
texts = v.utter_n(n=n) texts = v.utter_n(n=n, temp=temp, length=length)
for t in texts: for t in texts:
t = format_str(t) t = format_str(t)
t_vec = metric.vector(t) t_vec = metric.vector(t)
@ -49,13 +64,6 @@ def broadcast_utterance(v, utterance):
global broadcast, exit, debug global broadcast, exit, debug
# if debug:
# temp = format(v.temp, '.2f')
# text = f"{v.name.upper()}: {temp}\n"
# else:
# text = f""
# Send all text to server to calculate bounds in advance # Send all text to server to calculate bounds in advance
broadcast.utterance(utterance, v.calculate) broadcast.utterance(utterance, v.calculate)
@ -70,14 +78,21 @@ def broadcast_utterance(v, utterance):
text += f text += f
broadcast.utterance(text, v.channel) broadcast.utterance(text, v.channel)
terminal_utterance(f) terminal_utterance(f)
time.sleep(2)
# sleep_time = 2
# toks = tokenise_str(f)
toks = f.split()
sleep_time = len(toks)
if sleep_time <= 2:
sleep_time += 1
time.sleep(sleep_time)
if exit: if exit:
return return
broadcast.command('clear') broadcast.command('clear')
print("==========") print("==========")
time.sleep(2) time.sleep(3)
def find_candidates(v, uv_vec, voices, results): def find_candidates(v, uv_vec, voices, results):
logging.info(f"LOOP::finding candidates") logging.info(f"LOOP::finding candidates")
@ -96,16 +111,16 @@ def find_candidates(v, uv_vec, voices, results):
lapse = time.time() - start lapse = time.time() - start
logging.info(f"LOOP::done - {lapse} secs") logging.info(f"LOOP::done - {lapse} secs")
def update(): # def update():
global exit # global exit
while not exit: # while not exit:
try: # try:
utterance.osc.update() # utterance.osc.update()
except Exception as e: # except Exception as e:
logging.error(e) # logging.error(e)
pass # pass
time.sleep(0.2) # time.sleep(0.2)
def signal_terminate(signum, frame): def signal_terminate(signum, frame):
global exit global exit
@ -114,7 +129,7 @@ def signal_terminate(signum, frame):
def main() -> int: def main() -> int:
global broadcast, metric, terminal, debug global broadcast, metric, terminal, debug, state
p = argparse.ArgumentParser() p = argparse.ArgumentParser()
p.add_argument("-c", "--config", type=str, default="voice.config.json", help="configuratin file") p.add_argument("-c", "--config", type=str, default="voice.config.json", help="configuratin file")
@ -139,36 +154,45 @@ def main() -> int:
voices = [] voices = []
for v in conf['voices']: for v in conf['voices']:
model = v['model'] model = v['model']
voice = utterance.voice.Voice(name=v["name"].upper(), model=model['model_dir'], tokenizer=model['tokeniser_file'], temp=float(model["temperature"]), lenght=UTTERANCE_LEN) voice = utterance.voice.Voice(name=v["name"].upper(), model=model['model_dir'], tokenizer=model['tokeniser_file'], temp=float(model["temperature"]), length=UTTERANCE_LEN)
voice.set_channel(v['osc_channel']['root'], v['osc_channel']['utterance']) voice.set_channel(v['osc_channel']['root'], v['osc_channel']['utterance'])
voice.set_calculate(v['osc_channel']['root'], v['osc_channel']['calculate']) voice.set_calculate(v['osc_channel']['root'], v['osc_channel']['calculate'])
voice.set_temperature(v['osc_channel']['root'], v['osc_channel']['temperature']) voice.set_temperature(v['osc_channel']['root'], v['osc_channel']['temperature'])
voices.append(voice) voices.append(voice)
#--------------------#
# QUESTION
#--------------------#
logging.info(f"INIT::setting up question")
questions = conf['questions']
questions_array = questions.copy()
random.shuffle(questions_array)
#--------------------# #--------------------#
# NET # NET
#--------------------# #--------------------#
logging.info(f"INIT::setting up OSC") logging.info(f"INIT::setting up OSC")
utterance.osc.start_osc() broadcast = utterance.oscosc.OscBroadcaster(name="osc_broadcast", host=conf['host_voicemachine'], port=conf['port_voicemachine'], command_channel=conf['command_osc_channel'])
broadcast = utterance.osc.OscBroadcaster(name="osc_broadcast", host=conf['host_voicemachine'], port=conf['port_voicemachine'], command_channel=conf['command_osc_channel']) # def receiver_cb_temp(unused_addr, args, temp, name):
# for v in voices:
# if v.name == name:
# print(f'{name} - {temp}')
# v.temp = temp
# # broadcast.temperature(temp, v.temperature) # <-- doesn works because deadlocks in osc_process...
def receiver_cb(temp, name): # def receiver_cb_command(unused_addr, args, cmd):
global debug # global debug
if type(temp) == str and temp == "DEBUG": # debug = name
debug = name # logging.info(f"DEBUG MODE: {debug}")
logging.info(f"DEBUG MODE: {debug}")
else: # temperature
for v in voices:
if v.name == name:
print(f'{name} - {temp}')
v.temp = temp
# broadcast.temperature(temp, v.temperature) # <-- doesn works because deadlocks in osc_process...
receiver = utterance.osc.OscReceiver(name="osc_receiver", host=conf['host_machinespeak'], port=conf['port_machinespeak'], callback_fn=receiver_cb)
# receiver = utterance.oscosc.OscReceiver(name="osc_receiver", host=conf['host_machinespeak'], port=conf['port_machinespeak'], callback_fn_command=receiver_cb_command, callback_fn_temp=receiver_cb_temp)
# t_osc_receiver = threading.Thread(target=receiver.server.serve_forever)
# t_osc_receiver.start()
#--------------------# #--------------------#
# METRIC # METRIC
@ -180,17 +204,49 @@ def main() -> int:
# RANDOM # RANDOM
#--------------------# #--------------------#
def random_seed(): def random_seed(seconds):
global t_random_seed global t_random_seed, exit
logging.info("INIT::SEEDING RANDOM") i = 0
while i < seconds:
i += 1
time.sleep(1)
if exit:
return
logging.info("RANDOM::SEEDING RANDOM")
random.seed(time.time()) random.seed(time.time())
if not exit: if not exit:
t_random_seed = threading.Timer(random.randint(60, 60 * RANDOM_SEED_TIMER_MIN), random_seed) t_random_seed = threading.Thread(target=random_seed, args=(random.randint(60, 60 * RANDOM_SEED_TIMER_MIN), ))
t_random_seed.start() t_random_seed.start()
t_random_seed = threading.Timer(random.randint(60, 60 * RANDOM_SEED_TIMER_MIN), random_seed) t_random_seed = threading.Thread(target=random_seed, args=(random.randint(60, 60 * RANDOM_SEED_TIMER_MIN), ))
t_random_seed.start() t_random_seed.start()
#--------------------#
# STATE TRANSITION
#--------------------#
# STATES = ["METRIC", "QUESTION", "RANDOM"]
def state_transition(seconds):
global t_state_transition, exit, state
i = 0
while i < seconds:
i += 1
time.sleep(1)
if exit:
return
logging.info("STATE::STATE TRANSITION")
state = "QUESTION"
if not exit:
t_state_transition = threading.Thread(target=state_transition, args=(random.randint(60, 60 * STATE_TRANSITION_TIMER_MIN), ))
t_state_transition.start()
t_state_transition = threading.Thread(target=state_transition, args=(random.randint(60, 60 * STATE_TRANSITION_TIMER_MIN), ))
t_state_transition.start()
#--------------------# #--------------------#
# A # A
#--------------------# #--------------------#
@ -199,16 +255,23 @@ def main() -> int:
v = random.choice(voices) v = random.choice(voices)
uv, uv_vec = utter_one_vectorise(v) uv, uv_vec = utter_one_vectorise(v)
t_update = threading.Thread(target=update) # -- this only updates OSC --
t_update.start() # -- might not need this in production
# t_update = threading.Thread(target=update)
# t_update.start()
while not exit: while not exit:
if state == "METRIC":
logging.info(f"- state METRIC")
results = [] results = []
t = threading.Thread(target=find_candidates, args=[v, uv_vec, voices, results]) t = threading.Thread(target=find_candidates, args=[v, uv_vec, voices, results])
t.start() t.start()
logging.info(f"LOOP::broadcasting {v.name}") logging.info(f"METRIC::broadcasting {v.name}")
broadcast_utterance(v, uv) broadcast_utterance(v, uv)
t.join() t.join()
@ -216,7 +279,6 @@ def main() -> int:
# ok here we need to randomise maybe...?! # ok here we need to randomise maybe...?!
# ([d, t, v]) # ([d, t, v])
choice = results[0] choice = results[0]
# makse sure we don't say the same thing over and over again # makse sure we don't say the same thing over and over again
for r in results: for r in results:
@ -226,23 +288,71 @@ def main() -> int:
choice = r choice = r
break break
else: else:
logging.info(f"LOOP::reduncancy {v.name}") logging.info(f"METRIC::reduncancy {v.name}")
v = choice[2] v = choice[2]
uv = choice[1] uv = choice[1]
uv_vec = metric.vector(uv) uv_vec = metric.vector(uv)
logging.info(f"LOOP::next {v.name}") logging.info(f"METRIC::next {v.name}")
t_update.join() elif state == "QUESTION":
logging.info(f"TERMINATE::terminating OSC") logging.info(f"- state QUESTION")
utterance.osc.terminate_osc()
if len(questions_array) <= 0:
questions_array = questions.copy()
random.shuffle(questions_array)
# random question
q = questions_array.pop(0)
# random voice
v = random.choice(voices)
# random voice asks random question
logging.info(f"QUESTION::{v.name} : {q['question']}")
broadcast.utterance(q['question'], v.calculate)
broadcast.utterance(q['question'], v.channel)
time.sleep(15)
# answer
v = [e for e in voices if e.name == q['voice']]
if len(v) == 1:
v = v[0]
logging.info(f"QUESTION::answer - {v.name}")
uv, uv_vec = prompt_one_vectorise(v, q['prompt'])
v.remember(uv)
# broadcast_utterance(v, uv) <-- this is broadcasted as part of METRIC STATE
state = "METRIC"
elif state == "RANDOM":
logging.info(f"- state RANDOM")
v = random.choice(voices)
l = random.randint(5, UTTERANCE_LEN)
uv, uv_vec = utter_one_vectorise(v, length=l)
broadcast_utterance(v, uv)
# t_update.join()
# logging.info(f"TERMINATE::terminating OSC")
# t_osc_receiver.stop()
# t_osc_receiver.join()
# if t_random_seed: # if t_random_seed:
logging.info(f"TERMINATE::random seed") logging.info(f"TERMINATE::random seed")
t_random_seed.cancel()
t_random_seed.join() t_random_seed.join()
logging.info(f"TERMINATE::state transition")
t_state_transition.join()
logging.info(f"FIN") logging.info(f"FIN")

56
utterance/oscosc.py Normal file
View File

@ -0,0 +1,56 @@
from pythonosc import udp_client
from pythonosc import osc_server
from pythonosc import dispatcher
from threading import Lock
temp_mutex = Lock()
class OscBroadcaster:
def __init__(self, name: str, host: str, port: str, command_channel: str):
self.name = name
self.host = host
self.port = int(port)
self.cmd = command_channel
self.client = udp_client.SimpleUDPClient(self.host, self.port)
def utterance(self, utterance: str, channel: str):
self.client.send_message(channel, utterance)
def command(self, command: str):
self.client.send_message(self.cmd, command)
def temperature(self, temp: float, channel: str):
global temp_mutex
with temp_mutex:
self.client.send_message(self.cmd, command)
class OscReceiver:
def __init__(self, name: str, host: str, port: str, callback_fn_command=None, callback_fn_temp=None):
self.dispatcher = dispatcher.Dispatcher()
if callback_fn_command != None:
self.dispatcher.map('/command', callback_fn_command)
if callback_fn_temp != None:
self.dispatcher.map('/temperature', callback_fn_command)
self.name = name
self.host = "127.0.0.1"
self.port = int(port)
self.server = osc_server.ThreadingOSCUDPServer((self.host, self.port), self.dispatcher)
# # osc_udp_server(self.host, self.port, self.name)
# osc_udp_server("127.0.0.1", self.port, self.name)
# if callback_fn:
# osc_method('/', callback_fn)
# else:
# osc_method('/', temperature)

View File

@ -1,4 +1,5 @@
import string, regex import string, regex
from gensim.utils import tokenize
def clean(text: str) -> str: def clean(text: str) -> str:
@ -48,6 +49,9 @@ def fragments(utterance: str):
return frags return frags
def tokenise(utterance: str):
return list(tokenize(utterance, lower=True))

View File

@ -7,29 +7,32 @@ UTTERANCE_MEMORY_MIN_DIST = 0.2
class Voice: class Voice:
def __init__(self, name: str, model: str, tokenizer: str, temp: int, lenght: int): def __init__(self, name: str, model: str, tokenizer: str, temp: int, length: int):
self.name = name self.name = name
self.temp = temp self.temp = temp
self.lenght = lenght self.length = length
self.v = aitextgen(model_folder=model, tokenizer_file=tokenizer) self.v = aitextgen(model_folder=model, tokenizer_file=tokenizer)
self.utterances = [] self.utterances = []
def utter_n(self, n: int, temp: float = None, lenght: int = None): def utter_n(self, n: int, temp: float = None, length: int = None):
t = self.temp if temp == None else temp t = self.temp if temp == None else temp
l = self.lenght if lenght == None else lenght l = self.length if length == None else length
return self.v.generate(n=n, max_lenght=l, temperature=t, seed=int(time.time()), return_as_list=True) return self.v.generate(n=n, max_length=l, temperature=t, seed=int(time.time()), return_as_list=True)
def utter_one(self, temp: int = None, lenght: float = None) -> str: def utter_one(self, temp: float = None, length: int = None) -> str:
t = self.temp if temp == None else temp t = self.temp if temp == None else temp
l = self.lenght if lenght == None else lenght l = self.length if length == None else length
return self.utter_n(n=1, temp=temp, lenght=lenght)[0] return self.utter_n(n=1, temp=temp, length=length)[0]
def prompt(self, pinput: str, temp: float = None, length: int = None) -> str:
t = self.temp if temp == None else temp
l = self.length if length == None else length
return self.v.generate(prompt=pinput, n=1, max_length=l, temperature=t, seed=int(time.time()), return_as_list=True)[0]
def select(self, utterance: str) -> bool: def select(self, utterance: str) -> bool:
# fuction making sure to not say the same thing # fuction making sure to not say the same thing
print("select;")
toks = set(gensim.utils.tokenize(utterance)) toks = set(gensim.utils.tokenize(utterance))
i = 0 i = 0
@ -41,13 +44,15 @@ class Voice:
self.utterances.append(toks) self.utterances.append(toks)
print(len(self.utterances))
if len(self.utterances) > UTTERANCE_MEMORY_LEN: if len(self.utterances) > UTTERANCE_MEMORY_LEN:
self.utterances.pop(0) self.utterances.pop(0)
return True return True
def remember(self, utterance: str):
toks = set(gensim.utils.tokenize(utterance))
self.utterances.append(toks)
def set_channel(self, root: str, endpoint: str): def set_channel(self, root: str, endpoint: str):
self.channel = root + endpoint self.channel = root + endpoint