1/*
2 Copyright (c) 2007 Tobias Koenig <tokoe@kde.org>
3
4 This library is free software; you can redistribute it and/or modify it
5 under the terms of the GNU Library General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or (at your
7 option) any later version.
8
9 This library is distributed in the hope that it will be useful, but WITHOUT
10 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
12 License for more details.
13
14 You should have received a copy of the GNU Library General Public License
15 along with this library; see the file COPYING.LIB. If not, write to the
16 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17 02110-1301, USA.
18*/
19
20// @cond PRIVATE
21
22#include "monitor_p.h"
23
24#include "collectionfetchjob.h"
25#include "collectionstatistics.h"
26#include "dbusconnectionpool.h"
27#include "itemfetchjob.h"
28#include "notificationmessagev2_p.h"
29#include "notificationmanagerinterface.h"
30#include "session.h"
31#include "changemediator_p.h"
32
33#include <kdebug.h>
34#include <kcomponentdata.h>
35
36using namespace Akonadi;
37
38static const int PipelineSize = 5;
39
40MonitorPrivate::MonitorPrivate(ChangeNotificationDependenciesFactory *dependenciesFactory_, Monitor *parent)
41 : q_ptr(parent)
42 , dependenciesFactory(dependenciesFactory_ ? dependenciesFactory_ : new ChangeNotificationDependenciesFactory)
43 , notificationSource(0)
44 , monitorAll(false)
45 , mFetchChangedOnly(false)
46 , session(Session::defaultSession())
47 , collectionCache(0)
48 , itemCache(0)
49 , tagCache(0)
50 , fetchCollection(false)
51 , fetchCollectionStatistics(false)
52 , collectionMoveTranslationEnabled(true)
53 , useRefCounting(false)
54{
55}
56
57void MonitorPrivate::init()
58{
59 // needs to be at least 3x pipeline size for the collection move case
60 collectionCache = dependenciesFactory->createCollectionCache(3 * PipelineSize, session);
61 // needs to be at least 1x pipeline size
62 itemCache = dependenciesFactory->createItemListCache(PipelineSize, session);
63
64 // 20 tags looks like a reasonable mount to keep around
65 tagCache = dependenciesFactory->createTagListCache(20, session);
66
67 QObject::connect(collectionCache, SIGNAL(dataAvailable()), q_ptr, SLOT(dataAvailable()));
68 QObject::connect(itemCache, SIGNAL(dataAvailable()), q_ptr, SLOT(dataAvailable()));
69 QObject::connect(tagCache, SIGNAL(dataAvailable()), q_ptr, SLOT(dataAvailable()));
70 QObject::connect(ServerManager::self(), SIGNAL(stateChanged(Akonadi::ServerManager::State)),
71 q_ptr, SLOT(serverStateChanged(Akonadi::ServerManager::State)));
72
73 NotificationMessageV2::registerDBusTypes();
74 NotificationMessageV3::registerDBusTypes();
75
76 statisticsCompressionTimer.setSingleShot(true);
77 statisticsCompressionTimer.setInterval(500);
78 QObject::connect(&statisticsCompressionTimer, SIGNAL(timeout()), q_ptr, SLOT(slotFlushRecentlyChangedCollections()));
79}
80
81bool MonitorPrivate::connectToNotificationManager()
82{
83 delete notificationSource;
84 notificationSource = 0;
85
86 notificationSource = dependenciesFactory->createNotificationSource(q_ptr);
87
88 if (!notificationSource) {
89 return false;
90 }
91
92 QObject::connect(notificationSource, SIGNAL(notifyV3(Akonadi::NotificationMessageV3::List)),
93 q_ptr, SLOT(slotNotify(Akonadi::NotificationMessageV3::List)));
94
95 return true;
96}
97
98void MonitorPrivate::serverStateChanged(ServerManager::State state)
99{
100 if (state == ServerManager::Running) {
101 connectToNotificationManager();
102 notificationSource->setAllMonitored(monitorAll);
103 Q_FOREACH (const Collection &col, collections) {
104 notificationSource->setMonitoredCollection(col.id(), true);
105 }
106 Q_FOREACH (const Entity::Id id, items) {
107 notificationSource->setMonitoredItem(id, true);
108 }
109 Q_FOREACH (const QByteArray &resource, resources) {
110 notificationSource->setMonitoredResource(resource, true);
111 }
112 Q_FOREACH (const QByteArray &session, sessions) {
113 notificationSource->setIgnoredSession(session, true);
114 }
115 Q_FOREACH (const QString &mimeType, mimetypes) {
116 notificationSource->setMonitoredMimeType(mimeType, true);
117 }
118 Q_FOREACH (Tag::Id tagId, tags) {
119 notificationSource->setMonitoredTag(tagId, true);
120 }
121 Q_FOREACH (Monitor::Type type, types) {
122 notificationSource->setMonitoredType(static_cast<NotificationMessageV2::Type>(type), true);
123 }
124 }
125}
126
127void MonitorPrivate::invalidateCollectionCache(qint64 id)
128{
129 collectionCache->update(id, mCollectionFetchScope);
130}
131
132void MonitorPrivate::invalidateItemCache(qint64 id)
133{
134 itemCache->update(QList<Entity::Id>() << id, mItemFetchScope);
135}
136
137void MonitorPrivate::invalidateTagCache(qint64 id)
138{
139 tagCache->update(QList<Tag::Id>() << id, mTagFetchScope);
140}
141
142int MonitorPrivate::pipelineSize() const
143{
144 return PipelineSize;
145}
146
147bool MonitorPrivate::isLazilyIgnored(const NotificationMessageV3 &msg, bool allowModifyFlagsConversion) const
148{
149 NotificationMessageV2::Operation op = msg.operation();
150
151 if (msg.type() == NotificationMessageV2::Tags
152 && ((op == NotificationMessageV2::Add && q_ptr->receivers(SIGNAL(tagAdded(Akonadi::Tag))) == 0)
153 || (op == NotificationMessageV2::Modify && q_ptr->receivers(SIGNAL(tagChanged(Akonadi::Tag))) == 0)
154 || (op == NotificationMessageV2::Remove && q_ptr->receivers(SIGNAL(tagRemoved(Akonadi::Tag))) == 0))) {
155 return true;
156 }
157
158 if (!fetchCollectionStatistics
159 && (msg.type() == NotificationMessageV2::Items)
160 && ((op == NotificationMessageV2::Add && q_ptr->receivers(SIGNAL(itemAdded(Akonadi::Item,Akonadi::Collection))) == 0)
161 || (op == NotificationMessageV2::Remove && q_ptr->receivers(SIGNAL(itemRemoved(Akonadi::Item))) == 0
162 && q_ptr->receivers(SIGNAL(itemsRemoved(Akonadi::Item::List))) == 0)
163 || (op == NotificationMessageV2::Modify && q_ptr->receivers(SIGNAL(itemChanged(Akonadi::Item,QSet<QByteArray>))) == 0)
164 || (op == NotificationMessageV2::ModifyFlags
165 && (q_ptr->receivers(SIGNAL(itemsFlagsChanged(Akonadi::Item::List,QSet<QByteArray>,QSet<QByteArray>))) == 0
166 // Newly delivered ModifyFlags notifications will be converted to
167 // itemChanged(item, "FLAGS") for legacy clients.
168 && (!allowModifyFlagsConversion || q_ptr->receivers(SIGNAL(itemChanged(Akonadi::Item,QSet<QByteArray>))) == 0)))
169 || (op == NotificationMessageV2::ModifyTags && q_ptr->receivers(SIGNAL(itemsTagsChanged(Akonadi::Item::List,QSet<Akonadi::Tag>,QSet<Akonadi::Tag>))) == 0)
170 || (op == NotificationMessageV2::Move && q_ptr->receivers(SIGNAL(itemMoved(Akonadi::Item,Akonadi::Collection,Akonadi::Collection))) == 0
171 && q_ptr->receivers(SIGNAL(itemsMoved(Akonadi::Item::List,Akonadi::Collection,Akonadi::Collection))) == 0)
172 || (op == NotificationMessageV2::Link && q_ptr->receivers(SIGNAL(itemLinked(Akonadi::Item,Akonadi::Collection))) == 0
173 && q_ptr->receivers(SIGNAL(itemsLinked(Akonadi::Item::List,Akonadi::Collection))) == 0)
174 || (op == NotificationMessageV2::Unlink && q_ptr->receivers(SIGNAL(itemUnlinked(Akonadi::Item,Akonadi::Collection))) == 0
175 && q_ptr->receivers(SIGNAL(itemsUnlinked(Akonadi::Item::List,Akonadi::Collection))) == 0))) {
176 return true;
177 }
178
179 if (!useRefCounting) {
180 return false;
181 }
182
183 if (msg.type() == NotificationMessageV2::Collections) {
184 // Lazy fetching can only affects items.
185 return false;
186 }
187
188 Collection::Id parentCollectionId = msg.parentCollection();
189
190 if ((op == NotificationMessageV2::Add)
191 || (op == NotificationMessageV2::Remove)
192 || (op == NotificationMessageV2::Modify)
193 || (op == NotificationMessageV2::ModifyFlags)
194 || (op == NotificationMessageV2::ModifyTags)
195 || (op == NotificationMessageV2::Link)
196 || (op == NotificationMessageV2::Unlink)) {
197 if (isMonitored(parentCollectionId)) {
198 return false;
199 }
200 }
201
202 if (op == NotificationMessageV2::Move) {
203 if (!isMonitored(parentCollectionId) && !isMonitored(msg.parentDestCollection())) {
204 return true;
205 }
206 // We can't ignore the move. It must be transformed later into a removal or insertion.
207 return false;
208 }
209 return true;
210}
211
212void MonitorPrivate::checkBatchSupport(const NotificationMessageV3 &msg, bool &needsSplit, bool &batchSupported) const
213{
214 const bool isBatch = (msg.entities().count() > 1);
215
216 if (msg.type() == NotificationMessageV2::Items) {
217 switch (msg.operation()) {
218 case NotificationMessageV2::Add:
219 needsSplit = isBatch;
220 batchSupported = false;
221 return;
222 case NotificationMessageV2::Modify:
223 needsSplit = isBatch;
224 batchSupported = false;
225 return;
226 case NotificationMessageV2::ModifyFlags:
227 batchSupported = q_ptr->receivers(SIGNAL(itemsFlagsChanged(Akonadi::Item::List,QSet<QByteArray>,QSet<QByteArray>))) > 0;
228 needsSplit = isBatch && !batchSupported && q_ptr->receivers(SIGNAL(itemChanged(Akonadi::Item,QSet<QByteArray>))) > 0;
229 return;
230 case NotificationMessageV2::ModifyTags:
231 // Tags were added after batch notifications, so they are always supported
232 batchSupported = true;
233 needsSplit = false;
234 return;
235 case NotificationMessageV2::Move:
236 needsSplit = isBatch && q_ptr->receivers(SIGNAL(itemMoved(Akonadi::Item,Akonadi::Collection,Akonadi::Collection))) > 0;
237 batchSupported = q_ptr->receivers(SIGNAL(itemsMoved(Akonadi::Item::List,Akonadi::Collection,Akonadi::Collection))) > 0;
238 return;
239 case NotificationMessageV2::Remove:
240 needsSplit = isBatch && q_ptr->receivers(SIGNAL(itemRemoved(Akonadi::Item))) > 0;
241 batchSupported = q_ptr->receivers(SIGNAL(itemsRemoved(Akonadi::Item::List))) > 0;
242 return;
243 case NotificationMessageV2::Link:
244 needsSplit = isBatch && q_ptr->receivers(SIGNAL(itemLinked(Akonadi::Item,Akonadi::Collection))) > 0;
245 batchSupported = q_ptr->receivers(SIGNAL(itemsLinked(Akonadi::Item::List,Akonadi::Collection))) > 0;
246 return;
247 case NotificationMessageV2::Unlink:
248 needsSplit = isBatch && q_ptr->receivers(SIGNAL(itemUnlinked(Akonadi::Item,Akonadi::Collection))) > 0;
249 batchSupported = q_ptr->receivers(SIGNAL(itemsUnlinked(Akonadi::Item::List,Akonadi::Collection))) > 0;
250 return;
251 default:
252 needsSplit = isBatch;
253 batchSupported = false;
254 kDebug() << "Unknown operation type" << msg.operation() << "in item change notification";
255 return;
256 }
257 } else if (msg.type() == NotificationMessageV2::Collections) {
258 needsSplit = isBatch;
259 batchSupported = false;
260 } else if (msg.type() == NotificationMessageV2::Tags) {
261 needsSplit = isBatch;
262 batchSupported = false;
263 }
264}
265
266NotificationMessageV3::List MonitorPrivate::splitMessage(const NotificationMessageV3 &msg, bool legacy) const
267{
268 NotificationMessageV3::List list;
269
270 NotificationMessageV3 baseMsg;
271 baseMsg.setSessionId(msg.sessionId());
272 baseMsg.setType(msg.type());
273 if (legacy && msg.operation() == NotificationMessageV2::ModifyFlags) {
274 baseMsg.setOperation(NotificationMessageV2::Modify);
275 baseMsg.setItemParts(QSet<QByteArray>() << "FLAGS");
276 } else {
277 baseMsg.setOperation(msg.operation());
278 baseMsg.setItemParts(msg.itemParts());
279 }
280 baseMsg.setParentCollection(msg.parentCollection());
281 baseMsg.setParentDestCollection(msg.parentDestCollection());
282 baseMsg.setResource(msg.resource());
283 baseMsg.setDestinationResource(msg.destinationResource());
284 baseMsg.setAddedFlags(msg.addedFlags());
285 baseMsg.setRemovedFlags(msg.removedFlags());
286 baseMsg.setAddedTags(msg.addedTags());
287 baseMsg.setRemovedTags(msg.removedTags());
288
289 Q_FOREACH (const NotificationMessageV2::Entity &entity, msg.entities()) {
290 NotificationMessageV3 copy = baseMsg;
291 copy.addEntity(entity.id, entity.remoteId, entity.remoteRevision, entity.mimeType);
292
293 list << copy;
294 }
295
296 return list;
297}
298
299bool MonitorPrivate::acceptNotification(const Akonadi::NotificationMessageV3 &msg) const
300{
301 // session is ignored
302 if (sessions.contains(msg.sessionId())) {
303 return false;
304 }
305
306 if (msg.entities().count() == 0) {
307 return false;
308 }
309
310 // user requested everything
311 if (monitorAll && msg.type() != NotificationMessageV2::InvalidType) {
312 return true;
313 }
314
315 // Types are monitored, but not this one
316 if (!types.isEmpty() && !types.contains(static_cast<Monitor::Type>(msg.type()))) {
317 return false;
318 }
319
320 switch (msg.type()) {
321 case NotificationMessageV2::InvalidType:
322 kWarning() << "Received invalid change notification!";
323 return false;
324
325 case NotificationMessageV2::Items:
326 // we have a resource or mimetype filter
327 if (!resources.isEmpty() || !mimetypes.isEmpty()) {
328 if (resources.contains(msg.resource()) || isMoveDestinationResourceMonitored(msg)) {
329 return true;
330 }
331
332 Q_FOREACH (const NotificationMessageV2::Entity &entity, msg.entities()) {
333 if (isMimeTypeMonitored(entity.mimeType)) {
334 return true;
335 }
336 }
337 return false;
338 }
339
340 // we explicitly monitor that item or the collections it's in
341 Q_FOREACH (const NotificationMessageV2::Entity &entity, msg.entities()) {
342 if (items.contains(entity.id)) {
343 return true;
344 }
345 }
346
347 return isCollectionMonitored(msg.parentCollection())
348 || isCollectionMonitored(msg.parentDestCollection());
349
350 case NotificationMessageV2::Collections:
351 // we have a resource filter
352 if (!resources.isEmpty()) {
353 const bool resourceMatches = resources.contains(msg.resource()) || isMoveDestinationResourceMonitored(msg);
354 // a bit hacky, but match the behaviour from the item case,
355 // if resource is the only thing we are filtering on, stop here, and if the resource filter matched, of course
356 if (mimetypes.isEmpty() || resourceMatches) {
357 return resourceMatches;
358 }
359 // else continue
360 }
361
362 // we explicitly monitor that colleciton, or all of them
363 Q_FOREACH (const NotificationMessageV2::Entity &entity, msg.entities()) {
364 if (isCollectionMonitored(entity.id)) {
365 return true;
366 }
367 }
368 return isCollectionMonitored(msg.parentCollection())
369 || isCollectionMonitored(msg.parentDestCollection());
370
371 case NotificationMessageV2::Tags:
372 if (!tags.isEmpty()) {
373 Q_FOREACH (const NotificationMessageV2::Entity &entity, msg.entities()) {
374 if (tags.contains(entity.id)) {
375 return true;
376 }
377 }
378 return false;
379 }
380 return true;
381 }
382 Q_ASSERT(false);
383 return false;
384}
385
386void MonitorPrivate::cleanOldNotifications()
387{
388 bool erased = false;
389 for (QQueue<NotificationMessageV3>::iterator it = pipeline.begin(); it != pipeline.end();) {
390 if (!acceptNotification(*it) || isLazilyIgnored(*it)) {
391 it = pipeline.erase(it);
392 erased = true;
393 } else {
394 ++it;
395 }
396 }
397
398 for (QQueue<NotificationMessageV3>::iterator it = pendingNotifications.begin(); it != pendingNotifications.end();) {
399 if (!acceptNotification(*it) || isLazilyIgnored(*it)) {
400 it = pendingNotifications.erase(it);
401 erased = true;
402 } else {
403 ++it;
404 }
405 }
406 if (erased) {
407 notificationsErased();
408 }
409}
410
411bool MonitorPrivate::ensureDataAvailable(const NotificationMessageV3 &msg)
412{
413 if (msg.type() == NotificationMessageV2::Tags) {
414 Q_FOREACH (const NotificationMessageV2::Entity &entity, msg.entities()) {
415 if (!tagCache->ensureCached(QList<Tag::Id>() << entity.id, mTagFetchScope)) {
416 return false;
417 }
418 }
419 return true;
420 }
421
422 bool allCached = true;
423 if (fetchCollection) {
424 if (!collectionCache->ensureCached(msg.parentCollection(), mCollectionFetchScope)) {
425 allCached = false;
426 }
427 if (msg.operation() == NotificationMessageV2::Move && !collectionCache->ensureCached(msg.parentDestCollection(), mCollectionFetchScope)) {
428 allCached = false;
429 }
430 }
431 if (msg.operation() == NotificationMessageV2::Remove) {
432 return allCached; // the actual object is gone already, nothing to fetch there
433 }
434
435 if (msg.type() == NotificationMessageV2::Items && !mItemFetchScope.isEmpty()) {
436 ItemFetchScope scope(mItemFetchScope);
437 if (mFetchChangedOnly && (msg.operation() == NotificationMessageV2::Modify || msg.operation() == NotificationMessageV2::ModifyFlags)) {
438 bool fullPayloadWasRequested = scope.fullPayload();
439 scope.fetchFullPayload(false);
440 QSet<QByteArray> requestedPayloadParts = scope.payloadParts();
441 Q_FOREACH (const QByteArray &part, requestedPayloadParts) {
442 scope.fetchPayloadPart(part, false);
443 }
444
445 bool allAttributesWereRequested = scope.allAttributes();
446 QSet<QByteArray> requestedAttrParts = scope.attributes();
447 Q_FOREACH (const QByteArray &part, requestedAttrParts) {
448 scope.fetchAttribute(part, false);
449 }
450
451 QSet<QByteArray> changedParts = msg.itemParts();
452 Q_FOREACH (const QByteArray &part, changedParts) {
453 if (part.startsWith("PLD:") && //krazy:exclude=strings since QByteArray
454 (fullPayloadWasRequested || requestedPayloadParts.contains(part))) {
455 scope.fetchPayloadPart(part.mid(4), true);;
456 }
457 if (part.startsWith("ATR:") && //krazy:exclude=strings since QByteArray
458 (allAttributesWereRequested || requestedAttrParts.contains(part))) {
459 scope.fetchAttribute(part.mid(4), true);
460 }
461 }
462 }
463 if (!itemCache->ensureCached(msg.uids(), scope)) {
464 allCached = false;
465
466 }
467
468 // Make sure all tags for ModifyTags operation are in cache too
469 if (msg.operation() == NotificationMessageV2::ModifyTags) {
470 if (!tagCache->ensureCached((msg.addedTags() + msg.removedTags()).toList(), mTagFetchScope)) {
471 allCached = false;
472 }
473 }
474
475 } else if (msg.type() == NotificationMessageV2::Collections && fetchCollection) {
476 Q_FOREACH (const NotificationMessageV2::Entity &entity, msg.entities()) {
477 if (!collectionCache->ensureCached(entity.id, mCollectionFetchScope)) {
478 allCached = false;
479 break;
480 }
481 }
482 }
483 return allCached;
484}
485
486bool MonitorPrivate::emitNotification(const NotificationMessageV3 &msg)
487{
488 bool someoneWasListening = false;
489 if (msg.type() == NotificationMessageV2::Tags) {
490 //In case of a Remove notification this will return a list of invalid entities (we'll deal later with them)
491 const Tag::List tags = tagCache->retrieve(msg.uids());
492 someoneWasListening = emitTagsNotification(msg, tags);
493 } else {
494 const Collection parent = collectionCache->retrieve(msg.parentCollection());
495 Collection destParent;
496 if (msg.operation() == NotificationMessageV2::Move) {
497 destParent = collectionCache->retrieve(msg.parentDestCollection());
498 }
499
500 if (msg.type() == NotificationMessageV2::Collections) {
501 Collection col;
502 Q_FOREACH (const NotificationMessageV2::Entity &entity, msg.entities()) {
503 col = collectionCache->retrieve(entity.id);
504 if (emitCollectionNotification(msg, col, parent, destParent) && !someoneWasListening) {
505 someoneWasListening = true;
506 }
507 }
508 } else if (msg.type() == NotificationMessageV2::Items) {
509 //In case of a Remove notification this will return a list of invalid entities (we'll deal later with them)
510 const Item::List items = itemCache->retrieve(msg.uids());
511 someoneWasListening = emitItemsNotification(msg, items, parent, destParent);
512 }
513 }
514
515 if (!someoneWasListening) {
516 cleanOldNotifications(); // probably someone disconnected a signal in the meantime, get rid of the no longer interesting stuff
517 }
518
519 return someoneWasListening;
520}
521
522void MonitorPrivate::updatePendingStatistics(const NotificationMessageV3 &msg)
523{
524 if (msg.type() == NotificationMessageV2::Items) {
525 notifyCollectionStatisticsWatchers(msg.parentCollection(), msg.resource());
526 // FIXME use the proper resource of the target collection, for cross resource moves
527 notifyCollectionStatisticsWatchers(msg.parentDestCollection(), msg.destinationResource());
528 } else if (msg.type() == NotificationMessageV2::Collections && msg.operation() == NotificationMessageV2::Remove) {
529 // no need for statistics updates anymore
530 Q_FOREACH (const NotificationMessageV2::Entity &entity, msg.entities()) {
531 recentlyChangedCollections.remove(entity.id);
532 }
533 }
534}
535
536void MonitorPrivate::slotSessionDestroyed(QObject *object)
537{
538 Session *objectSession = qobject_cast<Session *>(object);
539 if (objectSession) {
540 sessions.removeAll(objectSession->sessionId());
541 if (notificationSource) {
542 notificationSource->setIgnoredSession(objectSession->sessionId(), false);
543 }
544 }
545}
546
547void MonitorPrivate::slotStatisticsChangedFinished(KJob *job)
548{
549 if (job->error()) {
550 kWarning() << "Error on fetching collection statistics: " << job->errorText();
551 } else {
552 CollectionStatisticsJob *statisticsJob = static_cast<CollectionStatisticsJob *>(job);
553 Q_ASSERT(statisticsJob->collection().isValid());
554 emit q_ptr->collectionStatisticsChanged(statisticsJob->collection().id(),
555 statisticsJob->statistics());
556 }
557}
558
559void MonitorPrivate::slotFlushRecentlyChangedCollections()
560{
561 foreach (Collection::Id collection, recentlyChangedCollections) {
562 Q_ASSERT(collection >= 0);
563 if (fetchCollectionStatistics) {
564 fetchStatistics(collection);
565 } else {
566 static const CollectionStatistics dummyStatistics;
567 emit q_ptr->collectionStatisticsChanged(collection, dummyStatistics);
568 }
569 }
570 recentlyChangedCollections.clear();
571}
572
573int MonitorPrivate::translateAndCompress(QQueue< NotificationMessageV3 > &notificationQueue, const NotificationMessageV3 &msg)
574{
575 // We have to split moves into insert or remove if the source or destination
576 // is not monitored.
577 if (msg.operation() != NotificationMessageV2::Move) {
578 return NotificationMessageV3::appendAndCompress(notificationQueue, msg) ? 1 : 0;
579 }
580
581 // Always handle tags
582 if (msg.type() == NotificationMessageV2::Tags) {
583 return NotificationMessageV3::appendAndCompress(notificationQueue, msg);
584 }
585
586 bool sourceWatched = false;
587 bool destWatched = false;
588
589 if (useRefCounting && msg.type() == NotificationMessageV2::Items) {
590 sourceWatched = isMonitored(msg.parentCollection());
591 destWatched = isMonitored(msg.parentDestCollection());
592 } else {
593 if (!resources.isEmpty()) {
594 sourceWatched = resources.contains(msg.resource());
595 destWatched = isMoveDestinationResourceMonitored(msg);
596 }
597 if (!sourceWatched) {
598 sourceWatched = isCollectionMonitored(msg.parentCollection());
599 }
600 if (!destWatched) {
601 destWatched = isCollectionMonitored(msg.parentDestCollection());
602 }
603 }
604
605 if (!sourceWatched && !destWatched) {
606 return 0;
607 }
608
609 if ((sourceWatched && destWatched) || (!collectionMoveTranslationEnabled && msg.type() == NotificationMessageV2::Collections)) {
610 return NotificationMessageV3::appendAndCompress(notificationQueue, msg) ? 1 : 0;
611 }
612
613 if (sourceWatched) {
614 // Transform into a removal
615 NotificationMessageV3 removalMessage = msg;
616 removalMessage.setOperation(NotificationMessageV2::Remove);
617 removalMessage.setParentDestCollection(-1);
618 return NotificationMessageV3::appendAndCompress(notificationQueue, removalMessage) ? 1 : 0;
619 }
620
621 // Transform into an insertion
622 NotificationMessageV3 insertionMessage = msg;
623 insertionMessage.setOperation(NotificationMessageV2::Add);
624 insertionMessage.setParentCollection(msg.parentDestCollection());
625 insertionMessage.setParentDestCollection(-1);
626 // We don't support batch insertion, so we have to do it one by one
627 const NotificationMessageV3::List split = splitMessage(insertionMessage, false);
628 int appended = 0;
629 Q_FOREACH (const NotificationMessageV3 &insertion, split) {
630 if (NotificationMessageV3::appendAndCompress(notificationQueue, insertion)) {
631 ++appended;
632 }
633 }
634 return appended;
635}
636
637/*
638
639 server notification --> ?accepted --> pendingNotifications --> ?dataAvailable --> emit
640 | |
641 x --> discard x --> pipeline
642
643 fetchJobDone --> pipeline ?dataAvailable --> emit
644 */
645
646void MonitorPrivate::slotNotify(const NotificationMessageV3::List &msgs)
647{
648 int appendedMessages = 0;
649 int modifiedMessages = 0;
650 int erasedMessages = 0;
651 Q_FOREACH (const NotificationMessageV3 &msg, msgs) {
652 invalidateCaches(msg);
653 updatePendingStatistics(msg);
654 bool needsSplit = true;
655 bool supportsBatch = false;
656
657 if (isLazilyIgnored(msg, true)) {
658 continue;
659 }
660
661 checkBatchSupport(msg, needsSplit, supportsBatch);
662
663 if (supportsBatch
664 || (!needsSplit && !supportsBatch && msg.operation() != NotificationMessageV2::ModifyFlags)
665 || msg.type() == NotificationMessageV2::Collections) {
666 // Make sure the batch msg is always queued before the split notifications
667 const int oldSize = pendingNotifications.size();
668 const int appended = translateAndCompress(pendingNotifications, msg);
669 if (appended > 0) {
670 appendedMessages += appended;
671 } else {
672 ++modifiedMessages;
673 }
674 // translateAndCompress can remove an existing "modify" when msg is a "delete".
675 // Or it can merge two ModifyFlags and return false.
676 // We need to detect such removals, for ChangeRecorder.
677 if (pendingNotifications.count() != oldSize + appended) {
678 ++erasedMessages; // this count isn't exact, but it doesn't matter
679 }
680 } else if (needsSplit) {
681 // If it's not queued at least make sure we fetch all the items from split
682 // notifications in one go.
683 itemCache->ensureCached(msg.uids(), mItemFetchScope);
684 }
685
686 // if the message contains more items, but we need to emit single-item notification,
687 // split the message into one message per item and queue them
688 // if the message contains only one item, but batches are not supported
689 // (and thus neither is flagsModified), splitMessage() will convert the
690 // notification to regular Modify with "FLAGS" part changed
691 if (needsSplit || (!needsSplit && !supportsBatch && msg.operation() == Akonadi::NotificationMessageV2::ModifyFlags)) {
692 const NotificationMessageV3::List split = splitMessage(msg, !supportsBatch);
693 pendingNotifications << split.toList();
694 appendedMessages += split.count();
695 }
696 }
697
698 // tell ChangeRecorder (even if 0 appended, the compression could have made changes to existing messages)
699 if (appendedMessages > 0 || modifiedMessages > 0 || erasedMessages > 0) {
700 if (erasedMessages > 0) {
701 notificationsErased();
702 } else {
703 notificationsEnqueued(appendedMessages);
704 }
705 }
706
707 dispatchNotifications();
708}
709
710void MonitorPrivate::flushPipeline()
711{
712 while (!pipeline.isEmpty()) {
713 const NotificationMessageV3 msg = pipeline.head();
714 if (ensureDataAvailable(msg)) {
715 // dequeue should be before emit, otherwise stuff might happen (like dataAvailable
716 // being called again) and we end up dequeuing an empty pipeline
717 pipeline.dequeue();
718 emitNotification(msg);
719 } else {
720 break;
721 }
722 }
723}
724
725void MonitorPrivate::dataAvailable()
726{
727 flushPipeline();
728 dispatchNotifications();
729}
730
731void MonitorPrivate::dispatchNotifications()
732{
733 // Note that this code is not used in a ChangeRecorder (pipelineSize==0)
734 while (pipeline.size() < pipelineSize() && !pendingNotifications.isEmpty()) {
735 const NotificationMessageV3 msg = pendingNotifications.dequeue();
736 if (ensureDataAvailable(msg) && pipeline.isEmpty()) {
737 emitNotification(msg);
738 } else {
739 pipeline.enqueue(msg);
740 }
741 }
742}
743
744bool MonitorPrivate::emitItemsNotification(const NotificationMessageV3 &msg, const Item::List &items, const Collection &collection, const Collection &collectionDest)
745{
746 Q_ASSERT(msg.type() == NotificationMessageV2::Items);
747 Collection col = collection;
748 Collection colDest = collectionDest;
749 if (!col.isValid()) {
750 col = Collection(msg.parentCollection());
751 col.setResource(QString::fromUtf8(msg.resource()));
752 }
753 if (!colDest.isValid()) {
754 colDest = Collection(msg.parentDestCollection());
755 // HACK: destination resource is delivered in the parts field...
756 if (!msg.itemParts().isEmpty()) {
757 colDest.setResource(QString::fromLatin1(*(msg.itemParts().begin())));
758 }
759 }
760
761 Tag::List addedTags, removedTags;
762 if (msg.operation() == NotificationMessageV2::ModifyTags) {
763 addedTags = tagCache->retrieve(msg.addedTags().toList());
764 removedTags = tagCache->retrieve(msg.removedTags().toList());
765 }
766
767 QMap<NotificationMessageV2::Id, NotificationMessageV2::Entity> msgEntities = msg.entities();
768 Item::List its = items;
769 QMutableListIterator<Item> iter(its);
770 while (iter.hasNext()) {
771 Item it = iter.next();
772 if (it.isValid()) {
773 const NotificationMessageV2::Entity msgEntity = msgEntities[it.id()];
774 if (msg.operation() == NotificationMessageV2::Remove) {
775 it.setRemoteId(msgEntity.remoteId);
776 it.setRemoteRevision(msgEntity.remoteRevision);
777 it.setMimeType(msgEntity.mimeType);
778 }
779
780 if (!it.parentCollection().isValid()) {
781 if (msg.operation() == NotificationMessageV2::Move) {
782 it.setParentCollection(colDest);
783 } else {
784 it.setParentCollection(col);
785 }
786 } else {
787 // item has a valid parent collection, most likely due to retrieved ancestors
788 // still, collection might contain extra info, so inject that
789 if (it.parentCollection() == col) {
790 const Collection oldParent = it.parentCollection();
791 if (oldParent.parentCollection().isValid() && !col.parentCollection().isValid()) {
792 col.setParentCollection(oldParent.parentCollection()); // preserve ancestor chain
793 }
794 it.setParentCollection(col);
795 } else {
796 // If one client does a modify followed by a move we have to make sure that the
797 // AgentBase::itemChanged() in another client always sees the parent collection
798 // of the item before it has been moved.
799 if (msg.operation() != NotificationMessageV2::Move) {
800 it.setParentCollection(col);
801 }
802 }
803 }
804 iter.setValue(it);
805 msgEntities.remove(it.id());
806 } else {
807 // remove the invalid item
808 iter.remove();
809 }
810 }
811
812 // Now reconstruct any items there were left in msgItems
813 Q_FOREACH (const NotificationMessageV2::Entity &msgItem, msgEntities) {
814 Item it(msgItem.id);
815 it.setRemoteId(msgItem.remoteId);
816 it.setRemoteRevision(msgItem.remoteRevision);
817 it.setMimeType(msgItem.mimeType);
818 if (msg.operation() == NotificationMessageV2::Move) {
819 it.setParentCollection(colDest);
820 } else {
821 it.setParentCollection(col);
822 }
823 its << it;
824 }
825
826 bool handled = false;
827 switch (msg.operation()) {
828 case NotificationMessageV2::Add:
829 if (q_ptr->receivers(SIGNAL(itemAdded(Akonadi::Item,Akonadi::Collection))) > 0) {
830 Q_ASSERT(its.count() == 1);
831 emit q_ptr->itemAdded(its.first(), col);
832 return true;
833 }
834 return false;
835 case NotificationMessageV2::Modify:
836 if (q_ptr->receivers(SIGNAL(itemChanged(Akonadi::Item,QSet<QByteArray>))) > 0) {
837 Q_ASSERT(its.count() == 1);
838 emit q_ptr->itemChanged(its.first(), msg.itemParts());
839 return true;
840 }
841 return false;
842 case NotificationMessageV2::ModifyFlags:
843 if (q_ptr->receivers(SIGNAL(itemsFlagsChanged(Akonadi::Item::List,QSet<QByteArray>,QSet<QByteArray>))) > 0) {
844 emit q_ptr->itemsFlagsChanged(its, msg.addedFlags(), msg.removedFlags());
845 handled = true;
846 }
847 return handled;
848 case NotificationMessageV2::Move:
849 if (q_ptr->receivers(SIGNAL(itemMoved(Akonadi::Item,Akonadi::Collection,Akonadi::Collection))) > 0) {
850 Q_ASSERT(its.count() == 1);
851 emit q_ptr->itemMoved(its.first(), col, colDest);
852 handled = true;
853 }
854 if (q_ptr->receivers(SIGNAL(itemsMoved(Akonadi::Item::List,Akonadi::Collection,Akonadi::Collection))) > 0) {
855 emit q_ptr->itemsMoved(its, col, colDest);
856 handled = true;
857 }
858 return handled;
859 case NotificationMessageV2::Remove:
860 if (q_ptr->receivers(SIGNAL(itemRemoved(Akonadi::Item))) > 0) {
861 Q_ASSERT(its.count() == 1);
862 emit q_ptr->itemRemoved(its.first());
863 handled = true;
864 }
865 if (q_ptr->receivers(SIGNAL(itemsRemoved(Akonadi::Item::List))) > 0) {
866 emit q_ptr->itemsRemoved(its);
867 handled = true;
868 }
869 return handled;
870 case NotificationMessageV2::Link:
871 if (q_ptr->receivers(SIGNAL(itemLinked(Akonadi::Item,Akonadi::Collection))) > 0) {
872 Q_ASSERT(its.count() == 1);
873 emit q_ptr->itemLinked(its.first(), col);
874 handled = true;
875 }
876 if (q_ptr->receivers(SIGNAL(itemsLinked(Akonadi::Item::List,Akonadi::Collection))) > 0) {
877 emit q_ptr->itemsLinked(its, col);
878 handled = true;
879 }
880 return handled;
881 case NotificationMessageV2::Unlink:
882 if (q_ptr->receivers(SIGNAL(itemUnlinked(Akonadi::Item,Akonadi::Collection))) > 0) {
883 Q_ASSERT(its.count() == 1);
884 emit q_ptr->itemUnlinked(its.first(), col);
885 handled = true;
886 }
887 if (q_ptr->receivers(SIGNAL(itemsUnlinked(Akonadi::Item::List,Akonadi::Collection))) > 0) {
888 emit q_ptr->itemsUnlinked(its, col);
889 handled = true;
890 }
891 return handled;
892 case NotificationMessageV2::ModifyTags:
893 if (q_ptr->receivers(SIGNAL(itemsTagsChanged(Akonadi::Item::List,QSet<Akonadi::Tag>,QSet<Akonadi::Tag>))) > 0) {
894 emit q_ptr->itemsTagsChanged(its, addedTags.toSet(), removedTags.toSet());
895 return true;
896 }
897 return false;
898 default:
899 kDebug() << "Unknown operation type" << msg.operation() << "in item change notification";
900 }
901
902 return false;
903}
904
905bool MonitorPrivate::emitCollectionNotification(const NotificationMessageV3 &msg, const Collection &col, const Collection &par, const Collection &dest)
906{
907 Q_ASSERT(msg.type() == NotificationMessageV2::Collections);
908 Collection parent = par;
909 if (!parent.isValid()) {
910 parent = Collection(msg.parentCollection());
911 }
912 Collection destination = dest;
913 if (!destination.isValid()) {
914 destination = Collection(msg.parentDestCollection());
915 }
916
917 Collection collection = col;
918 NotificationMessageV2::Entity msgEntities = msg.entities().values().first();
919 if (!collection.isValid() || msg.operation() == NotificationMessageV2::Remove) {
920 collection = Collection(msgEntities.id);
921 collection.setResource(QString::fromUtf8(msg.resource()));
922 collection.setRemoteId(msgEntities.remoteId);
923 }
924
925 if (!collection.parentCollection().isValid()) {
926 if (msg.operation() == NotificationMessageV2::Move) {
927 collection.setParentCollection(destination);
928 } else {
929 collection.setParentCollection(parent);
930 }
931 }
932
933 switch (msg.operation()) {
934 case NotificationMessageV2::Add:
935 if (q_ptr->receivers(SIGNAL(collectionAdded(Akonadi::Collection,Akonadi::Collection))) == 0) {
936 return false;
937 }
938 emit q_ptr->collectionAdded(collection, parent);
939 return true;
940 case NotificationMessageV2::Modify:
941 if (q_ptr->receivers(SIGNAL(collectionChanged(Akonadi::Collection))) == 0
942 && q_ptr->receivers(SIGNAL(collectionChanged(Akonadi::Collection,QSet<QByteArray>))) == 0) {
943 return false;
944 }
945 emit q_ptr->collectionChanged(collection);
946 emit q_ptr->collectionChanged(collection, msg.itemParts());
947 return true;
948 case NotificationMessageV2::Move:
949 if (q_ptr->receivers(SIGNAL(collectionMoved(Akonadi::Collection,Akonadi::Collection,Akonadi::Collection))) == 0) {
950 return false;
951 }
952 emit q_ptr->collectionMoved(collection, parent, destination);
953 return true;
954 case NotificationMessageV2::Remove:
955 if (q_ptr->receivers(SIGNAL(collectionRemoved(Akonadi::Collection))) == 0) {
956 return false;
957 }
958 emit q_ptr->collectionRemoved(collection);
959 return true;
960 case NotificationMessageV2::Subscribe:
961 if (q_ptr->receivers(SIGNAL(collectionSubscribed(Akonadi::Collection,Akonadi::Collection))) == 0) {
962 return false;
963 }
964 if (!monitorAll) { // ### why??
965 emit q_ptr->collectionSubscribed(collection, parent);
966 }
967 return true;
968 case NotificationMessageV2::Unsubscribe:
969 if (q_ptr->receivers(SIGNAL(collectionUnsubscribed(Akonadi::Collection))) == 0) {
970 return false;
971 }
972 if (!monitorAll) { // ### why??
973 emit q_ptr->collectionUnsubscribed(collection);
974 }
975 return true;
976 default:
977 kDebug() << "Unknown operation type" << msg.operation() << "in collection change notification";
978 }
979
980 return false;
981}
982
983bool MonitorPrivate::emitTagsNotification(const NotificationMessageV3 &msg, const Tag::List &tags)
984{
985 Q_ASSERT(msg.type() == NotificationMessageV2::Tags);
986
987 Tag::List validTags;
988 if (msg.operation() == NotificationMessageV2::Remove) {
989 //In case of a removed signal the cache entry was already invalidated, and we therefore received an empty list of tags
990 Q_FOREACH (const Entity::Id &uid, msg.uids()) {
991 validTags << Tag(uid);
992 }
993 } else {
994 validTags = tags;
995 }
996
997 switch (msg.operation()) {
998 case NotificationMessageV2::Add:
999 if (q_ptr->receivers(SIGNAL(tagAdded(Akonadi::Tag))) == 0) {
1000 return false;
1001 }
1002 Q_FOREACH (const Tag &tag, validTags) {
1003 Q_EMIT q_ptr->tagAdded(tag);
1004 }
1005 return true;
1006 case NotificationMessageV2::Modify:
1007 if (q_ptr->receivers(SIGNAL(tagChanged(Akonadi::Tag))) == 0) {
1008 return false;
1009 }
1010 Q_FOREACH (const Tag &tag, validTags) {
1011 Q_EMIT q_ptr->tagChanged(tag);
1012 }
1013 return true;
1014 case NotificationMessageV2::Remove:
1015 if (q_ptr->receivers(SIGNAL(tagRemoved(Akonadi::Tag))) == 0) {
1016 return false;
1017 }
1018 Q_FOREACH (const Tag &tag, validTags) {
1019 Q_EMIT q_ptr->tagRemoved(tag);
1020 }
1021 return true;
1022 default:
1023 kDebug() << "Unknown operation type" << msg.operation() << "in tag change notification";
1024 }
1025
1026 return false;
1027}
1028
1029void MonitorPrivate::invalidateCaches(const NotificationMessageV3 &msg)
1030{
1031 // remove invalidates
1032 if (msg.operation() == NotificationMessageV2::Remove) {
1033 if (msg.type() == NotificationMessageV2::Collections) {
1034 Q_FOREACH (qint64 uid, msg.uids())
1035 collectionCache->invalidate(uid);
1036 } else if (msg.type() == NotificationMessageV2::Items) {
1037 itemCache->invalidate(msg.uids());
1038 } else if (msg.type() == NotificationMessageV2::Tags) {
1039 tagCache->invalidate(msg.uids());
1040 }
1041 }
1042
1043 // modify removes the cache entry, as we need to re-fetch
1044 // And subscription modify the visibility of the collection by the collectionFetchScope.
1045 if (msg.operation() == NotificationMessageV2::Modify
1046 || msg.operation() == NotificationMessageV2::ModifyFlags
1047 || msg.operation() == NotificationMessageV3::ModifyTags
1048 || msg.operation() == NotificationMessageV2::Move
1049 || msg.operation() == NotificationMessageV2::Subscribe) {
1050 if (msg.type() == NotificationMessageV2::Collections) {
1051 Q_FOREACH (quint64 uid, msg.uids())
1052 collectionCache->update(uid, mCollectionFetchScope);
1053 } else if (msg.type() == NotificationMessageV2::Items) {
1054 itemCache->update(msg.uids(), mItemFetchScope);
1055 } else if (msg.type() == NotificationMessageV2::Tags) {
1056 tagCache->update(msg.uids(), mTagFetchScope);
1057 }
1058 }
1059}
1060
1061void MonitorPrivate::invalidateCache(const Collection &col)
1062{
1063 collectionCache->update(col.id(), mCollectionFetchScope);
1064}
1065
1066void MonitorPrivate::ref(Collection::Id id)
1067{
1068 if (!refCountMap.contains(id)) {
1069 refCountMap.insert(id, 0);
1070 }
1071 ++refCountMap[id];
1072
1073 if (m_buffer.isBuffered(id)) {
1074 m_buffer.purge(id);
1075 }
1076}
1077
1078Akonadi::Collection::Id MonitorPrivate::deref(Collection::Id id)
1079{
1080 Q_ASSERT(refCountMap.contains(id));
1081 if (--refCountMap[id] == 0) {
1082 refCountMap.remove(id);
1083 return m_buffer.buffer(id);
1084 }
1085 return -1;
1086}
1087
1088void MonitorPrivate::PurgeBuffer::purge(Collection::Id id)
1089{
1090 m_buffer.removeOne(id);
1091}
1092
1093Akonadi::Collection::Id MonitorPrivate::PurgeBuffer::buffer(Collection::Id id)
1094{
1095 // Ensure that we don't put a duplicate @p id into the buffer.
1096 purge(id);
1097
1098 Collection::Id bumpedId = -1;
1099 if (m_buffer.size() == MAXBUFFERSIZE) {
1100 bumpedId = m_buffer.dequeue();
1101 purge(bumpedId);
1102 }
1103
1104 m_buffer.enqueue(id);
1105
1106 return bumpedId;
1107}
1108
1109int MonitorPrivate::PurgeBuffer::buffersize()
1110{
1111 return MAXBUFFERSIZE;
1112}
1113
1114bool MonitorPrivate::isMonitored(Entity::Id colId) const
1115{
1116 if (!useRefCounting) {
1117 return true;
1118 }
1119 return refCountMap.contains(colId) || m_buffer.isBuffered(colId);
1120}
1121
1122void MonitorPrivate::notifyCollectionStatisticsWatchers(Entity::Id collection, const QByteArray &resource) {
1123 if (collection > 0 && (monitorAll || isCollectionMonitored(collection) || resources.contains(resource))) {
1124 recentlyChangedCollections.insert(collection);
1125 if (!statisticsCompressionTimer.isActive()) {
1126 statisticsCompressionTimer.start();
1127 }
1128 }
1129}
1130
1131// @endcond
1132