2023-09-19 08:58:00 +02:00
|
|
|
# python 3 (3.9+) implementation of audioop deprecated in 3.11 as of PEP 594 (removed in 3.13)
|
|
|
|
|
# based on the canonical c implementation from Jeffrey S. Lee in python (3.11) standard library
|
|
|
|
|
# https://docs.python.org/3/library/audioop.html
|
|
|
|
|
|
|
|
|
|
from builtins import max as builtin_max
|
|
|
|
|
from builtins import min as builtin_min
|
|
|
|
|
from builtins import sum as builtin_sum
|
|
|
|
|
import math
|
|
|
|
|
import struct
|
|
|
|
|
from sys import maxsize as INT_MAX
|
|
|
|
|
from ctypes import create_string_buffer
|
|
|
|
|
|
|
|
|
|
# Largest and smallest signed sample value, indexed by sample width in bytes.
# Index 0 is a placeholder so that MAXVALS[size] / MINVALS[size] work directly.
MAXVALS = [0] + [(1 << (8 * width - 1)) - 1 for width in range(1, 5)]
MINVALS = [0] + [-(1 << (8 * width - 1)) for width in range(1, 5)]

# struct format codes per sample width: index 0 (False) is the unsigned code,
# index 1 (True) is the signed code, so STRUCT_FORMAT[size][signed] selects one.
STRUCT_FORMAT = {
    width: [code.upper(), code]
    for width, code in ((1, 'b'), (2, 'h'), (4, 'i'))
}
|
|
|
|
|
|
2024-04-24 09:23:50 +02:00
|
|
|
class error(Exception):
    '''
    Raised for every failure reported by this module, e.g. an unsupported
    number of bytes per sample or a fragment that is not a whole number of frames.
    '''
|
|
|
|
|
|
|
|
|
|
def _fboud(val, minval, maxval):
|
|
|
|
|
return builtin_max(builtin_min(val, maxval), minval)
|
|
|
|
|
|
|
|
|
|
def _check_size(size: int):
    '''
    Validate a sample width in bytes.

    Only the widths that have an entry in STRUCT_FORMAT are accepted
    (1-byte, 2-short, 4-int). A membership test is used so that non-integral
    values such as 2.5 are rejected here instead of failing later on a
    STRUCT_FORMAT lookup.

    Raises:
        error: if size is not 1, 2 or 4.
    '''
    if size not in (1, 2, 4):
        raise error("Size should be 1, 2, or 4")
|
|
|
|
|
|
|
|
|
|
def _check_parameters(length: int, size: int):
    '''
    Validate a fragment length (in bytes) against a sample width.

    An empty fragment (length 0) is valid, as in the C implementation: it is
    simply a fragment with zero samples, and the callers handle that case.

    Raises:
        error: if size is unsupported or length is not a multiple of size.
    '''
    _check_size(size)
    if length % size != 0:
        raise error("not a whole number of frames")
|
|
|
|
|
|
|
|
|
|
def _get_raw_sample(cp, size: int, i: int, signed=True):
    '''
    Decode sample number i (width size bytes) from the buffer cp.

    signed picks the signed (default) or unsigned struct code from STRUCT_FORMAT.
    The sample is read from a zero-copy memoryview slice of cp.
    '''
    fmt = STRUCT_FORMAT[size][signed]
    first_byte = i * size
    window = memoryview(cp)[first_byte:first_byte + size]
    (value,) = struct.unpack_from(fmt, window)
    return value
|
2023-09-19 08:58:00 +02:00
|
|
|
|
|
|
|
|
def _get_samples(cp, size: int, signed=True):
    '''
    Lazily yield every sample of width size bytes contained in cp, in order.
    '''
    count = math.ceil(len(cp) / size)
    index = 0
    while index < count:
        yield _get_raw_sample(cp, size, index, signed)
        index += 1
|
|
|
|
|
|
|
|
|
|
def _set_raw_sample(cp, size: int, i: int, val, signed=True):
    '''
    Encode val as sample number i (width size bytes) into the writable buffer cp.

    signed picks the signed (default) or unsigned struct code from STRUCT_FORMAT.
    '''
    fmt = STRUCT_FORMAT[size][signed]
    byte_offset = i * size
    struct.pack_into(fmt, cp, byte_offset, val)
|
|
|
|
|
|
|
|
|
|
def _overflow(val, size, signed=True):
|
|
|
|
|
if MINVALS[size] <= val <= MAXVALS[size]:
|
|
|
|
|
return val
|
|
|
|
|
|
|
|
|
|
bits = 8 * size
|
|
|
|
|
if signed:
|
|
|
|
|
m = 2**(bits - 1)
|
|
|
|
|
return ((val + m) % 2**bits) - m
|
|
|
|
|
else:
|
|
|
|
|
return val % 2**bits
|
|
|
|
|
|
2024-04-24 09:23:50 +02:00
|
|
|
def getsample(cp, size: int, index: int):
    '''
    Return the value of sample index from the fragment.

    Args:
        cp: bytes-like fragment holding the samples.
        size: sample width in bytes.
        index: zero-based sample number (not a byte offset).

    Raises:
        error: if the fragment/size combination is invalid or index is
            outside the fragment.
    '''
    _check_parameters(len(cp), size)
    # Either bound being violated makes the index invalid.
    if index < 0 or index >= len(cp) // size:
        raise error("Index out of range")
    return _get_raw_sample(cp, size, index)
|
|
|
|
|
|
|
|
|
|
def max(fragment, size):
    '''
    Return the largest absolute sample value found in the fragment
    (0 for a fragment without samples).
    '''
    _check_parameters(len(fragment), size)
    peak = 0
    for sample in _get_samples(fragment, size):
        magnitude = abs(sample)
        if magnitude > peak:
            peak = magnitude
    return peak
|
|
|
|
|
|
|
|
|
|
def minmax(fragment, size):
    '''
    Return a tuple consisting of the minimum and maximum values of all samples in the sound fragment.

    The running minimum starts at the largest 32-bit value and the running
    maximum at the smallest one, so any real sample replaces them.
    '''
    _check_parameters(len(fragment), size)
    sample_min = 0x7FFFFFFF
    sample_max = -0x80000000
    for sample in _get_samples(fragment, size):
        sample_min = builtin_min(sample, sample_min)
        sample_max = builtin_max(sample, sample_max)
    return sample_min, sample_max
|
|
|
|
|
|
|
|
|
|
def avg(fragment, size):
    '''
    Return the average over all samples in the fragment.

    The result is an int, rounded towards minus infinity as in the C
    implementation; a fragment without samples yields 0.
    '''
    _check_parameters(len(fragment), size)
    nbr_samples = len(fragment) // size
    if nbr_samples == 0:
        return 0
    # Integer floor division keeps the result exact and integral.
    return builtin_sum(_get_samples(fragment, size)) // nbr_samples
|
|
|
|
|
|
|
|
|
|
def rms(fragment, size):
    '''
    Return the root-mean-square of the fragment, i.e. sqrt(sum(S_i^2)/n),
    truncated to an int. This measures the power of the audio signal.
    '''
    _check_parameters(len(fragment), size)
    count = len(fragment) / size
    if count == 0:
        return 0
    total = 0
    for sample in _get_samples(fragment, size):
        total += sample ** 2
    mean_square = total / count
    return int(math.sqrt(mean_square))
|
|
|
|
|
|
|
|
|
|
def _sum2(cpa, cpb, length):
    '''
    Return the sum of products cpa[i] * cpb[i] over the first length
    2-byte samples of both buffers.

    length is coerced to int so that callers passing a whole-valued float
    still work. Samples are read with _get_raw_sample directly: the callers
    guarantee the range, so re-validating the whole buffer for every
    sample is unnecessary.
    '''
    total = 0
    for i in range(int(length)):
        total += _get_raw_sample(cpa, 2, i) * _get_raw_sample(cpb, 2, i)
    return total
