Initial Commit

master
Harald Wolff 2022-08-19 19:52:26 +02:00
parent babd5e767b
commit 5168b97463
30 changed files with 1395 additions and 374 deletions

View File

@ -4,9 +4,10 @@ project(sparse-tools C)
set(CMAKE_C_STANDARD 99)
include_directories( include )
add_executable(blksync src/blksync.c src/bs_recv_linear.c src/bs_send_linear.c src/bs_recv_merkle.c src/bs_send_merkle.c src/hash.c src/bs_tools.c src/bs_test.c src/merkle.c include/merkle.h src/bs_handshake.c src/bs_sendrecv.c src/bs_merkle.c)
add_executable(blksync src/blksync.c src/bs_recv_linear.c src/bs_send_linear.c src/bs_recv_merkle.c src/bs_send_merkle.c src/hash.c src/bs_tools.c src/bs_test.c src/merkle.c include/merkle.h src/bs_comm_handshake.c src/bs_sendrecv.c src/bs_analyze.c include/bs_msg.h include/bs_progress.h include/bs_engine.h include/bs_types.h src/bs_msg_queue.c src/bs_comm.c src/bs_engine.c include/bs_comm.h include/bs_analyze.h src/bs_progress.c include/bs_tools.h src/bs_sync.c include/bs_sync.h src/bs_transfer.c include/bs_transfer.h)
target_link_libraries(blksync pthread)
add_executable(mksparse src/mksparse.c)
add_executable(hashtest src/hashtest.c src/hash.c)
add_executable(test_merkle src/test_merkle.c src/merkle.c)
add_executable(test_merkle src/test_merkle.c src/merkle.c)
add_executable(test_mmap src/test_mmap.c)

View File

@ -2,8 +2,7 @@
// Created by haraldwolff on 07.08.22.
//
#ifndef MKSPARSE_DSYNC_H
#define MKSPARSE_DSYNC_H
#pragma once
#include <stdlib.h>
#include <stdio.h>
@ -21,86 +20,12 @@
#include <merkle.h>
typedef enum {
SR_SENDER = (1<<8),
SR_RECEIVER = (1<<9),
SR_TEST = (1<<10)
} sr_mode_t;
typedef enum {
BS_SIMULATE = (1<<0),
BS_LISTENER = (1<<2),
BS_VERBOSE = (1<<3),
BS_DEBUG = (1 << 4),
BS_SHUTDOWN = (1<<5)
} tool_flags_t;
typedef enum {
BS_SENDER = (1<<0),
BS_RECEIVER = (1<<1),
BS_COMPRESSION_MASK
= (0xFF << 8),
BS_COMPRESSION_BZ2
= (1<<8),
BS_PROTO_MASK = (0xFF << 16),
BS_LINEAR = (1<<16),
BS_MERKLE = (2<<16)
} bs_flags_t;
typedef struct {
tool_flags_t tool_flags;
bs_flags_t bs_flags;
int listenSocket,
clientSocket;
int file;
uint64_t filesize;
uint64_t remote_filesize;
int32_t blocksize;
char filename[FILENAME_MAX];
char hostname[256];
pthread_mutex_t m_file,
m_socket;
long last_offset;
long stat_blocks;
long stat_block_bytes;
time_t t_start,
t_last_update;
uint64_t *local_hashes, *remote_hashes;
} bsync_engine_t;
typedef struct {
int magic;
bs_flags_t bs_flags;
long filesize;
int blocksize;
} bs_hello_t;
typedef struct {
int hours;
int minutes;
int seconds;
} bs_time_t;
static const uint32_t bs_magic = 0xbeefcafe;
static const uint32_t bs_version = 1;
static const uint32_t defaultBlocksize = 4096;
#define error(e, msg, ...) { fprintf(stderr, msg, __VA_ARGS__); fflush(stderr); return e; }
#define fatal(msg, ...) { fprintf(stderr, msg, __VA_ARGS__ ); fflush(stderr); exit(EXIT_FAILURE); }
bsync_engine_t *create_engine();
int connect_to_host(bsync_engine_t *engine, char *hostname, int port);
int accept_connection(bsync_engine_t *engine, char *hostname, int port);
/*
int blksync_handshake(bsync_engine_t *engine);
@ -150,6 +75,4 @@ static inline char* format(char *msg, ...){
#define assert_r(r, msg) { if (r<0) { assert(r, msg); return r; } }
#define assert_x(r, msg) { if (r<0) { assert(r, msg); exit(EXIT_FAILURE); } }
#endif //MKSPARSE_DSYNC_H
*/

View File

@ -0,0 +1,11 @@
//
// Created by haraldwolff on 13.08.22.
//
#pragma once
#include <bs_engine.h>
int bs_analyze_start(bs_engine_t *engine);
int bs_analyze_thread(bs_engine_t *engine);

24
include/bs_comm.h 100644
View File

@ -0,0 +1,24 @@
//
// Created by haraldwolff on 13.08.22.
//
#pragma once
#include <bs_msg.h>
static const uint32_t bs_magic = 0xbeefcafe;
static const uint32_t bs_version = 1;
static const uint32_t defaultBlocksize = 4096;
int bs_comm_start(bs_engine_t *engine);
int bs_comm_thread(bs_engine_t *engine);
int bs_comm_connect(bs_engine_t *engine);
int bs_comm_listen(bs_engine_t *engine);
int bs_comm_handshake(bs_engine_t *engine);
int bs_comm_send(bs_engine_t *engine, bs_msg_t msg_type, char *msg, int length);
int bs_comm_send_payload(bs_engine_t *engine, bs_msg_t msg_type, char *msg, int length, char *payload, int payload_length);
int bs_comm_send_msg(bs_engine_t *engine, bs_msg_buffer_t *msg);

View File

@ -0,0 +1,97 @@
//
// Created by haraldwolff on 13.08.22.
//
#pragma once
#include <stdint.h>
#include <stdio.h>
#include <pthread.h>
#include <bs_types.h>
#include <bs_progress.h>
#include <bs_msg.h>
#include <merkle.h>
struct bs_engine {
tool_flags_t tool_flags;
bs_flags_t bs_flags;
bs_state_flags_t state_flags;
struct {
int32_t blocksize;
} parameters;
struct {
pthread_mutex_t mutex;
int af_family;
char hostname[256];
int port;
int listen;
int remote;
} comm;
struct {
pthread_mutex_t mutex;
char filename[FILENAME_MAX];
int handle;
long size;
char *mmap;
} file;
struct {
bs_progress_steps_t step;
time_t step_started;
time_t last_screen_refresh;
int32_t distance;
int32_t current;
} progress;
struct {
bs_msg_queue_t pool;
bs_msg_queue_t progress;
bs_msg_queue_t sync;
bs_msg_queue_t transfer;
} queues;
struct {
pthread_t comm;
pthread_t analyze;
pthread_t sync;
pthread_t transfer;
} threads;
struct {
uint64_t filesize;
} remote;
struct {
merkle_t *local;
merkle_t *remote;
pthread_mutex_t mutex;
pthread_cond_t sync;
int waiting_syncs;
} merkle;
};
int bs_engine_create(bs_engine_t **engine);
int bs_engine_init(bs_engine_t *engine);
int bs_engine_destroy(bs_engine_t *engine);
int bs_engine_open_file(bs_engine_t *engine);
int bs_engine_truncate_file(bs_engine_t *engine, long length);
int bs_engine_mmap_file(bs_engine_t *engine);
int bs_engine_unmap_file(bs_engine_t *engine);
void bs_engine_dump(bs_engine_t *engine);

25
include/bs_msg.h 100644
View File

@ -0,0 +1,25 @@
//
// Created by haraldwolff on 12.08.22.
//
#pragma once
#include <stdint.h>
#include <pthread.h>
#include <bs_types.h>
#include <bs_progress.h>
#ifndef MAX_MSG_POOL
#define MAX_MSG_POOL 128
#endif
#ifndef MAX_MSG_PAYLOAD_LENGTH
#define MAX_MSG_PAYLOAD_LENGTH 4096
#endif
void bs_msg_queue_init(bs_msg_queue_t *queue);
int bs_msg_queue_push(bs_msg_queue_t *queue, bs_msg_buffer_t* msg);
int bs_msg_queue_pop(bs_msg_queue_t *queue, bs_msg_buffer_t** msg, bs_msg_queue_flags_t flags );
int bs_msg_buffer_create(bs_msg_buffer_t **msg);

View File

@ -0,0 +1,14 @@
//
// Created by haraldwolff on 13.08.22.
//
#pragma once
#include <stdint.h>
#include <bs_types.h>
#include <bs_engine.h>
int bs_progress_update(bs_engine_t *engine, bs_progress_steps_t step, long progress, long distance);

