diff options
Diffstat (limited to 'python/lc3.py')
-rw-r--r-- | python/lc3.py | 400 |
1 files changed, 400 insertions, 0 deletions
diff --git a/python/lc3.py b/python/lc3.py new file mode 100644 index 0000000..e07efee --- /dev/null +++ b/python/lc3.py @@ -0,0 +1,400 @@ +# +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import array +import ctypes +import enum +import glob +import os + +from ctypes import c_bool, c_byte, c_int, c_uint, c_size_t, c_void_p +from ctypes.util import find_library + + +class _Base: + + def __init__(self, frame_duration, samplerate, nchannels, **kwargs): + + self.hrmode = False + self.dt_us = int(frame_duration * 1000) + self.sr_hz = int(samplerate) + self.sr_pcm_hz = self.sr_hz + self.nchannels = nchannels + + libpath = None + + for k in kwargs.keys(): + if k == 'hrmode': + self.hrmode = bool(kwargs[k]) + elif k == 'pcm_samplerate': + self.sr_pcm_hz = int(kwargs[k]) + elif k == 'libpath': + libpath = kwargs[k] + else: + raise ValueError("Invalid keyword argument: " + k) + + if self.dt_us not in [2500, 5000, 7500, 10000]: + raise ValueError( + "Invalid frame duration: %.1f ms" % frame_duration) + + allowed_samplerate = [8000, 16000, 24000, 32000, 48000] \ + if not self.hrmode else [48000, 96000] + + if self.sr_hz not in allowed_samplerate: + raise ValueError("Invalid sample rate: %d Hz" % samplerate) + + if libpath is None: + mesonpy_lib = glob.glob(os.path.join(os.path.dirname(__file__), '.lc3.mesonpy.libs', '*lc3*')) + + if mesonpy_lib: + libpath = mesonpy_lib[0] + else: + libpath = find_library("lc3") + if not libpath: + raise Exception("LC3 library not found") + + lib = ctypes.cdll.LoadLibrary(libpath) + + try: + lib.lc3_hr_frame_samples \ + and lib.lc3_hr_frame_block_bytes \ + and lib.lc3_hr_resolve_bitrate \ + and lib.lc3_hr_delay_samples + + except AttributeError: + + if self.hrmode: + raise Exception('High-Resolution interface not available') + + lib.lc3_hr_frame_samples = \ + lambda hrmode, dt_us, sr_hz: \ + lib.lc3_frame_samples(dt_us, sr_hz) + + lib.lc3_hr_frame_block_bytes = \ + lambda hrmode, dt_us, sr_hz, nchannels, bitrate: \ + nchannels * lib.lc3_frame_bytes(dt_us, bitrate // 2) + + lib.lc3_hr_resolve_bitrate = \ + lambda hrmode, dt_us, sr_hz, nbytes: \ + lib.lc3_resolve_bitrate(dt_us, nbytes) + + lib.lc3_hr_delay_samples = \ + lambda hrmode, dt_us, sr_hz: \ + lib.lc3_delay_samples(dt_us, sr_hz) + + lib.lc3_hr_frame_samples.argtypes = [c_bool, c_int, c_int] + lib.lc3_hr_frame_block_bytes.argtypes = \ + [c_bool, c_int, c_int, c_int, c_int] + lib.lc3_hr_resolve_bitrate.argtypes = [c_bool, c_int, c_int, c_int] + lib.lc3_hr_delay_samples.argtypes = [c_bool, c_int, c_int] + self.lib = lib + + libc = ctypes.cdll.LoadLibrary(find_library("c")) + + self.malloc = libc.malloc + self.malloc.argtypes = [c_size_t] + self.malloc.restype = c_void_p + + self.free = libc.free + self.free.argtypes = [c_void_p] + + def get_frame_samples(self): + """ + Returns the number of PCM samples in an LC3 frame + """ + ret = self.lib.lc3_hr_frame_samples( + self.hrmode, self.dt_us, self.sr_pcm_hz) + if ret < 0: + raise ValueError("Bad parameters") + return ret + + def get_frame_bytes(self, bitrate): + """ + Returns the size of LC3 frame blocks, from bitrate in bit per seconds. + A target `bitrate` equals 0 or `INT32_MAX` returns respectively + the minimum and maximum allowed size. + """ + ret = self.lib.lc3_hr_frame_block_bytes( + self.hrmode, self.dt_us, self.sr_hz, self.nchannels, bitrate) + if ret < 0: + raise ValueError("Bad parameters") + return ret + + def resolve_bitrate(self, nbytes): + """ + Returns the bitrate in bits per seconds, from the size of LC3 frames. + """ + ret = self.lib.lc3_hr_resolve_bitrate( + self.hrmode, self.dt_us, self.sr_hz, nbytes) + if ret < 0: + raise ValueError("Bad parameters") + return ret + + def get_delay_samples(self): + """ + Returns the algorithmic delay, as a number of samples. + """ + ret = self.lib.lc3_hr_delay_samples( + self.hrmode, self.dt_us, self.sr_pcm_hz) + if ret < 0: + raise ValueError("Bad parameters") + return ret + + @staticmethod + def _resolve_pcm_format(bitdepth): + PCM_FORMAT_S16 = 0 + PCM_FORMAT_S24 = 1 + PCM_FORMAT_S24_3LE = 2 + PCM_FORMAT_FLOAT = 3 + + match bitdepth: + case 16: return (PCM_FORMAT_S16, ctypes.c_int16) + case 24: return (PCM_FORMAT_S24_3LE, 3 * ctypes.c_byte) + case None: return (PCM_FORMAT_FLOAT, ctypes.c_float) + case _: raise ValueError("Could not interpret PCM bitdepth") + + +class Encoder(_Base): + """ + LC3 Encoder wrapper + + The `frame_duration` expressed in milliseconds is any of 2.5, 5.0, 7.5 + or 10.0. The `samplerate`, in Hertz, is any of 8000, 16000, 24000, 32000 + or 48000, unless High-Resolution mode is enabled. In High-Resolution mode, + the `samplerate` is 48000 or 96000. + + By default, one channel is processed. When `nchannels` is greater than one, + the PCM input stream is read interleaved and consecutives LC3 frames are + output, for each channel. + + Keyword arguments: + hrmode : Enable High-Resolution mode, default is `False`. + sr_pcm_hz : Input PCM samplerate, enable downsampling of input. + libpath : LC3 library path and name + """ + + class c_encoder_t(c_void_p): + pass + + def __init__(self, frame_duration, samplerate, nchannels=1, **kwargs): + + super().__init__(frame_duration, samplerate, nchannels, **kwargs) + + lib = self.lib + + try: + lib.lc3_hr_encoder_size \ + and lib.lc3_hr_setup_encoder + + except AttributeError: + + assert not self.hrmode + + lib.lc3_hr_encoder_size = \ + lambda hrmode, dt_us, sr_hz: \ + lib.lc3_encoder_size(dt_us, sr_hz) + + lib.lc3_hr_setup_encoder = \ + lambda hrmode, dt_us, sr_hz, sr_pcm_hz, mem: \ + lib.lc3_setup_encoder(dt_us, sr_hz, sr_pcm_hz, mem) + + lib.lc3_hr_encoder_size.argtypes = [c_bool, c_int, c_int] + lib.lc3_hr_encoder_size.restype = c_uint + + lib.lc3_hr_setup_encoder.argtypes = \ + [c_bool, c_int, c_int, c_int, c_void_p] + lib.lc3_hr_setup_encoder.restype = self.c_encoder_t + + lib.lc3_encode.argtypes = \ + [self.c_encoder_t, c_int, c_void_p, c_int, c_int, c_void_p] + + def new_encoder(): return lib.lc3_hr_setup_encoder( + self.hrmode, self.dt_us, self.sr_hz, self.sr_pcm_hz, + self.malloc(lib.lc3_hr_encoder_size( + self.hrmode, self.dt_us, self.sr_pcm_hz))) + + self.__encoders = [new_encoder() for i in range(nchannels)] + + def __del__(self): + + try: + (self.free(encoder) for encoder in self.__encoders) + finally: + return + + def encode(self, pcm, nbytes, bitdepth=None): + """ + Encode LC3 frame(s), for each channel. + + The `pcm` input is given in two ways. When no `bitdepth` is defined, + it's a vector of floating point values from -1 to 1, coding the sample + levels. When `bitdepth` is defined, `pcm` is interpreted as a byte-like + object, each sample coded on `bitdepth` bits (16 or 24). + The machine endianness, or little endian, is used for 16 or 24 bits + width, respectively. + In both cases, the `pcm` vector data is padded with zeros when + its length is less than the required input samples for the encoder. + Channels concatenation of encoded LC3 frames, of `nbytes`, is returned. + """ + + nchannels = self.nchannels + frame_samples = self.get_frame_samples() + + (pcm_fmt, pcm_t) = self._resolve_pcm_format(bitdepth) + pcm_len = nchannels * frame_samples + + if bitdepth is None: + pcm_buffer = array.array('f', pcm) + + # Invert test to catch NaN + if not abs(sum(pcm)) / frame_samples < 2: + raise ValueError("Out of range PCM input") + + padding = max(pcm_len - frame_samples, 0) + pcm_buffer.extend(array.array('f', [0] * padding)) + + else: + padding = max(pcm_len * ctypes.sizeof(pcm_t) - len(pcm), 0) + pcm_buffer = bytearray(pcm) + bytearray(padding) + + data_buffer = (c_byte * nbytes)() + data_offset = 0 + + for (ich, encoder) in enumerate(self.__encoders): + + pcm_offset = ich * ctypes.sizeof(pcm_t) + pcm = (pcm_t * (pcm_len - ich)).from_buffer(pcm_buffer, pcm_offset) + + data_size = nbytes // nchannels + int(ich < nbytes % nchannels) + data = (c_byte * data_size).from_buffer(data_buffer, data_offset) + data_offset += data_size + + ret = self.lib.lc3_encode( + encoder, pcm_fmt, pcm, nchannels, len(data), data) + if ret < 0: + raise ValueError("Bad parameters") + + return bytes(data_buffer) + + +class Decoder(_Base): + """ + LC3 Decoder wrapper + + The `frame_duration` expressed in milliseconds is any of 2.5, 5.0, 7.5 + or 10.0. The `samplerate`, in Hertz, is any of 8000, 16000, 24000, 32000 + or 48000, unless High-Resolution mode is enabled. In High-Resolution + mode, the `samplerate` is 48000 or 96000. + + By default, one channel is processed. When `nchannels` is greater than one, + the PCM input stream is read interleaved and consecutives LC3 frames are + output, for each channel. + + Keyword arguments: + hrmode : Enable High-Resolution mode, default is `False`. + sr_pcm_hz : Output PCM samplerate, enable upsampling of output. + libpath : LC3 library path and name + """ + + class c_decoder_t(c_void_p): + pass + + def __init__(self, frame_duration, samplerate, nchannels=1, **kwargs): + + super().__init__(frame_duration, samplerate, nchannels, **kwargs) + + lib = self.lib + + try: + lib.lc3_hr_decoder_size \ + and lib.lc3_hr_setup_decoder + + except AttributeError: + + assert not self.hrmode + + lib.lc3_hr_decoder_size = \ + lambda hrmode, dt_us, sr_hz: \ + lib.lc3_decoder_size(dt_us, sr_hz) + + lib.lc3_hr_setup_decoder = \ + lambda hrmode, dt_us, sr_hz, sr_pcm_hz, mem: \ + lib.lc3_setup_decoder(dt_us, sr_hz, sr_pcm_hz, mem) + + lib.lc3_hr_decoder_size.argtypes = [c_bool, c_int, c_int] + lib.lc3_hr_decoder_size.restype = c_uint + + lib.lc3_hr_setup_decoder.argtypes = \ + [c_bool, c_int, c_int, c_int, c_void_p] + lib.lc3_hr_setup_decoder.restype = self.c_decoder_t + + lib.lc3_decode.argtypes = \ + [self.c_decoder_t, c_void_p, c_int, c_int, c_void_p, c_int] + + def new_decoder(): return lib.lc3_hr_setup_decoder( + self.hrmode, self.dt_us, self.sr_hz, self.sr_pcm_hz, + self.malloc(lib.lc3_hr_decoder_size( + self.hrmode, self.dt_us, self.sr_pcm_hz))) + + self.__decoders = [new_decoder() for i in range(nchannels)] + + def __del__(self): + + try: + (self.free(decoder) for decoder in self.__decoders) + finally: + return + + def decode(self, data, bitdepth=None): + """ + Decode an LC3 frame + + The input `data` is the channels concatenation of LC3 frames in a + byte-like object. Interleaved PCM samples are returned according to + the `bitdepth` indication. + When no `bitdepth` is defined, it's a vector of floating point values + from -1 to 1, coding the sample levels. When `bitdepth` is defined, + it returns a byte array, each sample coded on `bitdepth` bits. + The machine endianness, or little endian, is used for 16 or 24 bits + width, respectively. + """ + + nchannels = self.nchannels + frame_samples = self.get_frame_samples() + + (pcm_fmt, pcm_t) = self._resolve_pcm_format(bitdepth) + pcm_len = nchannels * self.get_frame_samples() + pcm_buffer = (pcm_t * pcm_len)() + + data_buffer = bytearray(data) + data_offset = 0 + + for (ich, decoder) in enumerate(self.__decoders): + pcm_offset = ich * ctypes.sizeof(pcm_t) + pcm = (pcm_t * (pcm_len - ich)).from_buffer(pcm_buffer, pcm_offset) + + data_size = len(data_buffer) // nchannels + \ + int(ich < len(data_buffer) % nchannels) + data = (c_byte * data_size).from_buffer(data_buffer, data_offset) + data_offset += data_size + + ret = self.lib.lc3_decode( + decoder, data, len(data), pcm_fmt, pcm, self.nchannels) + if ret < 0: + raise ValueError("Bad parameters") + + return array.array('f', pcm_buffer) \ + if bitdepth is None else bytes(pcm_buffer) |