1 | /* |
2 | Copyright (c) 2007 Volker Krause <vkrause@kde.org> |
3 | |
4 | This library is free software; you can redistribute it and/or modify it |
5 | under the terms of the GNU Library General Public License as published by |
6 | the Free Software Foundation; either version 2 of the License, or (at your |
7 | option) any later version. |
8 | |
9 | This library is distributed in the hope that it will be useful, but WITHOUT |
10 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
11 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public |
12 | License for more details. |
13 | |
14 | You should have received a copy of the GNU Library General Public License |
15 | along with this library; see the file COPYING.LIB. If not, write to the |
16 | Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
17 | 02110-1301, USA. |
18 | */ |
19 | |
20 | #include "resourcescheduler_p.h" |
21 | |
22 | #include "dbusconnectionpool.h" |
23 | #include "recursivemover_p.h" |
24 | |
25 | #include <kdebug.h> |
26 | #include <klocalizedstring.h> |
27 | |
28 | #include <QtCore/QTimer> |
29 | #include <QtDBus/QDBusInterface> |
30 | #include <QtDBus/QDBusConnectionInterface> |
31 | #include <boost/graph/graph_concepts.hpp> |
32 | |
33 | using namespace Akonadi; |
34 | |
35 | qint64 ResourceScheduler::Task::latestSerial = 0; |
36 | static QDBusAbstractInterface *s_resourcetracker = 0; |
37 | |
38 | //@cond PRIVATE |
39 | |
40 | ResourceScheduler::ResourceScheduler( QObject *parent ) : |
41 | QObject( parent ), |
42 | mCurrentTasksQueue( -1 ), |
43 | mOnline( false ) |
44 | { |
45 | } |
46 | |
47 | void ResourceScheduler::scheduleFullSync() |
48 | { |
49 | Task t; |
50 | t.type = SyncAll; |
51 | TaskList& queue = queueForTaskType( t.type ); |
52 | if ( queue.contains( t ) || mCurrentTask == t ) |
53 | return; |
54 | queue << t; |
55 | signalTaskToTracker( t, "SyncAll" ); |
56 | scheduleNext(); |
57 | } |
58 | |
59 | void ResourceScheduler::scheduleCollectionTreeSync() |
60 | { |
61 | Task t; |
62 | t.type = SyncCollectionTree; |
63 | TaskList& queue = queueForTaskType( t.type ); |
64 | if ( queue.contains( t ) || mCurrentTask == t ) |
65 | return; |
66 | queue << t; |
67 | signalTaskToTracker( t, "SyncCollectionTree" ); |
68 | scheduleNext(); |
69 | } |
70 | |
71 | void ResourceScheduler::scheduleSync(const Collection & col) |
72 | { |
73 | Task t; |
74 | t.type = SyncCollection; |
75 | t.collection = col; |
76 | TaskList& queue = queueForTaskType( t.type ); |
77 | if ( queue.contains( t ) || mCurrentTask == t ) |
78 | return; |
79 | queue << t; |
80 | signalTaskToTracker( t, "SyncCollection" , QString::number( col.id() ) ); |
81 | scheduleNext(); |
82 | } |
83 | |
84 | void ResourceScheduler::scheduleAttributesSync( const Collection &collection ) |
85 | { |
86 | Task t; |
87 | t.type = SyncCollectionAttributes; |
88 | t.collection = collection; |
89 | |
90 | TaskList& queue = queueForTaskType( t.type ); |
91 | if ( queue.contains( t ) || mCurrentTask == t ) |
92 | return; |
93 | queue << t; |
94 | signalTaskToTracker( t, "SyncCollectionAttributes" , QString::number( collection.id() ) ); |
95 | scheduleNext(); |
96 | } |
97 | |
98 | void ResourceScheduler::scheduleItemFetch(const Item & item, const QSet<QByteArray> &parts, const QDBusMessage & msg) |
99 | { |
100 | Task t; |
101 | t.type = FetchItem; |
102 | t.item = item; |
103 | t.itemParts = parts; |
104 | |
105 | // if the current task does already fetch the requested item, break here but |
106 | // keep the dbus message, so we can send the reply later on |
107 | if ( mCurrentTask == t ) { |
108 | mCurrentTask.dbusMsgs << msg; |
109 | return; |
110 | } |
111 | |
112 | // If this task is already in the queue, merge with it. |
113 | TaskList& queue = queueForTaskType( t.type ); |
114 | const int idx = queue.indexOf( t ); |
115 | if ( idx != -1 ) { |
116 | queue[ idx ].dbusMsgs << msg; |
117 | return; |
118 | } |
119 | |
120 | t.dbusMsgs << msg; |
121 | queue << t; |
122 | signalTaskToTracker( t, "FetchItem" , QString::number( item.id() ) ); |
123 | scheduleNext(); |
124 | } |
125 | |
126 | void ResourceScheduler::scheduleResourceCollectionDeletion() |
127 | { |
128 | Task t; |
129 | t.type = DeleteResourceCollection; |
130 | TaskList& queue = queueForTaskType( t.type ); |
131 | if ( queue.contains( t ) || mCurrentTask == t ) |
132 | return; |
133 | queue << t; |
134 | signalTaskToTracker( t, "DeleteResourceCollection" ); |
135 | scheduleNext(); |
136 | } |
137 | |
138 | void ResourceScheduler::scheduleCacheInvalidation( const Collection &collection ) |
139 | { |
140 | Task t; |
141 | t.type = InvalideCacheForCollection; |
142 | t.collection = collection; |
143 | TaskList& queue = queueForTaskType( t.type ); |
144 | if ( queue.contains( t ) || mCurrentTask == t ) |
145 | return; |
146 | queue << t; |
147 | signalTaskToTracker( t, "InvalideCacheForCollection" , QString::number( collection.id() ) ); |
148 | scheduleNext(); |
149 | } |
150 | |
151 | void ResourceScheduler::scheduleChangeReplay() |
152 | { |
153 | Task t; |
154 | t.type = ChangeReplay; |
155 | TaskList& queue = queueForTaskType( t.type ); |
156 | // see ResourceBase::changeProcessed() for why we do not check for mCurrentTask == t here like in the other tasks |
157 | if ( queue.contains( t ) ) |
158 | return; |
159 | queue << t; |
160 | signalTaskToTracker( t, "ChangeReplay" ); |
161 | scheduleNext(); |
162 | } |
163 | |
164 | void ResourceScheduler::scheduleMoveReplay( const Collection &movedCollection, RecursiveMover *mover ) |
165 | { |
166 | Task t; |
167 | t.type = RecursiveMoveReplay; |
168 | t.collection = movedCollection; |
169 | t.argument = QVariant::fromValue( mover ); |
170 | TaskList &queue = queueForTaskType( t.type ); |
171 | |
172 | if ( queue.contains( t ) || mCurrentTask == t ) |
173 | return; |
174 | |
175 | queue << t; |
176 | signalTaskToTracker( t, "RecursiveMoveReplay" , QString::number( t.collection.id() ) ); |
177 | scheduleNext(); |
178 | } |
179 | |
180 | void Akonadi::ResourceScheduler::scheduleFullSyncCompletion() |
181 | { |
182 | Task t; |
183 | t.type = SyncAllDone; |
184 | TaskList& queue = queueForTaskType( t.type ); |
185 | // no compression here, all this does is emitting a D-Bus signal anyway, and compression can trigger races on the receiver side with the signal being lost |
186 | queue << t; |
187 | signalTaskToTracker( t, "SyncAllDone" ); |
188 | scheduleNext(); |
189 | } |
190 | |
191 | void Akonadi::ResourceScheduler::scheduleCollectionTreeSyncCompletion() |
192 | { |
193 | Task t; |
194 | t.type = SyncCollectionTreeDone; |
195 | TaskList& queue = queueForTaskType( t.type ); |
196 | // no compression here, all this does is emitting a D-Bus signal anyway, and compression can trigger races on the receiver side with the signal being lost |
197 | queue << t; |
198 | signalTaskToTracker( t, "SyncCollectionTreeDone" ); |
199 | scheduleNext(); |
200 | } |
201 | |
202 | void Akonadi::ResourceScheduler::scheduleCustomTask( QObject *receiver, const char* methodName, const QVariant &argument, ResourceBase::SchedulePriority priority ) |
203 | { |
204 | Task t; |
205 | t.type = Custom; |
206 | t.receiver = receiver; |
207 | t.methodName = methodName; |
208 | t.argument = argument; |
209 | QueueType queueType = GenericTaskQueue; |
210 | if ( priority == ResourceBase::AfterChangeReplay ) |
211 | queueType = AfterChangeReplayQueue; |
212 | else if ( priority == ResourceBase::Prepend ) |
213 | queueType = PrependTaskQueue; |
214 | TaskList& queue = mTaskList[ queueType ]; |
215 | |
216 | if ( queue.contains( t ) ) |
217 | return; |
218 | |
219 | switch (priority) { |
220 | case ResourceBase::Prepend: |
221 | queue.prepend( t ); |
222 | break; |
223 | default: |
224 | queue.append(t); |
225 | break; |
226 | } |
227 | |
228 | signalTaskToTracker( t, "Custom-" + t.methodName ); |
229 | scheduleNext(); |
230 | } |
231 | |
232 | void ResourceScheduler::taskDone() |
233 | { |
234 | if ( isEmpty() ) |
235 | emit status( AgentBase::Idle, i18nc( "@info:status Application ready for work" , "Ready" ) ); |
236 | |
237 | if ( s_resourcetracker ) { |
238 | QList<QVariant> argumentList; |
239 | argumentList << QString::number( mCurrentTask.serial ) |
240 | << QString(); |
241 | s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList); |
242 | } |
243 | |
244 | mCurrentTask = Task(); |
245 | mCurrentTasksQueue = -1; |
246 | scheduleNext(); |
247 | } |
248 | |
249 | void ResourceScheduler::deferTask() |
250 | { |
251 | if ( mCurrentTask.type == Invalid ) |
252 | return; |
253 | |
254 | if ( s_resourcetracker ) { |
255 | QList<QVariant> argumentList; |
256 | argumentList << QString::number( mCurrentTask.serial ) |
257 | << QString(); |
258 | s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList); |
259 | } |
260 | |
261 | Task t = mCurrentTask; |
262 | mCurrentTask = Task(); |
263 | |
264 | Q_ASSERT( mCurrentTasksQueue >= 0 && mCurrentTasksQueue < NQueueCount ); |
265 | mTaskList[mCurrentTasksQueue].prepend( t ); |
266 | mCurrentTasksQueue = -1; |
267 | |
268 | signalTaskToTracker( t, "DeferedTask" ); |
269 | |
270 | scheduleNext(); |
271 | } |
272 | |
273 | bool ResourceScheduler::isEmpty() |
274 | { |
275 | for ( int i = 0; i < NQueueCount; ++i ) { |
276 | if ( !mTaskList[i].isEmpty() ) |
277 | return false; |
278 | } |
279 | return true; |
280 | } |
281 | |
282 | void ResourceScheduler::scheduleNext() |
283 | { |
284 | if ( mCurrentTask.type != Invalid || isEmpty() || !mOnline ) |
285 | return; |
286 | QTimer::singleShot( 0, this, SLOT(executeNext()) ); |
287 | } |
288 | |
289 | void ResourceScheduler::executeNext() |
290 | { |
291 | if ( mCurrentTask.type != Invalid || isEmpty() ) |
292 | return; |
293 | |
294 | for ( int i = 0; i < NQueueCount; ++i ) { |
295 | if ( !mTaskList[ i ].isEmpty() ) { |
296 | mCurrentTask = mTaskList[ i ].takeFirst(); |
297 | mCurrentTasksQueue = i; |
298 | break; |
299 | } |
300 | } |
301 | |
302 | if ( s_resourcetracker ) { |
303 | QList<QVariant> argumentList; |
304 | argumentList << QString::number( mCurrentTask.serial ); |
305 | s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobStarted" ), argumentList); |
306 | } |
307 | |
308 | switch ( mCurrentTask.type ) { |
309 | case SyncAll: |
310 | emit executeFullSync(); |
311 | break; |
312 | case SyncCollectionTree: |
313 | emit executeCollectionTreeSync(); |
314 | break; |
315 | case SyncCollection: |
316 | emit executeCollectionSync( mCurrentTask.collection ); |
317 | break; |
318 | case SyncCollectionAttributes: |
319 | emit executeCollectionAttributesSync( mCurrentTask.collection ); |
320 | break; |
321 | case FetchItem: |
322 | emit executeItemFetch( mCurrentTask.item, mCurrentTask.itemParts ); |
323 | break; |
324 | case DeleteResourceCollection: |
325 | emit executeResourceCollectionDeletion(); |
326 | break; |
327 | case InvalideCacheForCollection: |
328 | emit executeCacheInvalidation( mCurrentTask.collection ); |
329 | break; |
330 | case ChangeReplay: |
331 | emit executeChangeReplay(); |
332 | break; |
333 | case RecursiveMoveReplay: |
334 | emit executeRecursiveMoveReplay( mCurrentTask.argument.value<RecursiveMover*>() ); |
335 | break; |
336 | case SyncAllDone: |
337 | emit fullSyncComplete(); |
338 | break; |
339 | case SyncCollectionTreeDone: |
340 | emit collectionTreeSyncComplete(); |
341 | break; |
342 | case Custom: |
343 | { |
344 | const QByteArray methodSig = mCurrentTask.methodName + "(QVariant)" ; |
345 | const bool hasSlotWithVariant = mCurrentTask.receiver->metaObject()->indexOfMethod(methodSig) != -1; |
346 | bool success = false; |
347 | if ( hasSlotWithVariant ) { |
348 | success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName, Q_ARG(QVariant, mCurrentTask.argument) ); |
349 | Q_ASSERT_X( success || !mCurrentTask.argument.isValid(), "ResourceScheduler::executeNext" , "Valid argument was provided but the method wasn't found" ); |
350 | } |
351 | if ( !success ) |
352 | success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName ); |
353 | |
354 | if ( !success ) |
355 | kError() << "Could not invoke slot" << mCurrentTask.methodName << "on" << mCurrentTask.receiver << "with argument" << mCurrentTask.argument; |
356 | break; |
357 | } |
358 | default: { |
359 | kError() << "Unhandled task type" << mCurrentTask.type; |
360 | dump(); |
361 | Q_ASSERT( false ); |
362 | } |
363 | } |
364 | } |
365 | |
366 | ResourceScheduler::Task ResourceScheduler::currentTask() const |
367 | { |
368 | return mCurrentTask; |
369 | } |
370 | |
371 | void ResourceScheduler::setOnline(bool state) |
372 | { |
373 | if ( mOnline == state ) |
374 | return; |
375 | mOnline = state; |
376 | if ( mOnline ) { |
377 | scheduleNext(); |
378 | } else { |
379 | if ( mCurrentTask.type != Invalid ) { |
380 | // abort running task |
381 | queueForTaskType( mCurrentTask.type ).prepend( mCurrentTask ); |
382 | mCurrentTask = Task(); |
383 | mCurrentTasksQueue = -1; |
384 | } |
385 | // abort pending synchronous tasks, might take longer until the resource goes online again |
386 | TaskList& itemFetchQueue = queueForTaskType( FetchItem ); |
387 | for ( QList< Task >::iterator it = itemFetchQueue.begin(); it != itemFetchQueue.end(); ) { |
388 | if ( (*it).type == FetchItem ) { |
389 | (*it).sendDBusReplies( i18nc( "@info" , "Job canceled." ) ); |
390 | it = itemFetchQueue.erase( it ); |
391 | if ( s_resourcetracker ) { |
392 | QList<QVariant> argumentList; |
393 | argumentList << QString::number( mCurrentTask.serial ) |
394 | << i18nc( "@info" , "Job canceled." ); |
395 | s_resourcetracker->asyncCallWithArgumentList( QLatin1String( "jobEnded" ), argumentList ); |
396 | } |
397 | } else { |
398 | ++it; |
399 | } |
400 | } |
401 | } |
402 | } |
403 | |
404 | void ResourceScheduler::signalTaskToTracker( const Task &task, const QByteArray &taskType, const QString &debugString ) |
405 | { |
406 | // if there's a job tracer running, tell it about the new job |
407 | if ( !s_resourcetracker && DBusConnectionPool::threadConnection().interface()->isServiceRegistered(QLatin1String( "org.kde.akonadiconsole" ) ) ) { |
408 | s_resourcetracker = new QDBusInterface( QLatin1String( "org.kde.akonadiconsole" ), |
409 | QLatin1String( "/resourcesJobtracker" ), |
410 | QLatin1String( "org.freedesktop.Akonadi.JobTracker" ), |
411 | DBusConnectionPool::threadConnection(), 0 ); |
412 | } |
413 | |
414 | if ( s_resourcetracker ) { |
415 | QList<QVariant> argumentList; |
416 | argumentList << static_cast<AgentBase*>( parent() )->identifier() // "session" (in our case resource) |
417 | << QString::number( task.serial ) // "job" |
418 | << QString() // "parent job" |
419 | << QString::fromLatin1( taskType ) // "job type" |
420 | << debugString // "job debugging string" |
421 | ; |
422 | s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobCreated" ), argumentList); |
423 | } |
424 | } |
425 | |
426 | void ResourceScheduler::collectionRemoved( const Akonadi::Collection &collection ) |
427 | { |
428 | if ( !collection.isValid() ) // should not happen, but you never know... |
429 | return; |
430 | TaskList& queue = queueForTaskType( SyncCollection ); |
431 | for ( QList<Task>::iterator it = queue.begin(); it != queue.end(); ) { |
432 | if ( (*it).type == SyncCollection && (*it).collection == collection ) { |
433 | it = queue.erase( it ); |
434 | kDebug() << " erasing" ; |
435 | } else |
436 | ++it; |
437 | } |
438 | } |
439 | |
440 | void ResourceScheduler::Task::sendDBusReplies( const QString &errorMsg ) |
441 | { |
442 | Q_FOREACH( const QDBusMessage &msg, dbusMsgs ) { |
443 | QDBusMessage reply( msg.createReply() ); |
444 | const QString methodName = msg.member(); |
445 | if (methodName == QLatin1String("requestItemDelivery" )) { |
446 | reply << errorMsg.isEmpty(); |
447 | } else if (methodName == QLatin1String("requestItemDeliveryV2" )) { |
448 | reply << errorMsg; |
449 | } else if (methodName.isEmpty()) { |
450 | continue; // unittest calls scheduleItemFetch with empty QDBusMessage |
451 | } else { |
452 | kFatal() << "Got unexpected member:" << methodName; |
453 | } |
454 | DBusConnectionPool::threadConnection().send( reply ); |
455 | } |
456 | } |
457 | |
458 | ResourceScheduler::QueueType ResourceScheduler::queueTypeForTaskType( TaskType type ) |
459 | { |
460 | switch ( type ) { |
461 | case ChangeReplay: |
462 | case RecursiveMoveReplay: |
463 | return ChangeReplayQueue; |
464 | case FetchItem: |
465 | case SyncCollectionAttributes: |
466 | return UserActionQueue; |
467 | default: |
468 | return GenericTaskQueue; |
469 | } |
470 | } |
471 | |
472 | ResourceScheduler::TaskList& ResourceScheduler::queueForTaskType( TaskType type ) |
473 | { |
474 | const QueueType qt = queueTypeForTaskType( type ); |
475 | return mTaskList[ qt ]; |
476 | } |
477 | |
478 | void ResourceScheduler::dump() |
479 | { |
480 | kDebug() << dumpToString(); |
481 | } |
482 | |
483 | QString ResourceScheduler::dumpToString() const |
484 | { |
485 | QString ret; |
486 | QTextStream str( &ret ); |
487 | str << "ResourceScheduler: " << (mOnline?"Online" :"Offline" ) << endl; |
488 | str << " current task: " << mCurrentTask << endl; |
489 | for ( int i = 0; i < NQueueCount; ++i ) { |
490 | const TaskList& queue = mTaskList[i]; |
491 | if (queue.isEmpty()) { |
492 | str << " queue " << i << " is empty" << endl; |
493 | } else { |
494 | str << " queue " << i << " " << queue.size() << " tasks:" << endl; |
495 | for ( QList<Task>::const_iterator it = queue.begin(); it != queue.end(); ++it ) { |
496 | str << " " << (*it) << endl; |
497 | } |
498 | } |
499 | } |
500 | return ret; |
501 | } |
502 | |
503 | void ResourceScheduler::clear() |
504 | { |
505 | kDebug() << "Clearing ResourceScheduler queues:" ; |
506 | for ( int i = 0; i < NQueueCount; ++i ) { |
507 | TaskList& queue = mTaskList[i]; |
508 | queue.clear(); |
509 | } |
510 | mCurrentTask = Task(); |
511 | mCurrentTasksQueue = -1; |
512 | } |
513 | |
514 | void Akonadi::ResourceScheduler::cancelQueues() |
515 | { |
516 | for ( int i = 0; i < NQueueCount; ++i ) { |
517 | TaskList& queue = mTaskList[i]; |
518 | if ( s_resourcetracker ) { |
519 | foreach ( const Task &t, queue ) { |
520 | QList<QVariant> argumentList; |
521 | argumentList << QString::number( t.serial ) << QString(); |
522 | s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList); |
523 | } |
524 | } |
525 | queue.clear(); |
526 | } |
527 | } |
528 | |
// Human-readable task type names used by the stream operators below.
// The table is indexed with the numeric value of Task::type, so the entries
// have to stay in the same order as the TaskType enumerators.
// 27 = length of the longest name ("InvalideCacheForCollection") plus the
// terminating NUL.
static const char s_taskTypes[][27] = {
  "Invalid (no task)",
  "SyncAll",
  "SyncCollectionTree",
  "SyncCollection",
  "SyncCollectionAttributes",
  "FetchItem",
  "ChangeReplay",
  "RecursiveMoveReplay",
  "DeleteResourceCollection",
  "InvalideCacheForCollection",
  "SyncAllDone",
  "SyncCollectionTreeDone",
  "Custom"
};
544 | |
545 | QTextStream& Akonadi::operator<<( QTextStream& d, const ResourceScheduler::Task& task ) |
546 | { |
547 | d << task.serial << " " << s_taskTypes[task.type] << " " ; |
548 | if ( task.type != ResourceScheduler::Invalid ) { |
549 | if ( task.collection.isValid() ) |
550 | d << "collection " << task.collection.id() << " " ; |
551 | if ( task.item.id() != -1 ) |
552 | d << "item " << task.item.id() << " " ; |
553 | if ( !task.methodName.isEmpty() ) |
554 | d << task.methodName << " " << task.argument.toString(); |
555 | } |
556 | return d; |
557 | } |
558 | |
559 | QDebug Akonadi::operator<<( QDebug d, const ResourceScheduler::Task& task ) |
560 | { |
561 | QString s; |
562 | QTextStream str( &s ); |
563 | str << task; |
564 | d << s; |
565 | return d; |
566 | } |
567 | |
568 | //@endcond |
569 | |
570 | #include "moc_resourcescheduler_p.cpp" |
571 | |