Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/neutrinolabs/ulalaca-xrdp.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGyuhwan Park <unstabler@unstabler.pl>2022-05-24 15:41:35 +0300
committerGyuhwan Park <unstabler@unstabler.pl>2022-05-24 15:41:35 +0300
commitb89d1bb8daa824826c455eb9399c5f5ce875baf9 (patch)
treeb0cfa52fd88461d14a02adf0137f94639b8b6512
parentbc794094dae8f22b58c303a90cae2cb90fd3bad5 (diff)
refactor(ProjectionThread): split? ipc logic to IPCConnection
-rw-r--r--IPCConnection.cpp111
-rw-r--r--IPCConnection.hpp86
-rw-r--r--ProjectionThread.cpp133
-rw-r--r--ProjectionThread.hpp51
4 files changed, 226 insertions, 155 deletions
diff --git a/IPCConnection.cpp b/IPCConnection.cpp
new file mode 100644
index 0000000..e5f28af
--- /dev/null
+++ b/IPCConnection.cpp
@@ -0,0 +1,111 @@
+//
+// Created by Gyuhwan Park on 2022/05/24.
+//
+
+#if defined(HAVE_CONFIG_H)
+#include <config_ac.h>
+#endif
+
+#include "IPCConnection.hpp"
+
+IPCConnection::IPCConnection(std::string socketPath):
+ _socket(socketPath),
+ _isWorkerTerminated(false),
+
+ _messageId(0),
+ _ackId(0)
+{
+
+}
+
+void IPCConnection::connect() {
+ _socket.connect();
+
+ _workerThread = std::thread(&IPCConnection::workerLoop, this);
+}
+
+void IPCConnection::disconnect() {
+ _isWorkerTerminated = true;
+
+ if (_workerThread.joinable()) {
+ _workerThread.join();
+ }
+ _socket.close();
+}
+
+std::unique_ptr<ULIPCHeader, IPCConnection::MallocFreeDeleter> IPCConnection::nextHeader() {
+ return std::move(read<ULIPCHeader>(sizeof(ULIPCHeader)));
+}
+
+void IPCConnection::write(const void *pointer, size_t size) {
+ assert(pointer != nullptr);
+ assert(size > 0);
+
+ std::scoped_lock<std::mutex> scopedWriteTasksLock(_writeTasksLock);
+
+ std::unique_ptr<uint8_t, MallocFreeDeleter> data(
+ (uint8_t *) malloc(size),
+ free
+ );
+
+ std::memcpy(data.get(), pointer, size);
+
+ _writeTasks.emplace(size, std::move(data));
+}
+
+void IPCConnection::workerLoop() {
+ const size_t MAX_READ_SIZE = 8192;
+
+ size_t readBytes = 0;
+ std::unique_ptr<uint8_t> _currentReadTask;
+
+ while (!_isWorkerTerminated) {
+ if (!_writeTasks.empty()) {
+ std::scoped_lock<std::mutex> scopedWriteTasksLock(_writeTasksLock);
+ auto writeTask = std::move(_writeTasks.front());
+ _writeTasks.pop();
+
+ if (_socket.write(writeTask.second.get(), writeTask.first) < 0) {
+ throw std::runtime_error("could not perform write()");
+ }
+ }
+
+ if (!_readTasks.empty()) {
+ auto &readTask = _readTasks.front();
+
+ auto &contentLength = readTask.first;
+ auto &promise = readTask.second;
+
+ if (_currentReadTask == nullptr) {
+ readBytes = 0;
+ _currentReadTask = std::unique_ptr<uint8_t>(
+ new uint8_t[readTask.first]
+ );
+ }
+
+ int bytes = std::min(
+ (size_t) MAX_READ_SIZE,
+ contentLength - readBytes
+ );
+
+ size_t retval = _socket.read(_currentReadTask.get() + readBytes, bytes);
+ if (retval < 0) {
+ throw std::runtime_error("failed to perform read()");
+ }
+
+ readBytes += retval;
+
+ if (readBytes >= contentLength) {
+ promise.set_value(std::move(_currentReadTask));
+
+ {
+ std::scoped_lock<std::mutex> scopedReadTasksLock(_readTasksLock);
+ _readTasks.pop();
+ }
+
+ _currentReadTask = nullptr;
+ readBytes = 0;
+ }
+ }
+ }
+}
diff --git a/IPCConnection.hpp b/IPCConnection.hpp
new file mode 100644
index 0000000..d3b1f1f
--- /dev/null
+++ b/IPCConnection.hpp
@@ -0,0 +1,86 @@
+//
+// Created by Gyuhwan Park on 2022/05/24.
+//
+
+#ifndef ULALACA_IPCCONNECTION_HPP
+#define ULALACA_IPCCONNECTION_HPP
+
+#include <memory>
+#include <thread>
+#include <queue>
+#include <future>
+
+#include "UnixSocket.hpp"
+
+#include "messages/projector.h"
+
+#include "ulalaca.hpp"
+
+
+class IPCConnection {
+public:
+ using MallocFreeDeleter = std::function<void(void *)>;
+
+ explicit IPCConnection(std::string socketPath);
+ IPCConnection(IPCConnection &) = delete;
+
+ /**
+ * @throws SystemCallException
+ */
+ void connect();
+ void disconnect();
+
+ std::unique_ptr<ULIPCHeader, MallocFreeDeleter> nextHeader();
+
+ template <typename T>
+ void writeMessage(uint16_t messageType, T message) {
+ auto header = ULIPCHeader {
+ (uint16_t) messageType,
+ _messageId,
+ 0, // FIXME
+ 0, // FIXME
+ sizeof(T)
+ };
+
+ write(&header, sizeof(header));
+ write(&message, sizeof(T));
+ }
+
+ template<typename T>
+ std::unique_ptr<T, MallocFreeDeleter> read(size_t size) {
+ assert(size != 0);
+
+ auto promise = std::promise<std::unique_ptr<uint8_t>>();
+ {
+ std::scoped_lock<std::mutex> scopedReadTasksLock(_readTasksLock);
+ _readTasks.emplace(size, promise);
+ }
+ auto pointer = promise.get_future().get();
+
+ return std::move(std::unique_ptr<T, MallocFreeDeleter>(
+ reinterpret_cast<T *>(pointer.release()),
+ free
+ ));
+ }
+
+ void write(const void *pointer, size_t size);
+
+private:
+ void workerLoop();
+
+ std::atomic_uint64_t _messageId;
+ std::atomic_uint64_t _ackId;
+
+ UnixSocket _socket;
+ std::thread _workerThread;
+ bool _isWorkerTerminated;
+
+ std::mutex _writeTasksLock;
+ std::mutex _readTasksLock;
+
+ std::queue<std::pair<size_t, std::unique_ptr<uint8_t, MallocFreeDeleter>>> _writeTasks;
+ std::queue<std::pair<size_t, std::promise<std::unique_ptr<uint8_t>> &>> _readTasks;
+};
+
+
+#endif //XRDP_IPCCONNECTION_HPP
diff --git a/ProjectionThread.cpp b/ProjectionThread.cpp
index d4a07bc..5723cc4 100644
--- a/ProjectionThread.cpp
+++ b/ProjectionThread.cpp
@@ -13,17 +13,17 @@ extern "C" {
#include "defines.h"
#include "guid.h"
#include "xrdp_client_info.h"
-};
+}
#include "ProjectionThread.hpp"
#include "KeycodeMap.hpp"
ProjectionThread::ProjectionThread(
XrdpUlalaca &xrdpUlalaca,
- UnixSocket &socket
+ const std::string &socketPath
):
_xrdpUlalaca(xrdpUlalaca),
- _socket(socket),
+ _ipcConnection(socketPath),
_isTerminated(false)
{
@@ -31,7 +31,6 @@ ProjectionThread::ProjectionThread(
void ProjectionThread::start() {
_projectorThread = std::thread(&ProjectionThread::mainLoop, this);
- _ioThread = std::thread(&ProjectionThread::ioLoop, this);
}
void ProjectionThread::stop() {
@@ -43,14 +42,14 @@ void ProjectionThread::handleEvent(XrdpEvent &event) {
auto keycode = event.param3;
auto cgKeycode = rdpKeycodeToCGKeycode(keycode);
auto eventType = event.type == XrdpEvent::KEY_DOWN ?
- KEY_EVENT_TYPE_KEYDOWN :
- KEY_EVENT_TYPE_KEYUP;
+ KEYBOARD_EVENT_TYPE_KEYDOWN :
+ KEYBOARD_EVENT_TYPE_KEYUP;
if (cgKeycode == -1) {
return;
}
- writeMessage(OUT_KEYBOARD_EVENT, KeyboardEvent {
+ _ipcConnection.writeMessage(TYPE_EVENT_KEYBOARD, ULIPCKeyboardEvent {
eventType, (uint32_t) cgKeycode, 0
});
} else if (event.type == XrdpEvent::KEY_SYNCHRONIZE_LOCK) {
@@ -63,7 +62,7 @@ void ProjectionThread::handleEvent(XrdpEvent &event) {
uint16_t posX = event.param1;
uint16_t posY = event.param2;
- writeMessage(OUT_MOUSE_MOVE_EVENT, MouseMoveEvent {
+ _ipcConnection.writeMessage(TYPE_EVENT_MOUSE_MOVE, ULIPCMouseMoveEvent {
posX, posY,
0
});
@@ -71,16 +70,16 @@ void ProjectionThread::handleEvent(XrdpEvent &event) {
}
case XrdpEvent::MOUSE_BUTTON_LEFT_DOWN: {
- writeMessage(OUT_MOUSE_BUTTON_EVENT, MouseButtonEvent {
- MOUSE_EVENT_TYPE_MOUSEDOWN,
+ _ipcConnection.writeMessage(TYPE_EVENT_MOUSE_BUTTON, ULIPCMouseButtonEvent {
+ MOUSE_EVENT_TYPE_DOWN,
MOUSE_EVENT_BUTTON_LEFT,
0
});
return;
}
case XrdpEvent::MOUSE_BUTTON_LEFT_UP: {
- writeMessage(OUT_MOUSE_BUTTON_EVENT, MouseButtonEvent {
- MOUSE_EVENT_TYPE_MOUSEUP,
+ _ipcConnection.writeMessage(TYPE_EVENT_MOUSE_BUTTON, ULIPCMouseButtonEvent {
+ MOUSE_EVENT_TYPE_UP,
MOUSE_EVENT_BUTTON_LEFT,
0
});
@@ -89,16 +88,16 @@ void ProjectionThread::handleEvent(XrdpEvent &event) {
case XrdpEvent::MOUSE_BUTTON_RIGHT_DOWN: {
- writeMessage(OUT_MOUSE_BUTTON_EVENT, MouseButtonEvent {
- MOUSE_EVENT_TYPE_MOUSEDOWN,
+ _ipcConnection.writeMessage(TYPE_EVENT_MOUSE_BUTTON, ULIPCMouseButtonEvent {
+ MOUSE_EVENT_TYPE_DOWN,
MOUSE_EVENT_BUTTON_RIGHT,
0
});
return;
}
case XrdpEvent::MOUSE_BUTTON_RIGHT_UP: {
- writeMessage(OUT_MOUSE_BUTTON_EVENT, MouseButtonEvent {
- MOUSE_EVENT_TYPE_MOUSEUP,
+ _ipcConnection.writeMessage(TYPE_EVENT_MOUSE_BUTTON, ULIPCMouseButtonEvent {
+ MOUSE_EVENT_TYPE_UP,
MOUSE_EVENT_BUTTON_RIGHT,
0
});
@@ -144,114 +143,32 @@ void ProjectionThread::handleEvent(XrdpEvent &event) {
void ProjectionThread::mainLoop() {
while (!_isTerminated) {
- auto header = nextHeader();
+ auto header = _ipcConnection.nextHeader();
switch (header->messageType) {
- case IN_SCREEN_UPDATE_EVENT: {
- auto updateEvent = read<ScreenUpdateEvent>(header->length);
+ case TYPE_SCREEN_UPDATE_NOTIFY: {
+ auto notification = _ipcConnection.read<ULIPCScreenUpdateNotify>(header->length);
LOG(LOG_LEVEL_DEBUG, "mainLoop(): adding dirty rect");
- _xrdpUlalaca.addDirtyRect(updateEvent->rect);
+ _xrdpUlalaca.addDirtyRect(notification->rect);
continue;
}
- case IN_SCREEN_COMMIT_UPDATE: {
- auto commitUpdate = read<ScreenCommitUpdate>(header->length);
- auto bitmap = read<uint8_t>(commitUpdate->bitmapLength);
+ case TYPE_SCREEN_UPDATE_COMMIT: {
+ auto commit = _ipcConnection.read<ULIPCScreenUpdateCommit>(header->length);
+ auto bitmap = _ipcConnection.read<uint8_t>(commit->bitmapLength);
LOG(LOG_LEVEL_DEBUG, "mainLoop(): commiting update");
_xrdpUlalaca.commitUpdate(
bitmap.get(),
- commitUpdate->screenRect.width,
- commitUpdate->screenRect.height
+ commit->screenRect.width,
+ commit->screenRect.height
);
continue;
}
default: {
// ignore
- read<uint8_t>(header->length);
- }
- }
- }
-}
-
-void ProjectionThread::ioLoop() {
- const size_t MAX_READ_SIZE = 8192;
-
- size_t readBytes = 0;
- std::unique_ptr<uint8_t> _currentReadTask;
-
- while (!_isTerminated) {
- if (_writeTasks.empty() && _readTasks.empty()) {
- using namespace std::chrono_literals;
- std::this_thread::sleep_for(1ms);
- }
-
- if (!_writeTasks.empty()) {
- std::scoped_lock<std::mutex> scopedWriteTasksLock(_writeTasksLock);
- auto writeTask = std::move(_writeTasks.front());
- _writeTasks.pop();
-
- if (_socket.write(writeTask.second.get(), writeTask.first) < 0) {
- throw std::runtime_error("failed to perform write()");
- }
- }
-
- if (!_readTasks.empty()) {
- auto &readTask = _readTasks.front();
-
- auto &contentLength = readTask.first;
- auto &promise = readTask.second;
-
- if (_currentReadTask == nullptr) {
- readBytes = 0;
- _currentReadTask = std::unique_ptr<uint8_t>(
- new uint8_t[readTask.first]
- );
- }
-
- int bytes = std::min(
- (size_t) MAX_READ_SIZE,
- contentLength - readBytes
- );
-
- size_t retval = _socket.read(_currentReadTask.get() + readBytes, bytes);
- if (retval < 0) {
- throw std::runtime_error("failed to perform read()");
- }
-
- readBytes += retval;
-
- if (readBytes >= contentLength) {
- promise.set_value(std::move(_currentReadTask));
-
- {
- std::scoped_lock<std::mutex> scopedReadTasksLock(_readTasksLock);
- _readTasks.pop();
- }
-
- _currentReadTask = nullptr;
- readBytes = 0;
+ _ipcConnection.read<uint8_t>(header->length);
}
}
}
-}
-
-std::unique_ptr<ProjectorMessageHeader, MallocFreeDeleter> ProjectionThread::nextHeader() {
- return std::move(read<ProjectorMessageHeader>(sizeof(ProjectorMessageHeader)));
-}
-
-void ProjectionThread::write(const void *pointer, size_t size) {
- assert(pointer != nullptr);
- assert(size > 0);
-
- std::scoped_lock<std::mutex> scopedWriteTasksLock(_writeTasksLock);
-
- std::unique_ptr<uint8_t, MallocFreeDeleter> data(
- (uint8_t *) malloc(size),
- free
- );
-
- std::memcpy(data.get(), pointer, size);
-
- _writeTasks.emplace(size, std::move(data));
} \ No newline at end of file
diff --git a/ProjectionThread.hpp b/ProjectionThread.hpp
index fee749e..c8be1bf 100644
--- a/ProjectionThread.hpp
+++ b/ProjectionThread.hpp
@@ -12,18 +12,19 @@
#include "UnixSocket.hpp"
+#include "IPCConnection.hpp"
#include "messages/projector.h"
#include "ulalaca.hpp"
-using MallocFreeDeleter = std::function<void(void *)>;
class ProjectionThread {
public:
explicit ProjectionThread(
XrdpUlalaca &xrdpUlalaca,
- UnixSocket &socket
+ const std::string &socketPath
);
+ ProjectionThread(ProjectionThread &) = delete;
void start();
void stop();
@@ -31,56 +32,12 @@ public:
void handleEvent(XrdpEvent &event);
private:
void mainLoop();
- void ioLoop();
-
- std::unique_ptr<ProjectorMessageHeader, MallocFreeDeleter> nextHeader();
-
- template<typename T>
- std::unique_ptr<T, MallocFreeDeleter> read(size_t size) {
- assert(size != 0);
-
- auto promise = std::promise<std::unique_ptr<uint8_t>>();
- {
- std::scoped_lock<std::mutex> scopedReadTasksLock(_readTasksLock);
- _readTasks.emplace(size, promise);
- }
- auto pointer = promise.get_future().get();
-
- return std::move(std::unique_ptr<T, MallocFreeDeleter>(
- reinterpret_cast<T *>(pointer.release()),
- free
- ));
- }
-
- void write(const void *pointer, size_t size);
-
- template <typename T>
- void writeMessage(uint16_t messageType, T message) {
- auto header = ProjectorMessageHeader {
- (uint16_t) messageType,
- 0, 0,
- 0,
- sizeof(T)
- };
-
- write(&header, sizeof(header));
- write(&message, sizeof(T));
- }
-
XrdpUlalaca &_xrdpUlalaca;
- UnixSocket &_socket;
+ IPCConnection _ipcConnection;
bool _isTerminated;
std::thread _projectorThread;
- std::thread _ioThread;
-
-
- std::mutex _writeTasksLock;
- std::mutex _readTasksLock;
-
- std::queue<std::pair<size_t, std::unique_ptr<uint8_t, MallocFreeDeleter>>> _writeTasks;
- std::queue<std::pair<size_t, std::promise<std::unique_ptr<uint8_t>> &>> _readTasks;
};
#endif //ULALACA_PROJECTIONTHREAD_HPP