Something went wrong on our end
Select Git revision
To learn more about this project, read the wiki.
writefile.py 16.13 KiB
import jsonparse
import os
import tomli
import json
def defFunction(functionname, file, indent, inargs):
    """Write the stub of an ``async def`` method handler to *file*.

    The emitted signature is ``async def <functionname>(parent[, ...]):``
    preceded by ``indent`` space characters. It is followed by a fixed ToDo
    comment block and a ``pass`` statement. Only the first comment line gets
    the computed indentation (one space deeper than the signature); the
    remaining lines are written exactly as they appear in the literal below.

    Args:
        functionname: Identifier used for the generated function.
        file: Writable text stream; only ``write`` is called on it.
        indent: Number of spaces placed in front of the signature.
        inargs: Sequence of extra parameter names, or ``None`` for none.
    """
    header_pad = " " * indent
    signature = header_pad + "async def " + functionname + "(parent"
    if inargs is not None:
        # An empty sequence still produces "(parent, )", which Python accepts.
        signature += ", " + ", ".join(inargs)
    file.write(signature + "):\n")

    body_pad = header_pad + " "
    file.write(body_pad + """#ToDo: Write an algorithm that fits the description of the function.
# Use "await" before changing values and cast to Ua Varianttype
# Use get and set_value to change the value at a given nodeid e.g: await testvar.set_value(ua.Variant(position))
pass
""")
def getVariantType(datatype):
    """Return the source text of the ``ua.VariantType`` member for *datatype*.

    Recognised names are ``"float"``/``"double"`` (Double), ``"Boolean"``
    (Boolean), ``"time"`` (DateTime) and ``"string"``/``"enum"`` (String).
    The comparison is case-sensitive; anything else falls back to Int64.

    Args:
        datatype: Datatype name taken from the model description.

    Returns:
        A string such as ``"ua.VariantType.Double"`` meant to be pasted into
        generated code.
    """
    known_types = (
        (("float", "double"), "ua.VariantType.Double"),
        (("Boolean",), "ua.VariantType.Boolean"),
        (("time",), "ua.VariantType.DateTime"),
        (("string", "enum"), "ua.VariantType.String"),
    )
    for names, variant in known_types:
        if datatype in names:
            return variant
    return "ua.VariantType.Int64"
# Load the generator settings from config.toml at import time.
# The file is opened in binary mode because it is handed to tomli.load().
with open("config.toml", "rb") as t:
    toml_dict = tomli.load(t)
# Value interpolated into the generated server.set_endpoint(...) call.
server_endpoint = toml_dict['server']
# Value interpolated into the generated namespace URI (uri = '...').
namespace = toml_dict['namespace']
def writebeginning():
    """Start a fresh ``opcua-server.py`` and write its fixed preamble.

    The preamble consists of the import block of the generated server, the
    ``async def main():`` header, and the server/namespace setup. The
    module-level ``server_endpoint`` and ``namespace`` values are
    interpolated into that setup.

    Any existing ``opcua-server.py`` in the current working directory is
    replaced.
    """
    # Mode "w" truncates a file left over from a previous run, and the
    # ``with`` block guarantees the handle is flushed and closed before the
    # other writer functions reopen the file in append mode.
    with open("opcua-server.py", "w") as server:
        server.write("""import logging
import asyncio
from asyncua import ua, Server
from asyncua.common.methods import uamethod
from asyncua.common.structures104 import new_struct, new_enum, new_struct_field
from asyncua.common import node
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger('asyncua')
""")
        server.write("\nasync def main():")
        server.write(f"""
# setup our server
server = Server()
await server.init()
server.set_endpoint('{server_endpoint}')
# setup our own namespace, not really necessary but should as spec
uri = '{namespace}'
idx = await server.register_namespace(uri)
#id of rangetype
rangeid = await server.nodes.base_data_type.get_child(["0:Structure", "0:Range"])
#creating dataypes for parameters and measurements
measurementtype = await server.nodes.base_data_type.add_data_type(idx, "Measurement")
parametertype = await server.nodes.base_data_type.add_data_type(idx, "Parameter")
""")
def openobject(objname, description):
    """Append the creation of a custom object type to ``opcua-server.py``.

    Args:
        objname: Browse name of the object type; lower-cased and stripped of
            spaces it also forms the generated variable name.
        description: Not used by this function; kept so existing callers
            keep working.

    Returns:
        The name of the variable that holds the new object type in the
        generated code (``<objname>type``).
    """
    writeindent = " "
    objtypename = objname.lower().replace(" ", "") + "type"
    # ``with`` closes the handle deterministically so the text reaches the
    # file before the next writer function appends to it.
    with open("opcua-server.py", "a") as server:
        server.write(writeindent + "#creating the objecttype\n")
        server.write(
            f'{writeindent}{objtypename}= await server.nodes.base_object_type'
            f'.add_object_type(idx, "{objname}" )\n'
        )
    return objtypename
