Skip to content
Snippets Groups Projects
Select Git revision
  • 2f3b38f1c8975c070755ae6713dd35f254c37a74
  • 5.3 default protected
  • feature/animation-layering-system
  • feature/MH-audio2face
  • feature/add-new-animations
  • fix/investigate-sync-issues
  • WIP/MA_Dasbach_Bystander
  • thesis/MA_Kuehlem
  • deprecated/4.27
  • thesis/MA_drupp_city-guide
  • thesis/BA_pgriesser_handshake
  • thesis/BA_rschaefer
  • deprecated/4.26
  • thesis/MA_Ye
  • thesis/BA_tsittart
  • thesis/MA_amoedder_natural_gaze_sim
  • thesis/MA_tyroller_backchannels
  • thesis/BA_dvonk_influence-maps
  • thesis/MA_dkuznietsov_emotional-speech
  • fix/random
  • deprecated/4.22
21 results

README.md

Blame
  • To learn more about this project, read the wiki.
    writefile.py 16.13 KiB
    import jsonparse
    import os
    import tomli
    import json
    
    
    def defFunction(functionname, file, indent, inargs):
        """Write the stub of an ``async def`` callback to ``file``.

        Args:
            functionname: Name of the generated coroutine.
            file: Writable text file object that receives the generated code.
            indent: Nesting depth of the ``def`` line, in units of four spaces.
            inargs: Names of the parameters that follow the leading ``parent``
                parameter; ``None`` or an empty sequence adds none.

        The stub body consists of three guidance comments and a ``pass``
        statement, each indented one level deeper than the ``def`` line.
        """
        writeindent = "    " * indent
        bodyindent = writeindent + "    "

        parameters = ["parent"]
        if inargs:
            parameters.extend(inargs)
        file.write(writeindent + "async def " + functionname + "(" + ", ".join(parameters) + "):\n")

        # The body is built from the computed indentation instead of a
        # hard-coded one, so the stub stays valid Python for every ``indent``.
        stub_lines = (
            "#ToDo: Write an algorithm that fits the description of the function. ",
            "# Use \"await\" before changing values and cast to Ua Varianttype",
            "# Use get and set_value to change the value at a given nodeid e.g: await testvar.set_value(ua.Variant(position))",
            "pass",
        )
        file.writelines(bodyindent + line + "\n" for line in stub_lines)
    
    def getVariantType(datatype):
        """Return the ``ua.VariantType`` expression (as source text) for ``datatype``.

        Recognised names are ``float``/``double``, ``Boolean``, ``time``,
        ``string`` and ``enum``; every other value maps to the 64-bit integer
        variant type.
        """
        known_types = (
            (("float", "double"), "ua.VariantType.Double"),
            (("Boolean",), "ua.VariantType.Boolean"),
            (("time",), "ua.VariantType.DateTime"),
            (("string",), "ua.VariantType.String"),
            (("enum",), "ua.VariantType.String"),
        )
        for names, variant in known_types:
            if datatype in names:
                return variant
        # Fallback for anything not listed above.
        return "ua.VariantType.Int64"
    
    
    # Load the generator settings once at import time; tomli requires the
    # file to be opened in binary mode.
    with open("config.toml", "rb") as config_file:
        toml_dict = tomli.load(config_file)

    # Values interpolated into the generated server by writebeginning().
    server_endpoint = toml_dict["server"]
    namespace = toml_dict["namespace"]
    
    def writebeginning():
        """Start a fresh ``opcua-server.py`` with its header and the head of ``main()``.

        Writes the import lines, the logging setup and the opening part of the
        generated ``async def main()`` (server creation, endpoint, namespace
        registration and the two custom data types). The module-level
        ``server_endpoint`` and ``namespace`` values are interpolated into the
        generated code.
        """
        # Mode "w" discards any file left over from an earlier run, and the
        # context manager closes the handle so the header is on disk before
        # the other writer functions append to the same file.
        with open("opcua-server.py", "w") as server:
            server.write("""import logging 
    import asyncio
    
    from asyncua import ua, Server
    from asyncua.common.methods import uamethod
    from asyncua.common.structures104 import new_struct, new_enum, new_struct_field
    from asyncua.common import node
    
    logging.basicConfig(level=logging.INFO)
    _logger = logging.getLogger('asyncua')
    """)

            server.write("\nasync def main():")
            server.write(f"""
        # setup our server
        server = Server()
        await server.init()
        server.set_endpoint('{server_endpoint}')
    
        # setup our own namespace, not really necessary but should as spec
        uri = '{namespace}'
        idx = await server.register_namespace(uri)
        #id of rangetype
        rangeid = await server.nodes.base_data_type.get_child(["0:Structure", "0:Range"])
        
        
        
        #creating dataypes for parameters and measurements
        measurementtype = await server.nodes.base_data_type.add_data_type(idx, "Measurement")
        parametertype = await server.nodes.base_data_type.add_data_type(idx, "Parameter")
    """)
    def openobject(objname, description):
        """Append the creation of a new object type to ``opcua-server.py``.

        Args:
            objname: Display name of the object type.
            description: Accepted for interface compatibility; not used here.

        Returns:
            The variable name that holds the object type in the generated
            code: ``objname`` lower-cased, spaces removed, suffixed ``type``.
        """
        writeindent = "    "
        objtypename = objname.lower().replace(" ", "") + "type"
        # Close the handle deterministically so the text is flushed before the
        # next writer function appends to the same file.
        with open("opcua-server.py", "a") as server:
            server.write(writeindent + "#creating the objecttype\n")
            server.write(writeindent + objtypename + "= await server.nodes.base_object_type.add_object_type(idx, \"" + objname + "\" )\n")
        return objtypename
    
    
    def writebasecomponent(headnode, objtype, name):
        """Append an object of an existing object type to ``opcua-server.py``.

        Args:
            headnode: Generated-code variable of the node that receives the object.
            objtype: Generated-code variable of the object type to instantiate.
            name: Display name of the new object.

        The generated variable is named ``<objtype>component`` and gets its
        modelling rule set to ``True``.
        """
        writeindent = "    "
        objname = objtype + "component"
        # Close the handle deterministically so the text is flushed before the
        # next writer function appends to the same file.
        with open("opcua-server.py", "a") as server:
            server.write(writeindent + objname + " = await " + headnode + ".add_object(idx, \"" + name + "\", " + objtype + ".nodeid)\n")
            server.write(writeindent + "await " + objname + ".set_modelling_rule(True)\n")
    
    
    def writecomponents(headnode, componentslist, typesdict):
        """Append one plain object per component to ``opcua-server.py``.

        Args:
            headnode: Generated-code variable of the node that receives the objects.
            componentslist: Iterable of mappings that each provide a ``"name"`` key.
            typesdict: Accepted for interface compatibility; not used here.

        Each generated variable is named after the lower-cased component name
        with spaces removed and the suffix ``component``.
        """
        writeindent = "    "
        # Close the handle deterministically so the text is flushed before the
        # next writer function appends to the same file.
        with open("opcua-server.py", "a") as server:
            for component in componentslist:
                # Spaces are stripped (as the other writer functions do) because
                # the result is used as a Python identifier in the generated code.
                objname = component["name"].lower().replace(" ", "") + "component"
                server.write(writeindent + objname + " = await " + headnode + ".add_object(idx, \"" + component["name"] + "\")\n")
                server.write(writeindent + "await " + objname + ".set_modelling_rule(True)\n")
    
    
    def writemeasurement(measurementobj, headnode):
        """Append one OPC UA variable per measurement to ``opcua-server.py``.

        Args:
            measurementobj: Iterable of objects exposing the attributes ``name``,
                ``datatype``, ``value``, ``range``, ``unit`` and ``description``.
            headnode: Generated-code variable of the node that receives the variables.

        Returns:
            dict mapping each measurement name to the variable name used for it
            in the generated code.

        For every measurement a variable of data type ``measurementtype`` is
        generated, plus a ``range`` property (when a range is given), a
        ``unit`` property (unless the unit is ``"UNITLESS"``) and a
        ``Description`` property.
        """
        writeindent = "    "
        datatype = "measurementtype.nodeid"
        measurementdict = {}
        # Close the handle deterministically so the text is flushed before the
        # next writer function appends to the same file.
        with open("opcua-server.py", "a") as server:
            for i in measurementobj:
                varianttype = getVariantType(i.datatype)
                varname = i.name.lower().replace(" ", "") + "var"
                measurementdict[i.name] = varname
                server.write(writeindent + varname + " = await " + headnode + ".add_variable(idx, \"" + str(i.name) + "\", " + str(i.value) + ", " + varianttype + ", " + datatype + ")\n")
                server.write(writeindent + "await " + varname + ".set_modelling_rule(True)\n")

                if i.range is not None and i.range != [None, None]:
                    rangename = "rangeprop"
                    server.write(writeindent + rangename + " = await " + varname + ".add_property(idx, \"range\", ua.Range(" + str(i.range[0]) + ", " + str(i.range[1]) + "))\n")
                    server.write(writeindent + "await " + rangename + ".set_modelling_rule(True)\n")

                if i.unit != "UNITLESS":
                    unitname = "unitprop"
                    server.write(writeindent + unitname + " = await " + varname + ".add_property(idx, \"unit\"," + "\"" + i.unit + "\")\n")
                    server.write(writeindent + "await " + unitname + ".set_modelling_rule(True)\n")

                server.write(writeindent + varname + "description = await " + varname + ".add_property(idx, \"Description\", " + "\"" + i.description + "\")" + "\n")
                server.write(writeindent + "await " + varname + "description.set_modelling_rule(True)\n\n")

            # Heading comment for the section that follows in the generated file.
            server.write("\n" + writeindent + "#creating all parameters with their properties\n")
        return measurementdict
    
    
    def writeparameter(parameterobj, headnode):
        """Append one OPC UA variable per parameter to ``opcua-server.py``.

        Args:
            parameterobj: Iterable of objects exposing the attributes ``name``,
                ``datatype``, ``value``, ``range``, ``unit`` and ``description``.
            headnode: Generated-code variable of the node that receives the variables.

        For every parameter a variable of data type ``parametertype`` is
        generated, plus a ``range`` property (when a range is given), a
        ``unit`` property (unless the unit is ``"UNITLESS"``) and a
        ``Description`` property. A name-to-variable mapping is built while
        writing but is not returned.
        """
        writeindent = "    "
        datatype = "parametertype.nodeid"
        parameterdict = {}
        # Close the handle deterministically so the text is flushed before the
        # next writer function appends to the same file.
        with open("opcua-server.py", "a") as server:
            for i in parameterobj:
                varianttype = getVariantType(i.datatype)
                varname = i.name.lower().replace(" ", "") + "var"
                parameterdict[i.name] = varname
                server.write(writeindent + varname + " = await " + headnode + ".add_variable(idx, \"" + str(i.name) + "\", " + str(i.value) + ", " + varianttype + ", " + datatype + ")\n")
                server.write(writeindent + "await " + varname + ".set_modelling_rule(True)\n")

                if i.range is not None and i.range != [None, None]:
                    rangename = "rangeprop"
                    server.write(writeindent + rangename + " = await " + varname + ".add_property(idx, \"range\", ua.Range(" + str(i.range[0]) + ", " + str(i.range[1]) + "))\n")
                    server.write(writeindent + "await " + rangename + ".set_modelling_rule(True)\n")

                if i.unit != "UNITLESS":
                    unitname = "unitprop"
                    server.write(writeindent + unitname + " = await " + varname + ".add_property(idx, \"unit\"," + "\"" + i.unit + "\")\n")
                    server.write(writeindent + "await " + unitname + ".set_modelling_rule(True)\n")

                server.write(writeindent + varname + "description = await " + varname + ".add_property(idx, \"Description\", " + "\"" + i.description + "\")" + "\n")
                server.write(writeindent + "await " + varname + "description.set_modelling_rule(True)\n\n")
    
    
    def writefunction(functionobj, headnode):
        """Append a callback stub and its ``add_method`` registration per function.

        Args:
            functionobj: Iterable of objects exposing the attributes ``name``,
                ``inargsnames``, ``inargsdatatypes`` and ``outargsdatatypes``
                (the two datatype attributes may be ``None``).
            headnode: Generated-code variable of the node that receives the methods.

        For every function the stub is written via ``defFunction`` and then
        registered with ``add_method`` using the input and output variant
        types; the method's modelling rule is set to ``True``.
        """
        writeindent = "    "
        # Close the handle deterministically so the text is flushed before the
        # next writer function appends to the same file.
        with open("opcua-server.py", "a") as server:
            for i in functionobj:
                functionname = i.name.lower().replace(" ", "")
                defFunction(functionname, server, 1, i.inargsnames)
                server.write("\n")

                inargslist = []
                if i.inargsdatatypes is not None:
                    inargslist = [getVariantType(j) for j in i.inargsdatatypes]
                # Output types belong in their own list (they used to be read
                # through a misspelled attribute and appended to the inputs).
                outargslist = []
                if i.outargsdatatypes is not None:
                    outargslist = [getVariantType(j) for j in i.outargsdatatypes]

                # Emit the types as bare expressions, e.g. [ua.VariantType.Double];
                # str(list) would emit quoted strings instead of the enum members.
                inargscode = "[" + ", ".join(inargslist) + "]"
                outargscode = "[" + ", ".join(outargslist) + "]"

                methodvarname = functionname + "var"
                server.write(writeindent + methodvarname + " = await " + headnode + ".add_method(idx, \"" + i.name + "\", " + functionname + ", " + inargscode + ", " + outargscode + ")\n\n")
                server.write(writeindent + "await " + methodvarname + ".set_modelling_rule(True)\n\n")
    
    
    def writeobjects(objtype, objname, headnode, count):
        """Append the instantiation of an object below ``headnode``.

        Args:
            objtype: Generated-code variable of the object type to instantiate.
            objname: Display name of the new object.
            headnode: Generated-code variable of the node that receives the object.
            count: Value appended to the variable name to keep it distinct.

        Returns:
            The variable name used for the object in the generated code:
            ``objname`` lower-cased, spaces removed, followed by ``var`` and ``count``.
        """
        writeindent = "    "
        objtypename = objname.lower().replace(" ", "") + "var" + str(count)
        # Close the handle deterministically so the text is flushed before the
        # next writer function appends to the same file.
        with open("opcua-server.py", "a") as server:
            server.write(writeindent + objtypename + "= await " + headnode + ".add_object(idx, \"" + objname + "\", " + objtype + ".nodeid)\n")
        return objtypename
    
    def writeinterface(objtype, objname, count):
        """Append the instantiation of an object directly below the Objects node.

        Args:
            objtype: Generated-code variable of the object type to instantiate.
            objname: Display name of the new object.
            count: Value appended to the variable name to keep it distinct.

        Returns:
            The variable name used for the object in the generated code:
            ``objname`` lower-cased, spaces removed, followed by ``var`` and ``count``.
        """
        writeindent = "    "
        objtypename = objname.lower().replace(" ", "") + "var" + str(count)
        # Close the handle deterministically so the text is flushed before the
        # next writer function appends to the same file.
        with open("opcua-server.py", "a") as server:
            server.write(writeindent + objtypename + "= await server.nodes.objects.add_object(idx, \"" + objname + "\", " + objtype + ".nodeid)\n")
        return objtypename
    
    
    def writeevents(events, component, eventlist):
        """Append a custom event type and generator per event to ``opcua-server.py``.

        Args:
            events: Iterable of mappings with the keys ``"element"`` and ``"severity"``.
            component: Generated-code variable of the node that emits the events.
            eventlist: dict that is updated in place with
                ``event variable name -> generator variable name``.

        Returns:
            The updated ``eventlist``.

        The severity names ``debug``, ``info``, ``warning``, ``error`` and
        ``critical`` are written as 200, 400, 600, 800 and 1000; any other
        value is written as 0.
        """
        writeindent = "    "
        severity_levels = {
            "debug": 200,
            "info": 400,
            "warning": 600,
            "error": 800,
            "critical": 1000,
        }
        # Close the handle deterministically so the text is flushed before the
        # next writer function appends to the same file.
        with open("opcua-server.py", "a") as server:
            for event in events:
                eventname = event["element"].lower().replace(" ", "") + component + str(event["severity"])
                eventgenname = eventname + "generator"
                eventlist[eventname] = eventgenname
                severity = severity_levels.get(event["severity"], 0)

                server.write(writeindent + eventname + " = await server.create_custom_event_type(idx, \"" + str(event["element"]) + " " + str(event["severity"]) + "\", ua.ObjectIds.SystemEventType)\n")
                server.write(writeindent + eventgenname + " = await server.get_event_generator(" + eventname + ", " + component + ")\n")
                server.write(writeindent + eventgenname + ".event.Severity = " + str(severity) + "\n")
        return eventlist
    
    
    def writestreams(streams, component, streamslist, variablelist, jsonmeasurements, jsonparameters):
        """Append a custom event type and generator per stream to ``opcua-server.py``.

        Args:
            streams: Iterable of mappings with the keys ``"measurement"`` and
                ``"streamType"``.
            component: Generated-code variable of the node that emits the streams.
            streamslist: dict that is updated in place with
                ``stream variable name -> generator variable name``.
            variablelist: Iterable of mappings with ``"name"`` and ``"value"``;
                ``"value"`` is matched against the ``"uuid"`` entries below.
            jsonmeasurements: Iterable of mappings with ``"uuid"`` and ``"value"``.
            jsonparameters: Iterable of mappings with ``"uuid"`` and ``"value"``.

        Returns:
            Tuple ``(streamslist, updatestreamdict)``; the second dict maps the
            variable name of every ``"update"`` stream to the generated variable
            that stores its last seen value.
        """
        writeindent = "    "
        updatestreamdict = {}
        # Close the handle deterministically so the text is flushed before the
        # next writer function appends to the same file.
        with open("opcua-server.py", "a") as server:
            for stream in streams:
                streamname = stream["measurement"].lower().replace(" ", "") + component
                streamgenname = streamname + "generator"
                streamslist[streamname] = streamgenname
                server.write(writeindent + streamname + " = await server.create_custom_event_type(idx, \"" + str(stream["measurement"]) + "\", ua.ObjectIds.SystemEventType)\n")
                server.write(writeindent + streamgenname + " = await server.get_event_generator(" + streamname + ", " + component + ")\n")

                if stream["streamType"] != "update":
                    continue

                # Initial value of the streamed variable; the last matching
                # entry wins and parameters take precedence over measurements.
                value = 0
                for variable in variablelist:
                    if variable["name"] != stream["measurement"]:
                        continue
                    for measurement in jsonmeasurements:
                        if measurement["uuid"] == variable["value"]:
                            value = measurement["value"]
                    for parameter in jsonparameters:
                        if parameter["uuid"] == variable["value"]:
                            value = parameter["value"]

                savevaluename = "save" + streamname
                # Strings must be quoted to be valid literals in the generated code.
                writevalue = "\"" + value + "\"" if isinstance(value, str) else str(value)
                server.write(writeindent + savevaluename + " = " + writevalue + "\n")
                updatestreamdict[streamname] = savevaluename

        return streamslist, updatestreamdict
    
    
    def writendpt1():
        """Append the start of the server run loop to ``opcua-server.py``.

        Writes the ``async with server:`` block header, a start-up message, the
        ``count`` initialisation and the ``while True:`` line whose body is
        produced by later writer functions.
        """
        # Close the handle deterministically so the text is flushed before the
        # next writer function appends to the same file.
        with open("opcua-server.py", "a") as server:
            server.write("""
        async with server:
            print("Server is running!")
            count = 0
            
            while True:\n""")
    
    
    def triggerevents(generatordict, eventslist, component, variablelist, jsonmeasurements, jsonparameters):
        """Append the per-cycle event checks to the run loop in ``opcua-server.py``.

        Args:
            generatordict: dict mapping event variable names (lower-cased element
                without spaces + component + severity) to generator variable names.
            eventslist: Iterable of mappings with the keys ``"element"``,
                ``"severity"``, ``"target"``, ``"trigger"`` and ``"message"``.
            component: Generated-code variable of the node whose children are read.
            variablelist: Iterable of mappings with ``"name"`` and ``"value"``;
                ``"value"`` is matched against the ``"uuid"`` entries below.
            jsonmeasurements: Iterable of mappings with ``"uuid"`` and ``"name"``.
            jsonparameters: Iterable of mappings with ``"uuid"`` and ``"name"``.

        For every event an ``if`` statement is generated that compares the
        current node value with the target and triggers the event generator.
        The node lookup (``get_child``) is generated only once per element.
        """
        loopindent = "    " * 3
        bodyindent = loopindent + "    "
        variablenamelist = {}
        # Close the handle deterministically so the text is flushed before the
        # next writer function appends to the same file.
        with open("opcua-server.py", "a") as server:
            for event in eventslist:
                element = event["element"]
                # Spaces are stripped because the result is used as a Python
                # identifier in the generated code.
                valuename = element.replace(" ", "") + component

                if element not in variablenamelist:
                    # Resolve the browse name of the node behind this element;
                    # the last match wins, parameters after measurements.
                    eventsearchname = ""
                    for variable in variablelist:
                        if variable["name"] != element:
                            continue
                        for measurement in jsonmeasurements:
                            if variable["value"] == measurement["uuid"]:
                                eventsearchname = measurement["name"]
                        for parameter in jsonparameters:
                            if variable["value"] == parameter["uuid"]:
                                eventsearchname = parameter["name"]
                    variablenamelist[str(element)] = eventsearchname
                    server.write(loopindent + valuename + "= await " + component + ".get_child([\"2:" + eventsearchname + "\"])\n")

                dictname = element.lower().replace(" ", "") + component + event["severity"]
                generatorname = generatordict[dictname]

                # String targets must be quoted to be valid literals in the generated code.
                if isinstance(event["target"], str):
                    trigger = "\"" + event["target"] + "\""
                else:
                    trigger = str(event["target"])
                server.write(loopindent + "if (await " + valuename + ".get_value()) " + event["trigger"] + " " + trigger + ":\n")
                server.write(bodyindent + "await " + generatorname + ".trigger(message=\"" + event["message"] + "\")\n")
    
    
    def triggerstreams(generatordict, streamslist, component, variablelist, jsonmeasurements, jsonparameters, savevaldict):
        """Append the per-cycle stream checks to the run loop in ``opcua-server.py``.

        Args:
            generatordict: dict mapping stream variable names (lower-cased
                measurement without spaces + component) to generator variable names.
            streamslist: Iterable of mappings with the keys ``"measurement"`` and
                ``"streamType"``; ``"fixed"`` streams also need ``"intervalValue"``.
            component: Generated-code variable of the node whose children are read.
            variablelist: Iterable of mappings with ``"name"`` and ``"value"``;
                ``"value"`` is matched against the ``"uuid"`` entries below.
            jsonmeasurements: Iterable of mappings with ``"uuid"`` and ``"name"``.
            jsonparameters: Iterable of mappings with ``"uuid"`` and ``"name"``.
            savevaldict: dict mapping stream variable names to the generated
                variables that store the last seen value (``"update"`` streams).

        ``"update"`` streams trigger when the value differs from the stored one;
        ``"fixed"`` streams trigger whenever ``count`` is a multiple of the
        interval value.
        """
        loopindent = "    " * 3
        bodyindent = loopindent + "    "
        # Close the handle deterministically so the text is flushed before the
        # next writer function appends to the same file.
        with open("opcua-server.py", "a") as server:
            for stream in streamslist:
                measurementname = stream["measurement"]
                compactname = measurementname.lower().replace(" ", "")
                valuename = "stream" + compactname
                streamkey = compactname + component

                # Resolve the browse name of the node behind this stream; the
                # last match wins, parameters after measurements.
                streamname = ""
                for variable in variablelist:
                    if variable["name"] != measurementname:
                        continue
                    for measurement in jsonmeasurements:
                        if measurement["uuid"] == variable["value"]:
                            streamname = measurement["name"]
                    for parameter in jsonparameters:
                        if parameter["uuid"] == variable["value"]:
                            streamname = parameter["name"]
                server.write(loopindent + valuename + " = await " + component + ".get_child([\"2:" + streamname + "\"])\n")
                generatorname = generatordict[streamkey]

                if stream["streamType"] == "update":
                    savename = savevaldict[streamkey]
                    server.write(loopindent + "if (await " + valuename + ".get_value()) != " + savename + ":\n")
                    server.write(bodyindent + "puffer = await " + valuename + ".get_value()\n")
                    server.write(bodyindent + "await " + generatorname + ".trigger(message= \"value\" + str(puffer))\n")
                    server.write(bodyindent + savename + " = puffer \n")
                elif stream["streamType"] == "fixed":
                    # Spaces are stripped because the result is used as a Python
                    # identifier in the generated code.
                    timename = measurementname.replace(" ", "") + "time"
                    server.write(loopindent + timename + " = count % " + str(stream["intervalValue"]) + "\n")
                    server.write(loopindent + "if " + timename + " == 0:\n")
                    server.write(bodyindent + "puffer = await " + valuename + ".get_value()\n")
                    server.write(bodyindent + "await " + generatorname + ".trigger(message=\"" + measurementname + " value: \" + str(puffer))\n")
    
    
    def writendpt2():
        """Append the end of the run loop and the script entry point.

        Writes the one-second sleep and the ``count`` increment that close the
        generated ``while True:`` body, followed by the
        ``if __name__ == '__main__':`` guard that runs ``main()``.
        """
        # Close the handle deterministically so the generated file is complete
        # on disk as soon as this function returns.
        with open("opcua-server.py", "a") as server:
            server.write("""
                await asyncio.sleep(1)
                count += 1
    
    if __name__ == '__main__':
        asyncio.run(main())""")