11
include/bs_sync.h 100644
View File

@ -0,0 +1,11 @@
//
// Created by haraldwolff on 14.08.22.
//
#pragma once
#include <bs_engine.h>
int bs_sync_start(bs_engine_t *engine);
int bs_sync_thread(bs_engine_t *engine);

10
include/bs_tools.h 100644
View File

@ -0,0 +1,10 @@
//
// Created by haraldwolff on 13.08.22.
//
#pragma once
#include <bs_types.h>
void bs_time_set(bs_time_t* dst, int seconds);

View File

@ -0,0 +1,11 @@
//
// Created by haraldwolff on 15.08.22.
//
#pragma once
#include <bs_engine.h>
int bs_transfer_start(bs_engine_t *engine);
int bs_transfer_thread(bs_engine_t *engine);

126
include/bs_types.h 100644
View File

@ -0,0 +1,126 @@
//
// Created by haraldwolff on 13.08.22.
//
#pragma once
#include <stdint.h>
#include <pthread.h>
typedef enum {
SR_SENDER = (1<<8),
SR_RECEIVER = (1<<9),
SR_TEST = (1<<10)
} sr_mode_t;
typedef enum {
BS_SIMULATE = (1<<0),
BS_LISTENER = (1<<2),
BS_VERBOSE = (1<<3),
BS_DEBUG = (1 << 4),
BS_SHUTDOWN = (1<<5),
BS_ALLOC = (1<<6),
} tool_flags_t;
typedef enum {
BS_SENDER = (1<<0),
BS_RECEIVER = (1<<1),
BS_COMPRESSION_MASK
= (0xFF << 8),
BS_COMPRESSION_BZ2
= (1<<8),
BS_PROTO_MASK = (0xFF << 16),
BS_LINEAR = (1<<16),
BS_MERKLE = (2<<16)
} bs_flags_t;
typedef enum {
BSS_COMM = (1<<0),
BSS_ANALYZE = (1<<1),
BSS_SYNC = (1<<2),
BSS_TRANSFER = (1<<3)
} bs_state_flags_t;
typedef struct {
int magic;
bs_flags_t bs_flags;
long filesize;
int blocksize;
} bs_hello_t;
typedef struct {
int hours;
int minutes;
int seconds;
} bs_time_t;
typedef enum {
BS_PRG_ANALYZE = (1 << 0),
BS_PRG_SYNC = (1 << 1),
BS_PRG_TRANSFER = (1 << 2),
} bs_progress_steps_t;
typedef enum {
BS_MSG_REQUEST = 0,
BS_MSG_RESPONSE = (1<<0),
BS_MSG_PROGRESS = (1<<1),
BS_MSG_GOODBYE = (1<<2),
// (1<<3),
BS_MSG_MERKLE_SYNC = (1<<4),
BS_MSG_BLOCKDATA = (1<<5),
} bs_msg_t;
typedef struct {
bs_progress_steps_t step; // current step worked on
int32_t distance; // progress needed to reach 100%
int32_t progress; // progress made so far
int32_t duration; // time needed so far
} bs_msg_progress_t;
typedef struct {
int32_t level;
int32_t start;
int32_t length;
} bs_msg_sync_t;
typedef struct {
int32_t blockno;
} bs_msg_transfer_t;
typedef struct bs_msg_buffer bs_msg_buffer_t;
struct bs_msg_buffer{
bs_msg_buffer_t *next;
bs_msg_t msg_type;
int msg_length;
int payload_length;
union {
char msg[1024];
bs_msg_progress_t progress;
bs_msg_sync_t sync;
bs_msg_transfer_t transfer;
};
union {
char *payload;
int32_t *payload_i32;
uint32_t *payload_ui32;
uint64_t *payload_i64;
uint64_t *payload_ui64;
};
};
typedef enum { Q_NONE, Q_WAIT } bs_msg_queue_flags_t;
typedef struct {
pthread_mutex_t mutex;
pthread_cond_t cond;
bs_msg_buffer_t *first;
bs_msg_buffer_t *last;
} bs_msg_queue_t;
typedef struct bs_engine bs_engine_t;

View File

