aboutsummaryrefslogtreecommitdiff
path: root/mmi2grpc/_audio.py
blob: 8e83c67cc6522f0eccbfa83c64e380a73c12af32 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Audio tools."""

import itertools
import math
import os
from threading import Thread

import numpy as np
from scipy.io import wavfile

SINE_FREQUENCY = 440
SINE_DURATION = 0.1

# File which stores the audio signal output data (after transport).
# Used for running comparisons with the generated audio signal.
OUTPUT_WAV_FILE = '/tmp/audiodata'

WAV_RIFF_SIZE_OFFSET = 4
WAV_DATA_SIZE_OFFSET = 40


def _fixup_wav_header(path):
    with open(path, 'r+b') as f:
        f.seek(0, os.SEEK_END)
        file_size = f.tell()
        for offset in [WAV_RIFF_SIZE_OFFSET, WAV_DATA_SIZE_OFFSET]:
            size = file_size - offset - 4
            f.seek(offset)
            f.write(size.to_bytes(4, byteorder='little'))


class AudioSignal:
    """Audio signal generator and verifier."""

    def __init__(self, transport, amplitude, fs):
        """Init AudioSignal class.

        Args:
            transport: function to send the generated audio data to.
            amplitude: amplitude of the signal to generate.
            fs: sampling rate of the signal to generate.
        """
        self.transport = transport
        self.amplitude = amplitude
        self.fs = fs
        self.thread = None

    def start(self):
        """Generates the audio signal and send it to the transport."""
        self.thread = Thread(target=self._run)
        self.thread.start()

    def _run(self):
        sine = self._generate_sine(SINE_FREQUENCY, SINE_DURATION)

        # Interleaved audio.
        stereo = np.zeros(sine.size * 2, dtype=sine.dtype)
        stereo[0::2] = sine

        # Send 4 second of audio.
        audio = itertools.repeat(stereo.tobytes(), int(4 / SINE_DURATION))

        self.transport(audio)

    def _generate_sine(self, f, duration):
        sine = self.amplitude * \
            np.sin(2 * np.pi * np.arange(self.fs * duration) * (f / self.fs))
        s16le = (sine * 32767).astype('<i2')
        return s16le

    def verify(self):
        """Verifies that the audio signal is correctly output."""
        assert self.thread is not None
        self.thread.join()
        self.thread = None

        _fixup_wav_header(OUTPUT_WAV_FILE)

        samplerate, data = wavfile.read(OUTPUT_WAV_FILE)
        # Take one second of audio after the first second.
        audio = data[samplerate:samplerate*2, 0].astype(np.float) / 32767
        assert len(audio) == samplerate

        spectrum = np.abs(np.fft.fft(audio))
        frequency = np.fft.fftfreq(samplerate, d=1/samplerate)
        amplitudes = spectrum / (samplerate/2)
        index = np.where(frequency == SINE_FREQUENCY)
        amplitude = amplitudes[index][0]

        match_amplitude = math.isclose(
            amplitude, self.amplitude, rel_tol=1e-03)

        return match_amplitude