|
|
|
|
|
|
|
|
|
|
def findfit(fragment, reference):
    '''
    Try to match reference as well as possible to a portion of fragment (which should be the longer fragment).

    This is (conceptually) done by taking slices out of fragment, using
    findfactor() to compute the best match, and minimizing the result.
    The fragments should both contain 2-byte samples.

    Returns:
        A tuple (offset, factor) where offset is the (integer) sample offset
        into fragment where the optimal match started and factor is the
        (floating-point) factor as per findfactor(). Where the C code would
        divide by zero (all-zero window or reference), NaN is used instead of
        raising ZeroDivisionError, mirroring its floating-point behaviour.

    Raises:
        error: if a fragment has an odd byte length or fragment is shorter
            than reference.
    '''
    cp1 = fragment
    cp2 = reference

    if len(cp1) % 2 != 0 or len(cp2) % 2 != 0:
        raise error("Strings should be even-sized")

    if len(cp1) < len(cp2):
        raise error("First sample should be longer")

    # Sample counts must be ints: they are used as range() bounds and indices.
    len1 = len(cp1) // 2
    len2 = len(cp2) // 2

    sum_ri_2 = _sum2(cp2, cp2, len2)

    def mismatch(sum_aij_2, sum_aij_ri):
        # Residual energy after the best scaling of reference onto the window.
        if sum_aij_2 == 0:
            return math.nan
        return (sum_ri_2 * sum_aij_2 - sum_aij_ri ** 2) / sum_aij_2

    sum_aij_2 = _sum2(cp1, cp1, len2)
    sum_aij_ri = _sum2(cp1, cp2, len2)

    best_result = mismatch(sum_aij_2, sum_aij_ri)
    best_j = 0

    view = memoryview(cp1)
    # Offsets 1 .. len1 - len2 inclusive: the last window ends at the last sample.
    for j in range(1, len1 - len2 + 1):
        aj_m1 = _get_raw_sample(cp1, 2, j - 1)
        aj_lm1 = _get_raw_sample(cp1, 2, j + len2 - 1)

        # Slide the window energy: add the entering sample, drop the leaving one.
        sum_aij_2 += aj_lm1 ** 2 - aj_m1 ** 2
        sum_aij_ri = _sum2(view[j * 2:], cp2, len2)

        result = mismatch(sum_aij_2, sum_aij_ri)
        if result < best_result:
            best_result = result
            best_j = j

    if sum_ri_2 == 0:
        factor = math.nan
    else:
        factor = _sum2(view[best_j * 2:], cp2, len2) / sum_ri_2

    return best_j, factor
|
|
|
|
|
|
|
|
|
|
def findfactor(fragment, reference):
    '''
    Return a factor F such that rms(add(fragment, mul(reference, -F))) is minimal.

    In other words, return the factor with which you should multiply
    reference to make it match as well as possible to fragment. The fragments
    should both contain 2-byte samples and have the same length.

    The time taken by this routine is proportional to len(fragment).

    Returns:
        The factor as a float. For an all-zero (or empty) reference the
        quotient is undefined and NaN is returned instead of raising
        ZeroDivisionError, mirroring the floating-point C implementation.

    Raises:
        error: if a fragment has an odd byte length or the lengths differ.
    '''
    cp1 = fragment
    cp2 = reference

    if len(cp1) % 2 != 0 or len(cp2) % 2 != 0:
        raise error("Strings should be even-sized")

    if len(cp1) != len(cp2):
        raise error("Samples should be same size")

    # Integer sample count: it is used as a range() bound in _sum2.
    len1 = len(cp1) // 2
    sum_ri_2 = _sum2(cp2, cp2, len1)
    sum_aij_ri = _sum2(cp1, cp2, len1)

    if sum_ri_2 == 0:
        return math.nan
    return sum_aij_ri / sum_ri_2
