Akonadi

partstreamer.cpp
1 /*
2  * Copyright (C) 2014 Daniel Vrátil <[email protected]>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this library; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17  *
18  */
19 
20 #include "partstreamer.h"
21 #include "parthelper.h"
22 #include "parttypehelper.h"
23 #include "selectquerybuilder.h"
24 #include "dbconfig.h"
25 #include "connection.h"
26 #include "capabilities_p.h"
27 #include "akonadiserver_debug.h"
28 
29 #include <private/protocol_p.h>
30 #include <private/standarddirs_p.h>
31 #include <private/externalpartstorage_p.h>
32 
33 #include <config-akonadi.h>
34 #ifdef HAVE_UNISTD_H
35 # include <unistd.h>
36 #endif
37 
38 #include <QFile>
39 #include <QFileInfo>
40 
41 using namespace Akonadi;
42 using namespace Akonadi::Server;
43 
44 PartStreamer::PartStreamer(Connection *connection,
45  const PimItem &pimItem)
46  : mConnection(connection)
47  , mItem(pimItem)
48 {
49  // Make sure the file_db_data path exists
50  StandardDirs::saveDir("data", QStringLiteral("file_db_data"));
51 }
52 
53 PartStreamer::~PartStreamer()
54 {
55 }
56 
57 
58 Protocol::PartMetaData PartStreamer::requestPartMetaData(const QByteArray &partName)
59 {
60  {
61  Protocol::StreamPayloadCommand resp;
62  resp.setPayloadName(partName);
63  resp.setRequest(Protocol::StreamPayloadCommand::MetaData);
64  mConnection->sendResponse(std::move(resp));
65  }
66 
67  const auto cmd = mConnection->readCommand();
68  if (!cmd->isValid() || Protocol::cmdCast<Protocol::Response>(cmd).isError()) {
69  throw PartStreamerException("Client failed to provide part metadata.");
70  }
71 
72  return Protocol::cmdCast<Protocol::StreamPayloadResponse>(cmd).metaData();
73 }
74 
75 void PartStreamer::streamPayload(Part &part, const QByteArray &partName)
76 {
77  Protocol::PartMetaData metaPart = requestPartMetaData(partName);
78  if (metaPart.name().isEmpty()) {
79  throw PartStreamerException(QStringLiteral("Client sent empty metadata for part '%1'.")
80  .arg(QString::fromUtf8(partName)));
81  }
82  part.setVersion(metaPart.version());
83 
84  if (part.datasize() != metaPart.size()) {
85  part.setDatasize(metaPart.size());
86  // Shortcut: if sizes differ, we don't need to compare data later no in order
87  // to detect whether the part has changed
88  mDataChanged = mDataChanged || (metaPart.size() != part.datasize());
89  }
90 
91  if (metaPart.storageType() == Protocol::PartMetaData::Foreign) {
92  streamForeignPayload(part, metaPart);
93  } else if (part.datasize() > DbConfig::configuredDatabase()->sizeThreshold()) {
94  //actual case when streaming storage is used: external payload is enabled,
95  // data is big enough in a literal
96  streamPayloadToFile(part, metaPart);
97  } else {
98  streamPayloadData(part, metaPart);
99  }
100 }
101 
102 void PartStreamer::streamPayloadData(Part &part, const Protocol::PartMetaData &metaPart)
103 {
104  // If the part WAS external previously, remove data file
105  if (part.storage() == Part::External) {
106  ExternalPartStorage::self()->removePartFile(
107  ExternalPartStorage::resolveAbsolutePath(part.data()));
108  }
109 
110  // Request the actual data
111  {
112  Protocol::StreamPayloadCommand resp;
113  resp.setPayloadName(metaPart.name());
114  resp.setRequest(Protocol::StreamPayloadCommand::Data);
115  mConnection->sendResponse(std::move(resp));
116  }
117 
118  const auto cmd = mConnection->readCommand();
119  const auto &response = Protocol::cmdCast<Protocol::StreamPayloadResponse>(cmd);
120  if (!response.isValid() || response.isError()) {
121  throw PartStreamerException(QStringLiteral("Client failed to provide payload data for part ID %1 (%2).")
122  .arg(part.id()).arg(part.partType().name()));
123  }
124  const QByteArray newData = response.data();
125  // only use the data size with internal payload parts, for foreign parts
126  // we use the size reported by client
127  const auto newSize = (metaPart.storageType() == Protocol::PartMetaData::Internal)
128  ? newData.size()
129  : metaPart.size();
130  if (newSize != metaPart.size()) {
131  throw PartStreamerException(QStringLiteral("Payload size mismatch: client advertised %1 bytes but sent %2 bytes.")
132  .arg(metaPart.size()).arg(newSize));
133  }
134 
135  if (part.isValid()) {
136  if (!mDataChanged) {
137  mDataChanged = mDataChanged || (newData != part.data());
138  }
139  PartHelper::update(&part, newData, newSize);
140  } else {
141  part.setData(newData);
142  part.setDatasize(newSize);
143  if (!part.insert()) {
144  throw PartStreamerException("Failed to insert new part into database.");
145  }
146  }
147 }
148 
149 void PartStreamer::streamPayloadToFile(Part &part, const Protocol::PartMetaData &metaPart)
150 {
151  QByteArray origData;
152  if (!mDataChanged && mCheckChanged) {
153  origData = PartHelper::translateData(part);
154  }
155 
156  QByteArray filename;
157  if (part.isValid()) {
158  if (part.storage() == Part::External) {
159  // Part was external and is still external
160  filename = part.data();
161  if (!filename.isEmpty()) {
162  ExternalPartStorage::self()->removePartFile(
163  ExternalPartStorage::resolveAbsolutePath(filename));
164  filename = ExternalPartStorage::updateFileNameRevision(filename);
165  } else {
166  // recover from data corruption
167  filename = ExternalPartStorage::nameForPartId(part.id());
168  }
169  } else {
170  // Part wasn't external, but is now
171  filename = ExternalPartStorage::nameForPartId(part.id());
172  }
173 
174  QFileInfo finfo(QString::fromUtf8(filename));
175  if (finfo.isAbsolute()) {
176  filename = finfo.fileName().toUtf8();
177  }
178  }
179 
180  part.setStorage(Part::External);
181  part.setDatasize(metaPart.size());
182  part.setData(filename);
183 
184  if (part.isValid()) {
185  if (!part.update()) {
186  throw PartStreamerException(QStringLiteral("Failed to update part %1 in database.").arg(part.id()));
187  }
188  } else {
189  if (!part.insert()) {
190  throw PartStreamerException(QStringLiteral("Failed to insert new part fo PimItem %1 into database.")
191  .arg(part.pimItemId()));
192  }
193 
194  filename = ExternalPartStorage::nameForPartId(part.id());
195  part.setData(filename);
196  if (!part.update()) {
197  throw PartStreamerException(QStringLiteral("Failed to update part %1 in database.").arg(part.id()));
198  }
199  }
200 
201  {
202  Protocol::StreamPayloadCommand cmd;
203  cmd.setPayloadName(metaPart.name());
204  cmd.setRequest(Protocol::StreamPayloadCommand::Data);
205  cmd.setDestination(QString::fromUtf8(filename));
206  mConnection->sendResponse(std::move(cmd));
207  }
208 
209  const auto cmd = mConnection->readCommand();
210  const auto &response = Protocol::cmdCast<Protocol::Response>(cmd);
211  if (!response.isValid() || response.isError()) {
212  throw PartStreamerException("Client failed to store payload into file.");
213  }
214 
215  QFile file(ExternalPartStorage::resolveAbsolutePath(filename));
216  if (!file.exists()) {
217  throw PartStreamerException(QStringLiteral("External payload file %1 does not exist.").arg(file.fileName()));
218  }
219 
220  if (file.size() != metaPart.size()) {
221  throw PartStreamerException(QStringLiteral("Payload size mismatch, client advertised %1 bytes, but the file is %2 bytes.")
222  .arg(metaPart.size(), file.size()));
223  }
224 
225  if (mCheckChanged && !mDataChanged) {
226  // This is invoked only when part already exists, data sizes match and
227  // caller wants to know whether parts really differ
228  mDataChanged = (origData != PartHelper::translateData(part));
229  }
230 }
231 
232 void PartStreamer::streamForeignPayload(Part &part, const Protocol::PartMetaData &metaPart)
233 {
234  QByteArray origData;
235  if (!mDataChanged && mCheckChanged) {
236  origData = PartHelper::translateData(part);
237  }
238 
239  {
240  Protocol::StreamPayloadCommand cmd;
241  cmd.setPayloadName(metaPart.name());
242  cmd.setRequest(Protocol::StreamPayloadCommand::Data);
243  mConnection->sendResponse(std::move(cmd));
244  }
245 
246  const auto cmd = mConnection->readCommand();
247  const auto response = Protocol::cmdCast<Protocol::StreamPayloadResponse>(cmd);
248  if (!response.isValid() || response.isError()) {
249  throw PartStreamerException("Client failed to store payload into file.");
250  }
251 
252  // If the part was previously external, clean up the data
253  if (part.storage() == Part::External) {
254  const QString filename = QString::fromUtf8(part.data());
255  ExternalPartStorage::self()->removePartFile(
256  ExternalPartStorage::resolveAbsolutePath(filename));
257  }
258 
259  part.setStorage(Part::Foreign);
260  part.setData(response.data());
261 
262  if (part.isValid()) {
263  if (!part.update()) {
264  throw PartStreamerException(QStringLiteral("Failed to update part %1 in database.").arg(part.id()));
265  }
266  } else {
267  if (!part.insert()) {
268  throw PartStreamerException(QStringLiteral("Failed to insert part for PimItem %1 into database.")
269  .arg(part.pimItemId()));
270  }
271  }
272 
273  const QString filename = QString::fromUtf8(response.data());
274  QFile file(filename);
275  if (!file.exists()) {
276  throw PartStreamerException(QStringLiteral("Foreign payload file %1 does not exist.").arg(filename));
277  }
278 
279  if (file.size() != metaPart.size()) {
280  throw PartStreamerException(QStringLiteral("Foreign payload size mismatch, client advertised %1 bytes, but the file size is %2 bytes.")
281  .arg(metaPart.size(), file.size()));
282  }
283 
284  if (mCheckChanged && !mDataChanged) {
285  // This is invoked only when part already exists, data sizes match and
286  // caller wants to know whether parts really differ
287  mDataChanged = (origData != PartHelper::translateData(part));
288  }
289 }
290 
291 void PartStreamer::preparePart(bool checkExists, const QByteArray &partName, Part &part)
292 {
293  mDataChanged = false;
294 
295  const PartType partType = PartTypeHelper::fromFqName(partName);
296 
297  if (checkExists || mCheckChanged) {
299  qb.addValueCondition(Part::pimItemIdColumn(), Query::Equals, mItem.id());
300  qb.addValueCondition(Part::partTypeIdColumn(), Query::Equals, partType.id());
301  if (!qb.exec()) {
302  throw PartStreamerException(QStringLiteral("Failed to check if part %1 exists in PimItem %2.")
303  .arg(QString::fromUtf8(partName)).arg(mItem.id()));
304  }
305 
306  const Part::List result = qb.result();
307  if (!result.isEmpty()) {
308  part = result.at(0);
309  }
310  }
311 
312  // Shortcut: newly created parts are always "changed"
313  if (!part.isValid()) {
314  mDataChanged = true;
315  }
316 
317  part.setPartType(partType);
318  part.setPimItemId(mItem.id());
319 }
320 
321 void PartStreamer::stream(bool checkExists, const QByteArray &partName, qint64 &partSize, bool *changed)
322 {
323  mCheckChanged = (changed != nullptr);
324  if (changed != nullptr) {
325  *changed = false;
326  }
327 
328  Part part;
329  preparePart(checkExists, partName, part);
330 
331  streamPayload(part, partName);
332  if (changed && mCheckChanged) {
333  *changed = mDataChanged;
334  }
335 
336  partSize = part.datasize();
337 }
338 
339 void PartStreamer::streamAttribute(bool checkExists, const QByteArray &_partName, const QByteArray &value, bool *changed)
340 {
341  mCheckChanged = (changed != nullptr);
342  if (changed != nullptr) {
343  *changed = false;
344  }
345 
346  QByteArray partName;
347  if (!_partName.startsWith("ATR:")) {
348  partName = "ATR:" + _partName;
349  } else {
350  partName = _partName;
351  }
352 
353  Part part;
354  preparePart(checkExists, partName, part);
355 
356  if (part.isValid()) {
357  if (mCheckChanged) {
358  if (PartHelper::translateData(part) != value) {
359  mDataChanged = true;
360  }
361  }
362  PartHelper::update(&part, value, value.size());
363  } else {
364  const bool storeInFile = value.size() > DbConfig::configuredDatabase()->sizeThreshold();
365  part.setDatasize(value.size());
366  part.setVersion(0);
367  if (storeInFile) {
368  if (!part.insert()) {
369  throw PartStreamerException(QStringLiteral("Failed to store attribute part for PimItem %1 in database.")
370  .arg(part.pimItemId()));
371  }
372  PartHelper::update(&part, value, value.size());
373  } else {
374  part.setData(value);
375  if (!part.insert()) {
376  throw PartStreamerException(QStringLiteral("Failed to store attribute part for PimItem %1 in database.")
377  .arg(part.pimItemId()));
378  }
379  }
380  }
381 
382  if (mCheckChanged) {
383  *changed = mDataChanged;
384  }
385 }
386 
void addValueCondition(const QString &column, Query::CompareOperator op, const QVariant &value, ConditionType type=WhereCondition)
Add a WHERE or HAVING condition which compares a column with a given value.
Representation of a record in the PartType table.
Definition: entities.h:1780
static DbConfig * configuredDatabase()
Returns the DbConfig instance for the database the user has configured.
Definition: dbconfig.cpp:65
bool insert(qint64 *insertId=nullptr)
Inserts this record into the DataStore.
Definition: entities.cpp:6877
virtual qint64 sizeThreshold() const
Payload data bigger than this value will be stored in separate files, instead of the database...
Definition: dbconfig.cpp:119
void setPimItemId(qint64 pimItemId)
Sets the value of the pimItemId column of this record.
Definition: entities.cpp:6402
bool isEmpty() const const
bool startsWith(const QByteArray &ba) const const
bool update()
Stores all changes made to this record into the database.
Definition: entities.cpp:6951
PartType fromFqName(const QString &fqName)
Retrieve (or create) PartType for the given fully qualified name.
Representation of a record in the Part table.
Definition: entities.h:1970
void setData(const QByteArray &data)
Sets the value of the data column of this record.
Definition: entities.cpp:6428
QByteArray translateData(const QByteArray &data, Part::Storage storageType)
Returns the payload data.
Definition: parthelper.cpp:136
qint64 datasize() const
Returns the value of the datasize column of this record.
Definition: entities.cpp:6435
PartType partType() const
Retrieve the PartType record referred to by the partTypeId column of this record. ...
Definition: entities.cpp:6820
QString name() const
Returns the value of the name column of this record.
Definition: entities.cpp:5711
QString fromUtf8(const char *str, int size)
void setPartType(const PartType &value)
Set the PartType record referred to by the partTypeId column of this record.
Definition: entities.cpp:6827
QVector< T > result()
Returns the result of this SELECT query.
void setVersion(int version)
Sets the value of the version column of this record.
Definition: entities.cpp:6454
qint64 pimItemId() const
Returns the value of the pimItemId column of this record.
Definition: entities.cpp:6396
const T & at(int i) const const
Helper class for creating and executing database SELECT queries.
Definition: item.h:44
QByteArray data() const
Returns the value of the data column of this record.
Definition: entities.cpp:6422
bool isEmpty() const const
Helper integration between Akonadi and Qt.
char * data()
void update(Part *part, const QByteArray &data, qint64 dataSize)
Update payload of an existing part part to data and size dataSize.
Definition: parthelper.cpp:36
void setDatasize(qint64 datasize)
Sets the value of the datasize column of this record.
Definition: entities.cpp:6441
Part::Storage storage() const
Returns the value of the storage column of this record.
Definition: entities.cpp:6461
int size() const const
Representation of a record in the PimItem table.
Definition: entities.h:1194
void setStorage(Part::Storage storage)
Sets the value of the storage column of this record.
Definition: entities.cpp:6467
bool exec()
Executes the query, returns true on success.
A Connection represents one connection of a client to the server.
Definition: connection.h:53
This file is part of the KDE documentation.
Documentation copyright © 1996-2020 The KDE developers.
Generated on Wed May 27 2020 22:43:40 by doxygen 1.8.11 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.