Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mapsme/omim.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorConstantin Shalnev <c.shalnev@corp.mail.ru>2015-08-03 13:11:33 +0300
committerAlex Zolotarev <alex@maps.me>2015-09-23 02:59:04 +0300
commita37bd5801e9441da3767e0dc81558e8577c80dd7 (patch)
treeec61b50c4070f5bddd1657001f85acc13b8fc5c1 /routing
parentbb03689eccef22b95b88275ab2478e20edc1a145 (diff)
Improved AsyncRouter
Diffstat (limited to 'routing')
-rw-r--r--routing/async_router.cpp195
-rw-r--r--routing/async_router.hpp83
-rw-r--r--routing/routing_session.cpp21
-rw-r--r--routing/routing_session.hpp7
-rw-r--r--routing/routing_tests/async_router_test.cpp8
5 files changed, 235 insertions, 79 deletions
diff --git a/routing/async_router.cpp b/routing/async_router.cpp
index 444eee16ca..8a66dc7726 100644
--- a/routing/async_router.cpp
+++ b/routing/async_router.cpp
@@ -55,48 +55,117 @@ map<string, string> PrepareStatisticsData(string const & routerName,
} // namespace
-AsyncRouter::AsyncRouter(unique_ptr<IRouter> && router, unique_ptr<IOnlineFetcher> && fetcher,
- TRoutingStatisticsCallback const & routingStatisticsCallback,
+// ----------------------------------------------------------------------------------------------------------------------------
+
+AsyncRouter::RouterDelegateProxy::RouterDelegateProxy(TReadyCallback const & onReady,
+ RouterDelegate::TPointCheckCallback const & onPointCheck,
+ RouterDelegate::TProgressCallback const & onProgress,
+ uint32_t timeoutSec)
+ : m_onReady(onReady), m_onPointCheck(onPointCheck), m_onProgress(onProgress)
+{
+ m_delegate.Reset();
+ m_delegate.SetPointCheckCallback(bind(&RouterDelegateProxy::OnPointCheck, this, _1));
+ m_delegate.SetProgressCallback(bind(&RouterDelegateProxy::OnProgress, this, _1));
+ m_delegate.SetTimeout(timeoutSec);
+}
+
+void AsyncRouter::RouterDelegateProxy::OnReady(Route & route, IRouter::ResultCode resultCode)
+{
+ if (!m_onReady)
+ return;
+ lock_guard<mutex> l(m_guard);
+ if (m_delegate.IsCancelled())
+ return;
+ m_onReady(route, resultCode);
+}
+
+void AsyncRouter::RouterDelegateProxy::Cancel()
+{
+ lock_guard<mutex> l(m_guard);
+ m_delegate.Cancel();
+}
+
+void AsyncRouter::RouterDelegateProxy::OnProgress(float progress)
+{
+ if (!m_onProgress)
+ return;
+ lock_guard<mutex> l(m_guard);
+ if (m_delegate.IsCancelled())
+ return;
+ m_onProgress(progress);
+}
+
+void AsyncRouter::RouterDelegateProxy::OnPointCheck(m2::PointD const & pt)
+{
+ if (!m_onPointCheck)
+ return;
+ lock_guard<mutex> l(m_guard);
+ if (m_delegate.IsCancelled())
+ return;
+ m_onPointCheck(pt);
+}
+
+// ----------------------------------------------------------------------------------------------------------------------------
+
+AsyncRouter::AsyncRouter(TRoutingStatisticsCallback const & routingStatisticsCallback,
RouterDelegate::TPointCheckCallback const & pointCheckCallback)
- : m_absentFetcher(move(fetcher)),
- m_router(move(router)),
- m_routingStatisticsCallback(routingStatisticsCallback)
+ : m_threadExit(false), m_hasRequest(false), m_clearState(false),
+ m_routingStatisticsCallback(routingStatisticsCallback),
+ m_pointCheckCallback(pointCheckCallback)
{
- ASSERT(m_router, ());
- m_delegate.SetPointCheckCallback(pointCheckCallback);
- m_isReadyThread.clear();
+ m_thread = thread(bind(&AsyncRouter::ThreadFunc, this));
+}
+
+AsyncRouter::~AsyncRouter()
+{
+ {
+ unique_lock<mutex> ul(m_guard);
+
+ ResetDelegate();
+
+ m_threadExit = true;
+ m_threadCondVar.notify_one();
+ }
+
+ m_thread.join();
}
-AsyncRouter::~AsyncRouter() { ClearState(); }
+void AsyncRouter::SetRouter(unique_ptr<IRouter> && router, unique_ptr<IOnlineFetcher> && fetcher)
+{
+ unique_lock<mutex> ul(m_guard);
+
+ ResetDelegate();
+
+ m_router = move(router);
+ m_absentFetcher = move(fetcher);
+}
void AsyncRouter::CalculateRoute(m2::PointD const & startPoint, m2::PointD const & direction,
m2::PointD const & finalPoint, TReadyCallback const & readyCallback,
RouterDelegate::TProgressCallback const & progressCallback,
uint32_t timeoutSec)
{
- {
- lock_guard<mutex> paramsGuard(m_paramsMutex);
+ unique_lock<mutex> ul(m_guard);
- m_startPoint = startPoint;
- m_startDirection = direction;
- m_finalPoint = finalPoint;
+ m_startPoint = startPoint;
+ m_startDirection = direction;
+ m_finalPoint = finalPoint;
- m_delegate.Cancel();
- m_delegate.SetProgressCallback(progressCallback);
- }
+ ResetDelegate();
+
+ m_delegate = make_shared<RouterDelegateProxy>(readyCallback, m_pointCheckCallback, progressCallback, timeoutSec);
- GetPlatform().RunAsync(bind(&AsyncRouter::CalculateRouteImpl, this, readyCallback, timeoutSec));
+ m_hasRequest = true;
+ m_threadCondVar.notify_one();
}
void AsyncRouter::ClearState()
{
- // Send cancel flag to the algorithms.
- m_delegate.Cancel();
+ unique_lock<mutex> ul(m_guard);
- // And wait while it is finishing.
- lock_guard<mutex> routingGuard(m_routingMutex);
+ m_clearState = true;
- m_router->ClearState();
+ ResetDelegate();
}
void AsyncRouter::LogCode(IRouter::ResultCode code, double const elapsedSec)
@@ -137,34 +206,71 @@ void AsyncRouter::LogCode(IRouter::ResultCode code, double const elapsedSec)
case IRouter::InternalError:
LOG(LINFO, ("Internal error"));
break;
+ case IRouter::FileTooOld:
+ LOG(LINFO, ("File too old"));
+ break;
}
}
-void AsyncRouter::CalculateRouteImpl(TReadyCallback const & readyCallback, uint32_t timeoutSec)
+void AsyncRouter::ResetDelegate()
{
- ASSERT(m_router, ());
- if (m_isReadyThread.test_and_set())
- return;
+ if (m_delegate)
+ {
+ m_delegate->Cancel();
+ m_delegate.reset();
+ }
+}
- Route route(m_router->GetName());
- IRouter::ResultCode code;
+void AsyncRouter::ThreadFunc()
+{
+ while (true)
+ {
+ {
+ unique_lock<mutex> ul(m_guard);
+ m_threadCondVar.wait(ul, [this](){ return m_threadExit || m_hasRequest; });
- lock_guard<mutex> routingGuard(m_routingMutex);
+ if (m_threadExit)
+ break;
+ }
- m_isReadyThread.clear();
+ CalculateRoute();
+ }
+}
+void AsyncRouter::CalculateRoute()
+{
+ bool clearState = true;
+ shared_ptr<RouterDelegateProxy> delegate;
m2::PointD startPoint, finalPoint, startDirection;
+ shared_ptr<IOnlineFetcher> absentFetcher;
+ shared_ptr<IRouter> router;
+
{
- lock_guard<mutex> paramsGuard(m_paramsMutex);
+ unique_lock<mutex> ul(m_guard);
+
+ bool hasRequest = m_hasRequest;
+ m_hasRequest = false;
+ if (!hasRequest)
+ return;
+ if (!m_router)
+ return;
+ if (!m_delegate)
+ return;
startPoint = m_startPoint;
finalPoint = m_finalPoint;
startDirection = m_startDirection;
+ clearState = m_clearState;
+ delegate = m_delegate;
+ router = m_router;
+ absentFetcher = m_absentFetcher;
- m_delegate.Reset();
- m_delegate.SetTimeout(timeoutSec);
+ m_clearState = false;
}
+ Route route(router->GetName());
+ IRouter::ResultCode code;
+
my::Timer timer;
double elapsedSec = 0.0;
@@ -172,11 +278,14 @@ void AsyncRouter::CalculateRouteImpl(TReadyCallback const & readyCallback, uint3
{
LOG(LDEBUG, ("Calculating the route from", startPoint, "to", finalPoint, "startDirection", startDirection));
- if (m_absentFetcher)
- m_absentFetcher->GenerateRequest(startPoint, finalPoint);
+ if (absentFetcher)
+ absentFetcher->GenerateRequest(startPoint, finalPoint);
+
+ if (clearState)
+ router->ClearState();
// Run basic request.
- code = m_router->CalculateRoute(startPoint, startDirection, finalPoint, m_delegate, route);
+ code = router->CalculateRoute(startPoint, startDirection, finalPoint, delegate->GetDelegate(), route);
elapsedSec = timer.ElapsedSeconds(); // routing time
LogCode(code, elapsedSec);
@@ -186,23 +295,23 @@ void AsyncRouter::CalculateRouteImpl(TReadyCallback const & readyCallback, uint3
code = IRouter::InternalError;
LOG(LERROR, ("Exception happened while calculating route:", e.Msg()));
SendStatistics(startPoint, startDirection, finalPoint, e.Msg());
- readyCallback(route, code);
+ delegate->OnReady(route, code);
return;
}
SendStatistics(startPoint, startDirection, finalPoint, code, route, elapsedSec);
- //Draw route without waiting network latency.
+ // Draw route without waiting network latency.
if (code == IRouter::NoError)
- readyCallback(route, code);
+ delegate->OnReady(route, code);
bool const needFetchAbsent = (code != IRouter::Cancelled);
// Check online response if we have.
vector<string> absent;
- if (m_absentFetcher && needFetchAbsent)
+ if (absentFetcher && needFetchAbsent)
{
- m_absentFetcher->GetAbsentCountries(absent);
+ absentFetcher->GetAbsentCountries(absent);
for (string const & country : absent)
route.AddAbsentCountry(country);
}
@@ -215,7 +324,7 @@ void AsyncRouter::CalculateRouteImpl(TReadyCallback const & readyCallback, uint3
// Call callback only if we have some new data.
if (code != IRouter::NoError)
- readyCallback(route, code);
+ delegate->OnReady(route, code);
}
void AsyncRouter::SendStatistics(m2::PointD const & startPoint, m2::PointD const & startDirection,
diff --git a/routing/async_router.hpp b/routing/async_router.hpp
index 148df81826..a290e8702b 100644
--- a/routing/async_router.hpp
+++ b/routing/async_router.hpp
@@ -5,16 +5,19 @@
#include "router.hpp"
#include "router_delegate.hpp"
-#include "std/atomic.hpp"
+#include "std/condition_variable.hpp"
#include "std/map.hpp"
#include "std/mutex.hpp"
+#include "std/shared_ptr.hpp"
#include "std/string.hpp"
+#include "std/thread.hpp"
#include "std/unique_ptr.hpp"
namespace routing
{
-class AsyncRouter
+/// Dispatches a route calculation on a worker thread
+class AsyncRouter final
{
public:
/// Callback takes ownership of passed route.
@@ -24,13 +27,14 @@ public:
using TRoutingStatisticsCallback = function<void(map<string, string> const &)>;
/// AsyncRouter is a wrapper class to run routing routines in the different thread
- /// @param router pointer to the router implementation. AsyncRouter will take ownership over
- /// router.
- AsyncRouter(unique_ptr<IRouter> && router, unique_ptr<IOnlineFetcher> && fetcher,
- TRoutingStatisticsCallback const & routingStatisticsCallback,
+ AsyncRouter(TRoutingStatisticsCallback const & routingStatisticsCallback,
RouterDelegate::TPointCheckCallback const & pointCheckCallback);
+ ~AsyncRouter();
- virtual ~AsyncRouter();
+ /// Sets a synchronous router, current route calculation will be cancelled
+ /// @param router pointer to a router implementation
+ /// @param fetcher pointer to a online fetcher
+ void SetRouter(unique_ptr<IRouter> && router, unique_ptr<IOnlineFetcher> && fetcher);
/// Main method to calulate new route from startPt to finalPt with start direction
/// Processed result will be passed to callback. Callback will called at GUI thread.
@@ -41,17 +45,22 @@ public:
/// @param readyCallback function to return routing result
/// @param progressCallback function to update the router progress
/// @param timeoutSec timeout to cancel routing. 0 is infinity.
- virtual void CalculateRoute(m2::PointD const & startPoint, m2::PointD const & direction,
- m2::PointD const & finalPoint, TReadyCallback const & readyCallback,
- RouterDelegate::TProgressCallback const & progressCallback,
- uint32_t timeoutSec);
+ void CalculateRoute(m2::PointD const & startPoint, m2::PointD const & direction,
+ m2::PointD const & finalPoint, TReadyCallback const & readyCallback,
+ RouterDelegate::TProgressCallback const & progressCallback,
+ uint32_t timeoutSec);
/// Interrupt routing and clear buffers
- virtual void ClearState();
+ void ClearState();
private:
- /// This function is called in async mode
- void CalculateRouteImpl(TReadyCallback const & readyCallback, uint32_t timeoutSec);
+ /// Worker thread function
+ void ThreadFunc();
+
+ /// This function is called in worker thread
+ void CalculateRoute();
+
+ void ResetDelegate();
/// These functions are called to send statistics about the routing
void SendStatistics(m2::PointD const & startPoint, m2::PointD const & startDirection,
@@ -65,19 +74,51 @@ private:
void LogCode(IRouter::ResultCode code, double const elapsedSec);
- mutex m_paramsMutex;
- mutex m_routingMutex;
- atomic_flag m_isReadyThread;
+ /// Blocks callbacks when routing has been cancelled
+ class RouterDelegateProxy
+ {
+ public:
+ RouterDelegateProxy(TReadyCallback const & onReady,
+ RouterDelegate::TPointCheckCallback const & onPointCheck,
+ RouterDelegate::TProgressCallback const & onProgress,
+ uint32_t timeoutSec);
+
+ void OnReady(Route & route, IRouter::ResultCode resultCode);
+ void Cancel();
+
+ RouterDelegate const & GetDelegate() const { return m_delegate; }
+
+ private:
+ void OnProgress(float progress);
+ void OnPointCheck(m2::PointD const & pt);
+ mutex m_guard;
+ TReadyCallback const m_onReady;
+ RouterDelegate::TPointCheckCallback const m_onPointCheck;
+ RouterDelegate::TProgressCallback const m_onProgress;
+ RouterDelegate m_delegate;
+ };
+
+private:
+ mutex m_guard;
+
+ /// Thread which executes routing calculation
+ thread m_thread;
+ condition_variable m_threadCondVar;
+ bool m_threadExit;
+ bool m_hasRequest;
+
+ /// Current request parameters
+ bool m_clearState;
m2::PointD m_startPoint;
m2::PointD m_finalPoint;
m2::PointD m_startDirection;
+ shared_ptr<RouterDelegateProxy> m_delegate;
+ shared_ptr<IOnlineFetcher> m_absentFetcher;
+ shared_ptr<IRouter> m_router;
- RouterDelegate m_delegate;
-
- unique_ptr<IOnlineFetcher> const m_absentFetcher;
- unique_ptr<IRouter> const m_router;
TRoutingStatisticsCallback const m_routingStatisticsCallback;
+ RouterDelegate::TPointCheckCallback const m_pointCheckCallback;
};
} // namespace routing
diff --git a/routing/routing_session.cpp b/routing/routing_session.cpp
index 8303a49051..0f8fe03ce9 100644
--- a/routing/routing_session.cpp
+++ b/routing/routing_session.cpp
@@ -34,6 +34,13 @@ RoutingSession::RoutingSession()
{
}
+void RoutingSession::Init(TRoutingStatisticsCallback const & routingStatisticsFn,
+ RouterDelegate::TPointCheckCallback const & pointCheckCallback)
+{
+ ASSERT(m_router == nullptr, ());
+ m_router.reset(new AsyncRouter(routingStatisticsFn, pointCheckCallback));
+}
+
void RoutingSession::BuildRoute(m2::PointD const & startPoint, m2::PointD const & endPoint,
TReadyCallback const & readyCallback,
TProgressCallback const & progressCallback,
@@ -91,6 +98,8 @@ void RoutingSession::RemoveRoute()
void RoutingSession::Reset()
{
+ ASSERT(m_router != nullptr, ());
+
threads::MutexGuard guard(m_routeSessionMutex);
UNUSED_VALUE(guard);
@@ -240,15 +249,11 @@ void RoutingSession::AssignRoute(Route & route, IRouter::ResultCode e)
}
void RoutingSession::SetRouter(unique_ptr<IRouter> && router,
- unique_ptr<OnlineAbsentCountriesFetcher> && fetcher,
- TRoutingStatisticsCallback const & routingStatisticsFn,
- RouterDelegate::TPointCheckCallback const & pointCheckCallback)
+ unique_ptr<OnlineAbsentCountriesFetcher> && fetcher)
{
- if (m_router)
- Reset();
-
- m_router.reset(new AsyncRouter(move(router), move(fetcher), routingStatisticsFn,
- pointCheckCallback));
+ ASSERT(m_router != nullptr, ());
+ Reset();
+ m_router->SetRouter(move(router), move(fetcher));
}
void RoutingSession::MatchLocationToRoute(location::GpsInfo & location,
diff --git a/routing/routing_session.hpp b/routing/routing_session.hpp
index cf759c2532..f4e67f6a90 100644
--- a/routing/routing_session.hpp
+++ b/routing/routing_session.hpp
@@ -56,9 +56,10 @@ public:
RoutingSession();
- void SetRouter(unique_ptr<IRouter> && router, unique_ptr<OnlineAbsentCountriesFetcher> && fetcher,
- TRoutingStatisticsCallback const & routingStatisticsFn,
- RouterDelegate::TPointCheckCallback const & pointCheckCallback);
+ void Init(TRoutingStatisticsCallback const & routingStatisticsFn,
+ RouterDelegate::TPointCheckCallback const & pointCheckCallback);
+
+ void SetRouter(unique_ptr<IRouter> && router, unique_ptr<OnlineAbsentCountriesFetcher> && fetcher);
/// @param[in] startPoint and endPoint in mercator
/// @param[in] timeoutSec timeout in seconds, if zero then there is no timeout
diff --git a/routing/routing_tests/async_router_test.cpp b/routing/routing_tests/async_router_test.cpp
index 8f826800b3..83827f58b6 100644
--- a/routing/routing_tests/async_router_test.cpp
+++ b/routing/routing_tests/async_router_test.cpp
@@ -95,8 +95,8 @@ UNIT_TEST(NeedMoreMapsSignalTest)
unique_ptr<IOnlineFetcher> fetcher(new DummyFetcher(absentData));
unique_ptr<IRouter> router(new DummyRouter(ResultCode::NoError, {}));
DummyResultCallback resultCallback(2 /* expectedCalls */);
- AsyncRouter async(move(router), move(fetcher), DummyStatisticsCallback,
- nullptr /* pointCheckCallback */);
+ AsyncRouter async(DummyStatisticsCallback, nullptr /* pointCheckCallback */);
+ async.SetRouter(move(router), move(fetcher));
async.CalculateRoute({1, 2}, {3, 4}, {5, 6}, bind(ref(resultCallback), _1, _2),
nullptr /* progressCallback */, 0 /* timeoutSec */);
@@ -116,8 +116,8 @@ UNIT_TEST(StandartAsyncFogTest)
unique_ptr<IOnlineFetcher> fetcher(new DummyFetcher({}));
unique_ptr<IRouter> router(new DummyRouter(ResultCode::NoError, {}));
DummyResultCallback resultCallback(1 /* expectedCalls */);
- AsyncRouter async(move(router), move(fetcher), DummyStatisticsCallback,
- nullptr /* pointCheckCallback */);
+ AsyncRouter async(DummyStatisticsCallback, nullptr /* pointCheckCallback */);
+ async.SetRouter(move(router), move(fetcher));
async.CalculateRoute({1, 2}, {3, 4}, {5, 6}, bind(ref(resultCallback), _1, _2),
nullptr /* progressCallback */, 0 /* timeoutSec */);