sparse-tools/src/bs_msg_queue.c

78 lines
1.6 KiB
C

//
// Created by haraldwolff on 13.08.22.
//
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <bs_msg.h>
#include <blksync.h>
void bs_msg_queue_init(bs_msg_queue_t *queue){
memset(queue, 0, sizeof(bs_msg_queue_t));
pthread_mutex_init(&queue->mutex, NULL);
pthread_cond_init(&queue->cond, NULL);
}
int bs_msg_queue_push(bs_msg_queue_t *queue, bs_msg_buffer_t* msg){
if (!queue || !msg)
return -EINVAL;
pthread_mutex_lock( &queue->mutex );
if (!queue->first)
{
queue->first = queue->last = msg;
} else {
queue->last->next = msg;
queue->last = msg;
}
msg->next = NULL;
pthread_cond_signal(&queue->cond);
pthread_mutex_unlock( &queue->mutex );
return 0;
}
int bs_msg_queue_pop(bs_msg_queue_t *queue, bs_msg_buffer_t** msg, bs_msg_queue_flags_t flags ){
if (!queue || !msg)
return -EINVAL;
pthread_mutex_lock( &queue->mutex );
do {
if (queue->first) {
*msg = queue->first;
queue->first = (*msg)->next;
if (!queue->first)
queue->last = NULL;
break;
} else if (flags & Q_WAIT) {
do {
pthread_cond_wait(&queue->cond, &queue->mutex);
} while (!queue->first);
continue;
} else {
pthread_mutex_unlock( &queue->mutex );
return -ENAVAIL;
}
} while (1);
pthread_mutex_unlock( &queue->mutex );
return 0;
}
int bs_msg_buffer_create(bs_msg_buffer_t **msg){
*msg = malloc(sizeof(bs_msg_buffer_t));
if (!*msg)
error(-ENOMEM, "could not allocate msg buffers\n", 0);
memset( *msg, 0, sizeof(*msg) );
(*msg)->payload = malloc(MAX_MSG_PAYLOAD_LENGTH);
if (!(*msg)->payload) {
free(*msg);
error(-ENOMEM, "out of memory while allocating message buffers.\n", 0);
}
return 0;
}