def writebasecomponent(headnode, objtype,name):
    """Append a typed child object of *headnode* to ``opcua-server.py``.

    The generated code adds an object named *name*, typed by
    ``<objtype>.nodeid``, below *headnode* and then calls
    ``set_modelling_rule(True)`` on it.

    Args:
        headnode: Name of the generated variable holding the parent node.
        objtype: Name of the generated variable holding the object type.
        name: Browse name of the new component.
    """
    writeindent = " "
    objname = objtype + "component"
    # ``with`` closes the handle deterministically instead of relying on
    # garbage collection.
    with open("opcua-server.py", "a") as server:
        server.write(
            f'{writeindent}{objname} = await {headnode}.add_object'
            f'(idx, "{name}", {objtype}.nodeid)\n'
        )
        server.write(f"{writeindent}await {objname}.set_modelling_rule(True)\n")
def writecomponents(headnode, componentslist, typesdict):
    """Append one untyped child object per component to ``opcua-server.py``.

    Args:
        headnode: Name of the generated variable holding the parent node.
        componentslist: Iterable of mappings; only the ``"name"`` key is read.
        typesdict: Not used by this function; kept so existing callers keep
            working.
    """
    writeindent = " "
    # ``with`` closes the handle deterministically instead of relying on
    # garbage collection.
    with open("opcua-server.py", "a") as server:
        for component in componentslist:
            display_name = component["name"]
            # Spaces are stripped, as in the other name builders of this
            # file, so that a name like "Main Pump" still yields a valid
            # Python identifier in the generated code.
            objname = display_name.lower().replace(" ", "") + "component"
            server.write(
                f'{writeindent}{objname} = await {headnode}.add_object'
                f'(idx, "{display_name}")\n'
            )
            server.write(
                f"{writeindent}await {objname}.set_modelling_rule(True)\n"
            )
def writemeasurement(measurementobj, headnode):
    """Append one variable per measurement to ``opcua-server.py``.

    For every measurement the generated code adds a variable below
    *headnode* (data type ``measurementtype.nodeid``), an optional ``range``
    property, an optional ``unit`` property and a ``Description`` property,
    each followed by ``set_modelling_rule(True)``. After the last
    measurement a comment announcing the parameter section is written.

    Args:
        measurementobj: Iterable of objects exposing ``name``, ``datatype``,
            ``value``, ``range``, ``unit`` and ``description`` attributes.
        headnode: Name of the generated variable holding the parent node.

    Returns:
        Dict mapping each measurement name to the name of its generated
        variable.
    """
    writeindent = " "
    datatype = "measurementtype.nodeid"
    measurementdict = {}
    # ``with`` closes the handle deterministically instead of relying on
    # garbage collection.
    with open("opcua-server.py", "a") as server:
        for item in measurementobj:
            varianttype = getVariantType(item.datatype)
            varname = item.name.lower().replace(" ", "") + "var"
            measurementdict[item.name] = varname
            server.write(
                f'{writeindent}{varname} = await {headnode}.add_variable'
                f'(idx, "{item.name!s}", {item.value!s}, {varianttype}, {datatype})\n'
            )
            server.write(f"{writeindent}await {varname}.set_modelling_rule(True)\n")

            # A missing range is represented as None or as [None, None].
            if item.range is not None and item.range != [None, None]:
                rangename = "rangeprop"
                server.write(
                    f'{writeindent}{rangename} = await {varname}.add_property'
                    f'(idx, "range", ua.Range({item.range[0]!s}, {item.range[1]!s}))\n'
                )
                server.write(
                    f"{writeindent}await {rangename}.set_modelling_rule(True)\n"
                )

            if item.unit != "UNITLESS":
                unitname = "unitprop"
                server.write(
                    f'{writeindent}{unitname} = await {varname}.add_property'
                    f'(idx, "unit","{item.unit}")\n'
                )
                server.write(
                    f"{writeindent}await {unitname}.set_modelling_rule(True)\n"
                )

            server.write(
                f'{writeindent}{varname}description = await {varname}.add_property'
                f'(idx, "Description", "{item.description}")\n'
            )
            server.write(
                f"{writeindent}await {varname}description.set_modelling_rule(True)\n\n"
            )
        server.write(
            "\n" + writeindent + "#creating all parameters with their properties\n"
        )
    return measurementdict