@ -1,10 +1,12 @@
#define _GNU_SOURCE
#include <getopt.h>
#include <hash.h>
#include <blksync.h>
#include <bs_engine.h>
#include <bs_comm.h>
#include <bs_analyze.h>
#include <sys/types.h>
#include <sys/stat.h>
@ -22,10 +24,6 @@ char sHelp[] = "usage: dsync [ -v ] [ -4 | -6 ] [ -depth | -r ] [ -s ] [ -p <por
"\t-t\ttest mode, only output block hashes\n"
;
int af = AF_UNSPEC;
uint16_t port = 13487;
int main_listen();
int main_connect();
@ -43,14 +41,11 @@ int main(int argc, char *argv[])
{
int opt;
long _l;
bs_engine_t *engine;
if (bs_engine_create(&engine))
fatal("could not allocate engine\n", 0);
bsync_engine_t *engine = create_engine();
if (!engine)
{
fprintf(stderr, "could not allocate engine memory\n");
fflush(stderr);
exit(EXIT_FAILURE);
}
while ((opt = getopt(argc, argv, "v46lrb:p:shtdm"))>0){
switch (opt){
@ -69,23 +64,23 @@ int main(int argc, char *argv[])
engine->bs_flags |= BS_SENDER;
break;
case '4':
af = AF_INET;
engine->comm.af_family = AF_INET;
break;
case '6':
af = AF_INET6;
engine->comm.af_family = AF_INET6;
break;
case 'p':
_l = strtol(optarg, NULL, 0 );
if (_l > 65535)
return show_help("port must be less then 65536");
port = _l;
engine->comm.port = _l;
break;
case 'b':
_l = strtol(optarg, NULL, 0 );
if (_l >= (1L<<32))
return show_help("blocksize must be smaller then 1<<32\n");
engine->blocksize = _l;
engine->parameters.blocksize = _l;
break;
case 'd':
engine->tool_flags |= BS_DEBUG;
@ -105,165 +100,38 @@ int main(int argc, char *argv[])
if (optind >= argc)
return show_help("need a file or device to sync");
strncpy( engine->filename, argv[optind++], sizeof(engine->filename));
strncpy( engine->file.filename, argv[optind++], sizeof(engine->file.filename));
if (optind >= argc)
engine->tool_flags |= BS_LISTENER;
else
strncpy(engine->hostname, argv[optind++], sizeof(engine->hostname));
strncpy(engine->comm.hostname, argv[optind++], sizeof(engine->comm.hostname));
engine->file = open( engine->filename, O_DIRECT | engine->bs_flags & BS_SENDER ? O_RDONLY : (O_RDWR | O_CREAT), S_IRWXU | S_IRWXG | S_IRWXO);
if (engine->file < 0){
fprintf(stderr, "could not open %s\n", engine->filename);
if (engine->parameters.blocksize > MAX_MSG_PAYLOAD_LENGTH){
fprintf(stderr, "maximum blocksize can be %d bytes\n", MAX_MSG_PAYLOAD_LENGTH);
exit(EXIT_FAILURE);
}
engine->filesize = lseek( engine->file, 0, SEEK_END);
lseek(engine->file, 0, SEEK_SET);
int s;
if (engine->tool_flags & BS_LISTENER){
s = accept_connection(engine, engine->hostname, port);
} else {
s = connect_to_host(engine, engine->hostname, port);
}
if (s<0){
fprintf(stderr, "connection failed. errno=%d ( %s )", s, strerror(s));
exit(EXIT_FAILURE);
}
if (blksync_handshake(engine))
{
fprintf(stderr, "handshake failed.\n");
exit(EXIT_FAILURE);
}
bs_comm_start(engine);
if (engine->tool_flags & BS_VERBOSE)
dump_engine(engine);
bs_engine_dump(engine);
if (engine->bs_flags & BS_SENDER)
bs_sender(engine);
else if (engine->bs_flags & BS_RECEIVER)
bs_receiver(engine);
close(engine->file);
close(engine->listenSocket);
close(engine->clientSocket);
free(engine);
}
int accept_connection(bsync_engine_t *engine, char *hostname, int port){
struct addrinfo hints;
struct addrinfo *result, *ai;
int one = 1;
char sport[12];
sprintf(sport, "%d", port);
memset( &hints, 0, sizeof(hints));
hints.ai_family = af;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
int s = getaddrinfo(hostname[0] ? hostname : NULL, sport, &hints, &result);
if (s != 0) {
fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s));
exit(EXIT_FAILURE);
while (!(engine->tool_flags & BS_SHUTDOWN)) {
sleep(1);
if (engine->state_flags & BSS_COMM)
bs_comm_send(engine, 0, NULL, 0);
}
for (ai = result; ai != NULL; ai = ai->ai_next) {
engine->listenSocket = socket( ai->ai_family, ai->ai_socktype, ai->ai_protocol );
if (engine->listenSocket < 0)
continue;
setsockopt( engine->listenSocket, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
if (bind(engine->listenSocket, ai->ai_addr, ai->ai_addrlen)!=-1)
break;
close(engine->listenSocket);
}
if (ai == NULL){
fprintf(stderr, "could not bind to %s : %d\n", hostname, port);
exit(EXIT_FAILURE);
}
freeaddrinfo(result);
listen(engine->listenSocket, 1);
engine->clientSocket = accept( engine->listenSocket, NULL, 0);
if (engine->clientSocket < 0){
fprintf(stderr, "no one connected\n");
exit(EXIT_FAILURE);
}
return 0;
}
int connect_to_host(bsync_engine_t *engine, char *hostname, int port){
struct addrinfo hints;
struct addrinfo *result, *ai;
char sport[12];
sprintf(sport, "%d", port);
memset( &hints, 0, sizeof(hints));
hints.ai_family = af;
hints.ai_socktype = SOCK_STREAM;
int s = getaddrinfo(hostname, sport, &hints, &result);
if (s != 0) {
fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s));
exit(EXIT_FAILURE);
}
for (ai = result; ai != NULL; ai = ai->ai_next) {
engine->clientSocket = socket( ai->ai_family, ai->ai_socktype, ai->ai_protocol );
if (engine->clientSocket < 0)
continue;
if (connect(engine->clientSocket, ai->ai_addr, ai->ai_addrlen)!=-1)
break;
close(engine->clientSocket);
}
if (ai == NULL){
fprintf(stderr, "could not connect to host %s : %d\n", hostname, port);
exit(EXIT_FAILURE);
}
freeaddrinfo(result);
return 0;
}
void dump_engine(bsync_engine_t *engine){
fprintf( stderr, "blksync engine dump:\n");
fprintf( stderr,"tool_flags=%x bs_flags=%x\n", engine->tool_flags, engine->bs_flags);
fprintf( stderr,"filename=%s\n", engine->filename);
fprintf( stderr,"filesize=%ld (0x%lx)\n", engine->filesize, engine->filesize);
fprintf( stderr,"hostname=%s\n", engine->hostname);
fprintf( stderr,"blocksize=%d\n", engine->blocksize);
fprintf( stderr,"file=%d\n", engine->file);
fprintf( stderr,"listenSocket=%d\n", engine->listenSocket);
fprintf( stderr,"clientSocket=%d\n", engine->clientSocket);
fprintf( stderr,"\n");
fprintf(stderr, "shuting down, waiting for threads to quit.\n");
fflush(stderr);
bs_engine_destroy(engine);
return 0;
}
bsync_engine_t *create_engine(){
bsync_engine_t *engine = malloc(sizeof(bsync_engine_t));
if (engine) {
memset(engine, 0, sizeof(bsync_engine_t));
pthread_mutex_init(&engine->m_file, NULL);
pthread_mutex_init(&engine->m_socket, NULL);
engine->t_start = time(NULL);
}
return engine;
}

144
src/bs_analyze.c 100644
View File

@ -0,0 +1,144 @@
//
// Created by haraldwolff on 08.08.22.
//
#include <blksync.h>
#include <hash.h>
#include <bs_engine.h>
#include <bs_analyze.h>
#include <bs_sync.h>
struct analyzer_thread_parm {
bs_engine_t *engine;
pthread_spinlock_t spinlock;
int blocks;
long next_offset;
};
int bs_analyzer_thread(struct analyzer_thread_parm *atp);
int bs_analyze_start(bs_engine_t *engine){
if (pthread_create(&engine->threads.analyze, NULL, (void * (*)(void *))bs_analyze_thread, engine))
fatal("could not create analyzer thread.\n", 0);
return 0;
}
int bs_analyze_thread(bs_engine_t *engine){
pthread_t analyzers[4];
engine->state_flags |= BSS_ANALYZE;
if (bs_engine_mmap_file(engine))
fatal("could not map file %s", engine->file.filename);
if (!engine->parameters.blocksize)
engine->parameters.blocksize = 4096;
struct analyzer_thread_parm atp;
atp.engine = engine;
pthread_spin_init(&atp.spinlock, PTHREAD_PROCESS_PRIVATE );
atp.next_offset = 0;
atp.blocks = (engine->file.size + engine->parameters.blocksize - 1) / engine->parameters.blocksize;
int s = merkle_create(&engine->merkle.local, 2, atp.blocks);
if (s<0)
fatal("failed to allocate merkle tree (local)\n", 0);
s = merkle_create(&engine->merkle.remote, 2, atp.blocks);
if (s<0)
fatal("failed to allocate merkle tree (remote)\n", 0);
for (int n=0;n<4;n++)
pthread_create(&analyzers[n], NULL, (void * (*)(void *))bs_analyzer_thread, &atp);
while ((atp.next_offset < engine->file.size) && (engine->state_flags & BSS_ANALYZE)){
bs_progress_update(engine, BS_PRG_ANALYZE, atp.next_offset, engine->file.size);
sleep(1);
}
for (int depth=engine->merkle.local->parameters.depth; depth > 0; depth--){
uint64_t *bottom_row, *upper_row;
int bottom_row_length, upper_row_length;
merkle_get_row(engine->merkle.local, depth, &bottom_row, &bottom_row_length);
merkle_get_row(engine->merkle.local, depth-1, &upper_row, &upper_row_length);
for (int p=0; p < upper_row_length; p++)
upper_row[p] = dhash(&bottom_row[p * engine->merkle.local->parameters.n], sizeof(uint64_t) * engine->merkle.local->parameters.n);
}
bs_progress_update(engine, BS_PRG_ANALYZE, atp.next_offset, engine->file.size);
engine->state_flags &= ~BSS_ANALYZE;
for (int n=0;n<4;n++)
pthread_join(analyzers[n], NULL);
bs_engine_unmap_file(engine);
//merkle_dump(engine->merkle.local);
bs_sync_start(engine);
return 0;
}
int bs_analyzer_thread(struct analyzer_thread_parm *atp){
long offset;
while ((atp->next_offset < atp->engine->file.size) && (atp->engine->state_flags & BSS_ANALYZE)) {
pthread_spin_lock(&atp->spinlock);
offset = atp->next_offset;
if (offset < atp->engine->file.size) {
atp->next_offset += atp->engine->parameters.blocksize;
}
pthread_spin_unlock(&atp->spinlock);
if (offset >= atp->engine->file.size)
break;
int size = atp->engine->file.size - offset;
if (size > atp->engine->parameters.blocksize)
size = atp->engine->parameters.blocksize;
uint64_t hash = dhash(&atp->engine->file.mmap[offset], size);
merkle_set(atp->engine->merkle.local, atp->engine->merkle.local->parameters.depth, offset / atp->engine->parameters.blocksize, hash);
}
return 0;
}
/*
int bs_merkle_build(bsync_engine_t *engine, merkle_t **_merkle){
int srcsize = sizeof(uint64_t) * merkle->parameters.n;
for (int d=merkle->parameters.depth; d > 0; d--){
uint64_t *src, *dst;
int srclen, dstlen;
merkle_get_row(merkle, d , &src, &srclen);
merkle_get_row(merkle, d-1, &dst, &dstlen);
for (int n=0;n<dstlen;n++)
dst[n] = dhash((char*)&src[n * merkle->parameters.n], srcsize );
}
if (engine->tool_flags & BS_VERBOSE){
uint64_t hash;
for (int nblock=0; nblock < blocks; nblock++) {
merkle_get( merkle, merkle->parameters.depth, nblock, &hash);
fprintf(stderr, "Block #%06d: 0x%016lx\n", nblock, hash);
}
merkle_get( merkle, 0, 0, &hash);
fprintf(stderr, "Top: 0x%016lx\n", hash);
}
free(block);
return 0;
}
merkle_free(merkle);
return -ENOMEM;
}
*/

202
src/bs_comm.c 100644
View File

@ -0,0 +1,202 @@
//
// Created by haraldwolff on 13.08.22.
//
#include <string.h>
#include <netdb.h>
#include <blksync.h>
#include <bs_engine.h>
#include <bs_comm.h>
#include <bs_analyze.h>
int bs_comm_start(bs_engine_t *engine){
if (pthread_create(&engine->threads.comm, NULL, (void * (*)(void *))bs_comm_thread, engine))
fatal("could not create comm thread.\n", 0);
return 0;
}
int bs_comm_recv(int socket, void *buffer, int length){
int l = length;
char *b = buffer;
while (l){
int r = recv(socket, b, l, 0);
if (r < 0)
return r;
b += r;
l -= r;
}
return length;
}
int bs_comm_thread(bs_engine_t *engine){
int s;
bs_msg_buffer_t *msg;
if (engine->tool_flags & BS_LISTENER)
bs_comm_listen(engine);
else
bs_comm_connect(engine);
engine->state_flags |= BSS_COMM;
if (bs_comm_handshake(engine))
fatal("handshake failed.\n", 0);
bs_analyze_start(engine);
while (!(engine->tool_flags & BS_SHUTDOWN)) {
if (!msg) {
s = bs_msg_queue_pop(&engine->queues.pool, &msg, Q_WAIT);
if (s) fatal("could not pop message buffer from pool\n", 0);
}
s = bs_comm_recv(engine->comm.remote, &msg->msg_type, sizeof(msg->msg_type));
if (s != sizeof(msg->msg_type)) fatal("I/O error while receiving message (A) %s\n", strerror(errno));
s = bs_comm_recv(engine->comm.remote, &msg->msg_length, sizeof(msg->msg_length));
if (s != sizeof(msg->msg_length)) fatal("I/O error while receiving message (B) %s\n", strerror(errno));
s = bs_comm_recv(engine->comm.remote, &msg->payload_length, sizeof(msg->payload_length));
if (s != sizeof(msg->msg_length)) fatal("I/O error while receiving message (C) %s\n", strerror(errno));
if (msg->msg_length) {
s = bs_comm_recv(engine->comm.remote, msg->msg, msg->msg_length);
if (s != msg->msg_length) fatal("I/O error while receiving message (D) %s\n", strerror(errno));
}
if (msg->payload_length) {
s = bs_comm_recv(engine->comm.remote, msg->payload, msg->payload_length);
if (s != msg->payload_length) fatal("I/O error while receiving message (E) %s\n", strerror(errno));
}
bs_msg_queue_t *queue = NULL;
if (msg->msg_type & BS_MSG_PROGRESS)
queue = &engine->queues.progress;
else if (msg->msg_type & BS_MSG_MERKLE_SYNC)
queue = &engine->queues.sync;
else if (msg->msg_type & BS_MSG_BLOCKDATA)
queue = &engine->queues.transfer;
else if (msg->msg_type & BS_MSG_GOODBYE) {
engine->tool_flags |= BS_SHUTDOWN;
} else if (msg->msg_type == 0){
// Keep-Alive
} else {
fprintf(stderr, "received unkown message type 0x%x\n", msg->msg_type);
fflush(stderr);
}
if (queue) {
bs_msg_queue_push(queue, msg);
msg = NULL;
}
}
engine->state_flags &= ~BSS_COMM;
return 0;
}
int bs_comm_connect(bs_engine_t *engine){
struct addrinfo hints;
struct addrinfo *result, *ai;
char sport[12];
sprintf(sport, "%d", engine->comm.port);
memset( &hints, 0, sizeof(hints));
hints.ai_family = engine->comm.af_family;
hints.ai_socktype = SOCK_STREAM;
int s = getaddrinfo(engine->comm.hostname, sport, &hints, &result);
if (s != 0)
fatal("could not get addrinfo for host %s (%s)\n", engine->comm.hostname, gai_strerror(s));
for (ai = result; ai != NULL; ai = ai->ai_next) {
engine->comm.remote = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol );
if (engine->comm.remote < 0)
continue;
if (connect(engine->comm.remote, ai->ai_addr, ai->ai_addrlen) != -1)
break;
close(engine->comm.remote);
}
if (ai == NULL)
fatal("could not connect to host %s : %d\n", engine->comm.hostname, engine->comm.port);
freeaddrinfo(result);
return 0;
}
int bs_comm_listen(bs_engine_t *engine){
struct addrinfo hints;
struct addrinfo *result, *ai;
int one = 1;
char sport[12];
sprintf(sport, "%d", engine->comm.port);
memset( &hints, 0, sizeof(hints));
hints.ai_family = engine->comm.af_family;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
int s = getaddrinfo(engine->comm.hostname[0] ? engine->comm.hostname : NULL, sport, &hints, &result);
if (s != 0)
fatal("could not get addrinfo for host %s (%s)\n", engine->comm.hostname, gai_strerror(s));
for (ai = result; ai != NULL; ai = ai->ai_next) {
engine->comm.listen = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol );
if (engine->comm.listen < 0)
continue;
setsockopt(engine->comm.listen, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
if (bind(engine->comm.listen, ai->ai_addr, ai->ai_addrlen) != -1)
break;
close(engine->comm.listen);
}
if (ai == NULL)
fatal("could not bind to %s:%d\n", engine->comm.hostname, engine->comm.port);
freeaddrinfo(result);
listen(engine->comm.listen, 1);
engine->comm.remote = accept(engine->comm.listen, NULL, 0);
if (engine->comm.remote < 0)
fatal("no one connected\n", 0);
return 0;
}
int bs_comm_send_msg(bs_engine_t *engine, bs_msg_buffer_t *msg){
int s = bs_comm_send_payload(engine, msg->msg_type, msg->msg, msg->msg_length, msg->payload, msg->payload_length);
bs_msg_queue_push(&engine->queues.pool, msg);
return s;
}
int bs_comm_send(bs_engine_t *engine, bs_msg_t msg_type, char *msg, int length){
return bs_comm_send_payload(engine, msg_type, msg, length, NULL, 0);
}
int bs_comm_send_payload(bs_engine_t *engine, bs_msg_t msg_type, char *msg, int length, char *payload, int payload_length){
if (!payload && payload_length)
return -EINVAL;
pthread_mutex_lock(&engine->comm.mutex);
if (
(send(engine->comm.remote, &msg_type, sizeof(msg_type), 0) < 0) ||
(send(engine->comm.remote, &length, sizeof(length), 0) < 0) ||
(send(engine->comm.remote, &payload_length, sizeof(payload_length), 0) < 0) ||
(send(engine->comm.remote, msg, length, 0) < 0) ||
(send(engine->comm.remote, payload, payload_length, 0) < 0)
)
fatal("failed to send message\n", 0);
pthread_mutex_unlock(&engine->comm.mutex);
return 0;
}

