// sparse-tools/src/bs_engine.c

//
// Created by haraldwolff on 13.08.22.
//
#define _GNU_SOURCE
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include "blksync.h"
#include "bs_engine.h"
// Allocates a bs_engine_t on the heap, initializes it with bs_engine_init()
// and marks it with BS_ALLOC in tool_flags.
//
// engine: out parameter; receives the malloc() result (NULL on failure).
// Returns -ENOMEM if the allocation fails, otherwise the status returned by
// bs_engine_init().
int bs_engine_create(bs_engine_t **engine){
    bs_engine_t *created = malloc(sizeof(bs_engine_t));

    *engine = created;
    if (created == NULL)
        return -ENOMEM;

    int status = bs_engine_init(created);

    // The flag has to be set after initialization, because
    // bs_engine_init() zeroes the complete structure.
    created->tool_flags |= BS_ALLOC;
    return status;
}
// Brings an engine structure into its initial state.
//
// The whole structure is zeroed first; afterwards the mutexes, the merkle
// condition variable and the four message queues are initialized, and the
// pool queue is filled with MAX_MSG_POOL freshly created message buffers.
//
// Non-zero initial values set here:
//   comm.af_family = AF_UNSPEC
//   comm.port      = 13487
//   file.mmap      = MAP_FAILED (no mapping yet)
// file.handle is left at 0 by the memset.
//
// engine: structure to initialize; any previous content is overwritten.
// Returns 0. A failing bs_msg_buffer_create() is reported through fatal().
int bs_engine_init(bs_engine_t *engine){
    memset(engine, 0, sizeof(bs_engine_t));

    pthread_mutex_init(&engine->comm.mutex, NULL);
    pthread_mutex_init(&engine->file.mutex, NULL);
    pthread_mutex_init(&engine->merkle.mutex, NULL);
    pthread_cond_init(&engine->merkle.sync, NULL);

    bs_msg_queue_init(&engine->queues.pool);
    bs_msg_queue_init(&engine->queues.progress);
    bs_msg_queue_init(&engine->queues.sync);
    bs_msg_queue_init(&engine->queues.transfer);

    engine->comm.af_family = AF_UNSPEC;
    engine->comm.port = 13487;
    engine->file.mmap = MAP_FAILED;

    // Pre-populate the pool queue with message buffers.
    for (int n = 0; n < MAX_MSG_POOL; n++){
        bs_msg_buffer_t *msg;
        int s = bs_msg_buffer_create(&msg);
        if (s)
            // Say what failed instead of emitting an empty message.
            fatal("bs_engine_init(): could not create message buffer %d (status %d)\n", n, s);
        bs_msg_queue_push(&engine->queues.pool, msg);
    }
    return 0;
}
// Shuts an engine down.
//
// Sets BS_SHUTDOWN in tool_flags, then joins the transfer, sync, analyze and
// comm threads (in that order). If the engine carries BS_ALLOC, which
// bs_engine_create() sets, the structure itself is freed.
//
// engine: engine to shut down; NULL is accepted and ignored.
// Returns 0 in every case.
//
// NOTE(review): presumably the threads watch BS_SHUTDOWN and terminate on
// their own - confirm. Mutexes, the condition variable and the queues set up
// by bs_engine_init() are not torn down here.
int bs_engine_destroy(bs_engine_t *engine){
    if (engine == NULL)
        return 0;

    engine->tool_flags |= BS_SHUTDOWN;

    pthread_join(engine->threads.transfer, NULL);
    pthread_join(engine->threads.sync, NULL);
    pthread_join(engine->threads.analyze, NULL);
    pthread_join(engine->threads.comm, NULL);

    if ((engine->tool_flags & BS_ALLOC) != 0)
        free(engine);

    return 0;
}
// Opens engine->file.filename with O_DIRECT and records its size.
//
// With BS_SENDER set in bs_flags the file is opened read-only; otherwise it
// is opened read/write and created if missing (mode rwx for user, group and
// others, subject to the umask).
//
// On success file.handle holds the descriptor, file.size the size in bytes,
// and the file offset is back at 0.
//
// Returns 0 on success or a negative errno value if open() or lseek() fails
// (the failure is also reported through error()).
int bs_engine_open_file(bs_engine_t *engine){
    int open_flags = O_DIRECT;

    if (engine->bs_flags & BS_SENDER)
        open_flags |= O_RDONLY;
    else
        open_flags |= O_RDWR | O_CREAT;

    engine->file.handle = open(engine->file.filename, open_flags, S_IRWXU | S_IRWXG | S_IRWXO);
    if (engine->file.handle < 0){
        int err = errno;    // keep errno, error() may overwrite it
        error(err, "could not open %s\n", engine->file.filename);
        return -err;
    }

    // fstat() does not work on block devices, so the size is taken from the
    // end-of-file offset instead of st_size.
    off_t size = lseek(engine->file.handle, 0, SEEK_END);
    if (size < 0 || lseek(engine->file.handle, 0, SEEK_SET) < 0){
        int err = errno;
        error(err, "could not determine size of %s\n", engine->file.filename);
        return -err;
    }

    engine->file.size = size;
    return 0;
}
// Sets the length of the engine's open file to `length` bytes.
//
// engine: engine whose file.handle refers to the opened file.
// length: new file length in bytes.
//
// Returns 0 on success and updates file.size. If ftruncate() fails, the
// error is reported through error(), file.size is left unchanged and a
// negative errno value is returned. Calling this without an opened file is
// reported through fatal().
int bs_engine_truncate_file(bs_engine_t *engine, long length){
    // bs_engine_init() leaves the handle at 0 and bs_engine_mmap_file()
    // treats "<= 0" as "not opened"; use the same test here.
    if (engine->file.handle <= 0)
        fatal("bs_engine_truncate_file(): file not opened\n", 0);

    if (ftruncate(engine->file.handle, length)){
        // ftruncate() only returns -1; the actual reason is in errno.
        int err = errno;
        error(err, "could not truncate file %s ( %s )\n", engine->file.filename, strerror(err));
        return -err;
    }

    engine->file.size = length;
    return 0;
}
// Maps the engine's file into memory as a shared mapping of file.size bytes.
//
// If file.handle is <= 0 the file is opened first via bs_engine_open_file().
// An existing mapping (file.mmap neither NULL nor MAP_FAILED) is kept as is.
// The mapping is read-only when BS_SENDER is set in bs_flags, otherwise
// readable and writable.
//
// Returns 0. A failing mmap() is reported through fatal().
int bs_engine_mmap_file(bs_engine_t *engine){
    if (engine->file.handle <= 0)
        bs_engine_open_file(engine);

    int already_mapped = (engine->file.mmap != NULL) && (engine->file.mmap != MAP_FAILED);
    if (already_mapped)
        return 0;

    int protection = PROT_READ;
    if (!(engine->bs_flags & BS_SENDER))
        protection |= PROT_WRITE;

    engine->file.mmap = mmap(NULL, engine->file.size, protection, MAP_SHARED, engine->file.handle, 0);
    if (engine->file.mmap == MAP_FAILED)
        fatal("could not map file contents to memory (%s) (f=%d, size=%ld)\n", strerror(errno), engine->file.handle, engine->file.size);

    return 0;
}
// Removes the memory mapping of the engine's file, if there is one.
//
// file.mmap being NULL or MAP_FAILED means "nothing mapped"; in that case
// the function does nothing. Otherwise the mapping of file.size bytes is
// released and file.mmap is reset to NULL. A failing munmap() is reported
// through error(); file.mmap is reset regardless.
//
// Returns 0 in every case.
int bs_engine_unmap_file(bs_engine_t *engine){
    // Only a real mapping may be handed to munmap(); the same test is used
    // by bs_engine_mmap_file() to detect an existing mapping.
    if (engine->file.mmap == NULL || engine->file.mmap == MAP_FAILED)
        return 0;

    if (munmap(engine->file.mmap, engine->file.size))
        error(errno, "could not unmap file %s\n", engine->file.filename);
    engine->file.mmap = NULL;

    return 0;
}
// Writes a human-readable summary of the engine state to stderr and flushes
// the stream: flags, file name and size, host name, block size, and the
// file.handle, comm.listen and comm.remote values.
//
// engine: engine to dump; must not be NULL.
void bs_engine_dump(bs_engine_t *engine){
    fprintf(stderr, "blksync engine dump:\n");
    fprintf(stderr, "tool_flags=%x bs_flags=%x\n", engine->tool_flags, engine->bs_flags);
    fprintf(stderr, "filename=%s\n", engine->file.filename);
    fprintf(stderr, "filesize=%ld (0x%lx)\n", engine->file.size, engine->file.size);
    fprintf(stderr, "hostname=%s\n", engine->comm.hostname);
    fprintf(stderr, "blocksize=%d\n", engine->parameters.blocksize);
    // Print the descriptor; passing the whole file struct to %d is undefined.
    fprintf(stderr, "file=%d\n", engine->file.handle);
    fprintf(stderr, "listen=%d\n", engine->comm.listen);
    fprintf(stderr, "remote=%d\n", engine->comm.remote);
    fprintf(stderr, "\n");
    fflush(stderr);
}