sparse-tools/src/bs_analyze.c

144 lines
3.9 KiB
C

//
// Created by haraldwolff on 08.08.22.
//
#include <blksync.h>
#include <hash.h>
#include <bs_engine.h>
#include <bs_analyze.h>
#include <bs_sync.h>
// Shared work descriptor handed to every analyzer worker thread.
// One instance is set up in bs_analyze_thread() and passed by pointer to
// each worker, so all workers operate on the same fields.
struct analyzer_thread_parm {
// Engine whose mapped file is hashed and whose local merkle tree receives the block hashes.
bs_engine_t *engine;
// Taken by a worker while it reads and advances next_offset.
pthread_spinlock_t spinlock;
// Number of blocksize-sized blocks needed to cover the file (size rounded up).
int blocks;
// Byte offset of the next block that has not yet been claimed by a worker.
long next_offset;
};
// Worker routine: repeatedly claims the next block offset, hashes that block
// and stores the hash in the engine's local merkle tree. Returns 0.
int bs_analyzer_thread(struct analyzer_thread_parm *atp);
int bs_analyze_start(bs_engine_t *engine){
if (pthread_create(&engine->threads.analyze, NULL, (void * (*)(void *))bs_analyze_thread, engine))
fatal("could not create analyzer thread.\n", 0);
return 0;
}
int bs_analyze_thread(bs_engine_t *engine){
pthread_t analyzers[4];
engine->state_flags |= BSS_ANALYZE;
if (bs_engine_mmap_file(engine))
fatal("could not map file %s", engine->file.filename);
if (!engine->parameters.blocksize)
engine->parameters.blocksize = 4096;
struct analyzer_thread_parm atp;
atp.engine = engine;
pthread_spin_init(&atp.spinlock, PTHREAD_PROCESS_PRIVATE );
atp.next_offset = 0;
atp.blocks = (int)(((long)engine->file.size + (long)engine->parameters.blocksize - 1) / engine->parameters.blocksize);
int s = merkle_create(&engine->merkle.local, 2, atp.blocks);
if (s<0)
fatal("failed to allocate merkle tree (local)\n", 0);
s = merkle_create(&engine->merkle.remote, 2, atp.blocks);
if (s<0)
fatal("failed to allocate merkle tree (remote)\n", 0);
for (int n=0;n<4;n++)
pthread_create(&analyzers[n], NULL, (void * (*)(void *))bs_analyzer_thread, &atp);
while ((atp.next_offset < engine->file.size) && (engine->state_flags & BSS_ANALYZE)){
bs_progress_update(engine, BS_PRG_ANALYZE, atp.next_offset, engine->file.size);
sleep(1);
}
for (int depth=engine->merkle.local->parameters.depth; depth > 0; depth--){
uint64_t *bottom_row, *upper_row;
int bottom_row_length, upper_row_length;
merkle_get_row(engine->merkle.local, depth, &bottom_row, &bottom_row_length);
merkle_get_row(engine->merkle.local, depth-1, &upper_row, &upper_row_length);
for (int p=0; p < upper_row_length; p++)
upper_row[p] = dhash(&bottom_row[p * engine->merkle.local->parameters.n], sizeof(uint64_t) * engine->merkle.local->parameters.n);
}
bs_progress_update(engine, BS_PRG_ANALYZE, atp.next_offset, engine->file.size);
engine->state_flags &= ~BSS_ANALYZE;
for (int n=0;n<4;n++)
pthread_join(analyzers[n], NULL);
bs_engine_unmap_file(engine);
//merkle_dump(engine->merkle.local);
bs_sync_start(engine);
return 0;
}
int bs_analyzer_thread(struct analyzer_thread_parm *atp){
long offset;
while ((atp->next_offset < atp->engine->file.size) && (atp->engine->state_flags & BSS_ANALYZE)) {
pthread_spin_lock(&atp->spinlock);
offset = atp->next_offset;
if (offset < atp->engine->file.size) {
atp->next_offset += atp->engine->parameters.blocksize;
}
pthread_spin_unlock(&atp->spinlock);
if (offset >= atp->engine->file.size)
break;
long size = atp->engine->file.size - offset;
if (size > atp->engine->parameters.blocksize)
size = atp->engine->parameters.blocksize;
uint64_t hash = dhash(&atp->engine->file.mmap[offset], size);
merkle_set(atp->engine->merkle.local, atp->engine->merkle.local->parameters.depth, (int)(offset / atp->engine->parameters.blocksize), hash);
}
return 0;
}
/*
int bs_merkle_build(bsync_engine_t *engine, merkle_t **_merkle){
int srcsize = sizeof(uint64_t) * merkle->parameters.n;
for (int d=merkle->parameters.depth; d > 0; d--){
uint64_t *src, *dst;
int srclen, dstlen;
merkle_get_row(merkle, d , &src, &srclen);
merkle_get_row(merkle, d-1, &dst, &dstlen);
for (int n=0;n<dstlen;n++)
dst[n] = dhash((char*)&src[n * merkle->parameters.n], srcsize );
}
if (engine->tool_flags & BS_VERBOSE){
uint64_t hash;
for (int nblock=0; nblock < blocks; nblock++) {
merkle_get( merkle, merkle->parameters.depth, nblock, &hash);
fprintf(stderr, "Block #%06d: 0x%016lx\n", nblock, hash);
}
merkle_get( merkle, 0, 0, &hash);
fprintf(stderr, "Top: 0x%016lx\n", hash);
}
free(block);
return 0;
}
merkle_free(merkle);
return -ENOMEM;
}
*/