View File

@ -2,19 +2,34 @@
// Created by haraldwolff on 08.08.22.
//
#include <blksync.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <blksync.h>
#include <bs_comm.h>
#include <bs_engine.h>
int bs_comm_handshake(bs_engine_t *engine) {
if (!engine->file.handle)
{
int s = bs_engine_open_file(engine);
if (s)
fatal("could not open file %s\n", engine->file.filename);
}
int blksync_handshake(bsync_engine_t *engine) {
bs_hello_t hello, remote;
memset(&hello, 0, sizeof(hello));
hello.magic = bs_magic;
hello.bs_flags = engine->bs_flags;
hello.filesize = engine->filesize;
hello.blocksize = engine->blocksize;
hello.filesize = engine->file.size;
hello.blocksize = engine->parameters.blocksize;
if (bs_send(engine->clientSocket, &hello, sizeof(hello)) < 0)
if (send(engine->comm.remote, &hello, sizeof(hello), 0) < 0)
return -1;
if (bs_recv(engine->clientSocket, &remote, sizeof(remote)) < 0)
if (recv(engine->comm.remote, &remote, sizeof(remote), 0) < 0)
return -1;
bs_flags_t mine, yours;
@ -54,12 +69,14 @@ int blksync_handshake(bsync_engine_t *engine) {
exit(EXIT_FAILURE);
}
if (!engine->blocksize)
engine->blocksize = remote.blocksize ? remote.blocksize : defaultBlocksize;
else if (remote.blocksize && (engine->blocksize != remote.blocksize))
if (!engine->parameters.blocksize)
engine->parameters.blocksize = remote.blocksize ? remote.blocksize : defaultBlocksize;
else if (remote.blocksize && (engine->parameters.blocksize != remote.blocksize))
{
fprintf(stderr, "incompatible block sizes\n");
exit(EXIT_FAILURE);
} else if (engine->parameters.blocksize > MAX_MSG_PAYLOAD_LENGTH){
fatal("requested blocksize too large. (%d > %d)", remote.blocksize, MAX_MSG_PAYLOAD_LENGTH);
}
if ((engine->bs_flags & BS_COMPRESSION_MASK) && (remote.bs_flags & BS_COMPRESSION_MASK) && ((engine->bs_flags & BS_COMPRESSION_MASK) != (remote.bs_flags & BS_COMPRESSION_MASK))){
@ -79,7 +96,13 @@ int blksync_handshake(bsync_engine_t *engine) {
if (!(engine->bs_flags & BS_PROTO_MASK))
engine->bs_flags |= BS_LINEAR;
engine->remote_filesize = remote.filesize;
if ((engine->bs_flags & BS_RECEIVER) && (engine->file.size != remote.filesize)){
fprintf(stdout, "truncate needed to %ld bytes (0x%lx)\n", remote.filesize, remote.filesize);
fflush(stdout);
if (bs_engine_truncate_file(engine, remote.filesize))
fatal("could not truncate file %s ( %s )", engine->file.filename, strerror(errno));
}
return 0;
}

145
src/bs_engine.c 100644
View File

@ -0,0 +1,145 @@
//
// Created by haraldwolff on 13.08.22.
//
#define _GNU_SOURCE
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include "blksync.h"
#include "bs_engine.h"
int bs_engine_create(bs_engine_t **engine){
*engine = malloc(sizeof(bs_engine_t));
if (!*engine)
return -ENOMEM;
int s = bs_engine_init(*engine);
(*engine)->tool_flags |= BS_ALLOC;
return s;
}
int bs_engine_init(bs_engine_t *engine){
int s;
memset(engine, 0, sizeof(bs_engine_t));
pthread_mutex_init(&engine->comm.mutex, NULL);
pthread_mutex_init(&engine->file.mutex, NULL);
pthread_mutex_init(&engine->merkle.mutex, NULL);
pthread_cond_init( &engine->merkle.sync, NULL);
bs_msg_queue_init(&engine->queues.pool);
bs_msg_queue_init(&engine->queues.progress);
bs_msg_queue_init(&engine->queues.sync);
bs_msg_queue_init(&engine->queues.transfer);
engine->comm.af_family = AF_UNSPEC;
engine->comm.port = 13487;
engine->file.mmap = MAP_FAILED;
for (int n=0;n<MAX_MSG_POOL;n++){
bs_msg_buffer_t *msg;
s = bs_msg_buffer_create(&msg);
if (s)
fatal("", 0);
bs_msg_queue_push(&engine->queues.pool, msg);
}
return 0;
}
int bs_engine_destroy(bs_engine_t *engine){
if (engine){
engine->tool_flags |= BS_SHUTDOWN;
pthread_join(engine->threads.transfer, NULL);
pthread_join(engine->threads.sync, NULL);
pthread_join(engine->threads.analyze, NULL);
pthread_join(engine->threads.comm, NULL);
if (engine->tool_flags & BS_ALLOC)
free(engine);
}
return 0;
}
int bs_engine_open_file(bs_engine_t *engine){
struct stat _stat;
engine->file.handle = open( engine->file.filename, O_DIRECT | (engine->bs_flags & BS_SENDER ? O_RDONLY : (O_RDWR | O_CREAT)), S_IRWXU | S_IRWXG | S_IRWXO);
if (engine->file.handle < 0)
error(errno, "could not open %s\n", engine->file.filename);
#if 0 // fstat does not work on block devices !!!
if (fstat(engine->file.handle, &_stat))
error(errno, "could not stat file %s", engine->file.filename);
engine->file.size = _stat.st_size;
#else
engine->file.size = lseek( engine->file.handle, 0, SEEK_END);
lseek(engine->file.handle, 0, SEEK_SET);
#endif
return 0;
}
int bs_engine_truncate_file(bs_engine_t *engine, long length){
if (engine->file.handle < 0)
fatal("bs_engine_truncate_file(): file not opened\n", 0);
int s = ftruncate( engine->file.handle, length);
if (s)
error(s, "could not truncate file %s ( %s )\n", engine->file.filename, strerror(errno));
engine->file.size = length;
return 0;
}
int bs_engine_mmap_file(bs_engine_t *engine){
if (engine->file.handle <= 0)
bs_engine_open_file(engine);
if (engine->file.mmap && (MAP_FAILED != engine->file.mmap)) // already mapped
return 0;
engine->file.mmap = mmap(NULL, engine->file.size, engine->bs_flags & BS_SENDER ? PROT_READ : PROT_READ | PROT_WRITE, MAP_SHARED, engine->file.handle, 0);
if (MAP_FAILED == engine->file.mmap)
fatal("could not map file contents to memory (%s) (f=%d, size=%ld)\n", strerror(errno), engine->file.handle, engine->file.size);
return 0;
}
int bs_engine_unmap_file(bs_engine_t *engine){
if (!engine->file.mmap || (MAP_FAILED != engine->file.mmap)){
if (munmap(engine->file.mmap, engine->file.size))
error(errno, "could not unmap file %s\n", engine->file.filename);
engine->file.mmap = NULL;
}
return 0;
}
void bs_engine_dump(bs_engine_t *engine){
fprintf( stderr, "blksync engine dump:\n");
fprintf( stderr,"tool_flags=%x bs_flags=%x\n", engine->tool_flags, engine->bs_flags);
fprintf( stderr,"filename=%s\n", engine->file.filename);
fprintf( stderr,"filesize=%ld (0x%lx)\n", engine->file.size, engine->file.size);
fprintf( stderr,"hostname=%s\n", engine->comm.hostname);
fprintf( stderr,"blocksize=%d\n", engine->parameters.blocksize);
fprintf( stderr,"file=%d\n", engine->file);
fprintf( stderr,"listen=%d\n", engine->comm.listen);
fprintf( stderr,"remote=%d\n", engine->comm.remote);
fprintf( stderr,"\n");
fflush(stderr);
}

View File

@ -1,85 +0,0 @@
//
// Created by haraldwolff on 08.08.22.
//
#include <blksync.h>
#include <hash.h>
int merkle_build_base(bsync_engine_t *engine, merkle_t **_merkle);
int bs_merkle_build(bsync_engine_t *engine, merkle_t **_merkle){
int blocks = (engine->filesize + engine->blocksize - 1) / engine->blocksize;
int s;
s = merkle_create(_merkle, 2, blocks);
if (s<0)
return s;
merkle_t *merkle = *_merkle;
char *block = malloc(engine->blocksize);
if (block){
long offset = 0;
long residue = engine->filesize;
engine->t_start = time(NULL);
for (int nblock=0; nblock < blocks; nblock++){
int size = engine->blocksize > residue ? residue : engine->blocksize;
if (read_block(engine, offset, block, size)<0)
return -EIO;
uint64_t hash = dhash(block, size);
merkle_set(merkle, merkle->parameters.depth, nblock, hash);
time_t now = time(NULL);
if ((nblock == (blocks-1)) || (now != engine->t_last_update)) {
bs_time_t bst;
bs_time_set( &bst, nblock ? (now - engine->t_start) * (blocks - nblock) / nblock : 0);
engine->t_last_update = now;
int percentage = 100 * (nblock+1) / blocks;
fprintf(stdout, "\rhashing file: %d of %d blocks ( %d%% ) => %02d:%02d:%02d", (nblock+1), blocks, percentage, bst.hours, bst.minutes, bst.seconds);
fflush(stdout);
}
offset += size;
residue -= size;
}
fprintf(stdout, "\n");
fflush(stdout);
int srcsize = sizeof(uint64_t) * merkle->parameters.n;
for (int d=merkle->parameters.depth; d > 0; d--){
uint64_t *src, *dst;
int srclen, dstlen;
merkle_get_row(merkle, d , &src, &srclen);
merkle_get_row(merkle, d-1, &dst, &dstlen);
for (int n=0;n<dstlen;n++)
dst[n] = dhash((char*)&src[n * merkle->parameters.n], srcsize );
}
if (engine->tool_flags & BS_VERBOSE){
uint64_t hash;
for (int nblock=0; nblock < blocks; nblock++) {
merkle_get( merkle, merkle->parameters.depth, nblock, &hash);
fprintf(stderr, "Block #%06d: 0x%016lx\n", nblock, hash);
}
merkle_get( merkle, 0, 0, &hash);
fprintf(stderr, "Top: 0x%016lx\n", hash);
}
free(block);
return 0;
}
merkle_free(merkle);
return -ENOMEM;
}

78
src/bs_msg_queue.c 100644
View File

@ -0,0 +1,78 @@
//
// Created by haraldwolff on 13.08.22.
//
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <bs_msg.h>
#include <blksync.h>
void bs_msg_queue_init(bs_msg_queue_t *queue){
memset(queue, 0, sizeof(bs_msg_queue_t));
pthread_mutex_init(&queue->mutex, NULL);
pthread_cond_init(&queue->cond, NULL);
}
int bs_msg_queue_push(bs_msg_queue_t *queue, bs_msg_buffer_t* msg){
if (!queue || !msg)
return -EINVAL;
pthread_mutex_lock( &queue->mutex );
if (!queue->first)
{
queue->first = queue->last = msg;
} else {
queue->last->next = msg;
queue->last = msg;
}
msg->next = NULL;
pthread_cond_signal(&queue->cond);
pthread_mutex_unlock( &queue->mutex );
return 0;
}
int bs_msg_queue_pop(bs_msg_queue_t *queue, bs_msg_buffer_t** msg, bs_msg_queue_flags_t flags ){
if (!queue || !msg)
return -EINVAL;
pthread_mutex_lock( &queue->mutex );
do {
if (queue->first) {
*msg = queue->first;
queue->first = (*msg)->next;
if (!queue->first)
queue->last = NULL;
break;
} else if (flags & Q_WAIT) {
do {
pthread_cond_wait(&queue->cond, &queue->mutex);
} while (!queue->first);
continue;
} else {
pthread_mutex_unlock( &queue->mutex );
return -ENAVAIL;
}
} while (1);
pthread_mutex_unlock( &queue->mutex );
return 0;
}
int bs_msg_buffer_create(bs_msg_buffer_t **msg){
*msg = malloc(sizeof(bs_msg_buffer_t));
if (!*msg)
error(-ENOMEM, "could not allocate msg buffers\n", 0);
memset( *msg, 0, sizeof(*msg) );
(*msg)->payload = malloc(MAX_MSG_PAYLOAD_LENGTH);
if (!(*msg)->payload) {
free(*msg);
error(-ENOMEM, "out of memory while allocating message buffers.\n", 0);
}
return 0;
}

30
src/bs_progress.c 100644
View File

@ -0,0 +1,30 @@
//
// Created by haraldwolff on 13.08.22.
//
#include <bs_tools.h>
#include <bs_progress.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
int bs_progress_update(bs_engine_t *engine, bs_progress_steps_t step, long progress, long distance){
time_t now = time(NULL);
if (step != engine->progress.step){
engine->progress.step = step;
engine->progress.step_started = now;
}
if (engine->progress.last_screen_refresh != now) {
engine->progress.last_screen_refresh = now;
bs_time_t r;
bs_time_set(&r, progress ? (now - engine->progress.step_started) * (distance - progress) / progress : 0);
fprintf(stderr, "\rProgress: STEP: %x Progress=%ld Distance=%ld (%d%%) needs %02d:%02d:%02d", step, progress,
distance, distance ? (int) (100 * progress / distance) : 100, r.hours, r.minutes, r.seconds);
fflush(stderr);
}
return 0;
}

View File

@ -5,7 +5,7 @@
#include <blksync.h>
#include <hash.h>
#include <time.h>
/*
int bs_recv_linear_tx(bsync_engine_t *engine);
int bs_recv_linear(bsync_engine_t *engine){
char *block = malloc(engine->blocksize);
@ -26,7 +26,7 @@ int bs_recv_linear(bsync_engine_t *engine){
}
while (-1){
if (bs_recv( engine->clientSocket, &offset, sizeof(offset))==-1){
if (bs_recv(engine->remote, &offset, sizeof(offset)) == -1){
fprintf(stderr, "could not receive offset\n");
exit(EXIT_FAILURE);
}
@ -37,7 +37,7 @@ int bs_recv_linear(bsync_engine_t *engine){
residual = engine->filesize - offset;
int32_t size = residual > engine->blocksize ? engine->blocksize : residual;
if (bs_recv( engine->clientSocket, block, size)==-1){
if (bs_recv(engine->remote, block, size) == -1){
fprintf(stderr, "could not receive block at offset 0x%lx\n", offset);
exit(EXIT_FAILURE);
}
@ -85,8 +85,8 @@ int bs_recv_linear_tx(bsync_engine_t *engine){
int percent = 100 * offset / engine->filesize;
if (
(bs_send(engine->clientSocket, &offset, sizeof(offset)) == -1) ||
(bs_send( engine->clientSocket, &blockhash, sizeof(blockhash))==-1)
(bs_send(engine->remote, &offset, sizeof(offset)) == -1) ||
(bs_send(engine->remote, &blockhash, sizeof(blockhash)) == -1)
)
{
fprintf(stderr, "could not send block hash at offset 0x%lx\n", offset);
@ -104,7 +104,7 @@ int bs_recv_linear_tx(bsync_engine_t *engine){
}
offset = -1l;
if (bs_send(engine->clientSocket, &offset, sizeof(offset)) == -1)
if (bs_send(engine->remote, &offset, sizeof(offset)) == -1)
{
fprintf(stderr, "could not send end mark\n");
exit(EXIT_FAILURE);
@ -117,4 +117,6 @@ int bs_recv_linear_tx(bsync_engine_t *engine){
}
return 0;
}
}
*/

View File

@ -6,6 +6,7 @@
#include <hash.h>
#include <time.h>
/*
int bs_recv_merkle(bsync_engine_t *engine) {
int s;
uint64_t hash;
@ -21,15 +22,15 @@ int bs_recv_merkle(bsync_engine_t *engine) {
}
merkle_get(merkle, 0, 0, &hash);
if (bs_send(engine->clientSocket, &hash, sizeof(uint64_t)) < 0)
if (bs_send(engine->remote, &hash, sizeof(uint64_t)) < 0)
return -EIO;
int d,n;
while (-1) {
if (
(bs_recv(engine->clientSocket, &d, sizeof(int))<0) ||
(bs_recv(engine->clientSocket, &n, sizeof(int))<0)
(bs_recv(engine->remote, &d, sizeof(int)) < 0) ||
(bs_recv(engine->remote, &n, sizeof(int)) < 0)
){
fprintf(stderr, "failed to read tree hash request.\n");
return -EIO;
@ -41,7 +42,7 @@ int bs_recv_merkle(bsync_engine_t *engine) {
uint64_t *row;
merkle_get_row( merkle, d+1, &row, NULL);
if (bs_send(engine->clientSocket, &row[n * merkle->parameters.n], merkle->parameters.n * sizeof(uint64_t))<0)
if (bs_send(engine->remote, &row[n * merkle->parameters.n], merkle->parameters.n * sizeof(uint64_t)) < 0)
{
fprintf(stderr, "failed to send tree hashes for d=%d n=%d\n", d, n);
return -EIO;
@ -56,7 +57,7 @@ int bs_recv_merkle(bsync_engine_t *engine) {
long offset = 0;
while (offset != -1){
if (bs_recv(engine->clientSocket, &offset, sizeof(offset)) < 0){
if (bs_recv(engine->remote, &offset, sizeof(offset)) < 0){
fprintf(stderr, "could not receive block offset\n");
return -EIO;
}
@ -67,7 +68,7 @@ int bs_recv_merkle(bsync_engine_t *engine) {
long residue = engine->filesize - offset;
int size = residue > engine->blocksize ? engine->blocksize : residue;
if (bs_recv(engine->clientSocket, block, size) < 0){
if (bs_recv(engine->remote, block, size) < 0){
fprintf(stderr, "could not receive block at 0x%016lx\n", offset);
return -EIO;
}
@ -83,3 +84,4 @@ int bs_recv_merkle(bsync_engine_t *engine) {
merkle_free(merkle);
return 0;
}
*/

View File

@ -6,6 +6,7 @@
#include <hash.h>
#include <time.h>
/*
int bs_send_linear(bsync_engine_t *engine){
char *block = malloc(engine->blocksize);
if (!block)
@ -22,7 +23,7 @@ int bs_send_linear(bsync_engine_t *engine){
}
while (-1){
if (bs_recv( engine->clientSocket, &offset, sizeof(offset))==-1){
if (bs_recv(engine->remote, &offset, sizeof(offset)) == -1){
fprintf(stderr, "could not receive offset\n");
exit(EXIT_FAILURE);
}
@ -33,7 +34,7 @@ int bs_send_linear(bsync_engine_t *engine){
int32_t size = residual > engine->blocksize ? engine->blocksize : residual;
uint64_t blockhash, remotehash;
if (bs_recv( engine->clientSocket, &remotehash, sizeof(remotehash))==-1){
if (bs_recv(engine->remote, &remotehash, sizeof(remotehash)) == -1){
fprintf(stderr, "could not receive block hash at offset 0x%lx\n", offset);
exit(EXIT_FAILURE);
}
@ -46,14 +47,12 @@ int bs_send_linear(bsync_engine_t *engine){
int percent = 100 * offset / engine->filesize;
if (remotehash != blockhash){
/*
fprintf(stdout, "offset: %d bytes @ 0x%lx (%d%%) [0x%lx] differs \n", size, offset, percent, blockhash);
fflush(stdout);
*/
if (
(bs_send( engine->clientSocket, &offset, sizeof(offset))==-1) ||
(bs_send( engine->clientSocket, block, size)==-1)
(bs_send(engine->remote, &offset, sizeof(offset)) == -1) ||
(bs_send(engine->remote, block, size) == -1)
){
fprintf(stderr,"could not send block at offset 0x%lx\n", offset);
fprintf(stderr,"errno=%d (%s)", errno, strerror(errno));
@ -74,7 +73,7 @@ int bs_send_linear(bsync_engine_t *engine){
}
offset = -1l;
if (bs_send(engine->clientSocket, &offset, sizeof(offset)) == -1)
if (bs_send(engine->remote, &offset, sizeof(offset)) == -1)
{
fprintf(stderr, "could not send end mark\n");
exit(EXIT_FAILURE);
@ -85,3 +84,4 @@ int bs_send_linear(bsync_engine_t *engine){
fprintf( stdout, "\n");
}
*/

View File

@ -6,6 +6,8 @@
#include <hash.h>
#include <time.h>
/*
int sync_merkle(bsync_engine_t *engine, merkle_t* merkle, merkle_t *remote, int d, int n);
int bs_send_merkle(bsync_engine_t *engine){
@ -25,7 +27,7 @@ int bs_send_merkle(bsync_engine_t *engine){
fprintf(stdout, "synchronize merkle trees...\n");
fflush(stdout);
if (bs_recv(engine->clientSocket, &hash, sizeof(uint64_t)) < 0)
if (bs_recv(engine->remote, &hash, sizeof(uint64_t)) < 0)
return -EIO;
merkle_set(remote_merkle, 0, 0, hash);
@ -44,8 +46,8 @@ int bs_send_merkle(bsync_engine_t *engine){
s = -1;
if (
(bs_send(engine->clientSocket, &s, sizeof(int))<0) ||
(bs_send(engine->clientSocket, &s, sizeof(int))<0)
(bs_send(engine->remote, &s, sizeof(int)) < 0) ||
(bs_send(engine->remote, &s, sizeof(int)) < 0)
){
fprintf(stderr, "failed to send end-of-sync mark\n");
return -EIO;
@ -86,8 +88,8 @@ int bs_send_merkle(bsync_engine_t *engine){
}
if (
(bs_send(engine->clientSocket, &offset, sizeof(offset)) < 0) ||
(bs_send(engine->clientSocket, block, size) < 0)
(bs_send(engine->remote, &offset, sizeof(offset)) < 0) ||
(bs_send(engine->remote, block, size) < 0)
){
fprintf(stderr, "could not send block at 0x%016lx\n", offset);
return -EIO;
@ -108,7 +110,7 @@ int bs_send_merkle(bsync_engine_t *engine){
free(block);
hash = -1;
bs_send(engine->clientSocket, &hash, sizeof(hash));
bs_send(engine->remote, &hash, sizeof(hash));
merkle_free(merkle);
merkle_free(remote_merkle);
@ -130,15 +132,15 @@ int sync_merkle(bsync_engine_t *engine, merkle_t* merkle, merkle_t *remote, int
if (mine != yours){
if (
(bs_send(engine->clientSocket, &d, sizeof(int))<0) ||
(bs_send(engine->clientSocket, &n, sizeof(int))<0)
(bs_send(engine->remote, &d, sizeof(int)) < 0) ||
(bs_send(engine->remote, &n, sizeof(int)) < 0)
){
fprintf(stderr, "failed to request tree hashes for d=%d n=%d\n", d, n);
return -EIO;
}
merkle_get_row(remote, d+1, &nextrow, NULL);
if (bs_recv(engine->clientSocket, &nextrow[n*(remote->parameters.n)], remote->parameters.n * sizeof(uint64_t))<0)
if (bs_recv(engine->remote, &nextrow[n * (remote->parameters.n)], remote->parameters.n * sizeof(uint64_t)) < 0)
{
fprintf(stderr, "failed to receive tree hashes for d=%d n=%d\n", d, n);
return -EIO;
@ -153,4 +155,6 @@ int sync_merkle(bsync_engine_t *engine, merkle_t* merkle, merkle_t *remote, int
}
}
return 0;
}
}
*/

View File

@ -2,7 +2,7 @@
// Created by haraldwolff on 08.08.22.
//
#include <blksync.h>
/*
int bs_receiver(bsync_engine_t *engine){
if (engine->remote_filesize != engine->filesize){
@ -38,3 +38,4 @@ int bs_sender(bsync_engine_t *engine){
return 0;
}
*/

190
src/bs_sync.c 100644
View File

@ -0,0 +1,190 @@
//
// Created by haraldwolff on 14.08.22.
//
#include <blksync.h>
#include <bs_engine.h>
#include <bs_sync.h>
#include <bs_comm.h>
#include <bs_transfer.h>
int bs_sync_request(bs_engine_t *engine, int level, int start, int length);
int bs_sync_thread_rx(bs_engine_t *engine);
int bs_sync_start(bs_engine_t *engine){
if (pthread_create(&engine->threads.sync, NULL, (void * (*)(void *)) bs_sync_thread, engine))
fatal("could not create sync thread.\n", 0);
return 0;
}
int bs_sync_thread(bs_engine_t *engine) {
pthread_t thread_rx;
engine->state_flags |= BSS_SYNC;
if (engine->bs_flags & BS_SENDER) { // Sender instance
pthread_create(&thread_rx, NULL, (void * (*)(void *))bs_sync_thread_rx, engine);
for (int n = 0; n < engine->merkle.local->parameters.depth; n++) {
uint64_t *local_row, *remote_row;
int local_row_length, remote_row_length;
if (
(merkle_get_row(engine->merkle.local, n, &local_row, &local_row_length) < 0) ||
(merkle_get_row(engine->merkle.remote, n, &remote_row, &remote_row_length) < 0)
)
fatal("could not get merkle tree local_row %d", n);
if (n == 0){
if (bs_sync_request(engine, 0, 0, 1))
fatal("failed to request top sync\n", 0);
pthread_mutex_lock(&engine->merkle.mutex);
engine->merkle.waiting_syncs++;
pthread_mutex_unlock(&engine->merkle.mutex);
} else {
for (int start = 0; start < local_row_length; start++) {
int length;
for (length = 0; ((start+length) < local_row_length) && (length < (512 / engine->merkle.local->parameters.n)); length++){
if (!remote_row[start + length] || (local_row[start + length] == remote_row[start + length] ))
break;
}
if (length){
if (bs_sync_request(engine, n, start, length))
fatal("could not send sync request for n=%d start=%d\n", n, start);
pthread_mutex_lock(&engine->merkle.mutex);
engine->merkle.waiting_syncs++;
pthread_mutex_unlock(&engine->merkle.mutex);
start += length;
}
}
}
bs_progress_update(engine, BS_PRG_SYNC, 1, engine->merkle.waiting_syncs);
pthread_mutex_lock(&engine->merkle.mutex);
if (engine->merkle.waiting_syncs)
pthread_cond_wait( &engine->merkle.sync, &engine->merkle.mutex);
pthread_mutex_unlock(&engine->merkle.mutex);
}
bs_sync_request(engine, -1, -1, -1);
pthread_join(thread_rx, NULL);
} else { // Receiver instance
bs_msg_buffer_t *msg;
while (-1){
if (bs_msg_queue_pop(&engine->queues.sync, &msg, Q_WAIT))
fatal("could not pop message from sync pool\n", 0);
/*
fprintf(stdout, "tree sync rq d=%d s=%d l=%d\n", msg->sync.level, msg->sync.start, msg->sync.length);
fflush(stdout);
*/
if ((msg->msg_type == (BS_MSG_MERKLE_SYNC | BS_MSG_REQUEST)) && (msg->sync.length <= (512 / engine->merkle.local->parameters.n))){
uint64_t *local_row;
int local_row_length;
if (msg->sync.level == -1){
msg->msg_type = BS_MSG_MERKLE_SYNC | BS_MSG_RESPONSE;
bs_comm_send_msg(engine, msg);
break;
}
merkle_get_row(engine->merkle.local, msg->sync.level+1, &local_row, &local_row_length);
for (int p=0;p<msg->sync.length;p++){
for (int n=0;n<engine->merkle.local->parameters.n;n++){
int row_index = ((msg->sync.start + p) * engine->merkle.local->parameters.n) + n;
int msg_index = (p * engine->merkle.local->parameters.n) + n;
msg->payload_ui64[msg_index] = local_row[row_index];
}
}
msg->msg_type = (BS_MSG_MERKLE_SYNC | BS_MSG_RESPONSE);
msg->sync.level++;
msg->sync.start = msg->sync.start * engine->merkle.local->parameters.n;
msg->sync.length = msg->sync.length * engine->merkle.local->parameters.n;
msg->payload_length = sizeof(uint64_t) * msg->sync.length;
bs_comm_send_msg(engine, msg);
} else {
bs_msg_queue_push(&engine->queues.pool, msg);
}
}
}
engine->state_flags &= BSS_SYNC;
fprintf(stdout, "finished tree sync.\n");
fflush(stdout);
bs_transfer_start(engine);
return 0;
}
int bs_sync_request(bs_engine_t *engine, int level, int start, int length){
bs_msg_buffer_t *msg = NULL;
if (!msg)
{
if (bs_msg_queue_pop( &engine->queues.pool, &msg, Q_WAIT ))
fatal("could not fetch message buffer form pool.\n", 0);
}
msg->msg_type = BS_MSG_MERKLE_SYNC | BS_MSG_REQUEST;
msg->msg_length = sizeof(bs_msg_sync_t);
msg->sync.level = level;
msg->sync.start = start;
msg->sync.length = length;
msg->payload_length = 0;
/*
fprintf(stdout, "tree sync rq d=%d s=%d l=%d\n", msg->sync.level, msg->sync.start, msg->sync.length);
fflush(stdout);
*/
if (bs_comm_send_msg(engine, msg))
fatal("could not send message\n", 0);
return 0;
}
int bs_sync_thread_rx(bs_engine_t *engine){
bs_msg_buffer_t *msg;
while (engine->state_flags & BSS_SYNC){
if (!bs_msg_queue_pop(&engine->queues.sync, &msg, Q_WAIT)){
if (msg->msg_type == (BS_MSG_MERKLE_SYNC|BS_MSG_RESPONSE)){
uint64_t *remote_row;
int remote_row_length;
/*
fprintf(stdout, "tree rx d=%d s=%d l=%d\n", msg->sync.level, msg->sync.start, msg->sync.length);
fflush(stdout);
*/
if (msg->sync.level == -1)
break;
merkle_get_row(engine->merkle.remote, msg->sync.level, &remote_row, &remote_row_length);
if (msg->sync.start + msg->sync.length <= remote_row_length) {
for (int p = 0; p < msg->sync.length; p++)
remote_row[msg->sync.start + p] = msg->payload_ui64[p];
}
pthread_mutex_lock(&engine->merkle.mutex);
engine->merkle.waiting_syncs--;
if (!engine->merkle.waiting_syncs)
pthread_cond_signal(&engine->merkle.sync);
pthread_mutex_unlock(&engine->merkle.mutex);
}
bs_msg_queue_push(&engine->queues.pool, msg);
}
}
return 0;
}

View File

@ -5,7 +5,7 @@
#include <blksync.h>
#include <hash.h>
/*
int bs_test(bsync_engine_t *engine){
if (!(engine->blocksize))
engine->blocksize = defaultBlocksize;
@ -44,4 +44,7 @@ int bs_test(bsync_engine_t *engine){
}
return 0;
}
}
*/

View File

@ -8,6 +8,8 @@
#include <sys/types.h>
#include <unistd.h>
#include <bs_types.h>
char* units[] = { "B", "kB", "MB", "GB", "TB" };
@ -31,7 +33,7 @@ void bs_time_set(bs_time_t* dst, int seconds){
dst->minutes = (seconds % 3600) / 60;
dst->seconds = seconds % 60;
}
/*
int read_block(bsync_engine_t *engine, long offset, char *block, int blocksize){
while (pthread_mutex_lock(&engine->m_file));
@ -149,3 +151,4 @@ int dump_engine_state(bsync_engine_t *engine){
engine->t_last_update = now;
}
*/

88
src/bs_transfer.c 100644
View File

@ -0,0 +1,88 @@
//
// Created by haraldwolff on 15.08.22.
//
#include <bs_transfer.h>
#include <bs_comm.h>
#include <blksync.h>
int bs_transfer_start(bs_engine_t *engine){
if (pthread_create(&engine->threads.transfer, NULL, (void * (*)(void *)) bs_transfer_thread, engine))
fatal("could not create transfer thread.\n", 0);
return 0;
}
int bs_transfer_thread(bs_engine_t *engine){
bs_msg_buffer_t *msg;
int count_blocks = (int)(engine->file.size / engine->parameters.blocksize);
engine->state_flags |= BSS_TRANSFER;
if (bs_engine_mmap_file(engine))
fatal("could not mmap file %s\n", engine->file.filename);
if (engine->bs_flags & BS_SENDER){
uint64_t *local_hashes, *remote_hashes;
int local_hashes_length, remote_hashes_length;
merkle_get_row(engine->merkle.local, engine->merkle.local->parameters.depth, &local_hashes, &local_hashes_length);
merkle_get_row(engine->merkle.remote, engine->merkle.remote->parameters.depth, &remote_hashes, &remote_hashes_length);
fprintf(stdout, "transfering blocks (%d / %d / %d)\n", count_blocks, local_hashes_length, remote_hashes_length);
fflush(stdout);
for (int n=0;n<count_blocks;n++) {
uint64_t local_hash = local_hashes[n];
uint64_t remote_hash = remote_hashes[n];
if (remote_hash && (remote_hash != local_hash)){
if (bs_msg_queue_pop(&engine->queues.pool, &msg, Q_WAIT))
fatal("could not pop message from pool\n", 0);
long offset = n * engine->parameters.blocksize;
int size = engine->parameters.blocksize;
if ((offset + size) > engine->file.size)
size = engine->file.size - offset;
msg->msg_type = BS_MSG_BLOCKDATA | BS_MSG_RESPONSE;
msg->msg_length = sizeof(bs_msg_transfer_t);
msg->transfer.blockno = n;
msg->payload_length = size;
memcpy(msg->payload, &engine->file.mmap[offset], size);
bs_comm_send_msg(engine, msg);
bs_progress_update(engine, BSS_TRANSFER, n, count_blocks);
}
}
if (bs_msg_queue_pop(&engine->queues.pool, &msg, Q_WAIT))
fatal("could not pop message from pool\n", 0);
msg->msg_type = BS_MSG_GOODBYE;
msg->msg_length = 0;
msg->payload_length = 0;
bs_comm_send_msg(engine, msg);
} else {
while (!(engine->tool_flags & BS_SHUTDOWN)){
if (bs_msg_queue_pop(&engine->queues.transfer, &msg, Q_WAIT))
fatal("could not receive transfer messge\n", 0);
if (msg->msg_type == (BS_MSG_BLOCKDATA | BS_MSG_RESPONSE)){
long offset = msg->transfer.blockno * engine->parameters.blocksize;
memcpy(&engine->file.mmap[offset], msg->payload, msg->payload_length);
}
bs_msg_queue_push(&engine->queues.pool, msg);
bs_progress_update(engine, BSS_TRANSFER, msg->transfer.blockno, count_blocks);
}
}
if (bs_engine_unmap_file(engine))
fatal("could not unmap file %s\n", engine->file.filename);
engine->state_flags &= ~BSS_TRANSFER;
engine->tool_flags |= BS_SHUTDOWN;
}

View File

@ -142,7 +142,7 @@ int merkle_dump(merkle_t *merkle){
for (int d=0; d <= merkle->parameters.depth; d++)
{
merkle_get_row( merkle, d, &row, &rowlength);
for (int p=0;p<rowlength;p++)
for (int p=0;( p < rowlength) && (p < 8);p++)
{
fprintf(stderr, "0x%016lx ", row[p]);
}

70
src/test_mmap.c 100644
View File

@ -0,0 +1,70 @@
//
// Created by haraldwolff on 15.08.22.
//
#define _GNU_SOURCE
#include <stdlib.h>
#include <stdio.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <sys/ioctl.h>
int main(int argc, char *argv[]){
long size;
argv++;
struct stat stat;
while (*argv){
fprintf(stdout, "Testing to map %s\n", *argv);
fflush(stdout);
int f = open(*argv, O_RDWR | O_DIRECT);
if (f<0)
{
fprintf(stderr, "could not open %s. %s\n", *argv, strerror(errno));
fflush(stderr);
} else {
if (fstat(f, &stat)){
fprintf(stderr, "could not stat %s. %s\n", *argv, strerror(errno));
fflush(stderr);
} else {
if ((stat.st_mode & S_IFMT)==S_IFBLK){
fprintf(stdout,"is a block device.\n");
long offset = lseek(f, 0, SEEK_END);
fprintf(stdout, "device size seems to be: %ld\n\n", offset);
size = offset;
} else {
fprintf(stdout,"size: %d bytes.\n\n", stat.st_size);
size = stat.st_size;
}
fflush(stdout);
void *mapping = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, f, 0);
if (mapping == MAP_FAILED){
fprintf(stderr, "could not mmap %s. %s\n", *argv, strerror(errno));
fflush(stderr);
} else {
fprintf(stdout, "successfully mapped %s to memory.\n\n\n", *argv);
munmap(mapping, 8192);
}
}
close(f);
}
argv++;
}
return 0;
}