//
// Created by haraldwolff on 15.08.22.
//
|
|
|
|
#include <bs_transfer.h>
|
|
#include <bs_comm.h>
|
|
#include <blksync.h>
|
|
|
|
int bs_transfer_start(bs_engine_t *engine){
|
|
if (pthread_create(&engine->threads.transfer, NULL, (void * (*)(void *)) bs_transfer_thread, engine))
|
|
fatal("could not create transfer thread.\n", 0);
|
|
return 0;
|
|
}
|
|
|
|
// Body of the transfer thread.
//
// Marks the engine as being in the BSS_TRANSFER state, maps the file, and then
// acts according to engine->bs_flags:
//
//  * BS_SENDER set: fetches the rows at parameters.depth of the local and the
//    remote merkle tree (presumably one hash per block - confirm against
//    merkle_get_row), counts the blocks whose remote hash is non-zero and
//    differs from the local hash, sends each of those blocks as a
//    BS_MSG_BLOCKDATA | BS_MSG_RESPONSE message and finally sends
//    BS_MSG_GOODBYE.
//  * otherwise: pops messages from the transfer queue until BS_SHUTDOWN is set
//    in engine->tool_flags, copying the payload of every block-data message
//    into the mapped file.
//
// Afterwards the file is unmapped, BSS_TRANSFER is cleared and BS_SHUTDOWN is
// set. Failures of the queue/mmap helpers are reported through fatal().
//
// Returns 0.
int bs_transfer_thread(bs_engine_t *engine){
    bs_msg_buffer_t *msg;
    // NOTE(review): integer division drops a trailing partial block, so the
    // tail clamp in the send loop below can never trigger - confirm whether
    // the block count should be rounded up instead.
    int count_blocks = (int)(engine->file.size / engine->parameters.blocksize);
    // Both counters must start at zero; changed_blocks is incremented and
    // printed below.
    int changed_blocks = 0;
    int transfered_blocks = 0;

    engine->state_flags |= BSS_TRANSFER;

    if (bs_engine_mmap_file(engine)) {
        fatal("could not mmap file %s\n", engine->file.filename);
    }

    if (engine->bs_flags & BS_SENDER){
        uint64_t *local_hashes, *remote_hashes;
        int local_hashes_length, remote_hashes_length;

        merkle_get_row(engine->merkle.local, engine->merkle.local->parameters.depth, &local_hashes, &local_hashes_length);
        merkle_get_row(engine->merkle.remote, engine->merkle.remote->parameters.depth, &remote_hashes, &remote_hashes_length);

        // First pass: count the blocks that have to be sent so that progress
        // can be reported against a known total.
        for (int n = 0; n < count_blocks; n++) {
            uint64_t local_hash = local_hashes[n];
            uint64_t remote_hash = remote_hashes[n];
            if (remote_hash && (remote_hash != local_hash)) {
                changed_blocks++;
            }
        }

        // Guard the percentage against a file smaller than one block.
        int percent = 0;
        if (count_blocks > 0) {
            percent = (int)((100l * changed_blocks) / count_blocks);
        }
        fprintf(stdout, "transfering blocks: %d of %d blocks need to be transfered (%d%%).\n", changed_blocks, count_blocks, percent);
        fflush(stdout);

        if (engine->tool_flags & BS_DEBUG){
            fprintf(stdout, "flushing merkle base to merkle.dump\n");
            fflush(stdout);

            // O_WRONLY is required: without an explicit write access mode the
            // descriptor is read-only and every write() fails.
            // The dump is a debugging aid, so failures are reported but do not
            // abort the transfer.
            int md = open("merkle.dump", O_WRONLY | O_CREAT | O_TRUNC, 0777);
            if (md < 0) {
                fprintf(stderr, "could not open merkle.dump\n");
            } else {
                // Interleaved layout: local hash, remote hash, per block.
                for (int n = 0; n < count_blocks; n++) {
                    if (write(md, &local_hashes[n], sizeof(uint64_t)) != (ssize_t)sizeof(uint64_t) ||
                        write(md, &remote_hashes[n], sizeof(uint64_t)) != (ssize_t)sizeof(uint64_t)) {
                        fprintf(stderr, "could not write merkle.dump\n");
                        break;
                    }
                }
                close(md);
            }
        }

        // Second pass: send every block counted above.
        for (int n = 0; n < count_blocks; n++) {
            uint64_t local_hash = local_hashes[n];
            uint64_t remote_hash = remote_hashes[n];

            if (remote_hash && (remote_hash != local_hash)){
                if (bs_msg_queue_pop(&engine->queues.pool, &msg, Q_WAIT)) {
                    fatal("could not pop message from pool\n", 0);
                }

                long offset = ((long)n) * engine->parameters.blocksize;
                int size = engine->parameters.blocksize;

                // Clamp the last block to the end of the file.
                if ((offset + size) > engine->file.size) {
                    size = engine->file.size - offset;
                }

                msg->msg_type = BS_MSG_BLOCKDATA | BS_MSG_RESPONSE;
                msg->msg_length = sizeof(bs_msg_transfer_t);
                msg->transfer.blockno = n;
                msg->payload_length = size;
                memcpy(msg->payload, &engine->file.mmap[offset], size);

                bs_comm_send_msg(engine, msg);
                transfered_blocks++;
                bs_progress_update(engine, BSS_TRANSFER, transfered_blocks, changed_blocks);
            }
        }

        // Tell the peer that no more blocks will follow.
        if (bs_msg_queue_pop(&engine->queues.pool, &msg, Q_WAIT)) {
            fatal("could not pop message from pool\n", 0);
        }

        msg->msg_type = BS_MSG_GOODBYE;
        msg->msg_length = 0;
        msg->payload_length = 0;

        bs_comm_send_msg(engine, msg);
    } else {
        while (!(engine->tool_flags & BS_SHUTDOWN)){
            if (bs_msg_queue_pop(&engine->queues.transfer, &msg, Q_WAIT)) {
                fatal("could not receive transfer messge\n", 0);
            }

            if (msg->msg_type == (BS_MSG_BLOCKDATA | BS_MSG_RESPONSE)){
                // NOTE(review): blockno and payload_length come from the peer
                // and are not validated against the mapped size here - confirm
                // that they are checked before the message is queued.
                long offset = ((long)msg->transfer.blockno) * engine->parameters.blocksize;
                memcpy(&engine->file.mmap[offset], msg->payload, msg->payload_length);
            }

            // Report progress while the buffer is still ours; once it is
            // pushed back to the pool it must not be read any more.
            bs_progress_update(engine, BSS_TRANSFER, msg->transfer.blockno, count_blocks);
            bs_msg_queue_push(&engine->queues.pool, msg);
        }
    }

    if (bs_engine_unmap_file(engine)) {
        fatal("could not unmap file %s\n", engine->file.filename);
    }

    engine->state_flags &= ~BSS_TRANSFER;
    engine->tool_flags |= BS_SHUTDOWN;

    return 0;
}
|