diff options
Diffstat (limited to 'mmi2grpc/_audio.py')
-rw-r--r-- | mmi2grpc/_audio.py | 79 |
1 files changed, 79 insertions, 0 deletions
diff --git a/mmi2grpc/_audio.py b/mmi2grpc/_audio.py new file mode 100644 index 0000000..92e06df --- /dev/null +++ b/mmi2grpc/_audio.py @@ -0,0 +1,79 @@ +import itertools +import math +import os +from threading import Thread + +import numpy as np +from scipy.io import wavfile + + +def _fixup_wav_header(path): + WAV_RIFF_SIZE_OFFSET = 4 + WAV_DATA_SIZE_OFFSET = 40 + + with open(path, 'r+b') as f: + f.seek(0, os.SEEK_END) + file_size = f.tell() + for offset in [WAV_RIFF_SIZE_OFFSET, WAV_DATA_SIZE_OFFSET]: + size = file_size - offset - 4 + f.seek(offset) + f.write(size.to_bytes(4, byteorder='little')) + + +SINE_FREQUENCY = 440 +SINE_DURATION = 0.1 + +WAV_FILE = "/tmp/audiodata" + + +class AudioSignal: + def __init__(self, transport, amplitude, fs): + self.transport = transport + self.amplitude = amplitude + self.fs = fs + self.thread = None + + def start(self): + self.thread = Thread(target=self._run) + self.thread.start() + + def _run(self): + sine = self._generate_sine(SINE_FREQUENCY, SINE_DURATION) + + # Interleaved audio + stereo = np.zeros(sine.size * 2, dtype=sine.dtype) + stereo[0::2] = sine + + # Send 4 second of audio + audio = itertools.repeat(stereo.tobytes(), int(4 / SINE_DURATION)) + + self.transport(audio) + + def _generate_sine(self, f, duration): + sine = self.amplitude * \ + np.sin(2 * np.pi * np.arange(self.fs * duration) * (f / self.fs)) + s16le = (sine * 32767).astype("<i2") + return s16le + + def verify(self): + assert self.thread is not None + self.thread.join() + self.thread = None + + _fixup_wav_header(WAV_FILE) + + samplerate, data = wavfile.read(WAV_FILE) + # Take one second of audio after the first second + audio = data[samplerate:samplerate*2, 0].astype(np.float) / 32767 + assert(len(audio) == samplerate) + + spectrum = np.abs(np.fft.fft(audio)) + frequency = np.fft.fftfreq(samplerate, d=1/samplerate) + amplitudes = spectrum / (samplerate/2) + index = np.where(frequency == SINE_FREQUENCY) + amplitude = amplitudes[index][0] + + match_amplitude = math.isclose( + amplitude, self.amplitude, rel_tol=1e-03) + + return match_amplitude |