First merkle-tree synchronization implementation

master
Harald Wolff 2022-08-12 23:50:27 +02:00
parent c5636b5e78
commit babd5e767b
14 changed files with 602 additions and 182 deletions

View File

@ -4,7 +4,7 @@ project(sparse-tools C)
set(CMAKE_C_STANDARD 99)
include_directories( include )
add_executable(blksync src/blksync.c src/bs_recv.c src/bs_send.c src/hash.c src/bs_tools.c src/bs_test.c src/merkle.c include/merkle.h)
add_executable(blksync src/blksync.c src/bs_recv_linear.c src/bs_send_linear.c src/bs_recv_merkle.c src/bs_send_merkle.c src/hash.c src/bs_tools.c src/bs_test.c src/merkle.c include/merkle.h src/bs_handshake.c src/bs_sendrecv.c src/bs_merkle.c)
target_link_libraries(blksync pthread)
add_executable(mksparse src/mksparse.c)
add_executable(hashtest src/hashtest.c src/hash.c)

View File

@ -19,6 +19,8 @@
#include <arpa/inet.h>
#include <netdb.h>
#include <merkle.h>
typedef enum {
SR_SENDER = (1<<8),
SR_RECEIVER = (1<<9),
@ -26,27 +28,38 @@ typedef enum {
} sr_mode_t;
typedef enum {
DS_SIMULATE = (1<<0),
DS_REVERSE = (1<<1),
DS_LISTENER = (1<<2),
DS_VERBOSE = (1<<3),
DS_DUMPBLOCKS = (1<<4),
DS_COMPRESS_BZ2 = (1<<5),
BS_SIMULATE = (1<<0),
BS_LISTENER = (1<<2),
BS_VERBOSE = (1<<3),
BS_DEBUG = (1 << 4),
BS_SHUTDOWN = (1<<5)
} tool_flags_t;
DS_HELLO = (1<<12),
DS_ACK = (1<<13)
} ds_flags_t;
typedef enum {
BS_SENDER = (1<<0),
BS_RECEIVER = (1<<1),
BS_COMPRESSION_MASK
= (0xFF << 8),
BS_COMPRESSION_BZ2
= (1<<8),
BS_PROTO_MASK = (0xFF << 16),
BS_LINEAR = (1<<16),
BS_MERKLE = (2<<16)
} bs_flags_t;
typedef struct {
sr_mode_t sr_mode;
ds_flags_t ds_flags;
tool_flags_t tool_flags;
bs_flags_t bs_flags;
int listenSocket,
clientSocket;
int file;
uint64_t filesize;
uint64_t remote_filesize;
int32_t blocksize;
char filename[FILENAME_MAX];
@ -67,11 +80,21 @@ typedef struct {
typedef struct {
int magic;
ds_flags_t flags;
bs_flags_t bs_flags;
long filesize;
int blocksize;
} bs_hello_t;
static const uint32_t magic = 0xbeefcafe;
static const uint32_t version = 1;
typedef struct {
int hours;
int minutes;
int seconds;
} bs_time_t;
static const uint32_t bs_magic = 0xbeefcafe;
static const uint32_t bs_version = 1;
static const uint32_t defaultBlocksize = 4096;
bsync_engine_t *create_engine();
@ -79,19 +102,32 @@ bsync_engine_t *create_engine();
int connect_to_host(bsync_engine_t *engine, char *hostname, int port);
int accept_connection(bsync_engine_t *engine, char *hostname, int port);
int blksync_handshake(bsync_engine_t *engine);
int bs_recv(int socket, void *buffer, int size);
int bs_send(int socket, const void *buffer, int size);
int read_block(bsync_engine_t *engine, long offset, char *block, int blocksize);
int write_block(bsync_engine_t *engine, long offset, char *block, int blocksize);
int bs_sender(bsync_engine_t *engine);
int bs_receiver(bsync_engine_t *engine);
int bs_recv_linear(bsync_engine_t *engine);
int bs_recv_merkle(bsync_engine_t *engine);
int bs_sender(bsync_engine_t *engine);
int bs_send_linear(bsync_engine_t *engine);
int bs_send_merkle(bsync_engine_t *engine);
int bs_test(bsync_engine_t *engine);
int bs_merkle_build(bsync_engine_t *engine, merkle_t **merkle);
char* ntounit(long l, char *p);
void dump_engine(bsync_engine_t *engine);
int dump_engine_state(bsync_engine_t *engine);
void bs_time_set(bs_time_t* dst, int seconds);
static inline void assert(int r, char *msg) {
if (r < 0) {
if (msg)

View File

@ -34,6 +34,6 @@ int merkle_get(merkle_t *merkle, int l, int n, uint64_t *hash);
int merkle_set(merkle_t *merkle, int l, int n, uint64_t hash);
int merkle_get_row(merkle_t *merkle, int level, uint64_t **offset, int *size);
int merkle_dump(merkle_t *merkle);
#endif //SPARSE_TOOLS_MERKLE_H

View File

@ -1,19 +1,24 @@
#define _GNU_SOURCE
#include <getopt.h>
#include <hash.h>
#include <blksync.h>
#include <sys/types.h>
#include <sys/stat.h>
char sHelp[] = "usage: dsync [ -v ] [ -4 | -6 ] [ -depth | -r ] [ -s ] [ -p <port> ] [ -b <blocksize> ] <file> [ host ]\n"
"\t-h\tshow this help\n"
"\t-v\targ_verbose\n"
"\t-depth\tlisten and wait for connection\n"
"\t-r\targ_reverse_mode connection (sync from arg_listener_mode instead of to)\n"
"\t-v\tbe verbose\n"
"\t-l\tlisten and wait for connection\n"
"\t-r\treceiver\n"
"\t-s\tsender\n"
"\t-b\tset non-default blocksize\n"
"\t-p\tuse tcp port <port> for listening or connecting\n"
"\t-4\tuse ipv4\n"
"\t-6\tuse ipv6\n"
"\t-s\tsimulate (don't change target file)\n"
"\t-t\ttest mode, only output block hashes\n"
;
@ -24,6 +29,7 @@ uint16_t port = 13487;
int main_listen();
int main_connect();
int show_help(char *msg){
if (msg && strlen(msg))
fprintf(stdout, "%s\n", msg);
@ -46,21 +52,21 @@ int main(int argc, char *argv[])
exit(EXIT_FAILURE);
}
while ((opt = getopt(argc, argv, "v46lrb:p:shtd"))>0){
while ((opt = getopt(argc, argv, "v46lrb:p:shtdm"))>0){
switch (opt){
case 'h':
return show_help(NULL);
case 'v':
engine->ds_flags |= DS_VERBOSE;
engine->tool_flags |= BS_VERBOSE;
break;
case 'l':
engine->ds_flags |= DS_LISTENER;
engine->tool_flags |= BS_LISTENER;
break;
case 'r':
engine->ds_flags |= DS_REVERSE;
engine->bs_flags |= BS_RECEIVER;
break;
case 's':
engine->ds_flags |= DS_SIMULATE;
engine->bs_flags |= BS_SENDER;
break;
case '4':
af = AF_INET;
@ -81,30 +87,34 @@ int main(int argc, char *argv[])
return show_help("blocksize must be smaller then 1<<32\n");
engine->blocksize = _l;
break;
case 't':
engine->sr_mode = SR_TEST;
break;
case 'd':
engine->ds_flags |= DS_DUMPBLOCKS;
engine->tool_flags |= BS_DEBUG;
break;
case 'm':
engine->bs_flags |= BS_MERKLE;
break;
}
}
if (engine->sr_mode != SR_TEST)
engine->sr_mode = ((engine->ds_flags & DS_LISTENER)==DS_LISTENER) == ((engine->ds_flags & DS_REVERSE)==DS_REVERSE) ? SR_SENDER : SR_RECEIVER;
if ((engine->bs_flags & (BS_SENDER | BS_RECEIVER))==(BS_SENDER | BS_RECEIVER))
{
fprintf(stderr, "can only be receiver or sender!\n");
exit(EXIT_FAILURE);
}
if (optind >= argc)
return show_help("need a file or device to sync");
strncpy( engine->filename, argv[optind++], sizeof(engine->filename));
if ((engine->sr_mode != SR_TEST) && (optind >= argc) && !(engine->ds_flags & DS_LISTENER))
show_help("need a hostname\n");
if (optind < argc)
if (optind >= argc)
engine->tool_flags |= BS_LISTENER;
else
strncpy(engine->hostname, argv[optind++], sizeof(engine->hostname));
engine->file = open( engine->filename, engine->sr_mode == SR_SENDER ? O_RDONLY : (O_RDWR | O_CREAT), S_IRWXU | S_IRWXG | S_IRWXO);
engine->file = open( engine->filename, O_DIRECT | engine->bs_flags & BS_SENDER ? O_RDONLY : (O_RDWR | O_CREAT), S_IRWXU | S_IRWXG | S_IRWXO);
if (engine->file < 0){
fprintf(stderr, "could not open %s\n", engine->filename);
exit(EXIT_FAILURE);
@ -113,37 +123,37 @@ int main(int argc, char *argv[])
engine->filesize = lseek( engine->file, 0, SEEK_END);
lseek(engine->file, 0, SEEK_SET);
if (!(engine->sr_mode == SR_TEST)) {
if (engine->ds_flags & DS_LISTENER) {
assert_x(accept_connection(engine, engine->hostname, port), NULL);
} else {
assert_x(connect_to_host(engine, engine->hostname, port), NULL);
}
int s;
if (engine->tool_flags & BS_LISTENER){
s = accept_connection(engine, engine->hostname, port);
} else {
s = connect_to_host(engine, engine->hostname, port);
}
if (engine->ds_flags & DS_VERBOSE)
if (s<0){
fprintf(stderr, "connection failed. errno=%d ( %s )", s, strerror(s));
exit(EXIT_FAILURE);
}
if (blksync_handshake(engine))
{
fprintf(stderr, "handshake failed.\n");
exit(EXIT_FAILURE);
}
if (engine->tool_flags & BS_VERBOSE)
dump_engine(engine);
switch (engine->sr_mode){
case SR_SENDER:
bs_sender(engine);
break;
case SR_RECEIVER:
bs_receiver(engine);
break;
case SR_TEST:
bs_test(engine);
break;
default:
exit(EXIT_FAILURE);
}
if (engine->bs_flags & BS_SENDER)
bs_sender(engine);
else if (engine->bs_flags & BS_RECEIVER)
bs_receiver(engine);
close(engine->file);
close(engine->listenSocket);
close(engine->clientSocket);
if (engine)
free(engine);
free(engine);
}
int accept_connection(bsync_engine_t *engine, char *hostname, int port){
@ -233,7 +243,7 @@ int connect_to_host(bsync_engine_t *engine, char *hostname, int port){
void dump_engine(bsync_engine_t *engine){
fprintf( stderr, "blksync engine dump:\n");
fprintf( stderr,"sr_mode=%x ds_flags=%x\n", engine->sr_mode, engine->ds_flags);
fprintf( stderr,"tool_flags=%x bs_flags=%x\n", engine->tool_flags, engine->bs_flags);
fprintf( stderr,"filename=%s\n", engine->filename);
fprintf( stderr,"filesize=%ld (0x%lx)\n", engine->filesize, engine->filesize);
fprintf( stderr,"hostname=%s\n", engine->hostname);

85
src/bs_handshake.c 100644
View File

@ -0,0 +1,85 @@
//
// Created by haraldwolff on 08.08.22.
//
#include <blksync.h>
int blksync_handshake(bsync_engine_t *engine) {
bs_hello_t hello, remote;
hello.magic = bs_magic;
hello.bs_flags = engine->bs_flags;
hello.filesize = engine->filesize;
hello.blocksize = engine->blocksize;
if (bs_send(engine->clientSocket, &hello, sizeof(hello)) < 0)
return -1;
if (bs_recv(engine->clientSocket, &remote, sizeof(remote)) < 0)
return -1;
bs_flags_t mine, yours;
mine = hello.bs_flags & (BS_RECEIVER | BS_SENDER);
yours = remote.bs_flags & (BS_RECEIVER | BS_SENDER);
if (yours == (BS_RECEIVER | BS_SENDER))
{
fprintf(stderr, "remote wants to be receiver and sender.\n");
exit(EXIT_FAILURE);
}
if (!mine && !yours)
{
mine = engine->tool_flags & BS_LISTENER ? BS_RECEIVER : BS_SENDER;
} else if (!mine)
{
mine = (~yours) & (BS_SENDER | BS_RECEIVER);
}
if (mine == yours) {
fprintf(stderr, "remote wants to be same like me, can't work.\n");
exit(EXIT_FAILURE);
}
engine->bs_flags |= mine;
if ((engine->bs_flags & BS_COMPRESSION_MASK) & ~(BS_COMPRESSION_BZ2))
{
fprintf(stderr, "unknown compression bs_flags\n");
exit(EXIT_FAILURE);
}
if ((engine->bs_flags & BS_PROTO_MASK) & ~(BS_LINEAR | BS_MERKLE))
{
fprintf(stderr, "unknown protocol bs_flags\n");
exit(EXIT_FAILURE);
}
if (!engine->blocksize)
engine->blocksize = remote.blocksize ? remote.blocksize : defaultBlocksize;
else if (remote.blocksize && (engine->blocksize != remote.blocksize))
{
fprintf(stderr, "incompatible block sizes\n");
exit(EXIT_FAILURE);
}
if ((engine->bs_flags & BS_COMPRESSION_MASK) && (remote.bs_flags & BS_COMPRESSION_MASK) && ((engine->bs_flags & BS_COMPRESSION_MASK) != (remote.bs_flags & BS_COMPRESSION_MASK))){
fprintf(stderr, "incomatible compression modes\n");
exit(EXIT_FAILURE);
} else if (!(engine->bs_flags & BS_COMPRESSION_MASK)){
engine->bs_flags |= remote.bs_flags & BS_COMPRESSION_MASK;
}
if ((engine->bs_flags & BS_PROTO_MASK) && (remote.bs_flags & BS_PROTO_MASK) && ((engine->bs_flags & BS_PROTO_MASK) != (remote.bs_flags & BS_PROTO_MASK))){
fprintf(stderr, "incomatible protocol modes\n");
exit(EXIT_FAILURE);
} else if (!(engine->bs_flags & BS_PROTO_MASK)){
engine->bs_flags |= remote.bs_flags & BS_PROTO_MASK;
}
if (!(engine->bs_flags & BS_PROTO_MASK))
engine->bs_flags |= BS_LINEAR;
engine->remote_filesize = remote.filesize;
return 0;
}

85
src/bs_merkle.c 100644
View File

@ -0,0 +1,85 @@
//
// Created by haraldwolff on 08.08.22.
//
#include <blksync.h>
#include <hash.h>
int merkle_build_base(bsync_engine_t *engine, merkle_t **_merkle);
int bs_merkle_build(bsync_engine_t *engine, merkle_t **_merkle){
int blocks = (engine->filesize + engine->blocksize - 1) / engine->blocksize;
int s;
s = merkle_create(_merkle, 2, blocks);
if (s<0)
return s;
merkle_t *merkle = *_merkle;
char *block = malloc(engine->blocksize);
if (block){
long offset = 0;
long residue = engine->filesize;
engine->t_start = time(NULL);
for (int nblock=0; nblock < blocks; nblock++){
int size = engine->blocksize > residue ? residue : engine->blocksize;
if (read_block(engine, offset, block, size)<0)
return -EIO;
uint64_t hash = dhash(block, size);
merkle_set(merkle, merkle->parameters.depth, nblock, hash);
time_t now = time(NULL);
if ((nblock == (blocks-1)) || (now != engine->t_last_update)) {
bs_time_t bst;
bs_time_set( &bst, nblock ? (now - engine->t_start) * (blocks - nblock) / nblock : 0);
engine->t_last_update = now;
int percentage = 100 * (nblock+1) / blocks;
fprintf(stdout, "\rhashing file: %d of %d blocks ( %d%% ) => %02d:%02d:%02d", (nblock+1), blocks, percentage, bst.hours, bst.minutes, bst.seconds);
fflush(stdout);
}
offset += size;
residue -= size;
}
fprintf(stdout, "\n");
fflush(stdout);
int srcsize = sizeof(uint64_t) * merkle->parameters.n;
for (int d=merkle->parameters.depth; d > 0; d--){
uint64_t *src, *dst;
int srclen, dstlen;
merkle_get_row(merkle, d , &src, &srclen);
merkle_get_row(merkle, d-1, &dst, &dstlen);
for (int n=0;n<dstlen;n++)
dst[n] = dhash((char*)&src[n * merkle->parameters.n], srcsize );
}
if (engine->tool_flags & BS_VERBOSE){
uint64_t hash;
for (int nblock=0; nblock < blocks; nblock++) {
merkle_get( merkle, merkle->parameters.depth, nblock, &hash);
fprintf(stderr, "Block #%06d: 0x%016lx\n", nblock, hash);
}
merkle_get( merkle, 0, 0, &hash);
fprintf(stderr, "Top: 0x%016lx\n", hash);
}
free(block);
return 0;
}
merkle_free(merkle);
return -ENOMEM;
}

View File

@ -6,63 +6,8 @@
#include <hash.h>
#include <time.h>
int bs_receiver_tx(bsync_engine_t *engine);
int bs_receiver(bsync_engine_t *engine){
uint32_t _magic = 0, _version = 0;
uint64_t _filesize = 0;
uint32_t _blocksize = 0;
bs_send( engine->clientSocket, &magic, sizeof(magic));
bs_send( engine->clientSocket, &version, sizeof(version));
bs_recv( engine->clientSocket, &_magic, sizeof(_magic));
bs_recv( engine->clientSocket, &_version, sizeof(_version));
bs_recv( engine->clientSocket, &_filesize, sizeof(_filesize));
bs_recv( engine->clientSocket, &_blocksize, sizeof(_blocksize));
if (magic != _magic){
fprintf(stderr, "magic missmatch (0x%08x != 0x%08x)\n", magic, _magic);
exit(EXIT_FAILURE);
}
if (version != _version){
fprintf(stderr, "version missmatch\n");
exit(EXIT_FAILURE);
}
if (_filesize != engine->filesize){
if (ftruncate( engine->file, _filesize ) == -1){
if (engine->filesize > _filesize){
fprintf(stderr, "warning: could not truncate %s\n", engine->filename);
} else {
fprintf(stderr, "error: could not extent %s\n", engine->filename);
exit(EXIT_FAILURE);
}
}
engine->filesize = _filesize;
}
if (_blocksize != engine->blocksize)
{
if (engine->blocksize == 0)
engine->blocksize = _blocksize;
if (_blocksize == 0)
_blocksize = engine->blocksize;
}
if (_blocksize != engine->blocksize)
{
fprintf(stderr, "don't agree about blocksize (%d <> %d)", engine->blocksize, _blocksize);
exit(EXIT_FAILURE);
}
if (!engine->blocksize)
engine->blocksize = defaultBlocksize;
bs_send( engine->clientSocket, &engine->filesize, sizeof(engine->filesize));
bs_send( engine->clientSocket, &engine->blocksize, sizeof(engine->blocksize));
fprintf(stdout, "size: %ld bytes, blocksize: %ld bytes\n", engine->filesize, engine->blocksize);
int bs_recv_linear_tx(bsync_engine_t *engine);
int bs_recv_linear(bsync_engine_t *engine){
char *block = malloc(engine->blocksize);
if (!block)
{
@ -73,7 +18,7 @@ int bs_receiver(bsync_engine_t *engine){
long residual;
pthread_t thread_tx;
int s = pthread_create(&thread_tx, NULL, (void * (*)(void *))bs_receiver_tx, engine);
int s = pthread_create(&thread_tx, NULL, (void *(*)(void *)) bs_recv_linear_tx, engine);
if (s != 0){
fprintf(stderr, "could not create tx thread\n");
fflush(stderr);
@ -86,7 +31,6 @@ int bs_receiver(bsync_engine_t *engine){
exit(EXIT_FAILURE);
}
if (offset == -1ul){
fprintf(stderr, "received end mark [0x%lx].\n", offset);
break;
}
@ -103,21 +47,23 @@ int bs_receiver(bsync_engine_t *engine){
//fprintf(stderr, "received block with %d bytes @ offset 0x%lx\n", size, offset);
if (!(engine->ds_flags & DS_SIMULATE)) {
if (!(engine->tool_flags & BS_SIMULATE)) {
if (write_block(engine, offset, block, size) != size)
exit(EXIT_FAILURE);
}
}
fprintf( stderr, "bs_receiver(); finished.");
fflush(stderr);
engine->tool_flags |= BS_SHUTDOWN;
pthread_join(thread_tx, NULL);
dump_engine_state(engine);
fprintf( stdout, "\n");
}
int bs_receiver_tx(bsync_engine_t *engine){
int bs_recv_linear_tx(bsync_engine_t *engine){
char *block = malloc(engine->blocksize);
if (!block)
{
@ -127,9 +73,6 @@ int bs_receiver_tx(bsync_engine_t *engine){
long offset = 0;
long residual = engine->filesize;
fprintf( stderr, "bs_receiver_tx(); starts");
fflush(stderr);
while (residual > 0){
int32_t size = residual > engine->blocksize ? engine->blocksize : residual;
uint64_t blockhash, remotehash;
@ -160,9 +103,6 @@ int bs_receiver_tx(bsync_engine_t *engine){
residual -= size;
}
fprintf( stderr, "bs_receiver_tx(); finished.");
fflush(stderr);
offset = -1l;
if (bs_send(engine->clientSocket, &offset, sizeof(offset)) == -1)
{
@ -170,7 +110,11 @@ int bs_receiver_tx(bsync_engine_t *engine){
exit(EXIT_FAILURE);
}
sleep(30);
while (!(engine->tool_flags & BS_SHUTDOWN))
{
sleep(1);
dump_engine_state(engine);
}
return 0;
}

View File

@ -0,0 +1,85 @@
//
// Created by haraldwolff on 07.08.22.
//
#include <blksync.h>
#include <hash.h>
#include <time.h>
int bs_recv_merkle(bsync_engine_t *engine) {
int s;
uint64_t hash;
merkle_t *merkle;
s = bs_merkle_build(engine, &merkle);
if (s < 0){
fprintf(stderr, "failed to build merkle tree.\n");
}
if (engine->tool_flags & BS_DEBUG){
merkle_dump( merkle );
}
merkle_get(merkle, 0, 0, &hash);
if (bs_send(engine->clientSocket, &hash, sizeof(uint64_t)) < 0)
return -EIO;
int d,n;
while (-1) {
if (
(bs_recv(engine->clientSocket, &d, sizeof(int))<0) ||
(bs_recv(engine->clientSocket, &n, sizeof(int))<0)
){
fprintf(stderr, "failed to read tree hash request.\n");
return -EIO;
}
if ((d==-1) && (n==-1))
break;
uint64_t *row;
merkle_get_row( merkle, d+1, &row, NULL);
if (bs_send(engine->clientSocket, &row[n * merkle->parameters.n], merkle->parameters.n * sizeof(uint64_t))<0)
{
fprintf(stderr, "failed to send tree hashes for d=%d n=%d\n", d, n);
return -EIO;
}
}
char *block = malloc(engine->blocksize);
if (!block) {
fprintf(stderr, "out of memory allocating block buffer\n");
return -ENOMEM;
}
long offset = 0;
while (offset != -1){
if (bs_recv(engine->clientSocket, &offset, sizeof(offset)) < 0){
fprintf(stderr, "could not receive block offset\n");
return -EIO;
}
if (offset == -1)
break;
long residue = engine->filesize - offset;
int size = residue > engine->blocksize ? engine->blocksize : residue;
if (bs_recv(engine->clientSocket, block, size) < 0){
fprintf(stderr, "could not receive block at 0x%016lx\n", offset);
return -EIO;
}
if (write_block(engine, offset, block, size)<0){
fprintf(stderr, "could not write block at offset 0x%016lx\n", offset);
return -EIO;
}
}
free(block),
merkle_free(merkle);
return 0;
}

View File

@ -6,47 +6,7 @@
#include <hash.h>
#include <time.h>
int bs_sender(bsync_engine_t *engine){
uint32_t _magic = 0, _version = 0;
uint64_t _filesize = 0;
uint32_t _blocksize = 0;
write( engine->clientSocket, &magic, sizeof(magic));
write( engine->clientSocket, &version, sizeof(version));
write( engine->clientSocket, &engine->filesize, sizeof(engine->filesize));
write( engine->clientSocket, &engine->blocksize, sizeof(engine->blocksize));
read( engine->clientSocket, &_magic, sizeof(_magic));
read( engine->clientSocket, &_version, sizeof(_version));
read( engine->clientSocket, &_filesize, sizeof(_filesize));
read( engine->clientSocket, &_blocksize, sizeof(_blocksize));
if (magic != _magic){
fprintf(stderr, "magic missmatch (0x%08x != 0x%08x)\n", magic, _magic);
exit(EXIT_FAILURE);
}
if (version != _version){
fprintf(stderr, "version missmatch\n");
exit(EXIT_FAILURE);
}
if (engine->filesize != _filesize)
{
fprintf(stderr, "filesizes don't match (%ld != %ld)\n", engine->filesize, _filesize);
exit(EXIT_FAILURE);
}
if ((engine->blocksize != 0) && (engine->blocksize != _blocksize))
{
fprintf(stderr, "blocksizes don't match (%d != %d)\n", engine->blocksize, _blocksize);
exit(EXIT_FAILURE);
}
if (engine->blocksize == 0)
engine->blocksize = _blocksize;
fprintf(stdout, "size: %ld bytes, blocksize: %d bytes\n", engine->filesize, engine->blocksize);
int bs_send_linear(bsync_engine_t *engine){
char *block = malloc(engine->blocksize);
if (!block)
{
@ -56,9 +16,9 @@ int bs_sender(bsync_engine_t *engine){
long offset = 0;
long residual;
if (engine->ds_flags & DS_VERBOSE){
fprintf( stderr, "about to start...\n");
dump_engine(engine);
if (engine->tool_flags & BS_VERBOSE){
fprintf(stdout, "size: %ld bytes, blocksize: %d bytes\n", engine->filesize, engine->blocksize);
fflush(stdout);
}
while (-1){
@ -120,6 +80,8 @@ int bs_sender(bsync_engine_t *engine){
exit(EXIT_FAILURE);
}
dump_engine_state(engine);
fprintf( stdout, "\n");
}

View File

@ -0,0 +1,156 @@
// Created by haraldwolff on 07.08.22.
//
#include <blksync.h>
#include <hash.h>
#include <time.h>
int sync_merkle(bsync_engine_t *engine, merkle_t* merkle, merkle_t *remote, int d, int n);
int bs_send_merkle(bsync_engine_t *engine){
int s;
int blocks = (engine->filesize + engine->blocksize - 1) / engine->blocksize;
uint64_t hash;
merkle_t *merkle, *remote_merkle;
if (merkle_create(&remote_merkle, 2, blocks)<0)
return -ENOMEM;
s = bs_merkle_build(engine, &merkle);
if (s < 0){
fprintf(stderr, "failed to build merkle tree.\n");
}
fprintf(stdout, "synchronize merkle trees...\n");
fflush(stdout);
if (bs_recv(engine->clientSocket, &hash, sizeof(uint64_t)) < 0)
return -EIO;
merkle_set(remote_merkle, 0, 0, hash);
fprintf(stdout, "remote top: 0x%016lx\n", hash);
fflush(stdout);
if (engine->tool_flags & BS_DEBUG){
merkle_dump( merkle );
}
if (sync_merkle(engine, merkle, remote_merkle, 0, 0)<0)
{
fprintf(stderr, "could not sync merkle trees.\n");
return -1;
}
s = -1;
if (
(bs_send(engine->clientSocket, &s, sizeof(int))<0) ||
(bs_send(engine->clientSocket, &s, sizeof(int))<0)
){
fprintf(stderr, "failed to send end-of-sync mark\n");
return -EIO;
}
if (engine->tool_flags & BS_DEBUG){
merkle_dump( remote_merkle );
}
uint64_t *myrow, *yourrow;
char *block = malloc(engine->blocksize);
if (!block) {
fprintf(stderr, "out of memory allocating block buffer\n");
return -ENOMEM;
}
merkle_get_row(merkle, merkle->parameters.depth, &myrow, NULL);
merkle_get_row(remote_merkle, remote_merkle->parameters.depth, &yourrow, NULL);
int count_changed = 0;
int count_sent = 0;
for (int nblock = 0; nblock < blocks; nblock++) {
if (yourrow[nblock] && (yourrow[nblock] != myrow[nblock]))
count_changed++;
}
for (int nblock = 0; nblock < blocks; nblock++){
if (yourrow[nblock] && (yourrow[nblock] != myrow[nblock])){
long offset = ((long)engine->blocksize) * nblock;
int size = (nblock == (blocks - 1)) ? engine->filesize - offset : engine->blocksize;
if (read_block(engine, offset, block, size)<0)
return -EIO;
if (engine->tool_flags & BS_VERBOSE) {
fprintf(stdout, "\tsending block 0x%016lx (%d bytes)\n", offset, size);
fflush(stdout);
}
if (
(bs_send(engine->clientSocket, &offset, sizeof(offset)) < 0) ||
(bs_send(engine->clientSocket, block, size) < 0)
){
fprintf(stderr, "could not send block at 0x%016lx\n", offset);
return -EIO;
}
count_sent++;
time_t now = time(NULL);
if (engine->t_last_update != now){
bs_time_t bst;
engine->t_last_update = now;
bs_time_set( &bst, (now - engine->t_start) * (count_changed-count_sent) / count_sent);
fprintf(stdout, "\r%d / %d [ %d%% ] => %02d:%02d:%02d", count_sent, count_changed, 100 * count_sent / count_changed, bst.hours, bst.minutes, bst.seconds);
fflush(stdout);
}
}
}
free(block);
hash = -1;
bs_send(engine->clientSocket, &hash, sizeof(hash));
merkle_free(merkle);
merkle_free(remote_merkle);
return 0;
}
int sync_merkle(bsync_engine_t *engine, merkle_t* merkle, merkle_t *remote, int d, int n)
{
uint64_t mine, yours;
uint64_t *nextrow;
merkle_get(merkle, d, n, &mine);
merkle_get(remote, d, n, &yours);
if (engine->tool_flags & BS_DEBUG) {
fprintf(stdout, "%d / %d : 0x%016lx 0x%016lx\n", d, n, mine, yours);
fflush(stdout);
}
if (mine != yours){
if (
(bs_send(engine->clientSocket, &d, sizeof(int))<0) ||
(bs_send(engine->clientSocket, &n, sizeof(int))<0)
){
fprintf(stderr, "failed to request tree hashes for d=%d n=%d\n", d, n);
return -EIO;
}
merkle_get_row(remote, d+1, &nextrow, NULL);
if (bs_recv(engine->clientSocket, &nextrow[n*(remote->parameters.n)], remote->parameters.n * sizeof(uint64_t))<0)
{
fprintf(stderr, "failed to receive tree hashes for d=%d n=%d\n", d, n);
return -EIO;
}
if (d < (remote->parameters.depth-1)) {
for (int i = 0; i < remote->parameters.n; i++) {
int s = sync_merkle(engine, merkle, remote, d+1, (n*merkle->parameters.n) + i);
if (s < 0)
return s;
}
}
}
return 0;
}

40
src/bs_sendrecv.c 100644
View File

@ -0,0 +1,40 @@
//
// Created by haraldwolff on 08.08.22.
//
#include <blksync.h>
int bs_receiver(bsync_engine_t *engine){
if (engine->remote_filesize != engine->filesize){
if (ftruncate( engine->file, engine->remote_filesize ) == -1){
if (engine->filesize > engine->remote_filesize){
fprintf(stderr, "warning: could not truncate %s\n", engine->filename);
} else {
fprintf(stderr, "error: could not extent %s\n", engine->filename);
exit(EXIT_FAILURE);
}
}
engine->filesize = engine->remote_filesize;
}
switch (engine->bs_flags & BS_PROTO_MASK){
case BS_LINEAR:
return bs_recv_linear(engine);
case BS_MERKLE:
return bs_recv_merkle(engine);
}
return 0;
}
int bs_sender(bsync_engine_t *engine){
switch (engine->bs_flags & BS_PROTO_MASK){
case BS_LINEAR:
return bs_send_linear(engine);
case BS_MERKLE:
return bs_send_merkle(engine);
}
return 0;
}

View File

@ -20,7 +20,7 @@ int bs_test(bsync_engine_t *engine){
long p = 0;
long r = engine->filesize;
if (engine->ds_flags & DS_VERBOSE){
if (engine->tool_flags & BS_VERBOSE){
fprintf( stderr, "about to start...\n");
dump_engine(engine);
}

View File

@ -8,11 +8,6 @@
#include <sys/types.h>
#include <unistd.h>
typedef struct {
int hours;
int minutes;
int seconds;
} bs_time_t;
char* units[] = { "B", "kB", "MB", "GB", "TB" };
@ -63,7 +58,7 @@ int read_block(bsync_engine_t *engine, long offset, char *block, int blocksize){
while (pthread_mutex_unlock(&engine->m_file));
if (engine->ds_flags & DS_DUMPBLOCKS){
if (engine->tool_flags & BS_DEBUG){
fprintf( stderr, "BLOCK (%d bytes): ", blocksize);
for (int n=0;n<blocksize; n++){
fprintf(stderr, "%02hhx", block[n]);

View File

@ -6,6 +6,7 @@
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <stdio.h>
int highest_one(long l){
long test = 1l<<63;
@ -97,20 +98,25 @@ int merkle_get(merkle_t *merkle, int level, int n, uint64_t *hash){
int s = merkle_get_row(merkle, level, &row, &rowlength);
if (s < 0)
return s;
/*
for (int i=level;i<merkle->parameters.depth;i++)
n /= merkle->parameters.n;
*/
if (hash)
*hash = row[n];
return 0;
}
int merkle_set(merkle_t *merkle, int level, int n, uint64_t hash){
uint64_t *row;
int rowlength;
int s = merkle_get_row(merkle, level, &row, &rowlength);
if (s < 0)
return s;
for (int i=0;i<level;i++)
/*
for (int i=level;i<merkle->parameters.depth;i++)
n /= merkle->parameters.n;
*/
row[n] = hash;
return 0;
}
@ -128,3 +134,19 @@ int merkle_get_row(merkle_t *merkle, int level, uint64_t **offset, int *size){
return merkle->indeces[level];
}
int merkle_dump(merkle_t *merkle){
uint64_t *row;
int rowlength;
for (int d=0; d <= merkle->parameters.depth; d++)
{
merkle_get_row( merkle, d, &row, &rowlength);
for (int p=0;p<rowlength;p++)
{
fprintf(stderr, "0x%016lx ", row[p]);
}
fprintf(stderr, "\n");
}
return 0;
}