/* dsync-brain-msgs.c */
#include "lib.h"
#include "array.h"
#include "hash.h"
#include "dsync-worker.h"
#include "dsync-brain-private.h"
/* Record the iterator's current message (mailbox index + UID) in the
   iterator's GUID hash. Expunged messages are not recorded. */
static void dsync_brain_guid_add(struct dsync_brain_msg_iter *iter)
{
	struct dsync_brain_guid_instance *instance, *head;

	if ((iter->msg.flags & DSYNC_MAIL_FLAG_EXPUNGED) != 0)
		return;

	instance = p_new(iter->sync->pool, struct dsync_brain_guid_instance, 1);
	instance->uid = iter->msg.uid;
	instance->mailbox_idx = iter->mailbox_idx;

	head = hash_table_lookup(iter->guid_hash, iter->msg.guid);
	if (head != NULL) {
		/* GUID is already in the hash: keep the existing entry as the
		   hash value and link the new instance right after it */
		instance->next = head->next;
		head->next = instance;
		return;
	}

	/* first instance of this GUID; the key string is duplicated into the
	   sync pool before being inserted */
	hash_table_insert(iter->guid_hash,
			  p_strdup(iter->sync->pool, iter->msg.guid),
			  instance);
}
/* Make sure iter->msg holds a message to process.

   A NULL iter->msg.guid means there is no pending message, so a new one is
   requested from the worker iterator and (on success) added to the GUID
   hash. Otherwise the already-held message is reused.

   Returns -1 when the iterator's mailbox_idx differs from
   sync->wanted_mailbox_idx; otherwise 1 for a held message or the worker
   iterator's return value for a newly requested one. */
static int dsync_brain_msg_iter_next(struct dsync_brain_msg_iter *iter)
{
	int ret;

	if (iter->msg.guid != NULL) {
		/* the previously fetched message is still pending */
		ret = 1;
	} else {
		ret = dsync_worker_msg_iter_next(iter->iter,
						 &iter->mailbox_idx,
						 &iter->msg);
		if (ret > 0)
			dsync_brain_guid_add(iter);
	}

	/* a message from another mailbox stays pending in iter->msg */
	return iter->sync->wanted_mailbox_idx == iter->mailbox_idx ? ret : -1;
}
static int
dsync_brain_msg_iter_skip_mailbox(struct dsync_brain_mailbox_sync *sync)
{
int ret;
while ((ret = dsync_brain_msg_iter_next(sync->src_msg_iter)) > 0)
sync->src_msg_iter->msg.guid = NULL;
if (ret == 0)
return 0;
while ((ret = dsync_brain_msg_iter_next(sync->dest_msg_iter)) > 0)
sync->dest_msg_iter->msg.guid = NULL;
if (ret == 0)
return 0;
sync->skip_mailbox = FALSE;
return -1;
}
/* Advance both message iterators so that each holds a pending message.

   If skip_mailbox is set, the rest of the current mailbox is discarded first.
   Both iterators are always advanced, even if the first one already returned
   0 or a negative value.

   Returns 1 when both sides have a message, 0 when either side returned 0,
   and -1 when neither returned 0 but at least one returned a negative
   value. */
static int dsync_brain_msg_iter_next_pair(struct dsync_brain_mailbox_sync *sync)
{
	int src_ret, dest_ret;

	if (sync->skip_mailbox &&
	    dsync_brain_msg_iter_skip_mailbox(sync) == 0)
		return 0;

	src_ret = dsync_brain_msg_iter_next(sync->src_msg_iter);
	dest_ret = dsync_brain_msg_iter_next(sync->dest_msg_iter);

	/* a zero result from either side takes priority over a negative one */
	if (src_ret == 0 || dest_ret == 0)
		return 0;
	return (src_ret < 0 || dest_ret < 0) ? -1 : 1;
}
/* Queue a copy of msg in iter->new_msgs, tagged with mailbox_idx and the
   message's original UID. The copy is allocated from the sync pool.
   Expunged messages are ignored. */
