diff options
author | Anatoly Serdtcev <serdtcev@maps.me> | 2019-01-25 16:36:11 +0300 |
---|---|---|
committer | Anatoly Serdtcev <serdtcev@maps.me> | 2019-01-31 14:15:44 +0300 |
commit | 28ba5721273e5998b45cddb4066ed8a7ae5aadbb (patch) | |
tree | 13fd000c7415def55abc56bf1b8ee88d86f16712 /geocoder | |
parent | 1ea259ab053e2dce54eebfcb83d330a831115685 (diff) |
[geocoder] Fix for review
Diffstat (limited to 'geocoder')
-rw-r--r-- | geocoder/geocoder_tests/geocoder_tests.cpp | 58 | ||||
-rw-r--r-- | geocoder/hierarchy_reader.cpp | 95 | ||||
-rw-r--r-- | geocoder/hierarchy_reader.hpp | 10 |
3 files changed, 105 insertions, 58 deletions
diff --git a/geocoder/geocoder_tests/geocoder_tests.cpp b/geocoder/geocoder_tests/geocoder_tests.cpp index 08d64bc6e7..6507ddc0a0 100644 --- a/geocoder/geocoder_tests/geocoder_tests.cpp +++ b/geocoder/geocoder_tests/geocoder_tests.cpp @@ -54,35 +54,6 @@ void TestGeocoder(Geocoder & geocoder, string const & query, vector<Result> && e } } -UNIT_TEST(Geocoder_EmptyFileConcurrentRead) -{ - ScopedFile const regionsJsonFile("regions.jsonl", ""); - Geocoder geocoder(regionsJsonFile.GetFullPath(), 8 /* reader threads */); - - TEST_EQUAL(geocoder.GetHierarchy().GetEntries().size(), 0, ()); -} - -UNIT_TEST(Geocoder_BigFileConcurrentRead) -{ - int const kEntryCount = 1000000; - - stringstream s; - for (int i = 0; i < kEntryCount; ++i) - { - s << i << " " - << "{" - << R"("type": "Feature",)" - << R"("geometry": {"type": "Point", "coordinates": [0, 0]},)" - << R"("properties": {"name": ")" << i << R"(", "rank": 2, "address": {"country": ")" << i << R"("}})" - << "}\n"; - } - - ScopedFile const regionsJsonFile("regions.jsonl", s.str()); - Geocoder geocoder(regionsJsonFile.GetFullPath(), 8 /* reader threads */); - - TEST_EQUAL(geocoder.GetHierarchy().GetEntries().size(), kEntryCount, ()); -} - UNIT_TEST(Geocoder_Smoke) { ScopedFile const regionsJsonFile("regions.jsonl", kRegionsData); @@ -180,4 +151,33 @@ UNIT_TEST(Geocoder_MismatchedLocality) // "Street 3" looks almost like a match to "Paris-Street-3" but we should not emit it. TestGeocoder(geocoder, "Moscow Street 3", {}); } + +UNIT_TEST(Geocoder_EmptyFileConcurrentRead) +{ + ScopedFile const regionsJsonFile("regions.jsonl", ""); + Geocoder geocoder(regionsJsonFile.GetFullPath(), 8 /* reader threads */); + + TEST_EQUAL(geocoder.GetHierarchy().GetEntries().size(), 0, ()); +} + +UNIT_TEST(Geocoder_BigFileConcurrentRead) +{ + int const kEntryCount = 1000000; + + stringstream s; + for (int i = 0; i < kEntryCount; ++i) + { + s << i << " " + << "{" + << R"("type": "Feature",)" + << R"("geometry": {"type": "Point", "coordinates": [0, 0]},)" + << R"("properties": {"name": ")" << i << R"(", "rank": 2, "address": {"country": ")" << i << R"("}})" + << "}\n"; + } + + ScopedFile const regionsJsonFile("regions.jsonl", s.str()); + Geocoder geocoder(regionsJsonFile.GetFullPath(), 8 /* reader threads */); + + TEST_EQUAL(geocoder.GetHierarchy().GetEntries().size(), kEntryCount, ()); +} } // namespace geocoder diff --git a/geocoder/hierarchy_reader.cpp b/geocoder/hierarchy_reader.cpp index 2ac3e1ae8b..3f8cdd5e0e 100644 --- a/geocoder/hierarchy_reader.cpp +++ b/geocoder/hierarchy_reader.cpp @@ -1,7 +1,12 @@ #include "geocoder/hierarchy_reader.hpp" #include "base/logging.hpp" +#include "base/string_utils.hpp" +#include <boost/iostreams/device/file.hpp> +#include <boost/iostreams/filter/gzip.hpp> + +#include <queue> #include <thread> using namespace std; @@ -14,10 +19,15 @@ namespace size_t const kLogBatch = 100000; } // namespace -HierarchyReader::HierarchyReader(string const & pathToJsonHierarchy) : - m_fileStm{pathToJsonHierarchy} +HierarchyReader::HierarchyReader(string const & pathToJsonHierarchy) { - if (!m_fileStm) + using namespace boost::iostreams; + + if (strings::EndsWith(pathToJsonHierarchy, ".gz")) + m_fileStream.push(gzip_decompressor()); + m_fileStream.push(file_source(pathToJsonHierarchy)); + + if (!m_fileStream) MYTHROW(OpenException, ("Failed to open file", pathToJsonHierarchy)); } @@ -25,7 +35,8 @@ vector<Hierarchy::Entry> HierarchyReader::ReadEntries(size_t readersCount, Parsi { LOG(LINFO, ("Reading entries...")); - readersCount = min(readersCount, size_t{8}); + readersCount = min(readersCount, size_t{thread::hardware_concurrency()}); + vector<multimap<base::GeoObjectId, Entry>> taskEntries(readersCount); vector<thread> tasks{}; for (size_t t = 0; t < readersCount; ++t) @@ -37,10 +48,10 @@ vector<Hierarchy::Entry> HierarchyReader::ReadEntries(size_t readersCount, Parsi if (stats.m_numLoaded % kLogBatch != 0) LOG(LINFO, ("Read", stats.m_numLoaded, "entries")); - return UnionEntries(taskEntries); + return MergeEntries(taskEntries); } -vector<Hierarchy::Entry> HierarchyReader::UnionEntries(vector<multimap<base::GeoObjectId, Entry>> & entryParts) +vector<Hierarchy::Entry> HierarchyReader::MergeEntries(vector<multimap<base::GeoObjectId, Entry>> & entryParts) { auto entries = vector<Entry>{}; @@ -50,20 +61,30 @@ vector<Hierarchy::Entry> HierarchyReader::UnionEntries(vector<multimap<base::Geo entries.reserve(size); - LOG(LINFO, ("Sorting entries...")); + LOG(LINFO, ("Merging entries...")); + + using PartReference = reference_wrapper<multimap<base::GeoObjectId, Entry>>; + struct ReferenceGreater + { + bool operator () (PartReference const & l, PartReference const & r) const noexcept + { return l.get() > r.get(); } + }; - while (entryParts.size()) + auto partsQueue = priority_queue<PartReference, std::vector<PartReference>, ReferenceGreater> + (entryParts.begin(), entryParts.end()); + while (partsQueue.size()) { - auto minPart = min_element(entryParts.begin(), entryParts.end()); + auto & minPart = partsQueue.top().get(); + partsQueue.pop(); - if (minPart->size()) + while (minPart.size() && (partsQueue.empty() || minPart <= partsQueue.top().get())) { - entries.emplace_back(move(minPart->begin()->second)); - minPart->erase(minPart->begin()); + entries.emplace_back(move(minPart.begin()->second)); + minPart.erase(minPart.begin()); } - if (minPart->empty()) - entryParts.erase(minPart); + if (minPart.size()) + partsQueue.push(ref(minPart)); } return entries; @@ -74,35 +95,59 @@ void HierarchyReader::ReadEntryMap(multimap<base::GeoObjectId, Entry> & entries, // Temporary local object for efficient concurent processing (individual cache line for container). auto localEntries = multimap<base::GeoObjectId, Entry>{}; - string line; + int const kLineBufferCapacity = 10000; + vector<string> linesBuffer(kLineBufferCapacity); + int bufferSize = 0; + while (true) { + bufferSize = 0; + { auto && lock = lock_guard<mutex>(m_mutex); - - if (!getline(m_fileStm, line)) - break; + + for (; bufferSize < kLineBufferCapacity; ++bufferSize) + { + if (!getline(m_fileStream, linesBuffer[bufferSize])) + break; + } } - + + if (!bufferSize) + break; + + DeserializeEntryMap(linesBuffer, bufferSize, localEntries, stats); + } + + entries = move(localEntries); +} + +void HierarchyReader::DeserializeEntryMap(vector<string> const & linesBuffer, int const bufferSize, + multimap<base::GeoObjectId, Entry> & entries, ParsingStats & stats) +{ + for (int i = 0; i < bufferSize; ++i) + { + auto & line = linesBuffer[i]; + if (line.empty()) continue; - auto const i = line.find(' '); + auto const p = line.find(' '); int64_t encodedId; - if (i == string::npos || !strings::to_any(line.substr(0, i), encodedId)) + if (p == string::npos || !strings::to_any(line.substr(0, p), encodedId)) { LOG(LWARNING, ("Cannot read osm id. Line:", line)); ++stats.m_badOsmIds; continue; } - line = line.substr(i + 1); + auto json = line.substr(p + 1); Entry entry; // todo(@m) We should really write uints as uints. auto const osmId = base::GeoObjectId(static_cast<uint64_t>(encodedId)); entry.m_osmId = osmId; - if (!entry.DeserializeFromJSON(line, stats)) + if (!entry.DeserializeFromJSON(json, stats)) continue; if (entry.m_type == Type::Count) @@ -112,9 +157,7 @@ void HierarchyReader::ReadEntryMap(multimap<base::GeoObjectId, Entry> & entries, if (stats.m_numLoaded % kLogBatch == 0) LOG(LINFO, ("Read", (stats.m_numLoaded / kLogBatch) * kLogBatch, "entries")); - localEntries.emplace(osmId, move(entry)); + entries.emplace(osmId, move(entry)); } - - entries = move(localEntries); } } // namespace geocoder diff --git a/geocoder/hierarchy_reader.hpp b/geocoder/hierarchy_reader.hpp index e28a318fe6..70b21badb4 100644 --- a/geocoder/hierarchy_reader.hpp +++ b/geocoder/hierarchy_reader.hpp @@ -5,7 +5,8 @@ #include "base/exception.hpp" #include "base/geo_object_id.hpp" -#include <fstream> +#include <boost/iostreams/filtering_stream.hpp> + #include <map> #include <mutex> #include <string> @@ -27,9 +28,12 @@ public: private: void ReadEntryMap(std::multimap<base::GeoObjectId, Entry> & entries, ParsingStats & stats); - std::vector<Entry> UnionEntries(std::vector<std::multimap<base::GeoObjectId, Entry>> & entryParts); - std::ifstream m_fileStm; + void DeserializeEntryMap(std::vector<std::string> const & linesBuffer, int const bufferSize, + std::multimap<base::GeoObjectId, Entry> & entries, ParsingStats & stats); + std::vector<Entry> MergeEntries(std::vector<std::multimap<base::GeoObjectId, Entry>> & entryParts); + + boost::iostreams::filtering_istream m_fileStream; std::mutex m_mutex; }; } // namespace geocoder |