Skip to content
Snippets Groups Projects
Unverified Commit 0241b06c authored by CyberKatze2077's avatar CyberKatze2077 Committed by GitHub
Browse files

Merge pull request #2 from fesolix/addingScalability

Adding scalability
parents b3707fdb 2ff1efc3
No related branches found
No related tags found
No related merge requests found
...@@ -7,156 +7,111 @@ ...@@ -7,156 +7,111 @@
#include <errno.h> #include <errno.h>
#include <zlib.h> #include <zlib.h>
// static const char *PIPE_ONE = "/tmp/pipeOne";
// static const char *PIPE_TWO = "/tmp/pipeTwo";
static const char *CHAR_DEV = "/dev/packet_receiver"; #define MAX_PROCESSES 10
#define PIPE_DIR "/tmp/" //name von pipe ist jetzt gleich name von process
// static const char *SENDER_ID = "0x91"; // bsp.: /usr/local/bin/cpu_temp hat die pipe /tmp/cpu_freq
#define PROCESS_MONITORING_DIR "/Processes/process_monitoring.sh" //bitte überprüfen ob das die richtige DIR ist
ssize_t read_pipe(int *fd, char *buf, size_t bufSize, const char *pipeName) {
// Alles mit 0 füllen, um es später einfach als String verarbeiten zu können. typedef struct {
memset(buf, 0, bufSize); char name[256]; //processname
char pipe_path[256];
// Blocking read int fd;
ssize_t bytesRead = read(*fd, buf, bufSize - 1); } ProcessPipe;
if (bytesRead > 0) {
// buf[bytesRead] = '\0'; // Schon durch memset garantiert ProcessPipe processes[MAX_PROCESSES]; //bis auf 10 prozesse skalierbar
return bytesRead; int process_count = 0;
}
if (bytesRead == 0) { void scan_processes() {
// EOF -> Schreibseite geschlossen FILE *file = fopen(PROCESS_MONITORING_DIR, "r");//file öffnen
printf("Pipe '%s' closed by writer. Reopening...\n", pipeName); if (!file) {
close(*fd); perror("Could not open process file");
exit(EXIT_FAILURE); //break; oder return 1; ?
// Neu öffnen (blockierend) }
*fd = open(pipeName, O_RDONLY);
if (*fd == -1) { char line[512];
perror("Reopen pipe"); while (fgets(line, sizeof(line), file) && process_count < MAX_PROCESSES) {
} if (strncmp(line, "/usr/local/bin/", 15) == 0) {//zeilenweise nach prozessen suchen
// bytesRead = 0 bedeutet hier: wir haben keine Daten char *newline = strchr(line, '\n');
return 0; if (newline) {
*newline = '\0';
}
//prozess in dateistruktur schreiben
snprintf(processes[process_count].name, sizeof(processes[process_count].name), "%s", strrchr(line, '/') + 1);
snprintf(processes[process_count].pipe_path, sizeof(processes[process_count].pipe_path), "%s%s", PIPE_DIR, processes[process_count].name);
// mkfifo() nur ausführen, wenn die Pipes noch nicht existieren,
// bzw. Fehler ignorieren, falls schon existiert:
if (mkfifo(processes[process_count].pipe_path, 0666) < 0 && errno != EEXIST) {//mkfifo machen
perror("mkfifo failed");
exit(EXIT_FAILURE);
} }
process_count++;
// Fehlerfall (bytesRead < 0)
perror("read pipe");
return -1;
} }
void build_package(char *package, size_t package_size, const char *payload, int value_id) {
// Zwischenspeicher für die neue Komponente
char tmp[128];
// VALUE_ID=<id> VALUE=<payload>
snprintf(tmp, sizeof(tmp), " VALUE_ID=%d VALUE=%s", value_id, payload);
// Hänge tmp an package
strncat(package, tmp, package_size - strlen(package) - 1);
} }
fclose(file);
void build_crc_checksum(char *package, size_t package_size) {
// CRC berechnen
uLong c = crc32(0L, Z_NULL, 0);
c = crc32(c, (const Bytef*)package, strlen(package));
char tmp[32];
// z.B. " CRC=0x1A2B3C4D"
snprintf(tmp, sizeof(tmp), " CRC=0x%08lX", c);
// ans package anhängen
strncat(package, tmp, package_size - strlen(package) - 1);
} }
void send_package(const char *package, const int fdCharDev) { void open_pipes() {//pipe blockierend öffnen
char transmitBuf[256] = {0}; for (int i = 0; i < process_count; i++) {
processes[i].fd = open(processes[i].pipe_path, O_RDONLY);
// Kopiere Package in transmitBuf if (processes[i].fd == -1) {
snprintf(transmitBuf, sizeof(transmitBuf), "%s", package); perror("Error opening pipe");
exit(EXIT_FAILURE);
const ssize_t written = write(fdCharDev, transmitBuf, strlen(transmitBuf));
if (written < 0) {
perror("write /dev/packet_receiver");
} else {
printf("Wrote %ld bytes to %s\n", written, CHAR_DEV);
} }
} }
// argc = Anzahl Pipes *argv[] Pipe Pfade
int main(const int argc, char *argv[]) {
const int num_pipes = argc - 1;
int *fds = calloc(num_pipes, sizeof(int));
if (!fds) {
perror("calloc fds");
return 1;
} }
for (int i = 0; i < num_pipes; i++) { void read_pipes() {
const char *pipeName = argv[i + 1]; while (1) {//alle pipes in dauerschlife nacheinander lesen
// mkfifo() nur ausführen, wenn die Pipes noch nicht existieren, for (int i = 0; i < process_count; i++) {
// bzw. Fehler ignorieren, falls schon existiert: char buffer[128] = {0};
if (mkfifo(pipeName, 0666) < 0 && errno != EEXIST) { // read() kann blockieren, wenn noch keine Daten da sind,
perror("mkfifo pipe"); // da wir blockierendes I/O verwenden.
free(fds); // Wenn die Schreibseite offen bleibt, kommen periodisch neue Daten.
return 1; ssize_t bytes_read = read(processes[i].fd, buffer, sizeof(buffer));
if (bytes_read > 0) {
// Gültige Daten -> ausgeben
printf("%s: %s\n", processes[i].name, buffer);
} else if (bytes_read == 0) {
// EOF -> Schreibseite hat geschlossen
// -> ggf. Pipe erneut öffnen
printf("%s closed, reopening...\n", processes[i].name);
close(processes[i].fd);
processes[i].fd = open(processes[i].pipe_path, O_RDONLY);
if (processes[i].fd == -1) {
perror("Reopen pipe");
exit(EXIT_FAILURE);
} }
} else {
// Pipes blockierend öffnen // bytesReadOne < 0 => Fehler
fds[i] = open(pipeName, O_RDONLY); if (errno == EINTR) {
if (fds[i] == -1) { // Signal unterbrochen, einfach weiter
perror("open pipe for reading"); continue;
free(fds);
return 1;
} }
perror("read error");
exit(EXIT_FAILURE);
} }
// Open the char device for writing
const int fdCharDev = open(CHAR_DEV, O_WRONLY);
if (fdCharDev == -1) {
perror("open /dev/packet_receiver");
// Aufräumen
for (int i = 0; i < num_pipes; i++) {
close(fds[i]);
}
free(fds);
return 1;
}
printf("Opened %s for writing.\n", CHAR_DEV);
// Dauerhafte Lese-Schleife
while (1) {
char package[256];
snprintf(package, sizeof(package), "SENDER=0x91");
// Lese von jeder Pipe und ergänze das Package
for (int i = 0; i < num_pipes; i++) {
char pipeBuf[128];
const ssize_t ret = read_pipe(&fds[i], pipeBuf, sizeof(pipeBuf), argv[i + 1]);
if (ret > 0) {
// ret Bytes gelesen in pipeBuf -> package anhängen
build_package(package, sizeof(package), pipeBuf, i);
} else if (ret == -1) {
// Fehler beim Lesen
fprintf(stderr, "Error reading from pipe\n");
free(fds);
return 1;
}
}
build_crc_checksum(package, sizeof(package));
// Jetzt an /dev/packet_receiver schicken
if (strlen(package) > 0) {
send_package(package, fdCharDev);
} }
// Kurze Pause, damit CPU-Last nicht durch // Kurze Pause, damit CPU-Last nicht durch
// Dauerschleife hochgetrieben wird // Dauerschleife hochgetrieben wird
usleep(200000); // 200 ms usleep(200000);//20ms
} }
// Sollte man jemals aus der while(1)-Schleife ausbrechen: // Sollte man jemals aus der while(1)-Schleife ausbrechen:
for (int i = 0; i < argc; i++) { close_pipes();
close(fds[i]);
} }
void close_pipes() {//braucht man das?
for (int i = 0; i < process_count; i++) {
close(processes[i].fd);
}
return;
}
int main() {
scan_processes();
open_pipes();
read_pipes();
return 0; return 0;
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment