sparse-tools/src/bs_comm.c

202 lines
5.7 KiB
C

//
// Created by haraldwolff on 13.08.22.
//
#include <string.h>
#include <netdb.h>
#include <blksync.h>
#include <bs_engine.h>
#include <bs_comm.h>
#include <bs_analyze.h>
int bs_comm_start(bs_engine_t *engine){
if (pthread_create(&engine->threads.comm, NULL, (void * (*)(void *))bs_comm_thread, engine))
fatal("could not create comm thread.\n", 0);
return 0;
}
// Receive exactly `length` bytes from `socket` into `buffer`.
//
// recv() on a stream socket may deliver fewer bytes than requested, so the
// call is repeated until the buffer is full.
//
// Returns:
//   length      all requested bytes were received
//   0..length-1 the peer closed the connection before the buffer was full
//               (the value is the number of bytes actually received)
//   negative    recv() failed; errno is left as set by recv()
//
// A length <= 0 receives nothing and returns 0.
int bs_comm_recv(int socket, void *buffer, int length){
    char *cursor = buffer;
    int remaining = length;

    while (remaining > 0){
        ssize_t got = recv(socket, cursor, (size_t)remaining, 0);
        if (got < 0) {
            // A signal interrupted the call before any data arrived: retry.
            if (errno == EINTR)
                continue;
            return (int)got;
        }
        // recv() returns 0 when the peer has shut down the connection.
        // Without this check the loop would never make progress again.
        if (got == 0)
            break;
        cursor += got;
        remaining -= (int)got;
    }
    if (remaining < 0)
        remaining = 0;
    return length > 0 ? length - remaining : 0;
}
// Body of the communication thread.
//
// Establishes the connection (listening if BS_LISTENER is set in
// engine->tool_flags, connecting otherwise), sets BSS_COMM in
// engine->state_flags, performs the handshake, starts the analyzer and then
// receives messages from engine->comm.remote until BS_SHUTDOWN is set in
// engine->tool_flags.
//
// Each message is read as: msg_type, msg_length, payload_length, then
// msg_length bytes of message and payload_length bytes of payload. Received
// messages are pushed to the queue matching their type; a buffer that is not
// queued is reused for the next receive.
//
// Any I/O error or short read ends in fatal(). Returns 0 after clearing
// BSS_COMM.
int bs_comm_thread(bs_engine_t *engine){
    int s;
    // Must start as NULL: the loop below only fetches a buffer from the pool
    // when it does not already hold one.
    bs_msg_buffer_t *msg = NULL;

    if (engine->tool_flags & BS_LISTENER)
        bs_comm_listen(engine);
    else
        bs_comm_connect(engine);
    engine->state_flags |= BSS_COMM;
    if (bs_comm_handshake(engine))
        fatal("handshake failed.\n", 0);
    bs_analyze_start(engine);

    while (!(engine->tool_flags & BS_SHUTDOWN)) {
        if (!msg) {
            s = bs_msg_queue_pop(&engine->queues.pool, &msg, Q_WAIT);
            if (s) fatal("could not pop message buffer from pool\n", 0);
        }

        // Fixed-size header: type, message length, payload length.
        s = bs_comm_recv(engine->comm.remote, &msg->msg_type, sizeof(msg->msg_type));
        if (s != sizeof(msg->msg_type)) fatal("I/O error while receiving message (A) %s\n", strerror(errno));
        s = bs_comm_recv(engine->comm.remote, &msg->msg_length, sizeof(msg->msg_length));
        if (s != sizeof(msg->msg_length)) fatal("I/O error while receiving message (B) %s\n", strerror(errno));
        s = bs_comm_recv(engine->comm.remote, &msg->payload_length, sizeof(msg->payload_length));
        if (s != sizeof(msg->payload_length)) fatal("I/O error while receiving message (C) %s\n", strerror(errno));

        // NOTE(review): msg_length and payload_length come from the peer and
        // are not checked against the capacity of msg->msg / msg->payload
        // here - confirm that the buffers are large enough for any value.
        if (msg->msg_length) {
            s = bs_comm_recv(engine->comm.remote, msg->msg, msg->msg_length);
            if (s != msg->msg_length) fatal("I/O error while receiving message (D) %s\n", strerror(errno));
        }
        if (msg->payload_length) {
            s = bs_comm_recv(engine->comm.remote, msg->payload, msg->payload_length);
            if (s != msg->payload_length) fatal("I/O error while receiving message (E) %s\n", strerror(errno));
        }

        // Pick the destination queue from the message type bits.
        bs_msg_queue_t *queue = NULL;
        if (msg->msg_type & BS_MSG_PROGRESS)
            queue = &engine->queues.progress;
        else if (msg->msg_type & BS_MSG_MERKLE_SYNC)
            queue = &engine->queues.sync;
        else if (msg->msg_type & BS_MSG_BLOCKDATA)
            queue = &engine->queues.transfer;
        else if (msg->msg_type & BS_MSG_GOODBYE) {
            // Peer is leaving: request shutdown, which also ends this loop.
            engine->tool_flags |= BS_SHUTDOWN;
        } else if (msg->msg_type == 0){
            // Keep-Alive
        } else {
            fprintf(stderr, "received unkown message type 0x%x\n", msg->msg_type);
            fflush(stderr);
        }

        if (queue) {
            // The queue now holds the buffer; fetch a fresh one next round.
            bs_msg_queue_push(queue, msg);
            msg = NULL;
        }
    }
    engine->state_flags &= ~BSS_COMM;
    return 0;
}
// Connect to engine->comm.hostname on engine->comm.port.
//
// The host is resolved with getaddrinfo() restricted to the address family in
// engine->comm.af_family and to stream sockets. Every returned address is
// tried in order until one connect() succeeds; the connected socket is left
// in engine->comm.remote.
//
// Calls fatal() if resolution fails or no address could be connected.
// Returns 0.
int bs_comm_connect(bs_engine_t *engine){
    char service[12];
    struct addrinfo hints;
    struct addrinfo *candidates;
    struct addrinfo *cur;
    int rc;

    // getaddrinfo() takes the port as a string.
    sprintf(service, "%d", engine->comm.port);

    memset(&hints, 0, sizeof(hints));
    hints.ai_family = engine->comm.af_family;
    hints.ai_socktype = SOCK_STREAM;

    rc = getaddrinfo(engine->comm.hostname, service, &hints, &candidates);
    if (rc != 0) {
        fatal("could not get addrinfo for host %s (%s)\n", engine->comm.hostname, gai_strerror(rc));
    }

    cur = candidates;
    while (cur != NULL) {
        engine->comm.remote = socket(cur->ai_family, cur->ai_socktype, cur->ai_protocol );
        if (engine->comm.remote >= 0) {
            if (connect(engine->comm.remote, cur->ai_addr, cur->ai_addrlen) != -1) {
                break;
            }
            // This address did not work: drop the socket and try the next.
            close(engine->comm.remote);
        }
        cur = cur->ai_next;
    }

    // Running off the end of the list means no address was reachable.
    if (cur == NULL) {
        fatal("could not connect to host %s : %d\n", engine->comm.hostname, engine->comm.port);
    }

    freeaddrinfo(candidates);
    return 0;
}
// Bind to engine->comm.port, wait for one peer and accept it.
//
// The local address is resolved with getaddrinfo() using AI_PASSIVE; an empty
// engine->comm.hostname is passed as NULL. The first address that can be
// bound is used. The listening socket is stored in engine->comm.listen and
// the accepted connection in engine->comm.remote.
//
// Calls fatal() if resolution, bind, listen or accept fails. Returns 0.
int bs_comm_listen(bs_engine_t *engine){
    struct addrinfo hints;
    struct addrinfo *result, *ai;
    int one = 1;
    char sport[12];

    // getaddrinfo() takes the port as a string.
    sprintf(sport, "%d", engine->comm.port);
    memset( &hints, 0, sizeof(hints));
    hints.ai_family = engine->comm.af_family;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE;

    int s = getaddrinfo(engine->comm.hostname[0] ? engine->comm.hostname : NULL, sport, &hints, &result);
    if (s != 0)
        fatal("could not get addrinfo for host %s (%s)\n", engine->comm.hostname, gai_strerror(s));

    for (ai = result; ai != NULL; ai = ai->ai_next) {
        engine->comm.listen = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol );
        if (engine->comm.listen < 0)
            continue;
        // Best effort: a failure to set SO_REUSEADDR is not treated as an error.
        setsockopt(engine->comm.listen, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
        if (bind(engine->comm.listen, ai->ai_addr, ai->ai_addrlen) != -1)
            break;
        close(engine->comm.listen);
    }
    // Running off the end of the list means no address could be bound.
    if (ai == NULL)
        fatal("could not bind to %s:%d\n", engine->comm.hostname, engine->comm.port);
    freeaddrinfo(result);

    // Report a listen() failure here rather than as a confusing accept() error.
    if (listen(engine->comm.listen, 1) < 0)
        fatal("could not listen on %s:%d\n", engine->comm.hostname, engine->comm.port);

    // The peer's address is not needed, so no address buffer is passed.
    engine->comm.remote = accept(engine->comm.listen, NULL, NULL);
    if (engine->comm.remote < 0)
        fatal("no one connected\n", 0);
    return 0;
}
// Send the contents of a message buffer to the peer, then hand the buffer
// back to the engine's pool queue.
//
// The buffer is pushed to engine->queues.pool regardless of the send result,
// so the caller must not use `msg` afterwards.
//
// Returns the result of bs_comm_send_payload().
int bs_comm_send_msg(bs_engine_t *engine, bs_msg_buffer_t *msg){
    int status;

    status = bs_comm_send_payload(engine,
                                  msg->msg_type,
                                  msg->msg, msg->msg_length,
                                  msg->payload, msg->payload_length);
    bs_msg_queue_push(&engine->queues.pool, msg);
    return status;
}
// Send a message that carries no payload.
//
// Forwards to bs_comm_send_payload() with a NULL payload of length 0 and
// returns its result.
int bs_comm_send(bs_engine_t *engine, bs_msg_t msg_type, char *msg, int length){
    char *no_payload = NULL;
    int status = bs_comm_send_payload(engine, msg_type, msg, length, no_payload, 0);

    return status;
}
// Write exactly `count` bytes from `data` to socket `fd`.
// send() may accept fewer bytes than requested, so it is repeated until
// everything has been handed over. Returns 0 on success, -1 on error.
static int bs_comm_send_all(int fd, const void *data, size_t count){
    const char *cursor = data;

    while (count > 0) {
        ssize_t sent = send(fd, cursor, count, 0);
        if (sent < 0) {
            // A signal interrupted the call before any data was sent: retry.
            if (errno == EINTR)
                continue;
            return -1;
        }
        cursor += sent;
        count -= (size_t)sent;
    }
    return 0;
}

// Send one message to the peer on engine->comm.remote.
//
// The message is written as: msg_type, length, payload_length (each as its
// raw in-memory bytes), followed by `length` bytes of `msg` and
// `payload_length` bytes of `payload`. engine->comm.mutex is held for the
// whole message so the parts are written as one uninterrupted sequence.
//
// Returns -EINVAL if `payload` is NULL while `payload_length` is non-zero,
// 0 otherwise. A failed send ends in fatal().
int bs_comm_send_payload(bs_engine_t *engine, bs_msg_t msg_type, char *msg, int length, char *payload, int payload_length){
    if (!payload && payload_length)
        return -EINVAL;

    pthread_mutex_lock(&engine->comm.mutex);
    if (
        (bs_comm_send_all(engine->comm.remote, &msg_type, sizeof(msg_type)) < 0) ||
        (bs_comm_send_all(engine->comm.remote, &length, sizeof(length)) < 0) ||
        (bs_comm_send_all(engine->comm.remote, &payload_length, sizeof(payload_length)) < 0) ||
        (bs_comm_send_all(engine->comm.remote, msg, (size_t)length) < 0) ||
        (bs_comm_send_all(engine->comm.remote, payload, (size_t)payload_length) < 0)
    )
        fatal("failed to send message\n", 0);
    pthread_mutex_unlock(&engine->comm.mutex);
    return 0;
}