Skip to content
Snippets Groups Projects
Select Git revision
  • f189cd94fe8ec0b600f504c7157f1e8c36945c33
  • dev default
  • master
3 results

readme.md

Blame
  • writefile.py 25.15 KiB
    import os
    import tomli
    
    """
    This File takes the prepared Data structures and writes the OPC-UA Server based on them.
        Methods: defFunction, getVariantType, writebeginning, writeenums, openobject, writeenumobj, writemeasurement, writeparameter,
                 writefunction, writeobjects, writeinterface, writeevents, writestreams, writendpt1, triggerevents, triggerstreams,
                 writendpt2
    """
    
    
    def defFunction(functionname, file, indent, inargs, function):
        #return values are not written in the function at the moment
        """
        writes the body for each function, it leaves the body empty and gives a description for the method which should be implemented
            arguments:
                - functionname(name of the function changed to a lowercase String without blanks),
                - file(file it writes in)
                - indent(indent it writes with)
                - inargs(arguments of the function
                - function(the object)
        """
        writeindent = ""
        for i in range(indent):
            writeindent = writeindent + "    "
        file.write(writeindent + "async def " + functionname + "(parent")
        if inargs is not None:
            file.write(", ")
            for i in range (len(inargs)):
                if i < (len(inargs) - 1):
                    file.write(inargs[i] + ", ")
                else:
                    file.write(inargs[i])
        file.write("):\n")
        writeindent = writeindent + "    "
        file.write(writeindent + "#ToDo: Write a Function that fits the decription: \"" + str(function.description) + "\"\n")
        file.write(writeindent + "    # Use \"await\" before changing values and cast to Ua Varianttype\n")
        file.write(writeindent + "    # Use get and set_value to change the value at a given nodeid e.g: await testvar.set_value(ua.Variant(position))\n")
        file.write(writeindent + "pass\n")
    
    
    def getVariantType(datatype):
        """
        returns the ua.VariantType according to the datatype of a variable.
            arguments:
                - the datatype
            returns:
                - ua.VarianType.<datatype> as String
                    (float and double get casted to ua.VariantType.Double and all forms of int get casted to ua.VariantType.Int64)
        """
        varianttype = "ua.VariantType.Int64"
        if datatype == "float" or datatype == "double":
            varianttype = "ua.VariantType.Double"
        elif datatype == "Boolean":
            varianttype = "ua.VariantType.Boolean"
        elif datatype == "time":
            varianttype = "ua.VariantType.DateTime"
        elif datatype == "string":
            varianttype = "ua.VariantType.String"
        elif datatype == "enum":
            varianttype = "ua.VariantType.String"
        return varianttype
    
    
    with open("config.toml", "rb") as t:
        """
        opens toml config file to access its data
        """
        toml_dict = tomli.load(t)
    
    #sets opcua-server address and namespace according to config file 
    server_endpoint = toml_dict['server']
    namespace = toml_dict['namespace']
    
    
    def writebeginning():
        """
        delets the existing opcua-server.py and generates a new one. This new file is started with all Imports and the needed set up
        is written. The objecttypes measurement and parameter are defined here too
        arguments: None
        returns: None
        """
        if os.path.exists("opcua-server.py"):
            os.remove("opcua-server.py")
        server = open("opcua-server.py", "a")
        server.write("""import logging 
    import asyncio
    
    from asyncua import ua, Server
    from asyncua.common.methods import uamethod
    from asyncua.common.structures104 import new_struct, new_enum, new_struct_field
    from asyncua.common import node
    
    logging.basicConfig(level=logging.INFO)
    _logger = logging.getLogger('asyncua')
    """)
    
        server.write("\nasync def main():")
        #äinten = "\tab"
        server.write(f"""
        # setup our server
        server = Server()
        await server.init()
        server.set_endpoint('{server_endpoint}')
    
        # setup our own namespace, not really necessary but should as spec
        uri = '{namespace}'
        idx = await server.register_namespace(uri)
        #id of rangetype
        rangeid = await server.nodes.base_data_type.get_child(["0:Structure", "0:Range"])
        
        
        
        #creating dataypes for parameters and measurements
        measurementtype = await server.nodes.base_data_type.add_data_type(idx, "Measurement")
        parametertype = await server.nodes.base_data_type.add_data_type(idx, "Parameter")
    """)
    
    
    def writeenums(enumslist):
        """
        Creates the needed enums in the beginning of the server. And creates three dicts needed for Information.
        :param enumslist:
        :return enumnodenames: Dict with format: {uuid of enum: opc ua object node of the enum}
        :return returnvalues: Dict with format: {uuid of enum: valuelist with
                                added numbers to values showing their corresponding int Value}
        :return enumvariantnames: Dict with format: {uuid of enum: official name of the enum}
                                    Dict helps so that you don't need to search the fitting enum later on.
        """
        server = open("opcua-server.py", "a")
        writeindent = "    "
        returnvalues = {}
        enumnodenames = {}
        variantnames = {}
        for enum in enumslist:
            returnvallist = []
            enumname = (enum["name"].lower()).replace(" ", "")
            values = enum["values"]
            for i in range(len(values)):
                retval = str(i) + ":" + str(values[i])
                returnvallist.append(retval)
            server.write(writeindent + enumname + "= await new_enum(server, idx, \""+ str(enum["name"]) + "\", values=" + str(enum["values"]) + ")\n")
            enumnodenames[enum["uuid"]] = enumname
            returnvalues[enum["uuid"]] = returnvallist
            variantnames[enum["uuid"]] = enum["name"]
        return enumnodenames, returnvalues, variantnames
    
    
    def openobject(objname, description):
        #ToDO:description adden
        """
        Creates the needed objecttype
        :param objname: Name of object
        :param description:Description of the object
        :return objtypename: the name of the new node
        """
        server = open("opcua-server.py", "a")
        writeindent = "    "
        server.write(writeindent + "#creating the objecttype\n")
        objtypename = (objname.lower()).replace(" ", "") + "type"
        server.write(writeindent + objtypename + "= await server.nodes.base_object_type.add_object_type(idx, \"" + objname +"\" )\n")
        return objtypename
    
    
    def writeenumobj(variable, headnode, enumnodedict, enumvaldict, enumvariantdict):
        """
        Creates the variables with the type enum.
        :param variable: the variableobject that is created
        :param headnode: the component to which the variable belongs
        :param enumnodedict: Dict of the format {enum uuid: enum opc ua nodename}
        :param enumvaldict: Dict of the format {enum uuid: changed values for the propertie names}
                                (same as returnvalues from writeenums)
        :param enumvariantdict: Dict of the format {enum uuid: enum name}
        :return: name of the created variablenode
        """
        server = open("opcua-server.py", "a")
        writeindent = "    "
        uuid = variable.uuid
        varname = (variable.name.lower()).replace(" ","") + "var"
        server.write(writeindent + varname + " = await " + headnode + ".add_variable(idx,  \"" + variable.name + "\", " + variable.value + ", datatype=" + enumnodedict[variable.uuid] + ".nodeid)\n")
        server.write(writeindent + "await " + varname + "." + "set_modelling_rule(True)\n")
        i = 0
        for value in enumvaldict[variable.uuid]:
            valname = "value" + str(i)
            server.write(writeindent + valname + " = await " + varname + ".add_property(idx, \"" + str(value) + "\", \"" + str(value) + "\")\n")
            server.write(writeindent + "await " + valname + "." + "set_modelling_rule(True)\n")
            i += 1
        return varname
    
    
    def writemeasurement(measurementobj, headnode, enumnodedict, enumvaldict, enumvariantdict):
        #ToDo: Schauen ob man returns rausnehmen kann
        """
        Creates the measurement variables with their properties
        :param measurementobj: a list of measurementobjects
        :param headnode: the component to which the varianles belong
        :param enumnodedict: Dict for the names of the enum nodes from writeenums (used for writeenumobj)
        :param enumvaldict: Dict for the enumvalues from writenums (used for writeenumobj)
        :param enumvariantdict: Dict for the name of the enums (used for writeenumobj)
        :return: a dict for the measurement node names
        """
        server = open("opcua-server.py", "a")
        writeindent = "    "
        measurementdict = {}
        for i in measurementobj:
            if i.datatype == "enum":
                varname = writeenumobj(i, headnode, enumnodedict, enumvaldict, enumvariantdict)
                measurementdict[i.name] = varname
            else:
                varianttype = getVariantType(i.datatype)
                datatype = "measurementtype.nodeid"
                varname = ((i.name).lower()).replace(" ", "") + "var"
                measurementdict[i.name] = varname
                server.write(writeindent + varname + " = " + "await " + headnode + ".add_variable(idx, \"" + str(
                    i.name) + "\", " + str(i.value) + ", " + varianttype + ", " + datatype + ")\n")
                server.write(writeindent + "await " + varname + "." + "set_modelling_rule(True)\n")
                if i.range is not None and i.range != [None, None]:
                    rangename = "rangeprop"
                    server.write(writeindent + rangename + " = await " + varname + ".add_property(idx, \"range\", ua.Range(" + str(i.range[0]) + ", " + str(i.range[1]) + "))\n")
                    server.write(writeindent + "await " + rangename + ".set_modelling_rule(True)\n")
                if i.unit != "UNITLESS":
                    unitname = "unitprop"
                    server.write(
                        writeindent + unitname + " = await " + varname + ".add_property(idx, \"unit\"," + "\"" + i.unit + "\")\n")
                    server.write(writeindent + "await " + unitname + ".set_modelling_rule(True)\n")
    
            server.write(
                writeindent + varname + "description = await " + varname + ".add_property(idx, \"Description\", " + "\"" + i.description + "\")" + "\n")
            server.write(writeindent + "await "+ varname + "description.set_modelling_rule(True)\n\n")
    
        server.write("\n" + writeindent + "#creating all parameters with their properties\n")
        return measurementdict
    
    
    def writeparameter(parameterobj, headnode, enumnodedict, enumvaldict, enumvariantdict):
        """
        Creates the parameter values. (Similar to writemeasurement)
        :param parameterobj: A list of the objparameters
        :param headnode: The component to which the variables belong
        :param enumnodedict: Dict for the names of the enum nodes from writeenums (used for writeenumobj)
        :param enumvaldict: Dict for the enumvalues from writenums (used for writeenumobj)
        :param enumvariantdict: Dict for the name of the enums (used for writeenumobj)
        :return: A dict with the node names of the parameters
        """
        writeindent = "    "
        server = open("opcua-server.py", "a")
        parameterdict = {}
        for i in parameterobj:
            if i.datatype == "enum":
                varname = writeenumobj(i, headnode, enumnodedict, enumvaldict, enumvariantdict)
                parameterdict[i.name] = varname
            else:
                varianttype = getVariantType(i.datatype)
                datatype = "parametertype.nodeid"
                varname = ((i.name).lower()).replace(" ", "") + "var"
                parameterdict[i.name] = varname
                server.write(writeindent + varname + " = " + "await " + headnode + ".add_variable(idx, \"" + str(
                    i.name) + "\", " + str(i.value) + ", " + varianttype + ", " + datatype + ")\n")
                server.write(writeindent + "await " + varname + "." + "set_modelling_rule(True)\n")
                if i.range is not None and i.range != [None, None]:
                    rangename = "rangeprop"
                    server.write(
                        writeindent + rangename + " = await " + varname + ".add_property(idx, \"range\", ua.Range(" + str(i.range[0]) + ", " + str(i.range[1]) + "))\n")
                    server.write(writeindent + "await " + rangename + ".set_modelling_rule(True)\n")
                if i.unit != "UNITLESS":
                    unitname = "unitprop"
                    server.write(
                        writeindent + unitname + " = await " + varname + ".add_property(idx, \"unit\"," + "\"" + i.unit + "\")\n")
                    server.write(writeindent + "await " + unitname + ".set_modelling_rule(True)\n")
    
            server.write(
                writeindent + varname + "description = await " + varname + ".add_property(idx, \"Description\", " + "\"" + i.description + "\")" + "\n")
            server.write(writeindent + "await " + varname + "description.set_modelling_rule(True)\n\n")
        return parameterdict
    
    
    def writefunction(functionobj, headnode):
        """
        Takes the function objects writes the function body by calling deffunction and adds it to the objecttypes
        :param functionobj: List of functionobjects
        :param headnode: component the functions belong to
        :return: None
        """
        server = open("opcua-server.py", "a")
        writeindent = "    "
        for i in functionobj:
            functionname = (i.name.lower()).replace(" ", "")
            defFunction(functionname, server, 1, i.inargsnames, i)
            server.write("\n")
            inargslist = []
            if i.inargsdatatypes != None:
                for j in i.inargsdatatypes:
                    inargslist.append(getVariantType(j))
            outargslist = []
            if i.outargsdatatypes != None:
                for j in i.outargsdataypes:
                    inargslist.append(getVariantType(j))
            methodvarname = functionname + "var"
            server.write(writeindent + methodvarname + " = await " + headnode + ".add_method(idx, \"" + i.name +"\", " + functionname + ", " + str(inargslist) + ", "+ str(outargslist) + ")\n\n")
            server.write(writeindent + "await " + methodvarname + ".set_modelling_rule(True)\n\n")
    
    
    def writeobjects(objtype, objname, headnode, count):
        #ToDo: Implement count or cut it out
        """
        Generates the objects of the components.
        :param objtype: The node of the node with the objecttype
        :param objname: The name of the object
        :param headnode: The node the component belongs to.
        :param count: (Currently not really implemented) but for counting the names up
                        if an object with the same name is instantiated twice.
        :return: The name of the newly created objectnode
        """
        server = open("opcua-server.py", "a")
        writeindent = "    "
        objtypename = (objname.lower()).replace(" ", "") + "var" + str(count)
        server.write(
            writeindent + objtypename + "= await " + headnode + ".add_object(idx, \"" + objname + "\", " + objtype + ".nodeid)\n")
        return objtypename
    
    
    def writeinterface(objtype, objname, count):
        """
        Generates the interface, meaning the highest object layer. (Works similar to writeobjects)
        :param objtype: The name of the node with the objecttype
        :param objname: The name of the object
        :param count: (Currently not really implemented) but for counting the names up
                        if an object with the same name is instantiated twice.
        :return: The name of the newly created objectnode
        """
        server = open("opcua-server.py", "a")
        writeindent = "    "
        objtypename = (objname.lower()).replace(" ","") + "var" + str(count)
        server.write(writeindent + objtypename + "= await server.nodes.objects.add_object(idx, \"" + objname + "\", " + objtype + ".nodeid)\n")
        return objtypename
    
    
    def writeevents(events, component, eventlist):
        """
        Generates the events and event generator for each event and adds the fitting severity.
        :param events: List of eventdicts
        :param component: The component the events belong to
        :param eventlist: Empty dict (filled with the names of the eventgenerator nodes)
        :return: The eventlist now filled in the format {event name(generated in the code) : eventgenerator node name}
        """
        server = open("opcua-server.py", "a")
        writeindent = "    "
        for event in events:
            eventname = (event["element"].lower()).replace(" ", "") + component + str(event["severity"])
            eventgenname = eventname + "generator"
            eventlist[eventname] = eventgenname
            severity = 0
            if event["severity"] == "debug":
                severity = 200
            elif event["severity"] == "info":
                severity = 400
            elif event["severity"] == "warning":
                severity = 600
            elif event["severity"] == "error":
                severity = 800
            elif event["severity"] == "critical":
                severity = 1000
            server.write(writeindent + eventname + " = await server.create_custom_event_type(idx, \"" + str(event["element"]) + " " + str(event["severity"]) + "\", ua.ObjectIds.SystemEventType)\n")
            server.write(writeindent + str(eventlist[eventname]) + " = await server.get_event_generator(" + eventname + ", " + component + ")\n")
            server.write(writeindent + str(eventlist[eventname]) + ".event.Severity = " + str(severity) + "\n")
        return eventlist
    
    
    def writestreams(streams, component, streamslist, variablelist, jsonmeasurements, jsonparameters):
        """
        Generates the streams as events withe event generator, uses standard severity 1.
        :param streams: List of streamdicts
        :param component: Component the streams belong to.
        :param streamslist: Empty dict (filled with streamgenerator node names later on)
        :param variablelist: List of the variables belonging to the component in Format {name : value}
        :param jsonmeasurements: List of all measurement elements
        :param jsonparameters: List of all parameter elements
        :return streamlist: The streamslist now filled in the format
                                {streamname(generated in the code) : streamgenerator node name}
        :return updatestreamsdict: A dict in which the compare variable names for
                                    update streams are saved in format {streamname: variablename}
        """
        server = open("opcua-server.py", "a")
        writeindent = "    "
        updatestreamdict = {}
        for stream in streams:
            streamname = (stream["measurement"].lower()).replace(" ", "") + component
            streamgenname = streamname + "generator"
            streamslist[streamname] = streamgenname
            server.write(writeindent + streamname + " = await server.create_custom_event_type(idx, \"" + str(stream["measurement"]) + "\", ua.ObjectIds.SystemEventType)\n")
            server.write(writeindent + str(streamslist[streamname]) + " = await server.get_event_generator(" + streamname + ", " + component + ")\n")
    
            if stream["streamType"] == "update":
                value = 0
                for variable in variablelist:
                    if variable["name"] == stream["measurement"]:
                        for measurement in jsonmeasurements:
                            if measurement["uuid"] == variable["value"]:
                                value = measurement["value"]
                        for parameter in jsonparameters:
                            if parameter["uuid"] == variable["value"]:
                                value = parameter["value"]
    
                savevaluename = "save" + streamname
                writevalue = ""
                if type(value) == str:
                    writevalue = "\"" + value + "\""
                else:
                    writevalue = str(value)
                server.write(writeindent + savevaluename + " = " + writevalue + "\n")
                updatestreamdict[streamname] = savevaluename
    
        return streamslist, updatestreamdict
    
    
    def writendpt1():
        """
        Writes the beginning of the while loop which starts the server.
        :return: None
        """
        server = open("opcua-server.py", "a")
        server.write("""
        async with server:
            print("Server is running!")
            count = 0
            
            while True:\n""")
    
    
    def triggerevents(generatordict, eventslist, component, variablelist, jsonmeasurements, jsonparameters):
        """
        Writes the if cases in which the events get triggered.
        :param generatordict: Dict with the format
                                {eventname (generated according to format from writeevents) : eventgeneratorname}
        :param eventslist: List of eventdicts
        :param component: Component the events belong to
        :param variablelist: List of all variabledicts from component in the format {name : value}
        :param jsonmeasurements: List of all measurements
        :param jsonparameters: List of all parameters
        :return: None
        """
        server = open("opcua-server.py", "a")
        indent = 3
        writeindent = ""
        variablenamelist = {}
        for event in eventslist:
            writeindent = ""
            for i in range(indent):
                writeindent += "    "
            valuename = event["element"] + component
            eventsearchname = ""
    
            if event["element"] in list(variablenamelist.keys()):
                eventsearchname = variablenamelist[str(event["element"])]
            else:
                for variable in variablelist:
                    if variable["name"] == event["element"]:
                        for measurement in jsonmeasurements:
                            if variable["value"] == measurement["uuid"]:
                                eventsearchname = measurement["name"]
                        for parameter in jsonparameters:
                            if variable["value"] == parameter["uuid"]:
                                eventsearchname = parameter["name"]
                variablenamelist[str(event["element"])] = eventsearchname
                server.write(writeindent + valuename + "= await " + component + ".get_child([\"2:" + eventsearchname + "\"])\n")
            dictname = (event["element"].lower()).replace(" ", "") + component + event["severity"]
            generatorname = generatordict[dictname]
    
            trigger = str(event["target"])
            if type(event["target"]) == str:
                trigger = "\"" + str(event["target"] + "\"")
            server.write(writeindent + "if (await " + valuename + ".get_value()) " + event["trigger"] + " " + trigger + ":\n")
            writeindent += "    "
            server.write(writeindent + "await " + generatorname + ".trigger(message=\"" + event["message"] + "\")\n")
            writeindent = ""
            for i in range(indent):
                writeindent += "    "
    
    
    def triggerstreams(generatordict, streamslist, component, variablelist, jsonmeasurements, jsonparameters, savevaldict):
        """
        Generates the Streams according to their stream type. (Dynamic streams are still missing)
        :param generatordict: Dict with the format
                                {streamname (generated according to format from writestreams) : streamgeneratorname}
        :param streamslist: List of streamdicts
        :param component: Component the streams belong to
        :param variablelist: List of all variabledicts from the component in the format {name: value}
        :param jsonmeasurements: List of all measurements
        :param jsonparameters: List of all parameters
        :param savevaldict: Dict with the name of the compare values for update streams.
                                Format {streamname : comparevaluename}
        :return: None
        """
        server = open("opcua-server.py", "a")
        indent = 3
        for stream in streamslist:
            writeindent = ""
            streamname = ""
            valuename = "stream" + ((stream["measurement"]).lower()).replace(" ", "")
            for i in range(indent):
                writeindent += "    "
            for variable in variablelist:
                if variable["name"] == stream["measurement"]:
                    for measurement in jsonmeasurements:
                        if measurement["uuid"] == variable["value"]:
                            streamname = measurement["name"]
                    for parameter in jsonparameters:
                        if parameter["uuid"] == variable["value"]:
                            streamname = parameter["name"]
            server.write(writeindent + valuename + " = await " + component + ".get_child([\"2:" + streamname + "\"])\n")
            generatorname = generatordict[(stream["measurement"].lower()).replace(" ", "") + component]
            if stream["streamType"] == "update":
                server.write(writeindent + "if (await " + valuename + ".get_value()) != " + savevaldict[(stream["measurement"].lower()).replace(" ", "") + component] + ":\n")
                writeindent += "    "
                server.write(writeindent + "puffer = await " + valuename + ".get_value()\n")
                server.write(writeindent + "await " + generatorname + ".trigger(message= \"value\" + str(puffer))\n")
                server.write(writeindent + savevaldict[(stream["measurement"].lower()).replace(" ", "") + component] + " = puffer \n")
                writeindent = ""
                for i in range(indent):
                    writeindent += "    "
    
            if stream["streamType"] == "fixed":
                server.write(writeindent + stream["measurement"] + "time = count % " + str(stream["intervalValue"]) + "\n")
                server.write(writeindent + "if "+ stream["measurement"] + "time == 0:\n")
                writeindent += "    "
                server.write(writeindent + "puffer = await " + valuename + ".get_value()\n")
                server.write(writeindent + "await " + generatorname + ".trigger(message=\"" + stream["measurement"] + " value: \" + str(puffer))\n")
                writeindent = ""
                for i in range(indent):
                    writeindent += "    "
    
    
    def writendpt2():
        """
        Writes the end of the server with the await asyncio.sleep(1) and the time counter.
        :return: None
        """
        server = open("opcua-server.py", "a")
        server.write("""
                await asyncio.sleep(1)
                count += 1
    
    if __name__ == '__main__':
        asyncio.run(main())""")