Skip to content
Snippets Groups Projects
Commit fe31481d authored by Ruben Otto's avatar Ruben Otto :crab:
Browse files

feat: use message queue instead of pipes for userspace communication

parent b68650a7
No related branches found
No related tags found
No related merge requests found
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include <unistd.h> #include <unistd.h>
#include <string.h> #include <string.h>
#include "../../../common/include/protocol.h" #include "../../../common/include/protocol.h"
#include "../../../common/include/userspace_comm.h"
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
if (argc < 3 || argc > 3) { if (argc < 3 || argc > 3) {
...@@ -13,9 +14,9 @@ int main(int argc, char* argv[]) { ...@@ -13,9 +14,9 @@ int main(int argc, char* argv[]) {
return EXIT_FAILURE; return EXIT_FAILURE;
} }
int pipe_fd = open(DAEMON_PIPE_NAME, O_WRONLY); send_t send;
if (pipe_fd == -1) { if (send_init(&send) == -1) {
perror("Failed to open pipe"); perror("Failed to open message queue");
return EXIT_FAILURE; return EXIT_FAILURE;
} }
...@@ -46,15 +47,12 @@ int main(int argc, char* argv[]) { ...@@ -46,15 +47,12 @@ int main(int argc, char* argv[]) {
memcpy(&packet.data, &cpu_temp, sizeof(cpu_temp)); memcpy(&packet.data, &cpu_temp, sizeof(cpu_temp));
if (write(pipe_fd, &packet, DAEMON_PACKET_HEADER_SIZE + packet.length) == -1) { send_send(&send, packet);
perror("Failed to write pipe");
continue;
}
usleep(sleep_ms * 1000); usleep(sleep_ms * 1000);
} }
close(pipe_fd); send_destroy(&send);
close(file_fd); close(file_fd);
return EXIT_SUCCESS; return EXIT_SUCCESS;
......
add_library(common add_library(common
src/protocol.c src/protocol.c
src/userspace_comm.c
src/communication.c src/communication.c
) )
target_include_directories(common target_include_directories(common
......
#pragma once #pragma once
#include <stdint.h> #include <stdint.h>
#include <linux/limits.h> #include <sys/ipc.h>
#define MAX_DAEMON_PACKET_SIZE PIPE_BUF #define MAX_DAEMON_PACKET_DATA_SIZE 4096
#define DAEMON_PACKET_HEADER_SIZE sizeof(uint8_t) + sizeof(daemon_measurement_datatype_t) + sizeof(uint16_t)
#define MAX_DAEMON_PACKET_DATA_SIZE MAX_DAEMON_PACKET_SIZE - DAEMON_PACKET_HEADER_SIZE
#define DAEMON_PIPE_NAME "/var/run/sibyl" #define MSG_QUEUE_KEY "/var/run/sibyl"
#define MSG_QUEUE_PROJ_ID 69
/** /**
* typedef daemon_measurement_datatype_t - Enum of possible datatypes for `daemon_measurement_t`. * typedef daemon_measurement_datatype_t - Enum of possible datatypes for `daemon_measurement_t`.
*/ */
typedef enum { typedef enum {
CHAR, CHAR,
UINT64,
INT64, INT64,
UINT64,
FLOAT64 FLOAT64
} __attribute__ ((__packed__)) daemon_measurement_datatype_t; } __attribute__ ((__packed__)) daemon_measurement_datatype_t;
...@@ -48,3 +47,10 @@ int daemon_measurement_datatype_size(daemon_measurement_datatype_t datatype); ...@@ -48,3 +47,10 @@ int daemon_measurement_datatype_size(daemon_measurement_datatype_t datatype);
* Returns 0 on success, -1 on error. * Returns 0 on success, -1 on error.
*/ */
int daemon_measurement_check(daemon_measurement_t* daemon_measurement); int daemon_measurement_check(daemon_measurement_t* daemon_measurement);
/**
* msg_queue_key() - Return the message queue key for the userspace communication.
*
* Returns the key of the message queue or -1 on error.
*/
key_t msg_queue_key();
#pragma once
#include "protocol.h"
#define MESSAGE_MEASUREMENT_ID 1
/**
* typedef send_t - Holds the a handle to the message queue.
* @msg_queue_id: Id of the message queue.
*/
typedef struct {
int msg_queue_id;
} send_t;
/**
* typedef message_t - Wrapper around `daemon_measurement_t` to make it sendable over a message queue.
* @message_type: The messages' type.
* @measurement: The measurement.
*/
typedef struct {
// required by the message queue api in order to work
long message_type;
daemon_measurement_t measurement;
} message_t;
/**
* send_init() - Initializes the send module.
* @arg1: The send module.
*
* Tries to get a handle to a message queue.
* Errors if the message queue doesn't exist.
*
* Returns 0 on success, -1 on error.
*/
int send_init(send_t* send);
/**
* send_send() - Sends a message over the message queue.
*
* Returns 0 on success, -1 on error.
*/
int send_send(send_t* send, daemon_measurement_t measurement);
/**
* send_destroy() - Destroys the message queue handle.
*
* Currently this is a dummy function that doesn't do anything.
*
* Always returns 0.
*/
int send_destroy(send_t* send);
...@@ -4,8 +4,8 @@ int daemon_measurement_datatype_size(daemon_measurement_datatype_t datatype) { ...@@ -4,8 +4,8 @@ int daemon_measurement_datatype_size(daemon_measurement_datatype_t datatype) {
switch (datatype) { switch (datatype) {
case CHAR: case CHAR:
return 1; return 1;
case UINT64:
case INT64: case INT64:
case UINT64:
case FLOAT64: case FLOAT64:
return 8; return 8;
} }
...@@ -20,3 +20,7 @@ int daemon_measurement_check(daemon_measurement_t* daemon_measurement) { ...@@ -20,3 +20,7 @@ int daemon_measurement_check(daemon_measurement_t* daemon_measurement) {
return 0; return 0;
} }
key_t msg_queue_key() {
return ftok(MSG_QUEUE_KEY, MSG_QUEUE_PROJ_ID);
}
#include "../include/protocol.h"
#include "../include/userspace_comm.h"
#include <sys/ipc.h>
#include <sys/msg.h>
int send_init(send_t* send) {
int key;
int msg_queue_id;
if ((key = msg_queue_key()) == -1) {
return -1;
}
if ((msg_queue_id = msgget(key, IPC_NOWAIT)) == -1) {
return -1;
}
send->msg_queue_id = msg_queue_id;
return 0;
}
int send_send(send_t *send, daemon_measurement_t measurement) {
message_t message = {
.message_type = MESSAGE_MEASUREMENT_ID,
.measurement = measurement,
};
msgsnd(send->msg_queue_id, &message, sizeof(daemon_measurement_t), 0);
return 0;
}
int send_destroy(send_t *send) {
return 0;
}
#pragma once #pragma once
#include "../../common/include/protocol.h" #include "../../common/include/protocol.h"
#include <sys/ipc.h>
/** /**
* typedef receive_t - The receive module. * typedef receive_t - The receive module.
* @msg_id: Id of the message queue that is used internally to communicate.
*/ */
typedef struct {} receive_t; typedef struct {
int msg_queue_id;
} receive_t;
/** /**
* receive_init() - Initializes the receive module. * receive_init() - Initializes the receive module.
* @arg1: The receive module. * @arg1: The receive module.
* *
* Creates a lockfile and a named pipe at DAEMON_PIPE_NAME. * Creates a file at `MSG_QUEUE_KEY` and a message queue with `MSG_QUEUE_KEY` as key and `MSG_QUEUE_PROJ_ID` as project id.
* It will fail if the lockfile already exists. * It will fail if the file or message queue already exists.
* It will not fail if the lockfile is missing but the pipe exists. In this case, the pipe is reused.
* *
* Returns 0 on success, -1 on error. * Returns 0 on success, -1 on error.
*/ */
int receive_init(receive_t* receive); int receive_init(receive_t* receive);
/** /**
* receive_read() - Reads a measurement from the pipe. * receive_read() - Reads a measurement from the message queue.
* @arg1: The receive module. * @arg1: The receive module.
* @arg2: The struct to store the read measurement in. * @arg2: The struct to store the read measurement in.
* *
...@@ -34,4 +37,4 @@ int receive_read(receive_t* receive, daemon_measurement_t* daemon_measurement); ...@@ -34,4 +37,4 @@ int receive_read(receive_t* receive, daemon_measurement_t* daemon_measurement);
* *
* Always returns 0. * Always returns 0.
*/ */
int receive_deinit(receive_t* receive); int receive_destroy(receive_t* receive);
...@@ -8,18 +8,44 @@ ...@@ -8,18 +8,44 @@
#include <unistd.h> #include <unistd.h>
#include <signal.h> #include <signal.h>
#define LOCKFILE "/tmp/.sibyl-daemon.lock"
receive_t receive; receive_t receive;
void signal_handler(int sig) { void signal_handler(int sig) {
receive_deinit(&receive); receive_destroy(&receive);
exit(1); exit(1);
} }
void print_measurement_data(daemon_measurement_t* measurement) {
int datatype_size = daemon_measurement_datatype_size(measurement->datatype);
for (int i = 0; i < datatype_size / measurement->length; i++) {
uint8_t buf[datatype_size];
for (int j = 0; j < datatype_size; j++) {
buf[j] = measurement->data[(i * datatype_size) + j];
}
switch (measurement->datatype) {
case CHAR:
printf("%c", *(char*)buf);
break;
case INT64:
printf("%ld", *(int64_t*)buf);
break;
case UINT64:
printf("%ld", *(uint64_t*)buf);
break;
case FLOAT64:
printf("%f", *(double*)buf);
break;
}
if (measurement->datatype != CHAR && i != measurement->length - 1) {
printf(" ");
}
}
}
int main() { int main() {
if (receive_init(&receive) != 0) { if (receive_init(&receive) != 0) {
receive_deinit(&receive); perror("Failed to create receive module");
perror("Failed to open pipe");
return EXIT_FAILURE; return EXIT_FAILURE;
} }
...@@ -37,10 +63,12 @@ int main() { ...@@ -37,10 +63,12 @@ int main() {
continue; continue;
} }
printf("Received data from %d with length %d\n", daemon_measurement.measurement_id, daemon_measurement.length); printf("Received data from %d with length %d: ", daemon_measurement.measurement_id, daemon_measurement.length);
print_measurement_data(&daemon_measurement);
printf("\n");
} }
receive_deinit(&receive); receive_destroy(&receive);
return EXIT_SUCCESS; return EXIT_SUCCESS;
} }
#include "../../common/include/protocol.h" #include "../../common/include/protocol.h"
#include "../../common/include/userspace_comm.h"
#include "../include/receive.h" #include "../include/receive.h"
#include <errno.h>
#include <stddef.h> #include <stddef.h>
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h> #include <stdio.h>
#include <fcntl.h> #include <fcntl.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <unistd.h> #include <unistd.h>
#define LOCKFILE "/tmp/.sibyl-daemon.lock" int create_run_file() {
int fd;
// create lockfile
int lock() {
int lockfile_fd;
if ((lockfile_fd = open(LOCKFILE, O_CREAT | O_EXCL)) < 0) { if ((fd = open(MSG_QUEUE_KEY, O_CREAT | O_EXCL)) == -1) {
if (errno == EEXIST) {
perror("Lockfile already exists. Maybe another process is already using it?");
} else {
perror("Cannot open lockfile");
}
return -1; return -1;
} }
close(lockfile_fd);
return 0; return close(fd);
} }
// delete lockfile int remove_run_file() {
int unlock() { return remove(MSG_QUEUE_KEY);
remove(LOCKFILE);
return 0;
} }
int receive_init(receive_t* receive) { int receive_init(receive_t* receive) {
if (lock() != 0) { if (create_run_file() == -1) {
return -1; return -1;
} }
/* create the fifo file or reuses the existing one, if existing */ key_t key = msg_queue_key();
if (mkfifo(DAEMON_PIPE_NAME, 0666) != 0) { if (key == -1) {
struct stat file_stat;
if (stat(DAEMON_PIPE_NAME, &file_stat) < 0 || !S_ISFIFO(file_stat.st_mode)) {
return -1; return -1;
} }
int msg_queue_id = msgget(key, IPC_CREAT | IPC_EXCL | IPC_NOWAIT);
if (msg_queue_id == -1) {
return -1;
} }
receive->msg_queue_id = msg_queue_id;
return 0; return 0;
} }
int receive_read(receive_t* receive, daemon_measurement_t* daemon_measurement) { int receive_read(receive_t* receive, daemon_measurement_t* daemon_measurement) {
int fd; message_t message;
// open pipe in read mode ssize_t read = msgrcv(receive->msg_queue_id, &message, sizeof(daemon_measurement_t), MESSAGE_MEASUREMENT_ID, 0);
if ((fd = open(DAEMON_PIPE_NAME, O_RDONLY)) < 0) { if (read == -1) {
errno = EIO;
close(fd);
return -1; return -1;
} }
/* read first 4 bytes as packet header */ *daemon_measurement = message.measurement;
uint8_t header[DAEMON_PACKET_HEADER_SIZE];
size_t bytes_read = read(fd, header, DAEMON_PACKET_HEADER_SIZE);
if (bytes_read != DAEMON_PACKET_HEADER_SIZE) {
close(fd);
/* only set custom errno if error is size related */
if (bytes_read >= 0) {
errno = EINVAL;
}
return -1;
}
/* assemble daemon measurement id and size from packet header */
daemon_measurement->measurement_id = header[0];
daemon_measurement->datatype = header[1];
daemon_measurement->length = header[2] | (header[3] << 8);
/* read measurement data */
bytes_read = read(fd, &daemon_measurement->data, daemon_measurement_datatype_size(daemon_measurement->datatype) * daemon_measurement->length);
if (bytes_read <= 0) {
close(fd);
/* only set custom errno if error is size related */
if (bytes_read >= 0) {
errno = EINVAL;
}
return -1;
}
return 0; return 0;
} }
int receive_deinit(receive_t* receive) { int receive_destroy(receive_t* receive) {
remove(DAEMON_PIPE_NAME); remove_run_file();
return unlock(); msgctl(receive->msg_queue_id, IPC_RMID, NULL);
return 0;
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment