diff --git a/robottest.py b/robottest.py new file mode 100644 index 0000000000000000000000000000000000000000..19e18d44701d562124bdddd32bb1fc13257ed8f6 --- /dev/null +++ b/robottest.py @@ -0,0 +1,106 @@ +import logging +import asyncio + +from asyncua import ua, Server +from asyncua.common.methods import uamethod +from asyncua.common.structures104 import new_struct, new_enum, new_struct_field +from asyncua.common import node + +logging.basicConfig(level=logging.INFO) +_logger = logging.getLogger('asyncua') + +async def main(): + # setup our server + server = Server() + await server.init() + server.set_endpoint('opc.tcp://0.0.0.0:4840/freeopcua/server/') + + # setup our own namespace, not really necessary but should as spec + uri = 'http://examples.freeopcua.github.io' + idx = await server.register_namespace(uri) + #id des rangedatentyps + rangeid = await server.nodes.base_data_type.get_child(["0:Structure" , "0:Range"]) + + class Measurement: + def __init__(self, name, description, datatype, dimension, rangevar = None, unit= None): + self.name = name + self.description = description + self.datatype = datatype + self.range = rangevar + self.dimension = dimension + self.unit = unit + array = [] + #calculate a start value + for i in range(self.dimension): + array.append((self.range[0] + self.range[1]) / 2) + self.value = array + #objekt für dynamischer + tcpobject = Measurement("Tool Center Point", "Tool center point of a six-arm robot.", float, 3 , (-0.5, 0.5), "metre") + + + posnode, _ = await new_struct(server, idx, "PosStruct", [ + new_struct_field("name" , ua.VariantType.String), + new_struct_field("description" , ua.VariantType.String), + new_struct_field("value" , ua.VariantType.Float, array = True), + new_struct_field("range" , rangeid), + new_struct_field("unit" , ua.VariantType.String) + ]) + #mehr abfragen benötigt damit struct dementsprechend erstellt wird + if tcpobject.range is not None: + if tcpobject.unit is not None: + tcpnode, _ = await new_struct(server, idx, "TCPStruct", [ + new_struct_field("name" , ua.VariantType.String), + new_struct_field("description" , ua.VariantType.String), + new_struct_field("value" , ua.VariantType.Float, array = True), + new_struct_field("range" , rangeid), + new_struct_field("unit" , ua.VariantType.String) + ]) + + ##evtl alle statischen variablen als struct dynamisch mit extra variablen + myobj = await server.nodes.objects.add_object(idx, "Robot") + custom_objs = await server.load_data_type_definitions() + #pos struct hardcoded + var = await myobj.add_variable(idx, "position",ua.Variant(ua.PosStruct('Position', 'Position in Cartesian coordinates given in metre.', [0,0,0], ua.Range(-50, 50), 'm'), ua.VariantType.ExtensionObject)) + #tcp struct dynamischer + var2 = await myobj.add_variable(idx, "TCP", ua.Variant(ua.PosStruct(tcpobject.name, tcpobject.description, tcpobject.value, ua.Range(tcpobject.range[0],tcpobject.range[1]), tcpobject.unit), ua.VariantType.ExtensionObject)) + await var.set_writable() + #await var.set_value(ua.PosStruct('bla') , ua.VariantType.ExtensionObject) + bla = await var.get_value() + + + #alternativ tcp als objekt mit variablen + measurementtype = await server.nodes.base_object_type.add_object_type(idx, tcpobject.description) + if tcpobject.range is not None: + rangevar = await measurementtype.add_variable(idx, "range", ua.Range(-0.5,0.5)) + await rangevar.set_modelling_rule(True) + if tcpobject.unit is not None: + unit = await measurementtype.add_variable(idx, "unit", "metre") + await unit.set_modelling_rule(True) + value = await measurementtype.add_variable(idx, "value", 0) + await value.set_modelling_rule(True) + #await value.write_array_dimensions({tcpobject.dimension}) + #await value.write_value_rank(1) + await myobj.add_object(idx, tcpobject.name , measurementtype.nodeid) + + #tcp als variable mit properties + vartype = await server.nodes.base_data_type.add_data_type(idx, tcpobject.name, tcpobject.description) + #if tcpobject.range is not None: + rangeprop = await vartype.add_property(idx, "range", ua.Range(-0.5,0.5)) + await rangeprop.set_modelling_rule(True) + #if tcpobject.unit is not None: + unitprop = await vartype.add_property(idx, "unit", "metre") + await unitprop.set_modelling_rule(True) + tcpvar = await myobj.add_variable(idx, "TCP Var" , [0,0,0] , datatype = vartype.nodeid) + #object bekommt nicht datatypes properties + await tcpvar.add_property(idx, "range", ua.Range(-0.5,0.5)) + await tcpvar.add_property(idx, "unit", "metre") + + async with server: + print(bla) + print(tcpobject.value) + while True: + await asyncio.sleep(1) + + +if __name__ == '__main__': + asyncio.run(main()) \ No newline at end of file