1 | /* |
2 | Copyright (c) 2007 Volker Krause <vkrause@kde.org> |
3 | |
4 | This library is free software; you can redistribute it and/or modify it |
5 | under the terms of the GNU Library General Public License as published by |
6 | the Free Software Foundation; either version 2 of the License, or (at your |
7 | option) any later version. |
8 | |
9 | This library is distributed in the hope that it will be useful, but WITHOUT |
10 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
11 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public |
12 | License for more details. |
13 | |
14 | You should have received a copy of the GNU Library General Public License |
15 | along with this library; see the file COPYING.LIB. If not, write to the |
16 | Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
17 | 02110-1301, USA. |
18 | */ |
19 | |
20 | #include "session.h" |
21 | #include "session_p.h" |
22 | |
23 | #include "imapparser_p.h" |
24 | #include "job.h" |
25 | #include "job_p.h" |
26 | #include "servermanager.h" |
27 | #include "servermanager_p.h" |
28 | #include "protocolhelper_p.h" |
29 | #include "xdgbasedirs_p.h" |
30 | |
31 | #include <kdebug.h> |
32 | #include <klocalizedstring.h> |
33 | |
34 | #include <QCoreApplication> |
35 | #include <QtCore/QDir> |
36 | #include <QtCore/QQueue> |
37 | #include <QtCore/QThreadStorage> |
38 | #include <QtCore/QTimer> |
39 | #include <QtCore/QThread> |
40 | #include <QSettings> |
41 | |
42 | #include <QtNetwork/QLocalSocket> |
43 | #include <QtNetwork/QTcpSocket> |
44 | #include <QtNetwork/QHostAddress> |
45 | #include <QApplication> |
46 | |
// ### FIXME pipelining got broken by switching result emission in JobPrivate::handleResponse to delayed emission
// in order to work around exec() deadlocks. As a result, Session learns too late that a job has finished and still
// forwards the responses meant for the next job to the already finished one.
50 | #define PIPELINE_LENGTH 0 |
51 | //#define PIPELINE_LENGTH 2 |
52 | |
53 | using namespace Akonadi; |
54 | |
55 | //@cond PRIVATE |
56 | |
// Capability tokens announced to the server: dataReceived() joins them with
// spaces and sends them in the "1 CAPABILITY (...)" command once the login
// response was OK.
static const QList<QByteArray> sCapabilities = QList<QByteArray>()
        << "NOTIFY 3"
        << "NOPAYLOADPATH"
        << "AKAPPENDSTREAMING"
        << "SERVERSEARCH"
        << "DIRECTSTREAMING";
