diff --git a/API.paw b/API.paw index 6bc24f9a7751b90ab7d17c10d9b19df1915f184a..dc3148a5f0d1e74cfdec34e6647cdfcf68903757 100644 Binary files a/API.paw and b/API.paw differ diff --git a/Readme.md b/Readme.md index 685b6d082b47f326f19240389bf05fd1c6bccc8c..e98758e054d7fbf6ed340d82cdce670b2814f3a6 100644 --- a/Readme.md +++ b/Readme.md @@ -59,7 +59,13 @@ Configuration is done via Environment variables. In case the variable is not set |DB_HOST|The hostname of the postgres database.|`localhost`|| |DB_PORT|The port to connect to the postgres database.|`5432`|| |DB_LOGGING|Setting this value to `true` forces the DB driver to log every DB query for debugging purposes. Might negatively impact performance.|`false`|| +|RABBITMQ_HOST|The hostname the RabbitMQ Client will use|`localhost`|| +|RABBITMQ_PORT|The port the RabbitMQ Client will use|`5672`|| +|RABBITMQ_USERNAME|The username the RabbitMQ Client will use||| +|RABBITMQ_PASSWORD|The password the RabbitMQ Client will use||| +|RABBITMQ_QUEUE|The name of the queue the RabbitMQ Client will use. This will be created if it does not exist.|`measurements`|| +RABBITMQ_QUEUE ## API Documentation Some of the mentioned endpoints support versioning by setting a `Accept-Datetime` header according to the HTTP Memento specification. All endpoints which support this header mention this in their documentation. If an endpoint does support this header and the header is not set in the request, this value will default to the current date and time. @@ -238,7 +244,7 @@ curl "http://localhost:8080/information/2a9ab310-2869-478d-b150-8e82b437731d" #### POST `/information/<informationId>` -Creates a new version for the information given in the `informationId`. In case the given component information already has a newer version or the MQTT topic is currently in use by another information, the request will return a `412 - Precondition Failed` status code. +Creates a new version for the information given in the `informationId`. In case the given component information already has a newer version or the MQTT topic is currently in use by another information, the request will return a `409 - Conflict` status code. Required Body: ``` diff --git a/package-lock.json b/package-lock.json index 6fb30f0f681d6ff2041f149f03e32b6a379cb2e7..33cc15bbac500de11e0b208eafc67ce6613eae23 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8,11 +8,13 @@ "name": "thesis", "version": "1.0.0", "dependencies": { + "@types/amqplib": "^0.8.2", "@types/express": "^4.17.13", "@types/morgan": "^1.9.3", "@types/validator": "^13.6.6", "ajv": "^8.10.0", "ajv-formats": "^2.1.1", + "amqplib": "^0.9.0", "express": "^4.17.1", "express-validator": "^6.14.0", "jsonld": "^5.2.0", @@ -284,6 +286,20 @@ "integrity": "sha512-2xN+iGTbPBEzGSnVp/Hd64vKJCJWxsi9gfs88x4PPMyEjHJoA3o5BY9r5OLPHIZU2pAQxkSAsJFqn6itClP8mQ==", "dev": true }, + "node_modules/@types/amqplib": { + "version": "0.8.2", + "resolved": "https://registry.npmjs.org/@types/amqplib/-/amqplib-0.8.2.tgz", + "integrity": "sha512-p+TFLzo52f8UanB+Nq6gyUi65yecAcRY3nYowU6MPGFtaJvEDxcnFWrxssSTkF+ts1W3zyQDvgVICLQem5WxRA==", + "dependencies": { + "@types/bluebird": "*", + "@types/node": "*" + } + }, + "node_modules/@types/bluebird": { + "version": "3.5.36", + "resolved": "https://registry.npmjs.org/@types/bluebird/-/bluebird-3.5.36.tgz", + "integrity": "sha512-HBNx4lhkxN7bx6P0++W8E289foSu8kO8GCk2unhuVggO+cE7rh9DhZUyPhUxNRG9m+5B5BTKxZQ5ZP92x/mx9Q==" + }, "node_modules/@types/body-parser": { "version": "1.19.2", "resolved": "https://registry.npmjs.org/@types/body-parser/-/body-parser-1.19.2.tgz", @@ -735,6 +751,37 @@ } } }, + "node_modules/amqplib": { + "version": "0.9.0", + "resolved": "https://registry.npmjs.org/amqplib/-/amqplib-0.9.0.tgz", + "integrity": "sha512-emwSdJElmSp52JIKehjLNimKqbZcGUBGdcqST9fll+C/Uss8fWoGyyWlwt20f5lD+SDdozoc4WhF3uDCUOL2ww==", + "dependencies": { + "bitsyntax": "~0.1.0", + "bluebird": "^3.7.2", + "buffer-more-ints": "~1.0.0", + "readable-stream": "1.x >=1.1.9", + "url-parse": "~1.5.10" + }, + "engines": { + "node": ">=10" + } + }, + "node_modules/amqplib/node_modules/readable-stream": { + "version": "1.1.14", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-1.1.14.tgz", + "integrity": "sha1-fPTFTvZI44EwhMY23SB54WbAgdk=", + "dependencies": { + "core-util-is": "~1.0.0", + "inherits": "~2.0.1", + "isarray": "0.0.1", + "string_decoder": "~0.10.x" + } + }, + "node_modules/amqplib/node_modules/string_decoder": { + "version": "0.10.31", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz", + "integrity": "sha1-YuIDvEF2bGwoyfyEMB2rHFMQ+pQ=" + }, "node_modules/ansi-colors": { "version": "4.1.1", "resolved": "https://registry.npmjs.org/ansi-colors/-/ansi-colors-4.1.1.tgz", @@ -892,6 +939,37 @@ "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==" }, + "node_modules/bitsyntax": { + "version": "0.1.0", + "resolved": "https://registry.npmjs.org/bitsyntax/-/bitsyntax-0.1.0.tgz", + "integrity": "sha512-ikAdCnrloKmFOugAfxWws89/fPc+nw0OOG1IzIE72uSOg/A3cYptKCjSUhDTuj7fhsJtzkzlv7l3b8PzRHLN0Q==", + "dependencies": { + "buffer-more-ints": "~1.0.0", + "debug": "~2.6.9", + "safe-buffer": "~5.1.2" + }, + "engines": { + "node": ">=0.8" + } + }, + "node_modules/bitsyntax/node_modules/debug": { + "version": "2.6.9", + "resolved": "https://registry.npmjs.org/debug/-/debug-2.6.9.tgz", + "integrity": "sha512-bC7ElrdJaJnPbAP+1EotYvqZsb3ecl5wi6Bfi6BJTUcNowp6cvspg0jXznRTKDjm/E7AdgFBVeAPVMNcKGsHMA==", + "dependencies": { + "ms": "2.0.0" + } + }, + "node_modules/bitsyntax/node_modules/ms": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz", + "integrity": "sha1-VgiurfwAvmwpAd9fmGF4jeDVl8g=" + }, + "node_modules/bitsyntax/node_modules/safe-buffer": { + "version": "5.1.2", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", + "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==" + }, "node_modules/bl": { "version": "4.1.0", "resolved": "https://registry.npmjs.org/bl/-/bl-4.1.0.tgz", @@ -902,6 +980,11 @@ "readable-stream": "^3.4.0" } }, + "node_modules/bluebird": { + "version": "3.7.2", + "resolved": "https://registry.npmjs.org/bluebird/-/bluebird-3.7.2.tgz", + "integrity": "sha512-XpNj6GDQzdfW+r2Wnn7xiSAd7TM3jzkxGXBGTtWKuSXv1xUV+azxAm8jdWZN06QTQk+2N2XB9jRDkvbmQmcRtg==" + }, "node_modules/body-parser": { "version": "1.19.1", "resolved": "https://registry.npmjs.org/body-parser/-/body-parser-1.19.1.tgz", @@ -984,6 +1067,11 @@ "resolved": "https://registry.npmjs.org/buffer-from/-/buffer-from-1.1.2.tgz", "integrity": "sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==" }, + "node_modules/buffer-more-ints": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/buffer-more-ints/-/buffer-more-ints-1.0.0.tgz", + "integrity": "sha512-EMetuGFz5SLsT0QTnXzINh4Ksr+oo4i+UGTXEshiGCQWnsgSs7ZhJ8fzlwQ+OzEMs0MpDAMr1hxnblp5a4vcHg==" + }, "node_modules/buffer-writer": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/buffer-writer/-/buffer-writer-2.0.0.tgz", @@ -1221,6 +1309,11 @@ "resolved": "https://registry.npmjs.org/cookie-signature/-/cookie-signature-1.0.6.tgz", "integrity": "sha1-4wOogrNCzD7oylE6eZmXNNqzriw=" }, + "node_modules/core-util-is": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.3.tgz", + "integrity": "sha512-ZQBvi1DcpJ4GDqanjucZ2Hj3wEO5pZDS89BWbkcrvdxksJorwUDDZamX9ldFkp9aw2lmBDLgkObEA4DWNJ9FYQ==" + }, "node_modules/cross-spawn": { "version": "7.0.3", "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.3.tgz", @@ -2642,6 +2735,11 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/isarray": { + "version": "0.0.1", + "resolved": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz", + "integrity": "sha1-ihis/Kmo9Bd+Cav8YDiTmwXR7t8=" + }, "node_modules/isexe": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/isexe/-/isexe-2.0.0.tgz", @@ -2908,9 +3006,9 @@ } }, "node_modules/minimist": { - "version": "1.2.5", - "resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.5.tgz", - "integrity": "sha512-FM9nNUYrRBAELZQT3xeZQ7fmMOBg6nWNmJKTcgsJeaLstP/UODVpGsr5OhXhhXg6f+qtJ8uiZ+PUxkDWcgIXLw==" + "version": "1.2.6", + "resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.6.tgz", + "integrity": "sha512-Jsjnk4bw3YJqYzbdyBiNsPWHPfO++UGG749Cxs6peCu5Xg4nrena6OVxOYxrQTqww0Jmwt+Ref8rggumkTLz9Q==" }, "node_modules/mkdirp": { "version": "1.0.4", @@ -3493,6 +3591,11 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/querystringify": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/querystringify/-/querystringify-2.2.0.tgz", + "integrity": "sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ==" + }, "node_modules/queue-microtask": { "version": "1.2.3", "resolved": "https://registry.npmjs.org/queue-microtask/-/queue-microtask-1.2.3.tgz", @@ -3597,6 +3700,11 @@ "node": ">=0.10.0" } }, + "node_modules/requires-port": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/requires-port/-/requires-port-1.0.0.tgz", + "integrity": "sha1-kl0mAdOaxIXgkc8NpcbmlNw9yv8=" + }, "node_modules/resolve": { "version": "1.21.0", "resolved": "https://registry.npmjs.org/resolve/-/resolve-1.21.0.tgz", @@ -4339,6 +4447,15 @@ "punycode": "^2.1.0" } }, + "node_modules/url-parse": { + "version": "1.5.10", + "resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.5.10.tgz", + "integrity": "sha512-WypcfiRhfeUP9vvF0j6rw0J3hrWrw6iZv3+22h6iRMJ/8z1Tj6XfLP4DsUix5MhMPnXpiHDoKyoZ/bdCkwBCiQ==", + "dependencies": { + "querystringify": "^2.1.1", + "requires-port": "^1.0.0" + } + }, "node_modules/util-deprecate": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", @@ -4778,6 +4895,20 @@ "integrity": "sha512-2xN+iGTbPBEzGSnVp/Hd64vKJCJWxsi9gfs88x4PPMyEjHJoA3o5BY9r5OLPHIZU2pAQxkSAsJFqn6itClP8mQ==", "dev": true }, + "@types/amqplib": { + "version": "0.8.2", + "resolved": "https://registry.npmjs.org/@types/amqplib/-/amqplib-0.8.2.tgz", + "integrity": "sha512-p+TFLzo52f8UanB+Nq6gyUi65yecAcRY3nYowU6MPGFtaJvEDxcnFWrxssSTkF+ts1W3zyQDvgVICLQem5WxRA==", + "requires": { + "@types/bluebird": "*", + "@types/node": "*" + } + }, + "@types/bluebird": { + "version": "3.5.36", + "resolved": "https://registry.npmjs.org/@types/bluebird/-/bluebird-3.5.36.tgz", + "integrity": "sha512-HBNx4lhkxN7bx6P0++W8E289foSu8kO8GCk2unhuVggO+cE7rh9DhZUyPhUxNRG9m+5B5BTKxZQ5ZP92x/mx9Q==" + }, "@types/body-parser": { "version": "1.19.2", "resolved": "https://registry.npmjs.org/@types/body-parser/-/body-parser-1.19.2.tgz", @@ -5088,6 +5219,36 @@ "ajv": "^8.0.0" } }, + "amqplib": { + "version": "0.9.0", + "resolved": "https://registry.npmjs.org/amqplib/-/amqplib-0.9.0.tgz", + "integrity": "sha512-emwSdJElmSp52JIKehjLNimKqbZcGUBGdcqST9fll+C/Uss8fWoGyyWlwt20f5lD+SDdozoc4WhF3uDCUOL2ww==", + "requires": { + "bitsyntax": "~0.1.0", + "bluebird": "^3.7.2", + "buffer-more-ints": "~1.0.0", + "readable-stream": "1.x >=1.1.9", + "url-parse": "~1.5.10" + }, + "dependencies": { + "readable-stream": { + "version": "1.1.14", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-1.1.14.tgz", + "integrity": "sha1-fPTFTvZI44EwhMY23SB54WbAgdk=", + "requires": { + "core-util-is": "~1.0.0", + "inherits": "~2.0.1", + "isarray": "0.0.1", + "string_decoder": "~0.10.x" + } + }, + "string_decoder": { + "version": "0.10.31", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz", + "integrity": "sha1-YuIDvEF2bGwoyfyEMB2rHFMQ+pQ=" + } + } + }, "ansi-colors": { "version": "4.1.1", "resolved": "https://registry.npmjs.org/ansi-colors/-/ansi-colors-4.1.1.tgz", @@ -5197,6 +5358,36 @@ } } }, + "bitsyntax": { + "version": "0.1.0", + "resolved": "https://registry.npmjs.org/bitsyntax/-/bitsyntax-0.1.0.tgz", + "integrity": "sha512-ikAdCnrloKmFOugAfxWws89/fPc+nw0OOG1IzIE72uSOg/A3cYptKCjSUhDTuj7fhsJtzkzlv7l3b8PzRHLN0Q==", + "requires": { + "buffer-more-ints": "~1.0.0", + "debug": "~2.6.9", + "safe-buffer": "~5.1.2" + }, + "dependencies": { + "debug": { + "version": "2.6.9", + "resolved": "https://registry.npmjs.org/debug/-/debug-2.6.9.tgz", + "integrity": "sha512-bC7ElrdJaJnPbAP+1EotYvqZsb3ecl5wi6Bfi6BJTUcNowp6cvspg0jXznRTKDjm/E7AdgFBVeAPVMNcKGsHMA==", + "requires": { + "ms": "2.0.0" + } + }, + "ms": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz", + "integrity": "sha1-VgiurfwAvmwpAd9fmGF4jeDVl8g=" + }, + "safe-buffer": { + "version": "5.1.2", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", + "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==" + } + } + }, "bl": { "version": "4.1.0", "resolved": "https://registry.npmjs.org/bl/-/bl-4.1.0.tgz", @@ -5207,6 +5398,11 @@ "readable-stream": "^3.4.0" } }, + "bluebird": { + "version": "3.7.2", + "resolved": "https://registry.npmjs.org/bluebird/-/bluebird-3.7.2.tgz", + "integrity": "sha512-XpNj6GDQzdfW+r2Wnn7xiSAd7TM3jzkxGXBGTtWKuSXv1xUV+azxAm8jdWZN06QTQk+2N2XB9jRDkvbmQmcRtg==" + }, "body-parser": { "version": "1.19.1", "resolved": "https://registry.npmjs.org/body-parser/-/body-parser-1.19.1.tgz", @@ -5271,6 +5467,11 @@ "resolved": "https://registry.npmjs.org/buffer-from/-/buffer-from-1.1.2.tgz", "integrity": "sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==" }, + "buffer-more-ints": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/buffer-more-ints/-/buffer-more-ints-1.0.0.tgz", + "integrity": "sha512-EMetuGFz5SLsT0QTnXzINh4Ksr+oo4i+UGTXEshiGCQWnsgSs7ZhJ8fzlwQ+OzEMs0MpDAMr1hxnblp5a4vcHg==" + }, "buffer-writer": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/buffer-writer/-/buffer-writer-2.0.0.tgz", @@ -5463,6 +5664,11 @@ "resolved": "https://registry.npmjs.org/cookie-signature/-/cookie-signature-1.0.6.tgz", "integrity": "sha1-4wOogrNCzD7oylE6eZmXNNqzriw=" }, + "core-util-is": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.3.tgz", + "integrity": "sha512-ZQBvi1DcpJ4GDqanjucZ2Hj3wEO5pZDS89BWbkcrvdxksJorwUDDZamX9ldFkp9aw2lmBDLgkObEA4DWNJ9FYQ==" + }, "cross-spawn": { "version": "7.0.3", "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.3.tgz", @@ -6514,6 +6720,11 @@ "call-bind": "^1.0.2" } }, + "isarray": { + "version": "0.0.1", + "resolved": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz", + "integrity": "sha1-ihis/Kmo9Bd+Cav8YDiTmwXR7t8=" + }, "isexe": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/isexe/-/isexe-2.0.0.tgz", @@ -6711,9 +6922,9 @@ } }, "minimist": { - "version": "1.2.5", - "resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.5.tgz", - "integrity": "sha512-FM9nNUYrRBAELZQT3xeZQ7fmMOBg6nWNmJKTcgsJeaLstP/UODVpGsr5OhXhhXg6f+qtJ8uiZ+PUxkDWcgIXLw==" + "version": "1.2.6", + "resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.6.tgz", + "integrity": "sha512-Jsjnk4bw3YJqYzbdyBiNsPWHPfO++UGG749Cxs6peCu5Xg4nrena6OVxOYxrQTqww0Jmwt+Ref8rggumkTLz9Q==" }, "mkdirp": { "version": "1.0.4", @@ -7154,6 +7365,11 @@ "resolved": "https://registry.npmjs.org/qs/-/qs-6.9.6.tgz", "integrity": "sha512-TIRk4aqYLNoJUbd+g2lEdz5kLWIuTMRagAXxl78Q0RiVjAOugHmeKNGdd3cwo/ktpf9aL9epCfFqWDEKysUlLQ==" }, + "querystringify": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/querystringify/-/querystringify-2.2.0.tgz", + "integrity": "sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ==" + }, "queue-microtask": { "version": "1.2.3", "resolved": "https://registry.npmjs.org/queue-microtask/-/queue-microtask-1.2.3.tgz", @@ -7220,6 +7436,11 @@ "resolved": "https://registry.npmjs.org/require-from-string/-/require-from-string-2.0.2.tgz", "integrity": "sha512-Xf0nWe6RseziFMu+Ap9biiUbmplq6S9/p+7w7YXP/JBHhrUDDUhwa+vANyubuqfZWTveU//DYVGsDG7RKL/vEw==" }, + "requires-port": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/requires-port/-/requires-port-1.0.0.tgz", + "integrity": "sha1-kl0mAdOaxIXgkc8NpcbmlNw9yv8=" + }, "resolve": { "version": "1.21.0", "resolved": "https://registry.npmjs.org/resolve/-/resolve-1.21.0.tgz", @@ -7722,6 +7943,15 @@ "punycode": "^2.1.0" } }, + "url-parse": { + "version": "1.5.10", + "resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.5.10.tgz", + "integrity": "sha512-WypcfiRhfeUP9vvF0j6rw0J3hrWrw6iZv3+22h6iRMJ/8z1Tj6XfLP4DsUix5MhMPnXpiHDoKyoZ/bdCkwBCiQ==", + "requires": { + "querystringify": "^2.1.1", + "requires-port": "^1.0.0" + } + }, "util-deprecate": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", diff --git a/package.json b/package.json index 98b3eaf7b10dc916af46483909ce2c2ebb8a8e78..361fc0448a98ec4f5c77efdafe2a4fa2997162ca 100644 --- a/package.json +++ b/package.json @@ -27,11 +27,13 @@ "typescript": "^4.3.5" }, "dependencies": { + "@types/amqplib": "^0.8.2", "@types/express": "^4.17.13", "@types/morgan": "^1.9.3", "@types/validator": "^13.6.6", "ajv": "^8.10.0", "ajv-formats": "^2.1.1", + "amqplib": "^0.9.0", "express": "^4.17.1", "express-validator": "^6.14.0", "jsonld": "^5.2.0", diff --git a/src/App.ts b/src/App.ts index 92d775bab36d195851eca7a545183de72d72fabc..c86c66ee7c3bc5e436792ed8b6104f0b26524f4c 100644 --- a/src/App.ts +++ b/src/App.ts @@ -8,27 +8,46 @@ import MainRouter from "./router/MainRouter"; import "reflect-metadata"; import IngestionService from "./services/IngestionService"; import { MqttClient } from "mqtt"; +import { ConfirmChannel } from "amqplib"; +import IngressService from "./services/IngressService"; const logger = logService(module); class App { + private port: number; private connection: Connection; private mqttClient: MqttClient; private expressApp: Application; private mainRouter: MainRouter private ingestionService: IngestionService; + private ingressService: IngressService; - constructor(connection: Connection, mqttClient: MqttClient) { + constructor(connection: Connection, mqttClient: MqttClient, channel: ConfirmChannel, port: number) { this.connection = connection; this.mqttClient = mqttClient; this.expressApp = express(); this.mainRouter = new MainRouter(connection); - this.ingestionService = new IngestionService(mqttClient, connection); - this.setup(); + this.ingestionService = new IngestionService(channel, connection); + this.ingressService = new IngressService(channel, mqttClient); + this.port = port; } - private setup(): void { + start() { + this.setupIngestion(); + this.setupIngress(); + this.setupAPI(); + } + + private setupIngestion() { + this.ingestionService.start(); + } + + private setupIngress() { + this.ingressService.start(); + } + + private setupAPI(): void { // Disable ETags for this proof-of-concept to prevent some weird caching behaviour // on some of the evaluation tools. this.expressApp.use((req: express.Request, res: express.Response, next: express.NextFunction) => { @@ -52,11 +71,11 @@ class App { // just send an empty response with an 404 - Not Found HTTP Status Code. res.send(); }); - - logger.info("App is set up."); + + this.listen(this.port); } - listen(port: number): void { + private listen(port: number): void { this.expressApp.listen(port, () => { logger.info("Listening on port %i", port); }); diff --git a/src/config.ts b/src/config.ts index 915f775fc25184fa460e14de9d6e75c9c86d9aad..9115c32f8c86426abbfe0335e4ae66043282a399 100644 --- a/src/config.ts +++ b/src/config.ts @@ -1,4 +1,5 @@ import { IClientOptions } from "mqtt"; +import amqplib from "amqplib"; import logService from "./services/logger"; const logger = logService(module); @@ -31,9 +32,23 @@ const postgresConfig = { logging: process.env.DB_LOGGING == "true" ?? false }; +const rabbitMQConnectionConfig: amqplib.Options.Connect = { + protocol: "amqp", + hostname: process.env.RABBITMQ_HOST ?? "localhost", + port: Number(process.env.RABBITMQ_PORT) ?? 5672, + username: process.env.RABBITMQ_USERNAME, + password: process.env.RABBITMQ_PASSWORD, +}; + +const rabbitMQConfig = { + connection: rabbitMQConnectionConfig, + queue: process.env.RABBITMQ_QUEUE ?? "measurements" +}; + export default { port: Number(process.env.API_PORT) || 8080, baseURL: process.env.API_BASE_URL, postgres: postgresConfig, - mqtt: mqttConfig + mqtt: mqttConfig, + rabbitMQ: rabbitMQConfig }; \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index c323042ed613960ad210af849551fc5799b0ef0c..75385152aa50973c6e73921fd9154ec521c9e4f6 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,6 +3,7 @@ import App from "./App"; import config from "./config"; import dbService from "./services/dbService"; import mqttService from "./services/mqttService"; +import rabbitMQService from "./services/rabbitMQService"; const logger = logService(module); @@ -29,9 +30,13 @@ dbService.then((connection) => { if (connection.isConnected) { mqttService.then((mqttClient) => { if (mqttClient.connected) { - const app = new App(connection, mqttClient); - // DB and MQTT ready and available, start API now. - app.listen(config.port); + rabbitMQService.then((channel) => { + const app = new App(connection, mqttClient, channel, config.port); + app.start(); + }).catch((error) => { + logger.error(`Error while establishing connection to RabbitMQ: ${error}`); + process.exit(1); + }); } else { logger.error("Mqtt Client is not connected."); process.exit(1); diff --git a/src/services/IngestionService.ts b/src/services/IngestionService.ts index 555541de3f5c2984da2b321fbdc13718322b66b8..fd51b0569b1c6640c86fdf767a7cdc8bb0a14585 100644 --- a/src/services/IngestionService.ts +++ b/src/services/IngestionService.ts @@ -1,17 +1,18 @@ import { Schema } from "ajv"; -import { MqttClient } from "mqtt"; import { Connection as DBConnection, Repository } from "typeorm"; import Measurement from "../entity/Measurement"; import ComponentInformation from "../entity/ComponentInformation"; import TypeDefinition from "../entity/TypeDefinition"; import logService from "../services/logger"; import ajv from "./SchemaValidationService"; +import { ConfirmChannel } from "amqplib"; +import config from "../config"; const logger = logService(module); class IngestionService { - client: MqttClient; + channel: ConfirmChannel; dbConnection: DBConnection; informationRepository: Repository<ComponentInformation> measurementRepository: Repository<Measurement> @@ -19,15 +20,14 @@ class IngestionService { typeDefinitionRepository: Repository<TypeDefinition> typeDefinitionCache: Map<string, TypeDefinition> - constructor(client: MqttClient, dbConnection: DBConnection) { - this.client = client; + constructor(channel: ConfirmChannel, dbConnection: DBConnection) { + this.channel = channel; this.dbConnection = dbConnection; this.informationRepository = dbConnection.getRepository(ComponentInformation); this.measurementRepository = dbConnection.getRepository(Measurement); this.typeDefinitionRepository = dbConnection.getRepository(TypeDefinition); // Start with an empty cache. this.typeDefinitionCache = new Map(); - this.setup(); } static messageSchema: Schema = { @@ -61,28 +61,45 @@ class IngestionService { } } - private setup(): void { - this.client.on("message", (topic, message) => { - const messageContent = JSON.parse(message.toString()); + start(): void { + this.channel.consume(config.rabbitMQ.queue, (message) => { + if(!message) { + logger.error("Received empty message."); + // Cannot ack a message which does not exist. + return; + } + + const parsedMessage = JSON.parse(message.content.toString()); + + const topic = parsedMessage.topic; + const messageContent = parsedMessage.payload; if(ajv.validate(IngestionService.messageSchema, messageContent) == false) { logger.error(`Message does not fulfill message schema: ${JSON.stringify(messageContent)}, ${JSON.stringify(ajv.errors)}`); + // Message invalid, ack it so it gets dropped from the queue. + this.channel.ack(message); return; } ComponentInformation.findCurrentVersionForTopic(topic, this.informationRepository).then(async (linkedInformation) => { if(linkedInformation.length == 0) { logger.error(`Component Information for topic ${topic} not found.`); + // Message invalid, ack it so it gets dropped from the queue. + this.channel.ack(message); return; } if(linkedInformation.length > 1) { logger.error(`There is more than one component information object for topic ${topic}: ${linkedInformation}`); + // Message invalid, ack it so it gets dropped from the queue. + this.channel.ack(message); return; } if(!messageContent.valueType || typeof messageContent.valueType !== "string") { logger.error(`Message does not contain value type: ${message}`); + // Message invalid, ack it so it gets dropped from the queue. + this.channel.ack(message); return; } @@ -91,6 +108,8 @@ class IngestionService { // Value Type does not exist. if(!valueType) { logger.error(`Type definition with name ${messageContent.valueType} not found.`); + // Message invalid, ack it so it gets dropped from the queue. + this.channel.ack(message); return; } @@ -99,11 +118,15 @@ class IngestionService { // strict mode. Thus we need to check here again. if(!messageContent.value) { logger.error(`Message ${messageContent} does not have a value set.`); + // Message invalid, ack it so it gets dropped from the queue. + this.channel.ack(message); return; } if(valueType.validateData(messageContent.value) == false) { logger.error(`Value ${messageContent.value} does not fulfill valueType ${messageContent.valueType} schema.`); + // Message invalid, ack it so it gets dropped from the queue. + this.channel.ack(message); return; } @@ -116,11 +139,15 @@ class IngestionService { if(!metadataType) { // Measurement Metadata type is not in the database and thus invalid. logger.error(`Type definition with name ${messageContent.metadataType} not found.`); + // Message invalid, ack it so it gets dropped from the queue. + this.channel.ack(message); return; } if(valueType.validateData(messageContent.metadata) == false) { logger.error(`Value ${messageContent.metadata} does not fulfill metadata type ${messageContent.metadataType} schema.`); + // Message invalid, ack it so it gets dropped from the queue. + this.channel.ack(message); return; } @@ -132,19 +159,14 @@ class IngestionService { measurement.targets = information.measurementTargets; this.measurementRepository.save(measurement); + // Message was fully processed, ack it so it gets dropped from the queue. + this.channel.ack(message); }).catch((error) => { logger.error(`Error while saving measurement with topic ${topic}: ${JSON.stringify(error)}`); + // Something went wrong with the database, nack the message another instance can process it. + this.channel.nack(message); }); }); - - this.client.subscribe("#", function(error, grant) { - if (error) { - logger.error(`Error while setting MQTT Subscription: ${error}`); - return; - } - - logger.info(`Successfully setup MQTT Subscription: ${JSON.stringify(grant)}`); - }); } private async getTypeDefinition(name: string): Promise<TypeDefinition | undefined> { diff --git a/src/services/IngressService.ts b/src/services/IngressService.ts new file mode 100644 index 0000000000000000000000000000000000000000..c13eb0c1e78a807b93127bdcdee713bb56e477ba --- /dev/null +++ b/src/services/IngressService.ts @@ -0,0 +1,49 @@ +import config from "../config"; +import { ConfirmChannel } from "amqplib"; +import { MqttClient } from "mqtt"; +import logService from "./logger"; + +const logger = logService(module); + +export default class IngressService { + + private channel: ConfirmChannel + private mqttClient: MqttClient + + constructor(channel: ConfirmChannel, mqttClient: MqttClient) { + this.channel = channel; + this.mqttClient = mqttClient; + } + + start(): void { + const queue = config.rabbitMQ.queue; + + this.mqttClient.on("message", (topic, message) => { + const result = { + topic: topic, + payload: JSON.parse(message.toString()) + }; + const resultString = JSON.stringify(result); + + this.channel.sendToQueue(queue, Buffer.from(resultString), { persistent: true }, (error, ok) => { + if(ok == false) { + logger.error(`Could not send object to queue: ${resultString}`); + } + + if(error) { + logger.error(error); + } + }); + }); + + this.mqttClient.subscribe("#", function(error, grant) { + if (error) { + logger.error(`Error while setting MQTT Subscription: ${error}`); + return; + } + + logger.info(`Successfully setup MQTT Subscription: ${JSON.stringify(grant)}`); + }); + } + +} \ No newline at end of file diff --git a/src/services/rabbitMQService.ts b/src/services/rabbitMQService.ts new file mode 100644 index 0000000000000000000000000000000000000000..a691a6a8d4aef61c16a5a0ba5ae3a44ac8a39b56 --- /dev/null +++ b/src/services/rabbitMQService.ts @@ -0,0 +1,38 @@ +import amqplib from "amqplib"; +import config from "../config"; +import logService from "./logger"; + +const logger = logService(module); + +const connectionBuilder: Promise<amqplib.Connection> = (async () => { + try { + logger.info("Trying to connect to RabbitMQ."); + const connection = await amqplib.connect(config.rabbitMQ.connection); + logger.info("Connected to RabbitMQ."); + return connection; + } catch(error) { + logger.error(`Error while establishing RabbitMQ Connection: ${JSON.stringify(error)}`); + process.exit(1); + } +})(); + +const channelBuilder: Promise<amqplib.ConfirmChannel> = (async () => { + try { + const connection = await connectionBuilder; + const channel = await connection.createConfirmChannel(); + + // Assert that the queue exists. + await channel.assertQueue(config.rabbitMQ.queue, { + exclusive: false, + durable: true, + autoDelete: false + }); + + return channel; + } catch(error) { + logger.error(`Error while establishing RabbitMQ Channel: ${JSON.stringify(error)}`); + process.exit(1); + } +})(); + +export default channelBuilder; \ No newline at end of file