Select Git revision
-
Carl Klemm authoredCarl Klemm authored
writefile.py 7.91 KiB
import jsonparse
import os
def defFunction(functionname, file, indent, inargs):
writeindent = ""
for i in range(indent):
writeindent = writeindent + " "
file.write(writeindent + "async def " + functionname + "(parent")
if inargs is not None:
file.write(", ")
for i in range (len(inargs)):
if i < (len(inargs) - 1):
file.write(inargs[i] + ", ")
else:
file.write(inargs[i])
file.write("):\n")
writeindent = writeindent + " "
file.write(writeindent + """#ToDo: Write an algorithm that fits the description of the function.
pass
""")
def getVariantType(datatype):
varianttype = "ua.VariantType.Int64"
if datatype == "float" or datatype == "double":
varianttype = "ua.VariantType.Double"
elif datatype == "Boolean":
varianttype = "ua.VariantType.Boolean"
return varianttype
def writebeginning():
if os.path.exists("opcua-server.py"):
os.remove("opcua-server.py")
server = open("opcua-server.py", "a")
server.write("""import logging
import asyncio
from asyncua import ua, Server
from asyncua.common.methods import uamethod
from asyncua.common.structures104 import new_struct, new_enum, new_struct_field
from asyncua.common import node
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger('asyncua')
""")
server.write("\nasync def main():")
#äinten = "\tab"
server.write("""
# setup our server
server = Server()
await server.init()
server.set_endpoint('opc.tcp://0.0.0.0:4840/freeopcua/server/')
# setup our own namespace, not really necessary but should as spec
uri = 'http://examples.freeopcua.github.io/'
idx = await server.register_namespace(uri)
#id of rangetype
rangeid = await server.nodes.base_data_type.get_child(["0:Structure", "0:Range"])
#creating dataypes for parameters and measurements
measurementtype = await server.nodes.base_data_type.add_data_type(idx, "Measurement")
parametertype = await server.nodes.base_data_type.add_data_type(idx, "Parameter")
""")
def openobject(objname, description):
server = open("opcua-server.py", "a")
writeindent = " "
server.write(writeindent + "#creating the objecttype\n")
objtypename = (objname.lower()).replace(" ", "") + "type"
server.write(writeindent + objtypename + "= await server.nodes.base_object_type.add_object_type(idx, \"" + objname +"\" )\n")
return objtypename
def writebasecomponent(headnode, objtype,name):
server = open("opcua-server.py", "a")
writeindent = " "
objname = objtype + "component"
server.write(writeindent + objname + " = await " + headnode + ".add_object(idx, \""+ name + "\", " + objtype + ".nodeid)\n")
server.write(writeindent + "await " + objname + "." + "set_modelling_rule(True)\n")
def writecomponents(headnode, componentslist, typesdict):
server = open("opcua-server.py", "a")
writeindent = " "
for component in componentslist:
objname = typesdict[component["name"]] + "component"
server.write(writeindent + objname + " = await " + headnode + ".add_object(idx, \""+ component["name"] + "\", " + typesdict[component["name"]] + ".nodeid)\n")
server.write(writeindent + "await " + objname + "." + "set_modelling_rule(True)\n")
def writemeasurement(measurementobj, headnode):
server = open("opcua-server.py", "a")
writeindent = " "
measurementdict = {}
for i in measurementobj:
varianttype = getVariantType(i.datatype)
datatype = "measurementtype.nodeid"
varname = ((i.name).lower()).replace(" ", "") + "var"
measurementdict[i.name] = varname
server.write(writeindent + varname + " = " + "await " + headnode + ".add_variable(idx, \"" + str(
i.name) + "\", " + str(i.value) + ", " + varianttype + ", " + datatype + ")\n")
server.write(writeindent + "await " + varname + "." + "set_modelling_rule(True)\n")
# if i.range is not None and i.range is not "(None, None)":
# rangename = "rangeprop"
# server.write(
# writeindent + rangename + " = await " + headnode + ".add_property(idx, \"range\", ua.Range(" + str(
# i.range[0]) + "," + str(i.range[1]) + "))\n")
# server.write(writeindent + "await " + rangename + ".set_modelling_rule(True)\n")
# if i.unit != "UNITLESS":
# unitname = "unitprop"
# server.write(
# writeindent + unitname + " = await " + headnode + ".add_property(idx, \"unit\"," + "\"" + i.unit + "\")\n")
# server.write(writeindent + "await " + unitname + ".set_modelling_rule(True)\n")
server.write(
writeindent + varname + "description = await " + varname + ".add_property(idx, \"Description\", " + "\"" + i.description + "\")" + "\n")
server.write(writeindent + "await "+ varname + "description.set_modelling_rule(True)\n\n")
server.write("\n" + writeindent + "#creating all parameters with their properties\n")
return measurementdict
def writeparameter(parameterobj, headnode):
writeindent = " "
server = open("opcua-server.py", "a")
parameterdict = {}
for i in parameterobj:
varianttype = getVariantType(i.datatype)
datatype = "parametertype.nodeid"
varname = ((i.name).lower()).replace(" ", "") + "var"
parameterdict[i.name] = varname
server.write(writeindent + varname + " = " + "await " + headnode + ".add_variable(idx, \"" + str(
i.name) + "\", " + str(i.value) + ", " + varianttype + ", " + datatype + ")\n")
server.write(writeindent + "await " + varname + "." + "set_modelling_rule(True)\n")
if i.range is not None or i.range != (None, None):
rangename = "rangeprop"
server.write(
writeindent + rangename + " = await " + headnode + ".add_property(idx, \"range\", ua.Range(" + str(
i.range[0]) + "," + str(i.range[1]) + "))\n")
server.write(writeindent + "await " + rangename + ".set_modelling_rule(True)\n")
if i.unit != "UNITLESS":
unitname = "unitprop"
server.write(
writeindent + unitname + " = await " + headnode + ".add_property(idx, \"unit\"," + "\"" + i.unit + "\")\n")
server.write(writeindent + "await " + unitname + ".set_modelling_rule(True)\n")
server.write(
writeindent + varname + "description = await " + varname + ".add_property(idx, \"Description\", " + "\"" + i.description + "\")" + "\n")
server.write(writeindent + "await " + varname+ "description.set_modelling_rule(True)\n\n")
def writefunction(functionobj, headnode):
server = open("opcua-server.py", "a")
writeindent = " "
for i in functionobj:
functionname = (i.name.lower()).replace(" ", "")
defFunction(functionname, server, 1, i.inargsnames)
server.write("\n")
inargslist = []
if i.inargsdatatypes != None:
for j in i.inargsdatatypes:
inargslist.append(getVariantType(j))
outargslist = []
if i.outargsdatatypes != None:
for j in i.outargsdataypes:
inargslist.append(getVariantType(j))
server.write(writeindent + "await mobilerobot.add_method(idx, \"" + i.name +"\", " + functionname + ", " + str(inargslist) + ", "+ str(outargslist) + ")\n\n")
def writeinterface(objtype, objname):
server = open("opcua-server.py", "a")
writeindent = " "
objtypename = (objname.lower()).replace(" ","") + "var"
server.write(writeindent + objtypename + "= await server.nodes.objects.add_object(idx, \"" + objname + "\", " + objtype + ".nodeid)\n")
def writend():
server = open("opcua-server.py", "a")
server.write("""
async with server:
print("Server is running!")
while True:
await asyncio.sleep(1)
if __name__ == '__main__':
asyncio.run(main())""")