aboutsummaryrefslogtreecommitdiff
path: root/mmi2grpc/_audio.py
blob: 92e06df273f73d96b2c887e27fec96152af1cb02 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import itertools
import math
import os
from threading import Thread

import numpy as np
from scipy.io import wavfile


def _fixup_wav_header(path):
    """Patch the RIFF and data size fields of the WAV file at *path*.

    Each 32-bit little-endian size field is overwritten in place with the
    number of bytes that follow it up to the current end of the file.

    The offsets are hard-coded (4 for the RIFF size, 40 for the data size),
    i.e. this assumes the data chunk size field sits at byte 40.
    """
    # (RIFF chunk size field, data chunk size field)
    size_field_offsets = (4, 40)

    with open(path, 'r+b') as wav:
        # seek() returns the new absolute position, i.e. the file length.
        total_bytes = wav.seek(0, os.SEEK_END)
        for field_offset in size_field_offsets:
            # Bytes remaining after the 4-byte size field itself.
            trailing = total_bytes - (field_offset + 4)
            wav.seek(field_offset)
            wav.write(trailing.to_bytes(4, byteorder='little'))


# Frequency of the generated test tone, in Hz. Also the frequency whose
# spectral amplitude AudioSignal.verify() looks up in the captured audio.
SINE_FREQUENCY = 440
# Length, in seconds, of the single sine buffer that is generated once and
# then sent repeatedly.
SINE_DURATION = 0.1

# WAV file that AudioSignal.verify() header-patches and then reads back.
# NOTE(review): presumably written by the audio sink under test; the writer
# is not visible in this module -- confirm.
WAV_FILE = "/tmp/audiodata"


class AudioSignal:
    """Send a reference sine tone and check its amplitude in WAV_FILE.

    `start()` streams a SINE_FREQUENCY tone through the supplied transport
    on a background thread; `verify()` waits for that thread, reads
    WAV_FILE and compares the tone amplitude found there with the one that
    was generated.
    """

    def __init__(self, transport, amplitude, fs):
        """Store the signal parameters.

        Args:
            transport: callable invoked once with an iterable of `bytes`
                chunks (interleaved 16-bit little-endian stereo samples).
            amplitude: peak amplitude of the tone as a fraction of 16-bit
                full scale (it is multiplied by 32767).
            fs: sample rate used to generate the tone, in samples/second.
        """
        self.transport = transport
        self.amplitude = amplitude
        self.fs = fs
        self.thread = None

    def start(self):
        """Start sending the tone on a background thread."""
        self.thread = Thread(target=self._run)
        self.thread.start()

    def _run(self):
        """Build the audio chunks and hand them to the transport."""
        sine = self._generate_sine(SINE_FREQUENCY, SINE_DURATION)

        # Interleaved audio: the tone fills the even slots (first channel of
        # each frame), the odd slots stay at zero.
        stereo = np.zeros(sine.size * 2, dtype=sine.dtype)
        stereo[0::2] = sine

        # Send 4 seconds of audio by repeating the same chunk.
        audio = itertools.repeat(stereo.tobytes(), int(4 / SINE_DURATION))

        self.transport(audio)

    def _generate_sine(self, f, duration):
        """Return `duration` seconds of an `f` Hz sine as int16 LE samples.

        The samples are scaled by `self.amplitude * 32767` and truncated to
        signed 16-bit little-endian integers.
        """
        sine = self.amplitude * \
            np.sin(2 * np.pi * np.arange(self.fs * duration) * (f / self.fs))
        s16le = (sine * 32767).astype("<i2")
        return s16le

    @staticmethod
    def _measure_amplitude(samplerate, data, frequency):
        """Return the amplitude of `frequency` in the first channel of `data`.

        Only the second second of audio (samples `samplerate` up to
        `2 * samplerate`) is analysed, and the result is normalised so that
        a full-scale 16-bit sine yields 1.0.

        Args:
            samplerate: sample rate of `data`, in samples/second.
            data: 2-D array of 16-bit samples indexed `[frame, channel]`.
            frequency: tone frequency in Hz (rounded to the nearest 1 Hz bin).

        Raises:
            AssertionError: if fewer than two seconds of audio are present.
            IndexError: if `frequency` lies above the analysed spectrum.
        """
        # Take one second of audio after the first second.
        # np.float64 rather than np.float: the alias was removed in
        # NumPy 1.24.
        audio = data[samplerate:samplerate * 2, 0].astype(np.float64) / 32767
        assert len(audio) == samplerate, "need at least two seconds of audio"

        # The window holds exactly `samplerate` samples, so FFT bins are
        # 1 Hz apart and bin k is k Hz. Indexing the bin directly avoids an
        # exact float comparison against np.fft.fftfreq() output.
        spectrum = np.abs(np.fft.rfft(audio))
        return spectrum[int(round(frequency))] / (samplerate / 2)

    def verify(self):
        """Wait for the sender and check the tone amplitude in WAV_FILE.

        `start()` must have been called first. The WAV header sizes are
        patched before the file is read.

        Returns:
            True if the measured amplitude equals `self.amplitude` within a
            relative tolerance of 1e-3, False otherwise.
        """
        assert self.thread is not None, "start() must be called before verify()"
        self.thread.join()
        self.thread = None

        _fixup_wav_header(WAV_FILE)

        samplerate, data = wavfile.read(WAV_FILE)
        amplitude = self._measure_amplitude(samplerate, data, SINE_FREQUENCY)

        match_amplitude = math.isclose(
            amplitude, self.amplitude, rel_tol=1e-03)

        return match_amplitude