aboutsummaryrefslogtreecommitdiff
path: root/mmi2grpc/_audio.py
diff options
context:
space:
mode:
Diffstat (limited to 'mmi2grpc/_audio.py')
-rw-r--r--mmi2grpc/_audio.py107
1 files changed, 107 insertions, 0 deletions
diff --git a/mmi2grpc/_audio.py b/mmi2grpc/_audio.py
new file mode 100644
index 0000000..8e83c67
--- /dev/null
+++ b/mmi2grpc/_audio.py
@@ -0,0 +1,107 @@
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Audio tools."""
+
+import itertools
+import math
+import os
+from threading import Thread
+
+import numpy as np
+from scipy.io import wavfile
+
+SINE_FREQUENCY = 440
+SINE_DURATION = 0.1
+
+# File which stores the audio signal output data (after transport).
+# Used for running comparisons with the generated audio signal.
+OUTPUT_WAV_FILE = '/tmp/audiodata'
+
+WAV_RIFF_SIZE_OFFSET = 4
+WAV_DATA_SIZE_OFFSET = 40
+
+
+def _fixup_wav_header(path):
+ with open(path, 'r+b') as f:
+ f.seek(0, os.SEEK_END)
+ file_size = f.tell()
+ for offset in [WAV_RIFF_SIZE_OFFSET, WAV_DATA_SIZE_OFFSET]:
+ size = file_size - offset - 4
+ f.seek(offset)
+ f.write(size.to_bytes(4, byteorder='little'))
+
+
+class AudioSignal:
+ """Audio signal generator and verifier."""
+
+ def __init__(self, transport, amplitude, fs):
+ """Init AudioSignal class.
+
+ Args:
+ transport: function to send the generated audio data to.
+ amplitude: amplitude of the signal to generate.
+ fs: sampling rate of the signal to generate.
+ """
+ self.transport = transport
+ self.amplitude = amplitude
+ self.fs = fs
+ self.thread = None
+
+ def start(self):
+ """Generates the audio signal and send it to the transport."""
+ self.thread = Thread(target=self._run)
+ self.thread.start()
+
+ def _run(self):
+ sine = self._generate_sine(SINE_FREQUENCY, SINE_DURATION)
+
+ # Interleaved audio.
+ stereo = np.zeros(sine.size * 2, dtype=sine.dtype)
+ stereo[0::2] = sine
+
+ # Send 4 second of audio.
+ audio = itertools.repeat(stereo.tobytes(), int(4 / SINE_DURATION))
+
+ self.transport(audio)
+
+ def _generate_sine(self, f, duration):
+ sine = self.amplitude * \
+ np.sin(2 * np.pi * np.arange(self.fs * duration) * (f / self.fs))
+ s16le = (sine * 32767).astype('<i2')
+ return s16le
+
+ def verify(self):
+ """Verifies that the audio signal is correctly output."""
+ assert self.thread is not None
+ self.thread.join()
+ self.thread = None
+
+ _fixup_wav_header(OUTPUT_WAV_FILE)
+
+ samplerate, data = wavfile.read(OUTPUT_WAV_FILE)
+ # Take one second of audio after the first second.
+ audio = data[samplerate:samplerate*2, 0].astype(np.float) / 32767
+ assert len(audio) == samplerate
+
+ spectrum = np.abs(np.fft.fft(audio))
+ frequency = np.fft.fftfreq(samplerate, d=1/samplerate)
+ amplitudes = spectrum / (samplerate/2)
+ index = np.where(frequency == SINE_FREQUENCY)
+ amplitude = amplitudes[index][0]
+
+ match_amplitude = math.isclose(
+ amplitude, self.amplitude, rel_tol=1e-03)
+
+ return match_amplitude