1/*
2 Copyright (c) 2007 Volker Krause <vkrause@kde.org>
3
4 This library is free software; you can redistribute it and/or modify it
5 under the terms of the GNU Library General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or (at your
7 option) any later version.
8
9 This library is distributed in the hope that it will be useful, but WITHOUT
10 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
12 License for more details.
13
14 You should have received a copy of the GNU Library General Public License
15 along with this library; see the file COPYING.LIB. If not, write to the
16 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17 02110-1301, USA.
18*/
19
20#include "session.h"
21#include "session_p.h"
22
23#include "imapparser_p.h"
24#include "job.h"
25#include "job_p.h"
26#include "servermanager.h"
27#include "servermanager_p.h"
28#include "protocolhelper_p.h"
29#include "xdgbasedirs_p.h"
30
31#include <kdebug.h>
32#include <klocalizedstring.h>
33
34#include <QCoreApplication>
35#include <QtCore/QDir>
36#include <QtCore/QQueue>
37#include <QtCore/QThreadStorage>
38#include <QtCore/QTimer>
39#include <QtCore/QThread>
40#include <QSettings>
41
42#include <QtNetwork/QLocalSocket>
43#include <QtNetwork/QTcpSocket>
44#include <QtNetwork/QHostAddress>
45#include <QApplication>
46
47// ### FIXME pipelining got broken by switching result emission in JobPrivate::handleResponse to delayed emission
48// in order to work around exec() deadlocks. As a result of that Session knows to late about a finished job and still
49// sends responses for the next one to the already finished one
50#define PIPELINE_LENGTH 0
51//#define PIPELINE_LENGTH 2
52
53using namespace Akonadi;
54
55//@cond PRIVATE
56
57static const QList<QByteArray> sCapabilities = QList<QByteArray>()
58 << "NOTIFY 3"
59 << "NOPAYLOADPATH"
60 << "AKAPPENDSTREAMING"
61 << "SERVERSEARCH"
62 << "DIRECTSTREAMING";
63
64void SessionPrivate::startNext()
65{
66 QTimer::singleShot(0, mParent, SLOT(doStartNext()));
67}
68
69void SessionPrivate::reconnect()
70{
71 QLocalSocket *localSocket = qobject_cast<QLocalSocket *>(socket);
72 if (localSocket && (localSocket->state() == QLocalSocket::ConnectedState
73 || localSocket->state() == QLocalSocket::ConnectingState)) {
74 // nothing to do, we are still/already connected
75 return;
76 }
77
78 QTcpSocket *tcpSocket = qobject_cast<QTcpSocket *>(socket);
79 if (tcpSocket && (tcpSocket->state() == QTcpSocket::ConnectedState
80 || tcpSocket->state() == QTcpSocket::ConnectingState)) {
81 // same here, but for TCP
82 return;
83 }
84
85 // try to figure out where to connect to
86 QString serverAddress;
87 quint16 port = 0;
88 bool useTcp = false;
89
90 // env var has precedence
91 const QByteArray serverAddressEnvVar = qgetenv("AKONADI_SERVER_ADDRESS");
92 if (!serverAddressEnvVar.isEmpty()) {
93 const int pos = serverAddressEnvVar.indexOf(':');
94 const QByteArray protocol = serverAddressEnvVar.left(pos);
95 QMap<QString, QString> options;
96 foreach (const QString &entry, QString::fromLatin1(serverAddressEnvVar.mid(pos + 1)).split(QLatin1Char(','))) {
97 const QStringList pair = entry.split(QLatin1Char('='));
98 if (pair.size() != 2) {
99 continue;
100 }
101 options.insert(pair.first(), pair.last());
102 }
103 kDebug() << protocol << options;
104
105 if (protocol == "tcp") {
106 serverAddress = options.value(QLatin1String("host"));
107 port = options.value(QLatin1String("port")).toUInt();
108 useTcp = true;
109 } else if (protocol == "unix") {
110 serverAddress = options.value(QLatin1String("path"));
111 } else if (protocol == "pipe") {
112 serverAddress = options.value(QLatin1String("name"));
113 }
114 }
115
116 // try config file next, fall back to defaults if that fails as well
117 if (serverAddress.isEmpty()) {
118 const QString connectionConfigFile = connectionFile();
119 const QFileInfo fileInfo(connectionConfigFile);
120 if (!fileInfo.exists()) {
121 kDebug() << "Akonadi Client Session: connection config file '"
122 "akonadi/akonadiconnectionrc' can not be found in"
123 << XdgBaseDirs::homePath("config") << "nor in any of"
124 << XdgBaseDirs::systemPathList("config");
125 }
126 const QSettings connectionSettings(connectionConfigFile, QSettings::IniFormat);
127
128#ifdef Q_OS_WIN //krazy:exclude=cpp
129 serverAddress = connectionSettings.value(QLatin1String("Data/NamedPipe"), QLatin1String("Akonadi")).toString();
130#else
131 const QString defaultSocketDir = Internal::xdgSaveDir("data");
132 serverAddress = connectionSettings.value(QLatin1String("Data/UnixPath"), QString(defaultSocketDir + QLatin1String("/akonadiserver.socket"))).toString();
133#endif
134 }
135
136 // create sockets if not yet done, note that this does not yet allow changing socket types on the fly
137 // but that's probably not something we need to support anyway
138 if (!socket) {
139 if (!useTcp) {
140 socket = localSocket = new QLocalSocket(mParent);
141 mParent->connect(localSocket, SIGNAL(error(QLocalSocket::LocalSocketError)), SLOT(socketError(QLocalSocket::LocalSocketError)));
142 } else {
143 socket = tcpSocket = new QTcpSocket(mParent);
144 mParent->connect(tcpSocket, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(socketError(QAbstractSocket::SocketError)));
145 }
146 mParent->connect(socket, SIGNAL(disconnected()), SLOT(socketDisconnected()));
147 mParent->connect(socket, SIGNAL(readyRead()), SLOT(dataReceived()));
148 }
149
150 // actually do connect
151 kDebug() << "connectToServer" << serverAddress;
152 if (!useTcp) {
153 localSocket->connectToServer(serverAddress);
154 } else {
155 tcpSocket->connectToHost(serverAddress, port);
156 }
157
158 emit mParent->reconnected();
159}
160
161QString SessionPrivate::connectionFile()
162{
163 return Internal::xdgSaveDir("config") + QLatin1String("/akonadiconnectionrc");
164}
165
166void SessionPrivate::socketError(QLocalSocket::LocalSocketError)
167{
168 Q_ASSERT(mParent->sender() == socket);
169 kWarning() << "Socket error occurred:" << qobject_cast<QLocalSocket *>(socket)->errorString();
170 socketDisconnected();
171}
172
173void SessionPrivate::socketError(QAbstractSocket::SocketError)
174{
175 Q_ASSERT(mParent->sender() == socket);
176 kWarning() << "Socket error occurred:" << qobject_cast<QTcpSocket *>(socket)->errorString();
177 socketDisconnected();
178}
179
180void SessionPrivate::socketDisconnected()
181{
182 if (currentJob) {
183 currentJob->d_ptr->lostConnection();
184 }
185 connected = false;
186}
187
188void SessionPrivate::dataReceived()
189{
190 while (socket->bytesAvailable() > 0) {
191 if (parser->continuationSize() > 1) {
192 const QByteArray data = socket->read(qMin(socket->bytesAvailable(), parser->continuationSize() - 1));
193 parser->parseBlock(data);
194 } else if (socket->canReadLine()) {
195 if (!parser->parseNextLine(socket->readLine())) {
196 continue; // response not yet completed
197 }
198
199 if (logFile) {
200 logFile->write("S: " + parser->data());
201 }
202
203 // handle login response
204 if (parser->tag() == QByteArray("0")) {
205 if (parser->data().startsWith("OK")) { //krazy:exclude=strings
206 writeData("1 CAPABILITY (" + ImapParser::join(sCapabilities, " ") + ")");
207 } else {
208 kWarning() << "Unable to login to Akonadi server:" << parser->data();
209 socket->close();
210 QTimer::singleShot(1000, mParent, SLOT(reconnect()));
211 }
212 }
213
214 // handle capability response
215 if (parser->tag() == QByteArray("1")) {
216 if (parser->data().startsWith("OK")) {
217 connected = true;
218 startNext();
219 } else {
220 kDebug() << "Unhandled server capability response:" << parser->data();
221 }
222 }
223
224 // send login command
225 if (parser->tag() == "*" && parser->data().startsWith("OK Akonadi")) {
226 const int pos = parser->data().indexOf("[PROTOCOL");
227 if (pos > 0) {
228 qint64 tmp = 0;
229 ImapParser::parseNumber(parser->data(), tmp, 0, pos + 9);
230 protocolVersion = tmp;
231 Internal::setServerProtocolVersion(tmp);
232 }
233 kDebug() << "Server protocol version is:" << protocolVersion;
234
235 writeData("0 LOGIN " + ImapParser::quote(sessionId) + '\n');
236
237 // work for the current job
238 } else {
239 if (currentJob) {
240 currentJob->d_ptr->handleResponse(parser->tag(), parser->data());
241 }
242 }
243
244 // reset parser stuff
245 parser->reset();
246 } else {
247 break; // nothing we can do for now
248 }
249 }
250}
251
252bool SessionPrivate::canPipelineNext()
253{
254 if (queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH) {
255 return false;
256 }
257 if (pipeline.isEmpty() && currentJob) {
258 return currentJob->d_ptr->mWriteFinished;
259 }
260 if (!pipeline.isEmpty()) {
261 return pipeline.last()->d_ptr->mWriteFinished;
262 }
263 return false;
264}
265
266void SessionPrivate::doStartNext()
267{
268 if (!connected || (queue.isEmpty() && pipeline.isEmpty())) {
269 return;
270 }
271 if (canPipelineNext()) {
272 Akonadi::Job *nextJob = queue.dequeue();
273 pipeline.enqueue(nextJob);
274 startJob(nextJob);
275 }
276 if (jobRunning) {
277 return;
278 }
279 jobRunning = true;
280 if (!pipeline.isEmpty()) {
281 currentJob = pipeline.dequeue();
282 } else {
283 currentJob = queue.dequeue();
284 startJob(currentJob);
285 }
286}
287
288void SessionPrivate::startJob(Job *job)
289{
290 if (protocolVersion < minimumProtocolVersion()) {
291 job->setError(Job::ProtocolVersionMismatch);
292 job->setErrorText(i18n("Protocol version %1 found, expected at least %2", protocolVersion, minimumProtocolVersion()));
293 job->emitResult();
294 } else {
295 job->d_ptr->startQueued();
296 }
297}
298
299void SessionPrivate::endJob(Job *job)
300{
301 job->emitResult();
302}
303
304void SessionPrivate::jobDone(KJob *job)
305{
306 // ### careful, this method can be called from the QObject dtor of job (see jobDestroyed() below)
307 // so don't call any methods on job itself
308 if (job == currentJob) {
309 if (pipeline.isEmpty()) {
310 jobRunning = false;
311 currentJob = 0;
312 } else {
313 currentJob = pipeline.dequeue();
314 }
315 startNext();
316 } else {
317 // non-current job finished, likely canceled while still in the queue
318 queue.removeAll(static_cast<Akonadi::Job *>(job));
319 // ### likely not enough to really cancel already running jobs
320 pipeline.removeAll(static_cast<Akonadi::Job *>(job));
321 }
322}
323
324void SessionPrivate::jobWriteFinished(Akonadi::Job *job)
325{
326 Q_ASSERT((job == currentJob && pipeline.isEmpty()) || (job = pipeline.last()));
327 Q_UNUSED(job);
328
329 startNext();
330}
331
332void SessionPrivate::jobDestroyed(QObject *job)
333{
334 // careful, accessing non-QObject methods of job will fail here already
335 jobDone(static_cast<KJob *>(job));
336}
337
338void SessionPrivate::addJob(Job *job)
339{
340 queue.append(job);
341 QObject::connect(job, SIGNAL(result(KJob*)), mParent, SLOT(jobDone(KJob*)));
342 QObject::connect(job, SIGNAL(writeFinished(Akonadi::Job*)), mParent, SLOT(jobWriteFinished(Akonadi::Job*)));
343 QObject::connect(job, SIGNAL(destroyed(QObject*)), mParent, SLOT(jobDestroyed(QObject*)));
344 startNext();
345}
346
347int SessionPrivate::nextTag()
348{
349 return theNextTag++;
350}
351
352void SessionPrivate::writeData(const QByteArray &data)
353{
354 if (logFile) {
355 logFile->write("C: " + data);
356 if (!data.endsWith('\n')) {
357 logFile->write("\n");
358 }
359 logFile->flush();
360 }
361
362 if (socket) {
363 socket->write(data);
364 } else {
365 kWarning() << "Trying to write while session is disconnected!" << kBacktrace();
366 }
367}
368
369void SessionPrivate::serverStateChanged(ServerManager::State state)
370{
371 if (state == ServerManager::Running && !connected) {
372 reconnect();
373 } else if (!connected && state == ServerManager::Broken) {
374 // If the server is broken, cancel all pending jobs, otherwise they will be
375 // blocked forever and applications waiting for them to finish would be stuck
376 Q_FOREACH (Job *job, queue) {
377 job->setError(Job::ConnectionFailed);
378 job->kill(KJob::EmitResult);
379 }
380 }
381}
382
383void SessionPrivate::itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
384{
385 // only deal with the queue, for the guys in the pipeline it's too late already anyway
386 // and they shouldn't have gotten there if they depend on a preceding job anyway.
387 foreach (Job *job, queue) {
388 job->d_ptr->updateItemRevision(itemId, oldRevision, newRevision);
389 }
390}
391
392//@endcond
393
394SessionPrivate::SessionPrivate(Session *parent)
395 : mParent(parent)
396 , socket(0)
397 , protocolVersion(0)
398 , currentJob(0)
399 , parser(0)
400 , logFile(0)
401{
402}
403
404void SessionPrivate::init(const QByteArray &id)
405{
406 kDebug() << id;
407 parser = new ImapParser();
408
409 if (!id.isEmpty()) {
410 sessionId = id;
411 } else {
412 sessionId = QCoreApplication::instance()->applicationName().toUtf8()
413 + '-' + QByteArray::number(qrand());
414 }
415
416 connected = false;
417 theNextTag = 2;
418 jobRunning = false;
419
420 if (ServerManager::state() == ServerManager::NotRunning) {
421 ServerManager::start();
422 }
423 mParent->connect(ServerManager::self(), SIGNAL(stateChanged(Akonadi::ServerManager::State)),
424 SLOT(serverStateChanged(Akonadi::ServerManager::State)));
425
426 const QByteArray sessionLogFile = qgetenv("AKONADI_SESSION_LOGFILE");
427 if (!sessionLogFile.isEmpty()) {
428 logFile = new QFile(QString::fromLatin1("%1.%2.%3").arg(QString::fromLatin1(sessionLogFile))
429 .arg(QString::number(QApplication::applicationPid()))
430 .arg(QString::fromLatin1(sessionId)),
431 mParent);
432 if (!logFile->open(QIODevice::WriteOnly | QIODevice::Truncate)) {
433 kWarning() << "Failed to open Akonadi Session log file" << logFile->fileName();
434 delete logFile;
435 logFile = 0;
436 }
437 }
438
439 reconnect();
440}
441
442void SessionPrivate::forceReconnect()
443{
444 jobRunning = false;
445 connected = false;
446 if (socket) {
447 socket->disconnect(mParent); // prevent signal emitted from close() causing mayhem - we might be called from ~QThreadStorage!
448 delete socket;
449 }
450 socket = 0;
451 QMetaObject::invokeMethod(mParent, "reconnect", Qt::QueuedConnection); // avoids reconnecting in the dtor
452}
453
454Session::Session(const QByteArray &sessionId, QObject *parent)
455 : QObject(parent)
456 , d(new SessionPrivate(this))
457{
458 d->init(sessionId);
459}
460
461Session::Session(SessionPrivate *dd, const QByteArray &sessionId, QObject *parent)
462 : QObject(parent)
463 , d(dd)
464{
465 d->init(sessionId);
466}
467
468Session::~Session()
469{
470 clear();
471 delete d;
472}
473
474QByteArray Session::sessionId() const
475{
476 return d->sessionId;
477}
478
479static QThreadStorage<Session *> instances;
480
481void SessionPrivate::createDefaultSession(const QByteArray &sessionId)
482{
483 Q_ASSERT_X(!sessionId.isEmpty(), "SessionPrivate::createDefaultSession",
484 "You tried to create a default session with empty session id!");
485 Q_ASSERT_X(!instances.hasLocalData(), "SessionPrivate::createDefaultSession",
486 "You tried to create a default session twice!");
487
488 instances.setLocalData(new Session(sessionId));
489}
490
491void SessionPrivate::setDefaultSession(Session *session)
492{
493 instances.setLocalData(session);
494}
495
496Session *Session::defaultSession()
497{
498 if (!instances.hasLocalData()) {
499 instances.setLocalData(new Session());
500 }
501 return instances.localData();
502}
503
504void Session::clear()
505{
506 foreach (Job *job, d->queue) {
507 job->kill(KJob::EmitResult); // safe, not started yet
508 }
509 d->queue.clear();
510 foreach (Job *job, d->pipeline) {
511 job->d_ptr->mStarted = false; // avoid killing/reconnect loops
512 job->kill(KJob::EmitResult);
513 }
514 d->pipeline.clear();
515 if (d->currentJob) {
516 d->currentJob->d_ptr->mStarted = false; // avoid killing/reconnect loops
517 d->currentJob->kill(KJob::EmitResult);
518 }
519 d->forceReconnect();
520}
521
522#include "moc_session.cpp"
523