KOSMIndoorMap

overpassquerymanager.cpp
1 /*
2  SPDX-FileCopyrightText: 2020 Volker Krause <[email protected]>
3 
4  SPDX-License-Identifier: LGPL-2.0-or-later
5 */
6 
7 #include "overpassquerymanager.h"
8 #include "overpassquery.h"
9 
10 #include <QDateTime>
11 #include <QNetworkAccessManager>
12 #include <QNetworkDiskCache>
13 #include <QNetworkReply>
14 #include <QStandardPaths>
15 #include <QTimer>
16 #include <QUrl>
17 #include <QUrlQuery>
18 
19 #include <chrono>
20 #include <deque>
21 
22 using namespace OSM;
23 
24 namespace OSM {
25 struct OverpassQueryTask {
26  OverpassQuery *query = nullptr;
27  QRectF bbox;
28  bool forceReload = false;
29 };
30 
31 struct OverpassQueryExecutor {
32  QUrl endpoint;
33  std::chrono::seconds cooldownTime = std::chrono::seconds(3);
34  QDateTime nextSlot;
35  std::unique_ptr<OverpassQueryTask> task;
36 };
37 
38 class OverpassQueryManagerPrivate {
39 public:
40  void executeTasks();
41  void taskFinished(OverpassQueryExecutor *executor, QNetworkReply *reply);
42  void checkQueryFinished(OverpassQuery *query) const;
43  void cancelQuery(OverpassQuery *query);
44 
46  QNetworkAccessManager *m_nam;
47  QTimer *m_nextTaskTimer;
48  std::vector<OverpassQueryExecutor> m_executors;
49  std::deque<std::unique_ptr<OverpassQueryTask>> m_tasks;
50 };
51 }
52 
53 static const char* executor_configs[] = {
54  "https://overpass-api.de/api/interpreter",
55  "https://overpass.openstreetmap.fr/api/interpreter",
56 // "https://1.overpass.kumi.systems/api/interpreter",
57 // "https://2.overpass.kumi.systems/api/interpreter",
58 // "https://3.overpass.kumi.systems/api/interpreter",
59 // "https://4.overpass.kumi.systems/api/interpreter",
60 };
61 
62 OverpassQueryManager::OverpassQueryManager(QObject *parent)
63  : QObject(parent)
64  , d(new OverpassQueryManagerPrivate)
65 {
66  d->q = this;
67  d->m_nam = new QNetworkAccessManager(this);
68  d->m_nam->setRedirectPolicy(QNetworkRequest::NoLessSafeRedirectPolicy);
69  d->m_nam->setStrictTransportSecurityEnabled(true);
70  d->m_nam->enableStrictTransportSecurityStore(true, QStandardPaths::writableLocation(QStandardPaths::GenericCacheLocation) + QLatin1String("/org.kde.osm/hsts/"));
71 
72  auto diskCache = new QNetworkDiskCache;
74  diskCache->setMaximumCacheSize(1'000'000'000); // 1GB
75  d->m_nam->setCache(diskCache);
76 
77  d->m_nextTaskTimer = new QTimer(this);
78  d->m_nextTaskTimer->setSingleShot(true);
79  connect(d->m_nextTaskTimer, &QTimer::timeout, this, [this]() { d->executeTasks(); });
80 
81  for (const auto &config : executor_configs) {
82  OverpassQueryExecutor executor;
83  executor.endpoint = QUrl(QString::fromUtf8(config));
84  d->m_executors.push_back(std::move(executor));
85  }
86 }
87 
88 OverpassQueryManager::~OverpassQueryManager() = default;
89 
90 void OverpassQueryManager::execute(OverpassQuery *query)
91 {
92  // validate input
93  if (query->query().isEmpty() || query->boundingBox().isNull() || !query->boundingBox().isValid() || query->tileSize().isNull() || !query->tileSize().isValid()) {
94  query->m_error = OverpassQuery::QueryError;
95  QTimer::singleShot(0, query, &OverpassQuery::finished);
96  return;
97  }
98 
99  // generate tasks for the query
100  const auto xTileCount = std::max<int>(1, query->boundingBox().width() / query->tileSize().width());
101  const auto yTileCount = std::max<int>(1, query->boundingBox().height() / query->tileSize().height());
102  const auto xTileSize = query->boundingBox().width() / xTileCount;
103  const auto yTileSize = query->boundingBox().height() / yTileCount;
104  qDebug() << "Creating" << xTileCount * yTileCount << "tasks with tile size" << xTileSize << "x" << yTileSize;
105  for (auto x = 0; x < xTileCount; ++x) {
106  for (auto y = 0; y < yTileCount; ++y) {
107  auto task = std::make_unique<OverpassQueryTask>();
108  task->query = query;
109  task->bbox = { query->boundingBox().x() + x * xTileSize, query->boundingBox().y() + y * yTileSize, xTileSize, yTileSize };
110  d->m_tasks.push_back(std::move(task));
111  }
112  }
113 
114  d->executeTasks();
115 }
116 
117 void OverpassQueryManagerPrivate::executeTasks()
118 {
119  const auto now = QDateTime::currentDateTimeUtc();
120  std::chrono::seconds nextSlot = std::chrono::hours(1);
121 
122  for (auto &executor : m_executors) {
123  if (m_tasks.empty()) { // nothing to do
124  return;
125  }
126 
127  if (executor.task) { // executor is busy already
128  continue;
129  }
130 
131  if (executor.nextSlot > now) { // executor is still in rate limit cooldown
132  nextSlot = std::min(std::chrono::seconds(now.secsTo(executor.nextSlot)), nextSlot);
133  nextSlot += std::chrono::seconds(1); // for msec rounding errors that would other wise give us a busy loop
134  if (m_tasks.front()->forceReload) {
135  continue;
136  }
137  }
138 
139  executor.task = std::move(m_tasks.front());
140  m_tasks.pop_front();
141 
142  // actually execute query
143  auto url = executor.endpoint;
144  QUrlQuery params;
145  params.addQueryItem(QStringLiteral("data"), executor.task->query->query(executor.task->bbox));
146  url.setQuery(params);
147  QNetworkRequest req(url);
148  req.setAttribute(QNetworkRequest::CacheLoadControlAttribute, executor.task->forceReload ? QNetworkRequest::PreferNetwork : QNetworkRequest::AlwaysCache);
149  auto reply = m_nam->get(req);
150  // TODO enable stream parsing for XML replies by connecting to QNetworkReply::readyRead
151  QObject::connect(reply, &QNetworkReply::finished, q, [this, &executor, reply]() {
152  taskFinished(&executor, reply);
153  reply->deleteLater();
154  });
155  }
156 
157  m_nextTaskTimer->start(nextSlot);
158 }
159 
160 void OverpassQueryManagerPrivate::taskFinished(OverpassQueryExecutor *executor, QNetworkReply *reply)
161 {
162  auto query = executor->task->query;
163  if (reply->error() == QNetworkReply::UnknownContentError && reply->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt() == 429) {
164  // rate limiting error
165  executor->cooldownTime *= 2;
166  qDebug() << "rate limit error, increasing cooldown time to" << executor->cooldownTime.count() << "seconds";
167  m_tasks.push_back(std::move(executor->task));
168  } else if (reply->error() == QNetworkReply::ContentNotFoundError && !executor->task->forceReload) {
169  // cache miss, retry from network
170  executor->task->forceReload = true;
171  m_tasks.push_back(std::move(executor->task));
172  } else if (reply->error() != QNetworkReply::NoError) {
173  // TODO disable affected executors here and reschedule the failed task, rather than cancelling entirely
174  qDebug() << reply->error() << reply->errorString() << reply->attribute(QNetworkRequest::HttpStatusCodeAttribute) << reply->readAll();
175  query->m_error = OverpassQuery::NetworkError;
176  cancelQuery(query);
177  } else {
178  const auto queryError = query->processReply(reply);
179  // on query timeout, break up the task in 4 sub-tasks, if we are allowed to
180  if (queryError == OverpassQuery::QueryTimeout
181  && executor->task->bbox.width() > query->minimumTileSize().width()
182  && executor->task->bbox.height() > query->minimumTileSize().height())
183  {
184  qDebug() << "Splitting task due to query timeout:" << executor->task->bbox;
185  const auto xTileSize = executor->task->bbox.width() / 2.0;
186  const auto yTileSize = executor->task->bbox.height() / 2.0;
187  for (auto x = 0; x < 2; ++x) {
188  for (auto y = 0; y < 2; ++y) {
189  auto task = std::make_unique<OverpassQueryTask>();
190  task->query = query;
191  task->bbox = { executor->task->bbox.x() + x * xTileSize, executor->task->bbox.y() + y * yTileSize, xTileSize, yTileSize };
192  m_tasks.push_back(std::move(task));
193  }
194  }
195  }
196  else if (queryError != OverpassQuery::NoError) {
197  if (executor->task->forceReload) {
198  query->m_error = queryError;
199  cancelQuery(query);
200  } else {
201  // query error in cached result, retry
202  executor->task->forceReload = true;
203  m_tasks.push_back(std::move(executor->task));
204  }
205  }
206  }
207 
208  // free the executor for the next query
209  executor->task.reset();
210  executor->nextSlot = QDateTime::currentDateTimeUtc().addSecs(executor->cooldownTime.count());
211 
212  checkQueryFinished(query);
213  executeTasks();
214 }
215 
216 void OverpassQueryManagerPrivate::checkQueryFinished(OverpassQuery *query) const
217 {
218  if (std::any_of(m_executors.begin(), m_executors.end(), [query](const auto &executor) { return executor.task && executor.task->query == query; })
219  || std::any_of(m_tasks.begin(), m_tasks.end(), [query](const auto &task) { return task->query == query; }))
220  return;
221  Q_EMIT query->finished();
222 }
223 
224 void OverpassQueryManagerPrivate::cancelQuery(OverpassQuery *query)
225 {
226  qDebug() << "cancelling query...";
227  m_tasks.erase(std::remove_if(m_tasks.begin(), m_tasks.end(), [query](const auto &task) { return task->query == query; }), m_tasks.end());
228  checkQueryFinished(query);
229 }
QString writableLocation(QStandardPaths::StandardLocation type)
std::optional< QSqlQuery > query(const QString &queryStatement)
void setCacheDirectory(const QString &cacheDir)
Executes OverpassQuery jobs.
An Overpass QL query job, to be executed by OverpassQueryManager.
Definition: overpassquery.h:25
Low-level types and functions to work with raw OSM data as efficiently as possible.
This file is part of the KDE documentation.
Documentation copyright © 1996-2021 The KDE developers.
Generated on Sat Oct 23 2021 23:03:45 by doxygen 1.8.11 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.