diff options
Diffstat (limited to 'mmi2grpc/_audio.py')
-rw-r--r-- | mmi2grpc/_audio.py | 107 |
1 files changed, 107 insertions, 0 deletions
diff --git a/mmi2grpc/_audio.py b/mmi2grpc/_audio.py new file mode 100644 index 0000000..8e83c67 --- /dev/null +++ b/mmi2grpc/_audio.py @@ -0,0 +1,107 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Audio tools.""" + +import itertools +import math +import os +from threading import Thread + +import numpy as np +from scipy.io import wavfile + +SINE_FREQUENCY = 440 +SINE_DURATION = 0.1 + +# File which stores the audio signal output data (after transport). +# Used for running comparisons with the generated audio signal. +OUTPUT_WAV_FILE = '/tmp/audiodata' + +WAV_RIFF_SIZE_OFFSET = 4 +WAV_DATA_SIZE_OFFSET = 40 + + +def _fixup_wav_header(path): + with open(path, 'r+b') as f: + f.seek(0, os.SEEK_END) + file_size = f.tell() + for offset in [WAV_RIFF_SIZE_OFFSET, WAV_DATA_SIZE_OFFSET]: + size = file_size - offset - 4 + f.seek(offset) + f.write(size.to_bytes(4, byteorder='little')) + + +class AudioSignal: + """Audio signal generator and verifier.""" + + def __init__(self, transport, amplitude, fs): + """Init AudioSignal class. + + Args: + transport: function to send the generated audio data to. + amplitude: amplitude of the signal to generate. + fs: sampling rate of the signal to generate. + """ + self.transport = transport + self.amplitude = amplitude + self.fs = fs + self.thread = None + + def start(self): + """Generates the audio signal and send it to the transport.""" + self.thread = Thread(target=self._run) + self.thread.start() + + def _run(self): + sine = self._generate_sine(SINE_FREQUENCY, SINE_DURATION) + + # Interleaved audio. + stereo = np.zeros(sine.size * 2, dtype=sine.dtype) + stereo[0::2] = sine + + # Send 4 second of audio. + audio = itertools.repeat(stereo.tobytes(), int(4 / SINE_DURATION)) + + self.transport(audio) + + def _generate_sine(self, f, duration): + sine = self.amplitude * \ + np.sin(2 * np.pi * np.arange(self.fs * duration) * (f / self.fs)) + s16le = (sine * 32767).astype('<i2') + return s16le + + def verify(self): + """Verifies that the audio signal is correctly output.""" + assert self.thread is not None + self.thread.join() + self.thread = None + + _fixup_wav_header(OUTPUT_WAV_FILE) + + samplerate, data = wavfile.read(OUTPUT_WAV_FILE) + # Take one second of audio after the first second. + audio = data[samplerate:samplerate*2, 0].astype(np.float) / 32767 + assert len(audio) == samplerate + + spectrum = np.abs(np.fft.fft(audio)) + frequency = np.fft.fftfreq(samplerate, d=1/samplerate) + amplitudes = spectrum / (samplerate/2) + index = np.where(frequency == SINE_FREQUENCY) + amplitude = amplitudes[index][0] + + match_amplitude = math.isclose( + amplitude, self.amplitude, rel_tol=1e-03) + + return match_amplitude |