23#include "imapparser_p.h"
26#include "servermanager.h"
27#include "servermanager_p.h"
28#include "protocolhelper_p.h"
29#include "xdgbasedirs_p.h"
32#include <klocalizedstring.h>
34#include <QCoreApplication>
36#include <QtCore/QQueue>
37#include <QtCore/QThreadStorage>
38#include <QtCore/QTimer>
39#include <QtCore/QThread>
42#include <QtNetwork/QLocalSocket>
43#include <QtNetwork/QTcpSocket>
44#include <QtNetwork/QHostAddress>
45#include <QApplication>
50#define PIPELINE_LENGTH 0
57static const QList<QByteArray> sCapabilities = QList<QByteArray>()
60 <<
"AKAPPENDSTREAMING"
// Schedules the next job start asynchronously: rather than calling
// doStartNext() directly, it queues the doStartNext() slot on mParent through
// a zero-timeout single-shot timer.
64void SessionPrivate::startNext()
66 QTimer::singleShot(0, mParent, SLOT(doStartNext()));
71 QLocalSocket *localSocket = qobject_cast<QLocalSocket *>(socket);
72 if (localSocket && (localSocket->state() == QLocalSocket::ConnectedState
73 || localSocket->state() == QLocalSocket::ConnectingState)) {
78 QTcpSocket *tcpSocket = qobject_cast<QTcpSocket *>(socket);
79 if (tcpSocket && (tcpSocket->state() == QTcpSocket::ConnectedState
80 || tcpSocket->state() == QTcpSocket::ConnectingState)) {
86 QString serverAddress;
91 const QByteArray serverAddressEnvVar = qgetenv(
"AKONADI_SERVER_ADDRESS");
92 if (!serverAddressEnvVar.isEmpty()) {
93 const int pos = serverAddressEnvVar.indexOf(
':');
94 const QByteArray protocol = serverAddressEnvVar.left(pos);
95 QMap<QString, QString> options;
96 foreach (
const QString &entry, QString::fromLatin1(serverAddressEnvVar.mid(pos + 1)).split(QLatin1Char(
','))) {
97 const QStringList pair = entry.split(QLatin1Char(
'='));
98 if (pair.size() != 2) {
101 options.insert(pair.first(), pair.last());
103 kDebug() << protocol << options;
105 if (protocol ==
"tcp") {
106 serverAddress = options.value(QLatin1String(
"host"));
107 port = options.value(QLatin1String(
"port")).toUInt();
109 }
else if (protocol ==
"unix") {
110 serverAddress = options.value(QLatin1String(
"path"));
111 }
else if (protocol ==
"pipe") {
112 serverAddress = options.value(QLatin1String(
"name"));
117 if (serverAddress.isEmpty()) {
119 const QFileInfo fileInfo(connectionConfigFile);
120 if (!fileInfo.exists()) {
121 kDebug() <<
"Akonadi Client Session: connection config file '"
122 "akonadi/akonadiconnectionrc' can not be found in"
123 << XdgBaseDirs::homePath(
"config") <<
"nor in any of"
124 << XdgBaseDirs::systemPathList(
"config");
126 const QSettings connectionSettings(connectionConfigFile, QSettings::IniFormat);
129 serverAddress = connectionSettings.value(QLatin1String(
"Data/NamedPipe"), QLatin1String(
"Akonadi")).toString();
131 const QString defaultSocketDir = Internal::xdgSaveDir(
"data");
132 serverAddress = connectionSettings.value(QLatin1String(
"Data/UnixPath"), QString(defaultSocketDir + QLatin1String(
"/akonadiserver.socket"))).toString();
140 socket = localSocket =
new QLocalSocket(mParent);
141 mParent->connect(localSocket, SIGNAL(error(QLocalSocket::LocalSocketError)), SLOT(socketError(QLocalSocket::LocalSocketError)));
143 socket = tcpSocket =
new QTcpSocket(mParent);
144 mParent->connect(tcpSocket, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(socketError(QAbstractSocket::SocketError)));
146 mParent->connect(socket, SIGNAL(disconnected()), SLOT(socketDisconnected()));
147 mParent->connect(socket, SIGNAL(readyRead()), SLOT(dataReceived()));
151 kDebug() <<
"connectToServer" << serverAddress;
153 localSocket->connectToServer(serverAddress);
155 tcpSocket->connectToHost(serverAddress, port);
163 return Internal::xdgSaveDir(
"config") + QLatin1String(
"/akonadiconnectionrc");
// Error slot for the QLocalSocket transport. The error code parameter is
// unnamed and unused; the message is taken from the socket's errorString().
166void SessionPrivate::socketError(QLocalSocket::LocalSocketError)
// The signal must originate from the socket this session currently owns.
168 Q_ASSERT(mParent->sender() == socket);
// NOTE(review): the qobject_cast result is dereferenced without a null check;
// this relies on `socket` really being a QLocalSocket when this overload fires — confirm.
169 kWarning() <<
"Socket error occurred:" << qobject_cast<QLocalSocket *>(socket)->errorString();
// An error is handled the same way as a disconnect.
170 socketDisconnected();
// Error slot for the QTcpSocket transport. The error code parameter is
// unnamed and unused; the message is taken from the socket's errorString().
173void SessionPrivate::socketError(QAbstractSocket::SocketError)
// The signal must originate from the socket this session currently owns.
175 Q_ASSERT(mParent->sender() == socket);
// NOTE(review): the qobject_cast result is dereferenced without a null check;
// this relies on `socket` really being a QTcpSocket when this overload fires — confirm.
176 kWarning() <<
"Socket error occurred:" << qobject_cast<QTcpSocket *>(socket)->errorString();
// An error is handled the same way as a disconnect.
177 socketDisconnected();
// Handles loss of the server connection; also invoked by both socketError()
// overloads.
180void SessionPrivate::socketDisconnected()
// Inform the job currently being processed that the connection is gone.
// NOTE(review): intermediate lines are elided in this listing; presumably a
// null check on currentJob precedes this call — confirm against the full file.
183 currentJob->d_ptr->lostConnection();
// Drains the socket and feeds the data to the IMAP-style parser. Complete
// responses are either consumed by the session itself (greeting, LOGIN and
// CAPABILITY replies) or forwarded to the current job.
// NOTE(review): this listing elides several lines (closing braces, else/return
// branches), so the exact nesting of the branches below must be confirmed
// against the full file.
188void SessionPrivate::dataReceived()
190 while (socket->bytesAvailable() > 0) {
// The parser reports a pending continuation: read at most
// continuationSize() - 1 bytes of raw data and hand them to parseBlock().
191 if (parser->continuationSize() > 1) {
192 const QByteArray data = socket->read(qMin(socket->bytesAvailable(), parser->continuationSize() - 1));
193 parser->parseBlock(data);
194 }
else if (socket->canReadLine()) {
195 if (!parser->parseNextLine(socket->readLine())) {
// Mirror server traffic into the session log, prefixed with "S: ".
// NOTE(review): presumably guarded by a check that logFile is set — the guard is not visible here.
200 logFile->write(
"S: " + parser->data());
// Tag "0" is the reply to the "0 LOGIN" command this function sends below.
204 if (parser->tag() == QByteArray(
"0")) {
205 if (parser->data().startsWith(
"OK")) {
// Login accepted: announce the client capabilities under tag "1".
206 writeData(
"1 CAPABILITY (" + ImapParser::join(sCapabilities,
" ") +
")");
208 kWarning() <<
"Unable to login to Akonadi server:" << parser->data();
// Schedule the reconnect() slot on mParent after 1000 ms.
210 QTimer::singleShot(1000, mParent, SLOT(
reconnect()));
// Tag "1" is the reply to the CAPABILITY command sent above.
215 if (parser->tag() == QByteArray(
"1")) {
216 if (parser->data().startsWith(
"OK")) {
220 kDebug() <<
"Unhandled server capability response:" << parser->data();
// Untagged "OK Akonadi" greeting: read the protocol version and log in.
225 if (parser->tag() ==
"*" && parser->data().startsWith(
"OK Akonadi")) {
226 const int pos = parser->data().indexOf(
"[PROTOCOL");
// pos + 9 skips the nine characters of "[PROTOCOL" so that parsing starts
// right after the marker.
229 ImapParser::parseNumber(parser->data(), tmp, 0, pos + 9);
230 protocolVersion = tmp;
231 Internal::setServerProtocolVersion(tmp);
233 kDebug() <<
"Server protocol version is:" << protocolVersion;
235 writeData(
"0 LOGIN " + ImapParser::quote(sessionId) +
'\n');
// Hand the response to the job currently being processed.
240 currentJob->d_ptr->handleResponse(parser->tag(), parser->data());
// Decides whether another queued job may be started while earlier ones are
// still in flight. The visible return statements yield the mWriteFinished flag
// of the most recently started job: currentJob when the pipeline is empty,
// otherwise the last pipeline entry.
252bool SessionPrivate::canPipelineNext()
// PIPELINE_LENGTH is defined as 0 in this file, so
// `pipeline.count() >= PIPELINE_LENGTH` always holds and this branch is always taken.
// NOTE(review): the branch body is elided here; presumably it returns false — confirm.
254 if (queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH) {
257 if (pipeline.isEmpty() && currentJob) {
258 return currentJob->d_ptr->mWriteFinished;
260 if (!pipeline.isEmpty()) {
261 return pipeline.last()->d_ptr->mWriteFinished;
// Slot that starts queued work; startNext() schedules it via a single-shot timer.
// NOTE(review): several lines are elided in this listing (early return, where
// nextJob comes from, else branches); the comments cover only the visible lines.
266void SessionPrivate::doStartNext()
// Checked first: the session is not connected, or both queue and pipeline are empty.
268 if (!connected || (queue.isEmpty() && pipeline.isEmpty())) {
// When pipelining is allowed, a further job is appended to the pipeline.
271 if (canPipelineNext()) {
273 pipeline.enqueue(nextJob);
// The next current job is taken from the pipeline when it has entries; the
// visible alternative takes it from the queue and starts it.
280 if (!pipeline.isEmpty()) {
281 currentJob = pipeline.dequeue();
283 currentJob = queue.dequeue();
284 startJob(currentJob);
// Starts the given job after checking the server protocol version.
// job: the job to start.
288void SessionPrivate::startJob(
Job *job)
// Server protocol older than the minimum this client supports: the job gets a
// translated error text instead.
// NOTE(review): the lines that set the error code and emit the result are
// elided in this listing — confirm.
290 if (protocolVersion < minimumProtocolVersion()) {
292 job->setErrorText(i18n(
"Protocol version %1 found, expected at least %2", protocolVersion, minimumProtocolVersion()));
// Visible alternative: start the job through its private startQueued().
295 job->d_ptr->startQueued();
// Bookkeeping for a job that has ended; jobDestroyed() also calls this with a
// job that is being destroyed.
// NOTE(review): only part of the body is visible in this listing.
304void SessionPrivate::jobDone(KJob *job)
// Only pointer comparisons are made on `job` in the visible lines.
308 if (job == currentJob) {
309 if (pipeline.isEmpty()) {
// Visible alternative: the head of the pipeline becomes the new current job.
313 currentJob = pipeline.dequeue();
// Sanity check: the job is either the current job with nothing pipelined, or
// the most recently pipelined job. The second operand must compare with `==`;
// the former `job = pipeline.last()` assigned to `job`, so the assertion
// passed for any non-null pipeline tail and overwrote the parameter.
326 Q_ASSERT((job == currentJob && pipeline.isEmpty()) || (job == pipeline.last()));
// Slot for a job's destroyed(QObject*) signal: treats the destroyed job as done.
// job: the object being destroyed, passed on to jobDone().
332void SessionPrivate::jobDestroyed(QObject *job)
// static_cast rather than qobject_cast.
// NOTE(review): presumably because the object is already mid-destruction when
// this fires — confirm. jobDone() must then not call methods on it.
335 jobDone(
static_cast<KJob *
>(job));
341 QObject::connect(job, SIGNAL(result(KJob*)), mParent, SLOT(jobDone(KJob*)));
343 QObject::connect(job, SIGNAL(destroyed(QObject*)), mParent, SLOT(jobDestroyed(QObject*)));
355 logFile->write(
"C: " + data);
356 if (!data.endsWith(
'\n')) {
357 logFile->write(
"\n");
365 kWarning() <<
"Trying to write while session is disconnected!" << kBacktrace();
376 Q_FOREACH (
Job *job, queue) {
378 job->kill(KJob::EmitResult);
387 foreach (
Job *job, queue) {
388 job->d_ptr->updateItemRevision(itemId, oldRevision, newRevision);
394SessionPrivate::SessionPrivate(
Session *parent)
// Initializes the session: creates the parser, establishes the session id and
// optionally opens a protocol log file.
// id: session identifier supplied by the caller.
// NOTE(review): lines are elided in this listing (including how `id` is used
// and the closing of the QFile constructor call) — confirm against the full file.
404void SessionPrivate::init(
const QByteArray &
id)
407 parser =
new ImapParser();
// Generated id: "<application name>-<qrand() value>".
// NOTE(review): presumably only used when `id` is empty; the condition is not visible here.
412 sessionId = QCoreApplication::instance()->applicationName().toUtf8()
413 +
'-' + QByteArray::number(qrand());
// Optional logging: when AKONADI_SESSION_LOGFILE is set, a file named
// "<value>.<pid>.<sessionId>" is opened for writing and truncated.
426 const QByteArray sessionLogFile = qgetenv(
"AKONADI_SESSION_LOGFILE");
427 if (!sessionLogFile.isEmpty()) {
428 logFile =
new QFile(QString::fromLatin1(
"%1.%2.%3").arg(QString::fromLatin1(sessionLogFile))
429 .arg(QString::number(QApplication::applicationPid()))
430 .arg(QString::fromLatin1(sessionId)),
// Failure to open the log file is only warned about.
432 if (!logFile->open(QIODevice::WriteOnly | QIODevice::Truncate)) {
433 kWarning() <<
"Failed to open Akonadi Session log file" << logFile->fileName();
447 socket->disconnect(mParent);
451 QMetaObject::invokeMethod(mParent,
"reconnect", Qt::QueuedConnection);
479static QThreadStorage<Session *> instances;
483 Q_ASSERT_X(!sessionId.isEmpty(),
"SessionPrivate::createDefaultSession",
484 "You tried to create a default session with empty session id!");
485 Q_ASSERT_X(!instances.hasLocalData(),
"SessionPrivate::createDefaultSession",
486 "You tried to create a default session twice!");
488 instances.setLocalData(
new Session(sessionId));
493 instances.setLocalData(session);
498 if (!instances.hasLocalData()) {
499 instances.setLocalData(
new Session());
501 return instances.localData();
506 foreach (
Job *job, d->queue) {
507 job->kill(KJob::EmitResult);
510 foreach (
Job *job, d->pipeline) {
511 job->d_ptr->mStarted =
false;
512 job->kill(KJob::EmitResult);
516 d->currentJob->d_ptr->mStarted =
false;
517 d->currentJob->kill(KJob::EmitResult);
522#include "moc_session.cpp"
Base class for all actions in the Akonadi storage.
@ ProtocolVersionMismatch
The server protocol version is too old or too new.
@ ConnectionFailed
The connection to the Akonadi server failed.
static ServerManager * self()
Returns the singleton instance of this class, for connecting to its signals.
static State state()
Returns the state of the server.
static bool start()
Starts the server.
State
Enum for the various states the server can be in.
@ Running
Server is running and operational.
@ Broken
Server is not operational and an error has been detected.
@ NotRunning
Server is not running; either no one has started it yet or it failed to start.
static void setDefaultSession(Session *session)
Sets the default session.
virtual void addJob(Job *job)
Associates the given Job object with this session.
void writeData(const QByteArray &data)
Sends the given raw data.
static QString connectionFile()
Default location for akonadiconnectionrc.
void forceReconnect()
Disconnects a previously existing connection and tries to reconnect.
void itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
Propagate item revision changes to following jobs.
int nextTag()
Returns the next IMAP tag.
virtual void reconnect()
Attempts to establish a connection to the Akonadi server.
static void createDefaultSession(const QByteArray &sessionId)
Creates a new default session for this thread with the given sessionId.
A communication session with the Akonadi storage.
Session(const QByteArray &sessionId=QByteArray(), QObject *parent=0)
Creates a new session.
void clear()
Stops all jobs queued for execution.
static Session * defaultSession()
Returns the default session for this thread.
~Session()
Destroys the session.
QByteArray sessionId() const
Returns the session identifier.
void reconnected()
This signal is emitted whenever the session has been reconnected to the server (e....
FreeBusyManager::Singleton.