diff options
author | Arsentiy Milchakov <milcars@mapswithme.com> | 2017-12-18 15:54:27 +0300 |
---|---|---|
committer | Roman Kuznetsov <r.kuznetsow@gmail.com> | 2017-12-19 16:28:12 +0300 |
commit | 9d7092946ad908e4e75aaf05ce50a6e81ffcdccf (patch) | |
tree | bca31a5b5ce7d5bdb8d89d04fb8c0889c56339b8 /local_ads | |
parent | 70090735f71838be7bd92656959b502af7f1f83b (diff) |
[local_ads][user] reduced number of threads
Diffstat (limited to 'local_ads')
-rw-r--r-- | local_ads/local_ads_tests/statistics_tests.cpp | 1 | ||||
-rw-r--r-- | local_ads/statistics.cpp | 102 | ||||
-rw-r--r-- | local_ads/statistics.hpp | 14 |
3 files changed, 20 insertions, 97 deletions
diff --git a/local_ads/local_ads_tests/statistics_tests.cpp b/local_ads/local_ads_tests/statistics_tests.cpp index f79fa0b0dc..efe791a04a 100644 --- a/local_ads/local_ads_tests/statistics_tests.cpp +++ b/local_ads/local_ads_tests/statistics_tests.cpp @@ -12,7 +12,6 @@ public: StatisticsGuard(local_ads::Statistics & statistics) : m_statistics(statistics) {} ~StatisticsGuard() { - m_statistics.Teardown(); m_statistics.CleanupAfterTesting(); } diff --git a/local_ads/statistics.cpp b/local_ads/statistics.cpp index e55c44b4c0..ae1e4e98f7 100644 --- a/local_ads/statistics.cpp +++ b/local_ads/statistics.cpp @@ -20,6 +20,7 @@ #include <functional> #include <sstream> +#include <utility> #include "private.h" @@ -159,87 +160,35 @@ std::string MakeRemoteURL(std::string const & userId, std::string const & name, namespace local_ads { -Statistics::~Statistics() -{ - std::lock_guard<std::mutex> lock(m_mutex); - ASSERT(!m_isRunning, ()); -} - void Statistics::Startup() { + auto const asyncTask = [this] { - std::lock_guard<std::mutex> lock(m_mutex); - if (m_isRunning) - return; - m_isRunning = true; - } - m_thread = threads::SimpleThread(&Statistics::ThreadRoutine, this); -} + SendToServer(); + }; -void Statistics::Teardown() -{ + auto const recursiveAsyncTask = [asyncTask] { - std::lock_guard<std::mutex> lock(m_mutex); - if (!m_isRunning) - return; - m_isRunning = false; - } - m_condition.notify_one(); - m_thread.join(); -} + asyncTask(); + GetPlatform().RunDelayedTask(Platform::Thread::File, kSendingTimeout, asyncTask); + }; -bool Statistics::RequestEvents(std::list<Event> & events, bool & needToSend) -{ - std::unique_lock<std::mutex> lock(m_mutex); - - bool const isTimeout = !m_condition.wait_for(lock, kSendingTimeout, [this] + // The first send immediately, and then every |kSendingTimeout|. + GetPlatform().RunTask(Platform::Thread::File, [recursiveAsyncTask] { - return !m_isRunning || !m_events.empty(); + recursiveAsyncTask(); }); - - if (!m_isRunning) - return false; - - using namespace std::chrono; - needToSend = m_isFirstSending || isTimeout || - (std::chrono::steady_clock::now() > (m_lastSending + kSendingTimeout)); - - events = std::move(m_events); - m_events.clear(); - return true; } void Statistics::RegisterEvent(Event && event) { - std::lock_guard<std::mutex> lock(m_mutex); - if (!m_isRunning) - return; - m_events.push_back(std::move(event)); - m_condition.notify_one(); + RegisterEvents({std::move(event)}); } void Statistics::RegisterEvents(std::list<Event> && events) { - std::lock_guard<std::mutex> lock(m_mutex); - if (!m_isRunning) - return; - m_events.splice(m_events.end(), std::move(events)); - m_condition.notify_one(); -} - -void Statistics::ThreadRoutine() -{ - std::list<Event> events; - bool needToSend = false; - while (RequestEvents(events, needToSend)) - { - ProcessEvents(events); - events.clear(); - - // Send statistics to server. - if (needToSend) - SendToServer(); - } + GetPlatform().RunTask(Platform::Thread::File, + std::bind(&Statistics::ProcessEvents, this, std::move(events))); } std::list<Event> Statistics::WriteEvents(std::list<Event> & events, std::string & fileNameToRebuild) @@ -385,14 +334,6 @@ void Statistics::ProcessEvents(std::list<Event> & events) void Statistics::SendToServer() { - std::string userId; - ServerSerializer serializer; - { - std::lock_guard<std::mutex> lock(m_mutex); - userId = m_userId; - serializer = m_serverSerializer; - } - for (auto it = m_metadataCache.begin(); it != m_metadataCache.end();) { std::string const url = MakeRemoteURL(m_userId, it->first.first, it->first.second); @@ -408,8 +349,8 @@ void Statistics::SendToServer() std::string contentType = "application/octet-stream"; std::string contentEncoding = ""; - std::vector<uint8_t> bytes = serializer != nullptr - ? serializer(events, userId, contentType, contentEncoding) + std::vector<uint8_t> bytes = m_serverSerializer != nullptr + ? m_serverSerializer(events, m_userId, contentType, contentEncoding) : SerializeForServer(events); ASSERT(!bytes.empty(), ()); @@ -432,15 +373,13 @@ void Statistics::SendToServer() ++it; } } - m_lastSending = std::chrono::steady_clock::now(); - m_isFirstSending = false; } std::vector<uint8_t> Statistics::SerializeForServer(std::list<Event> const & events) const { ASSERT(!events.empty(), ()); - // TODO: implement serialization + // TODO: implement binary serialization (so far, we are using json serialization). return std::vector<uint8_t>{1, 2, 3, 4, 5}; } @@ -534,8 +473,7 @@ void Statistics::BalanceMemory() void Statistics::SetUserId(std::string const & userId) { - std::lock_guard<std::mutex> lock(m_mutex); - m_userId = userId; + GetPlatform().RunTask(Platform::Thread::File, [this, userId] { m_userId = userId; }); } std::list<Event> Statistics::ReadEventsForTesting(std::string const & fileName) @@ -558,7 +496,7 @@ void Statistics::CleanupAfterTesting() void Statistics::SetCustomServerSerializer(ServerSerializer && serializer) { - std::lock_guard<std::mutex> lock(m_mutex); - m_serverSerializer = std::move(serializer); + GetPlatform().RunTask(Platform::Thread::File, + [this, serializer] { m_serverSerializer = serializer; }); } } // namespace local_ads diff --git a/local_ads/statistics.hpp b/local_ads/statistics.hpp index 875bf62c3f..f5a71353f0 100644 --- a/local_ads/statistics.hpp +++ b/local_ads/statistics.hpp @@ -33,10 +33,8 @@ public: }; Statistics() = default; - ~Statistics(); void Startup(); - void Teardown(); void SetUserId(std::string const & userId); @@ -52,9 +50,6 @@ public: void CleanupAfterTesting(); private: - void ThreadRoutine(); - bool RequestEvents(std::list<Event> & events, bool & needToSend); - void IndexMetadata(); void ExtractMetadata(std::string const & fileName); void BalanceMemory(); @@ -79,17 +74,8 @@ private: } }; std::map<MetadataKey, Metadata> m_metadataCache; - std::chrono::steady_clock::time_point m_lastSending; - bool m_isFirstSending = true; std::string m_userId; ServerSerializer m_serverSerializer; - - bool m_isRunning = false; - std::list<Event> m_events; - - std::condition_variable m_condition; - std::mutex m_mutex; - threads::SimpleThread m_thread; }; } // namespace local_ads |