diff options
| author | 2 * r + 2 * t <61896496+soramanew@users.noreply.github.com> | 2025-09-13 14:38:44 +1000 |
|---|---|---|
| committer | 2 * r + 2 * t <61896496+soramanew@users.noreply.github.com> | 2025-09-13 14:38:44 +1000 |
| commit | 306cfc06ed38a2f86616c1f2fe64de45321f21a6 (patch) | |
| tree | a27c79d9c4d01c2dadeeae74c844875ab7ab4eed /plugin/src/Caelestia/Services/audiocollector.cpp | |
| parent | popouts/tray: better interaction (diff) | |
| download | caelestia-shell-306cfc06ed38a2f86616c1f2fe64de45321f21a6.tar.gz caelestia-shell-306cfc06ed38a2f86616c1f2fe64de45321f21a6.tar.bz2 caelestia-shell-306cfc06ed38a2f86616c1f2fe64de45321f21a6.zip | |
plugin: refactor into modules
Diffstat (limited to 'plugin/src/Caelestia/Services/audiocollector.cpp')
| -rw-r--r-- | plugin/src/Caelestia/Services/audiocollector.cpp | 275 |
1 files changed, 275 insertions, 0 deletions
diff --git a/plugin/src/Caelestia/Services/audiocollector.cpp b/plugin/src/Caelestia/Services/audiocollector.cpp new file mode 100644 index 0000000..9dc3871 --- /dev/null +++ b/plugin/src/Caelestia/Services/audiocollector.cpp @@ -0,0 +1,275 @@ +#include "audiocollector.hpp" + +#include "service.hpp" +#include <algorithm> +#include <pipewire/pipewire.h> +#include <qdebug.h> +#include <qmutex.h> +#include <spa/param/audio/format-utils.h> +#include <spa/param/latency-utils.h> +#include <stop_token> +#include <vector> + +namespace caelestia { + +PipeWireWorker::PipeWireWorker(std::stop_token token, AudioCollector* collector) + : m_loop(nullptr) + , m_stream(nullptr) + , m_timer(nullptr) + , m_idle(true) + , m_token(token) + , m_collector(collector) { + pw_init(nullptr, nullptr); + + m_loop = pw_main_loop_new(nullptr); + if (!m_loop) { + qWarning() << "PipeWireWorker::init: failed to create PipeWire main loop"; + pw_deinit(); + return; + } + + timespec timeout = { 0, 10 * SPA_NSEC_PER_MSEC }; + m_timer = pw_loop_add_timer(pw_main_loop_get_loop(m_loop), handleTimeout, this); + pw_loop_update_timer(pw_main_loop_get_loop(m_loop), m_timer, &timeout, &timeout, false); + + auto props = pw_properties_new( + PW_KEY_MEDIA_TYPE, "Audio", PW_KEY_MEDIA_CATEGORY, "Capture", PW_KEY_MEDIA_ROLE, "Music", nullptr); + pw_properties_set(props, PW_KEY_STREAM_CAPTURE_SINK, "true"); + pw_properties_setf(props, PW_KEY_NODE_LATENCY, "%u/%u", nextPowerOf2(512 * collector->sampleRate() / 48000), + collector->sampleRate()); + pw_properties_set(props, PW_KEY_NODE_PASSIVE, "true"); + pw_properties_set(props, PW_KEY_NODE_VIRTUAL, "true"); + pw_properties_set(props, PW_KEY_STREAM_DONT_REMIX, "false"); + pw_properties_set(props, "channelmix.upmix", "true"); + + std::vector<uint8_t> buffer(collector->chunkSize()); + spa_pod_builder b; + spa_pod_builder_init(&b, buffer.data(), static_cast<quint32>(buffer.size())); + + spa_audio_info_raw info{}; + info.format = SPA_AUDIO_FORMAT_S16; + info.rate = collector->sampleRate(); + info.channels = 1; + + const spa_pod* params[1]; + params[0] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, &info); + + pw_stream_events events{}; + events.state_changed = [](void* data, pw_stream_state, pw_stream_state state, const char*) { + auto* self = static_cast<PipeWireWorker*>(data); + self->streamStateChanged(state); + }; + events.process = [](void* data) { + auto* self = static_cast<PipeWireWorker*>(data); + self->processStream(); + }; + + m_stream = pw_stream_new_simple(pw_main_loop_get_loop(m_loop), "caelestia-shell", props, &events, this); + + const int success = pw_stream_connect(m_stream, PW_DIRECTION_INPUT, collector->nodeId(), + static_cast<pw_stream_flags>( + PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_RT_PROCESS), + params, 1); + if (success < 0) { + qWarning() << "PipeWireWorker::init: failed to connect stream"; + pw_stream_destroy(m_stream); + pw_main_loop_destroy(m_loop); + pw_deinit(); + return; + } + + pw_main_loop_run(m_loop); + + pw_stream_destroy(m_stream); + pw_main_loop_destroy(m_loop); + pw_deinit(); +} + +void PipeWireWorker::handleTimeout(void* data, uint64_t expirations) { + auto* self = static_cast<PipeWireWorker*>(data); + + if (self->m_token.stop_requested()) { + pw_main_loop_quit(self->m_loop); + return; + } + + if (!self->m_idle) { + if (expirations < 10) { + self->m_collector->clearBuffer(); + } else { + self->m_idle = true; + timespec timeout = { 0, 500 * SPA_NSEC_PER_MSEC }; + pw_loop_update_timer(pw_main_loop_get_loop(self->m_loop), self->m_timer, &timeout, &timeout, false); + } + } +} + +void PipeWireWorker::streamStateChanged(pw_stream_state state) { + m_idle = false; + switch (state) { + case PW_STREAM_STATE_PAUSED: { + timespec timeout = { 0, 10 * SPA_NSEC_PER_MSEC }; + pw_loop_update_timer(pw_main_loop_get_loop(m_loop), m_timer, &timeout, &timeout, false); + break; + } + case PW_STREAM_STATE_STREAMING: + pw_loop_update_timer(pw_main_loop_get_loop(m_loop), m_timer, nullptr, nullptr, false); + break; + case PW_STREAM_STATE_ERROR: + pw_main_loop_quit(m_loop); + break; + default: + break; + } +} + +void PipeWireWorker::processStream() { + if (m_token.stop_requested()) { + pw_main_loop_quit(m_loop); + return; + } + + pw_buffer* buffer = pw_stream_dequeue_buffer(m_stream); + if (buffer == nullptr) { + return; + } + + const spa_buffer* buf = buffer->buffer; + const qint16* samples = reinterpret_cast<const qint16*>(buf->datas[0].data); + if (samples == nullptr) { + return; + } + + const quint32 count = buf->datas[0].chunk->size / 2; + m_collector->loadChunk(samples, count); + + pw_stream_queue_buffer(m_stream, buffer); +} + +unsigned int PipeWireWorker::nextPowerOf2(unsigned int n) { + if (n == 0) { + return 1; + } + + n--; + n |= n >> 1; + n |= n >> 2; + n |= n >> 4; + n |= n >> 8; + n |= n >> 16; + n++; + + return n; +} + +AudioCollector::AudioCollector(QObject* parent) + : Service(parent) + , m_sampleRate(44100) + , m_chunkSize(512) + , m_nodeId(PW_ID_ANY) + , m_buffer1(m_chunkSize) + , m_buffer2(m_chunkSize) + , m_readBuffer(&m_buffer1) + , m_writeBuffer(&m_buffer2) {} + +AudioCollector::~AudioCollector() { + stop(); +} + +quint32 AudioCollector::sampleRate() const { + return m_sampleRate; +} + +quint32 AudioCollector::chunkSize() const { + return m_chunkSize; +} + +quint32 AudioCollector::nodeId() { + QMutexLocker locker(&m_nodeIdMutex); + return m_nodeId; +} + +void AudioCollector::setNodeId(quint32 nodeId) { + { + QMutexLocker locker(&m_nodeIdMutex); + + if (nodeId == m_nodeId) { + return; + } + + m_nodeId = nodeId; + } + emit nodeIdChanged(); + + if (m_thread.joinable()) { + stop(); + start(); + } +} + +void AudioCollector::clearBuffer() { + auto* writeBuffer = m_writeBuffer.load(std::memory_order_relaxed); + std::fill(writeBuffer->begin(), writeBuffer->end(), 0.0f); + + auto* oldRead = m_readBuffer.exchange(writeBuffer, std::memory_order_acq_rel); + m_writeBuffer.store(oldRead, std::memory_order_release); +} + +void AudioCollector::loadChunk(const qint16* samples, quint32 count) { + if (count > m_chunkSize) { + count = m_chunkSize; + } + + auto* writeBuffer = m_writeBuffer.load(std::memory_order_relaxed); + std::transform(samples, samples + count, writeBuffer->begin(), [](qint16 sample) { + return sample / 32768.0f; + }); + + auto* oldRead = m_readBuffer.exchange(writeBuffer, std::memory_order_acq_rel); + m_writeBuffer.store(oldRead, std::memory_order_release); +} + +quint32 AudioCollector::readChunk(float* out, quint32 count) { + if (count == 0 || count > m_chunkSize) { + count = m_chunkSize; + } + + auto* readBuffer = m_readBuffer.load(std::memory_order_acquire); + std::memcpy(out, readBuffer->data(), count * sizeof(float)); + + return count; +} + +quint32 AudioCollector::readChunk(double* out, quint32 count) { + if (count == 0 || count > m_chunkSize) { + count = m_chunkSize; + } + + auto* readBuffer = m_readBuffer.load(std::memory_order_acquire); + std::transform(readBuffer->begin(), readBuffer->begin() + count, out, [](float sample) { + return static_cast<double>(sample); + }); + + return count; +} + +void AudioCollector::start() { + if (m_thread.joinable()) { + return; + } + + clearBuffer(); + + m_thread = std::jthread([this](std::stop_token token) { + PipeWireWorker worker(token, this); + }); +} + +void AudioCollector::stop() { + if (m_thread.joinable()) { + m_thread.request_stop(); + m_thread.join(); + } +} + +} // namespace caelestia |