//
// Created by haraldwolff on 14.08.22.
//
|
|
|
|
#include <blksync.h>
|
|
#include <bs_engine.h>
|
|
#include <bs_sync.h>
|
|
#include <bs_comm.h>
|
|
#include <bs_transfer.h>
|
|
|
|
// Forward declaration: sends one merkle-sync request (level/start/length) to the peer; defined further down in this file.
int bs_sync_request(bs_engine_t *engine, int level, int start, int length);
|
|
// Forward declaration: loop that consumes merkle-sync responses from the sync queue; defined further down in this file.
int bs_sync_thread_rx(bs_engine_t *engine);
|
|
|
|
/*
 * pthread entry adapter for bs_sync_thread().
 *
 * pthread_create() expects a `void *(*)(void *)` start routine, while
 * bs_sync_thread() takes a bs_engine_t * and returns int. Calling it through
 * a cast function pointer is undefined behaviour, so the conversion is done
 * here with a real call instead. The int result is discarded.
 */
static void *bs_sync_thread_entry(void *arg)
{
    bs_sync_thread(arg);
    return NULL;
}

/*
 * Starts the merkle tree sync thread for `engine`.
 *
 * The thread handle is stored in engine->threads.sync. If the thread cannot
 * be created, fatal() is invoked.
 *
 * Returns 0.
 */
int bs_sync_start(bs_engine_t *engine){
    if (pthread_create(&engine->threads.sync, NULL, bs_sync_thread_entry, engine))
        fatal("could not create sync thread.\n", 0);

    return 0;
}
|
|
|
|
/*
 * pthread entry adapter for bs_sync_thread_rx().
 *
 * pthread_create() expects a `void *(*)(void *)` start routine; calling
 * bs_sync_thread_rx() through a cast pointer of that type is undefined
 * behaviour because its real signature differs. The int result is discarded.
 */
static void *bs_sync_thread_rx_entry(void *arg)
{
    bs_sync_thread_rx(arg);
    return NULL;
}

/*
 * Body of the merkle tree synchronisation thread.
 *
 * BSS_SYNC is set in engine->state_flags while the function runs and cleared
 * again before bs_transfer_start() is called.
 *
 * Sender (BS_SENDER set in engine->bs_flags):
 *   - starts a thread running bs_sync_thread_rx() to consume responses;
 *   - for level 0 sends a single request (0, 0, 1);
 *   - for every further level n, walks row n and, for each run of entries
 *     whose remote value is non-zero and differs from the local value, sends
 *     one request covering that run (at most 512 / parameters.n entries);
 *   - after each level waits until merkle.waiting_syncs is back to zero;
 *   - finally sends a request with level == -1 and joins the rx thread.
 *
 * Receiver:
 *   - pops requests from queues.sync and answers each with the entries of
 *     row level+1 in the range [start * n, (start + length) * n);
 *   - a request with level == -1 is echoed back as a response and ends the loop;
 *   - messages of another type, or with a length outside
 *     [0, 512 / parameters.n], are returned to the pool unanswered.
 *
 * Returns 0.
 */
int bs_sync_thread(bs_engine_t *engine) {
    engine->state_flags |= BSS_SYNC;

    if (engine->bs_flags & BS_SENDER) { // Sender instance
        pthread_t thread_rx;

        // Without this check a failed create would lead to pthread_join() on
        // an uninitialised handle below.
        if (pthread_create(&thread_rx, NULL, bs_sync_thread_rx_entry, engine))
            fatal("could not create sync rx thread.\n", 0);

        for (int n = 0; n < engine->merkle.local->parameters.depth; n++) {
            uint64_t *local_row, *remote_row;
            int local_row_length, remote_row_length;

            if (
                (merkle_get_row(engine->merkle.local, n, &local_row, &local_row_length) < 0) ||
                (merkle_get_row(engine->merkle.remote, n, &remote_row, &remote_row_length) < 0)
            )
                fatal("could not get merkle tree local_row %d", n);

            if (n == 0){
                if (bs_sync_request(engine, 0, 0, 1))
                    fatal("failed to request top sync\n", 0);

                pthread_mutex_lock(&engine->merkle.mutex);
                engine->merkle.waiting_syncs++;
                pthread_mutex_unlock(&engine->merkle.mutex);
            } else {
                // Upper bound on the number of row entries covered by one request.
                const int max_length = 512 / engine->merkle.local->parameters.n;

                for (int start = 0; start < local_row_length; start++) {
                    int length;

                    // Extend the run while the remote entry is present (non-zero)
                    // and differs from the local one.
                    for (length = 0; ((start + length) < local_row_length) && (length < max_length); length++){
                        if (!remote_row[start + length] || (local_row[start + length] == remote_row[start + length]))
                            break;
                    }

                    if (length){
                        if (bs_sync_request(engine, n, start, length))
                            fatal("could not send sync request for n=%d start=%d\n", n, start);

                        pthread_mutex_lock(&engine->merkle.mutex);
                        engine->merkle.waiting_syncs++;
                        pthread_mutex_unlock(&engine->merkle.mutex);

                        // The loop's own start++ supplies the final +1, so the
                        // entry that ended the run is examined next. It must
                        // not be skipped: when the run was cut by max_length
                        // it has not been compared yet.
                        start += length - 1;
                    }
                }
            }

            // NOTE(review): waiting_syncs is read here without holding
            // merkle.mutex, as in the original code.
            bs_progress_update(engine, BS_PRG_SYNC, 1, engine->merkle.waiting_syncs);

            // Wait for all outstanding responses of this level. The predicate
            // is re-checked in a loop because pthread_cond_wait() may return
            // spuriously.
            pthread_mutex_lock(&engine->merkle.mutex);
            while (engine->merkle.waiting_syncs)
                pthread_cond_wait(&engine->merkle.sync, &engine->merkle.mutex);
            pthread_mutex_unlock(&engine->merkle.mutex);
        }

        // level == -1 tells the receiver that the tree sync is finished.
        bs_sync_request(engine, -1, -1, -1);
        pthread_join(thread_rx, NULL);
    } else { // Receiver instance
        bs_msg_buffer_t *msg;

        while (-1){
            if (bs_msg_queue_pop(&engine->queues.sync, &msg, Q_WAIT))
                fatal("could not pop message from sync pool\n", 0);

            // Not a sync request, or more entries than one request may cover:
            // hand the buffer back to the pool without answering.
            if ((msg->msg_type != (BS_MSG_MERKLE_SYNC | BS_MSG_REQUEST)) ||
                (msg->sync.length > (512 / engine->merkle.local->parameters.n))) {
                bs_msg_queue_push(&engine->queues.pool, msg);
                continue;
            }

            // End-of-sync marker: echo it back as a response and stop.
            if (msg->sync.level == -1){
                msg->msg_type = BS_MSG_MERKLE_SYNC | BS_MSG_RESPONSE;
                bs_comm_send_msg(engine, msg);
                break;
            }

            // A negative length would yield a bogus payload_length below.
            if (msg->sync.length < 0) {
                bs_msg_queue_push(&engine->queues.pool, msg);
                continue;
            }

            const int fanout = engine->merkle.local->parameters.n;
            uint64_t *local_row = NULL;
            int local_row_length = 0;

            // If the requested row does not exist, treat it as empty so that
            // the request is still answered (with zeros) instead of reading
            // through an uninitialised pointer.
            if (merkle_get_row(engine->merkle.local, msg->sync.level + 1, &local_row, &local_row_length) < 0) {
                local_row = NULL;
                local_row_length = 0;
            }

            for (int p = 0; p < msg->sync.length; p++){
                for (int k = 0; k < fanout; k++){
                    // start comes from the peer: compute the index in a wide
                    // type and never read outside the row. Out-of-range
                    // entries are reported as 0, which the sender loop above
                    // treats as "nothing to request".
                    long long row_index = (((long long)msg->sync.start + p) * fanout) + k;
                    int msg_index = (p * fanout) + k;

                    msg->payload_ui64[msg_index] =
                        ((row_index >= 0) && (row_index < local_row_length)) ? local_row[row_index] : 0;
                }
            }

            // Rewrite the header so it describes the returned range of row level+1.
            msg->msg_type = (BS_MSG_MERKLE_SYNC | BS_MSG_RESPONSE);
            msg->sync.level++;
            msg->sync.start = msg->sync.start * fanout;
            msg->sync.length = msg->sync.length * fanout;
            msg->payload_length = sizeof(uint64_t) * msg->sync.length;

            bs_comm_send_msg(engine, msg);
        }
    }

    // Clear only the BSS_SYNC bit; the other state flags must be left intact.
    engine->state_flags &= ~BSS_SYNC;

    fprintf(stdout, "finished tree sync.\n");
    fflush(stdout);

    bs_transfer_start(engine);

    return 0;
}
|
|
|
|
|
|
/*
 * Sends one merkle-sync request to the peer.
 *
 * A message buffer is taken from engine->queues.pool (waiting with Q_WAIT),
 * filled in as a BS_MSG_MERKLE_SYNC | BS_MSG_REQUEST with the given
 * level/start/length and no payload, and passed to bs_comm_send_msg().
 *
 * fatal() is invoked if no buffer can be fetched or the send fails.
 * NOTE(review): the code after each fatal() call assumes it does not
 * return — confirm.
 *
 * Returns 0.
 */
int bs_sync_request(bs_engine_t *engine, int level, int start, int length){
    bs_msg_buffer_t *request = NULL;

    // Obtain a free buffer from the pool.
    if (bs_msg_queue_pop(&engine->queues.pool, &request, Q_WAIT) != 0)
        fatal("could not fetch message buffer form pool.\n", 0);

    // Which part of the tree is being asked for.
    request->sync.level = level;
    request->sync.start = start;
    request->sync.length = length;

    // Message header: a request carries only the sync descriptor, no payload.
    request->msg_type = BS_MSG_MERKLE_SYNC | BS_MSG_REQUEST;
    request->msg_length = sizeof(bs_msg_sync_t);
    request->payload_length = 0;

    if (bs_comm_send_msg(engine, request) != 0)
        fatal("could not send message\n", 0);

    return 0;
}
|
|
|
|
/*
 * Sender-side consumer of merkle-sync responses.
 *
 * While BSS_SYNC is set in engine->state_flags, pops messages from
 * engine->queues.sync. For each BS_MSG_MERKLE_SYNC | BS_MSG_RESPONSE message:
 *   - level == -1 ends the loop;
 *   - otherwise the payload is copied into row `level` of the remote tree at
 *     [start, start + length), provided that range lies inside the row, and
 *     merkle.waiting_syncs is decremented under merkle.mutex; merkle.sync is
 *     signalled when the counter reaches zero.
 * Every popped buffer is handed back to engine->queues.pool.
 *
 * Returns 0.
 */
int bs_sync_thread_rx(bs_engine_t *engine){
    bs_msg_buffer_t *msg;

    while (engine->state_flags & BSS_SYNC){
        if (bs_msg_queue_pop(&engine->queues.sync, &msg, Q_WAIT))
            continue;

        if (msg->msg_type == (BS_MSG_MERKLE_SYNC | BS_MSG_RESPONSE)){
            uint64_t *remote_row = NULL;
            int remote_row_length = 0;

            if (msg->sync.level == -1) {
                // End-of-sync marker: return the buffer before leaving, as
                // the push at the bottom of the loop is skipped by the break.
                bs_msg_queue_push(&engine->queues.pool, msg);
                break;
            }

            // start/length come from the peer. Copy only when the row exists
            // and the whole range is inside it; the sum is formed in a wide
            // type so that it cannot overflow.
            if ((merkle_get_row(engine->merkle.remote, msg->sync.level, &remote_row, &remote_row_length) >= 0) &&
                (msg->sync.start >= 0) &&
                (msg->sync.length >= 0) &&
                (((long long)msg->sync.start + msg->sync.length) <= remote_row_length)) {
                for (int p = 0; p < msg->sync.length; p++)
                    remote_row[msg->sync.start + p] = msg->payload_ui64[p];
            }

            // One outstanding request has been answered; wake the waiter
            // when none are left.
            pthread_mutex_lock(&engine->merkle.mutex);
            engine->merkle.waiting_syncs--;
            if (!engine->merkle.waiting_syncs)
                pthread_cond_signal(&engine->merkle.sync);
            pthread_mutex_unlock(&engine->merkle.mutex);
        }

        bs_msg_queue_push(&engine->queues.pool, msg);
    }

    return 0;
}
|