diff options
author | Добрый Ээх <bukharaev@gmail.com> | 2016-10-07 17:17:03 +0300 |
---|---|---|
committer | Добрый Ээх <bukharaev@gmail.com> | 2016-10-13 21:25:22 +0300 |
commit | a65d86ebcf216bd0480c7f576d0eff60df44a160 (patch) | |
tree | 0ba4b8802b4d60179190523d327b2246966fc1b8 /tracking | |
parent | a8067e8826da51278fa78ce652b80cec2848536b (diff) |
gps tracking reporter
Diffstat (limited to 'tracking')
-rw-r--r-- | tracking/connection.cpp | 44 | ||||
-rw-r--r-- | tracking/connection.hpp | 35 | ||||
-rw-r--r-- | tracking/reporter.cpp | 147 | ||||
-rw-r--r-- | tracking/reporter.hpp | 38 | ||||
-rw-r--r-- | tracking/tracking.pro | 15 | ||||
-rw-r--r-- | tracking/tracking_tests/reporter_tests.cpp | 62 | ||||
-rw-r--r-- | tracking/tracking_tests/tracking_tests.pro | 29 |
7 files changed, 370 insertions, 0 deletions
diff --git a/tracking/connection.cpp b/tracking/connection.cpp new file mode 100644 index 0000000000..09a78a1f41 --- /dev/null +++ b/tracking/connection.cpp @@ -0,0 +1,44 @@ +#include "connection.hpp" + +#include "platform/socket.hpp" + +namespace +{ +uint32_t constexpr kSocketTimeoutMs = 10000; +} // namespace + +namespace tracking +{ +// static +const char Connection::kHost[] = "gps.host"; // TODO change to real host value +uint16_t Connection::kPort = 666; // TODO change to real port value + +Connection::Connection(unique_ptr<platform::Socket> socket, string const & host, uint16_t port, + bool isHistorical) + : m_socket(move(socket)), m_host(host), m_port(port) +{ + ASSERT(m_socket.get() != nullptr, ()); + + m_socket->SetTimeout(kSocketTimeoutMs); +} + +// TODO: implement handshake +bool Connection::Reconnect() +{ + m_socket->Close(); + return m_socket->Open(m_host, m_port); +} + +// TODO: implement historical +bool Connection::Send(boost::circular_buffer<DataPoint> const & points) +{ + ASSERT(m_buffer.empty(), ()); + + MemWriter<decltype(m_buffer)> writer(m_buffer); + coding::TrafficGPSEncoder::SerializeDataPoints(coding::TrafficGPSEncoder::kLatestVersion, writer, + points); + bool isSuccess = m_socket->Write(m_buffer.data(), m_buffer.size()); + m_buffer.clear(); + return isSuccess; +} +} // namespace tracking diff --git a/tracking/connection.hpp b/tracking/connection.hpp new file mode 100644 index 0000000000..117d014903 --- /dev/null +++ b/tracking/connection.hpp @@ -0,0 +1,35 @@ +#pragma once + +#include "boost/circular_buffer.hpp" + +#include "coding/traffic.hpp" + +#include "std/vector.hpp" + +namespace platform +{ +class Socket; +} + +namespace tracking +{ +using DataPoint = coding::TrafficGPSEncoder::DataPoint; + +class Connection final +{ +public: + static const char kHost[]; + static uint16_t kPort; + + Connection(unique_ptr<platform::Socket> socket, string const & host, uint16_t port, + bool isHistorical); + bool Reconnect(); + bool Send(boost::circular_buffer<DataPoint> const & points); + +private: + unique_ptr<platform::Socket> m_socket; + string const m_host; + uint16_t const m_port; + vector<uint8_t> m_buffer; +}; +} // namespace tracking diff --git a/tracking/reporter.cpp b/tracking/reporter.cpp new file mode 100644 index 0000000000..6021ac5d9c --- /dev/null +++ b/tracking/reporter.cpp @@ -0,0 +1,147 @@ +#include "reporter.hpp" + +#include "base/logging.hpp" +#include "base/thread.hpp" +#include "base/timer.hpp" + +#include "boost/circular_buffer.hpp" + +#include "platform/location.hpp" +#include "platform/socket.hpp" + +#include "std/mutex.hpp" +#include "std/vector.hpp" + +#include "tracking/connection.hpp" + +using namespace tracking; + +namespace +{ +double constexpr kMinDelaySeconds = 1.0; +double constexpr kReconnectDelaySeconds = 60.0; +size_t constexpr kRealTimeBufferSize = 60; + +class WorkerImpl final : public Reporter::Worker +{ +public: + WorkerImpl(unique_ptr<platform::Socket> socket, size_t pushDelayMs); + void Run(); + + // Worker overrides + void AddLocation(location::GpsInfo const & info); + void Stop(); + +private: + void FetchInput(); + bool SendPoints(); + + volatile bool m_stop = false; + Connection m_realtimeSender; + size_t m_pushDelayMs; + bool m_wasConnected = false; + double m_lastConnectTryTime = 0.0; + vector<tracking::DataPoint> m_input; + mutex m_inputMutex; + boost::circular_buffer<DataPoint> m_points; + double m_lastGpsTime = 0.0; +}; +} // namespace + +namespace tracking +{ +// static +const char Reporter::kAllowKey[] = "AllowStat"; + +Reporter::Reporter(unique_ptr<platform::Socket> socket, size_t pushDelayMs) +{ + WorkerImpl * worker = new WorkerImpl(move(socket), pushDelayMs); + m_worker = worker; + threads::SimpleThread thread([worker] + { + worker->Run(); + delete worker; + }); + thread.detach(); +} + +Reporter::~Reporter() { m_worker->Stop(); } + +void Reporter::AddLocation(location::GpsInfo const & info) { m_worker->AddLocation(info); } +} // namespace tracking + +namespace +{ +WorkerImpl::WorkerImpl(unique_ptr<platform::Socket> socket, size_t pushDelayMs) + : m_realtimeSender(move(socket), Connection::kHost, Connection::kPort, false) + , m_pushDelayMs(pushDelayMs) + , m_points(kRealTimeBufferSize) +{ +} + +void WorkerImpl::AddLocation(location::GpsInfo const & info) +{ + lock_guard<mutex> lg(m_inputMutex); + + if (info.m_timestamp < m_lastGpsTime + kMinDelaySeconds) + return; + + m_lastGpsTime = info.m_timestamp; + m_input.push_back(DataPoint(info.m_timestamp, ms::LatLon(info.m_latitude, info.m_longitude))); +} + +void WorkerImpl::Run() +{ + LOG(LINFO, ("Tracking Reporter started")); + + my::HighResTimer timer; + + while (!m_stop) + { + uint64_t const startMs = timer.ElapsedMillis(); + + FetchInput(); + if (SendPoints()) + m_points.clear(); + + uint64_t const passedMs = timer.ElapsedMillis() - startMs; + if (passedMs < m_pushDelayMs) + threads::Sleep(m_pushDelayMs - passedMs); + } + + LOG(LINFO, ("Tracking Reporter finished")); +} + +void WorkerImpl::Stop() { m_stop = true; } +void WorkerImpl::FetchInput() +{ + lock_guard<mutex> lg(m_inputMutex); + m_points.insert(m_points.end(), m_input.begin(), m_input.end()); + m_input.clear(); +} + +bool WorkerImpl::SendPoints() +{ + if (m_points.empty()) + return true; + + if (m_wasConnected) + m_wasConnected = m_realtimeSender.Send(m_points); + + if (m_wasConnected) + return true; + + double currentTime = my::Timer::LocalTime(); + if (currentTime < m_lastConnectTryTime + kReconnectDelaySeconds) + return false; + + m_lastConnectTryTime = currentTime; + + m_wasConnected = m_realtimeSender.Reconnect(); + if (!m_wasConnected) + return false; + + m_wasConnected = m_realtimeSender.Send(m_points); + return m_wasConnected; +} +} // namespace diff --git a/tracking/reporter.hpp b/tracking/reporter.hpp new file mode 100644 index 0000000000..13b5ffea7a --- /dev/null +++ b/tracking/reporter.hpp @@ -0,0 +1,38 @@ +#pragma once + +#include "std/unique_ptr.hpp" + +namespace location +{ +class GpsInfo; +} + +namespace platform +{ +class Socket; +} + +namespace tracking +{ +class Reporter final +{ +public: + static size_t constexpr kPushDelayMs = 10000; + static const char kAllowKey[]; + + Reporter(unique_ptr<platform::Socket> socket, size_t pushDelayMs); + ~Reporter(); + void AddLocation(location::GpsInfo const & info); + + class Worker + { + public: + virtual void AddLocation(location::GpsInfo const & info) = 0; + virtual void Stop() = 0; + }; + +private: + Worker * m_worker; +}; + +} // namespace tracking diff --git a/tracking/tracking.pro b/tracking/tracking.pro new file mode 100644 index 0000000000..2418e67637 --- /dev/null +++ b/tracking/tracking.pro @@ -0,0 +1,15 @@ +TARGET = tracking +TEMPLATE = lib +CONFIG += staticlib warn_on + +ROOT_DIR = .. + +include($$ROOT_DIR/common.pri) + +SOURCES += \ + connection.cpp \ + reporter.cpp \ + +HEADERS += \ + connection.hpp \ + reporter.hpp \ diff --git a/tracking/tracking_tests/reporter_tests.cpp b/tracking/tracking_tests/reporter_tests.cpp new file mode 100644 index 0000000000..f8dc89d38b --- /dev/null +++ b/tracking/tracking_tests/reporter_tests.cpp @@ -0,0 +1,62 @@ +#include "base/thread.hpp" +#include "coding/traffic.hpp" + +#include "platform/location.hpp" +#include "platform/socket.hpp" + +#include "std/cmath.hpp" + +#include "testing/testing.hpp" + +#include "tracking/reporter.hpp" + +namespace +{ +template <class Condition> +bool WaitCondition(Condition condition, size_t durationMs = 1000) +{ + size_t sleepMs = 10; + size_t cyclesLimit = durationMs / sleepMs; + for (size_t i = 0; i < cyclesLimit; ++i) + { + threads::Sleep(sleepMs); + if (condition()) + return true; + } + + return false; +} +} // namespace + +using namespace tracking; + +UNIT_TEST(Reporter_TransferLocation) +{ + unique_ptr<platform::TestSocket> socket = platform::createTestSocket(); + platform::TestSocket * testSocket = socket.get(); + + Reporter reporter(move(socket), 10); + + location::GpsInfo gpsInfo; + gpsInfo.m_timestamp = 3.0; + gpsInfo.m_latitude = 4.0; + gpsInfo.m_longitude = 5.0; + reporter.AddLocation(gpsInfo); + + TEST(WaitCondition([testSocket] { return testSocket->HasOutput(); }), ()); + + vector<uint8_t> buffer; + testSocket->ReadServer(buffer); + + vector<coding::TrafficGPSEncoder::DataPoint> points; + MemReader memReader(buffer.data(), buffer.size()); + ReaderSource<MemReader> src(memReader); + coding::TrafficGPSEncoder::DeserializeDataPoints(coding::TrafficGPSEncoder::kLatestVersion, src, + points); + + TEST_EQUAL(points.size(), 1, ()); + coding::TrafficGPSEncoder::DataPoint const & point = points[0]; + TEST_EQUAL(point.m_timestamp, 3, ()); + TEST(abs(point.m_latLon.lat - 4.0) < 0.001, ()); + TEST(abs(point.m_latLon.lon - 5.0) < 0.001, ()); +} diff --git a/tracking/tracking_tests/tracking_tests.pro b/tracking/tracking_tests/tracking_tests.pro new file mode 100644 index 0000000000..f37f3fb860 --- /dev/null +++ b/tracking/tracking_tests/tracking_tests.pro @@ -0,0 +1,29 @@ +TARGET = tracking_tests +CONFIG += console warn_on +CONFIG -= app_bundle +TEMPLATE = app + +ROOT_DIR = ../.. + +INCLUDEPATH *= $$ROOT_DIR/3party/jansson/src + +DEPENDENCIES = base coding geometry platform routing stats_client tracking + +include($$ROOT_DIR/common.pri) + +DEFINES *= OMIM_UNIT_TEST_WITH_QT_EVENT_LOOP + +QT *= core + +macx-* { + QT *= widgets # needed for QApplication with event loop, to test async events + LIBS *= "-framework IOKit" "-framework SystemConfiguration" +} + +win*|linux* { + QT *= network +} + +SOURCES += \ + $$ROOT_DIR/testing/testingmain.cpp \ + reporter_tests.cpp \ |