Select Git revision
-
Thomas Pätzold authoredThomas Pätzold authored
writefile.py 25.15 KiB
import os
import tomli
"""
This File takes the prepared Data structures and writes the OPC-UA Server based on them.
Methods: defFunction, getVariantType, writebeginning, writeenums, openobject, writeenumobj, writemeasurement, writeparameter,
writefunction, writeobjects, writeinterface, writeevents, writestreams, writendpt1, triggerevents, triggerstreams,
writendpt2
"""
def defFunction(functionname, file, indent, inargs, function):
#return values are not written in the function at the moment
"""
writes the body for each function, it leaves the body empty and gives a description for the method which should be implemented
arguments:
- functionname(name of the function changed to a lowercase String without blanks),
- file(file it writes in)
- indent(indent it writes with)
- inargs(arguments of the function
- function(the object)
"""
writeindent = ""
for i in range(indent):
writeindent = writeindent + " "
file.write(writeindent + "async def " + functionname + "(parent")
if inargs is not None:
file.write(", ")
for i in range (len(inargs)):
if i < (len(inargs) - 1):
file.write(inargs[i] + ", ")
else:
file.write(inargs[i])
file.write("):\n")
writeindent = writeindent + " "
file.write(writeindent + "#ToDo: Write a Function that fits the decription: \"" + str(function.description) + "\"\n")
file.write(writeindent + " # Use \"await\" before changing values and cast to Ua Varianttype\n")
file.write(writeindent + " # Use get and set_value to change the value at a given nodeid e.g: await testvar.set_value(ua.Variant(position))\n")
file.write(writeindent + "pass\n")
def getVariantType(datatype):
"""
returns the ua.VariantType according to the datatype of a variable.
arguments:
- the datatype
returns:
- ua.VarianType.<datatype> as String
(float and double get casted to ua.VariantType.Double and all forms of int get casted to ua.VariantType.Int64)
"""
varianttype = "ua.VariantType.Int64"
if datatype == "float" or datatype == "double":
varianttype = "ua.VariantType.Double"
elif datatype == "Boolean":
varianttype = "ua.VariantType.Boolean"
elif datatype == "time":
varianttype = "ua.VariantType.DateTime"
elif datatype == "string":
varianttype = "ua.VariantType.String"
elif datatype == "enum":
varianttype = "ua.VariantType.String"
return varianttype
with open("config.toml", "rb") as t:
"""
opens toml config file to access its data
"""
toml_dict = tomli.load(t)
#sets opcua-server address and namespace according to config file
server_endpoint = toml_dict['server']
namespace = toml_dict['namespace']
def writebeginning():
"""
delets the existing opcua-server.py and generates a new one. This new file is started with all Imports and the needed set up
is written. The objecttypes measurement and parameter are defined here too
arguments: None
returns: None
"""
if os.path.exists("opcua-server.py"):
os.remove("opcua-server.py")
server = open("opcua-server.py", "a")
server.write("""import logging
import asyncio
from asyncua import ua, Server
from asyncua.common.methods import uamethod
from asyncua.common.structures104 import new_struct, new_enum, new_struct_field
from asyncua.common import node
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger('asyncua')
""")
server.write("\nasync def main():")
#äinten = "\tab"
server.write(f"""
# setup our server
server = Server()
await server.init()
server.set_endpoint('{server_endpoint}')
# setup our own namespace, not really necessary but should as spec
uri = '{namespace}'
idx = await server.register_namespace(uri)
#id of rangetype
rangeid = await server.nodes.base_data_type.get_child(["0:Structure", "0:Range"])
#creating dataypes for parameters and measurements
measurementtype = await server.nodes.base_data_type.add_data_type(idx, "Measurement")
parametertype = await server.nodes.base_data_type.add_data_type(idx, "Parameter")
""")
def writeenums(enumslist):
"""
Creates the needed enums in the beginning of the server. And creates three dicts needed for Information.
:param enumslist:
:return enumnodenames: Dict with format: {uuid of enum: opc ua object node of the enum}
:return returnvalues: Dict with format: {uuid of enum: valuelist with
added numbers to values showing their corresponding int Value}
:return enumvariantnames: Dict with format: {uuid of enum: official name of the enum}
Dict helps so that you don't need to search the fitting enum later on.
"""
server = open("opcua-server.py", "a")
writeindent = " "
returnvalues = {}
enumnodenames = {}
variantnames = {}
for enum in enumslist:
returnvallist = []
enumname = (enum["name"].lower()).replace(" ", "")
values = enum["values"]
for i in range(len(values)):
retval = str(i) + ":" + str(values[i])
returnvallist.append(retval)
server.write(writeindent + enumname + "= await new_enum(server, idx, \""+ str(enum["name"]) + "\", values=" + str(enum["values"]) + ")\n")
enumnodenames[enum["uuid"]] = enumname
returnvalues[enum["uuid"]] = returnvallist
variantnames[enum["uuid"]] = enum["name"]
return enumnodenames, returnvalues, variantnames
def openobject(objname, description):
#ToDO:description adden
"""
Creates the needed objecttype
:param objname: Name of object
:param description:Description of the object
:return objtypename: the name of the new node
"""
server = open("opcua-server.py", "a")
writeindent = " "
server.write(writeindent + "#creating the objecttype\n")
objtypename = (objname.lower()).replace(" ", "") + "type"
server.write(writeindent + objtypename + "= await server.nodes.base_object_type.add_object_type(idx, \"" + objname +"\" )\n")
return objtypename
def writeenumobj(variable, headnode, enumnodedict, enumvaldict, enumvariantdict):
"""
Creates the variables with the type enum.
:param variable: the variableobject that is created
:param headnode: the component to which the variable belongs
:param enumnodedict: Dict of the format {enum uuid: enum opc ua nodename}
:param enumvaldict: Dict of the format {enum uuid: changed values for the propertie names}
(same as returnvalues from writeenums)
:param enumvariantdict: Dict of the format {enum uuid: enum name}
:return: name of the created variablenode
"""
server = open("opcua-server.py", "a")
writeindent = " "
uuid = variable.uuid
varname = (variable.name.lower()).replace(" ","") + "var"
server.write(writeindent + varname + " = await " + headnode + ".add_variable(idx, \"" + variable.name + "\", " + variable.value + ", datatype=" + enumnodedict[variable.uuid] + ".nodeid)\n")
server.write(writeindent + "await " + varname + "." + "set_modelling_rule(True)\n")
i = 0
for value in enumvaldict[variable.uuid]:
valname = "value" + str(i)
server.write(writeindent + valname + " = await " + varname + ".add_property(idx, \"" + str(value) + "\", \"" + str(value) + "\")\n")
server.write(writeindent + "await " + valname + "." + "set_modelling_rule(True)\n")
i += 1
return varname
def writemeasurement(measurementobj, headnode, enumnodedict, enumvaldict, enumvariantdict):
#ToDo: Schauen ob man returns rausnehmen kann
"""
Creates the measurement variables with their properties
:param measurementobj: a list of measurementobjects
:param headnode: the component to which the varianles belong
:param enumnodedict: Dict for the names of the enum nodes from writeenums (used for writeenumobj)
:param enumvaldict: Dict for the enumvalues from writenums (used for writeenumobj)
:param enumvariantdict: Dict for the name of the enums (used for writeenumobj)
:return: a dict for the measurement node names
"""
server = open("opcua-server.py", "a")
writeindent = " "
measurementdict = {}
for i in measurementobj:
if i.datatype == "enum":
varname = writeenumobj(i, headnode, enumnodedict, enumvaldict, enumvariantdict)
measurementdict[i.name] = varname
else:
varianttype = getVariantType(i.datatype)
datatype = "measurementtype.nodeid"
varname = ((i.name).lower()).replace(" ", "") + "var"
measurementdict[i.name] = varname
server.write(writeindent + varname + " = " + "await " + headnode + ".add_variable(idx, \"" + str(
i.name) + "\", " + str(i.value) + ", " + varianttype + ", " + datatype + ")\n")
server.write(writeindent + "await " + varname + "." + "set_modelling_rule(True)\n")
if i.range is not None and i.range != [None, None]:
rangename = "rangeprop"
server.write(writeindent + rangename + " = await " + varname + ".add_property(idx, \"range\", ua.Range(" + str(i.range[0]) + ", " + str(i.range[1]) + "))\n")
server.write(writeindent + "await " + rangename + ".set_modelling_rule(True)\n")
if i.unit != "UNITLESS":
unitname = "unitprop"
server.write(
writeindent + unitname + " = await " + varname + ".add_property(idx, \"unit\"," + "\"" + i.unit + "\")\n")
server.write(writeindent + "await " + unitname + ".set_modelling_rule(True)\n")
server.write(
writeindent + varname + "description = await " + varname + ".add_property(idx, \"Description\", " + "\"" + i.description + "\")" + "\n")
server.write(writeindent + "await "+ varname + "description.set_modelling_rule(True)\n\n")
server.write("\n" + writeindent + "#creating all parameters with their properties\n")
return measurementdict
def writeparameter(parameterobj, headnode, enumnodedict, enumvaldict, enumvariantdict):
"""
Creates the parameter values. (Similar to writemeasurement)
:param parameterobj: A list of the objparameters
:param headnode: The component to which the variables belong
:param enumnodedict: Dict for the names of the enum nodes from writeenums (used for writeenumobj)
:param enumvaldict: Dict for the enumvalues from writenums (used for writeenumobj)
:param enumvariantdict: Dict for the name of the enums (used for writeenumobj)
:return: A dict with the node names of the parameters
"""
writeindent = " "
server = open("opcua-server.py", "a")
parameterdict = {}
for i in parameterobj:
if i.datatype == "enum":
varname = writeenumobj(i, headnode, enumnodedict, enumvaldict, enumvariantdict)
parameterdict[i.name] = varname
else:
varianttype = getVariantType(i.datatype)
datatype = "parametertype.nodeid"
varname = ((i.name).lower()).replace(" ", "") + "var"
parameterdict[i.name] = varname
server.write(writeindent + varname + " = " + "await " + headnode + ".add_variable(idx, \"" + str(
i.name) + "\", " + str(i.value) + ", " + varianttype + ", " + datatype + ")\n")
server.write(writeindent + "await " + varname + "." + "set_modelling_rule(True)\n")
if i.range is not None and i.range != [None, None]:
rangename = "rangeprop"
server.write(
writeindent + rangename + " = await " + varname + ".add_property(idx, \"range\", ua.Range(" + str(i.range[0]) + ", " + str(i.range[1]) + "))\n")
server.write(writeindent + "await " + rangename + ".set_modelling_rule(True)\n")
if i.unit != "UNITLESS":
unitname = "unitprop"
server.write(
writeindent + unitname + " = await " + varname + ".add_property(idx, \"unit\"," + "\"" + i.unit + "\")\n")
server.write(writeindent + "await " + unitname + ".set_modelling_rule(True)\n")
server.write(
writeindent + varname + "description = await " + varname + ".add_property(idx, \"Description\", " + "\"" + i.description + "\")" + "\n")
server.write(writeindent + "await " + varname + "description.set_modelling_rule(True)\n\n")
return parameterdict
def writefunction(functionobj, headnode):
"""
Takes the function objects writes the function body by calling deffunction and adds it to the objecttypes
:param functionobj: List of functionobjects
:param headnode: component the functions belong to
:return: None
"""
server = open("opcua-server.py", "a")
writeindent = " "
for i in functionobj:
functionname = (i.name.lower()).replace(" ", "")
defFunction(functionname, server, 1, i.inargsnames, i)
server.write("\n")
inargslist = []
if i.inargsdatatypes != None:
for j in i.inargsdatatypes:
inargslist.append(getVariantType(j))
outargslist = []
if i.outargsdatatypes != None:
for j in i.outargsdataypes:
inargslist.append(getVariantType(j))
methodvarname = functionname + "var"
server.write(writeindent + methodvarname + " = await " + headnode + ".add_method(idx, \"" + i.name +"\", " + functionname + ", " + str(inargslist) + ", "+ str(outargslist) + ")\n\n")
server.write(writeindent + "await " + methodvarname + ".set_modelling_rule(True)\n\n")
def writeobjects(objtype, objname, headnode, count):
#ToDo: Implement count or cut it out
"""
Generates the objects of the components.
:param objtype: The node of the node with the objecttype
:param objname: The name of the object
:param headnode: The node the component belongs to.
:param count: (Currently not really implemented) but for counting the names up
if an object with the same name is instantiated twice.
:return: The name of the newly created objectnode
"""
server = open("opcua-server.py", "a")
writeindent = " "
objtypename = (objname.lower()).replace(" ", "") + "var" + str(count)
server.write(
writeindent + objtypename + "= await " + headnode + ".add_object(idx, \"" + objname + "\", " + objtype + ".nodeid)\n")
return objtypename
def writeinterface(objtype, objname, count):
"""
Generates the interface, meaning the highest object layer. (Works similar to writeobjects)
:param objtype: The name of the node with the objecttype
:param objname: The name of the object
:param count: (Currently not really implemented) but for counting the names up
if an object with the same name is instantiated twice.
:return: The name of the newly created objectnode
"""
server = open("opcua-server.py", "a")
writeindent = " "
objtypename = (objname.lower()).replace(" ","") + "var" + str(count)
server.write(writeindent + objtypename + "= await server.nodes.objects.add_object(idx, \"" + objname + "\", " + objtype + ".nodeid)\n")
return objtypename
def writeevents(events, component, eventlist):
"""
Generates the events and event generator for each event and adds the fitting severity.
:param events: List of eventdicts
:param component: The component the events belong to
:param eventlist: Empty dict (filled with the names of the eventgenerator nodes)
:return: The eventlist now filled in the format {event name(generated in the code) : eventgenerator node name}
"""
server = open("opcua-server.py", "a")
writeindent = " "
for event in events:
eventname = (event["element"].lower()).replace(" ", "") + component + str(event["severity"])
eventgenname = eventname + "generator"
eventlist[eventname] = eventgenname
severity = 0
if event["severity"] == "debug":
severity = 200
elif event["severity"] == "info":
severity = 400
elif event["severity"] == "warning":
severity = 600
elif event["severity"] == "error":
severity = 800
elif event["severity"] == "critical":
severity = 1000
server.write(writeindent + eventname + " = await server.create_custom_event_type(idx, \"" + str(event["element"]) + " " + str(event["severity"]) + "\", ua.ObjectIds.SystemEventType)\n")
server.write(writeindent + str(eventlist[eventname]) + " = await server.get_event_generator(" + eventname + ", " + component + ")\n")
server.write(writeindent + str(eventlist[eventname]) + ".event.Severity = " + str(severity) + "\n")
return eventlist
def writestreams(streams, component, streamslist, variablelist, jsonmeasurements, jsonparameters):
"""
Generates the streams as events withe event generator, uses standard severity 1.
:param streams: List of streamdicts
:param component: Component the streams belong to.
:param streamslist: Empty dict (filled with streamgenerator node names later on)
:param variablelist: List of the variables belonging to the component in Format {name : value}
:param jsonmeasurements: List of all measurement elements
:param jsonparameters: List of all parameter elements
:return streamlist: The streamslist now filled in the format
{streamname(generated in the code) : streamgenerator node name}
:return updatestreamsdict: A dict in which the compare variable names for
update streams are saved in format {streamname: variablename}
"""
server = open("opcua-server.py", "a")
writeindent = " "
updatestreamdict = {}
for stream in streams:
streamname = (stream["measurement"].lower()).replace(" ", "") + component
streamgenname = streamname + "generator"
streamslist[streamname] = streamgenname
server.write(writeindent + streamname + " = await server.create_custom_event_type(idx, \"" + str(stream["measurement"]) + "\", ua.ObjectIds.SystemEventType)\n")
server.write(writeindent + str(streamslist[streamname]) + " = await server.get_event_generator(" + streamname + ", " + component + ")\n")
if stream["streamType"] == "update":
value = 0
for variable in variablelist:
if variable["name"] == stream["measurement"]:
for measurement in jsonmeasurements:
if measurement["uuid"] == variable["value"]:
value = measurement["value"]
for parameter in jsonparameters:
if parameter["uuid"] == variable["value"]:
value = parameter["value"]
savevaluename = "save" + streamname
writevalue = ""
if type(value) == str:
writevalue = "\"" + value + "\""
else:
writevalue = str(value)
server.write(writeindent + savevaluename + " = " + writevalue + "\n")
updatestreamdict[streamname] = savevaluename
return streamslist, updatestreamdict
def writendpt1():
"""
Writes the beginning of the while loop which starts the server.
:return: None
"""
server = open("opcua-server.py", "a")
server.write("""
async with server:
print("Server is running!")
count = 0
while True:\n""")
def triggerevents(generatordict, eventslist, component, variablelist, jsonmeasurements, jsonparameters):
"""
Writes the if cases in which the events get triggered.
:param generatordict: Dict with the format
{eventname (generated according to format from writeevents) : eventgeneratorname}
:param eventslist: List of eventdicts
:param component: Component the events belong to
:param variablelist: List of all variabledicts from component in the format {name : value}
:param jsonmeasurements: List of all measurements
:param jsonparameters: List of all parameters
:return: None
"""
server = open("opcua-server.py", "a")
indent = 3
writeindent = ""
variablenamelist = {}
for event in eventslist:
writeindent = ""
for i in range(indent):
writeindent += " "
valuename = event["element"] + component
eventsearchname = ""
if event["element"] in list(variablenamelist.keys()):
eventsearchname = variablenamelist[str(event["element"])]
else:
for variable in variablelist:
if variable["name"] == event["element"]:
for measurement in jsonmeasurements:
if variable["value"] == measurement["uuid"]:
eventsearchname = measurement["name"]
for parameter in jsonparameters:
if variable["value"] == parameter["uuid"]:
eventsearchname = parameter["name"]
variablenamelist[str(event["element"])] = eventsearchname
server.write(writeindent + valuename + "= await " + component + ".get_child([\"2:" + eventsearchname + "\"])\n")
dictname = (event["element"].lower()).replace(" ", "") + component + event["severity"]
generatorname = generatordict[dictname]
trigger = str(event["target"])
if type(event["target"]) == str:
trigger = "\"" + str(event["target"] + "\"")
server.write(writeindent + "if (await " + valuename + ".get_value()) " + event["trigger"] + " " + trigger + ":\n")
writeindent += " "
server.write(writeindent + "await " + generatorname + ".trigger(message=\"" + event["message"] + "\")\n")
writeindent = ""
for i in range(indent):
writeindent += " "
def triggerstreams(generatordict, streamslist, component, variablelist, jsonmeasurements, jsonparameters, savevaldict):
"""
Generates the Streams according to their stream type. (Dynamic streams are still missing)
:param generatordict: Dict with the format
{streamname (generated according to format from writestreams) : streamgeneratorname}
:param streamslist: List of streamdicts
:param component: Component the streams belong to
:param variablelist: List of all variabledicts from the component in the format {name: value}
:param jsonmeasurements: List of all measurements
:param jsonparameters: List of all parameters
:param savevaldict: Dict with the name of the compare values for update streams.
Format {streamname : comparevaluename}
:return: None
"""
server = open("opcua-server.py", "a")
indent = 3
for stream in streamslist:
writeindent = ""
streamname = ""
valuename = "stream" + ((stream["measurement"]).lower()).replace(" ", "")
for i in range(indent):
writeindent += " "
for variable in variablelist:
if variable["name"] == stream["measurement"]:
for measurement in jsonmeasurements:
if measurement["uuid"] == variable["value"]:
streamname = measurement["name"]
for parameter in jsonparameters:
if parameter["uuid"] == variable["value"]:
streamname = parameter["name"]
server.write(writeindent + valuename + " = await " + component + ".get_child([\"2:" + streamname + "\"])\n")
generatorname = generatordict[(stream["measurement"].lower()).replace(" ", "") + component]
if stream["streamType"] == "update":
server.write(writeindent + "if (await " + valuename + ".get_value()) != " + savevaldict[(stream["measurement"].lower()).replace(" ", "") + component] + ":\n")
writeindent += " "
server.write(writeindent + "puffer = await " + valuename + ".get_value()\n")
server.write(writeindent + "await " + generatorname + ".trigger(message= \"value\" + str(puffer))\n")
server.write(writeindent + savevaldict[(stream["measurement"].lower()).replace(" ", "") + component] + " = puffer \n")
writeindent = ""
for i in range(indent):
writeindent += " "
if stream["streamType"] == "fixed":
server.write(writeindent + stream["measurement"] + "time = count % " + str(stream["intervalValue"]) + "\n")
server.write(writeindent + "if "+ stream["measurement"] + "time == 0:\n")
writeindent += " "
server.write(writeindent + "puffer = await " + valuename + ".get_value()\n")
server.write(writeindent + "await " + generatorname + ".trigger(message=\"" + stream["measurement"] + " value: \" + str(puffer))\n")
writeindent = ""
for i in range(indent):
writeindent += " "
def writendpt2():
"""
Writes the end of the server with the await asyncio.sleep(1) and the time counter.
:return: None
"""
server = open("opcua-server.py", "a")
server.write("""
await asyncio.sleep(1)
count += 1
if __name__ == '__main__':
asyncio.run(main())""")