Skip to content
Snippets Groups Projects
Select Git revision
  • 7335f51f370f6984c70a4f2f09a07808d34d4435
  • master default protected
2 results

protected-ssr-api.md

Blame
  • newmodeljsonparse.py 15.59 KiB
    import json
    import os
    import sys
    import writefile as wf
    from collections import namedtuple
    
    
    def measurementJsonDecod(measurementdict):
        return namedtuple('Measurement', measurementdict.keys())(*measurementdict.values())
    
    
    def parameterJsonDecod(parameterdict):
        return namedtuple('Parameter', parameterdict.keys())(*parameterdict.values())
    
    
    def functionJsonDecod(functiondict):
        return namedtuple('Function', functiondict.keys())(*functiondict.values())
    
    
    class Measurement:
        def __init__(self, name, description, datatype, value, dimension, unit=None, range=None):
            self.name = name
            self.description = description
            self.datatype = datatype
            self.range = range
            self.dimension = dimension
            self.unit = unit
            array = []
            if self.datatype == "float":
                self.value = float(value)
            else:
                self.value = value
            # calculate a start value
            # if self.dimension != 0:
            #     for i in range(self.dimension):
            #         arrayval = self.datatype((self.range[0] + self.range[1]) / 2)
            #         if self.datatype == "float" or self.datatype == "double":
            #             arrayval = float(arrayval)
            #         array.append(arrayval)
            #     self.value = [0.0, 0.0, 0.0]
            # else:
            #     self.value = self.datatype(((self.range[0] + self.range[1]) / 2))
    
    class Parameter:
        def __init__(self, name, description, datatype, dimension, value, rangevar=None, unit=None, default=None, constant=False):
            self.name = name
            self.description = description
            self.datatype = datatype
            # if range == [None, None]:
            #     self.range = None
            # else:
            self.range = rangevar
            self.dimension = dimension
            self.unit = unit
            self.default = default
            self.constant = constant
            self.value = value
            # array = []
            # calculate a start value
            # if self.default == False:
            #     if self.dimension != 0:
            #         for i in range(self.dimension):
            #             array.append(self.datatype((self.range[0] + self.range[1]) / 2))
            #         self.value = array
            #     else:
            #         self.value = self.datatype(((self.range[0] + self.range[1]) / 2))
            # else:
            #     self.value = self.datatype(default)
    
    class Function:
        def __init__(self, name, description, inargsdatatypes = None, outargsdatatypes = None):
            self.name = name
            self.description = description
            self.inargsdatatypes = inargsdatatypes
            self.outargsdatatypes = outargsdatatypes
    
    def dimensionchange(string):
        dimarr = string["dimension"]
        if type(dimarr) == list:
            if dimarr:
                value = dimarr[0]
            else:
                value = 0
            newdimension = {"dimension": value}
            string.update(newdimension)
    
    
    def getdatatypes(argarray):
        returnarr = []
        if len(argarray) > 0:
            for i in argarray:
                returnarr.append(i["datatype"])
            return returnarr
        else:
            return None
    
    
    
    def changerange(string):
        #print(type(string["range"][0]))
        if iskeythere(string, "range"):
            rangeval = string["range"]
            rangetuple = (rangeval[0], rangeval[1])
        #print(rangetuple)
            string.update({"range": rangetuple})
            #print(string["range"])
        else:
            string.update({"range": (None, None)})
       # print(string)
    
    def getParameters(parametersDict):
        # extracts the measurements of a Json-Model
        parameterobj = []
        if len(parametersDict) > 0:
            for i in parametersDict:
                obj = i
                # del obj["value"]
                #del obj["uuid"]
                dimensionchange(obj)
                obj["value"]=getdefaultvalue(obj["dimension"], obj["datatype"])
                obj = json.dumps(i)
                parameterobji = json.loads(obj, object_hook=parameterJsonDecod)
                parameterobj.append(parameterobji)
        return parameterobj
    
    def getMeasurements(measurementsDict):
        #extracts the measurements of a Json-Model
        measurementobj = []
        if len(measurementsDict) > 0:
            for i in measurementsDict:
                obj = i
                #del obj["value"]
                #del obj["uuid"]
                dimensionchange(obj)
                obj["value"] = getdefaultvalue(obj["dimension"], obj["datatype"])
                #if i["range"]:
                changerange(obj)
                if not iskeythere(obj, "unit"):
                    obj["unit"] = "UNITLESS"
                obj = json.dumps(i)
                measurementobji = json.loads(obj, object_hook=measurementJsonDecod)
                measurementobj.append(measurementobji)
        return measurementobj
    
    def searchlists(list1, list2):
        print("I am searching")
        objmeasurementlist = []
       # print(obj["measurements"])
        if len(list1) <= 0:
            return []
        for input in list1:
            for searchobj in list2:
                # print(measentrance)
                if input["value"] == searchobj["uuid"]:
                    objmeasurementlist.append(searchobj)
        return objmeasurementlist
    
    def getdefaultvalue(dimension, datatype):
        if datatype == "float" or datatype == "double" or datatype == "int":
            if dimension > 1:
                value = []
                for i in range(dimension):
                    value.append(0)
                return value
            else:
                return 0
        else:
            return True
    
    def iskeythere(list, key):
        for keysearch in list:
            if(keysearch == key):
                return True
        return False
    
    def iteratingInterface(obj, dict, componentslist):
        if iskeythere(list(obj.keys()), "baseComponent"):
            for searchobj in componentslist:
                if searchobj["uuid"] == obj["baseComponent"]:
                    wf.writeinterface(componentslist[searchobj["name"]], searchobj["name"])
    
    
    def getInterfaceparam(list1, interface, searched, jsoncomponents, searchinglist):
        #list1.append(searchlists(interface[searched], searchinglist))
        # if interface["name"] == "Gripper":
            # print("I am searching Gripper in: ")
            # print(searchinglist)
            # print([interface[searched]])
        for inputsearch in interface[searched]:
            for searchobj in searchinglist:
                # print(inputsearch["value"] + " " + searchobj["uuid"])
                if inputsearch["value"] == searchobj["uuid"]:
                    # print("found matching pair")
                    # print(inputsearch["value"] + " " + searchobj["uuid"])
                    list1.append(searchobj)
        if iskeythere(list(interface.keys()), "baseComponent"):
            for component in jsoncomponents:
                if component["uuid"] == interface["baseComponent"]:
                   return getInterfaceparam(list1, component, searched, jsoncomponents, searchinglist)
        else:
            return list1
    
    def buildComponents(interfacesreturn, jsoninterfaces, jsoncomponents, jsonmeasurements, jsonparameters, objcomponentsclone):
        for obj in jsoninterfaces:
            rootobj = {}
            if obj["elementType"] == "interface":
                for search in jsoncomponents:
                    if obj["rootComponent"] == search["uuid"]:
                        rootobj = search
            else:
                rootobj = obj
            if obj["name"] in list(interfacesreturn.keys()):
                continue
            objcomponentslist = getInterfaceparam([], rootobj, "components", jsoncomponents, jsoncomponents)
            if obj["name"] not in interfacesreturn.keys():
                interfacesreturn[str(obj["name"])] = wf.openobject(obj["name"], rootobj["description"])
                #print(obj["name"])
                #del obj["cardOpen"]
                #del obj["uuid"]
                    #rootObjname = obj["rootComponent"]
    
                # if iskeythere(list(obj.keys()), "baseComponent"):
                #     basecomponent = obj["baseComponent"]
                #     for search in jsoncomponents:
                #         #print(search["uuid"])
                #         if basecomponent == search["uuid"]:
                #             wf.writebasecomponent(componenttypesdict[str(obj["name"])], componenttypesdict[str(search["name"])], search["name"])
                #wf.writecomponents(interfacestypesdict[str(obj["name"])], objcomponentslist, interfacestypesdict)
                objmeasurementlist = getInterfaceparam([], rootobj, "measurements", jsoncomponents, jsonmeasurements)
                print(objmeasurementlist)
                measurementsobjs = getMeasurements(objmeasurementlist)
                #print(objmeasurementlist)
                # print(interfacesreturn)
                wf.writemeasurement(measurementsobjs, interfacesreturn[str(obj["name"])])
                # print("I am searching Parameters")
                objparameterlist = getInterfaceparam([], rootobj, "parameters", jsoncomponents, jsonparameters)
                parameterobjs = getParameters(objparameterlist)
                # print(objparameterlist)
                wf.writeparameter(parameterobjs, interfacesreturn[str(obj["name"])])
                buildComponents(interfacesreturn, objcomponentslist, jsoncomponents, jsonmeasurements, jsonparameters, objcomponentsclone)
        return interfacesreturn
        #     if len(objcomponentslist) > 0:
        #         print("I got called")
        #         objcomponentsclone = objcomponentslist
        #     if len(objcomponentslist) == 0:
        #         if len(objcomponentsclone) > 0:
        #             print(" I got called")
        #             print(objcomponentslist)
        #             return buildComponents(interfacesreturn, objcomponentsclone, jsoncomponents, jsonmeasurements, jsonparameters, objcomponentsclone)
        #     else:
        #             return buildComponents(interfacesreturn, objcomponentslist, jsoncomponents, jsonmeasurements, jsonparameters, objcomponentsclone)
        #
        # else:
        #     if len(objcomponentslist) > 0:
        #         return buildComponents(interfacesreturn, objcomponentslist, jsoncomponents, jsonmeasurements, jsonparameters, objcomponentsclone)
        #     else:
        #         return interfacesreturn
    
    
    
    
    def instantiateComponents(typesdict, interfaces, jsoncomponents, headnode, instantiatedict):
            count = 0
            countcomponent = 0
            for interface in interfaces:
                # root = {}
                # if interface["elementType"] == "interface":
                #     for component in jsoncomponents:
                #         if interface["rootComponent"] == component["uuid"]:
                #             root = component
                # else:
                root = interface
    
                components = getInterfaceparam([], root, "components", jsoncomponents, jsoncomponents)
                if interface["name"] in list(instantiatedict.keys()):
                    count += 1
                # dictname = str(interface["name"]) + str(count)
                # if interface["elementType"] == "interface":
                #     instantiatedict[str(dictname)] = wf.writeinterface(typesdict[str(interface["name"])], interface["name"], count)
                # else:
                instantiatedict[str(interface["name"])] = wf.writeobjects(typesdict[str(interface["name"])], interface["name"], instantiatedict[str(headnode["name"])], count)
                # for component in components:
                #     if component["name"] in list(instantiatedict.keys()):
                #         countcomponent += 1
                    #dictnamecomponent = str(component["name"]) + str(count)
                    # instantiatedict[dictnamecomponent] = wf.writeobjects(typesdict[str(component["name"])], component["name"], instantiatedict[dictname], countcomponent)
                if len(components) > 0:
                    instantiateComponents(typesdict,components , jsoncomponents, interface, instantiatedict)
    
            return instantiatedict
    
    def instantiate(typesdict, interfaces, jsoncomponents):
        instantiatedict = {}
        #print(interfaces)
        for interface in interfaces:
            root = {}
            count = 0
            for component in jsoncomponents:
                if interface["rootComponent"] == component["uuid"]:
                    root = component
            components = getInterfaceparam([], root, "components", jsoncomponents, jsoncomponents)
            #dictname = str(interface["name"]) + str(count)
            instantiatedict[str(interface["name"])] = wf.writeinterface(typesdict[str(interface["name"])], interface["name"], count)
            instantiateComponents(typesdict, components, jsoncomponents, interface, instantiatedict)
    
    
    
    def main():
        try:
            JSON_MODEL = sys.argv[1][:-5]
            if not os.path.exists(os.path.join('..', 'SOIL-OPC-UA', f'{JSON_MODEL}.json')):
                raise Exception()
        except Exception as e:
            print('You must provide a file from the folder to be used as root file of the parsing, e.g., "python jsonparser.py model.json".')
            exit()
    
        with open(f'{JSON_MODEL}.json') as file:
    
            wf.writebeginning()
            jsonobj = json.loads(file.read())
            elements = jsonobj["elements"]
            if len(elements) <= 0:
                exit()
            jsonmeasurements = []
            jsonparameters = []
            jsonfunctions = []
            jsoncomponents = []
            jsoninterfaces = []
            for obj in elements:
                #print(obj)
                if obj["elementType"] == "measurement":
                    jsonmeasurements.append(obj)
                elif obj["elementType"] == "parameter":
                    jsonparameters.append(obj)
                elif obj["elementType"] == "function":
                    jsonfunctions.append(obj)
                elif obj["elementType"] == "component":
                    jsoncomponents.append(obj)
                elif obj["elementType"] == "interface":
                    jsoninterfaces.append(obj)
    
    
            # interfacestypesdict = {}
            # for obj in jsoninterfaces:
            #     #print(obj["name"])
            #     #del obj["cardOpen"]
            #     #del obj["uuid"]
            #     rootobj = {}
            #     for search in jsoncomponents:
            #         if obj["rootComponent"] == search["uuid"]:
            #             rootobj = search
            #     objname = wf.openobject(obj["name"], rootobj["description"])
            #     interfacestypesdict[str(obj["name"])] = objname
            #     #rootObjname = obj["rootComponent"]
            #
            #     # if iskeythere(list(obj.keys()), "baseComponent"):
            #     #     basecomponent = obj["baseComponent"]
            #     #     for search in jsoncomponents:
            #     #         #print(search["uuid"])
            #     #         if basecomponent == search["uuid"]:
            #     #             wf.writebasecomponent(componenttypesdict[str(obj["name"])], componenttypesdict[str(search["name"])], search["name"])
            #     objcomponentslist = getInterfaceparam([], rootobj, "components", jsoncomponents, jsoncomponents)
            #     wf.writecomponents(interfacestypesdict[str(obj["name"])], objcomponentslist, interfacestypesdict)
            #     objmeasurementlist = getInterfaceparam([], rootobj, "measurements", jsoncomponents, jsonmeasurements)
            #     measurementsobjs = getMeasurements(objmeasurementlist)
            #     #print(objmeasurementlist)
            #     wf.writemeasurement(measurementsobjs, interfacestypesdict[str(obj["name"])])
            #     objparameterlist = getInterfaceparam([], rootobj, "parameters", jsoncomponents, jsonparameters)
            #     parameterobjs = getParameters(objparameterlist)
            #     wf.writemeasurement(parameterobjs, interfacestypesdict[str(obj["name"])])
    
            typesdict = buildComponents({}, jsoninterfaces, jsoncomponents, jsonmeasurements, jsonparameters, [])
            print(typesdict)
            intantiateddict = instantiate(typesdict, jsoninterfaces, jsoncomponents)
    
            wf.writend()
    
    
    
            # print("Measurements: ")
            # print(objmeasurementlist)
            # print("\n")
            # print("Components: ")
            # print(objcomponentslist)
            # print("\n")
            # print("Parameters: ")
            # print(objparameterlist)
            # print("\n")
            # print("Functions: ")
            # print(jsonfunctions)
    
    
    
    
    if __name__ == '__main__':
        main()