1/*
2 Copyright (c) 2007 Tobias Koenig <tokoe@kde.org>
3 Copyright (c) 2007 Volker Krause <vkrause@kde.org>
4 Copyright (c) 2014 Christian Mollekopf <mollekopf@kolabsys.com>
5
6 This library is free software; you can redistribute it and/or modify it
7 under the terms of the GNU Library General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or (at your
9 option) any later version.
10
11 This library is distributed in the hope that it will be useful, but WITHOUT
12 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
14 License for more details.
15
16 You should have received a copy of the GNU Library General Public License
17 along with this library; see the file COPYING.LIB. If not, write to the
18 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 02110-1301, USA.
20*/
21
22#include "itemsync.h"
23
24#include "job_p.h"
25#include "collection.h"
26#include "item.h"
27#include "item_p.h"
28#include "itemcreatejob.h"
29#include "itemdeletejob.h"
30#include "itemfetchjob.h"
31#include "itemmodifyjob.h"
32#include "transactionsequence.h"
33#include "itemfetchscope.h"
34
35#include <kdebug.h>
36
37#include <QtCore/QStringList>
38
39using namespace Akonadi;
40
41/**
42 * @internal
43 */
44class Akonadi::ItemSyncPrivate : public JobPrivate
45{
46public:
47 ItemSyncPrivate(ItemSync *parent)
48 : JobPrivate(parent)
49 , mTransactionMode(ItemSync::SingleTransaction)
50 , mCurrentTransaction(0)
51 , mTransactionJobs(0)
52 , mPendingJobs(0)
53 , mProgress(0)
54 , mTotalItems(-1)
55 , mTotalItemsProcessed(0)
56 , mStreaming(false)
57 , mIncremental(false)
58 , mDeliveryDone(false)
59 , mFinished(false)
60 , mFullListingDone(false)
61 , mProcessingBatch(false)
62 , mDisableAutomaticDeliveryDone(false)
63 , mBatchSize(10)
64 {
65 // we want to fetch all data by default
66 mFetchScope.fetchFullPayload();
67 mFetchScope.fetchAllAttributes();
68 }
69
70 void createOrMerge(const Item &item);
71 void checkDone();
72 void slotItemsReceived(const Item::List &items);
73 void slotLocalListDone(KJob *job);
74 void slotLocalDeleteDone(KJob *);
75 void slotLocalChangeDone(KJob *job);
76 void execute();
77 void processItems();
78 void processBatch();
79 void deleteItems(const Item::List &items);
80 void slotTransactionResult(KJob *job);
81 void requestTransaction();
82 Job *subjobParent() const;
83 void fetchLocalItemsToDelete();
84 QString jobDebuggingString() const /*Q_DECL_OVERRIDE*/;
85 bool allProcessed() const;
86
87 Q_DECLARE_PUBLIC(ItemSync)
88 Collection mSyncCollection;
89 QSet<QString> mListedItems;
90
91 ItemSync::TransactionMode mTransactionMode;
92 TransactionSequence *mCurrentTransaction;
93 int mTransactionJobs;
94
95 // fetch scope for initial item listing
96 ItemFetchScope mFetchScope;
97
98 Akonadi::Item::List mRemoteItemQueue;
99 Akonadi::Item::List mRemovedRemoteItemQueue;
100 Akonadi::Item::List mCurrentBatchRemoteItems;
101 Akonadi::Item::List mCurrentBatchRemovedRemoteItems;
102 Akonadi::Item::List mItemsToDelete;
103
104 // create counter
105 int mPendingJobs;
106 int mProgress;
107 int mTotalItems;
108 int mTotalItemsProcessed;
109
110 bool mStreaming;
111 bool mIncremental;
112 bool mDeliveryDone;
113 bool mFinished;
114 bool mFullListingDone;
115 bool mProcessingBatch;
116 bool mDisableAutomaticDeliveryDone;
117
118 int mBatchSize;
119};
120
121void ItemSyncPrivate::createOrMerge(const Item &item)
122{
123 Q_Q(ItemSync);
124 // don't try to do anything in error state
125 if (q->error()) {
126 return;
127 }
128 mPendingJobs++;
129 ItemCreateJob *create = new ItemCreateJob(item, mSyncCollection, subjobParent());
130 if (!item.gid().isEmpty()) {
131 create->setMerge(ItemCreateJob::GID|ItemCreateJob::Silent);
132 } else {
133 create->setMerge(ItemCreateJob::RID|ItemCreateJob::Silent);
134 }
135 q->connect(create, SIGNAL(result(KJob*)), q, SLOT(slotLocalChangeDone(KJob*)));
136}
137
138bool ItemSyncPrivate::allProcessed() const
139{
140 return mDeliveryDone && mCurrentBatchRemoteItems.isEmpty() && mRemoteItemQueue.isEmpty() && mRemovedRemoteItemQueue.isEmpty() && mCurrentBatchRemovedRemoteItems.isEmpty();
141}
142
143void ItemSyncPrivate::checkDone()
144{
145 Q_Q(ItemSync);
146 q->setProcessedAmount(KJob::Bytes, mProgress);
147 if (mPendingJobs > 0) {
148 return;
149 }
150
151 if (mTransactionJobs > 0) {
152 //Commit the current transaction if we're in batch processing mode or done
153 //and wait until the transaction is committed to process the next batch
154 if (mTransactionMode == ItemSync::MultipleTransactions || (mDeliveryDone && mRemoteItemQueue.isEmpty())) {
155 if (mCurrentTransaction) {
156 q->emit transactionCommitted();
157 mCurrentTransaction->commit();
158 mCurrentTransaction = 0;
159 }
160 return;
161 }
162 }
163 mProcessingBatch = false;
164 if (!mRemoteItemQueue.isEmpty()) {
165 execute();
166 //We don't have enough items, request more
167 if (!mProcessingBatch) {
168 q->emit readyForNextBatch(mBatchSize - mRemoteItemQueue.size());
169 }
170 return;
171 }
172 q->emit readyForNextBatch(mBatchSize);
173
174 if (allProcessed() && !mFinished) {
175 // prevent double result emission, can happen since checkDone() is called from all over the place
176 mFinished = true;
177 q->emitResult();
178 }
179}
180
181ItemSync::ItemSync(const Collection &collection, QObject *parent)
182 : Job(new ItemSyncPrivate(this), parent)
183{
184 Q_D(ItemSync);
185 d->mSyncCollection = collection;
186}
187
188ItemSync::~ItemSync()
189{
190}
191
192void ItemSync::setFullSyncItems(const Item::List &items)
193{
194 /*
195 * We received a list of items from the server:
196 * * fetch all local id's + rid's only
197 * * check each full sync item wether it's locally available
198 * * if it is modify the item
199 * * if it's not create it
200 * * delete all superfluous items
201 */
202 Q_D(ItemSync);
203 Q_ASSERT(!d->mIncremental);
204 if (!d->mStreaming) {
205 d->mDeliveryDone = true;
206 }
207 d->mRemoteItemQueue += items;
208 d->mTotalItemsProcessed += items.count();
209 kDebug() << "Received: " << items.count() << "In total: " << d->mTotalItemsProcessed << " Wanted: " << d->mTotalItems;
210 if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItemsProcessed == d->mTotalItems)) {
211 d->mDeliveryDone = true;
212 }
213 d->execute();
214}
215
216void ItemSync::setTotalItems(int amount)
217{
218 Q_D(ItemSync);
219 Q_ASSERT(!d->mIncremental);
220 Q_ASSERT(amount >= 0);
221 setStreamingEnabled(true);
222 kDebug() << amount;
223 d->mTotalItems = amount;
224 setTotalAmount(KJob::Bytes, amount);
225 if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItems == 0)) {
226 d->mDeliveryDone = true;
227 d->execute();
228 }
229}
230
231void ItemSync::setDisableAutomaticDeliveryDone(bool disable)
232{
233 Q_D(ItemSync);
234 d->mDisableAutomaticDeliveryDone = disable;
235}
236
237void ItemSync::setIncrementalSyncItems(const Item::List &changedItems, const Item::List &removedItems)
238{
239 /*
240 * We received an incremental listing of items:
241 * * for each changed item:
242 * ** If locally available => modify
243 * ** else => create
244 * * removed items can be removed right away
245 */
246 Q_D(ItemSync);
247 d->mIncremental = true;
248 if (!d->mStreaming) {
249 d->mDeliveryDone = true;
250 }
251 d->mRemoteItemQueue += changedItems;
252 d->mRemovedRemoteItemQueue += removedItems;
253 d->mTotalItemsProcessed += changedItems.count() + removedItems.count();
254 kDebug() << "Received: " << changedItems.count() << "Removed: " << removedItems.count() << "In total: " << d->mTotalItemsProcessed << " Wanted: " << d->mTotalItems;
255 if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItemsProcessed == d->mTotalItems)) {
256 d->mDeliveryDone = true;
257 }
258 d->execute();
259}
260
261void ItemSync::setFetchScope(ItemFetchScope &fetchScope)
262{
263 Q_D(ItemSync);
264 d->mFetchScope = fetchScope;
265}
266
267ItemFetchScope &ItemSync::fetchScope()
268{
269 Q_D(ItemSync);
270 return d->mFetchScope;
271}
272
273void ItemSync::doStart()
274{
275}
276
277bool ItemSync::updateItem(const Item &storedItem, Item &newItem)
278{
279 Q_UNUSED(storedItem);
280 Q_UNUSED(newItem);
281 return false;
282}
283
284void ItemSyncPrivate::fetchLocalItemsToDelete()
285{
286 Q_Q(ItemSync);
287 if (mIncremental) {
288 kFatal() << "This must not be called while in incremental mode";
289 return;
290 }
291 ItemFetchJob *job = new ItemFetchJob(mSyncCollection, subjobParent());
292 job->fetchScope().setFetchRemoteIdentification(true);
293 job->fetchScope().setFetchModificationTime(false);
294 job->setDeliveryOption(ItemFetchJob::EmitItemsIndividually);
295 // we only can fetch parts already in the cache, otherwise this will deadlock
296 job->fetchScope().setCacheOnly(true);
297
298 QObject::connect(job, SIGNAL(itemsReceived(Akonadi::Item::List)), q, SLOT(slotItemsReceived(Akonadi::Item::List)));
299 QObject::connect(job, SIGNAL(result(KJob*)), q, SLOT(slotLocalListDone(KJob*)));
300 mPendingJobs++;
301}
302
303void ItemSyncPrivate::slotItemsReceived(const Item::List &items)
304{
305 foreach (const Akonadi::Item &item, items) {
306 //Don't delete items that have not yet been synchronized
307 if (item.remoteId().isEmpty()) {
308 continue;
309 }
310 if (!mListedItems.contains(item.remoteId())) {
311 mItemsToDelete << Item(item.id());
312 }
313 }
314}
315
316void ItemSyncPrivate::slotLocalListDone(KJob *job)
317{
318 mPendingJobs--;
319 if (job->error()) {
320 kWarning() << job->errorString();
321 }
322 deleteItems(mItemsToDelete);
323 checkDone();
324}
325
326QString ItemSyncPrivate::jobDebuggingString() const /*Q_DECL_OVERRIDE*/
327{
328 // TODO: also print out mIncremental and mTotalItemsProcessed, but they are set after the job
329 // started, so this requires passing jobDebuggingString to jobEnded().
330 return QString::fromLatin1("Collection %1 (%2)").arg(mSyncCollection.id()).arg(mSyncCollection.name());
331}
332
333void ItemSyncPrivate::execute()
334{
335 Q_Q(ItemSync);
336 //shouldn't happen
337 if (mFinished) {
338 kWarning() << "Call to execute() on finished job.";
339 Q_ASSERT(false);
340 return;
341 }
342 //not doing anything, start processing
343 if (!mProcessingBatch) {
344 if (mRemoteItemQueue.size() >= mBatchSize || mDeliveryDone) {
345 //we have a new batch to process
346 const int num = qMin(mBatchSize, mRemoteItemQueue.size());
347 for (int i = 0; i < num; i++) {
348 mCurrentBatchRemoteItems << mRemoteItemQueue.takeFirst();
349 }
350 mCurrentBatchRemovedRemoteItems += mRemovedRemoteItemQueue;
351 mRemovedRemoteItemQueue.clear();
352 } else {
353 //nothing to do, let's wait for more data
354 return;
355 }
356 mProcessingBatch = true;
357 processBatch();
358 return;
359 }
360 checkDone();
361}
362
363//process the current batch of items
364void ItemSyncPrivate::processBatch()
365{
366 Q_Q(ItemSync);
367 if (mCurrentBatchRemoteItems.isEmpty() && !mDeliveryDone) {
368 return;
369 }
370
371 //request a transaction, there are items that require processing
372 requestTransaction();
373
374 processItems();
375
376 // removed
377 if (!mIncremental && allProcessed()) {
378 //the full listing is done and we know which items to remove
379 fetchLocalItemsToDelete();
380 } else {
381 deleteItems(mCurrentBatchRemovedRemoteItems);
382 mCurrentBatchRemovedRemoteItems.clear();
383 }
384
385 checkDone();
386}
387
388void ItemSyncPrivate::processItems()
389{
390 Q_Q(ItemSync);
391 // added / updated
392 foreach (const Item &remoteItem, mCurrentBatchRemoteItems) {
393 if (remoteItem.remoteId().isEmpty()) {
394 kWarning() << "Item without rid passed to itemsync";
395 continue;
396 }
397 if (!mIncremental) {
398 mListedItems << remoteItem.remoteId();
399 }
400 createOrMerge(remoteItem);
401 }
402 mCurrentBatchRemoteItems.clear();
403}
404
405void ItemSyncPrivate::deleteItems(const Item::List &itemsToDelete)
406{
407 Q_Q(ItemSync);
408 // if in error state, better not change anything anymore
409 if (q->error()) {
410 return;
411 }
412
413 if (itemsToDelete.isEmpty()) {
414 return;
415 }
416
417 mPendingJobs++;
418 ItemDeleteJob *job = new ItemDeleteJob(itemsToDelete, subjobParent());
419 q->connect(job, SIGNAL(result(KJob*)), q, SLOT(slotLocalDeleteDone(KJob*)));
420
421 // It can happen that the groupware servers report us deleted items
422 // twice, in this case this item delete job will fail on the second try.
423 // To avoid a rollback of the complete transaction we gracefully allow the job
424 // to fail :)
425 TransactionSequence *transaction = qobject_cast<TransactionSequence *>(subjobParent());
426 if (transaction) {
427 transaction->setIgnoreJobFailure(job);
428 }
429}
430
431void ItemSyncPrivate::slotLocalDeleteDone(KJob *)
432{
433 mPendingJobs--;
434 mProgress++;
435
436 checkDone();
437}
438
439void ItemSyncPrivate::slotLocalChangeDone(KJob *job)
440{
441 Q_UNUSED(job);
442 mPendingJobs--;
443 mProgress++;
444
445 checkDone();
446}
447
448void ItemSyncPrivate::slotTransactionResult(KJob *job)
449{
450 --mTransactionJobs;
451 if (mCurrentTransaction == job) {
452 mCurrentTransaction = 0;
453 }
454
455 checkDone();
456}
457
458void ItemSyncPrivate::requestTransaction()
459{
460 Q_Q(ItemSync);
461 //we never want parallel transactions, single transaction just makes one big transaction, and multi transaction uses multiple transaction sequentially
462 if (!mCurrentTransaction) {
463 ++mTransactionJobs;
464 mCurrentTransaction = new TransactionSequence(q);
465 mCurrentTransaction->setAutomaticCommittingEnabled(false);
466 QObject::connect(mCurrentTransaction, SIGNAL(result(KJob*)), q, SLOT(slotTransactionResult(KJob*)));
467 }
468}
469
470Job *ItemSyncPrivate::subjobParent() const
471{
472 Q_Q(const ItemSync);
473 if (mCurrentTransaction && mTransactionMode != ItemSync::NoTransaction) {
474 return mCurrentTransaction;
475 }
476 return const_cast<ItemSync *>(q);
477}
478
479void ItemSync::setStreamingEnabled(bool enable)
480{
481 Q_D(ItemSync);
482 d->mStreaming = enable;
483}
484
485void ItemSync::deliveryDone()
486{
487 Q_D(ItemSync);
488 Q_ASSERT(d->mStreaming);
489 d->mDeliveryDone = true;
490 d->execute();
491}
492
493void ItemSync::slotResult(KJob *job)
494{
495 if (job->error()) {
496 // pretent there were no errors
497 Akonadi::Job::removeSubjob(job);
498 // propagate the first error we got but continue, we might still be fed with stuff from a resource
499 if (!error()) {
500 setError(job->error());
501 setErrorText(job->errorText());
502 }
503 } else {
504 Akonadi::Job::slotResult(job);
505 }
506}
507
508void ItemSync::rollback()
509{
510 Q_D(ItemSync);
511 setError(UserCanceled);
512 if (d->mCurrentTransaction) {
513 d->mCurrentTransaction->rollback();
514 }
515 d->mDeliveryDone = true; // user wont deliver more data
516 d->execute(); // end this in an ordered way, since we have an error set no real change will be done
517}
518
519void ItemSync::setTransactionMode(ItemSync::TransactionMode mode)
520{
521 Q_D(ItemSync);
522 d->mTransactionMode = mode;
523}
524
525int ItemSync::batchSize() const
526{
527 Q_D(const ItemSync);
528 return d->mBatchSize;
529}
530
531void ItemSync::setBatchSize(int size)
532{
533 Q_D(ItemSync);
534 d->mBatchSize = size;
535}
536
537
538#include "moc_itemsync.cpp"
539