transcriptum/audio.py

290 lines
7.9 KiB
Python
Raw Normal View History

2024-04-24 09:42:52 +02:00
import numpy
import math
import pyaudio as pa
from opaudio import audioop
import threading
import collections
class AudioSource(object):
    """Abstract base class for audio sources used as context managers.

    Concrete subclasses (e.g. ``Microphone``) must provide ``__init__``,
    ``__enter__`` and ``__exit__``; every method here only raises.
    """

    def __init__(self):
        # The base class cannot be instantiated directly.
        raise NotImplementedError("this is an abstract class")

    def __enter__(self):
        # Subclasses acquire their underlying stream here.
        raise NotImplementedError("this is an abstract class")

    def __exit__(self, exc_type, exc_value, traceback):
        # Subclasses release their underlying stream here.
        raise NotImplementedError("this is an abstract class")
class Microphone(AudioSource):
    """Audio source backed by a PyAudio input device.

    Parameters
    ----------
    device_info : dict
        PyAudio device-info mapping (see ``Microphone.select``); must not be None.
    sample_rate : int, optional
        Capture rate in Hz; defaults to the device's ``defaultSampleRate``.
    chunk_size : int
        Number of frames read per buffer.
    """

    def __init__(self, device_info=None, sample_rate=None, chunk_size=1024):
        assert device_info is not None, "device_info must not be None (see Microphone.select)"
        a = pa.PyAudio()
        try:
            if sample_rate is None:
                # Fall back to the device's advertised default rate.
                default_rate = device_info.get('defaultSampleRate')
                assert isinstance(default_rate, (float, int)) and default_rate > 0, "Wrong sample rate provided by PyAudio"
                sample_rate = int(default_rate)
            self.device_info = device_info
        finally:
            a.terminate()
        self.format = pa.paInt16                             # 16-bit signed samples
        self.SAMPLE_WIDTH = pa.get_sample_size(self.format)  # bytes per sample
        self.SAMPLE_RATE = sample_rate
        self.CHUNK = chunk_size
        self.audio = None
        self.stream = None

    @staticmethod
    def select():
        """Interactively pick an input device; returns its PyAudio info dict."""
        n = 0
        microphones = []
        a = pa.PyAudio()
        for i in range(a.get_device_count()):
            d = a.get_device_info_by_index(i)
            if d.get('maxInputChannels') > 0:
                microphones.append(d)
                print(f"{n}. {d.get('name')}")
                n += 1
        while True:
            sel = input("select microphone: ")
            # Valid selections are 0 .. n-1; the original compared with
            # ``> n`` and so accepted ``n`` itself, raising IndexError.
            if not sel.isdigit() or int(sel) >= n:
                print("Wrong selection.")
                continue
            m = microphones[int(sel)]
            a.terminate()
            return m

    def __enter__(self):
        assert self.stream is None, "Source already streaming"
        self.a = pa.PyAudio()
        try:
            pa_stream = self.a.open(input_device_index=self.device_info.get('index'), channels=1, format=self.format, rate=self.SAMPLE_RATE, frames_per_buffer=self.CHUNK, input=True)
            self.stream = Microphone.Stream(pa_stream)
        except Exception:
            # Release PyAudio and propagate, instead of printing and
            # returning a broken source with ``stream is None`` (which made
            # ``__exit__`` crash on ``None.close()`` in the original).
            self.a.terminate()
            raise
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            # Guard against a failed ``__enter__`` that left no stream.
            if self.stream is not None:
                self.stream.close()
        finally:
            self.stream = None
            self.a.terminate()

    class Stream(object):
        """Thin wrapper around a PyAudio stream."""

        def __init__(self, pa_stream):
            self.pa_stream = pa_stream

        def read(self, size):
            # Don't raise on input overflow; dropped frames are acceptable.
            return self.pa_stream.read(size, exception_on_overflow=False)

        def close(self):
            try:
                if not self.pa_stream.is_stopped():
                    self.pa_stream.stop_stream()
            finally:
                self.pa_stream.close()
class WaitTimeoutError(Exception):
    """Raised when listening times out before any speech is detected."""
