// // Created by haraldwolff on 08.08.22. // #include #include #include #include #include struct analyzer_thread_parm { bs_engine_t *engine; pthread_spinlock_t spinlock; int blocks; long next_offset; }; int bs_analyzer_thread(struct analyzer_thread_parm *atp); int bs_analyze_start(bs_engine_t *engine){ if (pthread_create(&engine->threads.analyze, NULL, (void * (*)(void *))bs_analyze_thread, engine)) fatal("could not create analyzer thread.\n", 0); return 0; } int bs_analyze_thread(bs_engine_t *engine){ pthread_t analyzers[4]; engine->state_flags |= BSS_ANALYZE; if (bs_engine_mmap_file(engine)) fatal("could not map file %s", engine->file.filename); if (!engine->parameters.blocksize) engine->parameters.blocksize = 4096; struct analyzer_thread_parm atp; atp.engine = engine; pthread_spin_init(&atp.spinlock, PTHREAD_PROCESS_PRIVATE ); atp.next_offset = 0; atp.blocks = (int)(((long)engine->file.size + (long)engine->parameters.blocksize - 1) / engine->parameters.blocksize); int s = merkle_create(&engine->merkle.local, 2, atp.blocks); if (s<0) fatal("failed to allocate merkle tree (local)\n", 0); s = merkle_create(&engine->merkle.remote, 2, atp.blocks); if (s<0) fatal("failed to allocate merkle tree (remote)\n", 0); for (int n=0;n<4;n++) pthread_create(&analyzers[n], NULL, (void * (*)(void *))bs_analyzer_thread, &atp); while ((atp.next_offset < engine->file.size) && (engine->state_flags & BSS_ANALYZE)){ bs_progress_update(engine, BS_PRG_ANALYZE, atp.next_offset, engine->file.size); sleep(1); } for (int depth=engine->merkle.local->parameters.depth; depth > 0; depth--){ uint64_t *bottom_row, *upper_row; int bottom_row_length, upper_row_length; merkle_get_row(engine->merkle.local, depth, &bottom_row, &bottom_row_length); merkle_get_row(engine->merkle.local, depth-1, &upper_row, &upper_row_length); for (int p=0; p < upper_row_length; p++) upper_row[p] = dhash(&bottom_row[p * engine->merkle.local->parameters.n], sizeof(uint64_t) * engine->merkle.local->parameters.n); } bs_progress_update(engine, BS_PRG_ANALYZE, atp.next_offset, engine->file.size); engine->state_flags &= ~BSS_ANALYZE; for (int n=0;n<4;n++) pthread_join(analyzers[n], NULL); bs_engine_unmap_file(engine); //merkle_dump(engine->merkle.local); bs_sync_start(engine); return 0; } int bs_analyzer_thread(struct analyzer_thread_parm *atp){ long offset; while ((atp->next_offset < atp->engine->file.size) && (atp->engine->state_flags & BSS_ANALYZE)) { pthread_spin_lock(&atp->spinlock); offset = atp->next_offset; if (offset < atp->engine->file.size) { atp->next_offset += atp->engine->parameters.blocksize; } pthread_spin_unlock(&atp->spinlock); if (offset >= atp->engine->file.size) break; long size = atp->engine->file.size - offset; if (size > atp->engine->parameters.blocksize) size = atp->engine->parameters.blocksize; uint64_t hash = dhash(&atp->engine->file.mmap[offset], size); merkle_set(atp->engine->merkle.local, atp->engine->merkle.local->parameters.depth, (int)(offset / atp->engine->parameters.blocksize), hash); } return 0; } /* int bs_merkle_build(bsync_engine_t *engine, merkle_t **_merkle){ int srcsize = sizeof(uint64_t) * merkle->parameters.n; for (int d=merkle->parameters.depth; d > 0; d--){ uint64_t *src, *dst; int srclen, dstlen; merkle_get_row(merkle, d , &src, &srclen); merkle_get_row(merkle, d-1, &dst, &dstlen); for (int n=0;nparameters.n], srcsize ); } if (engine->tool_flags & BS_VERBOSE){ uint64_t hash; for (int nblock=0; nblock < blocks; nblock++) { merkle_get( merkle, merkle->parameters.depth, nblock, &hash); fprintf(stderr, "Block #%06d: 0x%016lx\n", nblock, hash); } merkle_get( merkle, 0, 0, &hash); fprintf(stderr, "Top: 0x%016lx\n", hash); } free(block); return 0; } merkle_free(merkle); return -ENOMEM; } */