commit c5636b5e782ec324a521a9cb7ade8709ad7492d6 Author: Harald Wolff-Thobaben Date: Mon Aug 8 21:16:29 2022 +0200 Initial Commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ff9047e --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/cmake-build-debug/ diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..c8c5988 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,9 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml + diff --git a/.idea/.name b/.idea/.name new file mode 100644 index 0000000..3fdd84e --- /dev/null +++ b/.idea/.name @@ -0,0 +1 @@ +sparse-tools \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..aa5c3e2 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,14 @@ + + + + + + + \ No newline at end of file diff --git a/.idea/mksparse.iml b/.idea/mksparse.iml new file mode 100644 index 0000000..f08604b --- /dev/null +++ b/.idea/mksparse.iml @@ -0,0 +1,2 @@ + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..2fe990b --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..9d7cca4 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,12 @@ +cmake_minimum_required(VERSION 3.20) +project(sparse-tools C) + +set(CMAKE_C_STANDARD 99) +include_directories( include ) + +add_executable(blksync src/blksync.c src/bs_recv.c src/bs_send.c src/hash.c src/bs_tools.c src/bs_test.c src/merkle.c include/merkle.h) +target_link_libraries(blksync pthread) +add_executable(mksparse src/mksparse.c) +add_executable(hashtest src/hashtest.c src/hash.c) + +add_executable(test_merkle src/test_merkle.c src/merkle.c) \ No newline at end of file diff --git a/hashtest.gnuplot b/hashtest.gnuplot new file mode 100644 index 0000000..475ee58 --- /dev/null +++ b/hashtest.gnuplot @@ -0,0 +1,2 @@ +set datafile separator ',' +plot '/tmp/hashtest.csv' using 2:3 with dots diff --git a/include/blksync.h b/include/blksync.h new file mode 100644 index 0000000..ee714f6 --- /dev/null +++ b/include/blksync.h @@ -0,0 +1,119 @@ +// +// Created by haraldwolff on 07.08.22. +// + +#ifndef MKSPARSE_DSYNC_H +#define MKSPARSE_DSYNC_H + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +typedef enum { + SR_SENDER = (1<<8), + SR_RECEIVER = (1<<9), + SR_TEST = (1<<10) +} sr_mode_t; + +typedef enum { + DS_SIMULATE = (1<<0), + DS_REVERSE = (1<<1), + DS_LISTENER = (1<<2), + DS_VERBOSE = (1<<3), + DS_DUMPBLOCKS = (1<<4), + DS_COMPRESS_BZ2 = (1<<5), + + DS_HELLO = (1<<12), + DS_ACK = (1<<13) +} ds_flags_t; + + +typedef struct { + sr_mode_t sr_mode; + ds_flags_t ds_flags; + + int listenSocket, + clientSocket; + int file; + + uint64_t filesize; + int32_t blocksize; + + char filename[FILENAME_MAX]; + char hostname[256]; + + pthread_mutex_t m_file, + m_socket; + + long last_offset; + long stat_blocks; + long stat_block_bytes; + + time_t t_start, + t_last_update; + + uint64_t *local_hashes, *remote_hashes; +} bsync_engine_t; + +typedef struct { + int magic; + ds_flags_t flags; +} bs_hello_t; + +static const uint32_t magic = 0xbeefcafe; +static const uint32_t version = 1; +static const uint32_t defaultBlocksize = 4096; + +bsync_engine_t *create_engine(); + +int connect_to_host(bsync_engine_t *engine, char *hostname, int port); +int accept_connection(bsync_engine_t *engine, char *hostname, int port); + +int bs_recv(int socket, void *buffer, int size); +int bs_send(int socket, const void *buffer, int size); +int read_block(bsync_engine_t *engine, long offset, char *block, int blocksize); +int write_block(bsync_engine_t *engine, long offset, char *block, int blocksize); + +int bs_sender(bsync_engine_t *engine); +int bs_receiver(bsync_engine_t *engine); +int bs_test(bsync_engine_t *engine); + +char* ntounit(long l, char *p); +void dump_engine(bsync_engine_t *engine); +int dump_engine_state(bsync_engine_t *engine); + +static inline void assert(int r, char *msg) { + if (r < 0) { + if (msg) + fprintf(stderr, msg, r); + if (errno) + fprintf(stderr, "errno=%d %s", errno, strerror(errno)); + fflush(stderr); + } +} + +static inline char* format(char *msg, ...){ + va_list val; + static char _msg[2048]; + + va_start(val, msg); + vsnprintf(_msg, sizeof(_msg), msg, val); + va_end(val); + return _msg; +} + +#define assert_r(r, msg) { if (r<0) { assert(r, msg); return r; } } +#define assert_x(r, msg) { if (r<0) { assert(r, msg); exit(EXIT_FAILURE); } } + + +#endif //MKSPARSE_DSYNC_H diff --git a/include/hash.h b/include/hash.h new file mode 100644 index 0000000..b982d14 --- /dev/null +++ b/include/hash.h @@ -0,0 +1,22 @@ +// +// Created by haraldwolff on 06.08.22. +// + +#ifndef MKSPARSE_HASH_H +#define MKSPARSE_HASH_H + +#include + +static inline uint64_t rol64(uint64_t word, unsigned int shift) +{ + return (word << shift) | (word >> (64 - shift)); +} +static inline uint64_t ror64(uint64_t word, unsigned int shift) +{ + return (word >> shift) | (word << (64 - shift)); +} + +uint64_t dhash(char *data, int size); + + +#endif //MKSPARSE_HASH_H diff --git a/include/merkle.h b/include/merkle.h new file mode 100644 index 0000000..317815a --- /dev/null +++ b/include/merkle.h @@ -0,0 +1,39 @@ +// +// Created by haraldwolff on 08.08.22. +// + +#ifndef SPARSE_TOOLS_MERKLE_H +#define SPARSE_TOOLS_MERKLE_H + +#include + +typedef enum { + MF_ALLOC, +} merkle_flags_t; + +typedef struct { + int n; // level-to-level ratio + int depth; // number of levels + int length; // number of elements over the whole tree + int width; // number of elements at base level +} merkle_parameters_t; + +typedef struct { + merkle_flags_t flags; + merkle_parameters_t parameters; + + int *indeces; + uint64_t *hashes; +} merkle_t; + +int merkle_create(merkle_t **merkle, int n, int minwidth); +int merkle_init(merkle_t *merkle, int n, int minwidth); +int merkle_free(merkle_t *merkle); + +int merkle_get(merkle_t *merkle, int l, int n, uint64_t *hash); +int merkle_set(merkle_t *merkle, int l, int n, uint64_t hash); + +int merkle_get_row(merkle_t *merkle, int level, uint64_t **offset, int *size); + + +#endif //SPARSE_TOOLS_MERKLE_H diff --git a/mksparse b/mksparse new file mode 100755 index 0000000..7244f5d Binary files /dev/null and b/mksparse differ diff --git a/src/blksync.c b/src/blksync.c new file mode 100644 index 0000000..be98e1e --- /dev/null +++ b/src/blksync.c @@ -0,0 +1,259 @@ +#include + +#include +#include + + +char sHelp[] = "usage: dsync [ -v ] [ -4 | -6 ] [ -depth | -r ] [ -s ] [ -p ] [ -b ] [ host ]\n" + "\t-h\tshow this help\n" + "\t-v\targ_verbose\n" + "\t-depth\tlisten and wait for connection\n" + "\t-r\targ_reverse_mode connection (sync from arg_listener_mode instead of to)\n" + "\t-b\tset non-default blocksize\n" + "\t-p\tuse tcp port for listening or connecting\n" + "\t-4\tuse ipv4\n" + "\t-6\tuse ipv6\n" + "\t-s\tsimulate (don't change target file)\n" + "\t-t\ttest mode, only output block hashes\n" + ; + +int af = AF_UNSPEC; + +uint16_t port = 13487; + +int main_listen(); +int main_connect(); + +int show_help(char *msg){ + if (msg && strlen(msg)) + fprintf(stdout, "%s\n", msg); + + fprintf(stdout, sHelp); + fflush(stdout); + return 0; +} + +int main(int argc, char *argv[]) +{ + int opt; + long _l; + + bsync_engine_t *engine = create_engine(); + if (!engine) + { + fprintf(stderr, "could not allocate engine memory\n"); + fflush(stderr); + exit(EXIT_FAILURE); + } + + while ((opt = getopt(argc, argv, "v46lrb:p:shtd"))>0){ + switch (opt){ + case 'h': + return show_help(NULL); + case 'v': + engine->ds_flags |= DS_VERBOSE; + break; + case 'l': + engine->ds_flags |= DS_LISTENER; + break; + case 'r': + engine->ds_flags |= DS_REVERSE; + break; + case 's': + engine->ds_flags |= DS_SIMULATE; + break; + case '4': + af = AF_INET; + break; + case '6': + af = AF_INET6; + break; + case 'p': + _l = strtol(optarg, NULL, 0 ); + if (_l > 65535) + return show_help("port must be less then 65536"); + + port = _l; + break; + case 'b': + _l = strtol(optarg, NULL, 0 ); + if (_l >= (1L<<32)) + return show_help("blocksize must be smaller then 1<<32\n"); + engine->blocksize = _l; + break; + case 't': + engine->sr_mode = SR_TEST; + break; + case 'd': + engine->ds_flags |= DS_DUMPBLOCKS; + break; + } + } + + if (engine->sr_mode != SR_TEST) + engine->sr_mode = ((engine->ds_flags & DS_LISTENER)==DS_LISTENER) == ((engine->ds_flags & DS_REVERSE)==DS_REVERSE) ? SR_SENDER : SR_RECEIVER; + + if (optind >= argc) + return show_help("need a file or device to sync"); + + strncpy( engine->filename, argv[optind++], sizeof(engine->filename)); + + if ((engine->sr_mode != SR_TEST) && (optind >= argc) && !(engine->ds_flags & DS_LISTENER)) + show_help("need a hostname\n"); + + if (optind < argc) + strncpy(engine->hostname, argv[optind++], sizeof(engine->hostname)); + + engine->file = open( engine->filename, engine->sr_mode == SR_SENDER ? O_RDONLY : (O_RDWR | O_CREAT), S_IRWXU | S_IRWXG | S_IRWXO); + if (engine->file < 0){ + fprintf(stderr, "could not open %s\n", engine->filename); + exit(EXIT_FAILURE); + } + + engine->filesize = lseek( engine->file, 0, SEEK_END); + lseek(engine->file, 0, SEEK_SET); + + if (!(engine->sr_mode == SR_TEST)) { + if (engine->ds_flags & DS_LISTENER) { + assert_x(accept_connection(engine, engine->hostname, port), NULL); + } else { + assert_x(connect_to_host(engine, engine->hostname, port), NULL); + } + } + + if (engine->ds_flags & DS_VERBOSE) + dump_engine(engine); + + switch (engine->sr_mode){ + case SR_SENDER: + bs_sender(engine); + break; + case SR_RECEIVER: + bs_receiver(engine); + break; + case SR_TEST: + bs_test(engine); + break; + default: + exit(EXIT_FAILURE); + } + + close(engine->file); + close(engine->listenSocket); + close(engine->clientSocket); + + if (engine) + free(engine); +} + +int accept_connection(bsync_engine_t *engine, char *hostname, int port){ + struct addrinfo hints; + struct addrinfo *result, *ai; + int one = 1; + char sport[12]; + sprintf(sport, "%d", port); + + memset( &hints, 0, sizeof(hints)); + hints.ai_family = af; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + + int s = getaddrinfo(hostname[0] ? hostname : NULL, sport, &hints, &result); + if (s != 0) { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s)); + exit(EXIT_FAILURE); + } + + for (ai = result; ai != NULL; ai = ai->ai_next) { + engine->listenSocket = socket( ai->ai_family, ai->ai_socktype, ai->ai_protocol ); + if (engine->listenSocket < 0) + continue; + + setsockopt( engine->listenSocket, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); + + if (bind(engine->listenSocket, ai->ai_addr, ai->ai_addrlen)!=-1) + break; + + close(engine->listenSocket); + } + + if (ai == NULL){ + fprintf(stderr, "could not bind to %s : %d\n", hostname, port); + exit(EXIT_FAILURE); + } + + freeaddrinfo(result); + + listen(engine->listenSocket, 1); + + engine->clientSocket = accept( engine->listenSocket, NULL, 0); + if (engine->clientSocket < 0){ + fprintf(stderr, "no one connected\n"); + exit(EXIT_FAILURE); + } + + return 0; +} + +int connect_to_host(bsync_engine_t *engine, char *hostname, int port){ + struct addrinfo hints; + struct addrinfo *result, *ai; + char sport[12]; + sprintf(sport, "%d", port); + + memset( &hints, 0, sizeof(hints)); + hints.ai_family = af; + hints.ai_socktype = SOCK_STREAM; + + int s = getaddrinfo(hostname, sport, &hints, &result); + if (s != 0) { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s)); + exit(EXIT_FAILURE); + } + + for (ai = result; ai != NULL; ai = ai->ai_next) { + engine->clientSocket = socket( ai->ai_family, ai->ai_socktype, ai->ai_protocol ); + if (engine->clientSocket < 0) + continue; + if (connect(engine->clientSocket, ai->ai_addr, ai->ai_addrlen)!=-1) + break; + + close(engine->clientSocket); + } + + if (ai == NULL){ + fprintf(stderr, "could not connect to host %s : %d\n", hostname, port); + exit(EXIT_FAILURE); + } + + freeaddrinfo(result); + + return 0; +} + +void dump_engine(bsync_engine_t *engine){ + fprintf( stderr, "blksync engine dump:\n"); + fprintf( stderr,"sr_mode=%x ds_flags=%x\n", engine->sr_mode, engine->ds_flags); + fprintf( stderr,"filename=%s\n", engine->filename); + fprintf( stderr,"filesize=%ld (0x%lx)\n", engine->filesize, engine->filesize); + fprintf( stderr,"hostname=%s\n", engine->hostname); + fprintf( stderr,"blocksize=%d\n", engine->blocksize); + fprintf( stderr,"file=%d\n", engine->file); + fprintf( stderr,"listenSocket=%d\n", engine->listenSocket); + fprintf( stderr,"clientSocket=%d\n", engine->clientSocket); + fprintf( stderr,"\n"); + fflush(stderr); +} + +bsync_engine_t *create_engine(){ + bsync_engine_t *engine = malloc(sizeof(bsync_engine_t)); + if (engine) { + memset(engine, 0, sizeof(bsync_engine_t)); + pthread_mutex_init(&engine->m_file, NULL); + pthread_mutex_init(&engine->m_socket, NULL); + + engine->t_start = time(NULL); + } + return engine; +} + diff --git a/src/bs_recv.c b/src/bs_recv.c new file mode 100644 index 0000000..2673ecd --- /dev/null +++ b/src/bs_recv.c @@ -0,0 +1,176 @@ +// +// Created by haraldwolff on 07.08.22. +// + +#include +#include +#include + +int bs_receiver_tx(bsync_engine_t *engine); + +int bs_receiver(bsync_engine_t *engine){ + uint32_t _magic = 0, _version = 0; + uint64_t _filesize = 0; + uint32_t _blocksize = 0; + + bs_send( engine->clientSocket, &magic, sizeof(magic)); + bs_send( engine->clientSocket, &version, sizeof(version)); + + bs_recv( engine->clientSocket, &_magic, sizeof(_magic)); + bs_recv( engine->clientSocket, &_version, sizeof(_version)); + bs_recv( engine->clientSocket, &_filesize, sizeof(_filesize)); + bs_recv( engine->clientSocket, &_blocksize, sizeof(_blocksize)); + + if (magic != _magic){ + fprintf(stderr, "magic missmatch (0x%08x != 0x%08x)\n", magic, _magic); + exit(EXIT_FAILURE); + } + if (version != _version){ + fprintf(stderr, "version missmatch\n"); + exit(EXIT_FAILURE); + } + + if (_filesize != engine->filesize){ + if (ftruncate( engine->file, _filesize ) == -1){ + if (engine->filesize > _filesize){ + fprintf(stderr, "warning: could not truncate %s\n", engine->filename); + } else { + fprintf(stderr, "error: could not extent %s\n", engine->filename); + exit(EXIT_FAILURE); + } + } + engine->filesize = _filesize; + } + + if (_blocksize != engine->blocksize) + { + if (engine->blocksize == 0) + engine->blocksize = _blocksize; + if (_blocksize == 0) + _blocksize = engine->blocksize; + } + if (_blocksize != engine->blocksize) + { + fprintf(stderr, "don't agree about blocksize (%d <> %d)", engine->blocksize, _blocksize); + exit(EXIT_FAILURE); + } + + if (!engine->blocksize) + engine->blocksize = defaultBlocksize; + + bs_send( engine->clientSocket, &engine->filesize, sizeof(engine->filesize)); + bs_send( engine->clientSocket, &engine->blocksize, sizeof(engine->blocksize)); + + fprintf(stdout, "size: %ld bytes, blocksize: %ld bytes\n", engine->filesize, engine->blocksize); + + char *block = malloc(engine->blocksize); + if (!block) + { + fprintf(stderr, "no memory available for block"); + exit(EXIT_FAILURE); + } + long offset = 0; + long residual; + + pthread_t thread_tx; + int s = pthread_create(&thread_tx, NULL, (void * (*)(void *))bs_receiver_tx, engine); + if (s != 0){ + fprintf(stderr, "could not create tx thread\n"); + fflush(stderr); + exit(EXIT_FAILURE); + } + + while (-1){ + if (bs_recv( engine->clientSocket, &offset, sizeof(offset))==-1){ + fprintf(stderr, "could not receive offset\n"); + exit(EXIT_FAILURE); + } + if (offset == -1ul){ + fprintf(stderr, "received end mark [0x%lx].\n", offset); + break; + } + + residual = engine->filesize - offset; + int32_t size = residual > engine->blocksize ? engine->blocksize : residual; + + if (bs_recv( engine->clientSocket, block, size)==-1){ + fprintf(stderr, "could not receive block at offset 0x%lx\n", offset); + exit(EXIT_FAILURE); + } + + engine->stat_blocks++; + engine->stat_block_bytes += size; + + //fprintf(stderr, "received block with %d bytes @ offset 0x%lx\n", size, offset); + + if (!(engine->ds_flags & DS_SIMULATE)) { + if (write_block(engine, offset, block, size) != size) + exit(EXIT_FAILURE); + } + + } + + fprintf( stderr, "bs_receiver(); finished."); + fflush(stderr); + + fprintf( stdout, "\n"); +} + + +int bs_receiver_tx(bsync_engine_t *engine){ + char *block = malloc(engine->blocksize); + if (!block) + { + fprintf(stderr, "no memory available for block"); + exit(EXIT_FAILURE); + } + long offset = 0; + long residual = engine->filesize; + + fprintf( stderr, "bs_receiver_tx(); starts"); + fflush(stderr); + + while (residual > 0){ + int32_t size = residual > engine->blocksize ? engine->blocksize : residual; + uint64_t blockhash, remotehash; + + if (read_block(engine, offset, block, size) != size) + exit(EXIT_FAILURE); + + blockhash = dhash( block, size); + + int percent = 100 * offset / engine->filesize; + + if ( + (bs_send(engine->clientSocket, &offset, sizeof(offset)) == -1) || + (bs_send( engine->clientSocket, &blockhash, sizeof(blockhash))==-1) + ) + { + fprintf(stderr, "could not send block hash at offset 0x%lx\n", offset); + exit(EXIT_FAILURE); + } + + engine->last_offset = offset; + + time_t t = time(NULL); + if (engine->t_last_update != t) + dump_engine_state(engine); + + offset += size; + residual -= size; + } + + fprintf( stderr, "bs_receiver_tx(); finished."); + fflush(stderr); + + offset = -1l; + if (bs_send(engine->clientSocket, &offset, sizeof(offset)) == -1) + { + fprintf(stderr, "could not send end mark\n"); + exit(EXIT_FAILURE); + } + + sleep(30); + + return 0; +} \ No newline at end of file diff --git a/src/bs_send.c b/src/bs_send.c new file mode 100644 index 0000000..9ff6cb8 --- /dev/null +++ b/src/bs_send.c @@ -0,0 +1,125 @@ +// +// Created by haraldwolff on 07.08.22. +// + +#include +#include +#include + +int bs_sender(bsync_engine_t *engine){ + uint32_t _magic = 0, _version = 0; + uint64_t _filesize = 0; + uint32_t _blocksize = 0; + + write( engine->clientSocket, &magic, sizeof(magic)); + write( engine->clientSocket, &version, sizeof(version)); + write( engine->clientSocket, &engine->filesize, sizeof(engine->filesize)); + write( engine->clientSocket, &engine->blocksize, sizeof(engine->blocksize)); + + read( engine->clientSocket, &_magic, sizeof(_magic)); + read( engine->clientSocket, &_version, sizeof(_version)); + read( engine->clientSocket, &_filesize, sizeof(_filesize)); + read( engine->clientSocket, &_blocksize, sizeof(_blocksize)); + + if (magic != _magic){ + fprintf(stderr, "magic missmatch (0x%08x != 0x%08x)\n", magic, _magic); + exit(EXIT_FAILURE); + } + if (version != _version){ + fprintf(stderr, "version missmatch\n"); + exit(EXIT_FAILURE); + } + + if (engine->filesize != _filesize) + { + fprintf(stderr, "filesizes don't match (%ld != %ld)\n", engine->filesize, _filesize); + exit(EXIT_FAILURE); + } + + if ((engine->blocksize != 0) && (engine->blocksize != _blocksize)) + { + fprintf(stderr, "blocksizes don't match (%d != %d)\n", engine->blocksize, _blocksize); + exit(EXIT_FAILURE); + } + + if (engine->blocksize == 0) + engine->blocksize = _blocksize; + + fprintf(stdout, "size: %ld bytes, blocksize: %d bytes\n", engine->filesize, engine->blocksize); + + char *block = malloc(engine->blocksize); + if (!block) + { + fprintf(stderr, "no memory available for block"); + exit(EXIT_FAILURE); + } + long offset = 0; + long residual; + + if (engine->ds_flags & DS_VERBOSE){ + fprintf( stderr, "about to start...\n"); + dump_engine(engine); + } + + while (-1){ + if (bs_recv( engine->clientSocket, &offset, sizeof(offset))==-1){ + fprintf(stderr, "could not receive offset\n"); + exit(EXIT_FAILURE); + } + if (offset == -1ul) + break; + + residual = engine->filesize - offset; + int32_t size = residual > engine->blocksize ? engine->blocksize : residual; + uint64_t blockhash, remotehash; + + if (bs_recv( engine->clientSocket, &remotehash, sizeof(remotehash))==-1){ + fprintf(stderr, "could not receive block hash at offset 0x%lx\n", offset); + exit(EXIT_FAILURE); + } + + if (read_block(engine, offset, block, size) != size) + exit(EXIT_FAILURE); + + blockhash = dhash( block, size); + + int percent = 100 * offset / engine->filesize; + + if (remotehash != blockhash){ +/* + fprintf(stdout, "offset: %d bytes @ 0x%lx (%d%%) [0x%lx] differs \n", size, offset, percent, blockhash); + fflush(stdout); +*/ + + if ( + (bs_send( engine->clientSocket, &offset, sizeof(offset))==-1) || + (bs_send( engine->clientSocket, block, size)==-1) + ){ + fprintf(stderr,"could not send block at offset 0x%lx\n", offset); + fprintf(stderr,"errno=%d (%s)", errno, strerror(errno)); + exit(EXIT_FAILURE); + } + + engine->stat_blocks++; + engine->stat_block_bytes += size; + engine->last_offset = offset; + } + + time_t t = time(NULL); + if (engine->t_last_update != t) + dump_engine_state(engine); + + offset += size; + residual -= size; + } + + offset = -1l; + if (bs_send(engine->clientSocket, &offset, sizeof(offset)) == -1) + { + fprintf(stderr, "could not send end mark\n"); + exit(EXIT_FAILURE); + } + + fprintf( stdout, "\n"); + +} diff --git a/src/bs_test.c b/src/bs_test.c new file mode 100644 index 0000000..6608c76 --- /dev/null +++ b/src/bs_test.c @@ -0,0 +1,47 @@ +// +// Created by haraldwolff on 08.08.22. +// + +#include +#include + + +int bs_test(bsync_engine_t *engine){ + if (!(engine->blocksize)) + engine->blocksize = defaultBlocksize; + + char *block = malloc(engine->blocksize); + if (!block) + { + fprintf(stderr, "no memory available for block"); + exit(EXIT_FAILURE); + } + + long p = 0; + long r = engine->filesize; + + if (engine->ds_flags & DS_VERBOSE){ + fprintf( stderr, "about to start...\n"); + dump_engine(engine); + } + + while (r > 0){ + int32_t size = r > engine->blocksize ? engine->blocksize : r; + uint64_t blockhash; + + if (read_block(engine, p, block, size) != size) + exit(EXIT_FAILURE); + + blockhash = dhash( block, size); + + int percent = 100 * p / engine->filesize; + + fprintf(stdout, "offset: 0x%lx (%d%%) [0x%lx]\n", p, percent, blockhash); + fflush(stdout); + + p += size; + r -= size; + } + + return 0; +} \ No newline at end of file diff --git a/src/bs_tools.c b/src/bs_tools.c new file mode 100644 index 0000000..5e1a136 --- /dev/null +++ b/src/bs_tools.c @@ -0,0 +1,156 @@ +// +// Created by haraldwolff on 07.08.22. +// + +#include + +#define _LARGEFILE64_SOURCE +#include +#include + +typedef struct { + int hours; + int minutes; + int seconds; +} bs_time_t; + +char* units[] = { "B", "kB", "MB", "GB", "TB" }; + + +char* ntounit(long l, char *p){ + static char _buffer[64]; + if (!p) + p = _buffer; + + int up = 0; + while ((l > 1024) && (up < 5)){ + l /= 1024; + up++; + } + snprintf(p, sizeof(_buffer), "%ld %s", l, units[up]); + return p; +} + +void bs_time_set(bs_time_t* dst, int seconds){ + dst->hours = seconds / 3600; + dst->minutes = (seconds % 3600) / 60; + dst->seconds = seconds % 60; +} + +int read_block(bsync_engine_t *engine, long offset, char *block, int blocksize){ + while (pthread_mutex_lock(&engine->m_file)); + + long r = lseek( engine->file, offset, SEEK_SET); + if ((r == -1) || (r != offset)){ + fprintf(stderr, "read_block(): could not seek to offset 0x%lx, lseek() returned 0x%lx\n", offset, r); + if (r == -1){ + fprintf(stderr, "errno = %d (%s)\n", errno, strerror(errno)); + dump_engine(engine); + } + fflush(stderr); + return -1; + } + + r = read( engine->file, block, blocksize); + if (r != blocksize){ + fprintf(stderr, "read_block(): failed to read block at offset 0x%lx, read() returned %d\n", offset, r); + if (r < 0){ + fprintf(stderr, "errno = %d (%s)\n", errno, strerror(errno)); + } + fflush(stderr); + return -1; + } + + while (pthread_mutex_unlock(&engine->m_file)); + + if (engine->ds_flags & DS_DUMPBLOCKS){ + fprintf( stderr, "BLOCK (%d bytes): ", blocksize); + for (int n=0;nm_file)); + + long r = lseek( engine->file, offset, SEEK_SET); + if ((r == -1) || (r != offset)){ + fprintf(stderr, "write_block(): could not seek to offset 0x%lx, lseek() returned 0x%lx\n", offset, r); + if (r == -1){ + fprintf(stderr, "errno = %d (%s)\n", errno, strerror(errno)); + dump_engine(engine); + } + fflush(stderr); + return -1; + } + + r = write( engine->file, block, blocksize); + if (r != blocksize){ + fprintf(stderr, "write_block(): failed to write block at offset 0x%lx, write() returned %d\n", offset, r); + if (r < 0){ + fprintf(stderr, "errno = %d (%s)\n", errno, strerror(errno)); + } + fflush(stderr); + return -1; + } + while (pthread_mutex_unlock(&engine->m_file)); + return blocksize; +} + +int bs_recv(int socket, void *buffer, int size){ + int residue = size; + while (residue){ + int s = recv(socket, buffer, residue, 0); + if (s < 0) + return s; + + buffer += s; + residue -= s; + } + return size; +} + +int bs_send(int socket, const void *buffer, int size){ + int residue = size; + while (residue){ + int s = send(socket, buffer, residue, 0); + if (s < 0) + return s; + + buffer += s; + residue -= s; + } + return size; +} + +int dump_engine_state(bsync_engine_t *engine){ + char _b[32]; + time_t now = time(NULL); + time_t d = now - engine->t_start; + int percentage = engine->filesize ? 100 * engine->last_offset / engine->filesize : 0; + + bs_time_t t, r; + bs_time_set(&t, d); + bs_time_set(&r, engine->last_offset ? d * engine->filesize / engine->last_offset : 0); + + long syncspeed = d ? engine->stat_block_bytes / d : 0; + + fprintf(stderr,"\rS=0x%016lx O=0x%016lx P=%d%% T=%02d:%02d:%02d R=%02d:%02d:%02d N=%ld V=%s VS=%s/s", + engine->filesize, + engine->last_offset, + percentage, + t.hours, t.minutes, t.seconds, + r.hours, r.minutes, r.seconds, + engine->stat_blocks, + ntounit(engine->stat_block_bytes, NULL), + ntounit(syncspeed, _b) + ); + + engine->t_last_update = now; +} diff --git a/src/hash.c b/src/hash.c new file mode 100644 index 0000000..ace0388 --- /dev/null +++ b/src/hash.c @@ -0,0 +1,33 @@ +// +// Created by haraldwolff on 06.08.22. +// + +#include "../include/hash.h" +#include +#include + +uint64_t dhash(char *data, int size){ + uint64_t *i64 = (uint64_t*)data; + char *i8; + uint64_t hash = 0xbeefcafe; + + while (size >= 8){ + hash = rol64(hash, 23); + for (int n=0;n<16;n++) { + hash ^= *i64; + hash = ((hash>>32) * (hash>>32)) - ((hash & 0xffffffff) * (hash & 0xffffffff)); + hash = rol64(hash, 16); + } + size -= 8; + i64++; + } + + i8 = (char*)i64; + while (size > 0){ + hash = ror64( hash, 11 ); + hash ^= *(i8++); + size --; + } + + return hash; +} diff --git a/src/hashtest.c b/src/hashtest.c new file mode 100644 index 0000000..c398af8 --- /dev/null +++ b/src/hashtest.c @@ -0,0 +1,160 @@ +// +// Created by haraldwolff on 07.08.22. +// + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include + +int next_block(char *block, int blocksize); + +int cntHashes = 25000; +int blockSize = 4096; +char sourcePath[FILENAME_MAX]; +char destinationPath[FILENAME_MAX]; + +int srcFile = 0; +int dstFile = 1; + +int pattern; + +time_t lastUpdate; + +int main(int argc, char *argv[]){ + int opt, r; + char outline[128]; + + union { + uint64_t hash; + struct { + int32_t x; + int32_t y; + }; + } hu; + + while ((opt = getopt(argc, argv, "n:i:s:o:p:"))!=-1){ + switch (opt){ + case 'n': + cntHashes = strtol(optarg, NULL, 0); + break; + case 'i': + strncpy(sourcePath, optarg, sizeof(sourcePath)); + break; + case 's': + blockSize = strtol(optarg, NULL, 0); + break; + case 'o': + strncpy(destinationPath, optarg, sizeof(destinationPath)); + break; + case 'p': + pattern = strtol(optarg, NULL, 0); + break; + } + } + + fprintf(stderr, "calculating hashes over %d blocks of %d bytes size read from %s\nresults written to %s\n", cntHashes, blockSize, sourcePath, destinationPath); + fflush(stderr); + + + if (sourcePath[0]) + { + srcFile = open(sourcePath, O_RDONLY ); + if (srcFile < 0){ + fprintf(stderr, "could not open source %s, errno = %d ( %s )\n", sourcePath, errno, strerror(errno)); + + fflush(stderr); + exit(EXIT_FAILURE); + } + } else { + strcpy(sourcePath, "stdin"); + } + + if (destinationPath[0]) + { + dstFile = open(destinationPath, O_RDWR | O_CREAT | O_TRUNC, S_IRWXO | S_IRWXG | S_IRWXU ); + if (dstFile < 0){ + fprintf(stderr, "could not open destination %s, errno = %d ( %s )\n", destinationPath, errno, strerror(errno)); + fflush(stderr); + exit(EXIT_FAILURE); + } + } else { + strcpy(destinationPath, "stdout"); + } + + char *block = malloc(blockSize); + if (!block) + { + fprintf(stderr, "could not allocate block memory\n"); + fflush(stderr); + exit(EXIT_FAILURE); + } + + for (int n=0 ; n < cntHashes ; n++){ + next_block(block, blockSize); + + hu.hash = dhash( block, blockSize); + snprintf(outline, sizeof(outline), "%lx,%d,%d\n", hu.hash, hu.x, hu.y); + + int ol = strlen(outline); + r = write( dstFile, outline, ol); + if (r != ol){ + fprintf(stderr, "could not write hash pair to %s (write() returned %d)\n", destinationPath, r); + if (r == -1) + fprintf(stderr, "errno = %d ( %s )", errno, strerror(errno)); + fflush(stderr); + exit(EXIT_FAILURE); + } + + time_t t = time(NULL); + if (lastUpdate != t){ + lastUpdate = t; + fprintf(stderr, "\rprogress: %d%%", (100 * n / cntHashes)); + fflush(stderr); + } + } + fprintf(stderr, "\n"); + fflush(stderr); + + return 0; +} + +int create_pattern(char *block, int blocksize){ + static uint64_t state; + uint64_t *b64 = (uint64_t*)block; + + switch (pattern){ + default: + if (!state) + state = 1; + break; + } + + for (int n=0; n < (blocksize >> 3); n++) + { + *(b64++) = state; + } + + state++; + return blocksize; +} + +int next_block(char *block, int blocksize){ + if (pattern) + return create_pattern(block, blocksize); + + int r = read( srcFile, block, blocksize); + if (r != blocksize){ + fprintf(stderr, "could not read block from source\n"); + fflush(stderr); + exit(EXIT_FAILURE); + } + return r; +} \ No newline at end of file diff --git a/src/merkle.c b/src/merkle.c new file mode 100644 index 0000000..8b17c7b --- /dev/null +++ b/src/merkle.c @@ -0,0 +1,130 @@ +// +// Created by haraldwolff on 08.08.22. +// + +#include +#include +#include +#include + +int highest_one(long l){ + long test = 1l<<63; + + for (int n=63; n>=0 ; n--) + { + if (l & test) + return n; + test >>= 1; + } + return -1; +} + +int merkle_create(merkle_t **merkle, int n, int minwidth){ + *merkle = malloc(sizeof(merkle_t)); + if (!*merkle) + return -ENOMEM; + + memset( *merkle, 0x00, sizeof(merkle_t)); + + int s = merkle_init(*merkle, n, minwidth); + if (s){ + free(*merkle); + return s; + } + (*merkle)->flags |= MF_ALLOC; + + return 0; +} + +int merkle_init(merkle_t *merkle, int n, int minwidth){ + int l = 0; + int _width = 1; + int length = 1; + + while (_width < minwidth){ + l++; + _width *= n; + length += _width; + } + + merkle->indeces = malloc(sizeof(int)*(l+1)); + if (!merkle->indeces) + return -ENOMEM; + + merkle->hashes = malloc(sizeof(uint64_t)*length); + if (!merkle->hashes) { + free(merkle->indeces); + return -ENOMEM; + } + + merkle->parameters.n = n; + merkle->parameters.depth = l; + merkle->parameters.width = _width; + merkle->parameters.length = length; + + merkle->indeces[0] = 0; + if (l > 0) + { + int i = 1; + for (int d=1;d<=l;d++){ + merkle->indeces[d] = merkle->indeces[d-1] + i; + i*=n; + } + } + + memset(merkle->hashes, 0x00, sizeof(uint64_t) * length ); + + return 0; +} + + +int merkle_free(merkle_t *merkle){ + if (merkle){ + if (merkle->hashes) + free(merkle->hashes); + if (merkle->indeces) + free(merkle->indeces); + if (merkle->flags & MF_ALLOC) + free(merkle); + return 0; + } + return -EINVAL; +} + +int merkle_get(merkle_t *merkle, int level, int n, uint64_t *hash){ + uint64_t *row; + int rowlength; + int s = merkle_get_row(merkle, level, &row, &rowlength); + if (s < 0) + return s; + for (int i=level;iparameters.depth;i++) + n /= merkle->parameters.n; + if (hash) + *hash = row[n]; + return 0; +} +int merkle_set(merkle_t *merkle, int level, int n, uint64_t hash){ + uint64_t *row; + int rowlength; + int s = merkle_get_row(merkle, level, &row, &rowlength); + if (s < 0) + return s; + for (int i=0;iparameters.n; + row[n] = hash; + return 0; +} + +int merkle_get_row(merkle_t *merkle, int level, uint64_t **offset, int *size){ + if (!merkle) + return -EINVAL; + if ((level<0) || (level > merkle->parameters.depth)) + return -EINVAL; + + if (offset) + *offset = &merkle->hashes[merkle->indeces[level]]; + if (size) + *size = merkle->indeces[level]+1; + + return merkle->indeces[level]; +} diff --git a/src/mksparse.c b/src/mksparse.c new file mode 100644 index 0000000..1c93abd --- /dev/null +++ b/src/mksparse.c @@ -0,0 +1,193 @@ +#include +#include +#include +#include +#include +#include + +char *units[] = +{ + "Bytes", + "kBytes", + "MBytes", + "GBytes" +}; + +int mode = 0; // 0 = stdin -> stdout + // 1 = enocde + // 2 = decode + +int arg_verbose = 0; + +long filesize = 0; +long netsize = 0; + +long transferred = 0; +long nettransferred = 0; + +long transfersum = 0; +long nettransfersum = 0; + + +char status[1024]; + +void (*restore_sig_alrm)(int sig); +void sig_alrm(int sig) +{ + long transfer = filesize - transferred; + transferred += transfer; + + transfersum += transfer; + transfer = transfersum >> 2; + transfersum -= transfer; + + long t = transferred; + + int nu = 0; + int nt = 0; + + while ((transfer > 1024)) + { + transfer >>= 10; + nu++; + if (nu == 3) + break; + } + + while ((t > 1024)) + { + t >>= 10; + nt++; + if (nt == 3) + break; + } + + sprintf(status, "sparse-tools: %li %s seen, current speed is %li %s/s \r", t, units[nt], transfer, units[nu]); + write(2, status, strlen(status)); + fsync(2); + + alarm( 1 ); +} + + + + +int sparse_copy(int in, int out, int blocksize) +{ + int nread = 0, i; + char block[blocksize]; + + while ((nread = read( in, block, blocksize))>0) + { + for (i=0; (i0) + { + for (i=0; (i 0) + { + switch (m) + { + case 'S': + lseek( out, blocksize, SEEK_CUR ); + filesize += blocksize; + break; + case 'D': + nread = read( in, block, blocksize); + write( out, block, nread ); + filesize += nread; + break; + } + } + + ftruncate( out, filesize ); + + return nread; +} + +int main(int argc, char *argv[]) +{ + int ch; + + while ((ch = getopt(argc, argv, "edv"))!=-1) + { + switch (ch) + { + case 'd': + mode = 2; + break; + case 'e': + mode = 1; + break; + case 'v': + arg_verbose = 1; + break; + } + } + + if (arg_verbose) + { + restore_sig_alrm = signal( SIGALRM, sig_alrm); + alarm( 1 ); + } + + switch (mode) + { + case 0: + return sparse_copy( 0, 1, 4096 ); + case 1: + return sparse_encode( 0, 1, 4096 ); + case 2: + return sparse_decode( 0, 1, 4096 ); + } + + if (arg_verbose) + { + alarm( 0 ); + signal( SIGALRM, restore_sig_alrm ); + write( 2, "\n", 1 ); + } + + close(0); + close(1); +} diff --git a/src/test_merkle.c b/src/test_merkle.c new file mode 100644 index 0000000..a006a57 --- /dev/null +++ b/src/test_merkle.c @@ -0,0 +1,60 @@ +// +// Created by haraldwolff on 08.08.22. +// + +#include +#include +#include +#include + +#include +#include + + +int main(int argc, char *argv[]){ + merkle_t *merkle; + + +/* + for (int n=4; n<5; n++) + for (int mw=12; mw < 13 ; mw++) + { +*/ + int s = merkle_create(&merkle, 8, 1048768); + if (s){ + printf("failed to create merkle tree. error=%d (%s)\n", s, strerror(s)); + } else{ + printf("created merkle tree with n=%d\n", merkle->parameters.n); + printf("n=%d depth=%d width=%d length=%d\n", merkle->parameters.n, merkle->parameters.depth, merkle->parameters.width, merkle->parameters.length); + + for (int n=0;nparameters.length;n++) + merkle->hashes[n] = n; + +/* + int w = 1; + for (int n = 0; n < merkle->parameters.width; n++) { + printf("N=%3d ", n); + for (int d=0;d<=merkle->parameters.depth;d++) { + uint64_t th; + s = merkle_get(merkle, d, n, &th); + if (s < 0) { + printf("D:%d G:%d E=%d (%s)\n", d, merkle->parameters.n, s, strerror(s)); + } else { + printf("%4d ", th); + } + } + printf("\n"); + } +*/ + merkle_free(merkle); + } +/* + } +*/ + + + + + + return 0; +} \ No newline at end of file