1 | /* |
2 | Copyright (c) 2007 Tobias Koenig <tokoe@kde.org> |
3 | Copyright (c) 2007 Volker Krause <vkrause@kde.org> |
4 | Copyright (c) 2014 Christian Mollekopf <mollekopf@kolabsys.com> |
5 | |
6 | This library is free software; you can redistribute it and/or modify it |
7 | under the terms of the GNU Library General Public License as published by |
8 | the Free Software Foundation; either version 2 of the License, or (at your |
9 | option) any later version. |
10 | |
11 | This library is distributed in the hope that it will be useful, but WITHOUT |
12 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
13 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public |
14 | License for more details. |
15 | |
16 | You should have received a copy of the GNU Library General Public License |
17 | along with this library; see the file COPYING.LIB. If not, write to the |
18 | Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
19 | 02110-1301, USA. |
20 | */ |
21 | |
22 | #include "itemsync.h" |
23 | |
24 | #include "job_p.h" |
25 | #include "collection.h" |
26 | #include "item.h" |
27 | #include "item_p.h" |
28 | #include "itemcreatejob.h" |
29 | #include "itemdeletejob.h" |
30 | #include "itemfetchjob.h" |
31 | #include "itemmodifyjob.h" |
32 | #include "transactionsequence.h" |
33 | #include "itemfetchscope.h" |
34 | |
35 | #include <kdebug.h> |
36 | |
37 | #include <QtCore/QStringList> |
38 | |
39 | using namespace Akonadi; |
40 | |
41 | /** |
42 | * @internal |
43 | */ |
44 | class Akonadi::ItemSyncPrivate : public JobPrivate |
45 | { |
46 | public: |
47 | ItemSyncPrivate(ItemSync *parent) |
48 | : JobPrivate(parent) |
49 | , mTransactionMode(ItemSync::SingleTransaction) |
50 | , mCurrentTransaction(0) |
51 | , mTransactionJobs(0) |
52 | , mPendingJobs(0) |
53 | , mProgress(0) |
54 | , mTotalItems(-1) |
55 | , mTotalItemsProcessed(0) |
56 | , mStreaming(false) |
57 | , mIncremental(false) |
58 | , mDeliveryDone(false) |
59 | , mFinished(false) |
60 | , mFullListingDone(false) |
61 | , mProcessingBatch(false) |
62 | , mDisableAutomaticDeliveryDone(false) |
63 | , mBatchSize(10) |
64 | { |
65 | // we want to fetch all data by default |
66 | mFetchScope.fetchFullPayload(); |
67 | mFetchScope.fetchAllAttributes(); |
68 | } |
69 | |
70 | void createOrMerge(const Item &item); |
71 | void checkDone(); |
72 | void slotItemsReceived(const Item::List &items); |
73 | void slotLocalListDone(KJob *job); |
74 | void slotLocalDeleteDone(KJob *); |
75 | void slotLocalChangeDone(KJob *job); |
76 | void execute(); |
77 | void processItems(); |
78 | void processBatch(); |
79 | void deleteItems(const Item::List &items); |
80 | void slotTransactionResult(KJob *job); |
81 | void requestTransaction(); |
82 | Job *subjobParent() const; |
83 | void fetchLocalItemsToDelete(); |
84 | QString jobDebuggingString() const /*Q_DECL_OVERRIDE*/; |
85 | bool allProcessed() const; |
86 | |
87 | Q_DECLARE_PUBLIC(ItemSync) |
88 | Collection mSyncCollection; |
89 | QSet<QString> mListedItems; |
90 | |
91 | ItemSync::TransactionMode mTransactionMode; |
92 | TransactionSequence *mCurrentTransaction; |
93 | int mTransactionJobs; |
94 | |
95 | // fetch scope for initial item listing |
96 | ItemFetchScope mFetchScope; |
97 | |
98 | Akonadi::Item::List mRemoteItemQueue; |
99 | Akonadi::Item::List mRemovedRemoteItemQueue; |
100 | Akonadi::Item::List mCurrentBatchRemoteItems; |
101 | Akonadi::Item::List mCurrentBatchRemovedRemoteItems; |
102 | Akonadi::Item::List mItemsToDelete; |
103 | |
104 | // create counter |
105 | int mPendingJobs; |
106 | int mProgress; |
107 | int mTotalItems; |
108 | int mTotalItemsProcessed; |
109 | |
110 | bool mStreaming; |
111 | bool mIncremental; |
112 | bool mDeliveryDone; |
113 | bool mFinished; |
114 | bool mFullListingDone; |
115 | bool mProcessingBatch; |
116 | bool mDisableAutomaticDeliveryDone; |
117 | |
118 | int mBatchSize; |
119 | }; |
120 | |
121 | void ItemSyncPrivate::createOrMerge(const Item &item) |
122 | { |
123 | Q_Q(ItemSync); |
124 | // don't try to do anything in error state |
125 | if (q->error()) { |
126 | return; |
127 | } |
128 | mPendingJobs++; |
129 | ItemCreateJob *create = new ItemCreateJob(item, mSyncCollection, subjobParent()); |
130 | if (!item.gid().isEmpty()) { |
131 | create->setMerge(ItemCreateJob::GID|ItemCreateJob::Silent); |
132 | } else { |
133 | create->setMerge(ItemCreateJob::RID|ItemCreateJob::Silent); |
134 | } |
135 | q->connect(create, SIGNAL(result(KJob*)), q, SLOT(slotLocalChangeDone(KJob*))); |
136 | } |
137 | |
138 | bool ItemSyncPrivate::allProcessed() const |
139 | { |
140 | return mDeliveryDone && mCurrentBatchRemoteItems.isEmpty() && mRemoteItemQueue.isEmpty() && mRemovedRemoteItemQueue.isEmpty() && mCurrentBatchRemovedRemoteItems.isEmpty(); |
141 | } |
142 | |
143 | void ItemSyncPrivate::checkDone() |
144 | { |
145 | Q_Q(ItemSync); |
146 | q->setProcessedAmount(KJob::Bytes, mProgress); |
147 | if (mPendingJobs > 0) { |
148 | return; |
149 | } |
150 | |
151 | if (mTransactionJobs > 0) { |
152 | //Commit the current transaction if we're in batch processing mode or done |
153 | //and wait until the transaction is committed to process the next batch |
154 | if (mTransactionMode == ItemSync::MultipleTransactions || (mDeliveryDone && mRemoteItemQueue.isEmpty())) { |
155 | if (mCurrentTransaction) { |
156 | q->emit transactionCommitted(); |
157 | mCurrentTransaction->commit(); |
158 | mCurrentTransaction = 0; |
159 | } |
160 | return; |
161 | } |
162 | } |
163 | mProcessingBatch = false; |
164 | if (!mRemoteItemQueue.isEmpty()) { |
165 | execute(); |
166 | //We don't have enough items, request more |
167 | if (!mProcessingBatch) { |
168 | q->emit readyForNextBatch(mBatchSize - mRemoteItemQueue.size()); |
169 | } |
170 | return; |
171 | } |
172 | q->emit readyForNextBatch(mBatchSize); |
173 | |
174 | if (allProcessed() && !mFinished) { |
175 | // prevent double result emission, can happen since checkDone() is called from all over the place |
176 | mFinished = true; |
177 | q->emitResult(); |
178 | } |
179 | } |
180 | |
181 | ItemSync::ItemSync(const Collection &collection, QObject *parent) |
182 | : Job(new ItemSyncPrivate(this), parent) |
183 | { |
184 | Q_D(ItemSync); |
185 | d->mSyncCollection = collection; |
186 | } |
187 | |
188 | ItemSync::~ItemSync() |
189 | { |
190 | } |
191 | |
192 | void ItemSync::setFullSyncItems(const Item::List &items) |
193 | { |
194 | /* |
195 | * We received a list of items from the server: |
196 | * * fetch all local id's + rid's only |
197 | * * check each full sync item wether it's locally available |
198 | * * if it is modify the item |
199 | * * if it's not create it |
200 | * * delete all superfluous items |
201 | */ |
202 | Q_D(ItemSync); |
203 | Q_ASSERT(!d->mIncremental); |
204 | if (!d->mStreaming) { |
205 | d->mDeliveryDone = true; |
206 | } |
207 | d->mRemoteItemQueue += items; |
208 | d->mTotalItemsProcessed += items.count(); |
209 | kDebug() << "Received: " << items.count() << "In total: " << d->mTotalItemsProcessed << " Wanted: " << d->mTotalItems; |
210 | if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItemsProcessed == d->mTotalItems)) { |
211 | d->mDeliveryDone = true; |
212 | } |
213 | d->execute(); |
214 | } |
215 | |
216 | void ItemSync::setTotalItems(int amount) |
217 | { |
218 | Q_D(ItemSync); |
219 | Q_ASSERT(!d->mIncremental); |
220 | Q_ASSERT(amount >= 0); |
221 | setStreamingEnabled(true); |
222 | kDebug() << amount; |
223 | d->mTotalItems = amount; |
224 | setTotalAmount(KJob::Bytes, amount); |
225 | if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItems == 0)) { |
226 | d->mDeliveryDone = true; |
227 | d->execute(); |
228 | } |
229 | } |
230 | |
231 | void ItemSync::setDisableAutomaticDeliveryDone(bool disable) |
232 | { |
233 | Q_D(ItemSync); |
234 | d->mDisableAutomaticDeliveryDone = disable; |
235 | } |
236 | |
237 | void ItemSync::setIncrementalSyncItems(const Item::List &changedItems, const Item::List &removedItems) |
238 | { |
239 | /* |
240 | * We received an incremental listing of items: |
241 | * * for each changed item: |
242 | * ** If locally available => modify |
243 | * ** else => create |
244 | * * removed items can be removed right away |
245 | */ |
246 | Q_D(ItemSync); |
247 | d->mIncremental = true; |
248 | if (!d->mStreaming) { |
249 | d->mDeliveryDone = true; |
250 | } |
251 | d->mRemoteItemQueue += changedItems; |
252 | d->mRemovedRemoteItemQueue += removedItems; |
253 | d->mTotalItemsProcessed += changedItems.count() + removedItems.count(); |
254 | kDebug() << "Received: " << changedItems.count() << "Removed: " << removedItems.count() << "In total: " << d->mTotalItemsProcessed << " Wanted: " << d->mTotalItems; |
255 | if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItemsProcessed == d->mTotalItems)) { |
256 | d->mDeliveryDone = true; |
257 | } |
258 | d->execute(); |
259 | } |
260 | |
261 | void ItemSync::setFetchScope(ItemFetchScope &fetchScope) |
262 | { |
263 | Q_D(ItemSync); |
264 | d->mFetchScope = fetchScope; |
265 | } |
266 | |
267 | ItemFetchScope &ItemSync::fetchScope() |
268 | { |
269 | Q_D(ItemSync); |
270 | return d->mFetchScope; |
271 | } |
272 | |
273 | void ItemSync::doStart() |
274 | { |
275 | } |
276 | |
277 | bool ItemSync::updateItem(const Item &storedItem, Item &newItem) |
278 | { |
279 | Q_UNUSED(storedItem); |
280 | Q_UNUSED(newItem); |
281 | return false; |
282 | } |
283 | |
284 | void ItemSyncPrivate::fetchLocalItemsToDelete() |
285 | { |
286 | Q_Q(ItemSync); |
287 | if (mIncremental) { |
288 | kFatal() << "This must not be called while in incremental mode" ; |
289 | return; |
290 | } |
291 | ItemFetchJob *job = new ItemFetchJob(mSyncCollection, subjobParent()); |
292 | job->fetchScope().setFetchRemoteIdentification(true); |
293 | job->fetchScope().setFetchModificationTime(false); |
294 | job->setDeliveryOption(ItemFetchJob::EmitItemsIndividually); |
295 | // we only can fetch parts already in the cache, otherwise this will deadlock |
296 | job->fetchScope().setCacheOnly(true); |
297 | |
298 | QObject::connect(job, SIGNAL(itemsReceived(Akonadi::Item::List)), q, SLOT(slotItemsReceived(Akonadi::Item::List))); |
299 | QObject::connect(job, SIGNAL(result(KJob*)), q, SLOT(slotLocalListDone(KJob*))); |
300 | mPendingJobs++; |
301 | } |
302 | |
303 | void ItemSyncPrivate::slotItemsReceived(const Item::List &items) |
304 | { |
305 | foreach (const Akonadi::Item &item, items) { |
306 | //Don't delete items that have not yet been synchronized |
307 | if (item.remoteId().isEmpty()) { |
308 | continue; |
309 | } |
310 | if (!mListedItems.contains(item.remoteId())) { |
311 | mItemsToDelete << Item(item.id()); |
312 | } |
313 | } |
314 | } |
315 | |
316 | void ItemSyncPrivate::slotLocalListDone(KJob *job) |
317 | { |
318 | mPendingJobs--; |
319 | if (job->error()) { |
320 | kWarning() << job->errorString(); |
321 | } |
322 | deleteItems(mItemsToDelete); |
323 | checkDone(); |
324 | } |
325 | |
326 | QString ItemSyncPrivate::jobDebuggingString() const /*Q_DECL_OVERRIDE*/ |
327 | { |
328 | // TODO: also print out mIncremental and mTotalItemsProcessed, but they are set after the job |
329 | // started, so this requires passing jobDebuggingString to jobEnded(). |
330 | return QString::fromLatin1("Collection %1 (%2)" ).arg(mSyncCollection.id()).arg(mSyncCollection.name()); |
331 | } |
332 | |
333 | void ItemSyncPrivate::execute() |
334 | { |
335 | Q_Q(ItemSync); |
336 | //shouldn't happen |
337 | if (mFinished) { |
338 | kWarning() << "Call to execute() on finished job." ; |
339 | Q_ASSERT(false); |
340 | return; |
341 | } |
342 | //not doing anything, start processing |
343 | if (!mProcessingBatch) { |
344 | if (mRemoteItemQueue.size() >= mBatchSize || mDeliveryDone) { |
345 | //we have a new batch to process |
346 | const int num = qMin(mBatchSize, mRemoteItemQueue.size()); |
347 | for (int i = 0; i < num; i++) { |
348 | mCurrentBatchRemoteItems << mRemoteItemQueue.takeFirst(); |
349 | } |
350 | mCurrentBatchRemovedRemoteItems += mRemovedRemoteItemQueue; |
351 | mRemovedRemoteItemQueue.clear(); |
352 | } else { |
353 | //nothing to do, let's wait for more data |
354 | return; |
355 | } |
356 | mProcessingBatch = true; |
357 | processBatch(); |
358 | return; |
359 | } |
360 | checkDone(); |
361 | } |
362 | |
363 | //process the current batch of items |
364 | void ItemSyncPrivate::processBatch() |
365 | { |
366 | Q_Q(ItemSync); |
367 | if (mCurrentBatchRemoteItems.isEmpty() && !mDeliveryDone) { |
368 | return; |
369 | } |
370 | |
371 | //request a transaction, there are items that require processing |
372 | requestTransaction(); |
373 | |
374 | processItems(); |
375 | |
376 | // removed |
377 | if (!mIncremental && allProcessed()) { |
378 | //the full listing is done and we know which items to remove |
379 | fetchLocalItemsToDelete(); |
380 | } else { |
381 | deleteItems(mCurrentBatchRemovedRemoteItems); |
382 | mCurrentBatchRemovedRemoteItems.clear(); |
383 | } |
384 | |
385 | checkDone(); |
386 | } |
387 | |
388 | void ItemSyncPrivate::processItems() |
389 | { |
390 | Q_Q(ItemSync); |
391 | // added / updated |
392 | foreach (const Item &remoteItem, mCurrentBatchRemoteItems) { |
393 | if (remoteItem.remoteId().isEmpty()) { |
394 | kWarning() << "Item without rid passed to itemsync" ; |
395 | continue; |
396 | } |
397 | if (!mIncremental) { |
398 | mListedItems << remoteItem.remoteId(); |
399 | } |
400 | createOrMerge(remoteItem); |
401 | } |
402 | mCurrentBatchRemoteItems.clear(); |
403 | } |
404 | |
405 | void ItemSyncPrivate::deleteItems(const Item::List &itemsToDelete) |
406 | { |
407 | Q_Q(ItemSync); |
408 | // if in error state, better not change anything anymore |
409 | if (q->error()) { |
410 | return; |
411 | } |
412 | |
413 | if (itemsToDelete.isEmpty()) { |
414 | return; |
415 | } |
416 | |
417 | mPendingJobs++; |
418 | ItemDeleteJob *job = new ItemDeleteJob(itemsToDelete, subjobParent()); |
419 | q->connect(job, SIGNAL(result(KJob*)), q, SLOT(slotLocalDeleteDone(KJob*))); |
420 | |
421 | // It can happen that the groupware servers report us deleted items |
422 | // twice, in this case this item delete job will fail on the second try. |
423 | // To avoid a rollback of the complete transaction we gracefully allow the job |
424 | // to fail :) |
425 | TransactionSequence *transaction = qobject_cast<TransactionSequence *>(subjobParent()); |
426 | if (transaction) { |
427 | transaction->setIgnoreJobFailure(job); |
428 | } |
429 | } |
430 | |
431 | void ItemSyncPrivate::slotLocalDeleteDone(KJob *) |
432 | { |
433 | mPendingJobs--; |
434 | mProgress++; |
435 | |
436 | checkDone(); |
437 | } |
438 | |
439 | void ItemSyncPrivate::slotLocalChangeDone(KJob *job) |
440 | { |
441 | Q_UNUSED(job); |
442 | mPendingJobs--; |
443 | mProgress++; |
444 | |
445 | checkDone(); |
446 | } |
447 | |
448 | void ItemSyncPrivate::slotTransactionResult(KJob *job) |
449 | { |
450 | --mTransactionJobs; |
451 | if (mCurrentTransaction == job) { |
452 | mCurrentTransaction = 0; |
453 | } |
454 | |
455 | checkDone(); |
456 | } |
457 | |
458 | void ItemSyncPrivate::requestTransaction() |
459 | { |
460 | Q_Q(ItemSync); |
461 | //we never want parallel transactions, single transaction just makes one big transaction, and multi transaction uses multiple transaction sequentially |
462 | if (!mCurrentTransaction) { |
463 | ++mTransactionJobs; |
464 | mCurrentTransaction = new TransactionSequence(q); |
465 | mCurrentTransaction->setAutomaticCommittingEnabled(false); |
466 | QObject::connect(mCurrentTransaction, SIGNAL(result(KJob*)), q, SLOT(slotTransactionResult(KJob*))); |
467 | } |
468 | } |
469 | |
470 | Job *ItemSyncPrivate::subjobParent() const |
471 | { |
472 | Q_Q(const ItemSync); |
473 | if (mCurrentTransaction && mTransactionMode != ItemSync::NoTransaction) { |
474 | return mCurrentTransaction; |
475 | } |
476 | return const_cast<ItemSync *>(q); |
477 | } |
478 | |
479 | void ItemSync::setStreamingEnabled(bool enable) |
480 | { |
481 | Q_D(ItemSync); |
482 | d->mStreaming = enable; |
483 | } |
484 | |
485 | void ItemSync::deliveryDone() |
486 | { |
487 | Q_D(ItemSync); |
488 | Q_ASSERT(d->mStreaming); |
489 | d->mDeliveryDone = true; |
490 | d->execute(); |
491 | } |
492 | |
493 | void ItemSync::slotResult(KJob *job) |
494 | { |
495 | if (job->error()) { |
496 | // pretent there were no errors |
497 | Akonadi::Job::removeSubjob(job); |
498 | // propagate the first error we got but continue, we might still be fed with stuff from a resource |
499 | if (!error()) { |
500 | setError(job->error()); |
501 | setErrorText(job->errorText()); |
502 | } |
503 | } else { |
504 | Akonadi::Job::slotResult(job); |
505 | } |
506 | } |
507 | |
508 | void ItemSync::rollback() |
509 | { |
510 | Q_D(ItemSync); |
511 | setError(UserCanceled); |
512 | if (d->mCurrentTransaction) { |
513 | d->mCurrentTransaction->rollback(); |
514 | } |
515 | d->mDeliveryDone = true; // user wont deliver more data |
516 | d->execute(); // end this in an ordered way, since we have an error set no real change will be done |
517 | } |
518 | |
519 | void ItemSync::setTransactionMode(ItemSync::TransactionMode mode) |
520 | { |
521 | Q_D(ItemSync); |
522 | d->mTransactionMode = mode; |
523 | } |
524 | |
525 | int ItemSync::batchSize() const |
526 | { |
527 | Q_D(const ItemSync); |
528 | return d->mBatchSize; |
529 | } |
530 | |
531 | void ItemSync::setBatchSize(int size) |
532 | { |
533 | Q_D(ItemSync); |
534 | d->mBatchSize = size; |
535 | } |
536 | |
537 | |
538 | #include "moc_itemsync.cpp" |
539 | |