def writeparameter(parameterobj, headnode):
    """Append one variable per parameter to ``opcua-server.py``.

    For every parameter the generated code adds a variable below *headnode*
    (data type ``parametertype.nodeid``), an optional ``range`` property, an
    optional ``unit`` property and a ``Description`` property, each followed
    by ``set_modelling_rule(True)``.

    Args:
        parameterobj: Iterable of objects exposing ``name``, ``datatype``,
            ``value``, ``range``, ``unit`` and ``description`` attributes.
        headnode: Name of the generated variable holding the parent node.

    Returns:
        Dict mapping each parameter name to the name of its generated
        variable. Earlier versions built this dict but returned ``None``;
        returning it matches ``writemeasurement`` and is harmless for
        callers that ignore the result.
    """
    writeindent = " "
    datatype = "parametertype.nodeid"
    parameterdict = {}
    # ``with`` closes the handle deterministically instead of relying on
    # garbage collection.
    with open("opcua-server.py", "a") as server:
        for item in parameterobj:
            varianttype = getVariantType(item.datatype)
            varname = item.name.lower().replace(" ", "") + "var"
            parameterdict[item.name] = varname
            server.write(
                f'{writeindent}{varname} = await {headnode}.add_variable'
                f'(idx, "{item.name!s}", {item.value!s}, {varianttype}, {datatype})\n'
            )
            server.write(f"{writeindent}await {varname}.set_modelling_rule(True)\n")

            # A missing range is represented as None or as [None, None].
            if item.range is not None and item.range != [None, None]:
                rangename = "rangeprop"
                server.write(
                    f'{writeindent}{rangename} = await {varname}.add_property'
                    f'(idx, "range", ua.Range({item.range[0]!s}, {item.range[1]!s}))\n'
                )
                server.write(
                    f"{writeindent}await {rangename}.set_modelling_rule(True)\n"
                )

            if item.unit != "UNITLESS":
                unitname = "unitprop"
                server.write(
                    f'{writeindent}{unitname} = await {varname}.add_property'
                    f'(idx, "unit","{item.unit}")\n'
                )
                server.write(
                    f"{writeindent}await {unitname}.set_modelling_rule(True)\n"
                )

            server.write(
                f'{writeindent}{varname}description = await {varname}.add_property'
                f'(idx, "Description", "{item.description}")\n'
            )
            server.write(
                f"{writeindent}await {varname}description.set_modelling_rule(True)\n\n"
            )
    return parameterdict
def writefunction(functionobj, headnode):
    """Append a handler stub and an ``add_method`` call per function.

    For every entry a stub is written through ``defFunction``, followed by
    generated code that registers it below *headnode* with ``add_method``
    and calls ``set_modelling_rule(True)`` on the result.

    Args:
        functionobj: Iterable of objects exposing ``name``, ``inargsnames``,
            ``inargsdatatypes`` and ``outargsdatatypes`` attributes; the two
            datatype attributes may be ``None``.
        headnode: Name of the generated variable holding the parent node.
    """
    writeindent = " "
    # ``with`` closes the handle deterministically instead of relying on
    # garbage collection.
    with open("opcua-server.py", "a") as server:
        for func in functionobj:
            functionname = func.name.lower().replace(" ", "")
            defFunction(functionname, server, 1, func.inargsnames)
            server.write("\n")

            inargslist = []
            if func.inargsdatatypes is not None:
                inargslist = [getVariantType(dt) for dt in func.inargsdatatypes]

            # Output types go into their own list. The previous code read a
            # misspelled attribute (AttributeError whenever output types
            # were present) and appended to the input list.
            outargslist = []
            if func.outargsdatatypes is not None:
                outargslist = [getVariantType(dt) for dt in func.outargsdatatypes]

            methodvarname = functionname + "var"
            # NOTE(review): str() of a list of strings keeps the quotes, so
            # the generated call receives ['ua.VariantType.X'] (strings), not
            # the enum members. Left unchanged here; confirm what add_method
            # is meant to receive.
            server.write(
                f'{writeindent}{methodvarname} = await {headnode}.add_method'
                f'(idx, "{func.name}", {functionname}, {inargslist!s}, {outargslist!s})\n\n'
            )
            server.write(
                f"{writeindent}await {methodvarname}.set_modelling_rule(True)\n\n"
            )
def writeobjects(objtype, objname, headnode, count):
    """Append the instantiation of an object below *headnode*.

    Args:
        objtype: Name of the generated variable holding the object type.
        objname: Browse name of the new object.
        headnode: Name of the generated variable holding the parent node.
        count: Value appended to the generated variable name.

    Returns:
        The name of the generated variable holding the new object
        (``<objname>var<count>``, lower-cased, spaces removed).
    """
    writeindent = " "
    objtypename = objname.lower().replace(" ", "") + "var" + str(count)
    # ``with`` closes the handle deterministically instead of relying on
    # garbage collection.
    with open("opcua-server.py", "a") as server:
        server.write(
            f'{writeindent}{objtypename}= await {headnode}.add_object'
            f'(idx, "{objname}", {objtype}.nodeid)\n'
        )
    return objtypename
def writeinterface(objtype, objname, count):
    """Append the instantiation of an object below ``server.nodes.objects``.

    Args:
        objtype: Name of the generated variable holding the object type.
        objname: Browse name of the new object.
        count: Value appended to the generated variable name.

    Returns:
        The name of the generated variable holding the new object
        (``<objname>var<count>``, lower-cased, spaces removed).
    """
    writeindent = " "
    objtypename = objname.lower().replace(" ", "") + "var" + str(count)
    # ``with`` closes the handle deterministically instead of relying on
    # garbage collection.
    with open("opcua-server.py", "a") as server:
        server.write(
            f'{writeindent}{objtypename}= await server.nodes.objects.add_object'
            f'(idx, "{objname}", {objtype}.nodeid)\n'
        )
    return objtypename
def writeevents(events, component, eventlist):
    """Append a custom event type and generator for each event.

    For every event the generated code creates a custom event type, obtains
    an event generator bound to *component*, and sets the generator's
    ``event.Severity``.

    Args:
        events: Iterable of mappings with ``"element"`` and ``"severity"``.
        component: Name of the generated variable the events are emitted on.
        eventlist: Dict updated in place with ``event name -> generator
            variable name``.

    Returns:
        The same *eventlist* object, after the update.
    """
    # Severity written into the generated code; labels not listed give 0.
    severity_levels = {
        "debug": 200,
        "info": 400,
        "warning": 600,
        "error": 800,
        "critical": 1000,
    }
    writeindent = " "
    # ``with`` closes the handle deterministically instead of relying on
    # garbage collection.
    with open("opcua-server.py", "a") as server:
        for event in events:
            element = str(event["element"])
            severity_label = str(event["severity"])
            eventname = (
                event["element"].lower().replace(" ", "") + component + severity_label
            )
            generator = eventname + "generator"
            eventlist[eventname] = generator
            severity = severity_levels.get(event["severity"], 0)

            server.write(
                f'{writeindent}{eventname} = await server.create_custom_event_type'
                f'(idx, "{element} {severity_label}", ua.ObjectIds.SystemEventType)\n'
            )
            server.write(
                f"{writeindent}{generator} = await server.get_event_generator"
                f"({eventname}, {component})\n"
            )
            server.write(f"{writeindent}{generator}.event.Severity = {severity}\n")
    return eventlist
def writestreams(streams, component, streamslist, variablelist, jsonmeasurements, jsonparameters):
    """Append a custom event type and generator for each stream.

    For streams whose ``"streamType"`` is ``"update"`` the generated code
    also gets a ``save<streamname>`` variable initialised with the current
    value of the streamed measurement or parameter (``0`` when no match is
    found).

    Args:
        streams: Iterable of mappings with ``"measurement"`` and
            ``"streamType"``.
        component: Name of the generated variable the streams are emitted on.
        streamslist: Dict updated in place with ``stream name -> generator
            variable name``.
        variablelist: Iterable of mappings with ``"name"`` and ``"value"``;
            ``"value"`` is compared against the ``"uuid"`` of the entries
            below.
        jsonmeasurements: Iterable of mappings with ``"uuid"`` and ``"value"``.
        jsonparameters: Iterable of mappings with ``"uuid"`` and ``"value"``.

    Returns:
        Tuple ``(streamslist, updatestreamdict)`` where ``updatestreamdict``
        maps each update-stream name to its ``save...`` variable name.
    """
    writeindent = " "
    updatestreamdict = {}
    # ``with`` closes the handle deterministically instead of relying on
    # garbage collection.
    with open("opcua-server.py", "a") as server:
        for stream in streams:
            measurement_name = stream["measurement"]
            streamname = measurement_name.lower().replace(" ", "") + component
            generator = streamname + "generator"
            streamslist[streamname] = generator
            server.write(
                f'{writeindent}{streamname} = await server.create_custom_event_type'
                f'(idx, "{measurement_name!s}", ua.ObjectIds.SystemEventType)\n'
            )
            server.write(
                f"{writeindent}{generator} = await server.get_event_generator"
                f"({streamname}, {component})\n"
            )
            if stream["streamType"] != "update":
                continue

            # Resolve the initial value. The last match wins, and a
            # parameter match overrides a measurement match.
            value = 0
            for variable in variablelist:
                if variable["name"] != measurement_name:
                    continue
                for measurement in jsonmeasurements:
                    if measurement["uuid"] == variable["value"]:
                        value = measurement["value"]
                for parameter in jsonparameters:
                    if parameter["uuid"] == variable["value"]:
                        value = parameter["value"]

            savevaluename = "save" + streamname
            # Strings must be quoted to be valid literals in generated code.
            if isinstance(value, str):
                writevalue = "\"" + value + "\""
            else:
                writevalue = str(value)
            server.write(f"{writeindent}{savevaluename} = {writevalue}\n")
            updatestreamdict[streamname] = savevaluename
    return streamslist, updatestreamdict
def writendpt1():
    """Append the opening of the server run loop to ``opcua-server.py``.

    The text opens the ``async with server:`` block, prints a start-up
    message, initialises ``count`` and begins the ``while True:`` loop.
    """
    # ``with`` closes the handle deterministically instead of relying on
    # garbage collection.
    with open("opcua-server.py", "a") as server:
        server.write("""
async with server:
print("Server is running!")
count = 0
while True:\n""")
def triggerevents(generatordict, eventslist, component, variablelist, jsonmeasurements, jsonparameters):
    """Append the per-iteration event checks to ``opcua-server.py``.

    For every event the generated code fetches the watched child node of
    *component*, compares its value with the event's target using the
    event's trigger operator, and fires the matching event generator with
    the event's message when the comparison holds.

    Args:
        generatordict: Mapping ``event name -> generator variable name``,
            keyed by lower-cased, space-free element + component + severity.
        eventslist: Iterable of mappings with ``"element"``, ``"severity"``,
            ``"target"``, ``"trigger"`` and ``"message"``.
        component: Name of the generated variable holding the component node.
        variablelist: Iterable of mappings with ``"name"`` and ``"value"``;
            ``"value"`` is compared against the ``"uuid"`` of the entries
            below.
        jsonmeasurements: Iterable of mappings with ``"uuid"`` and ``"name"``.
        jsonparameters: Iterable of mappings with ``"uuid"`` and ``"name"``.

    Raises:
        KeyError: If an event has no entry in *generatordict*.
    """
    indent = 3
    base_indent = " " * indent
    body_indent = base_indent + " "
    # Browse names already resolved, keyed by element, so each element is
    # searched for only once.
    variablenamelist = {}
    # ``with`` closes the handle deterministically instead of relying on
    # garbage collection.
    with open("opcua-server.py", "a") as server:
        for event in eventslist:
            element = event["element"]
            cache_key = str(element)
            # Spaces are stripped so the generated variable name is a valid
            # Python identifier even for elements such as "Motor State".
            valuename = element.replace(" ", "") + component

            if cache_key in variablenamelist:
                eventsearchname = variablenamelist[cache_key]
            else:
                # The last match wins, and a parameter match overrides a
                # measurement match.
                eventsearchname = ""
                for variable in variablelist:
                    if variable["name"] != element:
                        continue
                    for measurement in jsonmeasurements:
                        if variable["value"] == measurement["uuid"]:
                            eventsearchname = measurement["name"]
                    for parameter in jsonparameters:
                        if variable["value"] == parameter["uuid"]:
                            eventsearchname = parameter["name"]
                variablenamelist[cache_key] = eventsearchname

            server.write(
                f'{base_indent}{valuename}= await {component}.get_child'
                f'(["2:{eventsearchname}"])\n'
            )

            dictname = element.lower().replace(" ", "") + component + event["severity"]
            generatorname = generatordict[dictname]

            target = event["target"]
            # String targets must be quoted to be valid literals.
            if isinstance(target, str):
                trigger = "\"" + target + "\""
            else:
                trigger = str(target)
            operator = event["trigger"]
            message = event["message"]

            server.write(
                f"{base_indent}if (await {valuename}.get_value()) {operator} {trigger}:\n"
            )
            server.write(
                f'{body_indent}await {generatorname}.trigger(message="{message}")\n'
            )
def triggerstreams(generatordict, streamslist, component, variablelist, jsonmeasurements, jsonparameters, savevaldict):
    """Append the per-iteration stream checks to ``opcua-server.py``.

    For every stream the generated code fetches the streamed child node of
    *component*. ``"update"`` streams fire their generator when the value
    differs from the saved one and then store the new value. ``"fixed"``
    streams fire whenever ``count`` is a multiple of the stream's
    ``"intervalValue"``.

    Args:
        generatordict: Mapping ``stream name -> generator variable name``,
            keyed by lower-cased, space-free measurement + component.
        streamslist: Iterable of mappings with ``"measurement"``,
            ``"streamType"`` and, for fixed streams, ``"intervalValue"``.
        component: Name of the generated variable holding the component node.
        variablelist: Iterable of mappings with ``"name"`` and ``"value"``;
            ``"value"`` is compared against the ``"uuid"`` of the entries
            below.
        jsonmeasurements: Iterable of mappings with ``"uuid"`` and ``"name"``.
        jsonparameters: Iterable of mappings with ``"uuid"`` and ``"name"``.
        savevaldict: Mapping ``stream name -> saved-value variable name``;
            only consulted for update streams.

    Raises:
        KeyError: If a stream has no entry in *generatordict*, or an update
            stream has none in *savevaldict*.
    """
    indent = 3
    base_indent = " " * indent
    body_indent = base_indent + " "
    # ``with`` closes the handle deterministically instead of relying on
    # garbage collection.
    with open("opcua-server.py", "a") as server:
        for stream in streamslist:
            measurement_name = stream["measurement"]
            stream_key = measurement_name.lower().replace(" ", "")
            valuename = "stream" + stream_key

            # The last match wins, and a parameter match overrides a
            # measurement match.
            streamname = ""
            for variable in variablelist:
                if variable["name"] != measurement_name:
                    continue
                for measurement in jsonmeasurements:
                    if measurement["uuid"] == variable["value"]:
                        streamname = measurement["name"]
                for parameter in jsonparameters:
                    if parameter["uuid"] == variable["value"]:
                        streamname = parameter["name"]

            server.write(
                f'{base_indent}{valuename} = await {component}.get_child'
                f'(["2:{streamname}"])\n'
            )
            generatorname = generatordict[stream_key + component]

            if stream["streamType"] == "update":
                savename = savevaldict[stream_key + component]
                server.write(
                    f"{base_indent}if (await {valuename}.get_value()) != {savename}:\n"
                )
                server.write(f"{body_indent}puffer = await {valuename}.get_value()\n")
                server.write(
                    f'{body_indent}await {generatorname}.trigger'
                    f'(message= "value" + str(puffer))\n'
                )
                server.write(f"{body_indent}{savename} = puffer \n")

            if stream["streamType"] == "fixed":
                # Spaces are stripped so the generated timer variable is a
                # valid identifier even for names such as "Flow Rate".
                timername = measurement_name.replace(" ", "") + "time"
                interval = str(stream["intervalValue"])
                server.write(f"{base_indent}{timername} = count % {interval}\n")
                server.write(f"{base_indent}if {timername} == 0:\n")
                server.write(f"{body_indent}puffer = await {valuename}.get_value()\n")
                server.write(
                    f'{body_indent}await {generatorname}.trigger'
                    f'(message="{measurement_name} value: " + str(puffer))\n'
                )
def writendpt2():
    """Append the end of the run loop and the script entry point.

    The text sleeps one second per loop iteration, increments ``count`` and
    adds the ``if __name__ == '__main__'`` guard that runs ``main()``.
    """
    # ``with`` closes the handle deterministically, so the generated file is
    # complete on disk when this function returns.
    with open("opcua-server.py", "a") as server:
        server.write("""
await asyncio.sleep(1)
count += 1
if __name__ == '__main__':
asyncio.run(main())""")