Akonadi

partstreamer.cpp
1/*
2 * SPDX-FileCopyrightText: 2014 Daniel Vrátil <dvratil@redhat.com>
3 *
4 * SPDX-License-Identifier: LGPL-2.1-or-later
5 *
6 */
7
8#include "partstreamer.h"
9#include "akonadiserver_debug.h"
10#include "capabilities_p.h"
11#include "connection.h"
12#include "dbconfig.h"
13#include "parthelper.h"
14#include "parttypehelper.h"
15#include "selectquerybuilder.h"
16
17#include "private/externalpartstorage_p.h"
18#include "private/protocol_p.h"
19#include "private/standarddirs_p.h"
20
21#include <config-akonadi.h>
22#if HAVE_UNISTD_H
23#include <unistd.h>
24#endif
25
26#include <QFile>
27#include <QFileInfo>
28
29using namespace Akonadi;
30using namespace Akonadi::Server;
31
32PartStreamer::PartStreamer(Connection *connection, const PimItem &pimItem)
33 : mConnection(connection)
34 , mItem(pimItem)
35{
36 // Make sure the file_db_data path exists
37 StandardDirs::saveDir("data", QStringLiteral("file_db_data"));
38}
39
40PartStreamer::~PartStreamer()
41{
42}
43
44Protocol::PartMetaData PartStreamer::requestPartMetaData(const QByteArray &partName)
45{
46 {
47 Protocol::StreamPayloadCommand resp;
48 resp.setPayloadName(partName);
49 resp.setRequest(Protocol::StreamPayloadCommand::MetaData);
50 mConnection->sendResponse(std::move(resp));
51 }
52
53 const auto cmd = mConnection->readCommand();
54 if (!cmd->isValid() || Protocol::cmdCast<Protocol::Response>(cmd).isError()) {
55 throw PartStreamerException("Client failed to provide part metadata.");
56 }
57
58 return Protocol::cmdCast<Protocol::StreamPayloadResponse>(cmd).metaData();
59}
60
61void PartStreamer::streamPayload(Part &part, const QByteArray &partName)
62{
63 Protocol::PartMetaData metaPart = requestPartMetaData(partName);
64 if (metaPart.name().isEmpty()) {
65 throw PartStreamerException(QStringLiteral("Client sent empty metadata for part '%1'.").arg(QString::fromUtf8(partName)));
66 }
67 if (metaPart.name() != partName) {
68 throw PartStreamerException(
69 QStringLiteral("Client sent metadata for part '%1' but requested part '%2'.").arg(QString::fromUtf8(metaPart.name()), QString::fromUtf8(partName)));
70 }
71 part.setVersion(metaPart.version());
72
73 if (part.datasize() != metaPart.size()) {
74 part.setDatasize(metaPart.size());
75 // Shortcut: if sizes differ, we don't need to compare data later no in order
76 // to detect whether the part has changed
77 mDataChanged = mDataChanged || (metaPart.size() != part.datasize());
78 }
79
80 if (metaPart.storageType() == Protocol::PartMetaData::Foreign) {
81 streamForeignPayload(part, metaPart);
82 } else if (part.datasize() > DbConfig::configuredDatabase()->sizeThreshold()) {
83 // actual case when streaming storage is used: external payload is enabled,
84 // data is big enough in a literal
85 streamPayloadToFile(part, metaPart);
86 } else {
87 streamPayloadData(part, metaPart);
88 }
89}
90
91void PartStreamer::streamPayloadData(Part &part, const Protocol::PartMetaData &metaPart)
92{
93 // If the part WAS external previously, remove data file
94 if (part.storage() == Part::External) {
95 ExternalPartStorage::self()->removePartFile(ExternalPartStorage::resolveAbsolutePath(part.data()));
96 }
97
98 // Request the actual data
99 {
100 Protocol::StreamPayloadCommand resp;
101 resp.setPayloadName(metaPart.name());
102 resp.setRequest(Protocol::StreamPayloadCommand::Data);
103 mConnection->sendResponse(std::move(resp));
104 }
105
106 const auto cmd = mConnection->readCommand();
107 const auto &response = Protocol::cmdCast<Protocol::StreamPayloadResponse>(cmd);
108 if (!response.isValid() || response.isError()) {
109 throw PartStreamerException(QStringLiteral("Client failed to provide payload data for part ID %1 (%2).").arg(part.id()).arg(part.partType().name()));
110 }
111 const QByteArray newData = response.data();
112 // only use the data size with internal payload parts, for foreign parts
113 // we use the size reported by client
114 const auto newSize = (metaPart.storageType() == Protocol::PartMetaData::Internal) ? newData.size() : metaPart.size();
115 if (newSize != metaPart.size()) {
116 throw PartStreamerException(QStringLiteral("Payload size mismatch: client advertised %1 bytes but sent %2 bytes.").arg(metaPart.size()).arg(newSize));
117 }
118
119 if (part.isValid()) {
120 if (!mDataChanged) {
121 mDataChanged = mDataChanged || (newData != part.data());
122 }
123 PartHelper::update(&part, newData, newSize);
124 } else {
125 part.setData(newData);
126 part.setDatasize(newSize);
127 if (!part.insert()) {
128 throw PartStreamerException("Failed to insert new part into database.");
129 }
130 }
131}
132
133void PartStreamer::streamPayloadToFile(Part &part, const Protocol::PartMetaData &metaPart)
134{
135 QByteArray origData;
136 if (!mDataChanged && mCheckChanged) {
137 origData = PartHelper::translateData(part);
138 }
139
140 QByteArray filename;
141 if (part.isValid()) {
142 if (part.storage() == Part::External) {
143 // Part was external and is still external
144 filename = part.data();
145 if (!filename.isEmpty()) {
146 ExternalPartStorage::self()->removePartFile(ExternalPartStorage::resolveAbsolutePath(filename));
147 filename = ExternalPartStorage::updateFileNameRevision(filename);
148 } else {
149 // recover from data corruption
150 filename = ExternalPartStorage::nameForPartId(part.id());
151 }
152 } else {
153 // Part wasn't external, but is now
154 filename = ExternalPartStorage::nameForPartId(part.id());
155 }
156
157 QFileInfo finfo(QString::fromUtf8(filename));
158 if (finfo.isAbsolute()) {
159 filename = finfo.fileName().toUtf8();
160 }
161 }
162
163 part.setStorage(Part::External);
164 part.setDatasize(metaPart.size());
165 part.setData(filename);
166
167 if (part.isValid()) {
168 if (!part.update()) {
169 throw PartStreamerException(QStringLiteral("Failed to update part %1 in database.").arg(part.id()));
170 }
171 } else {
172 if (!part.insert()) {
173 throw PartStreamerException(QStringLiteral("Failed to insert new part fo PimItem %1 into database.").arg(part.pimItemId()));
174 }
175
176 filename = ExternalPartStorage::nameForPartId(part.id());
177 part.setData(filename);
178 if (!part.update()) {
179 throw PartStreamerException(QStringLiteral("Failed to update part %1 in database.").arg(part.id()));
180 }
181 }
182
183 {
184 Protocol::StreamPayloadCommand cmd;
185 cmd.setPayloadName(metaPart.name());
186 cmd.setRequest(Protocol::StreamPayloadCommand::Data);
187 cmd.setDestination(QString::fromUtf8(filename));
188 mConnection->sendResponse(std::move(cmd));
189 }
190
191 const auto cmd = mConnection->readCommand();
192 const auto &response = Protocol::cmdCast<Protocol::Response>(cmd);
193 if (!response.isValid() || response.isError()) {
194 throw PartStreamerException("Client failed to store payload into file.");
195 }
196
197 QFile file(ExternalPartStorage::resolveAbsolutePath(filename));
198 if (!file.exists()) {
199 throw PartStreamerException(QStringLiteral("External payload file %1 does not exist.").arg(file.fileName()));
200 }
201
202 if (file.size() != metaPart.size()) {
203 throw PartStreamerException(
204 QStringLiteral("Payload size mismatch, client advertised %1 bytes, but the file is %2 bytes.").arg(metaPart.size(), file.size()));
205 }
206
207 if (mCheckChanged && !mDataChanged) {
208 // This is invoked only when part already exists, data sizes match and
209 // caller wants to know whether parts really differ
210 mDataChanged = (origData != PartHelper::translateData(part));
211 }
212}
213
214void PartStreamer::streamForeignPayload(Part &part, const Protocol::PartMetaData &metaPart)
215{
216 QByteArray origData;
217 if (!mDataChanged && mCheckChanged) {
218 origData = PartHelper::translateData(part);
219 }
220
221 {
222 Protocol::StreamPayloadCommand cmd;
223 cmd.setPayloadName(metaPart.name());
224 cmd.setRequest(Protocol::StreamPayloadCommand::Data);
225 mConnection->sendResponse(std::move(cmd));
226 }
227
228 const auto cmd = mConnection->readCommand();
229 const auto &response = Protocol::cmdCast<Protocol::StreamPayloadResponse>(cmd);
230 if (!response.isValid() || response.isError()) {
231 throw PartStreamerException("Client failed to store payload into file.");
232 }
233
234 // If the part was previously external, clean up the data
235 if (part.storage() == Part::External) {
236 const QString filename = QString::fromUtf8(part.data());
237 ExternalPartStorage::self()->removePartFile(ExternalPartStorage::resolveAbsolutePath(filename));
238 }
239
240 part.setStorage(Part::Foreign);
241 part.setData(response.data());
242
243 if (part.isValid()) {
244 if (!part.update()) {
245 throw PartStreamerException(QStringLiteral("Failed to update part %1 in database.").arg(part.id()));
246 }
247 } else {
248 if (!part.insert()) {
249 throw PartStreamerException(QStringLiteral("Failed to insert part for PimItem %1 into database.").arg(part.pimItemId()));
250 }
251 }
252
253 const QString filename = QString::fromUtf8(response.data());
254 QFile file(filename);
255 if (!file.exists()) {
256 throw PartStreamerException(QStringLiteral("Foreign payload file %1 does not exist.").arg(filename));
257 }
258
259 if (file.size() != metaPart.size()) {
260 throw PartStreamerException(
261 QStringLiteral("Foreign payload size mismatch, client advertised %1 bytes, but the file size is %2 bytes.").arg(metaPart.size(), file.size()));
262 }
263
264 if (mCheckChanged && !mDataChanged) {
265 // This is invoked only when part already exists, data sizes match and
266 // caller wants to know whether parts really differ
267 mDataChanged = (origData != PartHelper::translateData(part));
268 }
269}
270
271void PartStreamer::preparePart(bool checkExists, const QByteArray &partName, Part &part)
272{
273 mDataChanged = false;
274
275 const PartType partType = PartTypeHelper::fromFqName(partName);
276
277 if (checkExists || mCheckChanged) {
279 qb.addValueCondition(Part::pimItemIdColumn(), Query::Equals, mItem.id());
280 qb.addValueCondition(Part::partTypeIdColumn(), Query::Equals, partType.id());
281 if (!qb.exec()) {
282 throw PartStreamerException(QStringLiteral("Failed to check if part %1 exists in PimItem %2.").arg(QString::fromUtf8(partName)).arg(mItem.id()));
283 }
284
285 const Part::List result = qb.result();
286 if (!result.isEmpty()) {
287 part = result.at(0);
288 }
289 }
290
291 // Shortcut: newly created parts are always "changed"
292 if (!part.isValid()) {
293 mDataChanged = true;
294 }
295
296 part.setPartType(partType);
297 part.setPimItemId(mItem.id());
298}
299
300void PartStreamer::stream(bool checkExists, const QByteArray &partName, qint64 &partSize, bool *changed)
301{
302 mCheckChanged = (changed != nullptr);
303 if (changed != nullptr) {
304 *changed = false;
305 }
306
307 Part part;
308 preparePart(checkExists, partName, part);
309
310 streamPayload(part, partName);
311 if (changed && mCheckChanged) {
312 *changed = mDataChanged;
313 }
314
315 partSize = part.datasize();
316}
317
318void PartStreamer::streamAttribute(bool checkExists, const QByteArray &_partName, const QByteArray &value, bool *changed)
319{
320 mCheckChanged = (changed != nullptr);
321 if (changed != nullptr) {
322 *changed = false;
323 }
324
325 QByteArray partName;
326 if (!_partName.startsWith("ATR:")) {
327 partName = "ATR:" + _partName;
328 } else {
329 partName = _partName;
330 }
331
332 Part part;
333 preparePart(checkExists, partName, part);
334
335 if (part.isValid()) {
336 if (mCheckChanged) {
337 if (PartHelper::translateData(part) != value) {
338 mDataChanged = true;
339 }
340 }
341 PartHelper::update(&part, value, value.size());
342 } else {
343 const bool storeInFile = value.size() > DbConfig::configuredDatabase()->sizeThreshold();
344 part.setDatasize(value.size());
345 part.setVersion(0);
346 if (storeInFile) {
347 if (!part.insert()) {
348 throw PartStreamerException(QStringLiteral("Failed to store attribute part for PimItem %1 in database.").arg(part.pimItemId()));
349 }
350 PartHelper::update(&part, value, value.size());
351 } else {
352 part.setData(value);
353 if (!part.insert()) {
354 throw PartStreamerException(QStringLiteral("Failed to store attribute part for PimItem %1 in database.").arg(part.pimItemId()));
355 }
356 }
357 }
358
359 if (mCheckChanged) {
360 *changed = mDataChanged;
361 }
362}
An Connection represents one connection of a client to the server.
Definition connection.h:39
virtual qint64 sizeThreshold() const
Payload data bigger than this value will be stored in separate files, instead of the database.
Definition dbconfig.cpp:138
static DbConfig * configuredDatabase()
Returns the DbConfig instance for the database the user has configured.
Definition dbconfig.cpp:77
void addValueCondition(const QString &column, Query::CompareOperator op, const QVariant &value, ConditionType type=WhereCondition)
Add a WHERE or HAVING condition which compares a column with a given value.
bool exec()
Executes the query, returns true on success.
Helper class for creating and executing database SELECT queries.
QList< T > result()
Returns the result of this SELECT query.
void update(Part *part, const QByteArray &data, qint64 dataSize)
Update payload of an existing part part to data and size dataSize.
QByteArray translateData(const QByteArray &data, Part::Storage storageType)
Returns the payload data.
PartType fromFqName(const QString &fqName)
Retrieve (or create) PartType for the given fully qualified name.
Helper integration between Akonadi and Qt.
char * data()
bool isEmpty() const const
qsizetype size() const const
bool startsWith(QByteArrayView bv) const const
QString fromUtf8(QByteArrayView str)
This file is part of the KDE documentation.
Documentation copyright © 1996-2024 The KDE developers.
Generated on Fri Nov 29 2024 11:49:12 by doxygen 1.12.0 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.