From bece97eee5587fbeb8802b8f5a337dcbd2a89d81 Mon Sep 17 00:00:00 2001 From: Jonas Schlabertz <jonas@schlabertz.de> Date: Tue, 21 Jun 2022 12:37:55 +0200 Subject: [PATCH] Fixes a race condition where a measurement could be associated with the wrong ComponentInformation due to the delay from the measurement to value ingestion. --- src/entity/ComponentInformation.ts | 11 +++++++++++ src/services/IngestionService.ts | 13 +++---------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/entity/ComponentInformation.ts b/src/entity/ComponentInformation.ts index 83c21ff..580c51c 100644 --- a/src/entity/ComponentInformation.ts +++ b/src/entity/ComponentInformation.ts @@ -109,6 +109,17 @@ export default class ComponentInformation { }); } + static findCurrentVersionforMeasurement(topic: string, timestamp: Date, repository: Repository<ComponentInformation>): Promise<ComponentInformation | undefined> { + return repository + .createQueryBuilder("componentInformation") + .leftJoinAndSelect("componentInformation.nextVersion", "nextVersion") + .where("componentInformation.topic = :topic", { topic }) + .andWhere("componentInformation.dateCreated <= :timestamp1::timestamptz", { timestamp1: timestamp }) + .andWhere("coalesce(\"nextVersion\".\"dateCreated\", 'Infinity') > :timestamp2::timestamptz", { timestamp2: timestamp }) + .orderBy("componentInformation.dateCreated", "DESC") + .getOne(); + } + static findCurrentVersionForComponent(componentId: string, repository: Repository<ComponentInformation>, mementoDate?: Date): Promise<ComponentInformation | undefined> { const date: Date = mementoDate ?? new Date(); diff --git a/src/services/IngestionService.ts b/src/services/IngestionService.ts index 2c5af8a..b819da0 100644 --- a/src/services/IngestionService.ts +++ b/src/services/IngestionService.ts @@ -86,21 +86,14 @@ class IngestionService { return; } - ComponentInformation.findCurrentVersionForTopic(topic, this.informationRepository).then(async (linkedInformation) => { - if(linkedInformation.length == 0) { + ComponentInformation.findCurrentVersionforMeasurement(topic, new Date(messageContent.timestamp), this.informationRepository).then(async (linkedInformation) => { + if(!linkedInformation) { logger.error(`Component Information for topic ${topic} not found.`); // Message invalid, ack it so it gets dropped from the queue. this.channel.ack(message); return; } - if(linkedInformation.length > 1) { - logger.error(`There is more than one component information object for topic ${topic}: ${linkedInformation}`); - // Message invalid, ack it so it gets dropped from the queue. - this.channel.ack(message); - return; - } - if(!messageContent.valueType || typeof messageContent.valueType !== "string") { logger.error(`Message does not contain value type: ${message}`); // Message invalid, ack it so it gets dropped from the queue. @@ -159,7 +152,7 @@ class IngestionService { metadata = messageContent.metadata; } - const information: ComponentInformation = linkedInformation[0]; + const information: ComponentInformation = linkedInformation; const measurement = new Measurement(new Date(messageContent.timestamp), information.measurementLicense, messageContent.value, valueType, metadata, metadataType, information); measurement.targets = information.measurementTargets; -- GitLab