Skip to content
Snippets Groups Projects
Commit 8f63ce02 authored by Hansolli's avatar Hansolli
Browse files

robot file with variables,properties component

parent b7bd3241
Branches
No related tags found
No related merge requests found
robot.py 0 → 100644
import logging
import asyncio
from asyncua import ua, Server
from asyncua.common.methods import uamethod
from asyncua.common.structures104 import new_struct, new_enum, new_struct_field
from asyncua.common import node
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger('asyncua')
async def main():
# setup our server
server = Server()
await server.init()
server.set_endpoint('opc.tcp://0.0.0.0:4840/freeopcua/server/')
# setup our own namespace, not really necessary but should as spec
uri = 'http://examples.freeopcua.github.io'
idx = await server.register_namespace(uri)
#id des rangedatentyps
rangeid = await server.nodes.base_data_type.get_child(["0:Structure" , "0:Range"])
class Measurement:
def __init__(self, name, description, datatype, dimension, rangevar = None, unit= None):
self.name = name
self.description = description
self.datatype = datatype
self.range = rangevar
self.dimension = dimension
self.unit = unit
array = []
#calculate a start value
if self.dimension != 0:
for i in range(self.dimension):
array.append(self.datatype((self.range[0] + self.range[1]) / 2))
self.value = array
else: self.value = self.datatype(((self.range[0] + self.range[1]) / 2))
#const parameter modeling missing
class Parameter:
def __init__(self, name, description, datatype, dimension, rangevar = None, unit= None, default= None, constant = False):
self.name = name
self.description = description
self.datatype = datatype
self.range = rangevar
self.dimension = dimension
self.unit = unit
self.default = default
self.constant = constant
array = []
#calculate a start value
if self.default == False:
if self.dimension != 0:
for i in range(self.dimension):
array.append(self.datatype((self.range[0] + self.range[1]) / 2))
self.value = array
else: self.value = self.datatype(((self.range[0] + self.range[1]) / 2))
else: self.value = self.datatype(default)
#measurementobjekte für dynamischer
tcpobject = Measurement("Tool Center Point", "Tool center point of a six-arm robot.", float, 3 , (-0.5, 0.5), "metre")
positionobject = Measurement("Position" , "Position in Cartesian coordinates given in metre.", float, 3, (-50, 50) , "metre")
batteryobject = Measurement("Battery level", "Battery level", int , 0 ,(0, 100), "percent")
measureobjectlist = [tcpobject , positionobject, batteryobject]
#parameterobjekte
openobject = Parameter("Open", "Flag to specify if something is open.", bool, 0, default = True)
automoveobject = Parameter("Autonomous Movement", "If true, the robot moves around freely autonomously.", bool, 0, default = True)
paraobjectlist = [openobject, automoveobject]
#hauptobjekt
myobj = await server.nodes.objects.add_object(idx, "Robot")
#tcp als variable mit properties
vartype = await server.nodes.base_data_type.add_data_type(idx, "Measurement")
paratype = await server.nodes.base_data_type.add_data_type(idx, "Parameter")
#tcpvar = await myobj.add_variable(idx, "TCP VAR" , [0,0,0] , datatype = vartype.nodeid)
#variable mit entsprechenden properties
#measurements evtl varianttype vom variabletype und datatype measurement
for a in measureobjectlist:
var = await myobj.add_variable(idx, a.name , a.value , datatype = vartype.nodeid)
if a.range is not None:
rangeprop = await var.add_property(idx, "range", ua.Range(a.range[0], a.range[1]))
if a.unit is not None:
unitporp = await var.add_property(idx, "unit", a.unit)
description = await var.add_property(idx, "Description", a.description)
#parameters
for a in paraobjectlist:
para = await myobj.add_variable(idx, a.name , a.value , datatype = paratype.nodeid)
if a.range is not None:
rangeprop = await para.add_property(idx, "range", ua.Range(a.range[0], a.range[1]))
if a.unit is not None:
unitporp = await para.add_property(idx, "unit", a.unit)
description = await para.add_property(idx, "Description", a.description)
#gripper als componente
gripper = await myobj.add_object(idx, "Gripper")
await gripper.add_property(idx, "Description", "Gripper of a robot")
openvar = await gripper.add_variable(idx, "Open", True, datatype = paratype.nodeid)
await openvar.add_property(idx, "Description", "Flag to specify if something is open.")
#server
async with server:
print("Server läuft!")
while True:
await asyncio.sleep(1)
if __name__ == '__main__':
asyncio.run(main())
\ No newline at end of file
......@@ -38,65 +38,65 @@ async def main():
tcpobject = Measurement("Tool Center Point", "Tool center point of a six-arm robot.", float, 3 , (-0.5, 0.5), "metre")
posnode, _ = await new_struct(server, idx, "PosStruct", [
new_struct_field("name" , ua.VariantType.String),
new_struct_field("description" , ua.VariantType.String),
new_struct_field("value" , ua.VariantType.Float, array = True),
new_struct_field("range" , rangeid),
new_struct_field("unit" , ua.VariantType.String)
])
#mehr abfragen benötigt damit struct dementsprechend erstellt wird
if tcpobject.range is not None:
if tcpobject.unit is not None:
tcpnode, _ = await new_struct(server, idx, "TCPStruct", [
new_struct_field("name" , ua.VariantType.String),
new_struct_field("description" , ua.VariantType.String),
new_struct_field("value" , ua.VariantType.Float, array = True),
new_struct_field("range" , rangeid),
new_struct_field("unit" , ua.VariantType.String)
])
# posnode, _ = await new_struct(server, idx, "PosStruct", [
# new_struct_field("name" , ua.VariantType.String),
# new_struct_field("description" , ua.VariantType.String),
# new_struct_field("value" , ua.VariantType.Float, array = True),
# new_struct_field("range" , rangeid),
# new_struct_field("unit" , ua.VariantType.String)
# ])
# #mehr abfragen benötigt damit struct dementsprechend erstellt wird
# if tcpobject.range is not None:
# if tcpobject.unit is not None:
# tcpnode, _ = await new_struct(server, idx, "TCPStruct", [
# new_struct_field("name" , ua.VariantType.String),
# new_struct_field("description" , ua.VariantType.String),
# new_struct_field("value" , ua.VariantType.Float, array = True),
# new_struct_field("range" , rangeid),
# new_struct_field("unit" , ua.VariantType.String)
# ])
##evtl alle statischen variablen als struct dynamisch mit extra variablen
myobj = await server.nodes.objects.add_object(idx, "Robot")
custom_objs = await server.load_data_type_definitions()
#custom_objs = await server.load_data_type_definitions()
#pos struct hardcoded
var = await myobj.add_variable(idx, "position",ua.Variant(ua.PosStruct('Position', 'Position in Cartesian coordinates given in metre.', [0,0,0], ua.Range(-50, 50), 'm'), ua.VariantType.ExtensionObject))
#var = await myobj.add_variable(idx, "position",ua.Variant(ua.PosStruct('Position', 'Position in Cartesian coordinates given in metre.', [0,0,0], ua.Range(-50, 50), 'm'), ua.VariantType.ExtensionObject))
#tcp struct dynamischer
var2 = await myobj.add_variable(idx, "TCP", ua.Variant(ua.PosStruct(tcpobject.name, tcpobject.description, tcpobject.value, ua.Range(tcpobject.range[0],tcpobject.range[1]), tcpobject.unit), ua.VariantType.ExtensionObject))
await var.set_writable()
#var2 = await myobj.add_variable(idx, "TCP", ua.Variant(ua.PosStruct(tcpobject.name, tcpobject.description, tcpobject.value, ua.Range(tcpobject.range[0],tcpobject.range[1]), tcpobject.unit), ua.VariantType.ExtensionObject))
#await var.set_writable()
#await var.set_value(ua.PosStruct('bla') , ua.VariantType.ExtensionObject)
bla = await var.get_value()
#bla = await var.get_value()
#alternativ tcp als objekt mit variablen
measurementtype = await server.nodes.base_object_type.add_object_type(idx, tcpobject.description)
if tcpobject.range is not None:
rangevar = await measurementtype.add_variable(idx, "range", ua.Range(-0.5,0.5))
await rangevar.set_modelling_rule(True)
if tcpobject.unit is not None:
unit = await measurementtype.add_variable(idx, "unit", "metre")
await unit.set_modelling_rule(True)
value = await measurementtype.add_variable(idx, "value", 0)
await value.set_modelling_rule(True)
#await value.write_array_dimensions({tcpobject.dimension})
#await value.write_value_rank(1)
await myobj.add_object(idx, tcpobject.name , measurementtype.nodeid)
# measurementtype = await server.nodes.base_object_type.add_object_type(idx, tcpobject.description)
# if tcpobject.range is not None:
# rangevar = await measurementtype.add_variable(idx, "range", ua.Range(-0.5,0.5))
# await rangevar.set_modelling_rule(True)
# if tcpobject.unit is not None:
# unit = await measurementtype.add_variable(idx, "unit", "metre")
# await unit.set_modelling_rule(True)
# value = await measurementtype.add_variable(idx, "value", 0)
# await value.set_modelling_rule(True)
# #await value.write_array_dimensions({tcpobject.dimension})
# #await value.write_value_rank(1)
# await myobj.add_object(idx, tcpobject.name , measurementtype.nodeid)
#tcp als variable mit properties
vartype = await server.nodes.base_data_type.add_data_type(idx, tcpobject.name, tcpobject.description)
#if tcpobject.range is not None:
rangeprop = await vartype.add_property(idx, "range", ua.Range(-0.5,0.5))
await rangeprop.set_modelling_rule(True)
#if tcpobject.unit is not None:
unitprop = await vartype.add_property(idx, "unit", "metre")
await unitprop.set_modelling_rule(True)
# rangeprop = await vartype.add_property(idx, "range", ua.Range(-0.5,0.5))
# await rangeprop.set_modelling_rule(True)
# #if tcpobject.unit is not None:
# unitprop = await vartype.add_property(idx, "unit", "metre")
# await unitprop.set_modelling_rule(True)
tcpvar = await myobj.add_variable(idx, "TCP Var" , [0,0,0] , datatype = vartype.nodeid)
#object bekommt nicht datatypes properties
await tcpvar.add_property(idx, "range", ua.Range(-0.5,0.5))
await tcpvar.add_property(idx, "unit", "metre")
async with server:
print(bla)
print(tcpobject.value)
while True:
await asyncio.sleep(1)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment