sparse-tools/src/bs_transfer.c

113 lines
3.6 KiB
C

//
// Created by haraldwolff on 15.08.22.
//
#include <bs_transfer.h>
#include <bs_comm.h>
#include <blksync.h>
int bs_transfer_start(bs_engine_t *engine){
if (pthread_create(&engine->threads.transfer, NULL, (void * (*)(void *)) bs_transfer_thread, engine))
fatal("could not create transfer thread.\n", 0);
return 0;
}
int bs_transfer_thread(bs_engine_t *engine){
bs_msg_buffer_t *msg;
int count_blocks = (int)(engine->file.size / engine->parameters.blocksize);
int changed_blocks, transfered_blocks = 0;
engine->state_flags |= BSS_TRANSFER;
if (bs_engine_mmap_file(engine))
fatal("could not mmap file %s\n", engine->file.filename);
if (engine->bs_flags & BS_SENDER){
uint64_t *local_hashes, *remote_hashes;
int local_hashes_length, remote_hashes_length;
merkle_get_row(engine->merkle.local, engine->merkle.local->parameters.depth, &local_hashes, &local_hashes_length);
merkle_get_row(engine->merkle.remote, engine->merkle.remote->parameters.depth, &remote_hashes, &remote_hashes_length);
for (int n=0;n<count_blocks;n++) {
uint64_t local_hash = local_hashes[n];
uint64_t remote_hash = remote_hashes[n];
if (remote_hash && (remote_hash != local_hash))
changed_blocks++;
}
fprintf(stdout, "transfering blocks: %d of %d blocks need to be transfered (%d%%).\n", changed_blocks, count_blocks, (int)((100l * changed_blocks) / count_blocks));
fflush(stdout);
if (engine->tool_flags & BS_DEBUG){
fprintf(stdout, "flushing merkle base to merkle.dump\n");
fflush(stdout);
int md = open("merkle.dump", O_CREAT | O_TRUNC, 0x1FF);
for (int n=0;n<count_blocks;n++) {
write( md, &local_hashes[n], sizeof(uint64_t));
write( md, &remote_hashes[n], sizeof(uint64_t));
}
close(md);
}
for (int n=0;n<count_blocks;n++) {
uint64_t local_hash = local_hashes[n];
uint64_t remote_hash = remote_hashes[n];
if (remote_hash && (remote_hash != local_hash)){
if (bs_msg_queue_pop(&engine->queues.pool, &msg, Q_WAIT))
fatal("could not pop message from pool\n", 0);
long offset = ((long)n) * engine->parameters.blocksize;
int size = engine->parameters.blocksize;
if ((offset + size) > engine->file.size)
size = engine->file.size - offset;
msg->msg_type = BS_MSG_BLOCKDATA | BS_MSG_RESPONSE;
msg->msg_length = sizeof(bs_msg_transfer_t);
msg->transfer.blockno = n;
msg->payload_length = size;
memcpy(msg->payload, &engine->file.mmap[offset], size);
bs_comm_send_msg(engine, msg);
transfered_blocks++;
bs_progress_update(engine, BSS_TRANSFER, transfered_blocks, changed_blocks);
}
}
if (bs_msg_queue_pop(&engine->queues.pool, &msg, Q_WAIT))
fatal("could not pop message from pool\n", 0);
msg->msg_type = BS_MSG_GOODBYE;
msg->msg_length = 0;
msg->payload_length = 0;
bs_comm_send_msg(engine, msg);
} else {
while (!(engine->tool_flags & BS_SHUTDOWN)){
if (bs_msg_queue_pop(&engine->queues.transfer, &msg, Q_WAIT))
fatal("could not receive transfer messge\n", 0);
if (msg->msg_type == (BS_MSG_BLOCKDATA | BS_MSG_RESPONSE)){
long offset = ((long)msg->transfer.blockno) * engine->parameters.blocksize;
memcpy(&engine->file.mmap[offset], msg->payload, msg->payload_length);
}
bs_msg_queue_push(&engine->queues.pool, msg);
bs_progress_update(engine, BSS_TRANSFER, msg->transfer.blockno, count_blocks);
}
}
if (bs_engine_unmap_file(engine))
fatal("could not unmap file %s\n", engine->file.filename);
engine->state_flags &= ~BSS_TRANSFER;
engine->tool_flags |= BS_SHUTDOWN;
}