1 | /* |
2 | Copyright (c) 2006 Tobias Koenig <tokoe@kde.org> |
3 | 2006 Marc Mutz <mutz@kde.org> |
4 | 2006 - 2007 Volker Krause <vkrause@kde.org> |
5 | |
6 | This library is free software; you can redistribute it and/or modify it |
7 | under the terms of the GNU Library General Public License as published by |
8 | the Free Software Foundation; either version 2 of the License, or (at your |
9 | option) any later version. |
10 | |
11 | This library is distributed in the hope that it will be useful, but WITHOUT |
12 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
13 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public |
14 | License for more details. |
15 | |
16 | You should have received a copy of the GNU Library General Public License |
17 | along with this library; see the file COPYING.LIB. If not, write to the |
18 | Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
19 | 02110-1301, USA. |
20 | */ |
21 | |
22 | #include "job.h" |
23 | #include "job_p.h" |
24 | #include "dbusconnectionpool.h" |
25 | #include <QTime> |
26 | #include "imapparser_p.h" |
27 | #include "session.h" |
28 | #include "session_p.h" |
29 | |
30 | #include <kdebug.h> |
31 | #include <klocalizedstring.h> |
32 | |
33 | #include <QtCore/QEventLoop> |
34 | #include <QtCore/QTimer> |
35 | #include <QtCore/QTextStream> |
36 | #include <QtDBus/QDBusInterface> |
37 | #include <QtDBus/QDBusConnectionInterface> |
38 | |
39 | using namespace Akonadi; |
40 | |
41 | static QDBusAbstractInterface *s_jobtracker = 0; |
42 | |
43 | //@cond PRIVATE |
44 | void JobPrivate::handleResponse(const QByteArray &tag, const QByteArray &data) |
45 | { |
46 | Q_Q(Job); |
47 | |
48 | if (mCurrentSubJob) { |
49 | mCurrentSubJob->d_ptr->handleResponse(tag, data); |
50 | return; |
51 | } |
52 | |
53 | if (tag == mTag) { |
54 | if (data.startsWith("NO " ) || data.startsWith("BAD " )) { //krazy:exclude=strings |
55 | QString msg = QString::fromUtf8(data); |
56 | |
57 | msg.remove(0, msg.startsWith(QLatin1String("NO " )) ? 3 : 4); |
58 | |
59 | if (msg.endsWith(QLatin1String("\r\n" ))) { |
60 | msg.chop(2); |
61 | } |
62 | |
63 | q->setError(Job::Unknown); |
64 | q->setErrorText(msg); |
65 | q->emitResult(); |
66 | return; |
67 | } else if (data.startsWith("OK" )) { //krazy:exclude=strings |
68 | |
69 | // We can't use emitResult() here: The slot connected to the result signal might exec() |
70 | // another job, and therefore this method would never return. That causes the session |
71 | // to deadlock, since it calls this method and does not continue starting new jobs until |
72 | // this method finishes. Which would also mean the exec()'d job is never started,, and there- |
73 | // fore everything deadlocks. |
74 | QTimer::singleShot(0, q, SLOT(delayedEmitResult())); |
75 | return; |
76 | } |
77 | } |
78 | |
79 | q->doHandleResponse(tag, data); |
80 | } |
81 | |
82 | void JobPrivate::init(QObject *parent) |
83 | { |
84 | Q_Q(Job); |
85 | |
86 | mParentJob = dynamic_cast<Job *>(parent); |
87 | mSession = dynamic_cast<Session *>(parent); |
88 | |
89 | if (!mSession) { |
90 | if (!mParentJob) { |
91 | mSession = Session::defaultSession(); |
92 | } else { |
93 | mSession = mParentJob->d_ptr->mSession; |
94 | } |
95 | } |
96 | |
97 | if (!mParentJob) { |
98 | mSession->d->addJob(q); |
99 | } else { |
100 | mParentJob->addSubjob(q); |
101 | } |
102 | |
103 | // if there's a job tracker running, tell it about the new job |
104 | if (!s_jobtracker) { |
105 | // Let's only check for the debugging console every 3 seconds, otherwise every single job |
106 | // makes a dbus call to the dbus daemon, doesn't help performance. |
107 | static QTime s_lastTime; |
108 | if (s_lastTime.isNull() || s_lastTime.elapsed() > 3000) { |
109 | if (s_lastTime.isNull()) { |
110 | s_lastTime.start(); |
111 | } |
112 | if (DBusConnectionPool::threadConnection().interface()->isServiceRegistered(QLatin1String("org.kde.akonadiconsole" ))) { |
113 | s_jobtracker = new QDBusInterface(QLatin1String("org.kde.akonadiconsole" ), |
114 | QLatin1String("/jobtracker" ), |
115 | QLatin1String("org.freedesktop.Akonadi.JobTracker" ), |
116 | DBusConnectionPool::threadConnection(), 0); |
117 | } else { |
118 | s_lastTime.restart(); |
119 | } |
120 | } |
121 | // Note: we never reset s_jobtracker to 0 when a call fails; but if we did |
122 | // then we should restart s_lastTime. |
123 | } |
124 | QMetaObject::invokeMethod(q, "signalCreationToJobTracker" , Qt::QueuedConnection); |
125 | } |
126 | |
127 | void JobPrivate::signalCreationToJobTracker() |
128 | { |
129 | Q_Q(Job); |
130 | if (s_jobtracker) { |
131 | // We do these dbus calls manually, so as to avoid having to install (or copy) the console's |
132 | // xml interface document. Since this is purely a debugging aid, that seems preferable to |
133 | // publishing something not intended for public consumption. |
134 | // WARNING: for any signature change here, apply it to resourcescheduler.cpp too |
135 | QList<QVariant> argumentList; |
136 | argumentList << QLatin1String(mSession->sessionId()) |
137 | << QString::number(reinterpret_cast<quintptr>(q), 16) |
138 | << (mParentJob ? QString::number(reinterpret_cast<quintptr>(mParentJob), 16) : QString()) |
139 | << QString::fromLatin1(q->metaObject()->className()) |
140 | << jobDebuggingString(); |
141 | s_jobtracker->callWithArgumentList(QDBus::NoBlock, QLatin1String("jobCreated" ), argumentList); |
142 | } |
143 | } |
144 | |
145 | void JobPrivate::signalStartedToJobTracker() |
146 | { |
147 | Q_Q(Job); |
148 | if (s_jobtracker) { |
149 | // if there's a job tracker running, tell it a job started |
150 | QList<QVariant> argumentList; |
151 | argumentList << QString::number(reinterpret_cast<quintptr>(q), 16); |
152 | s_jobtracker->callWithArgumentList(QDBus::NoBlock, QLatin1String("jobStarted" ), argumentList); |
153 | } |
154 | } |
155 | |
156 | void JobPrivate::aboutToFinish() |
157 | { |
158 | // Dummy |
159 | } |
160 | |
161 | void JobPrivate::delayedEmitResult() |
162 | { |
163 | Q_Q(Job); |
164 | aboutToFinish(); |
165 | q->emitResult(); |
166 | } |
167 | |
168 | void JobPrivate::startQueued() |
169 | { |
170 | Q_Q(Job); |
171 | mStarted = true; |
172 | |
173 | emit q->aboutToStart(q); |
174 | q->doStart(); |
175 | QTimer::singleShot(0, q, SLOT(startNext())); |
176 | QMetaObject::invokeMethod(q, "signalStartedToJobTracker" , Qt::QueuedConnection); |
177 | } |
178 | |
179 | void JobPrivate::lostConnection() |
180 | { |
181 | Q_Q(Job); |
182 | |
183 | if (mCurrentSubJob) { |
184 | mCurrentSubJob->d_ptr->lostConnection(); |
185 | } else { |
186 | q->setError(Job::ConnectionFailed); |
187 | q->emitResult(); |
188 | } |
189 | } |
190 | |
191 | void JobPrivate::slotSubJobAboutToStart(Job *job) |
192 | { |
193 | Q_ASSERT(mCurrentSubJob == 0); |
194 | mCurrentSubJob = job; |
195 | } |
196 | |
197 | void JobPrivate::startNext() |
198 | { |
199 | Q_Q(Job); |
200 | |
201 | if (mStarted && !mCurrentSubJob && q->hasSubjobs()) { |
202 | Job *job = dynamic_cast<Akonadi::Job *>(q->subjobs().first()); |
203 | Q_ASSERT(job); |
204 | job->d_ptr->startQueued(); |
205 | } |
206 | } |
207 | |
208 | QByteArray JobPrivate::newTag() |
209 | { |
210 | if (mParentJob) { |
211 | mTag = mParentJob->d_ptr->newTag(); |
212 | } else { |
213 | mTag = QByteArray::number(mSession->d->nextTag()); |
214 | } |
215 | return mTag; |
216 | } |
217 | |
218 | QByteArray JobPrivate::tag() const |
219 | { |
220 | return mTag; |
221 | } |
222 | |
223 | void JobPrivate::writeData(const QByteArray &data) |
224 | { |
225 | Q_ASSERT_X(!mWriteFinished, "Job::writeData()" , "Calling writeData() after emitting writeFinished()" ); |
226 | mSession->d->writeData(data); |
227 | } |
228 | |
229 | void JobPrivate::itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision) |
230 | { |
231 | mSession->d->itemRevisionChanged(itemId, oldRevision, newRevision); |
232 | } |
233 | |
234 | void JobPrivate::updateItemRevision(Akonadi::Item::Id itemId, int oldRevision, int newRevision) |
235 | { |
236 | Q_Q(Job); |
237 | foreach (KJob *j, q->subjobs()) { |
238 | Akonadi::Job *job = qobject_cast<Akonadi::Job *>(j); |
239 | if (job) { |
240 | job->d_ptr->updateItemRevision(itemId, oldRevision, newRevision); |
241 | } |
242 | } |
243 | doUpdateItemRevision(itemId, oldRevision, newRevision); |
244 | } |
245 | |
246 | void JobPrivate::doUpdateItemRevision(Akonadi::Item::Id itemId, int oldRevision, int newRevision) |
247 | { |
248 | Q_UNUSED(itemId); |
249 | Q_UNUSED(oldRevision); |
250 | Q_UNUSED(newRevision); |
251 | } |
252 | |
253 | int JobPrivate::protocolVersion() const |
254 | { |
255 | return mSession->d->protocolVersion; |
256 | } |
257 | //@endcond |
258 | |
259 | Job::Job(QObject *parent) |
260 | : KCompositeJob(parent) |
261 | , d_ptr(new JobPrivate(this)) |
262 | { |
263 | d_ptr->init(parent); |
264 | } |
265 | |
266 | Job::Job(JobPrivate *dd, QObject *parent) |
267 | : KCompositeJob(parent) |
268 | , d_ptr(dd) |
269 | { |
270 | d_ptr->init(parent); |
271 | } |
272 | |
273 | Job::~Job() |
274 | { |
275 | delete d_ptr; |
276 | |
277 | // if there is a job tracer listening, tell it the job is done now |
278 | if (s_jobtracker) { |
279 | QList<QVariant> argumentList; |
280 | argumentList << QString::number(reinterpret_cast<quintptr>(this), 16) |
281 | << errorString(); |
282 | s_jobtracker->callWithArgumentList(QDBus::NoBlock, QLatin1String("jobEnded" ), argumentList); |
283 | } |
284 | } |
285 | |
286 | void Job::start() |
287 | { |
288 | } |
289 | |
290 | bool Job::doKill() |
291 | { |
292 | Q_D(Job); |
293 | if (d->mStarted) { |
294 | // the only way to cancel an already started job is reconnecting to the server |
295 | d->mSession->d->forceReconnect(); |
296 | } |
297 | d->mStarted = false; |
298 | return true; |
299 | } |
300 | |
301 | QString Job::errorString() const |
302 | { |
303 | QString str; |
304 | switch (error()) { |
305 | case NoError: |
306 | break; |
307 | case ConnectionFailed: |
308 | str = i18n("Cannot connect to the Akonadi service." ); |
309 | break; |
310 | case ProtocolVersionMismatch: |
311 | str = i18n("The protocol version of the Akonadi server is incompatible. Make sure you have a compatible version installed." ); |
312 | break; |
313 | case UserCanceled: |
314 | str = i18n("User canceled operation." ); |
315 | break; |
316 | case Unknown: |
317 | return errorText(); |
318 | default: |
319 | str = i18n("Unknown error." ); |
320 | break; |
321 | } |
322 | if (!errorText().isEmpty()) { |
323 | str += QString::fromLatin1(" (%1)" ).arg(errorText()); |
324 | } |
325 | return str; |
326 | } |
327 | |
328 | bool Job::addSubjob(KJob *job) |
329 | { |
330 | bool rv = KCompositeJob::addSubjob(job); |
331 | if (rv) { |
332 | connect(job, SIGNAL(aboutToStart(Akonadi::Job*)), SLOT(slotSubJobAboutToStart(Akonadi::Job*))); |
333 | QTimer::singleShot(0, this, SLOT(startNext())); |
334 | } |
335 | return rv; |
336 | } |
337 | |
338 | bool Job::removeSubjob(KJob *job) |
339 | { |
340 | bool rv = KCompositeJob::removeSubjob(job); |
341 | if (job == d_ptr->mCurrentSubJob) { |
342 | d_ptr->mCurrentSubJob = 0; |
343 | QTimer::singleShot(0, this, SLOT(startNext())); |
344 | } |
345 | return rv; |
346 | } |
347 | |
348 | void Job::doHandleResponse(const QByteArray &tag, const QByteArray &data) |
349 | { |
350 | kDebug() << "Unhandled response: " << tag << data; |
351 | } |
352 | |
353 | void Job::slotResult(KJob *job) |
354 | { |
355 | if (d_ptr->mCurrentSubJob == job) { |
356 | // current job finished, start the next one |
357 | d_ptr->mCurrentSubJob = 0; |
358 | KCompositeJob::slotResult(job); |
359 | if (!job->error()) { |
360 | QTimer::singleShot(0, this, SLOT(startNext())); |
361 | } |
362 | } else { |
363 | // job that was still waiting for execution finished, probably canceled, |
364 | // so just remove it from the queue and move on without caring about |
365 | // its error code |
366 | KCompositeJob::removeSubjob(job); |
367 | } |
368 | } |
369 | |
370 | void Job::emitWriteFinished() |
371 | { |
372 | d_ptr->mWriteFinished = true; |
373 | emit writeFinished(this); |
374 | } |
375 | |
376 | #include "moc_job.cpp" |
377 | |