|
|
|
|
|
|
|
|
|
|
def findmax(fragment, length):
    '''
    Search fragment for a slice of length length samples (not bytes!) with maximum energy.

    I.e., return i for which rms(fragment[i*2:(i+length)*2]) is maximal.
    The fragment should contain 2-byte samples.

    The routine takes time proportional to len(fragment).

    Raises:
        error: if fragment has an odd byte length, or length is negative or
            larger than the number of samples in fragment.
    '''
    cp1 = fragment

    if len(cp1) % 2 != 0:
        raise error("Strings should be even-sized")

    # Integer sample count: it is used as a range() bound below.
    len1 = len(cp1) // 2

    if length < 0 or len1 < length:
        raise error("Input sample should be longer")

    if len1 == 0:
        return 0

    result = _sum2(cp1, cp1, length)
    best_result = result
    best_j = 0

    # Offsets 1 .. len1 - length inclusive: the last window ends at the last sample.
    for j in range(1, len1 - length + 1):
        aj_m1 = _get_raw_sample(cp1, 2, j - 1)
        aj_lm1 = _get_raw_sample(cp1, 2, j + length - 1)

        # Slide the window energy: add the entering sample, drop the leaving one.
        result += aj_lm1 ** 2 - aj_m1 ** 2

        # Looking for the maximum energy, so keep the larger value.
        if result > best_result:
            best_result = result
            best_j = j

    return best_j
|
|
|
|
|
|
|
|
|
|
def avgpp(fragment, size):
    '''
    Return the average peak-peak value over all samples in the fragment.

    No filtering is done, so the usefulness of this routine is questionable.
    The result is an int; fragments with fewer than two samples, or without
    two detected extremes, yield 0.
    '''
    _check_parameters(len(fragment), size)
    # With zero or one sample there is no slope, hence no peak.
    if len(fragment) <= size:
        return 0
    nbr_samples = len(fragment) // size

    prevextremevalid = False
    prevextreme = None
    accum = 0
    nextreme = 0

    prevval = _get_raw_sample(fragment, size, 0)
    # No slope direction is known yet; None never equals a bool, so the first
    # slope is not mistaken for a direction change.
    prevdiff = None

    for i in range(1, nbr_samples):
        val = _get_raw_sample(fragment, size, i)
        if val == prevval:
            continue
        diff = val < prevval
        if prevdiff is not None and prevdiff != diff:
            # Derivative changed sign. Compute difference to last extreme value and remember.
            # (i.e. peak detected)
            if prevextremevalid:
                accum += abs(prevval - prevextreme)
                nextreme += 1
            prevextremevalid = True
            prevextreme = prevval
        prevval = val
        prevdiff = diff

    if nextreme == 0:
        return 0

    # accum and nextreme are non-negative ints, so this truncates like the C cast.
    return accum // nextreme
|
|
|
|
|
|
|
|
|
|
def maxpp(fragment, size):
    '''
    Return the maximum peak-peak value in the sound fragment.

    Fragments with fewer than two samples, or without two detected extremes,
    yield 0.
    '''
    _check_parameters(len(fragment), size)
    # With zero or one sample there is no slope, hence no peak.
    if len(fragment) <= size:
        return 0
    nbr_samples = len(fragment) // size

    prevextremevalid = False
    prevextreme = None
    maxval = 0

    prevval = _get_raw_sample(fragment, size, 0)
    # No slope direction is known yet; None never equals a bool, so the first
    # slope is not mistaken for a direction change.
    prevdiff = None

    for i in range(1, nbr_samples):
        val = _get_raw_sample(fragment, size, i)
        if val == prevval:
            continue
        diff = val < prevval
        if prevdiff is not None and prevdiff != diff:
            # Derivative changed sign. Compute difference to last extreme value and remember.
            # (i.e. peak detected)
            if prevextremevalid:
                extremediff = abs(prevval - prevextreme)
                if extremediff > maxval:
                    maxval = extremediff
            prevextremevalid = True
            prevextreme = prevval
        prevval = val
        prevdiff = diff

    return maxval
|
|
|
|
|
|
|
|
|
|
def cross(fragment, size):
    '''
    Return the number of zero crossings in the fragment passed as an argument.

    A crossing is counted whenever the sign (negative / non-negative) of a
    sample differs from that of the previous sample. An empty fragment
    yields -1, the value the C implementation produces for that input.
    '''
    _check_parameters(len(fragment), size)
    nbr_samples = len(fragment) // size
    if nbr_samples == 0:
        return -1

    ncross = 0
    prevval = _get_raw_sample(fragment, size, 0) < 0

    for i in range(1, nbr_samples):
        val = _get_raw_sample(fragment, size, i) < 0
        if val != prevval:
            ncross += 1
        prevval = val

    return ncross
|
|
|
|
|
|
|
|
|
|
def mul(fragment, size, factor):
    '''
    Return a fragment that has all samples in the original fragment multiplied by the floating-point value factor.

    Samples are truncated (clamped to the sample range) in case of overflow
    and rounded towards minus infinity.

    Returns:
        bytes of the same length as fragment.
    '''
    _check_parameters(len(fragment), size)
    nbr_samples = len(fragment) // size
    minval, maxval = MINVALS[size], MAXVALS[size]

    rv = create_string_buffer(len(fragment))

    for i in range(nbr_samples):
        val = _get_raw_sample(fragment, size, i)
        # Clamp first (lower bound, then upper bound), then floor to an int
        # because struct.pack_into does not accept floats.
        fval = math.floor(_fboud(val * factor, minval, maxval))
        _set_raw_sample(rv, size, i, fval)

    return rv.raw
|
|
|
|
|
|
|
|
|
|
def tomono(fragment, size, lfactor, rfactor):
    '''
    Convert a stereo fragment to a mono fragment.

    The left channel is multiplied by lfactor and the right channel by
    rfactor before adding the two channels to give a mono signal. The sum is
    clamped to the sample range and rounded towards minus infinity.

    Returns:
        bytes, half as long as fragment.

    Raises:
        error: if the fragment is invalid for size or does not hold an even
            number of samples (whole left/right pairs).
    '''
    _check_parameters(len(fragment), size)
    nbr_samples = len(fragment) // size
    if nbr_samples % 2 != 0:
        raise error("not a whole number of frames")
    minval, maxval = MINVALS[size], MAXVALS[size]

    rv = create_string_buffer(len(fragment) // 2)

    for i in range(0, nbr_samples, 2):
        val1 = _get_raw_sample(fragment, size, i)
        val2 = _get_raw_sample(fragment, size, i + 1)
        val = val1 * lfactor + val2 * rfactor
        # Clamp, then floor to an int because struct.pack_into rejects floats.
        fval = math.floor(_fboud(val, minval, maxval))
        _set_raw_sample(rv, size, i // 2, fval)

    return rv.raw
|
|
|
|
|
|
|
|
|
|
def tostereo(fragment, size, lfactor, rfactor):
    '''
    Generate a stereo fragment from a mono fragment.

    Each pair of samples in the stereo fragment is computed from the mono
    sample, whereby left channel samples are multiplied by lfactor and right
    channel samples by rfactor. Results are clamped to the sample range and
    rounded towards minus infinity.

    Returns:
        bytes, twice as long as fragment.
    '''
    _check_parameters(len(fragment), size)
    nbr_samples = len(fragment) // size
    minval, maxval = MINVALS[size], MAXVALS[size]

    rv = create_string_buffer(len(fragment) * 2)

    for i in range(nbr_samples):
        val = _get_raw_sample(fragment, size, i)
        # Clamp, then floor to an int because struct.pack_into rejects floats.
        val1 = math.floor(_fboud(val * lfactor, minval, maxval))
        val2 = math.floor(_fboud(val * rfactor, minval, maxval))
        _set_raw_sample(rv, size, i * 2, val1)
        _set_raw_sample(rv, size, i * 2 + 1, val2)

    return rv.raw
|
|
|
|
|
|
|
|
|
|
def add(fragment1, fragment2, size):
    '''
    Return a fragment which is the addition of the two samples passed as parameters.

    size is the sample width in bytes. Both fragments should have the same
    length. Samples are truncated (clamped to the sample range) in case of
    overflow.

    Raises:
        error: if fragment1 is invalid for size or the lengths differ.
    '''
    _check_parameters(len(fragment1), size)
    nbr_samples = len(fragment1) // size

    if len(fragment1) != len(fragment2):
        raise error("Lengths should be the same")

    minval, maxval = MINVALS[size], MAXVALS[size]
    rv = create_string_buffer(len(fragment1))

    for i in range(nbr_samples):
        val1 = _get_raw_sample(fragment1, size, i)
        val2 = _get_raw_sample(fragment2, size, i)
        # Integer sum clamped into [minval, maxval].
        fval = builtin_max(builtin_min(val1 + val2, maxval), minval)
        _set_raw_sample(rv, size, i, fval)

    return rv.raw
|
|
|
|
|
|
|
|
|
|
def bias(fragment, size, bias):
    '''
    Return a fragment that is the original fragment with a bias added to each sample.

    Samples wrap around in case of overflow.

    Returns:
        bytes of the same length as fragment.
    '''
    _check_parameters(len(fragment), size)
    nbr_samples = len(fragment) // size

    rv = create_string_buffer(len(fragment))

    for i in range(nbr_samples):
        val = _get_raw_sample(fragment, size, i)
        fval = _overflow(val + bias, size)
        _set_raw_sample(rv, size, i, fval)

    return rv.raw
|
|
|
|
|
|
|
|
|
|
def reverse(fragment, size):
    '''
    Reverse the samples in a fragment and return the modified fragment.

    The order of the samples is reversed; the bytes inside each sample keep
    their order.

    Returns:
        bytes of the same length as fragment.
    '''
    _check_parameters(len(fragment), size)
    # Integer sample count: it is used as a range() bound and in index math.
    nbr_samples = len(fragment) // size

    rv = create_string_buffer(len(fragment))

    for i in range(nbr_samples):
        val = _get_raw_sample(fragment, size, i)
        _set_raw_sample(rv, size, nbr_samples - 1 - i, val)

    return rv.raw
|
|
|
|
|
|
|
|
|
|
def lin2lin(fragment, size, newsize):
    '''
    Convert samples between 1-, 2-, and 4-byte formats.

    Widening shifts each sample left by 8 bits per added byte; narrowing
    shifts right by 8 bits per removed byte (keeping the most significant
    bytes), so the result always fits the new width.

    Returns:
        bytes holding the same number of samples at width newsize.
    '''
    _check_parameters(len(fragment), size)
    _check_size(newsize)
    nbr_samples = len(fragment) // size

    rv = create_string_buffer(nbr_samples * newsize)

    # Positive: widening (shift left). Negative: narrowing (shift right).
    shift = 8 * (newsize - size)

    for i in range(nbr_samples):
        val = _get_raw_sample(fragment, size, i)
        if shift >= 0:
            val <<= shift
        else:
            # Python's >> on negative ints is an arithmetic (flooring) shift.
            val >>= -shift
        _set_raw_sample(rv, newsize, i, val)

    return rv.raw
|
|
|
|
|
|
|
|
|
|
def ratecv(fragment, size, nchannels, inrate, outrate, state, weightA=1, weightB=0):
    '''
    Convert the frame rate of the input fragment.

    state is a tuple containing the state of the converter. The converter
    returns a tuple (newfragment, newstate), and newstate should be passed to
    the next call of ratecv(). The initial call should pass None as the state.

    The weightA and weightB arguments are parameters for a simple digital
    filter and default to 1 and 0 respectively.

    Internally samples are scaled to 32 bits, interpolated linearly and scaled
    back, following the C implementation. newstate is (d, samps) where samps
    holds one (previous, current) pair of 32-bit-scaled samples per channel.

    Returns:
        (bytes, newstate).

    Raises:
        error: on an unsupported size, nchannels < 1, bad weights, a fragment
            that is not a whole number of frames, non-positive rates, a state
            whose channel count does not match, or an oversized output.
        OverflowError: if size * nchannels is too large.
    '''
    # Only the width is validated here; an empty fragment is legal input.
    _check_size(size)

    if nchannels < 1:
        raise error("# of channels should be >= 1")

    if size * nchannels > INT_MAX:
        raise OverflowError("size * nchannels too big for a C int")

    bytes_per_frame = size * nchannels

    if weightA < 1 or weightB < 0:
        raise error("weightA should be >= 1, weightB should be >= 0")

    if len(fragment) % bytes_per_frame != 0:
        raise error("not a whole number of frames")

    if inrate <= 0 or outrate <= 0:
        raise error("sampling rate not > 0")

    # Reduce both ratios by their gcd; floor division keeps everything integral.
    d = math.gcd(inrate, outrate)
    inrate //= d
    outrate //= d

    d = math.gcd(weightA, weightB)
    weightA //= d
    weightB //= d

    nbr_frames = len(fragment) // bytes_per_frame

    if state is None:
        d = -outrate
        prev_i = [0] * nchannels
        cur_i = [0] * nchannels
    else:
        d, samps = state
        if len(samps) != nchannels:
            raise error("illegal state argument")
        prev_i = [prev for prev, _ in samps]
        cur_i = [cur for _, cur in samps]

    if nbr_frames == 0:
        capacity = 0
    else:
        # Upper bound on the output size: ceil(nbr_frames / inrate) * outrate frames.
        q = 1 + (nbr_frames - 1) // inrate
        if outrate > INT_MAX // q // bytes_per_frame:
            raise error("not enough memory for output buffer")
        capacity = q * outrate * bytes_per_frame

    cp = create_string_buffer(capacity)
    samples = _get_samples(fragment, size)
    # Number of bits needed to scale a sample of this width up to 32 bits.
    shift = 32 - 8 * size
    ncp = 0  # number of samples (not frames) written to cp so far

    while True:
        while d < 0:
            if nbr_frames == 0:
                samps = tuple(zip(prev_i, cur_i))
                # Keep only the bytes that were actually written.
                return cp.raw[:ncp * size], (d, samps)

            for chan in range(nchannels):
                prev_i[chan] = cur_i[chan]
                cur = next(samples) << shift
                # implements a simple digital filter
                cur_i[chan] = int(
                    (weightA * cur + weightB * prev_i[chan]) / (weightA + weightB)
                )

            nbr_frames -= 1
            d += outrate

        while d >= 0:
            for chan in range(nchannels):
                # Linear interpolation between the previous and current input frame.
                cur_o = int(
                    (prev_i[chan] * d + cur_i[chan] * (outrate - d)) / outrate
                )
                _set_raw_sample(cp, size, ncp, _overflow(cur_o >> shift, size))
                ncp += 1
            d -= inrate
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# NOT IMPLEMENTED
|
|
|
|
|
|
|
|
|
|
def adpcm2lin(fragment, size, state=None):
    '''
    Not Implemented

    Placeholder for decoding an Intel/DVI ADPCM fragment to linear samples.
    The optional state argument is accepted so that calls written against
    the documented audioop signature (fragment, width, state) reach the
    NotImplementedError below instead of failing with a TypeError.

    Raises:
        NotImplementedError: always.
    '''
    raise NotImplementedError()
|
|
|
|
|
|
|
|
|
|
def alaw2lin(fragment, size):
    '''
    Not Implemented

    Placeholder for a-LAW to linear conversion; every call raises
    NotImplementedError.
    '''
    raise NotImplementedError
|
|
|
|
|
|
|
|
|
|
def byteswap(fragment, size):
    '''
    Not Implemented

    Placeholder for swapping the byte order of every sample; every call
    raises NotImplementedError.
    '''
    raise NotImplementedError
|
|
|
|
|
|
|
|
|
|
def lin2adpcm(fragment, size, state):
    '''
    Not Implemented

    Placeholder for encoding linear samples as 4-bit Intel/DVI ADPCM; every
    call raises NotImplementedError.
    '''
    raise NotImplementedError
|
|
|
|
|
|
|
|
|
|
def lin2alaw(fragment, size):
    '''
    Not Implemented

    Placeholder for linear to a-LAW conversion; every call raises
    NotImplementedError.
    '''
    raise NotImplementedError
|
|
|
|
|
|
|
|
|
|
def lin2ulaw(fragment, size):
    '''
    Not Implemented

    Placeholder for linear to u-LAW conversion; every call raises
    NotImplementedError.
    '''
    raise NotImplementedError
|
|
|
|
|
|
|
|
|
|
def ulaw2lin(fragment, size):
    '''
    Not Implemented

    Placeholder for u-LAW to linear conversion; every call raises
    NotImplementedError.
    '''
    raise NotImplementedError
|