diff options
author | Gyuhwan Park <unstabler@unstabler.pl> | 2022-05-24 15:41:35 +0300 |
---|---|---|
committer | Gyuhwan Park <unstabler@unstabler.pl> | 2022-05-24 15:41:35 +0300 |
commit | b89d1bb8daa824826c455eb9399c5f5ce875baf9 (patch) | |
tree | b0cfa52fd88461d14a02adf0137f94639b8b6512 | |
parent | bc794094dae8f22b58c303a90cae2cb90fd3bad5 (diff) |
refactor(ProjectionThread): split? ipc logic to IPCConnection
-rw-r--r-- | IPCConnection.cpp | 111 | ||||
-rw-r--r-- | IPCConnection.hpp | 86 | ||||
-rw-r--r-- | ProjectionThread.cpp | 133 | ||||
-rw-r--r-- | ProjectionThread.hpp | 51 |
4 files changed, 226 insertions, 155 deletions
diff --git a/IPCConnection.cpp b/IPCConnection.cpp new file mode 100644 index 0000000..e5f28af --- /dev/null +++ b/IPCConnection.cpp @@ -0,0 +1,111 @@ +// +// Created by Gyuhwan Park on 2022/05/24. +// + +#if defined(HAVE_CONFIG_H) +#include <config_ac.h> +#endif + +#include "IPCConnection.hpp" + +IPCConnection::IPCConnection(std::string socketPath): + _socket(socketPath), + _isWorkerTerminated(false), + + _messageId(0), + _ackId(0) +{ + +} + +void IPCConnection::connect() { + _socket.connect(); + + _workerThread = std::thread(&IPCConnection::workerLoop, this); +} + +void IPCConnection::disconnect() { + _isWorkerTerminated = true; + + if (_workerThread.joinable()) { + _workerThread.join(); + } + _socket.close(); +} + +std::unique_ptr<ULIPCHeader, IPCConnection::MallocFreeDeleter> IPCConnection::nextHeader() { + return std::move(read<ULIPCHeader>(sizeof(ULIPCHeader))); +} + +void IPCConnection::write(const void *pointer, size_t size) { + assert(pointer != nullptr); + assert(size > 0); + + std::scoped_lock<std::mutex> scopedWriteTasksLock(_writeTasksLock); + + std::unique_ptr<uint8_t, MallocFreeDeleter> data( + (uint8_t *) malloc(size), + free + ); + + std::memcpy(data.get(), pointer, size); + + _writeTasks.emplace(size, std::move(data)); +} + +void IPCConnection::workerLoop() { + const size_t MAX_READ_SIZE = 8192; + + size_t readBytes = 0; + std::unique_ptr<uint8_t> _currentReadTask; + + while (!_isWorkerTerminated) { + if (!_writeTasks.empty()) { + std::scoped_lock<std::mutex> scopedWriteTasksLock(_writeTasksLock); + auto writeTask = std::move(_writeTasks.front()); + _writeTasks.pop(); + + if (_socket.write(writeTask.second.get(), writeTask.first) < 0) { + throw std::runtime_error("could not perform write()"); + } + } + + if (!_readTasks.empty()) { + auto &readTask = _readTasks.front(); + + auto &contentLength = readTask.first; + auto &promise = readTask.second; + + if (_currentReadTask == nullptr) { + readBytes = 0; + _currentReadTask = std::unique_ptr<uint8_t>( + new uint8_t[readTask.first] + ); + } + + int bytes = std::min( + (size_t) MAX_READ_SIZE, + contentLength - readBytes + ); + + size_t retval = _socket.read(_currentReadTask.get() + readBytes, bytes); + if (retval < 0) { + throw std::runtime_error("failed to perform read()"); + } + + readBytes += retval; + + if (readBytes >= contentLength) { + promise.set_value(std::move(_currentReadTask)); + + { + std::scoped_lock<std::mutex> scopedReadTasksLock(_readTasksLock); + _readTasks.pop(); + } + + _currentReadTask = nullptr; + readBytes = 0; + } + } + } +} diff --git a/IPCConnection.hpp b/IPCConnection.hpp new file mode 100644 index 0000000..d3b1f1f --- /dev/null +++ b/IPCConnection.hpp @@ -0,0 +1,86 @@ +// +// Created by Gyuhwan Park on 2022/05/24. +// + +#ifndef ULALACA_IPCCONNECTION_HPP +#define ULALACA_IPCCONNECTION_HPP + +#include <memory> +#include <thread> +#include <queue> +#include <future> + +#include "UnixSocket.hpp" + +#include "messages/projector.h" + +#include "ulalaca.hpp" + + +class IPCConnection { +public: + using MallocFreeDeleter = std::function<void(void *)>; + + explicit IPCConnection(std::string socketPath); + IPCConnection(IPCConnection &) = delete; + + /** + * @throws SystemCallException + */ + void connect(); + void disconnect(); + + std::unique_ptr<ULIPCHeader, MallocFreeDeleter> nextHeader(); + + template <typename T> + void writeMessage(uint16_t messageType, T message) { + auto header = ULIPCHeader { + (uint16_t) messageType, + _messageId, + 0, // FIXME + 0, // FIXME + sizeof(T) + }; + + write(&header, sizeof(header)); + write(&message, sizeof(T)); + } + + template<typename T> + std::unique_ptr<T, MallocFreeDeleter> read(size_t size) { + assert(size != 0); + + auto promise = std::promise<std::unique_ptr<uint8_t>>(); + { + std::scoped_lock<std::mutex> scopedReadTasksLock(_readTasksLock); + _readTasks.emplace(size, promise); + } + auto pointer = promise.get_future().get(); + + return std::move(std::unique_ptr<T, MallocFreeDeleter>( + reinterpret_cast<T *>(pointer.release()), + free + )); + } + + void write(const void *pointer, size_t size); + +private: + void workerLoop(); + + std::atomic_uint64_t _messageId; + std::atomic_uint64_t _ackId; + + UnixSocket _socket; + std::thread _workerThread; + bool _isWorkerTerminated; + + std::mutex _writeTasksLock; + std::mutex _readTasksLock; + + std::queue<std::pair<size_t, std::unique_ptr<uint8_t, MallocFreeDeleter>>> _writeTasks; + std::queue<std::pair<size_t, std::promise<std::unique_ptr<uint8_t>> &>> _readTasks; +}; + + +#endif //XRDP_IPCCONNECTION_HPP diff --git a/ProjectionThread.cpp b/ProjectionThread.cpp index d4a07bc..5723cc4 100644 --- a/ProjectionThread.cpp +++ b/ProjectionThread.cpp @@ -13,17 +13,17 @@ extern "C" { #include "defines.h" #include "guid.h" #include "xrdp_client_info.h" -}; +} #include "ProjectionThread.hpp" #include "KeycodeMap.hpp" ProjectionThread::ProjectionThread( XrdpUlalaca &xrdpUlalaca, - UnixSocket &socket + const std::string &socketPath ): _xrdpUlalaca(xrdpUlalaca), - _socket(socket), + _ipcConnection(socketPath), _isTerminated(false) { @@ -31,7 +31,6 @@ ProjectionThread::ProjectionThread( void ProjectionThread::start() { _projectorThread = std::thread(&ProjectionThread::mainLoop, this); - _ioThread = std::thread(&ProjectionThread::ioLoop, this); } void ProjectionThread::stop() { @@ -43,14 +42,14 @@ void ProjectionThread::handleEvent(XrdpEvent &event) { auto keycode = event.param3; auto cgKeycode = rdpKeycodeToCGKeycode(keycode); auto eventType = event.type == XrdpEvent::KEY_DOWN ? - KEY_EVENT_TYPE_KEYDOWN : - KEY_EVENT_TYPE_KEYUP; + KEYBOARD_EVENT_TYPE_KEYDOWN : + KEYBOARD_EVENT_TYPE_KEYUP; if (cgKeycode == -1) { return; } - writeMessage(OUT_KEYBOARD_EVENT, KeyboardEvent { + _ipcConnection.writeMessage(TYPE_EVENT_KEYBOARD, ULIPCKeyboardEvent { eventType, (uint32_t) cgKeycode, 0 }); } else if (event.type == XrdpEvent::KEY_SYNCHRONIZE_LOCK) { @@ -63,7 +62,7 @@ void ProjectionThread::handleEvent(XrdpEvent &event) { uint16_t posX = event.param1; uint16_t posY = event.param2; - writeMessage(OUT_MOUSE_MOVE_EVENT, MouseMoveEvent { + _ipcConnection.writeMessage(TYPE_EVENT_MOUSE_MOVE, ULIPCMouseMoveEvent { posX, posY, 0 }); @@ -71,16 +70,16 @@ void ProjectionThread::handleEvent(XrdpEvent &event) { } case XrdpEvent::MOUSE_BUTTON_LEFT_DOWN: { - writeMessage(OUT_MOUSE_BUTTON_EVENT, MouseButtonEvent { - MOUSE_EVENT_TYPE_MOUSEDOWN, + _ipcConnection.writeMessage(TYPE_EVENT_MOUSE_BUTTON, ULIPCMouseButtonEvent { + MOUSE_EVENT_TYPE_DOWN, MOUSE_EVENT_BUTTON_LEFT, 0 }); return; } case XrdpEvent::MOUSE_BUTTON_LEFT_UP: { - writeMessage(OUT_MOUSE_BUTTON_EVENT, MouseButtonEvent { - MOUSE_EVENT_TYPE_MOUSEUP, + _ipcConnection.writeMessage(TYPE_EVENT_MOUSE_BUTTON, ULIPCMouseButtonEvent { + MOUSE_EVENT_TYPE_UP, MOUSE_EVENT_BUTTON_LEFT, 0 }); @@ -89,16 +88,16 @@ void ProjectionThread::handleEvent(XrdpEvent &event) { case XrdpEvent::MOUSE_BUTTON_RIGHT_DOWN: { - writeMessage(OUT_MOUSE_BUTTON_EVENT, MouseButtonEvent { - MOUSE_EVENT_TYPE_MOUSEDOWN, + _ipcConnection.writeMessage(TYPE_EVENT_MOUSE_BUTTON, ULIPCMouseButtonEvent { + MOUSE_EVENT_TYPE_DOWN, MOUSE_EVENT_BUTTON_RIGHT, 0 }); return; } case XrdpEvent::MOUSE_BUTTON_RIGHT_UP: { - writeMessage(OUT_MOUSE_BUTTON_EVENT, MouseButtonEvent { - MOUSE_EVENT_TYPE_MOUSEUP, + _ipcConnection.writeMessage(TYPE_EVENT_MOUSE_BUTTON, ULIPCMouseButtonEvent { + MOUSE_EVENT_TYPE_UP, MOUSE_EVENT_BUTTON_RIGHT, 0 }); @@ -144,114 +143,32 @@ void ProjectionThread::handleEvent(XrdpEvent &event) { void ProjectionThread::mainLoop() { while (!_isTerminated) { - auto header = nextHeader(); + auto header = _ipcConnection.nextHeader(); switch (header->messageType) { - case IN_SCREEN_UPDATE_EVENT: { - auto updateEvent = read<ScreenUpdateEvent>(header->length); + case TYPE_SCREEN_UPDATE_NOTIFY: { + auto notification = _ipcConnection.read<ULIPCScreenUpdateNotify>(header->length); LOG(LOG_LEVEL_DEBUG, "mainLoop(): adding dirty rect"); - _xrdpUlalaca.addDirtyRect(updateEvent->rect); + _xrdpUlalaca.addDirtyRect(notification->rect); continue; } - case IN_SCREEN_COMMIT_UPDATE: { - auto commitUpdate = read<ScreenCommitUpdate>(header->length); - auto bitmap = read<uint8_t>(commitUpdate->bitmapLength); + case TYPE_SCREEN_UPDATE_COMMIT: { + auto commit = _ipcConnection.read<ULIPCScreenUpdateCommit>(header->length); + auto bitmap = _ipcConnection.read<uint8_t>(commit->bitmapLength); LOG(LOG_LEVEL_DEBUG, "mainLoop(): commiting update"); _xrdpUlalaca.commitUpdate( bitmap.get(), - commitUpdate->screenRect.width, - commitUpdate->screenRect.height + commit->screenRect.width, + commit->screenRect.height ); continue; } default: { // ignore - read<uint8_t>(header->length); - } - } - } -} - -void ProjectionThread::ioLoop() { - const size_t MAX_READ_SIZE = 8192; - - size_t readBytes = 0; - std::unique_ptr<uint8_t> _currentReadTask; - - while (!_isTerminated) { - if (_writeTasks.empty() && _readTasks.empty()) { - using namespace std::chrono_literals; - std::this_thread::sleep_for(1ms); - } - - if (!_writeTasks.empty()) { - std::scoped_lock<std::mutex> scopedWriteTasksLock(_writeTasksLock); - auto writeTask = std::move(_writeTasks.front()); - _writeTasks.pop(); - - if (_socket.write(writeTask.second.get(), writeTask.first) < 0) { - throw std::runtime_error("failed to perform write()"); - } - } - - if (!_readTasks.empty()) { - auto &readTask = _readTasks.front(); - - auto &contentLength = readTask.first; - auto &promise = readTask.second; - - if (_currentReadTask == nullptr) { - readBytes = 0; - _currentReadTask = std::unique_ptr<uint8_t>( - new uint8_t[readTask.first] - ); - } - - int bytes = std::min( - (size_t) MAX_READ_SIZE, - contentLength - readBytes - ); - - size_t retval = _socket.read(_currentReadTask.get() + readBytes, bytes); - if (retval < 0) { - throw std::runtime_error("failed to perform read()"); - } - - readBytes += retval; - - if (readBytes >= contentLength) { - promise.set_value(std::move(_currentReadTask)); - - { - std::scoped_lock<std::mutex> scopedReadTasksLock(_readTasksLock); - _readTasks.pop(); - } - - _currentReadTask = nullptr; - readBytes = 0; + _ipcConnection.read<uint8_t>(header->length); } } } -} - -std::unique_ptr<ProjectorMessageHeader, MallocFreeDeleter> ProjectionThread::nextHeader() { - return std::move(read<ProjectorMessageHeader>(sizeof(ProjectorMessageHeader))); -} - -void ProjectionThread::write(const void *pointer, size_t size) { - assert(pointer != nullptr); - assert(size > 0); - - std::scoped_lock<std::mutex> scopedWriteTasksLock(_writeTasksLock); - - std::unique_ptr<uint8_t, MallocFreeDeleter> data( - (uint8_t *) malloc(size), - free - ); - - std::memcpy(data.get(), pointer, size); - - _writeTasks.emplace(size, std::move(data)); }
\ No newline at end of file diff --git a/ProjectionThread.hpp b/ProjectionThread.hpp index fee749e..c8be1bf 100644 --- a/ProjectionThread.hpp +++ b/ProjectionThread.hpp @@ -12,18 +12,19 @@ #include "UnixSocket.hpp" +#include "IPCConnection.hpp" #include "messages/projector.h" #include "ulalaca.hpp" -using MallocFreeDeleter = std::function<void(void *)>; class ProjectionThread { public: explicit ProjectionThread( XrdpUlalaca &xrdpUlalaca, - UnixSocket &socket + const std::string &socketPath ); + ProjectionThread(ProjectionThread &) = delete; void start(); void stop(); @@ -31,56 +32,12 @@ public: void handleEvent(XrdpEvent &event); private: void mainLoop(); - void ioLoop(); - - std::unique_ptr<ProjectorMessageHeader, MallocFreeDeleter> nextHeader(); - - template<typename T> - std::unique_ptr<T, MallocFreeDeleter> read(size_t size) { - assert(size != 0); - - auto promise = std::promise<std::unique_ptr<uint8_t>>(); - { - std::scoped_lock<std::mutex> scopedReadTasksLock(_readTasksLock); - _readTasks.emplace(size, promise); - } - auto pointer = promise.get_future().get(); - - return std::move(std::unique_ptr<T, MallocFreeDeleter>( - reinterpret_cast<T *>(pointer.release()), - free - )); - } - - void write(const void *pointer, size_t size); - - template <typename T> - void writeMessage(uint16_t messageType, T message) { - auto header = ProjectorMessageHeader { - (uint16_t) messageType, - 0, 0, - 0, - sizeof(T) - }; - - write(&header, sizeof(header)); - write(&message, sizeof(T)); - } - XrdpUlalaca &_xrdpUlalaca; - UnixSocket &_socket; + IPCConnection _ipcConnection; bool _isTerminated; std::thread _projectorThread; - std::thread _ioThread; - - - std::mutex _writeTasksLock; - std::mutex _readTasksLock; - - std::queue<std::pair<size_t, std::unique_ptr<uint8_t, MallocFreeDeleter>>> _writeTasks; - std::queue<std::pair<size_t, std::promise<std::unique_ptr<uint8_t>> &>> _readTasks; }; #endif //ULALACA_PROJECTIONTHREAD_HPP |