github.com/mapsme/omim.git

commit    28ba5721273e5998b45cddb4066ed8a7ae5aadbb (patch)
author    Anatoly Serdtcev <serdtcev@maps.me>  2019-01-25 16:36:11 +0300
committer Anatoly Serdtcev <serdtcev@maps.me>  2019-01-31 14:15:44 +0300
tree      13fd000c7415def55abc56bf1b8ee88d86f16712 /geocoder
parent    1ea259ab053e2dce54eebfcb83d330a831115685 (diff)
[geocoder] Fix for review
Diffstat (limited to 'geocoder')
-rw-r--r--  geocoder/geocoder_tests/geocoder_tests.cpp  | 58
-rw-r--r--  geocoder/hierarchy_reader.cpp               | 95
-rw-r--r--  geocoder/hierarchy_reader.hpp               | 10
3 files changed, 105 insertions, 58 deletions
diff --git a/geocoder/geocoder_tests/geocoder_tests.cpp b/geocoder/geocoder_tests/geocoder_tests.cpp
index 08d64bc6e7..6507ddc0a0 100644
--- a/geocoder/geocoder_tests/geocoder_tests.cpp
+++ b/geocoder/geocoder_tests/geocoder_tests.cpp
@@ -54,35 +54,6 @@ void TestGeocoder(Geocoder & geocoder, string const & query, vector<Result> && e
}
}
-UNIT_TEST(Geocoder_EmptyFileConcurrentRead)
-{
- ScopedFile const regionsJsonFile("regions.jsonl", "");
- Geocoder geocoder(regionsJsonFile.GetFullPath(), 8 /* reader threads */);
-
- TEST_EQUAL(geocoder.GetHierarchy().GetEntries().size(), 0, ());
-}
-
-UNIT_TEST(Geocoder_BigFileConcurrentRead)
-{
- int const kEntryCount = 1000000;
-
- stringstream s;
- for (int i = 0; i < kEntryCount; ++i)
- {
- s << i << " "
- << "{"
- << R"("type": "Feature",)"
- << R"("geometry": {"type": "Point", "coordinates": [0, 0]},)"
- << R"("properties": {"name": ")" << i << R"(", "rank": 2, "address": {"country": ")" << i << R"("}})"
- << "}\n";
- }
-
- ScopedFile const regionsJsonFile("regions.jsonl", s.str());
- Geocoder geocoder(regionsJsonFile.GetFullPath(), 8 /* reader threads */);
-
- TEST_EQUAL(geocoder.GetHierarchy().GetEntries().size(), kEntryCount, ());
-}
-
UNIT_TEST(Geocoder_Smoke)
{
ScopedFile const regionsJsonFile("regions.jsonl", kRegionsData);
@@ -180,4 +151,33 @@ UNIT_TEST(Geocoder_MismatchedLocality)
// "Street 3" looks almost like a match to "Paris-Street-3" but we should not emit it.
TestGeocoder(geocoder, "Moscow Street 3", {});
}
+
+UNIT_TEST(Geocoder_EmptyFileConcurrentRead)
+{
+ ScopedFile const regionsJsonFile("regions.jsonl", "");
+ Geocoder geocoder(regionsJsonFile.GetFullPath(), 8 /* reader threads */);
+
+ TEST_EQUAL(geocoder.GetHierarchy().GetEntries().size(), 0, ());
+}
+
+UNIT_TEST(Geocoder_BigFileConcurrentRead)
+{
+ int const kEntryCount = 1000000;
+
+ stringstream s;
+ for (int i = 0; i < kEntryCount; ++i)
+ {
+ s << i << " "
+ << "{"
+ << R"("type": "Feature",)"
+ << R"("geometry": {"type": "Point", "coordinates": [0, 0]},)"
+ << R"("properties": {"name": ")" << i << R"(", "rank": 2, "address": {"country": ")" << i << R"("}})"
+ << "}\n";
+ }
+
+ ScopedFile const regionsJsonFile("regions.jsonl", s.str());
+ Geocoder geocoder(regionsJsonFile.GetFullPath(), 8 /* reader threads */);
+
+ TEST_EQUAL(geocoder.GetHierarchy().GetEntries().size(), kEntryCount, ());
+}
} // namespace geocoder
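
The relocated tests above build their regions.jsonl fixture as one entry per line: an OSM id, a space, then the serialized feature. A standalone sketch of emitting a single line in that layout (the generator program and its output path are illustrative, not part of this change):

#include <fstream>
#include <sstream>
#include <string>

int main()
{
  // Hypothetical one-entry fixture in the "<osm id> <json>" line layout used
  // by the tests above and parsed by HierarchyReader::DeserializeEntryMap.
  int const id = 42;

  std::ostringstream line;
  line << id << " "
       << R"({"type": "Feature", )"
       << R"("geometry": {"type": "Point", "coordinates": [0, 0]}, )"
       << R"("properties": {"name": ")" << id
       << R"(", "rank": 2, "address": {"country": ")" << id << R"("}}})";

  std::ofstream out("regions.jsonl");  // output path is an assumption for the sketch
  out << line.str() << '\n';
  return 0;
}

The reader splits each such line on the first space to recover the id and the JSON payload, as shown in hierarchy_reader.cpp below.
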
diff --git a/geocoder/hierarchy_reader.cpp b/geocoder/hierarchy_reader.cpp
index 2ac3e1ae8b..3f8cdd5e0e 100644
--- a/geocoder/hierarchy_reader.cpp
+++ b/geocoder/hierarchy_reader.cpp
@@ -1,7 +1,12 @@
#include "geocoder/hierarchy_reader.hpp"
#include "base/logging.hpp"
+#include "base/string_utils.hpp"
+#include <boost/iostreams/device/file.hpp>
+#include <boost/iostreams/filter/gzip.hpp>
+
+#include <queue>
#include <thread>
using namespace std;
@@ -14,10 +19,15 @@ namespace
size_t const kLogBatch = 100000;
} // namespace
-HierarchyReader::HierarchyReader(string const & pathToJsonHierarchy) :
- m_fileStm{pathToJsonHierarchy}
+HierarchyReader::HierarchyReader(string const & pathToJsonHierarchy)
{
- if (!m_fileStm)
+ using namespace boost::iostreams;
+
+ if (strings::EndsWith(pathToJsonHierarchy, ".gz"))
+ m_fileStream.push(gzip_decompressor());
+ m_fileStream.push(file_source(pathToJsonHierarchy));
+
+ if (!m_fileStream)
MYTHROW(OpenException, ("Failed to open file", pathToJsonHierarchy));
}
@@ -25,7 +35,8 @@ vector<Hierarchy::Entry> HierarchyReader::ReadEntries(size_t readersCount, Parsi
{
LOG(LINFO, ("Reading entries..."));
- readersCount = min(readersCount, size_t{8});
+ readersCount = min(readersCount, size_t{thread::hardware_concurrency()});
+
vector<multimap<base::GeoObjectId, Entry>> taskEntries(readersCount);
vector<thread> tasks{};
for (size_t t = 0; t < readersCount; ++t)
@@ -37,10 +48,10 @@ vector<Hierarchy::Entry> HierarchyReader::ReadEntries(size_t readersCount, Parsi
if (stats.m_numLoaded % kLogBatch != 0)
LOG(LINFO, ("Read", stats.m_numLoaded, "entries"));
- return UnionEntries(taskEntries);
+ return MergeEntries(taskEntries);
}
-vector<Hierarchy::Entry> HierarchyReader::UnionEntries(vector<multimap<base::GeoObjectId, Entry>> & entryParts)
+vector<Hierarchy::Entry> HierarchyReader::MergeEntries(vector<multimap<base::GeoObjectId, Entry>> & entryParts)
{
auto entries = vector<Entry>{};
@@ -50,20 +61,30 @@ vector<Hierarchy::Entry> HierarchyReader::UnionEntries(vector<multimap<base::Geo
entries.reserve(size);
- LOG(LINFO, ("Sorting entries..."));
+ LOG(LINFO, ("Merging entries..."));
+
+ using PartReference = reference_wrapper<multimap<base::GeoObjectId, Entry>>;
+ struct ReferenceGreater
+ {
+ bool operator () (PartReference const & l, PartReference const & r) const noexcept
+ { return l.get() > r.get(); }
+ };
- while (entryParts.size())
+ auto partsQueue = priority_queue<PartReference, std::vector<PartReference>, ReferenceGreater>
+ (entryParts.begin(), entryParts.end());
+ while (partsQueue.size())
{
- auto minPart = min_element(entryParts.begin(), entryParts.end());
+ auto & minPart = partsQueue.top().get();
+ partsQueue.pop();
- if (minPart->size())
+ while (minPart.size() && (partsQueue.empty() || minPart <= partsQueue.top().get()))
{
- entries.emplace_back(move(minPart->begin()->second));
- minPart->erase(minPart->begin());
+ entries.emplace_back(move(minPart.begin()->second));
+ minPart.erase(minPart.begin());
}
- if (minPart->empty())
- entryParts.erase(minPart);
+ if (minPart.size())
+ partsQueue.push(ref(minPart));
}
return entries;
@@ -74,35 +95,59 @@ void HierarchyReader::ReadEntryMap(multimap<base::GeoObjectId, Entry> & entries,
// Temporary local object for efficient concurent processing (individual cache line for container).
auto localEntries = multimap<base::GeoObjectId, Entry>{};
- string line;
+ int const kLineBufferCapacity = 10000;
+ vector<string> linesBuffer(kLineBufferCapacity);
+ int bufferSize = 0;
+
while (true)
{
+ bufferSize = 0;
+
{
auto && lock = lock_guard<mutex>(m_mutex);
-
- if (!getline(m_fileStm, line))
- break;
+
+ for (; bufferSize < kLineBufferCapacity; ++bufferSize)
+ {
+ if (!getline(m_fileStream, linesBuffer[bufferSize]))
+ break;
+ }
}
-
+
+ if (!bufferSize)
+ break;
+
+ DeserializeEntryMap(linesBuffer, bufferSize, localEntries, stats);
+ }
+
+ entries = move(localEntries);
+}
+
+void HierarchyReader::DeserializeEntryMap(vector<string> const & linesBuffer, int const bufferSize,
+ multimap<base::GeoObjectId, Entry> & entries, ParsingStats & stats)
+{
+ for (int i = 0; i < bufferSize; ++i)
+ {
+ auto & line = linesBuffer[i];
+
if (line.empty())
continue;
- auto const i = line.find(' ');
+ auto const p = line.find(' ');
int64_t encodedId;
- if (i == string::npos || !strings::to_any(line.substr(0, i), encodedId))
+ if (p == string::npos || !strings::to_any(line.substr(0, p), encodedId))
{
LOG(LWARNING, ("Cannot read osm id. Line:", line));
++stats.m_badOsmIds;
continue;
}
- line = line.substr(i + 1);
+ auto json = line.substr(p + 1);
Entry entry;
// todo(@m) We should really write uints as uints.
auto const osmId = base::GeoObjectId(static_cast<uint64_t>(encodedId));
entry.m_osmId = osmId;
- if (!entry.DeserializeFromJSON(line, stats))
+ if (!entry.DeserializeFromJSON(json, stats))
continue;
if (entry.m_type == Type::Count)
@@ -112,9 +157,7 @@ void HierarchyReader::ReadEntryMap(multimap<base::GeoObjectId, Entry> & entries,
if (stats.m_numLoaded % kLogBatch == 0)
LOG(LINFO, ("Read", (stats.m_numLoaded / kLogBatch) * kLogBatch, "entries"));
- localEntries.emplace(osmId, move(entry));
+ entries.emplace(osmId, move(entry));
}
-
- entries = move(localEntries);
}
} // namespace geocoder
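
MergeEntries above performs a k-way merge of the per-thread containers: the parts sit in a min-heap ordered by their smallest remaining element, the smallest part is drained while it still holds the global minimum, then re-enqueued if anything is left. A minimal sketch of the same pattern on sorted int vectors, used here as simplified stand-ins for the multimap parts:

#include <functional>
#include <iostream>
#include <queue>
#include <vector>

int main()
{
  // Each part stands in for one reader thread's sorted output; all parts are
  // assumed non-empty when the heap is built, as in MergeEntries.
  using Part = std::vector<int>;
  std::vector<Part> parts{{1, 4, 7}, {2, 5, 8}, {3, 6, 9}};

  using PartRef = std::reference_wrapper<Part>;
  auto const greater = [](PartRef const & l, PartRef const & r) {
    return l.get().front() > r.get().front();  // min-heap on the smallest remaining element
  };

  std::priority_queue<PartRef, std::vector<PartRef>, decltype(greater)> partsQueue(
      parts.begin(), parts.end(), greater);

  std::vector<int> merged;
  while (!partsQueue.empty())
  {
    auto & minPart = partsQueue.top().get();
    partsQueue.pop();

    // Drain the smallest part while it still holds the globally smallest element.
    while (!minPart.empty() &&
           (partsQueue.empty() || minPart.front() <= partsQueue.top().get().front()))
    {
      merged.push_back(minPart.front());
      minPart.erase(minPart.begin());
    }

    if (!minPart.empty())
      partsQueue.push(std::ref(minPart));  // re-enqueue what is left
  }

  for (int value : merged)
    std::cout << value << ' ';
  std::cout << '\n';
  return 0;
}

In the diff the parts are multimaps and the drain condition `minPart <= partsQueue.top().get()` relies on the containers' lexicographic ordering, which is driven first by their smallest entries; the int-vector comparison above is a simplified analogue.
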
diff --git a/geocoder/hierarchy_reader.hpp b/geocoder/hierarchy_reader.hpp
index e28a318fe6..70b21badb4 100644
--- a/geocoder/hierarchy_reader.hpp
+++ b/geocoder/hierarchy_reader.hpp
@@ -5,7 +5,8 @@
#include "base/exception.hpp"
#include "base/geo_object_id.hpp"
-#include <fstream>
+#include <boost/iostreams/filtering_stream.hpp>
+
#include <map>
#include <mutex>
#include <string>
@@ -27,9 +28,12 @@ public:
private:
void ReadEntryMap(std::multimap<base::GeoObjectId, Entry> & entries, ParsingStats & stats);
- std::vector<Entry> UnionEntries(std::vector<std::multimap<base::GeoObjectId, Entry>> & entryParts);
- std::ifstream m_fileStm;
+ void DeserializeEntryMap(std::vector<std::string> const & linesBuffer, int const bufferSize,
+ std::multimap<base::GeoObjectId, Entry> & entries, ParsingStats & stats);
+ std::vector<Entry> MergeEntries(std::vector<std::multimap<base::GeoObjectId, Entry>> & entryParts);
+
+ boost::iostreams::filtering_istream m_fileStream;
std::mutex m_mutex;
};
} // namespace geocoder
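
With m_fileStream now a boost::iostreams::filtering_istream, the constructor in hierarchy_reader.cpp can layer a gzip_decompressor in front of the file_source whenever the hierarchy path ends in ".gz". A minimal standalone sketch of that setup, assuming Boost.Iostreams (and zlib for the gzip filter) are linked; the line-counting main and the sample path are only for illustration:

#include <boost/iostreams/device/file.hpp>
#include <boost/iostreams/filter/gzip.hpp>
#include <boost/iostreams/filtering_stream.hpp>

#include <iostream>
#include <string>

int main(int argc, char ** argv)
{
  namespace io = boost::iostreams;

  std::string const path = argc > 1 ? argv[1] : "regions.jsonl";  // sample path

  io::filtering_istream in;
  // Filters must be pushed before the terminal device, so the decompressor
  // goes first and plain files simply skip it.
  if (path.size() > 3 && path.compare(path.size() - 3, 3, ".gz") == 0)
    in.push(io::gzip_decompressor());
  in.push(io::file_source(path));

  if (!in)  // mirrors the constructor's error check
  {
    std::cerr << "Failed to open " << path << '\n';
    return 1;
  }

  // filtering_istream behaves like any std::istream, so getline works the
  // same way it does in ReadEntryMap.
  std::string line;
  size_t count = 0;
  while (std::getline(in, line))
    ++count;
  std::cout << count << " lines\n";
  return 0;
}
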