1/*
2 Copyright (c) 2006 Till Adam <adam@kde.org>
3 Copyright (c) 2007 Volker Krause <vkrause@kde.org>
4
5 This library is free software; you can redistribute it and/or modify it
6 under the terms of the GNU Library General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or (at your
8 option) any later version.
9
10 This library is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
13 License for more details.
14
15 You should have received a copy of the GNU Library General Public License
16 along with this library; see the file COPYING.LIB. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18 02110-1301, USA.
19*/
20
21#include "resourcebase.h"
22#include "agentbase_p.h"
23
24#include "resourceadaptor.h"
25#include "collectiondeletejob.h"
26#include "collectionsync_p.h"
27#include "dbusconnectionpool.h"
28#include "itemsync.h"
29#include "kdepimlibs-version.h"
30#include "resourcescheduler_p.h"
31#include "tracerinterface.h"
32#include "xdgbasedirs_p.h"
33
34#include "changerecorder.h"
35#include "collectionfetchjob.h"
36#include "collectionfetchscope.h"
37#include "collectionmodifyjob.h"
38#include "invalidatecachejob_p.h"
39#include "itemfetchjob.h"
40#include "itemfetchscope.h"
41#include "itemmodifyjob.h"
42#include "itemmodifyjob_p.h"
43#include "session.h"
44#include "resourceselectjob_p.h"
45#include "monitor_p.h"
46#include "servermanager_p.h"
47#include "recursivemover_p.h"
48
49#include <kaboutdata.h>
50#include <kcmdlineargs.h>
51#include <kdebug.h>
52#include <klocalizedstring.h>
53#include <kglobal.h>
54#include <akonadi/tagmodifyjob.h>
55
56#include <QtCore/QDebug>
57#include <QtCore/QDir>
58#include <QtCore/QHash>
59#include <QtCore/QSettings>
60#include <QtCore/QTimer>
61#include <QApplication>
62#include <QtDBus/QtDBus>
63
64using namespace Akonadi;
65
66class Akonadi::ResourceBasePrivate : public AgentBasePrivate
67{
68 Q_OBJECT
69 Q_CLASSINFO("D-Bus Interface", "org.kde.dfaure")
70
71public:
72 ResourceBasePrivate(ResourceBase *parent)
73 : AgentBasePrivate(parent)
74 , scheduler(0)
75 , mItemSyncer(0)
76 , mItemSyncFetchScope(0)
77 , mItemTransactionMode(ItemSync::SingleTransaction)
78 , mCollectionSyncer(0)
79 , mHierarchicalRid(false)
80 , mUnemittedProgress(0)
81 , mAutomaticProgressReporting(true)
82 , mDisableAutomaticItemDeliveryDone(false)
83 , mItemSyncBatchSize(10)
84 {
85 Internal::setClientType(Internal::Resource);
86 mStatusMessage = defaultReadyMessage();
87 mProgressEmissionCompressor.setInterval(1000);
88 mProgressEmissionCompressor.setSingleShot(true);
89 // HACK: skip local changes of the EntityDisplayAttribute by default. Remove this for KDE5 and adjust resource implementations accordingly.
90 mKeepLocalCollectionChanges << "ENTITYDISPLAY";
91 }
92
93 ~ResourceBasePrivate()
94 {
95 delete mItemSyncFetchScope;
96 }
97
98 Q_DECLARE_PUBLIC(ResourceBase)
99
100 void delayedInit()
101 {
102 const QString serviceId = ServerManager::agentServiceName(ServerManager::Resource, mId);
103 if (!DBusConnectionPool::threadConnection().registerService(serviceId)) {
104 QString reason = DBusConnectionPool::threadConnection().lastError().message();
105 if (reason.isEmpty()) {
106 reason = QString::fromLatin1("this service is probably running already.");
107 }
108 kError() << "Unable to register service" << serviceId << "at D-Bus:" << reason;
109
110 if (QThread::currentThread() == QCoreApplication::instance()->thread()) {
111 QCoreApplication::instance()->exit(1);
112 }
113
114 } else {
115 AgentBasePrivate::delayedInit();
116 }
117 }
118
119 virtual void changeProcessed()
120 {
121 if (m_recursiveMover) {
122 m_recursiveMover->changeProcessed();
123 QTimer::singleShot(0, m_recursiveMover, SLOT(replayNext()));
124 return;
125 }
126
127 mChangeRecorder->changeProcessed();
128 if (!mChangeRecorder->isEmpty()) {
129 scheduler->scheduleChangeReplay();
130 }
131 scheduler->taskDone();
132 }
133
134 void slotAbortRequested();
135
136 void slotDeliveryDone(KJob *job);
137 void slotCollectionSyncDone(KJob *job);
138 void slotLocalListDone(KJob *job);
139 void slotSynchronizeCollection(const Collection &col);
140 void slotCollectionListDone(KJob *job);
141 void slotSynchronizeCollectionAttributes(const Collection &col);
142 void slotCollectionListForAttributesDone(KJob *job);
143 void slotCollectionAttributesSyncDone(KJob *job);
144
145 void slotItemSyncDone(KJob *job);
146
147 void slotPercent(KJob *job, unsigned long percent);
148 void slotDelayedEmitProgress();
149 void slotDeleteResourceCollection();
150 void slotDeleteResourceCollectionDone(KJob *job);
151 void slotCollectionDeletionDone(KJob *job);
152
153 void slotInvalidateCache(const Akonadi::Collection &collection);
154
155 void slotPrepareItemRetrieval(const Akonadi::Item &item);
156 void slotPrepareItemRetrievalResult(KJob *job);
157
158 void changeCommittedResult(KJob *job);
159
160 void slotRecursiveMoveReplay(RecursiveMover *mover);
161 void slotRecursiveMoveReplayResult(KJob *job);
162
163 void slotSessionReconnected()
164 {
165 Q_Q(ResourceBase);
166
167 new ResourceSelectJob(q->identifier());
168 }
169
170 void createItemSyncInstanceIfMissing()
171 {
172 Q_Q(ResourceBase);
173 Q_ASSERT_X(scheduler->currentTask().type == ResourceScheduler::SyncCollection,
174 "createItemSyncInstance", "Calling items retrieval methods although no item retrieval is in progress");
175 if (!mItemSyncer) {
176 mItemSyncer = new ItemSync(q->currentCollection());
177 mItemSyncer->setTransactionMode(mItemTransactionMode);
178 mItemSyncer->setBatchSize(mItemSyncBatchSize);
179 if (mItemSyncFetchScope) {
180 mItemSyncer->setFetchScope(*mItemSyncFetchScope);
181 }
182 mItemSyncer->setDisableAutomaticDeliveryDone(mDisableAutomaticItemDeliveryDone);
183 mItemSyncer->setProperty("collection", QVariant::fromValue(q->currentCollection()));
184 connect(mItemSyncer, SIGNAL(percent(KJob*,ulong)), q, SLOT(slotPercent(KJob*,ulong)));
185 connect(mItemSyncer, SIGNAL(result(KJob*)), q, SLOT(slotItemSyncDone(KJob*)));
186 connect(mItemSyncer, SIGNAL(readyForNextBatch(int)), q, SIGNAL(retrieveNextItemSyncBatch(int)));
187 }
188 Q_ASSERT(mItemSyncer);
189 }
190
191public Q_SLOTS:
192 // Dump the state of the scheduler
193 Q_SCRIPTABLE QString dumpToString() const
194 {
195 Q_Q(const ResourceBase);
196 QString retVal;
197 QMetaObject::invokeMethod(const_cast<ResourceBase *>(q), "dumpResourceToString", Qt::DirectConnection, Q_RETURN_ARG(QString, retVal));
198 return scheduler->dumpToString() + QLatin1Char('\n') + retVal;
199 }
200
201 Q_SCRIPTABLE void dump()
202 {
203 scheduler->dump();
204 }
205
206 Q_SCRIPTABLE void clear()
207 {
208 scheduler->clear();
209 }
210
211protected Q_SLOTS:
212 // reimplementations from AgentbBasePrivate, containing sanity checks that only apply to resources
213 // such as making sure that RIDs are present as well as translations of cross-resource moves
214 // TODO: we could possibly add recovery code for no-RID notifications by re-enquing those to the change recorder
215 // as the corresponding Add notifications, although that contains a risk of endless fail/retry loops
216
217 void itemAdded(const Akonadi::Item &item, const Akonadi::Collection &collection)
218 {
219 if (collection.remoteId().isEmpty()) {
220 changeProcessed();
221 return;
222 }
223 AgentBasePrivate::itemAdded(item, collection);
224 }
225
226 void itemChanged(const Akonadi::Item &item, const QSet< QByteArray > &partIdentifiers)
227 {
228 if (item.remoteId().isEmpty()) {
229 changeProcessed();
230 return;
231 }
232 AgentBasePrivate::itemChanged(item, partIdentifiers);
233 }
234
235 void itemsFlagsChanged(const Item::List &items, const QSet< QByteArray > &addedFlags,
236 const QSet< QByteArray > &removedFlags)
237 {
238 if (addedFlags.isEmpty() && removedFlags.isEmpty()) {
239 changeProcessed();
240 return;
241 }
242
243 Item::List validItems;
244 foreach (const Akonadi::Item &item, items) {
245 if (!item.remoteId().isEmpty()) {
246 validItems << item;
247 }
248 }
249 if (validItems.isEmpty()) {
250 changeProcessed();
251 return;
252 }
253
254 AgentBasePrivate::itemsFlagsChanged(validItems, addedFlags, removedFlags);
255 }
256
257 void itemsTagsChanged(const Item::List &items, const QSet<Tag> &addedTags, const QSet<Tag> &removedTags)
258 {
259 if (addedTags.isEmpty() && removedTags.isEmpty()) {
260 changeProcessed();
261 return;
262 }
263
264 Item::List validItems;
265 foreach (const Akonadi::Item &item, items) {
266 if (!item.remoteId().isEmpty()) {
267 validItems << item;
268 }
269 }
270 if (validItems.isEmpty()) {
271 changeProcessed();
272 return;
273 }
274
275 AgentBasePrivate::itemsTagsChanged(validItems, addedTags, removedTags);
276 }
277
278 // TODO move the move translation code from AgentBasePrivate here, it's wrong for agents
279 void itemMoved(const Akonadi::Item &item, const Akonadi::Collection &source, const Akonadi::Collection &destination)
280 {
281 if (item.remoteId().isEmpty() || destination.remoteId().isEmpty() || destination == source) {
282 changeProcessed();
283 return;
284 }
285 AgentBasePrivate::itemMoved(item, source, destination);
286 }
287
288 void itemsMoved(const Item::List &items, const Collection &source, const Collection &destination)
289 {
290 if (destination.remoteId().isEmpty() || destination == source) {
291 changeProcessed();
292 return;
293 }
294
295 Item::List validItems;
296 foreach (const Akonadi::Item &item, items) {
297 if (!item.remoteId().isEmpty()) {
298 validItems << item;
299 }
300 }
301 if (validItems.isEmpty()) {
302 changeProcessed();
303 return;
304 }
305
306 AgentBasePrivate::itemsMoved(validItems, source, destination);
307 }
308
309 void itemRemoved(const Akonadi::Item &item)
310 {
311 if (item.remoteId().isEmpty()) {
312 changeProcessed();
313 return;
314 }
315 AgentBasePrivate::itemRemoved(item);
316 }
317
318 void itemsRemoved(const Item::List &items)
319 {
320 Item::List validItems;
321 foreach (const Akonadi::Item &item, items) {
322 if (!item.remoteId().isEmpty()) {
323 validItems << item;
324 }
325 }
326 if (validItems.isEmpty()) {
327 changeProcessed();
328 return;
329 }
330
331 AgentBasePrivate::itemsRemoved(validItems);
332 }
333
334 void collectionAdded(const Akonadi::Collection &collection, const Akonadi::Collection &parent)
335 {
336 if (parent.remoteId().isEmpty()) {
337 changeProcessed();
338 return;
339 }
340 AgentBasePrivate::collectionAdded(collection, parent);
341 }
342
343 void collectionChanged(const Akonadi::Collection &collection)
344 {
345 if (collection.remoteId().isEmpty()) {
346 changeProcessed();
347 return;
348 }
349 AgentBasePrivate::collectionChanged(collection);
350 }
351
352 void collectionChanged(const Akonadi::Collection &collection, const QSet< QByteArray > &partIdentifiers)
353 {
354 if (collection.remoteId().isEmpty()) {
355 changeProcessed();
356 return;
357 }
358 AgentBasePrivate::collectionChanged(collection, partIdentifiers);
359 }
360
361 void collectionMoved(const Akonadi::Collection &collection, const Akonadi::Collection &source, const Akonadi::Collection &destination)
362 {
363 // unknown destination or source == destination means we can't do/don't have to do anything
364 if (destination.remoteId().isEmpty() || source == destination) {
365 changeProcessed();
366 return;
367 }
368
369 // inter-resource moves, requires we know which resources the source and destination are in though
370 if (!source.resource().isEmpty() && !destination.resource().isEmpty() && source.resource() != destination.resource()) {
371 if (source.resource() == q_ptr->identifier()) { // moved away from us
372 AgentBasePrivate::collectionRemoved(collection);
373 } else if (destination.resource() == q_ptr->identifier()) { // moved to us
374 scheduler->taskDone(); // stop change replay for now
375 RecursiveMover *mover = new RecursiveMover(this);
376 mover->setCollection(collection, destination);
377 scheduler->scheduleMoveReplay(collection, mover);
378 }
379 return;
380 }
381
382 // intra-resource move, requires the moved collection to have a valid id though
383 if (collection.remoteId().isEmpty()) {
384 changeProcessed();
385 return;
386 }
387
388 // intra-resource move, ie. something we can handle internally
389 AgentBasePrivate::collectionMoved(collection, source, destination);
390 }
391
392 void collectionRemoved(const Akonadi::Collection &collection)
393 {
394 if (collection.remoteId().isEmpty()) {
395 changeProcessed();
396 return;
397 }
398 AgentBasePrivate::collectionRemoved(collection);
399 }
400
401 void tagAdded(const Akonadi::Tag &tag)
402 {
403 if (!tag.isValid()) {
404 changeProcessed();
405 return;
406 }
407
408 AgentBasePrivate::tagAdded(tag);
409 }
410
411 void tagChanged(const Akonadi::Tag &tag)
412 {
413 if (tag.remoteId().isEmpty()) {
414 changeProcessed();
415 return;
416 }
417
418 AgentBasePrivate::tagChanged(tag);
419 }
420
421 void tagRemoved(const Akonadi::Tag &tag)
422 {
423 if (tag.remoteId().isEmpty()) {
424 changeProcessed();
425 return;
426 }
427
428 AgentBasePrivate::tagRemoved(tag);
429 }
430
431public:
432 // synchronize states
433 Collection currentCollection;
434
435 ResourceScheduler *scheduler;
436 ItemSync *mItemSyncer;
437 ItemFetchScope *mItemSyncFetchScope;
438 ItemSync::TransactionMode mItemTransactionMode;
439 CollectionSync *mCollectionSyncer;
440 bool mHierarchicalRid;
441 QTimer mProgressEmissionCompressor;
442 int mUnemittedProgress;
443 QMap<Akonadi::Collection::Id, QVariantMap> mUnemittedAdvancedStatus;
444 bool mAutomaticProgressReporting;
445 bool mDisableAutomaticItemDeliveryDone;
446 QPointer<RecursiveMover> m_recursiveMover;
447 int mItemSyncBatchSize;
448 QSet<QByteArray> mKeepLocalCollectionChanges;
449};
450
451ResourceBase::ResourceBase(const QString &id)
452 : AgentBase(new ResourceBasePrivate(this), id)
453{
454 Q_D(ResourceBase);
455
456 new Akonadi__ResourceAdaptor(this);
457
458 d->scheduler = new ResourceScheduler(this);
459
460 d->mChangeRecorder->setChangeRecordingEnabled(true);
461 d->mChangeRecorder->setCollectionMoveTranslationEnabled(false); // we deal with this ourselves
462 connect(d->mChangeRecorder, SIGNAL(changesAdded()),
463 d->scheduler, SLOT(scheduleChangeReplay()));
464
465 d->mChangeRecorder->setResourceMonitored(d->mId.toLatin1());
466 d->mChangeRecorder->fetchCollection(true);
467
468 connect(d->scheduler, SIGNAL(executeFullSync()),
469 SLOT(retrieveCollections()));
470 connect(d->scheduler, SIGNAL(executeCollectionTreeSync()),
471 SLOT(retrieveCollections()));
472 connect(d->scheduler, SIGNAL(executeCollectionSync(Akonadi::Collection)),
473 SLOT(slotSynchronizeCollection(Akonadi::Collection)));
474 connect(d->scheduler, SIGNAL(executeCollectionAttributesSync(Akonadi::Collection)),
475 SLOT(slotSynchronizeCollectionAttributes(Akonadi::Collection)));
476 connect(d->scheduler, SIGNAL(executeItemFetch(Akonadi::Item,QSet<QByteArray>)),
477 SLOT(slotPrepareItemRetrieval(Akonadi::Item)));
478 connect(d->scheduler, SIGNAL(executeResourceCollectionDeletion()),
479 SLOT(slotDeleteResourceCollection()));
480 connect(d->scheduler, SIGNAL(executeCacheInvalidation(Akonadi::Collection)),
481 SLOT(slotInvalidateCache(Akonadi::Collection)));
482 connect(d->scheduler, SIGNAL(status(int,QString)),
483 SIGNAL(status(int,QString)));
484 connect(d->scheduler, SIGNAL(executeChangeReplay()),
485 d->mChangeRecorder, SLOT(replayNext()));
486 connect(d->scheduler, SIGNAL(executeRecursiveMoveReplay(RecursiveMover*)),
487 SLOT(slotRecursiveMoveReplay(RecursiveMover*)));
488 connect(d->scheduler, SIGNAL(fullSyncComplete()), SIGNAL(synchronized()));
489 connect(d->scheduler, SIGNAL(collectionTreeSyncComplete()), SIGNAL(collectionTreeSynchronized()));
490 connect(d->mChangeRecorder, SIGNAL(nothingToReplay()), d->scheduler, SLOT(taskDone()));
491 connect(d->mChangeRecorder, SIGNAL(collectionRemoved(Akonadi::Collection)),
492 d->scheduler, SLOT(collectionRemoved(Akonadi::Collection)));
493 connect(this, SIGNAL(abortRequested()), this, SLOT(slotAbortRequested()));
494 connect(this, SIGNAL(synchronized()), d->scheduler, SLOT(taskDone()));
495 connect(this, SIGNAL(collectionTreeSynchronized()), d->scheduler, SLOT(taskDone()));
496 connect(this, SIGNAL(agentNameChanged(QString)),
497 this, SIGNAL(nameChanged(QString)));
498
499 connect(&d->mProgressEmissionCompressor, SIGNAL(timeout()),
500 this, SLOT(slotDelayedEmitProgress()));
501
502 d->scheduler->setOnline(d->mOnline);
503 if (!d->mChangeRecorder->isEmpty()) {
504 d->scheduler->scheduleChangeReplay();
505 }
506
507 new ResourceSelectJob(identifier());
508
509 connect(d->mChangeRecorder->session(), SIGNAL(reconnected()), SLOT(slotSessionReconnected()));
510}
511
512ResourceBase::~ResourceBase()
513{
514}
515
516void ResourceBase::synchronize()
517{
518 d_func()->scheduler->scheduleFullSync();
519}
520
521void ResourceBase::setName(const QString &name)
522{
523 AgentBase::setAgentName(name);
524}
525
526QString ResourceBase::name() const
527{
528 return AgentBase::agentName();
529}
530
531QString ResourceBase::parseArguments(int argc, char **argv)
532{
533 QString identifier;
534 if (argc < 3) {
535 kDebug() << "Not enough arguments passed...";
536 exit(1);
537 }
538
539 for (int i = 1; i < argc - 1; ++i) {
540 if (QLatin1String(argv[i]) == QLatin1String("--identifier")) {
541 identifier = QLatin1String(argv[i + 1]);
542 }
543 }
544
545 if (identifier.isEmpty()) {
546 kDebug() << "Identifier argument missing";
547 exit(1);
548 }
549
550 const QFileInfo fi(QString::fromLocal8Bit(argv[0]));
551 // strip off full path and possible .exe suffix
552 const QByteArray catalog = fi.baseName().toLatin1();
553
554 KCmdLineArgs::init(argc, argv, ServerManager::addNamespace(identifier).toLatin1(), catalog,
555 ki18nc("@title application name", "Akonadi Resource"), KDEPIMLIBS_VERSION,
556 ki18nc("@title application description", "Akonadi Resource"));
557
558 KCmdLineOptions options;
559 options.add("identifier <argument>",
560 ki18nc("@label commandline option", "Resource identifier"));
561 KCmdLineArgs::addCmdLineOptions(options);
562
563 return identifier;
564}
565
566int ResourceBase::init(ResourceBase *r)
567{
568 QApplication::setQuitOnLastWindowClosed(false);
569 KGlobal::locale()->insertCatalog(QLatin1String("libakonadi"));
570 int rv = kapp->exec();
571 delete r;
572 return rv;
573}
574
575void ResourceBasePrivate::slotAbortRequested()
576{
577 Q_Q(ResourceBase);
578
579 scheduler->cancelQueues();
580 QMetaObject::invokeMethod(q, "abortActivity");
581}
582
583void ResourceBase::itemRetrieved(const Item &item)
584{
585 Q_D(ResourceBase);
586 Q_ASSERT(d->scheduler->currentTask().type == ResourceScheduler::FetchItem);
587 if (!item.isValid()) {
588 d->scheduler->currentTask().sendDBusReplies(i18nc("@info", "Invalid item retrieved"));
589 d->scheduler->taskDone();
590 return;
591 }
592
593 Item i(item);
594 QSet<QByteArray> requestedParts = d->scheduler->currentTask().itemParts;
595 foreach (const QByteArray &part, requestedParts) {
596 if (!item.loadedPayloadParts().contains(part)) {
597 kWarning() << "Item does not provide part" << part;
598 }
599 }
600
601 ItemModifyJob *job = new ItemModifyJob(i);
602 job->d_func()->setSilent( true );
603 // FIXME: remove once the item with which we call retrieveItem() has a revision number
604 job->disableRevisionCheck();
605 connect(job, SIGNAL(result(KJob*)), SLOT(slotDeliveryDone(KJob*)));
606}
607
608void ResourceBasePrivate::slotDeliveryDone(KJob *job)
609{
610 Q_Q(ResourceBase);
611 Q_ASSERT(scheduler->currentTask().type == ResourceScheduler::FetchItem);
612 if (job->error()) {
613 emit q->error(i18nc("@info", "Error while creating item: %1", job->errorString()));
614 }
615 scheduler->currentTask().sendDBusReplies(job->error() ? job->errorString() : QString());
616 scheduler->taskDone();
617}
618
619void ResourceBase::collectionAttributesRetrieved(const Collection &collection)
620{
621 Q_D(ResourceBase);
622 Q_ASSERT(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionAttributes);
623 if (!collection.isValid()) {
624 emit attributesSynchronized(d->scheduler->currentTask().collection.id());
625 d->scheduler->taskDone();
626 return;
627 }
628
629 CollectionModifyJob *job = new CollectionModifyJob(collection);
630 connect(job, SIGNAL(result(KJob*)), SLOT(slotCollectionAttributesSyncDone(KJob*)));
631}
632
633void ResourceBasePrivate::slotCollectionAttributesSyncDone(KJob *job)
634{
635 Q_Q(ResourceBase);
636 Q_ASSERT(scheduler->currentTask().type == ResourceScheduler::SyncCollectionAttributes);
637 if (job->error()) {
638 emit q->error(i18nc("@info", "Error while updating collection: %1", job->errorString()));
639 }
640 emit q->attributesSynchronized(scheduler->currentTask().collection.id());
641 scheduler->taskDone();
642}
643
644void ResourceBasePrivate::slotDeleteResourceCollection()
645{
646 Q_Q(ResourceBase);
647
648 CollectionFetchJob *job = new CollectionFetchJob(Collection::root(), CollectionFetchJob::FirstLevel);
649 job->fetchScope().setResource(q->identifier());
650 connect(job, SIGNAL(result(KJob*)), q, SLOT(slotDeleteResourceCollectionDone(KJob*)));
651}
652
653void ResourceBasePrivate::slotDeleteResourceCollectionDone(KJob *job)
654{
655 Q_Q(ResourceBase);
656 if (job->error()) {
657 emit q->error(job->errorString());
658 scheduler->taskDone();
659 } else {
660 const CollectionFetchJob *fetchJob = static_cast<const CollectionFetchJob *>(job);
661
662 if (!fetchJob->collections().isEmpty()) {
663 CollectionDeleteJob *job = new CollectionDeleteJob(fetchJob->collections().first());
664 connect(job, SIGNAL(result(KJob*)), q, SLOT(slotCollectionDeletionDone(KJob*)));
665 } else {
666 // there is no resource collection, so just ignore the request
667 scheduler->taskDone();
668 }
669 }
670}
671
672void ResourceBasePrivate::slotCollectionDeletionDone(KJob *job)
673{
674 Q_Q(ResourceBase);
675 if (job->error()) {
676 emit q->error(job->errorString());
677 }
678
679 scheduler->taskDone();
680}
681
682void ResourceBasePrivate::slotInvalidateCache(const Akonadi::Collection &collection)
683{
684 Q_Q(ResourceBase);
685 InvalidateCacheJob *job = new InvalidateCacheJob(collection, q);
686 connect(job, SIGNAL(result(KJob*)), scheduler, SLOT(taskDone()));
687}
688
689void ResourceBase::changeCommitted(const Item &item)
690{
691 changesCommitted(Item::List() << item);
692}
693
694void ResourceBase::changesCommitted(const Item::List &items)
695{
696 ItemModifyJob *job = new ItemModifyJob(items);
697 job->d_func()->setClean();
698 job->disableRevisionCheck(); // TODO: remove, but where/how do we handle the error?
699 job->setIgnorePayload(true); // we only want to reset the dirty flag and update the remote id
700 job->setUpdateGid(true); // allow resources to update GID too
701 connect(job, SIGNAL(finished(KJob*)), this, SLOT(changeCommittedResult(KJob*)));
702}
703
704void ResourceBase::changeCommitted(const Collection &collection)
705{
706 CollectionModifyJob *job = new CollectionModifyJob(collection);
707 connect(job, SIGNAL(result(KJob*)), SLOT(changeCommittedResult(KJob*)));
708}
709
710void ResourceBasePrivate::changeCommittedResult(KJob *job)
711{
712 Q_Q(ResourceBase);
713 if (qobject_cast<CollectionModifyJob *>(job)) {
714 if (job->error()) {
715 emit q->error(i18nc("@info", "Updating local collection failed: %1.", job->errorText()));
716 }
717 mChangeRecorder->d_ptr->invalidateCache(static_cast<CollectionModifyJob *>(job)->collection());
718 } else {
719 // TODO: Error handling for item changes?
720 // Item and tag cache is invalidated by modify job
721 }
722
723 changeProcessed();
724}
725
726void ResourceBase::changeCommitted(const Tag &tag)
727{
728 TagModifyJob *job = new TagModifyJob(tag);
729 connect(job, SIGNAL(result(KJob*)), SLOT(changeCommittedResult(KJob*)));
730}
731
732bool ResourceBase::requestItemDelivery(qint64 uid, const QString &remoteId,
733 const QString &mimeType, const QStringList &parts)
734{
735 return requestItemDeliveryV2(uid, remoteId, mimeType, parts).isEmpty();
736}
737
738QString ResourceBase::requestItemDeliveryV2(qint64 uid, const QString &remoteId, const QString &mimeType, const QStringList &_parts)
739{
740 Q_D(ResourceBase);
741 if (!isOnline()) {
742 const QString errorMsg = i18nc("@info", "Cannot fetch item in offline mode.");
743 emit error(errorMsg);
744 return errorMsg;
745 }
746
747 setDelayedReply(true);
748 // FIXME: we need at least the revision number too
749 Item item(uid);
750 item.setMimeType(mimeType);
751 item.setRemoteId(remoteId);
752
753 QSet<QByteArray> parts;
754 Q_FOREACH (const QString &str, _parts) {
755 parts.insert(str.toLatin1());
756 }
757
758 d->scheduler->scheduleItemFetch(item, parts, message());
759
760 return QString();
761
762}
763
764void ResourceBase::collectionsRetrieved(const Collection::List &collections)
765{
766 Q_D(ResourceBase);
767 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree ||
768 d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
769 "ResourceBase::collectionsRetrieved()",
770 "Calling collectionsRetrieved() although no collection retrieval is in progress");
771 if (!d->mCollectionSyncer) {
772 d->mCollectionSyncer = new CollectionSync(identifier());
773 d->mCollectionSyncer->setHierarchicalRemoteIds(d->mHierarchicalRid);
774 d->mCollectionSyncer->setKeepLocalChanges(d->mKeepLocalCollectionChanges);
775 connect(d->mCollectionSyncer, SIGNAL(percent(KJob*,ulong)), SLOT(slotPercent(KJob*,ulong)));
776 connect(d->mCollectionSyncer, SIGNAL(result(KJob*)), SLOT(slotCollectionSyncDone(KJob*)));
777 }
778 d->mCollectionSyncer->setRemoteCollections(collections);
779}
780
781void ResourceBase::collectionsRetrievedIncremental(const Collection::List &changedCollections,
782 const Collection::List &removedCollections)
783{
784 Q_D(ResourceBase);
785 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree ||
786 d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
787 "ResourceBase::collectionsRetrievedIncremental()",
788 "Calling collectionsRetrievedIncremental() although no collection retrieval is in progress");
789 if (!d->mCollectionSyncer) {
790 d->mCollectionSyncer = new CollectionSync(identifier());
791 d->mCollectionSyncer->setHierarchicalRemoteIds(d->mHierarchicalRid);
792 d->mCollectionSyncer->setKeepLocalChanges(d->mKeepLocalCollectionChanges);
793 connect(d->mCollectionSyncer, SIGNAL(percent(KJob*,ulong)), SLOT(slotPercent(KJob*,ulong)));
794 connect(d->mCollectionSyncer, SIGNAL(result(KJob*)), SLOT(slotCollectionSyncDone(KJob*)));
795 }
796 d->mCollectionSyncer->setRemoteCollections(changedCollections, removedCollections);
797}
798
799void ResourceBase::setCollectionStreamingEnabled(bool enable)
800{
801 Q_D(ResourceBase);
802 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree ||
803 d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
804 "ResourceBase::setCollectionStreamingEnabled()",
805 "Calling setCollectionStreamingEnabled() although no collection retrieval is in progress");
806 if (!d->mCollectionSyncer) {
807 d->mCollectionSyncer = new CollectionSync(identifier());
808 d->mCollectionSyncer->setHierarchicalRemoteIds(d->mHierarchicalRid);
809 connect(d->mCollectionSyncer, SIGNAL(percent(KJob*,ulong)), SLOT(slotPercent(KJob*,ulong)));
810 connect(d->mCollectionSyncer, SIGNAL(result(KJob*)), SLOT(slotCollectionSyncDone(KJob*)));
811 }
812 d->mCollectionSyncer->setStreamingEnabled(enable);
813}
814
815void ResourceBase::collectionsRetrievalDone()
816{
817 Q_D(ResourceBase);
818 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree ||
819 d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
820 "ResourceBase::collectionsRetrievalDone()",
821 "Calling collectionsRetrievalDone() although no collection retrieval is in progress");
822 // streaming enabled, so finalize the sync
823 if (d->mCollectionSyncer) {
824 d->mCollectionSyncer->retrievalDone();
825 } else {
826 // user did the sync himself, we are done now
827 // FIXME: we need the same special case for SyncAll as in slotCollectionSyncDone here!
828 d->scheduler->taskDone();
829 }
830}
831
832void ResourceBase::setKeepLocalCollectionChanges(const QSet<QByteArray> &parts)
833{
834 Q_D(ResourceBase);
835 d->mKeepLocalCollectionChanges = parts;
836}
837
838void ResourceBasePrivate::slotCollectionSyncDone(KJob *job)
839{
840 Q_Q(ResourceBase);
841 mCollectionSyncer = 0;
842 if (job->error()) {
843 if (job->error() != Job::UserCanceled) {
844 emit q->error(job->errorString());
845 }
846 } else {
847 if (scheduler->currentTask().type == ResourceScheduler::SyncAll) {
848 CollectionFetchJob *list = new CollectionFetchJob(Collection::root(), CollectionFetchJob::Recursive);
849 list->setFetchScope(q->changeRecorder()->collectionFetchScope());
850 list->fetchScope().setResource(mId);
851 list->fetchScope().setListFilter(CollectionFetchScope::Sync);
852 q->connect(list, SIGNAL(result(KJob*)), q, SLOT(slotLocalListDone(KJob*)));
853 return;
854 } else if (scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree) {
855 scheduler->scheduleCollectionTreeSyncCompletion();
856 }
857 }
858 scheduler->taskDone();
859}
860
861void ResourceBasePrivate::slotLocalListDone(KJob *job)
862{
863 Q_Q(ResourceBase);
864 if (job->error()) {
865 emit q->error(job->errorString());
866 } else {
867 Collection::List cols = static_cast<CollectionFetchJob *>(job)->collections();
868 foreach (const Collection &col, cols) {
869 scheduler->scheduleSync(col);
870 }
871 scheduler->scheduleFullSyncCompletion();
872 }
873 scheduler->taskDone();
874}
875
876void ResourceBasePrivate::slotSynchronizeCollection(const Collection &col)
877{
878 Q_Q(ResourceBase);
879 currentCollection = col;
880 // This can happen due to FetchHelper::triggerOnDemandFetch() in the akonadi server (not an error).
881 if (!col.remoteId().isEmpty()) {
882 // check if this collection actually can contain anything
883 QStringList contentTypes = currentCollection.contentMimeTypes();
884 contentTypes.removeAll(Collection::mimeType());
885 contentTypes.removeAll(Collection::virtualMimeType());
886 if (!contentTypes.isEmpty() || col.isVirtual()) {
887 if (mAutomaticProgressReporting) {
888 emit q->status(AgentBase::Running, i18nc("@info:status", "Syncing folder '%1'", currentCollection.displayName()));
889 }
890 q->retrieveItems(currentCollection);
891 return;
892 }
893 }
894 scheduler->taskDone();
895}
896
897int ResourceBase::itemSyncBatchSize() const
898{
899 Q_D(const ResourceBase);
900 return d->mItemSyncBatchSize;
901}
902
903void ResourceBase::setItemSyncBatchSize(int batchSize)
904{
905 Q_D(ResourceBase);
906 d->mItemSyncBatchSize = batchSize;
907}
908
909void ResourceBasePrivate::slotSynchronizeCollectionAttributes(const Collection &col)
910{
911 Q_Q(ResourceBase);
912 QMetaObject::invokeMethod(q, "retrieveCollectionAttributes", Q_ARG(Akonadi::Collection, col));
913}
914
915void ResourceBasePrivate::slotPrepareItemRetrieval(const Akonadi::Item &item)
916{
917 Q_Q(ResourceBase);
918 ItemFetchJob *fetch = new ItemFetchJob(item, this);
919 fetch->fetchScope().setAncestorRetrieval(q->changeRecorder()->itemFetchScope().ancestorRetrieval());
920 fetch->fetchScope().setCacheOnly(true);
921
922 // copy list of attributes to fetch
923 const QSet<QByteArray> attributes = q->changeRecorder()->itemFetchScope().attributes();
924 foreach (const QByteArray &attribute, attributes) {
925 fetch->fetchScope().fetchAttribute(attribute);
926 }
927
928 q->connect(fetch, SIGNAL(result(KJob*)), SLOT(slotPrepareItemRetrievalResult(KJob*)));
929}
930
931void ResourceBasePrivate::slotPrepareItemRetrievalResult(KJob *job)
932{
933 Q_Q(ResourceBase);
934 Q_ASSERT_X(scheduler->currentTask().type == ResourceScheduler::FetchItem,
935 "ResourceBasePrivate::slotPrepareItemRetrievalResult()",
936 "Preparing item retrieval although no item retrieval is in progress");
937 if (job->error()) {
938 q->cancelTask(job->errorText());
939 return;
940 }
941 ItemFetchJob *fetch = qobject_cast<ItemFetchJob *>(job);
942 if (fetch->items().count() != 1) {
943 q->cancelTask(i18n("The requested item no longer exists"));
944 return;
945 }
946 const Item item = fetch->items().first();
947 const QSet<QByteArray> parts = scheduler->currentTask().itemParts;
948 if (!q->retrieveItem(item, parts)) {
949 q->cancelTask();
950 }
951}
952
953void ResourceBasePrivate::slotRecursiveMoveReplay(RecursiveMover *mover)
954{
955 Q_Q(ResourceBase);
956 Q_ASSERT(mover);
957 Q_ASSERT(!m_recursiveMover);
958 m_recursiveMover = mover;
959 connect(mover, SIGNAL(result(KJob*)), q, SLOT(slotRecursiveMoveReplayResult(KJob*)));
960 mover->start();
961}
962
963void ResourceBasePrivate::slotRecursiveMoveReplayResult(KJob *job)
964{
965 Q_Q(ResourceBase);
966 m_recursiveMover = 0;
967
968 if (job->error()) {
969 q->deferTask();
970 return;
971 }
972
973 changeProcessed();
974}
975
976void ResourceBase::itemsRetrievalDone()
977{
978 Q_D(ResourceBase);
979 // streaming enabled, so finalize the sync
980 if (d->mItemSyncer) {
981 d->mItemSyncer->deliveryDone();
982 } else {
983 // user did the sync himself, we are done now
984 d->scheduler->taskDone();
985 }
986}
987
988void ResourceBase::clearCache()
989{
990 Q_D(ResourceBase);
991 d->scheduler->scheduleResourceCollectionDeletion();
992}
993
994void ResourceBase::invalidateCache(const Collection &collection)
995{
996 Q_D(ResourceBase);
997 d->scheduler->scheduleCacheInvalidation(collection);
998}
999
1000Collection ResourceBase::currentCollection() const
1001{
1002 Q_D(const ResourceBase);
1003 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollection ,
1004 "ResourceBase::currentCollection()",
1005 "Trying to access current collection although no item retrieval is in progress");
1006 return d->currentCollection;
1007}
1008
1009Item ResourceBase::currentItem() const
1010{
1011 Q_D(const ResourceBase);
1012 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::FetchItem ,
1013 "ResourceBase::currentItem()",
1014 "Trying to access current item although no item retrieval is in progress");
1015 return d->scheduler->currentTask().item;
1016}
1017
1018void ResourceBase::synchronizeCollectionTree()
1019{
1020 d_func()->scheduler->scheduleCollectionTreeSync();
1021}
1022
1023void ResourceBase::cancelTask()
1024{
1025 Q_D(ResourceBase);
1026 switch (d->scheduler->currentTask().type) {
1027 case ResourceScheduler::FetchItem:
1028 itemRetrieved(Item()); // sends the error reply and
1029 break;
1030 case ResourceScheduler::ChangeReplay:
1031 d->changeProcessed();
1032 break;
1033 case ResourceScheduler::SyncCollectionTree:
1034 case ResourceScheduler::SyncAll:
1035 if (d->mCollectionSyncer) {
1036 d->mCollectionSyncer->rollback();
1037 } else {
1038 d->scheduler->taskDone();
1039 }
1040 break;
1041 case ResourceScheduler::SyncCollection:
1042 if (d->mItemSyncer) {
1043 d->mItemSyncer->rollback();
1044 } else {
1045 d->scheduler->taskDone();
1046 }
1047 break;
1048 default:
1049 d->scheduler->taskDone();
1050 }
1051}
1052
1053void ResourceBase::cancelTask(const QString &msg)
1054{
1055 cancelTask();
1056
1057 emit error(msg);
1058}
1059
1060void ResourceBase::deferTask()
1061{
1062 Q_D(ResourceBase);
1063 d->scheduler->deferTask();
1064}
1065
1066void ResourceBase::doSetOnline(bool state)
1067{
1068 d_func()->scheduler->setOnline(state);
1069}
1070
1071void ResourceBase::synchronizeCollection(qint64 collectionId)
1072{
1073 synchronizeCollection(collectionId, false);
1074}
1075
1076void ResourceBase::synchronizeCollection(qint64 collectionId, bool recursive)
1077{
1078 CollectionFetchJob *job = new CollectionFetchJob(Collection(collectionId), recursive ? CollectionFetchJob::Recursive : CollectionFetchJob::Base);
1079 job->setFetchScope(changeRecorder()->collectionFetchScope());
1080 job->fetchScope().setResource(identifier());
1081 job->fetchScope().setListFilter(CollectionFetchScope::Sync);
1082 connect(job, SIGNAL(result(KJob*)), SLOT(slotCollectionListDone(KJob*)));
1083}
1084
1085void ResourceBasePrivate::slotCollectionListDone(KJob *job)
1086{
1087 if (!job->error()) {
1088 const Collection::List list = static_cast<CollectionFetchJob *>(job)->collections();
1089 Q_FOREACH (const Collection &collection, list) {
1090 //We also get collections that should not be synced but are part of the tree.
1091 if (collection.shouldList(Collection::ListSync)) {
1092 scheduler->scheduleSync(collection);
1093 }
1094 }
1095 } else {
1096 kWarning() << "Failed to fetch collection for collection sync: " << job->errorString();
1097 }
1098}
1099
1100void ResourceBase::synchronizeCollectionAttributes(qint64 collectionId)
1101{
1102 CollectionFetchJob *job = new CollectionFetchJob(Collection(collectionId), CollectionFetchJob::Base);
1103 job->setFetchScope(changeRecorder()->collectionFetchScope());
1104 job->fetchScope().setResource(identifier());
1105 connect(job, SIGNAL(result(KJob*)), SLOT(slotCollectionListForAttributesDone(KJob*)));
1106}
1107
1108void ResourceBasePrivate::slotCollectionListForAttributesDone(KJob *job)
1109{
1110 if (!job->error()) {
1111 Collection::List list = static_cast<CollectionFetchJob *>(job)->collections();
1112 if (!list.isEmpty()) {
1113 Collection col = list.first();
1114 scheduler->scheduleAttributesSync(col);
1115 }
1116 }
1117 // TODO: error handling
1118}
1119
1120void ResourceBase::setTotalItems(int amount)
1121{
1122 kDebug() << amount;
1123 Q_D(ResourceBase);
1124 setItemStreamingEnabled(true);
1125 if (d->mItemSyncer) {
1126 d->mItemSyncer->setTotalItems(amount);
1127 }
1128}
1129
1130void ResourceBase::setDisableAutomaticItemDeliveryDone(bool disable)
1131{
1132 Q_D(ResourceBase);
1133 d->mDisableAutomaticItemDeliveryDone = disable;
1134}
1135
1136void ResourceBase::setItemStreamingEnabled(bool enable)
1137{
1138 Q_D(ResourceBase);
1139 d->createItemSyncInstanceIfMissing();
1140 if (d->mItemSyncer) {
1141 d->mItemSyncer->setStreamingEnabled(enable);
1142 }
1143}
1144
1145void ResourceBase::itemsRetrieved(const Item::List &items)
1146{
1147 Q_D(ResourceBase);
1148 d->createItemSyncInstanceIfMissing();
1149 if (d->mItemSyncer) {
1150 d->mItemSyncer->setFullSyncItems(items);
1151 }
1152}
1153
1154void ResourceBase::itemsRetrievedIncremental(const Item::List &changedItems,
1155 const Item::List &removedItems)
1156{
1157 Q_D(ResourceBase);
1158 d->createItemSyncInstanceIfMissing();
1159 if (d->mItemSyncer) {
1160 d->mItemSyncer->setIncrementalSyncItems(changedItems, removedItems);
1161 }
1162}
1163
1164void ResourceBasePrivate::slotItemSyncDone(KJob *job)
1165{
1166 mItemSyncer = 0;
1167 Q_Q(ResourceBase);
1168 if (job->error() && job->error() != Job::UserCanceled) {
1169 emit q->error(job->errorString());
1170 }
1171 scheduler->taskDone();
1172}
1173
1174void ResourceBasePrivate::slotDelayedEmitProgress()
1175{
1176 Q_Q(ResourceBase);
1177 if (mAutomaticProgressReporting) {
1178 emit q->percent(mUnemittedProgress);
1179
1180 Q_FOREACH (const QVariantMap &statusMap, mUnemittedAdvancedStatus) {
1181 emit q->advancedStatus(statusMap);
1182 }
1183 }
1184 mUnemittedProgress = 0;
1185 mUnemittedAdvancedStatus.clear();
1186}
1187
1188void ResourceBasePrivate::slotPercent(KJob *job, unsigned long percent)
1189{
1190 mUnemittedProgress = percent;
1191
1192 const Collection collection = job->property("collection").value<Collection>();
1193 if (collection.isValid()) {
1194 QVariantMap statusMap;
1195 statusMap.insert(QLatin1String("key"), QString::fromLatin1("collectionSyncProgress"));
1196 statusMap.insert(QLatin1String("collectionId"), collection.id());
1197 statusMap.insert(QLatin1String("percent"), static_cast<unsigned int>(percent));
1198
1199 mUnemittedAdvancedStatus[collection.id()] = statusMap;
1200 }
1201 // deliver completion right away, intermediate progress at 1s intervals
1202 if (percent == 100) {
1203 mProgressEmissionCompressor.stop();
1204 slotDelayedEmitProgress();
1205 } else if (!mProgressEmissionCompressor.isActive()) {
1206 mProgressEmissionCompressor.start();
1207 }
1208}
1209
1210void ResourceBase::setHierarchicalRemoteIdentifiersEnabled(bool enable)
1211{
1212 Q_D(ResourceBase);
1213 d->mHierarchicalRid = enable;
1214}
1215
1216void ResourceBase::scheduleCustomTask(QObject *receiver, const char *method, const QVariant &argument, SchedulePriority priority)
1217{
1218 Q_D(ResourceBase);
1219 d->scheduler->scheduleCustomTask(receiver, method, argument, priority);
1220}
1221
1222void ResourceBase::taskDone()
1223{
1224 Q_D(ResourceBase);
1225 d->scheduler->taskDone();
1226}
1227
1228void ResourceBase::retrieveCollectionAttributes(const Collection &collection)
1229{
1230 collectionAttributesRetrieved(collection);
1231}
1232
1233void Akonadi::ResourceBase::abortActivity()
1234{
1235}
1236
1237void ResourceBase::setItemTransactionMode(ItemSync::TransactionMode mode)
1238{
1239 Q_D(ResourceBase);
1240 d->mItemTransactionMode = mode;
1241}
1242
1243void ResourceBase::setItemSynchronizationFetchScope(const ItemFetchScope &fetchScope)
1244{
1245 Q_D(ResourceBase);
1246 if (!d->mItemSyncFetchScope) {
1247 d->mItemSyncFetchScope = new ItemFetchScope;
1248 }
1249 *(d->mItemSyncFetchScope) = fetchScope;
1250}
1251
1252void ResourceBase::setAutomaticProgressReporting(bool enabled)
1253{
1254 Q_D(ResourceBase);
1255 d->mAutomaticProgressReporting = enabled;
1256}
1257
1258QString ResourceBase::dumpNotificationListToString() const
1259{
1260 Q_D(const ResourceBase);
1261 return d->dumpNotificationListToString();
1262}
1263
1264QString ResourceBase::dumpSchedulerToString() const
1265{
1266 Q_D(const ResourceBase);
1267 return d->dumpToString();
1268}
1269
1270void ResourceBase::dumpMemoryInfo() const
1271{
1272 Q_D(const ResourceBase);
1273 return d->dumpMemoryInfo();
1274}
1275
1276QString ResourceBase::dumpMemoryInfoToString() const
1277{
1278 Q_D(const ResourceBase);
1279 return d->dumpMemoryInfoToString();
1280}
1281
1282#include "resourcebase.moc"
1283#include "moc_resourcebase.cpp"
1284