25#include "collection.h"
28#include "itemcreatejob.h"
29#include "itemdeletejob.h"
30#include "itemfetchjob.h"
31#include "itemmodifyjob.h"
32#include "transactionsequence.h"
33#include "itemfetchscope.h"
37#include <QtCore/QStringList>
44class Akonadi::ItemSyncPrivate :
public JobPrivate
47 ItemSyncPrivate(ItemSync *parent)
49 , mTransactionMode(ItemSync::SingleTransaction)
50 , mCurrentTransaction(0)
55 , mTotalItemsProcessed(0)
58 , mDeliveryDone(false)
60 , mFullListingDone(false)
61 , mProcessingBatch(false)
62 , mDisableAutomaticDeliveryDone(false)
64 , mMergeMode(Akonadi::ItemSync::RIDMerge)
67 mFetchScope.fetchFullPayload();
68 mFetchScope.fetchAllAttributes();
71 void createOrMerge(
const Item &item);
73 void slotItemsReceived(
const Item::List &items);
74 void slotLocalListDone(KJob *job);
75 void slotLocalDeleteDone(KJob *);
76 void slotLocalChangeDone(KJob *job);
80 void deleteItems(
const Item::List &items);
81 void slotTransactionResult(KJob *job);
82 void requestTransaction();
83 Job *subjobParent()
const;
84 void fetchLocalItemsToDelete();
85 QString jobDebuggingString() const ;
86 bool allProcessed() const;
88 Q_DECLARE_PUBLIC(ItemSync)
89 Collection mSyncCollection;
90 QSet<QString> mListedItems;
92 ItemSync::TransactionMode mTransactionMode;
93 TransactionSequence *mCurrentTransaction;
97 ItemFetchScope mFetchScope;
99 Akonadi::Item::List mRemoteItemQueue;
100 Akonadi::Item::List mRemovedRemoteItemQueue;
101 Akonadi::Item::List mCurrentBatchRemoteItems;
102 Akonadi::Item::List mCurrentBatchRemovedRemoteItems;
103 Akonadi::Item::List mItemsToDelete;
109 int mTotalItemsProcessed;
115 bool mFullListingDone;
116 bool mProcessingBatch;
117 bool mDisableAutomaticDeliveryDone;
120 Akonadi::ItemSync::MergeMode mMergeMode;
123void ItemSyncPrivate::createOrMerge(const Item &item)
131 ItemCreateJob *create =
new ItemCreateJob(item, mSyncCollection, subjobParent());
133 if (mMergeMode == ItemSync::GIDMerge && !item.gid().isEmpty()) {
139 q->connect(create, SIGNAL(result(KJob*)), q, SLOT(slotLocalChangeDone(KJob*)));
142bool ItemSyncPrivate::allProcessed()
const
144 return mDeliveryDone && mCurrentBatchRemoteItems.isEmpty() && mRemoteItemQueue.isEmpty() && mRemovedRemoteItemQueue.isEmpty() && mCurrentBatchRemovedRemoteItems.isEmpty();
147void ItemSyncPrivate::checkDone()
150 q->setProcessedAmount(KJob::Bytes, mProgress);
151 if (mPendingJobs > 0) {
155 if (mTransactionJobs > 0) {
159 if (mCurrentTransaction) {
160 q->emit transactionCommitted();
161 mCurrentTransaction->commit();
162 mCurrentTransaction = 0;
167 mProcessingBatch =
false;
168 if (!mRemoteItemQueue.isEmpty()) {
171 if (!mProcessingBatch) {
172 q->emit readyForNextBatch(mBatchSize - mRemoteItemQueue.size());
176 q->emit readyForNextBatch(mBatchSize);
178 if (allProcessed() && !mFinished) {
186 :
Job(new ItemSyncPrivate(this), parent)
189 d->mSyncCollection = collection;
207 Q_ASSERT(!d->mIncremental);
208 if (!d->mStreaming) {
209 d->mDeliveryDone =
true;
211 d->mRemoteItemQueue += items;
212 d->mTotalItemsProcessed += items.count();
213 kDebug() <<
"Received: " << items.count() <<
"In total: " << d->mTotalItemsProcessed <<
" Wanted: " << d->mTotalItems;
214 if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItemsProcessed == d->mTotalItems)) {
215 d->mDeliveryDone =
true;
223 Q_ASSERT(!d->mIncremental);
224 Q_ASSERT(amount >= 0);
227 d->mTotalItems = amount;
228 setTotalAmount(KJob::Bytes, amount);
229 if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItems == 0)) {
230 d->mDeliveryDone =
true;
238 d->mDisableAutomaticDeliveryDone = disable;
251 d->mIncremental =
true;
252 if (!d->mStreaming) {
253 d->mDeliveryDone =
true;
255 d->mRemoteItemQueue += changedItems;
256 d->mRemovedRemoteItemQueue += removedItems;
257 d->mTotalItemsProcessed += changedItems.count() + removedItems.count();
258 kDebug() <<
"Received: " << changedItems.count() <<
"Removed: " << removedItems.count() <<
"In total: " << d->mTotalItemsProcessed <<
" Wanted: " << d->mTotalItems;
259 if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItemsProcessed == d->mTotalItems)) {
260 d->mDeliveryDone =
true;
274 return d->mFetchScope;
283 Q_UNUSED(storedItem);
288void ItemSyncPrivate::fetchLocalItemsToDelete()
292 kFatal() <<
"This must not be called while in incremental mode";
302 QObject::connect(job, SIGNAL(itemsReceived(Akonadi::Item::List)), q, SLOT(slotItemsReceived(Akonadi::Item::List)));
303 QObject::connect(job, SIGNAL(result(KJob*)), q, SLOT(slotLocalListDone(KJob*)));
307void ItemSyncPrivate::slotItemsReceived(
const Item::List &items)
309 foreach (
const Akonadi::Item &item, items) {
311 if (item.remoteId().isEmpty()) {
314 if (!mListedItems.contains(item.remoteId())) {
315 mItemsToDelete << Item(item.id());
320void ItemSyncPrivate::slotLocalListDone(KJob *job)
324 kWarning() << job->errorString();
326 deleteItems(mItemsToDelete);
330QString ItemSyncPrivate::jobDebuggingString() const
334 return QString::fromLatin1(
"Collection %1 (%2)").arg(mSyncCollection.id()).arg(mSyncCollection.name());
337void ItemSyncPrivate::execute()
342 kWarning() <<
"Call to execute() on finished job.";
347 if (!mProcessingBatch) {
348 if (mRemoteItemQueue.size() >= mBatchSize || mDeliveryDone) {
350 const int num = qMin(mBatchSize, mRemoteItemQueue.size());
351 for (
int i = 0; i < num; i++) {
352 mCurrentBatchRemoteItems << mRemoteItemQueue.takeFirst();
354 mCurrentBatchRemovedRemoteItems += mRemovedRemoteItemQueue;
355 mRemovedRemoteItemQueue.clear();
360 mProcessingBatch =
true;
368void ItemSyncPrivate::processBatch()
371 if (mCurrentBatchRemoteItems.isEmpty() && !mDeliveryDone) {
376 requestTransaction();
381 if (!mIncremental && allProcessed()) {
383 fetchLocalItemsToDelete();
385 deleteItems(mCurrentBatchRemovedRemoteItems);
386 mCurrentBatchRemovedRemoteItems.clear();
392void ItemSyncPrivate::processItems()
396 foreach (
const Item &remoteItem, mCurrentBatchRemoteItems) {
397 if (remoteItem.remoteId().isEmpty()) {
398 kWarning() <<
"Item without rid passed to itemsync";
402 mListedItems << remoteItem.remoteId();
404 createOrMerge(remoteItem);
406 mCurrentBatchRemoteItems.clear();
409void ItemSyncPrivate::deleteItems(
const Item::List &itemsToDelete)
417 if (itemsToDelete.isEmpty()) {
422 ItemDeleteJob *job =
new ItemDeleteJob(itemsToDelete, subjobParent());
423 q->connect(job, SIGNAL(result(KJob*)), q, SLOT(slotLocalDeleteDone(KJob*)));
429 TransactionSequence *transaction = qobject_cast<TransactionSequence *>(subjobParent());
435void ItemSyncPrivate::slotLocalDeleteDone(KJob *job)
438 kWarning() <<
"Deleting items from the akonadi database failed:" << job->errorString();
446void ItemSyncPrivate::slotLocalChangeDone(KJob *job)
449 kWarning() <<
"Creating/updating items from the akonadi database failed:" << job->errorString();
457void ItemSyncPrivate::slotTransactionResult(KJob *job)
460 if (mCurrentTransaction == job) {
461 mCurrentTransaction = 0;
467void ItemSyncPrivate::requestTransaction()
471 if (!mCurrentTransaction) {
473 mCurrentTransaction =
new TransactionSequence(q);
474 mCurrentTransaction->setAutomaticCommittingEnabled(
false);
475 QObject::connect(mCurrentTransaction, SIGNAL(result(KJob*)), q, SLOT(slotTransactionResult(KJob*)));
479Job *ItemSyncPrivate::subjobParent()
const
483 return mCurrentTransaction;
485 return const_cast<ItemSync *
>(q);
491 d->mStreaming = enable;
497 Q_ASSERT(d->mStreaming);
498 d->mDeliveryDone =
true;
502void ItemSync::slotResult(KJob *job)
509 setError(job->error());
510 setErrorText(job->errorText());
513 Akonadi::Job::slotResult(job);
521 if (d->mCurrentTransaction) {
522 d->mCurrentTransaction->rollback();
524 d->mDeliveryDone =
true;
531 d->mTransactionMode = mode;
537 return d->mBatchSize;
543 d->mBatchSize = size;
549 return d->mMergeMode;
558#include "moc_itemsync.cpp"
Represents a collection of PIM items.
@ Silent
Only return the id of the merged/created item.
void setMerge(MergeOptions options)
Merge this item into an existing one if available.
Job that fetches items from the Akonadi storage.
void setDeliveryOption(DeliveryOptions options)
Sets the mechanisms by which the items should be fetched.
@ EmitItemsIndividually
emitted via signal upon reception
ItemFetchScope & fetchScope()
Returns the item fetch scope.
Specifies which parts of an item should be fetched from the Akonadi storage.
void setFetchRemoteIdentification(bool retrieveRid)
Fetch remote identification for items.
void setCacheOnly(bool cacheOnly)
Sets whether payload data should be requested from remote sources or just from the local cache.
void setFetchModificationTime(bool retrieveMtime)
Enables retrieval of the item modification time.
Syncs between items known to a client (usually a resource) and the Akonadi storage.
void setTotalItems(int amount)
Set the amount of items which you are going to return in total by using the setFullSyncItems()/setInc...
virtual AKONADI_DEPRECATED bool updateItem(const Item &storedItem, Item &newItem)
Reimplement this method to customize the synchronization algorithm.
TransactionMode
Transaction mode used by ItemSync.
@ NoTransaction
Use no transaction at all, provides highest responsiveness (might therefore feel faster even when act...
@ MultipleTransactions
Use one transaction per chunk of delivered items, good compromise between the other two when using st...
void setDisableAutomaticDeliveryDone(bool disable)
Disables the automatic completion of the item sync, based on the number of delivered items.
void setIncrementalSyncItems(const Item::List &changedItems, const Item::List &removedItems)
Sets the item lists for incrementally syncing the collection.
ItemFetchScope & fetchScope()
Returns the item fetch scope.
MergeMode mergeMode() const
Returns current merge mode.
void setBatchSize(int)
Set the batch size.
void setFullSyncItems(const Item::List &items)
Sets the full item list for the collection.
ItemSync(const Collection &collection, QObject *parent=0)
Creates a new item synchronizer.
void rollback()
Aborts the sync process and rolls back all not yet committed transactions.
void deliveryDone()
Notify ItemSync that all remote items have been delivered.
int batchSize() const
Minimum number of items required to start processing in streaming mode.
void setFetchScope(ItemFetchScope &fetchScope)
Sets the item fetch scope.
void setStreamingEnabled(bool enable)
Enable item streaming.
void doStart()
This method must be reimplemented in the concrete jobs.
void setMergeMode(MergeMode mergeMode)
Set what merge method should be used for next ItemSync run.
~ItemSync()
Destroys the item synchronizer.
void setTransactionMode(TransactionMode mode)
Set the transaction mode to use for this sync.
Base class for all actions in the Akonadi storage.
@ UserCanceled
The user canceld this job.
virtual bool removeSubjob(KJob *job)
Removes the given subjob of this job.
Job(QObject *parent=0)
Creates a new job.
void setIgnoreJobFailure(KJob *job)
Sets which job of the sequence might fail without rolling back the complete transaction.
FreeBusyManager::Singleton.