From 93c7e83b2aca1ad3bc674af2a6c522be7aba2c57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20M=C3=BCller?= Date: Sun, 19 Sep 2021 22:37:13 +0200 Subject: Audaspace: porting upstream pulseaudio fixes. Fixes T89045 and T91057. --- extern/audaspace/CMakeLists.txt | 2 + extern/audaspace/include/util/RingBuffer.h | 97 ++++++++++++ .../plugins/pulseaudio/PulseAudioDevice.cpp | 169 ++++++++++++++------- .../plugins/pulseaudio/PulseAudioDevice.h | 76 +++++++-- .../plugins/pulseaudio/PulseAudioSymbols.h | 11 ++ extern/audaspace/src/util/RingBuffer.cpp | 137 +++++++++++++++++ 6 files changed, 419 insertions(+), 73 deletions(-) create mode 100644 extern/audaspace/include/util/RingBuffer.h create mode 100644 extern/audaspace/src/util/RingBuffer.cpp (limited to 'extern') diff --git a/extern/audaspace/CMakeLists.txt b/extern/audaspace/CMakeLists.txt index 1599c03cbad..d30726a3c2b 100644 --- a/extern/audaspace/CMakeLists.txt +++ b/extern/audaspace/CMakeLists.txt @@ -129,6 +129,7 @@ set(SRC src/util/Barrier.cpp src/util/Buffer.cpp src/util/BufferReader.cpp + src/util/RingBuffer.cpp src/util/StreamBuffer.cpp src/util/ThreadPool.cpp ) @@ -244,6 +245,7 @@ set(PUBLIC_HDR include/util/BufferReader.h include/util/ILockable.h include/util/Math3D.h + include/util/RingBuffer.h include/util/StreamBuffer.h include/util/ThreadPool.h ) diff --git a/extern/audaspace/include/util/RingBuffer.h b/extern/audaspace/include/util/RingBuffer.h new file mode 100644 index 00000000000..67bd1cc8640 --- /dev/null +++ b/extern/audaspace/include/util/RingBuffer.h @@ -0,0 +1,97 @@ +/******************************************************************************* + * Copyright 2009-2021 Jörg Müller + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ + +#pragma once + +/** + * @file RingBuffer.h + * @ingroup util + * The RingBuffer class. + */ + +#include "Audaspace.h" +#include "Buffer.h" + +#include + +AUD_NAMESPACE_BEGIN + +/** + * This class is a simple ring buffer in RAM which is 32 Byte aligned and provides + * functionality for concurrent reading and writting without locks. + */ +class AUD_API RingBuffer +{ +private: + /// The buffer storing the actual data. + Buffer m_buffer; + + /// The reading pointer. + volatile size_t m_read; + + /// The writing pointer. + volatile size_t m_write; + + // delete copy constructor and operator= + RingBuffer(const RingBuffer&) = delete; + RingBuffer& operator=(const RingBuffer&) = delete; + +public: + /** + * Creates a new ring buffer. + * \param size The size of the buffer in bytes. + */ + RingBuffer(int size = 0); + + /** + * Returns the pointer to the ring buffer in memory. + */ + sample_t* getBuffer() const; + + /** + * Returns the size of the ring buffer in bytes. + */ + int getSize() const; + + size_t getReadSize() const; + + size_t getWriteSize() const; + + size_t read(data_t* target, size_t size); + + size_t write(data_t* source, size_t size); + + /** + * Resets the ring buffer to a state where nothing has been written or read. + */ + void reset(); + + /** + * Resizes the ring buffer. + * \param size The new size of the ring buffer, measured in bytes. + */ + void resize(int size); + + /** + * Makes sure the ring buffer has a minimum size. + * If size is >= current size, nothing will happen. + * Otherwise the ring buffer is resized with keep as parameter. + * \param size The new minimum size of the ring buffer, measured in bytes. + */ + void assureSize(int size); +}; + +AUD_NAMESPACE_END diff --git a/extern/audaspace/plugins/pulseaudio/PulseAudioDevice.cpp b/extern/audaspace/plugins/pulseaudio/PulseAudioDevice.cpp index bf3fad82620..cddc411cfc6 100644 --- a/extern/audaspace/plugins/pulseaudio/PulseAudioDevice.cpp +++ b/extern/audaspace/plugins/pulseaudio/PulseAudioDevice.cpp @@ -23,95 +23,121 @@ AUD_NAMESPACE_BEGIN -void PulseAudioDevice::PulseAudio_state_callback(pa_context *context, void *data) +PulseAudioDevice::PulseAudioSynchronizer::PulseAudioSynchronizer(PulseAudioDevice *device) : + m_device(device) { - PulseAudioDevice* device = (PulseAudioDevice*)data; +} - std::lock_guard lock(*device); +double PulseAudioDevice::PulseAudioSynchronizer::getPosition(std::shared_ptr handle) +{ + pa_usec_t latency; + int negative; + AUD_pa_stream_get_latency(m_device->m_stream, &latency, &negative); - device->m_state = AUD_pa_context_get_state(context); + double delay = m_device->m_ring_buffer.getReadSize() / (AUD_SAMPLE_SIZE(m_device->m_specs) * m_device->m_specs.rate) + latency * 1.0e-6; + + return handle->getPosition() - delay; } -void PulseAudioDevice::PulseAudio_request(pa_stream *stream, size_t total_bytes, void *data) +void PulseAudioDevice::updateRingBuffer() { - PulseAudioDevice* device = (PulseAudioDevice*)data; + unsigned int samplesize = AUD_SAMPLE_SIZE(m_specs); - void* buffer; + std::unique_lock lock(m_mixingLock); - while(total_bytes > 0) + Buffer buffer; + + while(m_valid) { - size_t num_bytes = total_bytes; + size_t size = m_ring_buffer.getWriteSize(); - AUD_pa_stream_begin_write(stream, &buffer, &num_bytes); + size_t sample_count = size / samplesize; - device->mix((data_t*)buffer, num_bytes / AUD_DEVICE_SAMPLE_SIZE(device->m_specs)); + if(sample_count > 0) + { + size = sample_count * samplesize; - AUD_pa_stream_write(stream, buffer, num_bytes, nullptr, 0, PA_SEEK_RELATIVE); + buffer.assureSize(size); - total_bytes -= num_bytes; + mix(reinterpret_cast(buffer.getBuffer()), sample_count); + + m_ring_buffer.write(reinterpret_cast(buffer.getBuffer()), size); + } + + m_mixingCondition.wait(lock); } } -void PulseAudioDevice::PulseAudio_underflow(pa_stream *stream, void *data) +void PulseAudioDevice::PulseAudio_state_callback(pa_context *context, void *data) { PulseAudioDevice* device = (PulseAudioDevice*)data; - DeviceSpecs specs = device->getSpecs(); + device->m_state = AUD_pa_context_get_state(context); - if(++device->m_underflows > 4 && device->m_buffersize < AUD_DEVICE_SAMPLE_SIZE(specs) * specs.rate * 2) - { - device->m_buffersize <<= 1; - device->m_underflows = 0; + AUD_pa_threaded_mainloop_signal(device->m_mainloop, 0); +} - pa_buffer_attr buffer_attr; +void PulseAudioDevice::PulseAudio_request(pa_stream *stream, size_t total_bytes, void *data) +{ + PulseAudioDevice* device = (PulseAudioDevice*)data; - buffer_attr.fragsize = -1U; - buffer_attr.maxlength = -1U; - buffer_attr.minreq = -1U; - buffer_attr.prebuf = -1U; - buffer_attr.tlength = device->m_buffersize; + data_t* buffer; - AUD_pa_stream_set_buffer_attr(stream, &buffer_attr, nullptr, nullptr); - } -} + size_t sample_size = AUD_DEVICE_SAMPLE_SIZE(device->m_specs); -void PulseAudioDevice::runMixingThread() -{ - for(;;) + while(total_bytes > 0) { + size_t num_bytes = total_bytes; + + AUD_pa_stream_begin_write(stream, reinterpret_cast(&buffer), &num_bytes); + + size_t readsamples = device->m_ring_buffer.getReadSize(); + + readsamples = std::min(readsamples, size_t(num_bytes)) / sample_size; + + device->m_ring_buffer.read(buffer, readsamples * sample_size); + + if(readsamples * sample_size < num_bytes) + std::memset(buffer + readsamples * sample_size, 0, num_bytes - readsamples * sample_size); + + if(device->m_mixingLock.try_lock()) { - std::lock_guard lock(*this); - - if(shouldStop()) - { - AUD_pa_stream_cork(m_stream, 1, nullptr, nullptr); - AUD_pa_stream_flush(m_stream, nullptr, nullptr); - doStop(); - return; - } + device->m_mixingCondition.notify_all(); + device->m_mixingLock.unlock(); } - if(AUD_pa_stream_is_corked(m_stream)) - AUD_pa_stream_cork(m_stream, 0, nullptr, nullptr); + AUD_pa_stream_write(stream, reinterpret_cast(buffer), num_bytes, nullptr, 0, PA_SEEK_RELATIVE); - // similar to AUD_pa_mainloop_iterate(m_mainloop, false, nullptr); except with a longer timeout - AUD_pa_mainloop_prepare(m_mainloop, 1 << 14); - AUD_pa_mainloop_poll(m_mainloop); - AUD_pa_mainloop_dispatch(m_mainloop); + total_bytes -= num_bytes; } } +void PulseAudioDevice::playing(bool playing) +{ + m_playback = playing; + + AUD_pa_threaded_mainloop_lock(m_mainloop); + AUD_pa_stream_cork(m_stream, playing ? 0 : 1, nullptr, nullptr); + AUD_pa_threaded_mainloop_unlock(m_mainloop); +} + PulseAudioDevice::PulseAudioDevice(std::string name, DeviceSpecs specs, int buffersize) : + m_synchronizer(this), + m_playback(false), m_state(PA_CONTEXT_UNCONNECTED), + m_valid(true), m_underflows(0) { - m_mainloop = AUD_pa_mainloop_new(); + m_mainloop = AUD_pa_threaded_mainloop_new(); + + AUD_pa_threaded_mainloop_lock(m_mainloop); - m_context = AUD_pa_context_new(AUD_pa_mainloop_get_api(m_mainloop), name.c_str()); + m_context = AUD_pa_context_new(AUD_pa_threaded_mainloop_get_api(m_mainloop), name.c_str()); if(!m_context) { - AUD_pa_mainloop_free(m_mainloop); + AUD_pa_threaded_mainloop_unlock(m_mainloop); + AUD_pa_threaded_mainloop_free(m_mainloop); AUD_THROW(DeviceException, "Could not connect to PulseAudio."); } @@ -120,21 +146,26 @@ PulseAudioDevice::PulseAudioDevice(std::string name, DeviceSpecs specs, int buff AUD_pa_context_connect(m_context, nullptr, PA_CONTEXT_NOFLAGS, nullptr); + AUD_pa_threaded_mainloop_start(m_mainloop); + while(m_state != PA_CONTEXT_READY) { switch(m_state) { case PA_CONTEXT_FAILED: case PA_CONTEXT_TERMINATED: + AUD_pa_threaded_mainloop_unlock(m_mainloop); + AUD_pa_threaded_mainloop_stop(m_mainloop); + AUD_pa_context_disconnect(m_context); AUD_pa_context_unref(m_context); - AUD_pa_mainloop_free(m_mainloop); + AUD_pa_threaded_mainloop_free(m_mainloop); AUD_THROW(DeviceException, "Could not connect to PulseAudio."); break; default: - AUD_pa_mainloop_iterate(m_mainloop, true, nullptr); + AUD_pa_threaded_mainloop_wait(m_mainloop); break; } } @@ -182,16 +213,18 @@ PulseAudioDevice::PulseAudioDevice(std::string name, DeviceSpecs specs, int buff if(!m_stream) { + AUD_pa_threaded_mainloop_unlock(m_mainloop); + AUD_pa_threaded_mainloop_stop(m_mainloop); + AUD_pa_context_disconnect(m_context); AUD_pa_context_unref(m_context); - AUD_pa_mainloop_free(m_mainloop); + AUD_pa_threaded_mainloop_free(m_mainloop); AUD_THROW(DeviceException, "Could not create PulseAudio stream."); } AUD_pa_stream_set_write_callback(m_stream, PulseAudio_request, this); - AUD_pa_stream_set_underflow_callback(m_stream, PulseAudio_underflow, this); buffersize *= AUD_DEVICE_SAMPLE_SIZE(m_specs); m_buffersize = buffersize; @@ -204,31 +237,53 @@ PulseAudioDevice::PulseAudioDevice(std::string name, DeviceSpecs specs, int buff buffer_attr.prebuf = -1U; buffer_attr.tlength = buffersize; - if(AUD_pa_stream_connect_playback(m_stream, nullptr, &buffer_attr, static_cast(PA_STREAM_START_CORKED | PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_ADJUST_LATENCY | PA_STREAM_AUTO_TIMING_UPDATE), nullptr, nullptr) < 0) + m_ring_buffer.resize(buffersize); + + if(AUD_pa_stream_connect_playback(m_stream, nullptr, &buffer_attr, static_cast(PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_ADJUST_LATENCY | PA_STREAM_AUTO_TIMING_UPDATE), nullptr, nullptr) < 0) { + AUD_pa_threaded_mainloop_unlock(m_mainloop); + AUD_pa_threaded_mainloop_stop(m_mainloop); + AUD_pa_context_disconnect(m_context); AUD_pa_context_unref(m_context); - AUD_pa_mainloop_free(m_mainloop); + AUD_pa_threaded_mainloop_free(m_mainloop); AUD_THROW(DeviceException, "Could not connect PulseAudio stream."); } + AUD_pa_threaded_mainloop_unlock(m_mainloop); + create(); + + m_mixingThread = std::thread(&PulseAudioDevice::updateRingBuffer, this); } PulseAudioDevice::~PulseAudioDevice() { - stopMixingThread(); + m_valid = false; + + m_mixingLock.lock(); + m_mixingCondition.notify_all(); + m_mixingLock.unlock(); + + m_mixingThread.join(); + + AUD_pa_threaded_mainloop_stop(m_mainloop); AUD_pa_context_disconnect(m_context); AUD_pa_context_unref(m_context); - AUD_pa_mainloop_free(m_mainloop); + AUD_pa_threaded_mainloop_free(m_mainloop); destroy(); } +ISynchronizer *PulseAudioDevice::getSynchronizer() +{ + return &m_synchronizer; +} + class PulseAudioDeviceFactory : public IDeviceFactory { private: diff --git a/extern/audaspace/plugins/pulseaudio/PulseAudioDevice.h b/extern/audaspace/plugins/pulseaudio/PulseAudioDevice.h index 45b813a5755..57359110633 100644 --- a/extern/audaspace/plugins/pulseaudio/PulseAudioDevice.h +++ b/extern/audaspace/plugins/pulseaudio/PulseAudioDevice.h @@ -26,7 +26,11 @@ * The PulseAudioDevice class. */ -#include "devices/ThreadedDevice.h" +#include "devices/SoftwareDevice.h" +#include "util/RingBuffer.h" + +#include +#include #include @@ -35,17 +39,65 @@ AUD_NAMESPACE_BEGIN /** * This device plays back through PulseAudio, the simple direct media layer. */ -class AUD_PLUGIN_API PulseAudioDevice : public ThreadedDevice +class AUD_PLUGIN_API PulseAudioDevice : public SoftwareDevice { private: - pa_mainloop* m_mainloop; + class PulseAudioSynchronizer : public DefaultSynchronizer + { + PulseAudioDevice* m_device; + + public: + PulseAudioSynchronizer(PulseAudioDevice* device); + + virtual double getPosition(std::shared_ptr handle); + }; + + /// Synchronizer. + PulseAudioSynchronizer m_synchronizer; + + /** + * Whether there is currently playback. + */ + volatile bool m_playback; + + pa_threaded_mainloop* m_mainloop; pa_context* m_context; pa_stream* m_stream; pa_context_state_t m_state; + /** + * The mixing ring buffer. + */ + RingBuffer m_ring_buffer; + + /** + * Whether the device is valid. + */ + bool m_valid; + int m_buffersize; uint32_t m_underflows; + /** + * The mixing thread. + */ + std::thread m_mixingThread; + + /** + * Mutex for mixing. + */ + std::mutex m_mixingLock; + + /** + * Condition for mixing. + */ + std::condition_variable m_mixingCondition; + + /** + * Updates the ring buffer. + */ + AUD_LOCAL void updateRingBuffer(); + /** * Reports the state of the PulseAudio server connection. * \param context The PulseAudio context. @@ -61,23 +113,13 @@ private: */ AUD_LOCAL static void PulseAudio_request(pa_stream* stream, size_t total_bytes, void* data); - /** - * Reports an underflow from the PulseAudio server. - * Automatically adjusts the latency if this happens too often. - * @param stream The PulseAudio stream. - * \param data The PulseAudio device. - */ - AUD_LOCAL static void PulseAudio_underflow(pa_stream* stream, void* data); - - /** - * Streaming thread main function. - */ - AUD_LOCAL void runMixingThread(); - // delete copy constructor and operator= PulseAudioDevice(const PulseAudioDevice&) = delete; PulseAudioDevice& operator=(const PulseAudioDevice&) = delete; +protected: + virtual void playing(bool playing); + public: /** * Opens the PulseAudio audio device for playback. @@ -93,6 +135,8 @@ public: */ virtual ~PulseAudioDevice(); + virtual ISynchronizer* getSynchronizer(); + /** * Registers this plugin. */ diff --git a/extern/audaspace/plugins/pulseaudio/PulseAudioSymbols.h b/extern/audaspace/plugins/pulseaudio/PulseAudioSymbols.h index 361aa518087..a33135b6e25 100644 --- a/extern/audaspace/plugins/pulseaudio/PulseAudioSymbols.h +++ b/extern/audaspace/plugins/pulseaudio/PulseAudioSymbols.h @@ -25,6 +25,7 @@ PULSEAUDIO_SYMBOL(pa_stream_begin_write); PULSEAUDIO_SYMBOL(pa_stream_connect_playback); PULSEAUDIO_SYMBOL(pa_stream_cork); PULSEAUDIO_SYMBOL(pa_stream_flush); +PULSEAUDIO_SYMBOL(pa_stream_get_latency); PULSEAUDIO_SYMBOL(pa_stream_is_corked); PULSEAUDIO_SYMBOL(pa_stream_new); PULSEAUDIO_SYMBOL(pa_stream_set_buffer_attr); @@ -39,3 +40,13 @@ PULSEAUDIO_SYMBOL(pa_mainloop_iterate); PULSEAUDIO_SYMBOL(pa_mainloop_prepare); PULSEAUDIO_SYMBOL(pa_mainloop_poll); PULSEAUDIO_SYMBOL(pa_mainloop_dispatch); + +PULSEAUDIO_SYMBOL(pa_threaded_mainloop_free); +PULSEAUDIO_SYMBOL(pa_threaded_mainloop_get_api); +PULSEAUDIO_SYMBOL(pa_threaded_mainloop_lock); +PULSEAUDIO_SYMBOL(pa_threaded_mainloop_new); +PULSEAUDIO_SYMBOL(pa_threaded_mainloop_signal); +PULSEAUDIO_SYMBOL(pa_threaded_mainloop_start); +PULSEAUDIO_SYMBOL(pa_threaded_mainloop_stop); +PULSEAUDIO_SYMBOL(pa_threaded_mainloop_unlock); +PULSEAUDIO_SYMBOL(pa_threaded_mainloop_wait); diff --git a/extern/audaspace/src/util/RingBuffer.cpp b/extern/audaspace/src/util/RingBuffer.cpp new file mode 100644 index 00000000000..3796684aa88 --- /dev/null +++ b/extern/audaspace/src/util/RingBuffer.cpp @@ -0,0 +1,137 @@ +/******************************************************************************* + * Copyright 2009-2021 Jörg Müller + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ + +#include "util/RingBuffer.h" + +#include +#include +#include + +#define ALIGNMENT 32 +#define ALIGN(a) (a + ALIGNMENT - ((long long)a & (ALIGNMENT-1))) + +AUD_NAMESPACE_BEGIN + +RingBuffer::RingBuffer(int size) : + m_buffer(size), + m_read(0), + m_write(0) +{ +} + +sample_t* RingBuffer::getBuffer() const +{ + return m_buffer.getBuffer(); +} + +int RingBuffer::getSize() const +{ + return m_buffer.getSize(); +} + +size_t RingBuffer::getReadSize() const +{ + size_t read = m_read; + size_t write = m_write; + + if(read > write) + return write + getSize() - read; + else + return write - read; +} + +size_t RingBuffer::getWriteSize() const +{ + size_t read = m_read; + size_t write = m_write; + + if(read > write) + return read - write - 1; + else + return read + getSize() - write - 1; +} + +size_t RingBuffer::read(data_t* target, size_t size) +{ + size = std::min(size, getReadSize()); + + data_t* buffer = reinterpret_cast(m_buffer.getBuffer()); + + if(m_read + size > m_buffer.getSize()) + { + size_t read_first = m_buffer.getSize() - m_read; + size_t read_second = size - read_first; + + std::memcpy(target, buffer + m_read, read_first); + std::memcpy(target + read_first, buffer, read_second); + + m_read = read_second; + } + else + { + std::memcpy(target, buffer + m_read, size); + + m_read += size; + } + + return size; +} + +size_t RingBuffer::write(data_t* source, size_t size) +{ + size = std::min(size, getWriteSize()); + + data_t* buffer = reinterpret_cast(m_buffer.getBuffer()); + + if(m_write + size > m_buffer.getSize()) + { + size_t write_first = m_buffer.getSize() - m_write; + size_t write_second = size - write_first; + + std::memcpy(buffer + m_write, source, write_first); + std::memcpy(buffer, source + write_first, write_second); + + m_write = write_second; + } + else + { + std::memcpy(buffer + m_write, source, size); + + m_write += size; + } + + return size; +} + +void RingBuffer::reset() +{ + m_read = 0; + m_write = 0; +} + +void RingBuffer::resize(int size) +{ + m_buffer.resize(size); + reset(); +} + +void RingBuffer::assureSize(int size) +{ + m_buffer.assureSize(size); + reset(); +} + +AUD_NAMESPACE_END -- cgit v1.2.3