63 | |
// Schedules doStartNext() on the parent Session through a zero-timeout
// single-shot timer, i.e. the slot is not invoked synchronously from here.
void SessionPrivate::startNext()
{
    QTimer::singleShot(0, mParent, SLOT(doStartNext()));
}
68 | |
69 | void SessionPrivate::reconnect() |
70 | { |
71 | QLocalSocket *localSocket = qobject_cast<QLocalSocket *>(socket); |
72 | if (localSocket && (localSocket->state() == QLocalSocket::ConnectedState |
73 | || localSocket->state() == QLocalSocket::ConnectingState)) { |
74 | // nothing to do, we are still/already connected |
75 | return; |
76 | } |
77 | |
78 | QTcpSocket *tcpSocket = qobject_cast<QTcpSocket *>(socket); |
79 | if (tcpSocket && (tcpSocket->state() == QTcpSocket::ConnectedState |
80 | || tcpSocket->state() == QTcpSocket::ConnectingState)) { |
81 | // same here, but for TCP |
82 | return; |
83 | } |
84 | |
85 | // try to figure out where to connect to |
86 | QString serverAddress; |
87 | quint16 port = 0; |
88 | bool useTcp = false; |
89 | |
90 | // env var has precedence |
91 | const QByteArray serverAddressEnvVar = qgetenv("AKONADI_SERVER_ADDRESS" ); |
92 | if (!serverAddressEnvVar.isEmpty()) { |
93 | const int pos = serverAddressEnvVar.indexOf(':'); |
94 | const QByteArray protocol = serverAddressEnvVar.left(pos); |
95 | QMap<QString, QString> options; |
96 | foreach (const QString &entry, QString::fromLatin1(serverAddressEnvVar.mid(pos + 1)).split(QLatin1Char(','))) { |
97 | const QStringList pair = entry.split(QLatin1Char('=')); |
98 | if (pair.size() != 2) { |
99 | continue; |
100 | } |
101 | options.insert(pair.first(), pair.last()); |
102 | } |
103 | kDebug() << protocol << options; |
104 | |
105 | if (protocol == "tcp" ) { |
106 | serverAddress = options.value(QLatin1String("host" )); |
107 | port = options.value(QLatin1String("port" )).toUInt(); |
108 | useTcp = true; |
109 | } else if (protocol == "unix" ) { |
110 | serverAddress = options.value(QLatin1String("path" )); |
111 | } else if (protocol == "pipe" ) { |
112 | serverAddress = options.value(QLatin1String("name" )); |
113 | } |
114 | } |
115 | |
116 | // try config file next, fall back to defaults if that fails as well |
117 | if (serverAddress.isEmpty()) { |
118 | const QString connectionConfigFile = connectionFile(); |
119 | const QFileInfo fileInfo(connectionConfigFile); |
120 | if (!fileInfo.exists()) { |
121 | kDebug() << "Akonadi Client Session: connection config file '" |
122 | "akonadi/akonadiconnectionrc' can not be found in" |
123 | << XdgBaseDirs::homePath("config" ) << "nor in any of" |
124 | << XdgBaseDirs::systemPathList("config" ); |
125 | } |
126 | const QSettings connectionSettings(connectionConfigFile, QSettings::IniFormat); |
127 | |
128 | #ifdef Q_OS_WIN //krazy:exclude=cpp |
129 | serverAddress = connectionSettings.value(QLatin1String("Data/NamedPipe" ), QLatin1String("Akonadi" )).toString(); |
130 | #else |
131 | const QString defaultSocketDir = Internal::xdgSaveDir("data" ); |
132 | serverAddress = connectionSettings.value(QLatin1String("Data/UnixPath" ), QString(defaultSocketDir + QLatin1String("/akonadiserver.socket" ))).toString(); |
133 | #endif |
134 | } |
135 | |
136 | // create sockets if not yet done, note that this does not yet allow changing socket types on the fly |
137 | // but that's probably not something we need to support anyway |
138 | if (!socket) { |
139 | if (!useTcp) { |
140 | socket = localSocket = new QLocalSocket(mParent); |
141 | mParent->connect(localSocket, SIGNAL(error(QLocalSocket::LocalSocketError)), SLOT(socketError(QLocalSocket::LocalSocketError))); |
142 | } else { |
143 | socket = tcpSocket = new QTcpSocket(mParent); |
144 | mParent->connect(tcpSocket, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(socketError(QAbstractSocket::SocketError))); |
145 | } |
146 | mParent->connect(socket, SIGNAL(disconnected()), SLOT(socketDisconnected())); |
147 | mParent->connect(socket, SIGNAL(readyRead()), SLOT(dataReceived())); |
148 | } |
149 | |
150 | // actually do connect |
151 | kDebug() << "connectToServer" << serverAddress; |
152 | if (!useTcp) { |
153 | localSocket->connectToServer(serverAddress); |
154 | } else { |
155 | tcpSocket->connectToHost(serverAddress, port); |
156 | } |
157 | |
158 | emit mParent->reconnected(); |
159 | } |
160 | |
161 | QString SessionPrivate::connectionFile() |
162 | { |
163 | return Internal::xdgSaveDir("config" ) + QLatin1String("/akonadiconnectionrc" ); |
164 | } |
165 | |
166 | void SessionPrivate::socketError(QLocalSocket::LocalSocketError) |
167 | { |
168 | Q_ASSERT(mParent->sender() == socket); |
169 | kWarning() << "Socket error occurred:" << qobject_cast<QLocalSocket *>(socket)->errorString(); |
170 | socketDisconnected(); |
171 | } |
172 | |
173 | void SessionPrivate::socketError(QAbstractSocket::SocketError) |
174 | { |
175 | Q_ASSERT(mParent->sender() == socket); |
176 | kWarning() << "Socket error occurred:" << qobject_cast<QTcpSocket *>(socket)->errorString(); |
177 | socketDisconnected(); |
178 | } |
179 | |
// Called when the socket got disconnected (also invoked from both
// socketError() overloads): tells the current job, if there is one, that the
// connection was lost and afterwards marks the session as not connected.
void SessionPrivate::socketDisconnected()
{
    if (currentJob) {
        currentJob->d_ptr->lostConnection();
    }
    connected = false;
}
187 | |
188 | void SessionPrivate::dataReceived() |
189 | { |
190 | while (socket->bytesAvailable() > 0) { |
191 | if (parser->continuationSize() > 1) { |
192 | const QByteArray data = socket->read(qMin(socket->bytesAvailable(), parser->continuationSize() - 1)); |
193 | parser->parseBlock(data); |
194 | } else if (socket->canReadLine()) { |
195 | if (!parser->parseNextLine(socket->readLine())) { |
196 | continue; // response not yet completed |
197 | } |
198 | |
199 | if (logFile) { |
200 | logFile->write("S: " + parser->data()); |
201 | } |
202 | |
203 | // handle login response |
204 | if (parser->tag() == QByteArray("0" )) { |
205 | if (parser->data().startsWith("OK" )) { //krazy:exclude=strings |
206 | writeData("1 CAPABILITY (" + ImapParser::join(sCapabilities, " " ) + ")" ); |
207 | } else { |
208 | kWarning() << "Unable to login to Akonadi server:" << parser->data(); |
209 | socket->close(); |
210 | QTimer::singleShot(1000, mParent, SLOT(reconnect())); |
211 | } |
212 | } |
213 | |
214 | // handle capability response |
215 | if (parser->tag() == QByteArray("1" )) { |
216 | if (parser->data().startsWith("OK" )) { |
217 | connected = true; |
218 | startNext(); |
219 | } else { |
220 | kDebug() << "Unhandled server capability response:" << parser->data(); |
221 | } |
222 | } |
223 | |
224 | // send login command |
225 | if (parser->tag() == "*" && parser->data().startsWith("OK Akonadi" )) { |
226 | const int pos = parser->data().indexOf("[PROTOCOL" ); |
227 | if (pos > 0) { |
228 | qint64 tmp = 0; |
229 | ImapParser::parseNumber(parser->data(), tmp, 0, pos + 9); |
230 | protocolVersion = tmp; |
231 | Internal::setServerProtocolVersion(tmp); |
232 | } |
233 | kDebug() << "Server protocol version is:" << protocolVersion; |
234 | |
235 | writeData("0 LOGIN " + ImapParser::quote(sessionId) + '\n'); |
236 | |
237 | // work for the current job |
238 | } else { |
239 | if (currentJob) { |
240 | currentJob->d_ptr->handleResponse(parser->tag(), parser->data()); |
241 | } |
242 | } |
243 | |
244 | // reset parser stuff |
245 | parser->reset(); |
246 | } else { |
247 | break; // nothing we can do for now |
248 | } |
249 | } |
250 | } |
251 | |
252 | bool SessionPrivate::canPipelineNext() |
253 | { |
254 | if (queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH) { |
255 | return false; |
256 | } |
257 | if (pipeline.isEmpty() && currentJob) { |
258 | return currentJob->d_ptr->mWriteFinished; |
259 | } |
260 | if (!pipeline.isEmpty()) { |
261 | return pipeline.last()->d_ptr->mWriteFinished; |
262 | } |
263 | return false; |
264 | } |
265 | |
266 | void SessionPrivate::doStartNext() |
267 | { |
268 | if (!connected || (queue.isEmpty() && pipeline.isEmpty())) { |
269 | return; |
270 | } |
271 | if (canPipelineNext()) { |
272 | Akonadi::Job *nextJob = queue.dequeue(); |
273 | pipeline.enqueue(nextJob); |
274 | startJob(nextJob); |
275 | } |
276 | if (jobRunning) { |
277 | return; |
278 | } |
279 | jobRunning = true; |
280 | if (!pipeline.isEmpty()) { |
281 | currentJob = pipeline.dequeue(); |
282 | } else { |
283 | currentJob = queue.dequeue(); |
284 | startJob(currentJob); |
285 | } |
286 | } |
287 | |
288 | void SessionPrivate::startJob(Job *job) |
289 | { |
290 | if (protocolVersion < minimumProtocolVersion()) { |
291 | job->setError(Job::ProtocolVersionMismatch); |
292 | job->setErrorText(i18n("Protocol version %1 found, expected at least %2" , protocolVersion, minimumProtocolVersion())); |
293 | job->emitResult(); |
294 | } else { |
295 | job->d_ptr->startQueued(); |
296 | } |
297 | } |
298 | |
// Finishes the given job by making it emit its result.
void SessionPrivate::endJob(Job *job)
{
    job->emitResult();
}
303 | |
304 | void SessionPrivate::jobDone(KJob *job) |
305 | { |
306 | // ### careful, this method can be called from the QObject dtor of job (see jobDestroyed() below) |
307 | // so don't call any methods on job itself |
308 | if (job == currentJob) { |
309 | if (pipeline.isEmpty()) { |
310 | jobRunning = false; |
311 | currentJob = 0; |
312 | } else { |
313 | currentJob = pipeline.dequeue(); |
314 | } |
315 | startNext(); |
316 | } else { |
317 | // non-current job finished, likely canceled while still in the queue |
318 | queue.removeAll(static_cast<Akonadi::Job *>(job)); |
319 | // ### likely not enough to really cancel already running jobs |
320 | pipeline.removeAll(static_cast<Akonadi::Job *>(job)); |
321 | } |
322 | } |
323 | |
324 | void SessionPrivate::jobWriteFinished(Akonadi::Job *job) |
325 | { |
326 | Q_ASSERT((job == currentJob && pipeline.isEmpty()) || (job = pipeline.last())); |
327 | Q_UNUSED(job); |
328 | |
329 | startNext(); |
330 | } |
331 | |
// Slot connected to the destroyed() signal of every job added to this session;
// forwards to jobDone() so that the job is dropped from the bookkeeping.
void SessionPrivate::jobDestroyed(QObject *job)
{
    // careful, accessing non-QObject methods of job will fail here already
    jobDone(static_cast<KJob *>(job));
}
337 | |
338 | void SessionPrivate::addJob(Job *job) |
339 | { |
340 | queue.append(job); |
341 | QObject::connect(job, SIGNAL(result(KJob*)), mParent, SLOT(jobDone(KJob*))); |
342 | QObject::connect(job, SIGNAL(writeFinished(Akonadi::Job*)), mParent, SLOT(jobWriteFinished(Akonadi::Job*))); |
343 | QObject::connect(job, SIGNAL(destroyed(QObject*)), mParent, SLOT(jobDestroyed(QObject*))); |
344 | startNext(); |
345 | } |
346 | |
347 | int SessionPrivate::nextTag() |
348 | { |
349 | return theNextTag++; |
350 | } |
351 | |
352 | void SessionPrivate::writeData(const QByteArray &data) |
353 | { |
354 | if (logFile) { |
355 | logFile->write("C: " + data); |
356 | if (!data.endsWith('\n')) { |
357 | logFile->write("\n" ); |
358 | } |
359 | logFile->flush(); |
360 | } |
361 | |
362 | if (socket) { |
363 | socket->write(data); |
364 | } else { |
365 | kWarning() << "Trying to write while session is disconnected!" << kBacktrace(); |
366 | } |
367 | } |
368 | |
369 | void SessionPrivate::serverStateChanged(ServerManager::State state) |
370 | { |
371 | if (state == ServerManager::Running && !connected) { |
372 | reconnect(); |
373 | } else if (!connected && state == ServerManager::Broken) { |
374 | // If the server is broken, cancel all pending jobs, otherwise they will be |
375 | // blocked forever and applications waiting for them to finish would be stuck |
376 | Q_FOREACH (Job *job, queue) { |
377 | job->setError(Job::ConnectionFailed); |
378 | job->kill(KJob::EmitResult); |
379 | } |
380 | } |
381 | } |
382 | |
383 | void SessionPrivate::itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision) |
384 | { |
385 | // only deal with the queue, for the guys in the pipeline it's too late already anyway |
386 | // and they shouldn't have gotten there if they depend on a preceding job anyway. |
387 | foreach (Job *job, queue) { |
388 | job->d_ptr->updateItemRevision(itemId, oldRevision, newRevision); |
389 | } |
390 | } |
391 | |
392 | //@endcond |
393 | |
// Stores the owning Session and zero-initializes the pointer members and the
// protocol version. The remaining state (connected, theNextTag, jobRunning,
// sessionId, parser) is set up by init().
SessionPrivate::SessionPrivate(Session *parent)
    : mParent(parent)
    , socket(0)
    , protocolVersion(0)
    , currentJob(0)
    , parser(0)
    , logFile(0)
{
}
403 | |
404 | void SessionPrivate::init(const QByteArray &id) |
405 | { |
406 | kDebug() << id; |
407 | parser = new ImapParser(); |
408 | |
409 | if (!id.isEmpty()) { |
410 | sessionId = id; |
411 | } else { |
412 | sessionId = QCoreApplication::instance()->applicationName().toUtf8() |
413 | + '-' + QByteArray::number(qrand()); |
414 | } |
415 | |
416 | connected = false; |
417 | theNextTag = 2; |
418 | jobRunning = false; |
419 | |
420 | if (ServerManager::state() == ServerManager::NotRunning) { |
421 | ServerManager::start(); |
422 | } |
423 | mParent->connect(ServerManager::self(), SIGNAL(stateChanged(Akonadi::ServerManager::State)), |
424 | SLOT(serverStateChanged(Akonadi::ServerManager::State))); |
425 | |
426 | const QByteArray sessionLogFile = qgetenv("AKONADI_SESSION_LOGFILE" ); |
427 | if (!sessionLogFile.isEmpty()) { |
428 | logFile = new QFile(QString::fromLatin1("%1.%2.%3" ).arg(QString::fromLatin1(sessionLogFile)) |
429 | .arg(QString::number(QApplication::applicationPid())) |
430 | .arg(QString::fromLatin1(sessionId)), |
431 | mParent); |
432 | if (!logFile->open(QIODevice::WriteOnly | QIODevice::Truncate)) { |
433 | kWarning() << "Failed to open Akonadi Session log file" << logFile->fileName(); |
434 | delete logFile; |
435 | logFile = 0; |
436 | } |
437 | } |
438 | |
439 | reconnect(); |
440 | } |
441 | |
442 | void SessionPrivate::forceReconnect() |
443 | { |
444 | jobRunning = false; |
445 | connected = false; |
446 | if (socket) { |
447 | socket->disconnect(mParent); // prevent signal emitted from close() causing mayhem - we might be called from ~QThreadStorage! |
448 | delete socket; |
449 | } |
450 | socket = 0; |
451 | QMetaObject::invokeMethod(mParent, "reconnect" , Qt::QueuedConnection); // avoids reconnecting in the dtor |
452 | } |
453 | |
// Creates a session with its own SessionPrivate and initializes it with the
// given session id (see SessionPrivate::init() for the empty-id case).
Session::Session(const QByteArray &sessionId, QObject *parent)
    : QObject(parent)
    , d(new SessionPrivate(this))
{
    d->init(sessionId);
}
460 | |
// Creates a session around the caller-supplied private object @p dd and
// initializes it with the given session id. The destructor deletes @p dd.
Session::Session(SessionPrivate *dd, const QByteArray &sessionId, QObject *parent)
    : QObject(parent)
    , d(dd)
{
    d->init(sessionId);
}
467 | |
// Kills all jobs still known to the session (see clear()) and then destroys
// the private object.
Session::~Session()
{
    clear();
    delete d;
}
473 | |
// Returns the session id chosen in SessionPrivate::init().
QByteArray Session::sessionId() const
{
    return d->sessionId;
}
478 | |
// Holds the default Session of each thread (QThreadStorage keeps one value per thread).
static QThreadStorage<Session *> instances;
480 | |
481 | void SessionPrivate::createDefaultSession(const QByteArray &sessionId) |
482 | { |
483 | Q_ASSERT_X(!sessionId.isEmpty(), "SessionPrivate::createDefaultSession" , |
484 | "You tried to create a default session with empty session id!" ); |
485 | Q_ASSERT_X(!instances.hasLocalData(), "SessionPrivate::createDefaultSession" , |
486 | "You tried to create a default session twice!" ); |
487 | |
488 | instances.setLocalData(new Session(sessionId)); |
489 | } |
490 | |
// Installs the given session as the calling thread's default session.
void SessionPrivate::setDefaultSession(Session *session)
{
    instances.setLocalData(session);
}
495 | |
496 | Session *Session::defaultSession() |
497 | { |
498 | if (!instances.hasLocalData()) { |
499 | instances.setLocalData(new Session()); |
500 | } |
501 | return instances.localData(); |
502 | } |
503 | |
504 | void Session::clear() |
505 | { |
506 | foreach (Job *job, d->queue) { |
507 | job->kill(KJob::EmitResult); // safe, not started yet |
508 | } |
509 | d->queue.clear(); |
510 | foreach (Job *job, d->pipeline) { |
511 | job->d_ptr->mStarted = false; // avoid killing/reconnect loops |
512 | job->kill(KJob::EmitResult); |
513 | } |
514 | d->pipeline.clear(); |
515 | if (d->currentJob) { |
516 | d->currentJob->d_ptr->mStarted = false; // avoid killing/reconnect loops |
517 | d->currentJob->kill(KJob::EmitResult); |
518 | } |
519 | d->forceReconnect(); |
520 | } |
521 | |
522 | #include "moc_session.cpp" |
523 | |