Akonadi

partstreamer.cpp
1 /*
2  * SPDX-FileCopyrightText: 2014 Daniel Vrátil <[email protected]>
3  *
4  * SPDX-License-Identifier: LGPL-2.1-or-later
5  *
6  */
7 
8 #include "partstreamer.h"
9 #include "akonadiserver_debug.h"
10 #include "capabilities_p.h"
11 #include "connection.h"
12 #include "dbconfig.h"
13 #include "parthelper.h"
14 #include "parttypehelper.h"
15 #include "selectquerybuilder.h"
16 
17 #include <private/externalpartstorage_p.h>
18 #include <private/protocol_p.h>
19 #include <private/standarddirs_p.h>
20 
21 #include <config-akonadi.h>
22 #if HAVE_UNISTD_H
23 #include <unistd.h>
24 #endif
25 
26 #include <QFile>
27 #include <QFileInfo>
28 
29 using namespace Akonadi;
30 using namespace Akonadi::Server;
31 
32 PartStreamer::PartStreamer(Connection *connection, const PimItem &pimItem)
33  : mConnection(connection)
34  , mItem(pimItem)
35 {
36  // Make sure the file_db_data path exists
37  StandardDirs::saveDir("data", QStringLiteral("file_db_data"));
38 }
39 
40 PartStreamer::~PartStreamer()
41 {
42 }
43 
44 Protocol::PartMetaData PartStreamer::requestPartMetaData(const QByteArray &partName)
45 {
46  {
47  Protocol::StreamPayloadCommand resp;
48  resp.setPayloadName(partName);
49  resp.setRequest(Protocol::StreamPayloadCommand::MetaData);
50  mConnection->sendResponse(std::move(resp));
51  }
52 
53  const auto cmd = mConnection->readCommand();
54  if (!cmd->isValid() || Protocol::cmdCast<Protocol::Response>(cmd).isError()) {
55  throw PartStreamerException("Client failed to provide part metadata.");
56  }
57 
58  return Protocol::cmdCast<Protocol::StreamPayloadResponse>(cmd).metaData();
59 }
60 
61 void PartStreamer::streamPayload(Part &part, const QByteArray &partName)
62 {
63  Protocol::PartMetaData metaPart = requestPartMetaData(partName);
64  if (metaPart.name().isEmpty()) {
65  throw PartStreamerException(QStringLiteral("Client sent empty metadata for part '%1'.").arg(QString::fromUtf8(partName)));
66  }
67  part.setVersion(metaPart.version());
68 
69  if (part.datasize() != metaPart.size()) {
70  part.setDatasize(metaPart.size());
71  // Shortcut: if sizes differ, we don't need to compare data later no in order
72  // to detect whether the part has changed
73  mDataChanged = mDataChanged || (metaPart.size() != part.datasize());
74  }
75 
76  if (metaPart.storageType() == Protocol::PartMetaData::Foreign) {
77  streamForeignPayload(part, metaPart);
78  } else if (part.datasize() > DbConfig::configuredDatabase()->sizeThreshold()) {
79  // actual case when streaming storage is used: external payload is enabled,
80  // data is big enough in a literal
81  streamPayloadToFile(part, metaPart);
82  } else {
83  streamPayloadData(part, metaPart);
84  }
85 }
86 
87 void PartStreamer::streamPayloadData(Part &part, const Protocol::PartMetaData &metaPart)
88 {
89  // If the part WAS external previously, remove data file
90  if (part.storage() == Part::External) {
91  ExternalPartStorage::self()->removePartFile(ExternalPartStorage::resolveAbsolutePath(part.data()));
92  }
93 
94  // Request the actual data
95  {
96  Protocol::StreamPayloadCommand resp;
97  resp.setPayloadName(metaPart.name());
98  resp.setRequest(Protocol::StreamPayloadCommand::Data);
99  mConnection->sendResponse(std::move(resp));
100  }
101 
102  const auto cmd = mConnection->readCommand();
103  const auto &response = Protocol::cmdCast<Protocol::StreamPayloadResponse>(cmd);
104  if (!response.isValid() || response.isError()) {
105  throw PartStreamerException(QStringLiteral("Client failed to provide payload data for part ID %1 (%2).").arg(part.id()).arg(part.partType().name()));
106  }
107  const QByteArray newData = response.data();
108  // only use the data size with internal payload parts, for foreign parts
109  // we use the size reported by client
110  const auto newSize = (metaPart.storageType() == Protocol::PartMetaData::Internal) ? newData.size() : metaPart.size();
111  if (newSize != metaPart.size()) {
112  throw PartStreamerException(QStringLiteral("Payload size mismatch: client advertised %1 bytes but sent %2 bytes.").arg(metaPart.size()).arg(newSize));
113  }
114 
115  if (part.isValid()) {
116  if (!mDataChanged) {
117  mDataChanged = mDataChanged || (newData != part.data());
118  }
119  PartHelper::update(&part, newData, newSize);
120  } else {
121  part.setData(newData);
122  part.setDatasize(newSize);
123  if (!part.insert()) {
124  throw PartStreamerException("Failed to insert new part into database.");
125  }
126  }
127 }
128 
129 void PartStreamer::streamPayloadToFile(Part &part, const Protocol::PartMetaData &metaPart)
130 {
131  QByteArray origData;
132  if (!mDataChanged && mCheckChanged) {
133  origData = PartHelper::translateData(part);
134  }
135 
136  QByteArray filename;
137  if (part.isValid()) {
138  if (part.storage() == Part::External) {
139  // Part was external and is still external
140  filename = part.data();
141  if (!filename.isEmpty()) {
142  ExternalPartStorage::self()->removePartFile(ExternalPartStorage::resolveAbsolutePath(filename));
143  filename = ExternalPartStorage::updateFileNameRevision(filename);
144  } else {
145  // recover from data corruption
146  filename = ExternalPartStorage::nameForPartId(part.id());
147  }
148  } else {
149  // Part wasn't external, but is now
150  filename = ExternalPartStorage::nameForPartId(part.id());
151  }
152 
153  QFileInfo finfo(QString::fromUtf8(filename));
154  if (finfo.isAbsolute()) {
155  filename = finfo.fileName().toUtf8();
156  }
157  }
158 
159  part.setStorage(Part::External);
160  part.setDatasize(metaPart.size());
161  part.setData(filename);
162 
163  if (part.isValid()) {
164  if (!part.update()) {
165  throw PartStreamerException(QStringLiteral("Failed to update part %1 in database.").arg(part.id()));
166  }
167  } else {
168  if (!part.insert()) {
169  throw PartStreamerException(QStringLiteral("Failed to insert new part fo PimItem %1 into database.").arg(part.pimItemId()));
170  }
171 
172  filename = ExternalPartStorage::nameForPartId(part.id());
173  part.setData(filename);
174  if (!part.update()) {
175  throw PartStreamerException(QStringLiteral("Failed to update part %1 in database.").arg(part.id()));
176  }
177  }
178 
179  {
180  Protocol::StreamPayloadCommand cmd;
181  cmd.setPayloadName(metaPart.name());
182  cmd.setRequest(Protocol::StreamPayloadCommand::Data);
183  cmd.setDestination(QString::fromUtf8(filename));
184  mConnection->sendResponse(std::move(cmd));
185  }
186 
187  const auto cmd = mConnection->readCommand();
188  const auto &response = Protocol::cmdCast<Protocol::Response>(cmd);
189  if (!response.isValid() || response.isError()) {
190  throw PartStreamerException("Client failed to store payload into file.");
191  }
192 
193  QFile file(ExternalPartStorage::resolveAbsolutePath(filename));
194  if (!file.exists()) {
195  throw PartStreamerException(QStringLiteral("External payload file %1 does not exist.").arg(file.fileName()));
196  }
197 
198  if (file.size() != metaPart.size()) {
199  throw PartStreamerException(
200  QStringLiteral("Payload size mismatch, client advertised %1 bytes, but the file is %2 bytes.").arg(metaPart.size(), file.size()));
201  }
202 
203  if (mCheckChanged && !mDataChanged) {
204  // This is invoked only when part already exists, data sizes match and
205  // caller wants to know whether parts really differ
206  mDataChanged = (origData != PartHelper::translateData(part));
207  }
208 }
209 
210 void PartStreamer::streamForeignPayload(Part &part, const Protocol::PartMetaData &metaPart)
211 {
212  QByteArray origData;
213  if (!mDataChanged && mCheckChanged) {
214  origData = PartHelper::translateData(part);
215  }
216 
217  {
218  Protocol::StreamPayloadCommand cmd;
219  cmd.setPayloadName(metaPart.name());
220  cmd.setRequest(Protocol::StreamPayloadCommand::Data);
221  mConnection->sendResponse(std::move(cmd));
222  }
223 
224  const auto cmd = mConnection->readCommand();
225  const auto &response = Protocol::cmdCast<Protocol::StreamPayloadResponse>(cmd);
226  if (!response.isValid() || response.isError()) {
227  throw PartStreamerException("Client failed to store payload into file.");
228  }
229 
230  // If the part was previously external, clean up the data
231  if (part.storage() == Part::External) {
232  const QString filename = QString::fromUtf8(part.data());
233  ExternalPartStorage::self()->removePartFile(ExternalPartStorage::resolveAbsolutePath(filename));
234  }
235 
236  part.setStorage(Part::Foreign);
237  part.setData(response.data());
238 
239  if (part.isValid()) {
240  if (!part.update()) {
241  throw PartStreamerException(QStringLiteral("Failed to update part %1 in database.").arg(part.id()));
242  }
243  } else {
244  if (!part.insert()) {
245  throw PartStreamerException(QStringLiteral("Failed to insert part for PimItem %1 into database.").arg(part.pimItemId()));
246  }
247  }
248 
249  const QString filename = QString::fromUtf8(response.data());
250  QFile file(filename);
251  if (!file.exists()) {
252  throw PartStreamerException(QStringLiteral("Foreign payload file %1 does not exist.").arg(filename));
253  }
254 
255  if (file.size() != metaPart.size()) {
256  throw PartStreamerException(
257  QStringLiteral("Foreign payload size mismatch, client advertised %1 bytes, but the file size is %2 bytes.").arg(metaPart.size(), file.size()));
258  }
259 
260  if (mCheckChanged && !mDataChanged) {
261  // This is invoked only when part already exists, data sizes match and
262  // caller wants to know whether parts really differ
263  mDataChanged = (origData != PartHelper::translateData(part));
264  }
265 }
266 
267 void PartStreamer::preparePart(bool checkExists, const QByteArray &partName, Part &part)
268 {
269  mDataChanged = false;
270 
271  const PartType partType = PartTypeHelper::fromFqName(partName);
272 
273  if (checkExists || mCheckChanged) {
275  qb.addValueCondition(Part::pimItemIdColumn(), Query::Equals, mItem.id());
276  qb.addValueCondition(Part::partTypeIdColumn(), Query::Equals, partType.id());
277  if (!qb.exec()) {
278  throw PartStreamerException(QStringLiteral("Failed to check if part %1 exists in PimItem %2.").arg(QString::fromUtf8(partName)).arg(mItem.id()));
279  }
280 
281  const Part::List result = qb.result();
282  if (!result.isEmpty()) {
283  part = result.at(0);
284  }
285  }
286 
287  // Shortcut: newly created parts are always "changed"
288  if (!part.isValid()) {
289  mDataChanged = true;
290  }
291 
292  part.setPartType(partType);
293  part.setPimItemId(mItem.id());
294 }
295 
296 void PartStreamer::stream(bool checkExists, const QByteArray &partName, qint64 &partSize, bool *changed)
297 {
298  mCheckChanged = (changed != nullptr);
299  if (changed != nullptr) {
300  *changed = false;
301  }
302 
303  Part part;
304  preparePart(checkExists, partName, part);
305 
306  streamPayload(part, partName);
307  if (changed && mCheckChanged) {
308  *changed = mDataChanged;
309  }
310 
311  partSize = part.datasize();
312 }
313 
314 void PartStreamer::streamAttribute(bool checkExists, const QByteArray &_partName, const QByteArray &value, bool *changed)
315 {
316  mCheckChanged = (changed != nullptr);
317  if (changed != nullptr) {
318  *changed = false;
319  }
320 
321  QByteArray partName;
322  if (!_partName.startsWith("ATR:")) {
323  partName = "ATR:" + _partName;
324  } else {
325  partName = _partName;
326  }
327 
328  Part part;
329  preparePart(checkExists, partName, part);
330 
331  if (part.isValid()) {
332  if (mCheckChanged) {
333  if (PartHelper::translateData(part) != value) {
334  mDataChanged = true;
335  }
336  }
337  PartHelper::update(&part, value, value.size());
338  } else {
339  const bool storeInFile = value.size() > DbConfig::configuredDatabase()->sizeThreshold();
340  part.setDatasize(value.size());
341  part.setVersion(0);
342  if (storeInFile) {
343  if (!part.insert()) {
344  throw PartStreamerException(QStringLiteral("Failed to store attribute part for PimItem %1 in database.").arg(part.pimItemId()));
345  }
346  PartHelper::update(&part, value, value.size());
347  } else {
348  part.setData(value);
349  if (!part.insert()) {
350  throw PartStreamerException(QStringLiteral("Failed to store attribute part for PimItem %1 in database.").arg(part.pimItemId()));
351  }
352  }
353  }
354 
355  if (mCheckChanged) {
356  *changed = mDataChanged;
357  }
358 }
QString fromUtf8(const char *str, int size)
PartType fromFqName(const QString &fqName)
Retrieve (or create) PartType for the given fully qualified name.
QByteArray translateData(const QByteArray &data, Part::Storage storageType)
Returns the payload data.
Definition: parthelper.cpp:122
static DbConfig * configuredDatabase()
Returns the DbConfig instance for the database the user has configured.
Definition: dbconfig.cpp:73
QVector< T > result()
Returns the result of this SELECT query.
bool startsWith(const QByteArray &ba) const const
void addValueCondition(const QString &column, Query::CompareOperator op, const QVariant &value, ConditionType type=WhereCondition)
Add a WHERE or HAVING condition which compares a column with a given value.
virtual qint64 sizeThreshold() const
Payload data bigger than this value will be stored in separate files, instead of the database.
Definition: dbconfig.cpp:128
bool exec()
Executes the query, returns true on success.
bool isEmpty() const const
Helper class for creating and executing database SELECT queries.
void update(Part *part, const QByteArray &data, qint64 dataSize)
Update payload of an existing part part to data and size dataSize.
Definition: parthelper.cpp:22
int size() const const
A Connection represents one connection of a client to the server.
Definition: connection.h:46
char * data()
Helper integration between Akonadi and Qt.
This file is part of the KDE documentation.
Documentation copyright © 1996-2022 The KDE developers.
Generated on Sat Jun 25 2022 06:00:32 by doxygen 1.8.17 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.