diff --git a/src/entity/ComponentInformation.ts b/src/entity/ComponentInformation.ts index 83c21ffbf7fe18b68352c0e36065be150b540eeb..580c51cbff485c4aa6663a4e5dea0181b9d3ff41 100644 --- a/src/entity/ComponentInformation.ts +++ b/src/entity/ComponentInformation.ts @@ -109,6 +109,17 @@ export default class ComponentInformation { }); } + static findCurrentVersionforMeasurement(topic: string, timestamp: Date, repository: Repository<ComponentInformation>): Promise<ComponentInformation | undefined> { + return repository + .createQueryBuilder("componentInformation") + .leftJoinAndSelect("componentInformation.nextVersion", "nextVersion") + .where("componentInformation.topic = :topic", { topic }) + .andWhere("componentInformation.dateCreated <= :timestamp1::timestamptz", { timestamp1: timestamp }) + .andWhere("coalesce(\"nextVersion\".\"dateCreated\", 'Infinity') > :timestamp2::timestamptz", { timestamp2: timestamp }) + .orderBy("componentInformation.dateCreated", "DESC") + .getOne(); + } + static findCurrentVersionForComponent(componentId: string, repository: Repository<ComponentInformation>, mementoDate?: Date): Promise<ComponentInformation | undefined> { const date: Date = mementoDate ?? new Date(); diff --git a/src/services/IngestionService.ts b/src/services/IngestionService.ts index 2c5af8a39699f8e2fdb3e2f6f741e26bbb03ed20..b819da0c3f3699bf6dcdfbccff0b4350070829c2 100644 --- a/src/services/IngestionService.ts +++ b/src/services/IngestionService.ts @@ -86,21 +86,14 @@ class IngestionService { return; } - ComponentInformation.findCurrentVersionForTopic(topic, this.informationRepository).then(async (linkedInformation) => { - if(linkedInformation.length == 0) { + ComponentInformation.findCurrentVersionforMeasurement(topic, new Date(messageContent.timestamp), this.informationRepository).then(async (linkedInformation) => { + if(!linkedInformation) { logger.error(`Component Information for topic ${topic} not found.`); // Message invalid, ack it so it gets dropped from the queue. this.channel.ack(message); return; } - if(linkedInformation.length > 1) { - logger.error(`There is more than one component information object for topic ${topic}: ${linkedInformation}`); - // Message invalid, ack it so it gets dropped from the queue. - this.channel.ack(message); - return; - } - if(!messageContent.valueType || typeof messageContent.valueType !== "string") { logger.error(`Message does not contain value type: ${message}`); // Message invalid, ack it so it gets dropped from the queue. @@ -159,7 +152,7 @@ class IngestionService { metadata = messageContent.metadata; } - const information: ComponentInformation = linkedInformation[0]; + const information: ComponentInformation = linkedInformation; const measurement = new Measurement(new Date(messageContent.timestamp), information.measurementLicense, messageContent.value, valueType, metadata, metadataType, information); measurement.targets = information.measurementTargets;