static void
dsync_brain_msg_sync_save(struct dsync_brain_msg_iter *iter,
			  unsigned int mailbox_idx,
			  const struct dsync_message *msg)
{
	struct dsync_brain_new_msg *queued;

	if ((msg->flags & DSYNC_MAIL_FLAG_EXPUNGED) == 0) {
		queued = array_append_space(&iter->new_msgs);
		queued->mailbox_idx = mailbox_idx;
		queued->orig_uid = msg->uid;
		queued->msg = dsync_message_dup(iter->sync->pool, msg);
	}
}
static void
dsync_brain_msg_sync_conflict(struct dsync_brain_msg_iter *conflict_iter,
struct dsync_brain_msg_iter *save_iter,
const struct dsync_message *msg)
{
struct dsync_brain_uid_conflict *conflict;
struct dsync_brain_new_msg *new_msg;
struct dsync_brain_mailbox *brain_box;
uint32_t new_uid;
brain_box = array_idx_modifiable(&save_iter->sync->mailboxes,
save_iter->mailbox_idx);
if (save_iter->sync->brain->backup) {
i_warning("Destination mailbox %s has been modified, "
"need to recreate it before we can continue syncing",
brain_box->box.name);
dsync_worker_delete_mailbox(save_iter->sync->brain->dest_worker,
&brain_box->box);
save_iter->sync->brain->unexpected_changes = TRUE;
save_iter->sync->skip_mailbox = TRUE;
return;
}
new_uid = brain_box->box.uid_next++;
conflict = array_append_space(&conflict_iter->uid_conflicts);
conflict->mailbox_idx = conflict_iter->mailbox_idx;
conflict->old_uid = msg->uid;
conflict->new_uid = new_uid;
new_msg = array_append_space(&save_iter->new_msgs);
new_msg->mailbox_idx = save_iter->mailbox_idx;
new_msg->orig_uid = msg->uid;
new_msg->msg = dsync_message_dup(save_iter->sync->pool, msg);
new_msg->msg->uid = new_uid;
}
/* Decide which of two messages has the more important flag state.

   Returns a negative value if m1 should win, a positive value if m2 should
   win, and 0 if both have the same modseq, flags and keywords.

   The message with the higher modseq wins. With equal modseqs but differing
   flags/keywords, the one with more keywords plus set flag bits (only the
   bits below MAIL_RECENT are counted) wins. If even the counts are equal,
   m1 wins. */
static int
dsync_message_flag_importance_cmp(const struct dsync_message *m1,
				  const struct dsync_message *m2)
{
	unsigned int bit, weight1, weight2;

	if (m1->modseq != m2->modseq)
		return m1->modseq > m2->modseq ? -1 : 1;

	if (m1->flags == m2->flags &&
	    dsync_keyword_list_equals(m1->keywords, m2->keywords))
		return 0;

	weight1 = str_array_length(m1->keywords);
	weight2 = str_array_length(m2->keywords);
	bit = 1;
	while (bit != MAIL_RECENT) {
		if ((m1->flags & bit) != 0)
			weight1++;
		if ((m2->flags & bit) != 0)
			weight2++;
		bit <<= 1;
	}

	/* a tie is resolved in favour of m1 */
	return weight1 < weight2 ? 1 : -1;
}
/* Sync metadata of a message that exists on both sides.

   Nothing is done when dsync_message_flag_importance_cmp() reports the two
   as equal. If the source wins, or in backup mode regardless of the winner,
   the destination worker is updated with the source message's metadata.
   Otherwise the source worker is updated with the destination message's
   metadata. */
static void dsync_brain_msg_sync_existing(struct dsync_brain_mailbox_sync *sync,
					  struct dsync_message *src_msg,
					  struct dsync_message *dest_msg)
{
	int cmp = dsync_message_flag_importance_cmp(src_msg, dest_msg);

	if (cmp == 0)
		return;

	if (cmp < 0 || sync->brain->backup)
		dsync_worker_msg_update_metadata(sync->dest_worker, src_msg);
	else
		dsync_worker_msg_update_metadata(sync->src_worker, dest_msg);
}
/* Compare the pending source and destination messages of the current mailbox
   and act on the difference: expunge, give a new UID (conflict), or sync
   metadata. Whichever message was handled gets its guid set to NULL, which
   makes dsync_brain_msg_iter_next() fetch a new message for that side.
   Both iterators must be in the same mailbox. Always returns 0. */
static int dsync_brain_msg_sync_pair(struct dsync_brain_mailbox_sync *sync)
{
struct dsync_message *src_msg = &sync->src_msg_iter->msg;
struct dsync_message *dest_msg = &sync->dest_msg_iter->msg;
const char *src_guid, *dest_guid;
unsigned char guid_128_data[MAIL_GUID_128_SIZE * 2 + 1];
bool src_expunged, dest_expunged;
i_assert(sync->src_msg_iter->mailbox_idx ==
sync->dest_msg_iter->mailbox_idx);
src_expunged = (src_msg->flags & DSYNC_MAIL_FLAG_EXPUNGED) != 0;
dest_expunged = (dest_msg->flags & DSYNC_MAIL_FLAG_EXPUNGED) != 0;
/* When one side is expunged, the other side's GUID is passed through
   dsync_get_guid_128_str() before the comparison below.
   NOTE(review): presumably expunged records carry the GUID in its 128-bit
   string form and this converts the other GUID to match - confirm. */
if (src_expunged) {
src_guid = src_msg->guid;
dest_guid = dsync_get_guid_128_str(dest_msg->guid,
guid_128_data,
sizeof(guid_128_data));
} else if (dest_expunged) {
src_guid = dsync_get_guid_128_str(src_msg->guid, guid_128_data,
sizeof(guid_128_data));
dest_guid = dest_msg->guid;
} else {
src_guid = src_msg->guid;
dest_guid = dest_msg->guid;
}
if (src_msg->uid < dest_msg->uid) {
/* source UID has no counterpart in destination: handle and consume only
   the source message */
if (src_expunged) {
/* already expunged in source, nothing to do */
} else if (sync->uid_conflict || sync->brain->backup) {
/* NOTE: uid_conflict reflects an earlier pair in this mailbox */
dsync_brain_msg_sync_conflict(sync->src_msg_iter,
sync->dest_msg_iter,
src_msg);
} else {
dsync_worker_msg_expunge(sync->src_worker,
src_msg->uid);
}
src_msg->guid = NULL;
return 0;
} else if (src_msg->uid > dest_msg->uid) {
/* destination UID has no counterpart in source: handle and consume only
   the destination message */
if (dest_expunged) {
/* already expunged in destination, nothing to do */
} else if (sync->uid_conflict && !sync->brain->backup) {
dsync_brain_msg_sync_conflict(sync->dest_msg_iter,
sync->src_msg_iter,
dest_msg);
} else {
/* also taken in backup mode: destination is made to follow source */
dsync_worker_msg_expunge(sync->dest_worker,
dest_msg->uid);
}
dest_msg->guid = NULL;
return 0;
}
/* UIDs are equal. An empty GUID on either side is never treated as a
   mismatch. */
if (strcmp(src_guid, dest_guid) != 0 &&
*src_guid != '\0' && *dest_guid != '\0') {
/* same UID, different GUIDs: every non-expunged side is reported as a
   conflict, and the flag is remembered for the rest of the mailbox */
sync->uid_conflict = TRUE;
if (!dest_expunged) {
dsync_brain_msg_sync_conflict(sync->dest_msg_iter,
sync->src_msg_iter,
dest_msg);
}
if (!src_expunged) {
dsync_brain_msg_sync_conflict(sync->src_msg_iter,
sync->dest_msg_iter,
src_msg);
}
} else if (dest_expunged) {
if (src_expunged) {
/* expunged on both sides, nothing to do */
} else if (sync->brain->backup) {
/* backup mode never expunges from source */
dsync_brain_msg_sync_conflict(sync->src_msg_iter,
sync->dest_msg_iter,
src_msg);
} else {
dsync_worker_msg_expunge(sync->src_worker,
src_msg->uid);
}
} else if (src_expunged) {
/* expunged in source only: expunge from destination as well */
dsync_worker_msg_expunge(sync->dest_worker, dest_msg->uid);
} else {
/* message exists on both sides: sync its metadata */
dsync_brain_msg_sync_existing(sync, src_msg, dest_msg);
}
/* both messages were handled */
src_msg->guid = NULL;
dest_msg->guid = NULL;
return 0;
}
/* Drain the remaining messages of the current mailbox from iter1 and queue
   each of them for saving via iter2 (dsync_brain_msg_sync_save() ignores
   expunged ones).

   Returns TRUE if iteration stopped with a negative result, FALSE if it
   stopped because the iterator returned 0. */
static bool dsync_brain_msg_sync_mailbox_end(struct dsync_brain_msg_iter *iter1,
					     struct dsync_brain_msg_iter *iter2)
{
	int ret;

	for (;;) {
		ret = dsync_brain_msg_iter_next(iter1);
		if (ret <= 0)
			break;

		dsync_brain_msg_sync_save(iter2, iter1->mailbox_idx,
					  &iter1->msg);
		/* mark the message handled so the next one gets fetched */
		iter1->msg.guid = NULL;
	}
	return ret < 0;
}
/* Continue syncing the current mailbox.

   Message pairs are processed for as long as both iterators have a message.
   Whenever the destination worker's output is full it is flushed, and
   syncing stops (FALSE) if the flush returns <= 0. After the pairs, the
   leftover messages are queued for saving on the other side: destination
   leftovers to source (skipped in backup mode), then source leftovers to
   destination.

   Returns TRUE when the mailbox is finished, FALSE when syncing can't
   proceed further right now. */
static bool
dsync_brain_msg_sync_mailbox_more(struct dsync_brain_mailbox_sync *sync)
{
	int ret;

	while ((ret = dsync_brain_msg_iter_next_pair(sync)) > 0) {
		if (dsync_brain_msg_sync_pair(sync) < 0)
			break;
		if (dsync_worker_is_output_full(sync->dest_worker) &&
		    dsync_worker_output_flush(sync->dest_worker) <= 0)
			return FALSE;
	}
	if (ret == 0)
		return FALSE;

	if (!sync->brain->backup &&
	    !dsync_brain_msg_sync_mailbox_end(sync->dest_msg_iter,
					      sync->src_msg_iter))
		return FALSE;

	return dsync_brain_msg_sync_mailbox_end(sync->src_msg_iter,
						sync->dest_msg_iter);
}
/* Drive message syncing forward across all mailboxes.

   Each time a mailbox finishes, the UID conflict flag is reset,
   wanted_mailbox_idx is advanced and the next mailbox is selected on both
   workers. If syncing stops while the last index used here is still within
   the mailbox list, the function simply returns.

   Otherwise the worker callbacks are cleared, both worker message iterators
   are deinitialized (a failure marks the brain as failed) and
   dsync_brain_msg_sync_new_msgs() is called. */
void dsync_brain_msg_sync_more(struct dsync_brain_mailbox_sync *sync)
{
	const struct dsync_brain_mailbox *boxes;
	unsigned int box_count, idx = 0;

	boxes = array_get(&sync->mailboxes, &box_count);
	while (dsync_brain_msg_sync_mailbox_more(sync)) {
		/* mailbox finished, move on to the next one */
		sync->uid_conflict = FALSE;
		sync->wanted_mailbox_idx++;
		idx = sync->wanted_mailbox_idx;
		if (idx >= box_count)
			break;

		dsync_worker_select_mailbox(sync->src_worker, &boxes[idx].box);
		dsync_worker_select_mailbox(sync->dest_worker, &boxes[idx].box);
	}
	if (idx < box_count)
		return;

	dsync_worker_set_input_callback(sync->src_msg_iter->worker, NULL, NULL);
	dsync_worker_set_output_callback(sync->src_msg_iter->worker, NULL, NULL);
	dsync_worker_set_input_callback(sync->dest_msg_iter->worker, NULL, NULL);
	dsync_worker_set_output_callback(sync->dest_msg_iter->worker, NULL, NULL);

	/* the destination iterator is deinitialized here only if the source
	   one succeeded */
	if (dsync_worker_msg_iter_deinit(&sync->src_msg_iter->iter) >= 0 &&
	    dsync_worker_msg_iter_deinit(&sync->dest_msg_iter->iter) >= 0)
		dsync_brain_msg_sync_new_msgs(sync);
	else
		dsync_brain_fail(sync->brain);
}
/* Worker input/output callback: context is the
   struct dsync_brain_mailbox_sync whose syncing is continued. */
static void dsync_worker_msg_callback(void *context)
{
	dsync_brain_msg_sync_more(context);
}
/* Create a brain message iterator for the given worker.

   The iterator struct comes from the sync pool. Its uid_conflicts and
   new_msgs arrays and its GUID hash are created here and released by
   dsync_brain_msg_iter_deinit(). The worker's message iteration is started
   for the given mailbox GUIDs, the worker's input and output callbacks are
   pointed at dsync_worker_msg_callback(), and the first mailbox of
   sync->mailboxes is selected when mailbox_count is non-zero. */
static struct dsync_brain_msg_iter *
dsync_brain_msg_iter_init(struct dsync_brain_mailbox_sync *sync,
			  struct dsync_worker *worker,
			  const mailbox_guid_t mailboxes[],
			  unsigned int mailbox_count)
{
	struct dsync_brain_msg_iter *iter;
	const struct dsync_brain_mailbox *first_box;

	iter = p_new(sync->pool, struct dsync_brain_msg_iter, 1);
	iter->sync = sync;
	iter->worker = worker;
	i_array_init(&iter->uid_conflicts, 128);
	i_array_init(&iter->new_msgs, 128);
	/* GUID keys are hashed and compared case-insensitively */
	iter->guid_hash = hash_table_create(default_pool, sync->pool, 10000,
					    strcase_hash,
					    (hash_cmp_callback_t *)strcasecmp);

	iter->iter = dsync_worker_msg_iter_init(worker, mailboxes,
						mailbox_count);
	dsync_worker_set_input_callback(worker,
					dsync_worker_msg_callback, sync);
	dsync_worker_set_output_callback(worker,
					 dsync_worker_msg_callback, sync);

	if (mailbox_count == 0)
		return iter;

	first_box = array_idx(&sync->mailboxes, 0);
	dsync_worker_select_mailbox(worker, &first_box->box);
	return iter;
}
/* Release what dsync_brain_msg_iter_init() created: the worker message
   iterator (only if it's still set; its result is ignored), the GUID hash
   and the two arrays. The iterator struct itself is not freed here. */
static void dsync_brain_msg_iter_deinit(struct dsync_brain_msg_iter *iter)
{
	if (iter->iter != NULL) {
		/* not deinitialized earlier, do it now */
		(void)dsync_worker_msg_iter_deinit(&iter->iter);
	}

	hash_table_destroy(&iter->guid_hash);
	array_free(&iter->uid_conflicts);
	array_free(&iter->new_msgs);
}
/* Initialize guids with t_array_init() and fill it with the mailbox GUID of
   every entry in mailboxes, in the same order. */
static void
get_mailbox_guids(const ARRAY_TYPE(dsync_brain_mailbox) *mailboxes,
		  ARRAY_TYPE(mailbox_guid) *guids)
{
	const struct dsync_brain_mailbox *boxes;
	unsigned int i, box_count;

	boxes = array_get(mailboxes, &box_count);
	t_array_init(guids, box_count);
	for (i = 0; i < box_count; i++)
		array_append(guids, &boxes[i].box.mailbox_guid, 1);
}
/* Create the message sync state for the given mailboxes.

   A dedicated pool is created for the sync and the mailbox list is copied
   into it. A message iterator is then initialized for the source worker and
   for the destination worker, both over the same list of mailbox GUIDs.
   Release the result with dsync_brain_msg_sync_deinit(). */
struct dsync_brain_mailbox_sync *
dsync_brain_msg_sync_init(struct dsync_brain *brain,
			  const ARRAY_TYPE(dsync_brain_mailbox) *mailboxes)
{
	struct dsync_brain_mailbox_sync *sync;
	pool_t sync_pool;

	sync_pool = pool_alloconly_create("dsync brain mailbox sync", 1024*256);
	sync = p_new(sync_pool, struct dsync_brain_mailbox_sync, 1);
	sync->pool = sync_pool;
	sync->brain = brain;
	sync->src_worker = brain->src_worker;
	sync->dest_worker = brain->dest_worker;

	p_array_init(&sync->mailboxes, sync_pool, array_count(mailboxes));
	array_append_array(&sync->mailboxes, mailboxes);

	T_BEGIN {
		/* the GUID list is only needed while the iterators are being
		   initialized */
		ARRAY_TYPE(mailbox_guid) guid_array;
		const mailbox_guid_t *guid_list;
		unsigned int guid_count;

		get_mailbox_guids(mailboxes, &guid_array);
		guid_list = array_get(&guid_array, &guid_count);

		sync->src_msg_iter =
			dsync_brain_msg_iter_init(sync, brain->src_worker,
						  guid_list, guid_count);
		sync->dest_msg_iter =
			dsync_brain_msg_iter_init(sync, brain->dest_worker,
						  guid_list, guid_count);
	} T_END;
	return sync;
}
/* Free the message sync state and set the caller's pointer to NULL.
   Both message iterators are deinitialized before the sync's pool is
   unreferenced. */
void dsync_brain_msg_sync_deinit(struct dsync_brain_mailbox_sync **_sync)
{
	struct dsync_brain_mailbox_sync *state = *_sync;

	/* clear the caller's pointer first */
	*_sync = NULL;

	dsync_brain_msg_iter_deinit(state->src_msg_iter);
	dsync_brain_msg_iter_deinit(state->dest_msg_iter);
	pool_unref(&state->pool);
}