class Listener(AudioSource):
    """Energy-threshold based phrase detector over an ``AudioSource``.

    ``listen`` blocks until a phrase (audio whose RMS energy exceeds
    ``energy_threshold``) is captured; ``listen_in_background`` runs the
    same loop in a daemon thread and invokes a callback per phrase.
    """

    def __init__(self):
        self.energy_threshold = 300  # minimum audio energy to consider for recording
        self.dynamic_energy_threshold = True
        self.dynamic_energy_adjustment_damping = 0.15
        self.dynamic_energy_ratio = 1.5
        self.pause_threshold = 0.8  # seconds of non-speaking audio before a phrase is considered complete
        self.operation_timeout = None  # seconds after an internal operation (e.g., an API request) starts before it times out, or ``None`` for no timeout
        self.phrase_threshold = 0.5  # minimum seconds of speaking audio before we consider the speaking audio a phrase - values below this are ignored (for filtering out clicks and pops)
        self.non_speaking_duration = 0.5  # seconds of non-speaking audio to keep on both sides of the recording

    def check_source(self, source):
        """Validate that *source* is an ``AudioSource`` that is streaming."""
        assert isinstance(source, AudioSource), "Source must be an AudioSource"
        assert source.stream is not None, "Source must be streaming"

    def dynamic_thresholding(self, energy, buffer, seconds_per_buffer):
        """Exponentially adjust ``energy_threshold`` towards ``energy * dynamic_energy_ratio``.

        The damping exponent accounts for varying chunk duration so the
        adaptation rate is time-based rather than per-buffer.
        """
        damping = self.dynamic_energy_adjustment_damping ** seconds_per_buffer
        target_energy = energy * self.dynamic_energy_ratio
        self.energy_threshold = self.energy_threshold * damping + target_energy * (1 - damping)

    def adjust_ambient_noise(self, source, duration=1):
        """Sample *duration* seconds of audio to calibrate ``energy_threshold``."""
        self.check_source(source)
        seconds_per_buffer = float(source.CHUNK) / source.SAMPLE_RATE
        elapsed_time = 0
        print(f"Adjust ambient noise {duration}")
        while True:
            elapsed_time += seconds_per_buffer
            if elapsed_time > duration:
                break
            buffer = source.stream.read(source.CHUNK)
            energy = audioop.rms(buffer, source.SAMPLE_WIDTH)
            self.dynamic_thresholding(energy, buffer, seconds_per_buffer)

    def listen(self, source, listen_timeout=None, phrase_timeout=None, is_listening_cb=None):
        """Block until a phrase is captured and return it.

        Returns a ``(frame_data, sample_rate, sample_width)`` tuple where
        ``frame_data`` is the raw captured audio bytes.  Raises
        ``WaitTimeoutError`` if no speech starts within *listen_timeout*
        seconds.  *is_listening_cb*, if given, is called with ``True`` when
        recording starts and ``False`` when it stops.
        """
        self.check_source(source)
        seconds_per_buffer = float(source.CHUNK) / source.SAMPLE_RATE
        # Convert the time-based thresholds into buffer counts.
        pause_buffer_cnt = int(math.ceil(self.pause_threshold / seconds_per_buffer))
        phrase_buffer_cnt = int(math.ceil(self.phrase_threshold / seconds_per_buffer))
        non_speaking_buffer_cnt = int(math.ceil(self.non_speaking_duration / seconds_per_buffer))
        elapsed_time = 0
        buffer = b""
        pause_cnt = 0
        phrase_cnt = 0
        # NOTE(review): pause_cnt/phrase_cnt intentionally carry over across
        # outer-loop retries here (matching the original) — confirm intended.
        while True:
            frames = collections.deque()
            # Phase 1: wait until audio energy exceeds the threshold,
            # keeping a rolling window of leading non-speaking context.
            while True:
                elapsed_time += seconds_per_buffer
                if listen_timeout and elapsed_time > listen_timeout:
                    raise WaitTimeoutError("Listener timed out while waiting for input")
                buffer = source.stream.read(source.CHUNK)
                if len(buffer) == 0:
                    break  # stream ended
                frames.append(buffer)
                if len(frames) > non_speaking_buffer_cnt:
                    frames.popleft()  # drop context older than the padding window
                energy = audioop.rms(buffer, source.SAMPLE_WIDTH)
                if energy > self.energy_threshold:
                    break  # speech detected
                if self.dynamic_energy_threshold:
                    print("dynamic_thresholding")
                    self.dynamic_thresholding(energy, buffer, seconds_per_buffer)
            phrase_start_time = elapsed_time
            if is_listening_cb:
                is_listening_cb(True)
            # Phase 2: record until a long-enough pause, end of stream, or
            # the optional per-phrase timeout.
            while True:
                elapsed_time += seconds_per_buffer
                if phrase_timeout and elapsed_time - phrase_start_time > phrase_timeout:
                    break
                buffer = source.stream.read(source.CHUNK)
                if len(buffer) == 0:
                    break
                frames.append(buffer)
                energy = audioop.rms(buffer, source.SAMPLE_WIDTH)
                if energy > self.energy_threshold:
                    pause_cnt = 0
                    phrase_cnt += 1
                else:
                    pause_cnt += 1
                if pause_cnt > pause_buffer_cnt:
                    break
            # Accept only phrases long enough to not be clicks/pops.
            if phrase_cnt >= phrase_buffer_cnt or len(buffer) == 0:
                break
        if is_listening_cb:
            is_listening_cb(False)
        if frames:
            # Trim trailing silence beyond the configured padding.
            for _ in range(pause_cnt - non_speaking_buffer_cnt):
                frames.pop()
        frame_data = b"".join(frames)
        return (frame_data, source.SAMPLE_RATE, source.SAMPLE_WIDTH)

    def listen_in_background(self, source, listen_cb, listen_timeout=None, is_listening_cb=None):
        """Capture phrases on a daemon thread.

        Calls ``listen_cb(self, data)`` for each captured phrase.  Returns a
        ``stopper(wait_join_stop=True)`` callable that stops the thread and,
        by default, joins it.
        """
        assert isinstance(source, AudioSource), "Source must be an AudioSource"
        running = [True]

        def listen_in_thread():
            with source:
                while running[0]:
                    try:
                        # Short listen_timeout so the running flag is polled
                        # roughly once per second.
                        data = self.listen(source=source, listen_timeout=1, phrase_timeout=listen_timeout, is_listening_cb=is_listening_cb)
                    except WaitTimeoutError:
                        # Only notify when a callback was supplied — the
                        # original called is_listening_cb(False) even when it
                        # was None, crashing the background thread.
                        if is_listening_cb:
                            is_listening_cb(False)
                    else:
                        if running[0]:
                            listen_cb(self, data)

        def stopper(wait_join_stop=True):
            running[0] = False
            if wait_join_stop:
                listener_thread.join()

        listener_thread = threading.Thread(target=listen_in_thread)
        # The original misspelled ``daemon`` as ``deamon``, so the attribute
        # was inert and the thread could block interpreter shutdown.
        listener_thread.daemon = True
        listener_thread.start()
        return stopper