1/*
2 Copyright (c) 2007 Volker Krause <vkrause@kde.org>
3
4 This library is free software; you can redistribute it and/or modify it
5 under the terms of the GNU Library General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or (at your
7 option) any later version.
8
9 This library is distributed in the hope that it will be useful, but WITHOUT
10 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
12 License for more details.
13
14 You should have received a copy of the GNU Library General Public License
15 along with this library; see the file COPYING.LIB. If not, write to the
16 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17 02110-1301, USA.
18*/
19
20#include "resourcescheduler_p.h"
21
22#include "dbusconnectionpool.h"
23#include "recursivemover_p.h"
24
25#include <kdebug.h>
26#include <klocalizedstring.h>
27
28#include <QtCore/QTimer>
29#include <QtDBus/QDBusInterface>
30#include <QtDBus/QDBusConnectionInterface>
31#include <boost/graph/graph_concepts.hpp>
32
33using namespace Akonadi;
34
35qint64 ResourceScheduler::Task::latestSerial = 0;
36static QDBusAbstractInterface *s_resourcetracker = 0;
37
38//@cond PRIVATE
39
40ResourceScheduler::ResourceScheduler( QObject *parent ) :
41 QObject( parent ),
42 mCurrentTasksQueue( -1 ),
43 mOnline( false )
44{
45}
46
47void ResourceScheduler::scheduleFullSync()
48{
49 Task t;
50 t.type = SyncAll;
51 TaskList& queue = queueForTaskType( t.type );
52 if ( queue.contains( t ) || mCurrentTask == t )
53 return;
54 queue << t;
55 signalTaskToTracker( t, "SyncAll" );
56 scheduleNext();
57}
58
59void ResourceScheduler::scheduleCollectionTreeSync()
60{
61 Task t;
62 t.type = SyncCollectionTree;
63 TaskList& queue = queueForTaskType( t.type );
64 if ( queue.contains( t ) || mCurrentTask == t )
65 return;
66 queue << t;
67 signalTaskToTracker( t, "SyncCollectionTree" );
68 scheduleNext();
69}
70
71void ResourceScheduler::scheduleSync(const Collection & col)
72{
73 Task t;
74 t.type = SyncCollection;
75 t.collection = col;
76 TaskList& queue = queueForTaskType( t.type );
77 if ( queue.contains( t ) || mCurrentTask == t )
78 return;
79 queue << t;
80 signalTaskToTracker( t, "SyncCollection", QString::number( col.id() ) );
81 scheduleNext();
82}
83
84void ResourceScheduler::scheduleAttributesSync( const Collection &collection )
85{
86 Task t;
87 t.type = SyncCollectionAttributes;
88 t.collection = collection;
89
90 TaskList& queue = queueForTaskType( t.type );
91 if ( queue.contains( t ) || mCurrentTask == t )
92 return;
93 queue << t;
94 signalTaskToTracker( t, "SyncCollectionAttributes", QString::number( collection.id() ) );
95 scheduleNext();
96}
97
98void ResourceScheduler::scheduleItemFetch(const Item & item, const QSet<QByteArray> &parts, const QDBusMessage & msg)
99{
100 Task t;
101 t.type = FetchItem;
102 t.item = item;
103 t.itemParts = parts;
104
105 // if the current task does already fetch the requested item, break here but
106 // keep the dbus message, so we can send the reply later on
107 if ( mCurrentTask == t ) {
108 mCurrentTask.dbusMsgs << msg;
109 return;
110 }
111
112 // If this task is already in the queue, merge with it.
113 TaskList& queue = queueForTaskType( t.type );
114 const int idx = queue.indexOf( t );
115 if ( idx != -1 ) {
116 queue[ idx ].dbusMsgs << msg;
117 return;
118 }
119
120 t.dbusMsgs << msg;
121 queue << t;
122 signalTaskToTracker( t, "FetchItem", QString::number( item.id() ) );
123 scheduleNext();
124}
125
126void ResourceScheduler::scheduleResourceCollectionDeletion()
127{
128 Task t;
129 t.type = DeleteResourceCollection;
130 TaskList& queue = queueForTaskType( t.type );
131 if ( queue.contains( t ) || mCurrentTask == t )
132 return;
133 queue << t;
134 signalTaskToTracker( t, "DeleteResourceCollection" );
135 scheduleNext();
136}
137
138void ResourceScheduler::scheduleCacheInvalidation( const Collection &collection )
139{
140 Task t;
141 t.type = InvalideCacheForCollection;
142 t.collection = collection;
143 TaskList& queue = queueForTaskType( t.type );
144 if ( queue.contains( t ) || mCurrentTask == t )
145 return;
146 queue << t;
147 signalTaskToTracker( t, "InvalideCacheForCollection", QString::number( collection.id() ) );
148 scheduleNext();
149}
150
151void ResourceScheduler::scheduleChangeReplay()
152{
153 Task t;
154 t.type = ChangeReplay;
155 TaskList& queue = queueForTaskType( t.type );
156 // see ResourceBase::changeProcessed() for why we do not check for mCurrentTask == t here like in the other tasks
157 if ( queue.contains( t ) )
158 return;
159 queue << t;
160 signalTaskToTracker( t, "ChangeReplay" );
161 scheduleNext();
162}
163
164void ResourceScheduler::scheduleMoveReplay( const Collection &movedCollection, RecursiveMover *mover )
165{
166 Task t;
167 t.type = RecursiveMoveReplay;
168 t.collection = movedCollection;
169 t.argument = QVariant::fromValue( mover );
170 TaskList &queue = queueForTaskType( t.type );
171
172 if ( queue.contains( t ) || mCurrentTask == t )
173 return;
174
175 queue << t;
176 signalTaskToTracker( t, "RecursiveMoveReplay", QString::number( t.collection.id() ) );
177 scheduleNext();
178}
179
180void Akonadi::ResourceScheduler::scheduleFullSyncCompletion()
181{
182 Task t;
183 t.type = SyncAllDone;
184 TaskList& queue = queueForTaskType( t.type );
185 // no compression here, all this does is emitting a D-Bus signal anyway, and compression can trigger races on the receiver side with the signal being lost
186 queue << t;
187 signalTaskToTracker( t, "SyncAllDone" );
188 scheduleNext();
189}
190
191void Akonadi::ResourceScheduler::scheduleCollectionTreeSyncCompletion()
192{
193 Task t;
194 t.type = SyncCollectionTreeDone;
195 TaskList& queue = queueForTaskType( t.type );
196 // no compression here, all this does is emitting a D-Bus signal anyway, and compression can trigger races on the receiver side with the signal being lost
197 queue << t;
198 signalTaskToTracker( t, "SyncCollectionTreeDone" );
199 scheduleNext();
200}
201
202void Akonadi::ResourceScheduler::scheduleCustomTask( QObject *receiver, const char* methodName, const QVariant &argument, ResourceBase::SchedulePriority priority )
203{
204 Task t;
205 t.type = Custom;
206 t.receiver = receiver;
207 t.methodName = methodName;
208 t.argument = argument;
209 QueueType queueType = GenericTaskQueue;
210 if ( priority == ResourceBase::AfterChangeReplay )
211 queueType = AfterChangeReplayQueue;
212 else if ( priority == ResourceBase::Prepend )
213 queueType = PrependTaskQueue;
214 TaskList& queue = mTaskList[ queueType ];
215
216 if ( queue.contains( t ) )
217 return;
218
219 switch (priority) {
220 case ResourceBase::Prepend:
221 queue.prepend( t );
222 break;
223 default:
224 queue.append(t);
225 break;
226 }
227
228 signalTaskToTracker( t, "Custom-" + t.methodName );
229 scheduleNext();
230}
231
232void ResourceScheduler::taskDone()
233{
234 if ( isEmpty() )
235 emit status( AgentBase::Idle, i18nc( "@info:status Application ready for work", "Ready" ) );
236
237 if ( s_resourcetracker ) {
238 QList<QVariant> argumentList;
239 argumentList << QString::number( mCurrentTask.serial )
240 << QString();
241 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList);
242 }
243
244 mCurrentTask = Task();
245 mCurrentTasksQueue = -1;
246 scheduleNext();
247}
248
249void ResourceScheduler::deferTask()
250{
251 if ( mCurrentTask.type == Invalid )
252 return;
253
254 if ( s_resourcetracker ) {
255 QList<QVariant> argumentList;
256 argumentList << QString::number( mCurrentTask.serial )
257 << QString();
258 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList);
259 }
260
261 Task t = mCurrentTask;
262 mCurrentTask = Task();
263
264 Q_ASSERT( mCurrentTasksQueue >= 0 && mCurrentTasksQueue < NQueueCount );
265 mTaskList[mCurrentTasksQueue].prepend( t );
266 mCurrentTasksQueue = -1;
267
268 signalTaskToTracker( t, "DeferedTask" );
269
270 scheduleNext();
271}
272
273bool ResourceScheduler::isEmpty()
274{
275 for ( int i = 0; i < NQueueCount; ++i ) {
276 if ( !mTaskList[i].isEmpty() )
277 return false;
278 }
279 return true;
280}
281
282void ResourceScheduler::scheduleNext()
283{
284 if ( mCurrentTask.type != Invalid || isEmpty() || !mOnline )
285 return;
286 QTimer::singleShot( 0, this, SLOT(executeNext()) );
287}
288
289void ResourceScheduler::executeNext()
290{
291 if ( mCurrentTask.type != Invalid || isEmpty() )
292 return;
293
294 for ( int i = 0; i < NQueueCount; ++i ) {
295 if ( !mTaskList[ i ].isEmpty() ) {
296 mCurrentTask = mTaskList[ i ].takeFirst();
297 mCurrentTasksQueue = i;
298 break;
299 }
300 }
301
302 if ( s_resourcetracker ) {
303 QList<QVariant> argumentList;
304 argumentList << QString::number( mCurrentTask.serial );
305 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobStarted" ), argumentList);
306 }
307
308 switch ( mCurrentTask.type ) {
309 case SyncAll:
310 emit executeFullSync();
311 break;
312 case SyncCollectionTree:
313 emit executeCollectionTreeSync();
314 break;
315 case SyncCollection:
316 emit executeCollectionSync( mCurrentTask.collection );
317 break;
318 case SyncCollectionAttributes:
319 emit executeCollectionAttributesSync( mCurrentTask.collection );
320 break;
321 case FetchItem:
322 emit executeItemFetch( mCurrentTask.item, mCurrentTask.itemParts );
323 break;
324 case DeleteResourceCollection:
325 emit executeResourceCollectionDeletion();
326 break;
327 case InvalideCacheForCollection:
328 emit executeCacheInvalidation( mCurrentTask.collection );
329 break;
330 case ChangeReplay:
331 emit executeChangeReplay();
332 break;
333 case RecursiveMoveReplay:
334 emit executeRecursiveMoveReplay( mCurrentTask.argument.value<RecursiveMover*>() );
335 break;
336 case SyncAllDone:
337 emit fullSyncComplete();
338 break;
339 case SyncCollectionTreeDone:
340 emit collectionTreeSyncComplete();
341 break;
342 case Custom:
343 {
344 const QByteArray methodSig = mCurrentTask.methodName + "(QVariant)";
345 const bool hasSlotWithVariant = mCurrentTask.receiver->metaObject()->indexOfMethod(methodSig) != -1;
346 bool success = false;
347 if ( hasSlotWithVariant ) {
348 success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName, Q_ARG(QVariant, mCurrentTask.argument) );
349 Q_ASSERT_X( success || !mCurrentTask.argument.isValid(), "ResourceScheduler::executeNext", "Valid argument was provided but the method wasn't found" );
350 }
351 if ( !success )
352 success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName );
353
354 if ( !success )
355 kError() << "Could not invoke slot" << mCurrentTask.methodName << "on" << mCurrentTask.receiver << "with argument" << mCurrentTask.argument;
356 break;
357 }
358 default: {
359 kError() << "Unhandled task type" << mCurrentTask.type;
360 dump();
361 Q_ASSERT( false );
362 }
363 }
364}
365
366ResourceScheduler::Task ResourceScheduler::currentTask() const
367{
368 return mCurrentTask;
369}
370
371void ResourceScheduler::setOnline(bool state)
372{
373 if ( mOnline == state )
374 return;
375 mOnline = state;
376 if ( mOnline ) {
377 scheduleNext();
378 } else {
379 if ( mCurrentTask.type != Invalid ) {
380 // abort running task
381 queueForTaskType( mCurrentTask.type ).prepend( mCurrentTask );
382 mCurrentTask = Task();
383 mCurrentTasksQueue = -1;
384 }
385 // abort pending synchronous tasks, might take longer until the resource goes online again
386 TaskList& itemFetchQueue = queueForTaskType( FetchItem );
387 for ( QList< Task >::iterator it = itemFetchQueue.begin(); it != itemFetchQueue.end(); ) {
388 if ( (*it).type == FetchItem ) {
389 (*it).sendDBusReplies( i18nc( "@info", "Job canceled." ) );
390 it = itemFetchQueue.erase( it );
391 if ( s_resourcetracker ) {
392 QList<QVariant> argumentList;
393 argumentList << QString::number( mCurrentTask.serial )
394 << i18nc( "@info", "Job canceled." );
395 s_resourcetracker->asyncCallWithArgumentList( QLatin1String( "jobEnded" ), argumentList );
396 }
397 } else {
398 ++it;
399 }
400 }
401 }
402}
403
404void ResourceScheduler::signalTaskToTracker( const Task &task, const QByteArray &taskType, const QString &debugString )
405{
406 // if there's a job tracer running, tell it about the new job
407 if ( !s_resourcetracker && DBusConnectionPool::threadConnection().interface()->isServiceRegistered(QLatin1String( "org.kde.akonadiconsole" ) ) ) {
408 s_resourcetracker = new QDBusInterface( QLatin1String( "org.kde.akonadiconsole" ),
409 QLatin1String( "/resourcesJobtracker" ),
410 QLatin1String( "org.freedesktop.Akonadi.JobTracker" ),
411 DBusConnectionPool::threadConnection(), 0 );
412 }
413
414 if ( s_resourcetracker ) {
415 QList<QVariant> argumentList;
416 argumentList << static_cast<AgentBase*>( parent() )->identifier() // "session" (in our case resource)
417 << QString::number( task.serial ) // "job"
418 << QString() // "parent job"
419 << QString::fromLatin1( taskType ) // "job type"
420 << debugString // "job debugging string"
421 ;
422 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobCreated" ), argumentList);
423 }
424}
425
426void ResourceScheduler::collectionRemoved( const Akonadi::Collection &collection )
427{
428 if ( !collection.isValid() ) // should not happen, but you never know...
429 return;
430 TaskList& queue = queueForTaskType( SyncCollection );
431 for ( QList<Task>::iterator it = queue.begin(); it != queue.end(); ) {
432 if ( (*it).type == SyncCollection && (*it).collection == collection ) {
433 it = queue.erase( it );
434 kDebug() << " erasing";
435 } else
436 ++it;
437 }
438}
439
440void ResourceScheduler::Task::sendDBusReplies( const QString &errorMsg )
441{
442 Q_FOREACH( const QDBusMessage &msg, dbusMsgs ) {
443 QDBusMessage reply( msg.createReply() );
444 const QString methodName = msg.member();
445 if (methodName == QLatin1String("requestItemDelivery")) {
446 reply << errorMsg.isEmpty();
447 } else if (methodName == QLatin1String("requestItemDeliveryV2")) {
448 reply << errorMsg;
449 } else if (methodName.isEmpty()) {
450 continue; // unittest calls scheduleItemFetch with empty QDBusMessage
451 } else {
452 kFatal() << "Got unexpected member:" << methodName;
453 }
454 DBusConnectionPool::threadConnection().send( reply );
455 }
456}
457
458ResourceScheduler::QueueType ResourceScheduler::queueTypeForTaskType( TaskType type )
459{
460 switch ( type ) {
461 case ChangeReplay:
462 case RecursiveMoveReplay:
463 return ChangeReplayQueue;
464 case FetchItem:
465 case SyncCollectionAttributes:
466 return UserActionQueue;
467 default:
468 return GenericTaskQueue;
469 }
470}
471
472ResourceScheduler::TaskList& ResourceScheduler::queueForTaskType( TaskType type )
473{
474 const QueueType qt = queueTypeForTaskType( type );
475 return mTaskList[ qt ];
476}
477
478void ResourceScheduler::dump()
479{
480 kDebug() << dumpToString();
481}
482
483QString ResourceScheduler::dumpToString() const
484{
485 QString ret;
486 QTextStream str( &ret );
487 str << "ResourceScheduler: " << (mOnline?"Online":"Offline") << endl;
488 str << " current task: " << mCurrentTask << endl;
489 for ( int i = 0; i < NQueueCount; ++i ) {
490 const TaskList& queue = mTaskList[i];
491 if (queue.isEmpty()) {
492 str << " queue " << i << " is empty" << endl;
493 } else {
494 str << " queue " << i << " " << queue.size() << " tasks:" << endl;
495 for ( QList<Task>::const_iterator it = queue.begin(); it != queue.end(); ++it ) {
496 str << " " << (*it) << endl;
497 }
498 }
499 }
500 return ret;
501}
502
503void ResourceScheduler::clear()
504{
505 kDebug() << "Clearing ResourceScheduler queues:";
506 for ( int i = 0; i < NQueueCount; ++i ) {
507 TaskList& queue = mTaskList[i];
508 queue.clear();
509 }
510 mCurrentTask = Task();
511 mCurrentTasksQueue = -1;
512}
513
514void Akonadi::ResourceScheduler::cancelQueues()
515{
516 for ( int i = 0; i < NQueueCount; ++i ) {
517 TaskList& queue = mTaskList[i];
518 if ( s_resourcetracker ) {
519 foreach ( const Task &t, queue ) {
520 QList<QVariant> argumentList;
521 argumentList << QString::number( t.serial ) << QString();
522 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList);
523 }
524 }
525 queue.clear();
526 }
527}
528
529static const char s_taskTypes[][27] = {
530 "Invalid (no task)",
531 "SyncAll",
532 "SyncCollectionTree",
533 "SyncCollection",
534 "SyncCollectionAttributes",
535 "FetchItem",
536 "ChangeReplay",
537 "RecursiveMoveReplay",
538 "DeleteResourceCollection",
539 "InvalideCacheForCollection",
540 "SyncAllDone",
541 "SyncCollectionTreeDone",
542 "Custom"
543};
544
545QTextStream& Akonadi::operator<<( QTextStream& d, const ResourceScheduler::Task& task )
546{
547 d << task.serial << " " << s_taskTypes[task.type] << " ";
548 if ( task.type != ResourceScheduler::Invalid ) {
549 if ( task.collection.isValid() )
550 d << "collection " << task.collection.id() << " ";
551 if ( task.item.id() != -1 )
552 d << "item " << task.item.id() << " ";
553 if ( !task.methodName.isEmpty() )
554 d << task.methodName << " " << task.argument.toString();
555 }
556 return d;
557}
558
559QDebug Akonadi::operator<<( QDebug d, const ResourceScheduler::Task& task )
560{
561 QString s;
562 QTextStream str( &s );
563 str << task;
564 d << s;
565 return d;
566}
567
568//@endcond
569
570#include "moc_resourcescheduler_p.cpp"
571