diff --git a/jsonparse.py b/jsonparse.py deleted file mode 100644 index 685d7cb3a3d765ccfb7c0fbb5091ecedc298be64..0000000000000000000000000000000000000000 --- a/jsonparse.py +++ /dev/null @@ -1,256 +0,0 @@ -import json -import os -import sys -import writefile as wf -from collections import namedtuple - - -def measurementJsonDecod(measurementdict): - return namedtuple('Measurement', measurementdict.keys())(*measurementdict.values()) - - -def parameterJsonDecod(parameterdict): - return namedtuple('Parameter', parameterdict.keys())(*parameterdict.values()) - - -def functionJsonDecod(functiondict): - return namedtuple('Function', functiondict.keys())(*functiondict.values()) - - -class Measurement: - def __init__(self, name, description, datatype, value, dimension, unit=None, range=None): - self.name = name - self.description = description - self.datatype = datatype - self.range = range - self.dimension = dimension - self.unit = unit - array = [] - if self.datatype == "float": - self.value = float(value) - else: - self.value = value - # calculate a start value - # if self.dimension != 0: - # for i in range(self.dimension): - # arrayval = self.datatype((self.range[0] + self.range[1]) / 2) - # if self.datatype == "float" or self.datatype == "double": - # arrayval = float(arrayval) - # array.append(arrayval) - # self.value = [0.0, 0.0, 0.0] - # else: - # self.value = self.datatype(((self.range[0] + self.range[1]) / 2)) - -class Parameter: - def __init__(self, name, description, datatype, dimension, value, rangevar=None, unit=None, default=None, constant=False): - self.name = name - self.description = description - self.datatype = datatype - # if range == [None, None]: - # self.range = None - # else: - self.range = rangevar - self.dimension = dimension - self.unit = unit - self.default = default - self.constant = constant - self.value = value - # array = [] - # calculate a start value - # if self.default == False: - # if self.dimension != 0: - # for i in range(self.dimension): - # array.append(self.datatype((self.range[0] + self.range[1]) / 2)) - # self.value = array - # else: - # self.value = self.datatype(((self.range[0] + self.range[1]) / 2)) - # else: - # self.value = self.datatype(default) - -class Function: - def __init__(self, name, description, inargsdatatypes = None, outargsdatatypes = None): - self.name = name - self.description = description - self.inargsdatatypes = inargsdatatypes - self.outargsdatatypes = outargsdatatypes - - -def dimensionchange(string): - dimarr = string["dimension"] - if dimarr: - value = dimarr[0] - else: - value = 0 - newdimension = {"dimension": value} - string.update(newdimension) - - -def getdatatypes(argarray): - returnarr = [] - if len(argarray) > 0: - for i in argarray: - returnarr.append(i["datatype"]) - return returnarr - else: - return None - - -def changerange(string): - #print(type(string["range"][0])) - if string["range"] != "[None, None]": - rangeval = string["range"] - rangetuple = (rangeval[0], rangeval[1]) - #print(rangetuple) - string.update({"range": rangetuple}) - print(string["range"]) - else: - del string["range"] - print(string) - - -def getobjnames(argarray): - returnarr = [] - if len(argarray) > 0: - for i in argarray: - name = (i["name"].lower()).replace(" ", "") - returnarr.append(name) - return returnarr - else: - return None - - -def createcomponent(jsonobj, iscomponent): - measurements = jsonobj["measurements"] - measurementobj = [] - if len(measurements) > 0: - - for i in measurements: - obj = i - # del obj["value"] - del obj["uuid"] - dimensionchange(obj) - # if i["range"]: - changerange(obj) - obj = json.dumps(obj) - measurementobji = json.loads(obj, object_hook=measurementJsonDecod) - measurementobj.append(measurementobji) - - print(measurementobj) - parameters = jsonobj["parameters"] - parameterobj = [] - if len(parameters) > 0: - - for i in parameters: - obj = i - # del obj["value"] - del obj["uuid"] - dimensionchange(obj) - if i["range"]: - changerange(obj) - obj = json.dumps(obj) - parameterobji = json.loads(obj, object_hook=parameterJsonDecod) - parameterobj.append(parameterobji) - - functions = jsonobj["functions"] - functionobj = [] - if len(functions) > 0: - - for i in functions: - obj = i - # del obj["value"] - obj["inargsnames"] = getobjnames(obj["arguments"]) - obj["outargsnames"] = getobjnames(obj["returns"]) - obj["inargsdatatypes"] = getdatatypes(obj["arguments"]) - obj["outargsdatatypes"] = getdatatypes(obj["returns"]) - del obj["uuid"] - del obj["arguments"] - del obj["returns"] - obj = json.dumps(obj) - functionobji = json.loads(obj, object_hook=functionJsonDecod) - functionobj.append(functionobji) - print(functionobj) - print(parameterobj) - # exec(open("writefile.py").read()) - wf.writeinserver(jsonobj["name"], jsonobj["description"], measurementobj, parameterobj, functionobj, iscomponent) - - -def getParameters(parametersDict): - # extracts the measurements of a Json-Model - parameterobj = [] - if len(parametersDict) > 0: - for i in parametersDict: - obj = i - # del obj["value"] - del obj["uuid"] - dimensionchange(obj) - if i["range"]: - changerange(obj) - obj = json.dumps(i) - parameterobji = json.loads(obj, object_hook=parameterJsonDecod) - parameterobj.append(parameterobji) - return parameterobj - -def getMeasurements(measurementsDict): - #extracts the measurements of a Json-Model - measurementobj = [] - if len(measurementsDict) > 0: - for i in measurementsDict: - obj = i - #del obj["value"] - del obj["uuid"] - dimensionchange(obj) - #if i["range"]: - changerange(obj) - obj = json.dumps(i) - measurementobji = json.loads(obj, object_hook=measurementJsonDecod) - measurementobj.append(measurementobji) - return measurementobj - -def getComponents(components): - componentobj = [] - if len(components) > 0: - for i in components: - par = getParameters(i["parameters"]) - mea = getMeasurements(i["measurements"]) - comp = getComponents(i["components"]) - - obj = i - # del obj["value"] - del obj["uuid"] - obj = json.dumps(i) - componentobji = json.loads(obj) - componentobji['parameters'] = par - componentobji['measurements'] = mea - componentobji['components'] = comp - componentobj.append(componentobji) - return componentobj - - - -def main(): - try: - JSON_MODEL = sys.argv[1][:-5] - if not os.path.exists(os.path.join('..', 'SOIL OPC-UA', f'{JSON_MODEL}.json')): - raise Exception() - except Exception as e: - print('You must provide a file from the folder to be used as root file of the parsing, e.g., "python jsonparser.py model.json".') - exit() - - with open(f'{JSON_MODEL}.json') as file: - - #robot = json.dumps(file.read()) - robotload = json.loads(file.read()) - wf.writebeginning() - createcomponent(robotload, False) - components = robotload["components"] - componentobj = getComponents(components) - print(componentobj) - wf.writend() - - - - - - -if __name__ == '__main__': - main() diff --git a/robot.py b/robot.py deleted file mode 100644 index 8054d3ac0f703c8af00e8a0599f6e1ac0dc69f02..0000000000000000000000000000000000000000 --- a/robot.py +++ /dev/null @@ -1,243 +0,0 @@ -import logging -import asyncio -import numpy -import jsonparse - -from asyncua import ua, Server -from asyncua.common.methods import uamethod -from asyncua.common.structures104 import new_struct, new_enum, new_struct_field -from asyncua.common import node - -logging.basicConfig(level=logging.INFO) -_logger = logging.getLogger('asyncua') - -def getobj(name, list): - for i in list: - if i.name == name: - return i - - -class Measurement: - def __init__(self, name, description, datatype, dimension, rangevar=None, unit=None): - self.name = name - self.description = description - self.datatype = datatype - self.range = rangevar - self.dimension = dimension - self.unit = unit - array = [] - # calculate a start value - if self.dimension != 0: - for i in range(self.dimension): - array.append(self.datatype((self.range[0] + self.range[1]) / 2)) - self.value = array - else: - self.value = self.datatype(((self.range[0] + self.range[1]) / 2)) - - -# const parameter modeling missing -class Parameter: - def __init__(self, name, description, datatype, dimension, rangevar=None, unit=None, default=None, constant=False): - self.name = name - self.description = description - self.datatype = datatype - self.range = rangevar - self.dimension = dimension - self.unit = unit - self.default = default - self.constant = constant - array = [] - # calculate a start value - if self.default == False: - if self.dimension != 0: - for i in range(self.dimension): - array.append(self.datatype((self.range[0] + self.range[1]) / 2)) - self.value = array - else: - self.value = self.datatype(((self.range[0] + self.range[1]) / 2)) - else: - self.value = self.datatype(default) - -#goTo method -@uamethod -def goTo (parent, position, node): - node.set_value(position) - - -async def main(): - # setup our server - server = Server() - await server.init() - server.set_endpoint('opc.tcp://0.0.0.0:4840/freeopcua/server/') - - # setup our own namespace, not really necessary but should as spec - uri = 'http://examples.freeopcua.github.io' - idx = await server.register_namespace(uri) - #id des rangedatentyps - rangeid = await server.nodes.base_data_type.get_child(["0:Structure", "0:Range"]) - - #measurementobjekte für dynamischer - # tcpobject = Measurement("Tool Center Point", "Tool center point of a six-arm robot.", float, 3 , (-0.5, 0.5), "metre") - # positionobject = Measurement("Position", "Position in Cartesian coordinates given in metre.", float, 3, (-50, 50) , "metre") - # batteryobject = Measurement("Battery level", "Battery level", int , 0 ,(0, 100), "percent") - # measureobjectlist = [tcpobject , positionobject, batteryobject] - - tcpobject = getobj("Tool Center Point", jsonparse.measurementobj) - positionobject = getobj("Position", jsonparse.measurementobj) - batteryobject = getobj("Battery level", jsonparse.measurementobj) - measureobjectlist = jsonparse.measurementobj - - #parameterobjekte - openobject = getobj("Autonomous Movement", jsonparse.parameterobj) - #automoveobject = Parameter("Autonomous Movement", "If true, the robot moves around freely autonomously.", bool, 0, default = True) - # paraobjectlist = [openobject, automoveobject] - paraobjectlist = jsonparse.parameterobj - - #hauptobjekt - myobjtype = await server.nodes.base_object_type.add_object_type(idx, "Robot") - - #tcp als variable mit properties - vartype = await server.nodes.base_data_type.add_data_type(idx, "Measurement") - paratype = await server.nodes.base_data_type.add_data_type(idx, "Parameter") - #tcpvar = await .add_variable(idx, "TCP VAR" , [0,0,0] , datatype = vartype.nodeid) - - #variable mit entsprechenden properties - #measurements evtl varianttype vom variabletype und datatype measurement - - for a in measureobjectlist: - varianttype = ua.VariantType.Int64 - if a.datatype == "float" or a.datatype == "double": - varianttype = ua.VariantType.Double - elif a.datatype == "boolean": - varianttype = ua.VariantType.Boolean - name = a.name + "var" - name = await myobjtype.add_variable(idx, a.name, a.value, varianttype=varianttype, datatype=vartype.nodeid) - await name.set_modelling_rule(True) - #namelist = namelist + name - if a.range is not None and a.range is not [None, None]: - rangeprop = await name.add_property(idx, "range", ua.Range(a.range[0], a.range[1])) - await rangeprop.set_modelling_rule(True) - if a.unit is not None: - unitprop = await name.add_property(idx, "unit", a.unit) - await unitprop.set_modelling_rule(True) - description = await name.add_property(idx, "Description", a.description) - await description.set_modelling_rule(True) - - - #parameters - - for a in paraobjectlist: - varianttype = ua.VariantType.Int64 - if a.datatype == "float" or a.datatype == "double": - varianttype = ua.VariantType.Double - elif a.datatype == "boolean": - varianttype = ua.VariantType.Boolean - para = await myobjtype.add_variable(idx, a.name, a.value, varianttype=varianttype, datatype=paratype.nodeid) - await para.set_modelling_rule(True) - if (a.range is not None) and (a.range != [None, None]): - rangeprop = await para.add_property(idx, "range", ua.Range(a.range[0], a.range[1])) - await rangeprop.set_modelling_rule(True) - if a.unit is not None: - unitprop = await para.add_property(idx, "unit", a.unit) - await unitprop.set_modelling_rule(True) - description = await para.add_property(idx, "Description", a.description) - await description.set_modelling_rule(True) - - - - #gripper als componente - - gripper = await myobjtype.add_object(idx, "Gripper") - await gripper.set_modelling_rule(True) - gripprop = await gripper.add_property(idx, "Description", "Gripper of a robot") - await gripprop.set_modelling_rule(True) - openvar = await gripper.add_variable(idx, "Open", True, datatype = paratype.nodeid) - await openvar.set_modelling_rule(True) - grip_desc = await openvar.add_property(idx, "Description", "Flag to specify if something is open.") - await grip_desc.set_modelling_rule(True) - - robot = await server.nodes.objects.add_object(idx, "Robot", myobjtype) - - mobilerobot = await server.nodes.objects.add_object(idx, "MobileRobot", myobjtype) - - #getting node ids of battery and position - robotposname = "2:" + positionobject.name - positionrobot = await mobilerobot.get_child([robotposname]) - await positionrobot.set_writable(True) - - - - @uamethod - def goTo(parent, position): - positionrobot.set_value(position) - return position - - batteryvarname = "2:" + batteryobject.name - batterymeas = await mobilerobot.get_child([batteryvarname]) - - tcpvarname = "2:" + tcpobject.name - tcpvarmeas = await mobilerobot.get_child([tcpvarname]) - # tcpvarmeas.set_writable(True) - - # tcprobmeas = await robot.get_child([tcpvarname]) - - - #adding function goTo to robot - goTofunc = await mobilerobot.add_method(idx, "goTo", goTo, [ua.VariantType.Float], [ua.VariantType.Float]) - - #batterylow warning event - batterywarning = await server.create_custom_event_type(idx, "BatteryLowWarning", ua.ObjectIds.SystemEventType) - batterywarninggen = await server.get_event_generator(batterywarning, mobilerobot) - batterywarninggen.event.Severity = 800 - - #batterylow error event - batteryerror = await server.create_custom_event_type(idx, "BatteryLowError", ua.ObjectIds.SystemEventType) - batteryerrorgen = await server.get_event_generator(batteryerror, mobilerobot) - batteryerrorgen.event.Severity = 1000 - - #streamevents - # tcpstream = await server.create_custom_event_type(idx, "Streaming", ua.ObjectIds.BaseEventType, [("tcpstream", ua.VariantType.Float)]) - # tcpstreamgenrob = await server.get_event_generator(tcpstream, robot) - #tcpstreamgenrob.tcpstream = await tcpvarmeas.get_value() - - tcpstreammob = await server.create_custom_event_type(idx, "StreamingMob", ua.ObjectIds.BaseEventType) - tcpstreamgenmobrob = await server.get_event_generator(tcpstreammob, mobilerobot) - tcpstreamgenmobrob.tcpstream = await tcpvarmeas.get_value() - - #tryout für zugriff - # tryoutpara = await robot.add_variable(idx, "Try out", , ua.VariantType.StatusCode) - # tryoutpara.set_writable(True) - - async with server: - print("Server läuft!") - print(paraobjectlist) - time = 0 - - while True: - tcppos = await tcpvarmeas.get_value() - # tcpposrob = await tcprobmeas.get_value() - if time % 10 == 0: - await tcpstreamgenmobrob.trigger(message="MobileRobot updatet value [" + str(tcppos[0]) + "," + str(tcppos[1]) + "," + str(tcppos[2]) + "]") - # await tcpstreamgenrob.trigger(message = "Robot updatet value [" + str(tcppos[0]) + "," + str(tcppos[1]) + "," + str(tcppos[2]) + "]") - # await positionrobot.set_value([1.0, 0.0, 0.0]) - batteryload = await batterymeas.get_value() - #await batterymeas.set_value(19) - if 20 > batteryload >= 10: - await batterywarninggen.trigger(message = "Battery level is below 20%. Load shortly.") - if batteryload < 10: - await batteryerrorgen.trigger(message = "Battery level is below 10%. Please load the battery immediately.") - if batteryload <= 2: - #server.get_event_generator() - await server.stop() - - # await batteryerror.response_params[idx].StatusCode.is_good() - await asyncio.sleep(1) - time += 1 - batterysave = batteryload - 1 - await batterymeas.set_value(batterysave) - tcppossave = tcppos[0] + 1.0 - await tcpvarmeas.set_value([tcppossave, 0.0, 0.0]) - - -if __name__ == '__main__': - asyncio.run(main()) \ No newline at end of file diff --git a/robottest.py b/robottest.py deleted file mode 100644 index 8cd8a02f5c665479dceeab35ee42eb871288c59c..0000000000000000000000000000000000000000 --- a/robottest.py +++ /dev/null @@ -1,160 +0,0 @@ -import logging -import asyncio - -from asyncua import ua, Server -from asyncua.common.methods import uamethod -from asyncua.common.structures104 import new_struct, new_enum, new_struct_field -from asyncua.common import node - -logging.basicConfig(level=logging.INFO) -_logger = logging.getLogger('asyncua') - - - - -async def main(): - # setup our server() - server = Server() - await server.init() - server.set_endpoint('opc.tcp://0.0.0.0:4840/freeopcua/server/') - - # setup our own namespace, not really necessary but should as spec - uri = 'http://examples.freeopcua.github.io' - idx = await server.register_namespace(uri) - #id des rangedatentyps - rangeid = await server.nodes.base_data_type.get_child(["0:Structure" , "0:Range"]) - - class Measurement: - def __init__(self, name, description, datatype, dimension, rangevar = None, unit= None): - self.name = name - self.description = description - self.datatype = datatype - self.range = rangevar - self.dimension = dimension - self.unit = unit - array = [] - #calculate a start value - for i in range(self.dimension): - array.append((self.range[0] + self.range[1]) / 2) - self.value = array - #objekt für dynamischer - tcpobject = Measurement("Tool Center Point", "Tool center point of a six-arm robot.", float, 3 , (-0.5, 0.5), "metre") - - - # posnode, _ = await new_struct(server, idx, "PosStruct", [ - # new_struct_field("name" , ua.VariantType.String), - # new_struct_field("description" , ua.VariantType.String), - # new_struct_field("value" , ua.VariantType.Float, array = True), - # new_struct_field("range" , rangeid), - # new_struct_field("unit" , ua.VariantType.String) - # ]) - # #mehr abfragen benötigt damit struct dementsprechend erstellt wird - # if tcpobject.range is not None: - # if tcpobject.unit is not None: - # tcpnode, _ = await new_struct(server, idx, "TCPStruct", [ - # new_struct_field("name" , ua.VariantType.String), - # new_struct_field("description" , ua.VariantType.String), - # new_struct_field("value" , ua.VariantType.Float, array = True), - # new_struct_field("range" , rangeid), - # new_struct_field("unit" , ua.VariantType.String) - # ]) - - ##evtl alle statischen variablen als struct dynamisch mit extra variablen - myobj = await server.nodes.objects.add_object(idx, "Robot") - - enode = await new_enum(server, idx, "MyEnum", [ - "titi", - "toto", - "tutu", - ]) - - enum_node = await new_enum(server, idx, "EnumerateTest", [ "UNDEFINED", "GOOD", "BAD", ]) - await server.load_data_type_definitions() - enumTest = await myobj.add_variable(idx, "my_enumtest", ua.EnumerateTest.UNDEFINED, datatype=enum_node.nodeid) - await enumTest.set_modelling_rule(True) - - #custom_objs = await server.load_data_type_definitions() - valnode = await server.nodes.objects.add_variable(idx, "my_enum", ua.MyEnum.toto, datatype=enode.nodeid) - - #enum = await myobj.add_variable(idx, "my_enum", ua.MyEnum.toto, datatype=enode.nodeid) - #custom_objs = await server.load_data_type_definitions() - #pos struct hardcoded - #var = await myobj.add_variable(idx, "position",ua.Variant(ua.PosStruct('Position', 'Position in Cartesian coordinates given in metre.', [0,0,0], ua.Range(-50, 50), 'm'), ua.VariantType.ExtensionObject)) - #tcp struct dynamischer - #var2 = await myobj.add_variable(idx, "TCP", ua.Variant(ua.PosStruct(tcpobject.name, tcpobject.description, tcpobject.value, ua.Range(tcpobject.range[0],tcpobject.range[1]), tcpobject.unit), ua.VariantType.ExtensionObject)) - #await var.set_writable() - #await var.set_value(ua.PosStruct('bla') , ua.VariantType.ExtensionObject) - #bla = await var.get_value() - - - #alternativ tcp als objekt mit variablen - # measurementtype = await server.nodes.base_object_type.add_object_type(idx, tcpobject.description) - # if tcpobject.range is not None: - # rangevar = await measurementtype.add_variable(idx, "range", ua.Range(-0.5,0.5)) - # await rangevar.set_modelling_rule(True) - # if tcpobject.unit is not None: - # unit = await measurementtype.add_variable(idx, "unit", "metre") - # await unit.set_modelling_rule(True) - # value = await measurementtype.add_variable(idx, "value", 0) - # await value.set_modelling_rule(True) - # #await value.write_array_dimensions({tcpobject.dimension}) - # #await value.write_value_rank(1) - # await myobj.add_object(idx, tcpobject.name , measurementtype.nodeid) - - #tcp als variable mit properties - vartype = await server.nodes.base_data_type.add_data_type(idx, tcpobject.name, tcpobject.description) - #await vartype.write_value_rank(1) - #if tcpobject.range is not None: - # rangeprop = await vartype.add_property(idx, "range", ua.Range(-0.5,0.5)) - # await rangeprop.set_modelling_rule(True) - # #if tcpobject.unit is not None: - # unitprop = await vartype.add_property(idx, "unit", "metre") - # await unitprop.set_modelling_rule(True) - tcpvar = await myobj.add_variable(idx, "TCP Var" , [0,0,0] , ua.VariantType.Float ,datatype = vartype.nodeid) - await tcpvar.set_writable() - # await tcpvar.write_value_rank(1) - # await tcpvar.write_array_dimensions(2) - #object bekommt nicht datatypes properties - await tcpvar.add_property(idx, "range", ua.Range(-0.5,0.5)) - await tcpvar.add_property(idx, "unit", "metre") - #def func(variant): - # tcpvar.set_value(ua.Variant(variant, ua.VariantType.Float, is_array = True)) - testvar = await myobj.add_variable(idx, "test" , 3) - - # @uamethod - # async def goTo (position): - # await tcpvar.set_value(position) - - # @uamethod - # def test (position): - # testvar.set_value(position) - # return position - - async def test(parent, position): - await testvar.set_value(ua.Variant(position)) - - async def goTo(parent, position): - await tcpvar.set_value(ua.Variant(position)) - - inargx = ua.Argument() - inargx.Name = "Position_neu" - inargx.DataType = ua.NodeId(ua.ObjectIds.Float) - inargx.ValueRank = 1 - inargx.ArrayDimensions = [1,1,1] - inargx.Description = ua.LocalizedText("bewegt zu neuer position") - - beweg_node = await myobj.add_method(idx, "bewegen", goTo, [ua.VariantType.Float]) - - testmethod = await myobj.add_method(idx, "test", test, [ua.VariantType.Int64]) - - async with server: - #print(tcpobject.value) - print(await tcpvar.get_value()) - #await tcpvar.set_value([1,1,1]) - print(await tcpvar.get_value()) - while True: - await asyncio.sleep(1) - - -if __name__ == '__main__': - asyncio.run(main()) \ No newline at end of file