summary refs log tree commit diff
path: root/assets/realtime-beat-detector.py
blob: dfecad67f4a4258dc10a5c4606682822c5c5bba9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#!/bin/python

import pyaudio
import numpy as np
import aubio
import signal
import sys

from typing import List, Tuple


class BeatDetector:
    """Real-time beat detector that prints the estimated BPM on every beat.

    Audio is captured through a PyAudio input stream running in callback
    mode; each captured buffer is handed to an aubio tempo tracker.
    """

    def __init__(self, buf_size: int) -> None:
        """Create the tempo tracker and open the audio input stream.

        Args:
            buf_size: Number of frames per audio buffer. It is also used as
                the hop size of the tempo tracker, whose FFT window is twice
                this size.
        """
        self.buf_size: int = buf_size
        self._closed: bool = False

        samplerate: int = 44100
        fft_size: int = self.buf_size * 2

        # Build the tempo tracker BEFORE opening the stream: PyAudio starts a
        # callback-mode stream right away by default, so the callback may run
        # on the audio thread before __init__ has finished. It must never see
        # an instance without `self.tempo`.
        self.tempo: aubio.tempo = aubio.tempo("default", fft_size, self.buf_size, samplerate)

        # Set up pyaudio; the stream delivers mono float32 samples.
        self.audio: pyaudio.PyAudio = pyaudio.PyAudio()
        self.stream: pyaudio.Stream = self.audio.open(
            format=pyaudio.paFloat32,
            channels=1,
            rate=samplerate,
            input=True,
            frames_per_buffer=self.buf_size,
            stream_callback=self._pyaudio_callback
        )

    # this one is called every time enough audio data (buf_size) has been read by the stream
    def _pyaudio_callback(self, in_data, frame_count, time_info, status):
        """Process one captured buffer and print the BPM when a beat is found.

        Args:
            in_data: Raw bytes of the captured buffer (float32 samples, as
                requested when the stream was opened).
            frame_count: Unused.
            time_info: Unused.
            status: Unused.

        Returns:
            A ``(None, pyaudio.paContinue)`` tuple: no output data, and a
            flag telling PyAudio to keep the stream running.
        """
        # Interpret the buffer as a 1-dimensional float32 array (aubio does not work with raw bytes)
        audio_data = np.frombuffer(in_data, dtype=np.float32)
        # first element is truthy if a beat is present in this buffer
        beat = self.tempo(audio_data)

        # if a beat was detected, print the current BPM estimate to stdout
        if beat[0]:
            print(self.tempo.get_bpm(), flush=True)

        return None, pyaudio.paContinue  # Tell pyAudio to continue

    def close(self) -> None:
        """Close the stream and terminate PyAudio.

        Safe to call more than once, and safe on an instance whose
        ``__init__`` did not run to completion (missing attributes are
        simply skipped).
        """
        if getattr(self, "_closed", False):
            return
        self._closed = True

        stream = getattr(self, "stream", None)
        audio = getattr(self, "audio", None)
        try:
            if stream is not None:
                stream.close()
        finally:
            # Terminate PyAudio even if closing the stream failed.
            if audio is not None:
                audio.terminate()

    def __del__(self):
        """Release the audio resources when the detector is garbage-collected."""
        self.close()
        print('--- Stopped ---')


# Program entry: start the detector and wait for Ctrl+C
def main():
    """Run a BeatDetector with 512-frame buffers until SIGINT is received."""
    detector = BeatDetector(512)

    def _shutdown(signum, stack_frame):
        """SIGINT handler: stop and release the audio resources, then exit."""
        stream, audio = detector.stream, detector.audio
        stream.stop_stream()
        stream.close()
        audio.terminate()
        print(' ===> Ctrl + C')
        sys.exit(0)

    # Intercept Ctrl+C so the process can shut down gracefully.
    signal.signal(signal.SIGINT, _shutdown)

    # The audio callback runs on a separate thread, so this thread only
    # needs to sleep until a signal arrives.
    signal.pause()


# Script entry point: call main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()