Skip to content
Snippets Groups Projects
Commit ab333a04 authored by Susanna Weber's avatar Susanna Weber
Browse files

Merge branch 'events_n_functions' of...

Merge branch 'events_n_functions' of https://git-ce.rwth-aachen.de/wzl-mq-ms/forschung-lehre/iop-ws-a.i/soil-opc-ua into events_n_functions
parents 987a8b48 fb6c3009
Branches
No related tags found
No related merge requests found
......@@ -2,12 +2,15 @@
FROM python:3.10
# install app dependencies
#python vorpackage
WORKDIR /app
RUN ./requirements.txt .
RUN mkdir /soil-opc-ua
WORKDIR /soil-opc-ua
COPY requirements.txt .
RUN pip3 install -r requirements.txt
COPY * /app
COPY * /soil-opc-ua
CMD ["python", "newmodeljsonparse.py", "robot.json"]
asyncua
asyncio
tomli
toml
numpy
opcua-client
\ No newline at end of file
......@@ -9,8 +9,11 @@ from asyncua.common import node
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger('asyncua')
async def main():
# setup our server
# setup our server()
server = Server()
await server.init()
server.set_endpoint('opc.tcp://0.0.0.0:4840/freeopcua/server/')
......@@ -58,6 +61,22 @@ async def main():
##evtl alle statischen variablen als struct dynamisch mit extra variablen
myobj = await server.nodes.objects.add_object(idx, "Robot")
enode = await new_enum(server, idx, "MyEnum", [
"titi",
"toto",
"tutu",
])
enum_node = await new_enum(server, idx, "EnumerateTest", [ "UNDEFINED", "GOOD", "BAD", ])
await server.load_data_type_definitions()
enumTest = await myobj.add_variable(idx, "my_enumtest", ua.EnumerateTest.UNDEFINED, datatype=enum_node.nodeid)
await enumTest.set_modelling_rule(True)
#custom_objs = await server.load_data_type_definitions()
valnode = await server.nodes.objects.add_variable(idx, "my_enum", ua.MyEnum.toto, datatype=enode.nodeid)
#enum = await myobj.add_variable(idx, "my_enum", ua.MyEnum.toto, datatype=enode.nodeid)
#custom_objs = await server.load_data_type_definitions()
#pos struct hardcoded
#var = await myobj.add_variable(idx, "position",ua.Variant(ua.PosStruct('Position', 'Position in Cartesian coordinates given in metre.', [0,0,0], ua.Range(-50, 50), 'm'), ua.VariantType.ExtensionObject))
......@@ -84,23 +103,58 @@ async def main():
#tcp als variable mit properties
vartype = await server.nodes.base_data_type.add_data_type(idx, tcpobject.name, tcpobject.description)
#await vartype.write_value_rank(1)
#if tcpobject.range is not None:
# rangeprop = await vartype.add_property(idx, "range", ua.Range(-0.5,0.5))
# await rangeprop.set_modelling_rule(True)
# #if tcpobject.unit is not None:
# unitprop = await vartype.add_property(idx, "unit", "metre")
# await unitprop.set_modelling_rule(True)
tcpvar = await myobj.add_variable(idx, "TCP Var" , [0,0,0] , datatype = vartype.nodeid)
tcpvar = await myobj.add_variable(idx, "TCP Var" , [0,0,0] , ua.VariantType.Float ,datatype = vartype.nodeid)
await tcpvar.set_writable()
# await tcpvar.write_value_rank(1)
# await tcpvar.write_array_dimensions(2)
#object bekommt nicht datatypes properties
await tcpvar.add_property(idx, "range", ua.Range(-0.5,0.5))
await tcpvar.add_property(idx, "unit", "metre")
#def func(variant):
# tcpvar.set_value(ua.Variant(variant, ua.VariantType.Float, is_array = True))
testvar = await myobj.add_variable(idx, "test" , 3)
# @uamethod
# async def goTo (position):
# await tcpvar.set_value(position)
# @uamethod
# def test (position):
# testvar.set_value(position)
# return position
async def test(parent, position):
await testvar.set_value(ua.Variant(position))
async def goTo(parent, position):
await tcpvar.set_value(ua.Variant(position))
inargx = ua.Argument()
inargx.Name = "Position_neu"
inargx.DataType = ua.NodeId(ua.ObjectIds.Float)
inargx.ValueRank = 1
inargx.ArrayDimensions = [1,1,1]
inargx.Description = ua.LocalizedText("bewegt zu neuer position")
beweg_node = await myobj.add_method(idx, "bewegen", goTo, [ua.VariantType.Float])
testmethod = await myobj.add_method(idx, "test", test, [ua.VariantType.Int64])
async with server:
print(tcpobject.value)
#print(tcpobject.value)
print(await tcpvar.get_value())
#await tcpvar.set_value([1,1,1])
print(await tcpvar.get_value())
while True:
await asyncio.sleep(1)
if __name__ == '__main__':
asyncio.run(main())
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment