sparse-tools/src/bs_sync.c

191 lines
5.6 KiB
C

//
// Created by haraldwolff on 14.08.22.
//
#include <blksync.h>
#include <bs_engine.h>
#include <bs_sync.h>
#include <bs_comm.h>
#include <bs_transfer.h>
// Forward declarations for helpers that are defined further down in this file
// but are already called from bs_sync_thread().
int bs_sync_request(bs_engine_t *engine, int level, int start, int length);
int bs_sync_thread_rx(bs_engine_t *engine);
/*
 * Thread entry point with exactly the signature pthread_create() expects.
 * bs_sync_thread() returns int, so calling it through a casted
 * void *(*)(void *) pointer would be undefined behaviour; this trampoline
 * forwards the argument and discards the int result instead.
 */
static void *bs_sync_thread_entry(void *arg) {
    bs_sync_thread(arg);
    return NULL;
}

/*
 * Start the tree-sync thread for `engine`.
 *
 * The thread handle is stored in engine->threads.sync. A failure to create
 * the thread is reported through fatal().
 *
 * Returns 0.
 */
int bs_sync_start(bs_engine_t *engine) {
    if (pthread_create(&engine->threads.sync, NULL, bs_sync_thread_entry, engine))
        fatal("could not create sync thread.\n", 0);
    return 0;
}
/* Maximum number of 64-bit hashes placed into one sync response payload. */
enum { BS_SYNC_MAX_RESPONSE_HASHES = 512 };

/*
 * pthread-compatible entry point for the response receiver thread.
 * bs_sync_thread_rx() returns int, so it must not be invoked through a
 * casted void *(*)(void *) pointer; its result is discarded here.
 */
static void *bs_sync_rx_entry(void *arg) {
    bs_sync_thread_rx(arg);
    return NULL;
}

/*
 * Body of the tree-sync thread.
 *
 * Sender (BS_SENDER set in engine->bs_flags):
 *   Walks the merkle rows from level 0 downwards. For level 0 a single
 *   request is sent; for every further level, runs of entries whose remote
 *   hash is non-zero and differs from the local hash are requested. After
 *   each level the thread waits until engine->merkle.waiting_syncs drops to
 *   zero (decremented by bs_sync_thread_rx()). Finally a request with
 *   level/start/length = -1 is sent and the rx thread is joined.
 *
 * Receiver:
 *   Pops requests from engine->queues.sync and answers each with the hashes
 *   of row level+1 covering the requested entries. A request with level -1
 *   is echoed back as a response and ends the loop. Requests of the wrong
 *   type, with too large a length, with negative start/length or for a row
 *   that merkle_get_row() cannot provide are returned to the pool unanswered.
 *
 * In both roles BSS_SYNC is set in engine->state_flags while running and
 * cleared afterwards, then bs_transfer_start() is called.
 *
 * Returns 0; unrecoverable errors are reported through fatal().
 */
int bs_sync_thread(bs_engine_t *engine) {
    engine->state_flags |= BSS_SYNC;
    if (engine->bs_flags & BS_SENDER) { // Sender instance
        pthread_t thread_rx;
        // The thread is joined below, so a failed creation must not be ignored.
        if (pthread_create(&thread_rx, NULL, bs_sync_rx_entry, engine))
            fatal("could not create sync rx thread.\n", 0);
        for (int n = 0; n < engine->merkle.local->parameters.depth; n++) {
            uint64_t *local_row, *remote_row;
            int local_row_length, remote_row_length;
            if (
                (merkle_get_row(engine->merkle.local, n, &local_row, &local_row_length) < 0) ||
                (merkle_get_row(engine->merkle.remote, n, &remote_row, &remote_row_length) < 0)
            )
                fatal("could not get merkle tree local_row %d", n);
            if (n == 0) {
                if (bs_sync_request(engine, 0, 0, 1))
                    fatal("failed to request top sync\n", 0);
                pthread_mutex_lock(&engine->merkle.mutex);
                engine->merkle.waiting_syncs++;
                pthread_mutex_unlock(&engine->merkle.mutex);
            } else {
                // NOTE(review): remote_row is indexed up to local_row_length;
                // this assumes both trees have rows of equal length - confirm.
                for (int start = 0; start < local_row_length; start++) {
                    int length;
                    // Count consecutive entries that are known remotely
                    // (non-zero) and differ from the local hash.
                    for (length = 0;
                         ((start + length) < local_row_length) &&
                         (length < (BS_SYNC_MAX_RESPONSE_HASHES / engine->merkle.local->parameters.n));
                         length++) {
                        if (!remote_row[start + length] || (local_row[start + length] == remote_row[start + length]))
                            break;
                    }
                    if (length) {
                        if (bs_sync_request(engine, n, start, length))
                            fatal("could not send sync request for n=%d start=%d\n", n, start);
                        pthread_mutex_lock(&engine->merkle.mutex);
                        engine->merkle.waiting_syncs++;
                        pthread_mutex_unlock(&engine->merkle.mutex);
                        // Advance to the last requested entry only; the loop
                        // increment then lands on the first entry that was not
                        // part of this run. When the run stopped because it hit
                        // the maximum length that entry has not been examined
                        // yet and must not be skipped.
                        start += length - 1;
                    }
                }
            }
            bs_progress_update(engine, BS_PRG_SYNC, 1, engine->merkle.waiting_syncs);
            pthread_mutex_lock(&engine->merkle.mutex);
            // Loop on the predicate: pthread_cond_wait() may wake spuriously.
            while (engine->merkle.waiting_syncs)
                pthread_cond_wait(&engine->merkle.sync, &engine->merkle.mutex);
            pthread_mutex_unlock(&engine->merkle.mutex);
        }
        // level -1 tells the peer (and, via its echo, the rx thread) to stop.
        bs_sync_request(engine, -1, -1, -1);
        pthread_join(thread_rx, NULL);
    } else { // Receiver instance
        bs_msg_buffer_t *msg;
        for (;;) {
            if (bs_msg_queue_pop(&engine->queues.sync, &msg, Q_WAIT))
                fatal("could not pop message from sync pool\n", 0);

            if ((msg->msg_type != (BS_MSG_MERKLE_SYNC | BS_MSG_REQUEST)) ||
                (msg->sync.length > (BS_SYNC_MAX_RESPONSE_HASHES / engine->merkle.local->parameters.n))) {
                bs_msg_queue_push(&engine->queues.pool, msg);
                continue;
            }

            if (msg->sync.level == -1) {
                // Termination request: echo it back as a response and stop.
                msg->msg_type = BS_MSG_MERKLE_SYNC | BS_MSG_RESPONSE;
                bs_comm_send_msg(engine, msg);
                break;
            }

            uint64_t *local_row;
            int local_row_length;
            // The request comes from the peer: never index the row with
            // negative values or use a row that could not be obtained.
            if ((msg->sync.start < 0) || (msg->sync.length < 0) ||
                (merkle_get_row(engine->merkle.local, msg->sync.level + 1, &local_row, &local_row_length) < 0)) {
                fprintf(stderr, "dropping invalid tree sync request d=%d s=%d l=%d\n",
                        msg->sync.level, msg->sync.start, msg->sync.length);
                bs_msg_queue_push(&engine->queues.pool, msg);
                continue;
            }

            // Each requested entry expands to parameters.n consecutive entries
            // of the next row; copy that contiguous range into the payload.
            // NOTE(review): first + count is not checked against
            // local_row_length because it is not visible here whether rows are
            // padded to a multiple of parameters.n - confirm and add a bound.
            const int fanout = engine->merkle.local->parameters.n;
            const int first = msg->sync.start * fanout;
            const int count = msg->sync.length * fanout;
            for (int i = 0; i < count; i++)
                msg->payload_ui64[i] = local_row[first + i];

            msg->msg_type = (BS_MSG_MERKLE_SYNC | BS_MSG_RESPONSE);
            msg->sync.level++;
            msg->sync.start = first;
            msg->sync.length = count;
            msg->payload_length = sizeof(uint64_t) * msg->sync.length;
            bs_comm_send_msg(engine, msg);
        }
    }
    // Clear only the sync flag; all other state flags must survive.
    engine->state_flags &= ~BSS_SYNC;
    fprintf(stdout, "finished tree sync.\n");
    fflush(stdout);
    bs_transfer_start(engine);
    return 0;
}
/*
 * Send one merkle sync request describing (level, start, length).
 *
 * A message buffer is taken from engine->queues.pool (waiting with Q_WAIT),
 * filled in as a BS_MSG_MERKLE_SYNC request without payload and handed to
 * bs_comm_send_msg().
 *
 * Returns 0; a failure to obtain a buffer or to send is reported through
 * fatal().
 */
int bs_sync_request(bs_engine_t *engine, int level, int start, int length) {
    bs_msg_buffer_t *request = NULL;

    if (bs_msg_queue_pop(&engine->queues.pool, &request, Q_WAIT) != 0)
        fatal("could not fetch message buffer form pool.\n", 0);

    /* Header: a request carrying only the sync descriptor. */
    request->msg_type = BS_MSG_MERKLE_SYNC | BS_MSG_REQUEST;
    request->msg_length = sizeof(bs_msg_sync_t);

    /* Which part of which tree row is being asked for. */
    request->sync.level = level;
    request->sync.start = start;
    request->sync.length = length;
    request->payload_length = 0;

    if (bs_comm_send_msg(engine, request) != 0)
        fatal("could not send message\n", 0);

    return 0;
}
/*
 * Sender-side receiver loop for merkle sync responses.
 *
 * Runs while BSS_SYNC is set in engine->state_flags. Each message popped from
 * engine->queues.sync that is a BS_MSG_MERKLE_SYNC response is handled as
 * follows:
 *   - level == -1: end of sync; the loop terminates.
 *   - otherwise:   the payload hashes are copied into the remote tree row
 *                  `level` at [start, start + length) if that range is valid,
 *                  then engine->merkle.waiting_syncs is decremented and
 *                  engine->merkle.sync is signalled when it reaches zero.
 * Every popped buffer, including the terminating one, is returned to
 * engine->queues.pool.
 *
 * Returns 0.
 */
int bs_sync_thread_rx(bs_engine_t *engine) {
    bs_msg_buffer_t *msg;
    while (engine->state_flags & BSS_SYNC) {
        if (bs_msg_queue_pop(&engine->queues.sync, &msg, Q_WAIT))
            continue;

        int is_response = (msg->msg_type == (BS_MSG_MERKLE_SYNC | BS_MSG_RESPONSE));
        int is_terminator = is_response && (msg->sync.level == -1);

        if (is_response && !is_terminator) {
            uint64_t *remote_row;
            int remote_row_length;
            // The descriptor comes from the peer: only write when the row
            // exists and the whole range lies inside it. The subtraction form
            // avoids overflow in start + length.
            // NOTE(review): msg->payload_length is not compared with
            // sync.length here; confirm the transport guarantees they agree.
            if ((msg->sync.start >= 0) && (msg->sync.length >= 0) &&
                (merkle_get_row(engine->merkle.remote, msg->sync.level, &remote_row, &remote_row_length) >= 0) &&
                (msg->sync.length <= remote_row_length - msg->sync.start)) {
                for (int p = 0; p < msg->sync.length; p++)
                    remote_row[msg->sync.start + p] = msg->payload_ui64[p];
            }
            // The request counts as answered even if its data was unusable,
            // otherwise the sender would wait for it forever.
            pthread_mutex_lock(&engine->merkle.mutex);
            engine->merkle.waiting_syncs--;
            if (!engine->merkle.waiting_syncs)
                pthread_cond_signal(&engine->merkle.sync);
            pthread_mutex_unlock(&engine->merkle.mutex);
        }

        // Hand the buffer back before leaving so the terminating message does
        // not drain the pool by one buffer.
        bs_msg_queue_push(&engine->queues.pool, msg);
        if (is_terminator)
            break;
    }
    return 0;
}