Skip to content
Snippets Groups Projects
Commit b7bd3241 authored by Hansolli's avatar Hansolli
Browse files

testserver robot(structs or variables?)

parent 1886c040
Branches
No related tags found
No related merge requests found
import logging
import asyncio
from asyncua import ua, Server
from asyncua.common.methods import uamethod
from asyncua.common.structures104 import new_struct, new_enum, new_struct_field
from asyncua.common import node
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger('asyncua')
async def main():
# setup our server
server = Server()
await server.init()
server.set_endpoint('opc.tcp://0.0.0.0:4840/freeopcua/server/')
# setup our own namespace, not really necessary but should as spec
uri = 'http://examples.freeopcua.github.io'
idx = await server.register_namespace(uri)
#id des rangedatentyps
rangeid = await server.nodes.base_data_type.get_child(["0:Structure" , "0:Range"])
class Measurement:
def __init__(self, name, description, datatype, dimension, rangevar = None, unit= None):
self.name = name
self.description = description
self.datatype = datatype
self.range = rangevar
self.dimension = dimension
self.unit = unit
array = []
#calculate a start value
for i in range(self.dimension):
array.append((self.range[0] + self.range[1]) / 2)
self.value = array
#objekt für dynamischer
tcpobject = Measurement("Tool Center Point", "Tool center point of a six-arm robot.", float, 3 , (-0.5, 0.5), "metre")
posnode, _ = await new_struct(server, idx, "PosStruct", [
new_struct_field("name" , ua.VariantType.String),
new_struct_field("description" , ua.VariantType.String),
new_struct_field("value" , ua.VariantType.Float, array = True),
new_struct_field("range" , rangeid),
new_struct_field("unit" , ua.VariantType.String)
])
#mehr abfragen benötigt damit struct dementsprechend erstellt wird
if tcpobject.range is not None:
if tcpobject.unit is not None:
tcpnode, _ = await new_struct(server, idx, "TCPStruct", [
new_struct_field("name" , ua.VariantType.String),
new_struct_field("description" , ua.VariantType.String),
new_struct_field("value" , ua.VariantType.Float, array = True),
new_struct_field("range" , rangeid),
new_struct_field("unit" , ua.VariantType.String)
])
##evtl alle statischen variablen als struct dynamisch mit extra variablen
myobj = await server.nodes.objects.add_object(idx, "Robot")
custom_objs = await server.load_data_type_definitions()
#pos struct hardcoded
var = await myobj.add_variable(idx, "position",ua.Variant(ua.PosStruct('Position', 'Position in Cartesian coordinates given in metre.', [0,0,0], ua.Range(-50, 50), 'm'), ua.VariantType.ExtensionObject))
#tcp struct dynamischer
var2 = await myobj.add_variable(idx, "TCP", ua.Variant(ua.PosStruct(tcpobject.name, tcpobject.description, tcpobject.value, ua.Range(tcpobject.range[0],tcpobject.range[1]), tcpobject.unit), ua.VariantType.ExtensionObject))
await var.set_writable()
#await var.set_value(ua.PosStruct('bla') , ua.VariantType.ExtensionObject)
bla = await var.get_value()
#alternativ tcp als objekt mit variablen
measurementtype = await server.nodes.base_object_type.add_object_type(idx, tcpobject.description)
if tcpobject.range is not None:
rangevar = await measurementtype.add_variable(idx, "range", ua.Range(-0.5,0.5))
await rangevar.set_modelling_rule(True)
if tcpobject.unit is not None:
unit = await measurementtype.add_variable(idx, "unit", "metre")
await unit.set_modelling_rule(True)
value = await measurementtype.add_variable(idx, "value", 0)
await value.set_modelling_rule(True)
#await value.write_array_dimensions({tcpobject.dimension})
#await value.write_value_rank(1)
await myobj.add_object(idx, tcpobject.name , measurementtype.nodeid)
#tcp als variable mit properties
vartype = await server.nodes.base_data_type.add_data_type(idx, tcpobject.name, tcpobject.description)
#if tcpobject.range is not None:
rangeprop = await vartype.add_property(idx, "range", ua.Range(-0.5,0.5))
await rangeprop.set_modelling_rule(True)
#if tcpobject.unit is not None:
unitprop = await vartype.add_property(idx, "unit", "metre")
await unitprop.set_modelling_rule(True)
tcpvar = await myobj.add_variable(idx, "TCP Var" , [0,0,0] , datatype = vartype.nodeid)
#object bekommt nicht datatypes properties
await tcpvar.add_property(idx, "range", ua.Range(-0.5,0.5))
await tcpvar.add_property(idx, "unit", "metre")
async with server:
print(bla)
print(tcpobject.value)
while True:
await asyncio.sleep(1)
if __name__ == '__main__':
asyncio.run(main())
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment