# newmodeljsonparse.py (15.59 KiB)
# Author: Nick Anton Christoph Kleine-Tebbe
# Copied from the repository web view; selected Git revision: protected-ssr-api.md
import json
import os
import sys
import writefile as wf
from collections import namedtuple
def measurementJsonDecod(measurementdict):
    """Turn a decoded JSON object into a ``Measurement`` namedtuple.

    Usable as a ``json.loads`` ``object_hook``. A namedtuple class called
    ``Measurement`` is built on the fly with one field per key of
    *measurementdict* (in key order) and an instance carrying the
    corresponding values is returned.
    """
    field_names = list(measurementdict.keys())
    record_type = namedtuple('Measurement', field_names)
    return record_type(*measurementdict.values())
def parameterJsonDecod(parameterdict):
    """Turn a decoded JSON object into a ``Parameter`` namedtuple.

    Usable as a ``json.loads`` ``object_hook``. The namedtuple class is
    created per call; its fields are the keys of *parameterdict* and the
    returned instance holds the matching values.
    """
    field_names = list(parameterdict.keys())
    record_type = namedtuple('Parameter', field_names)
    return record_type(*parameterdict.values())
def functionJsonDecod(functiondict):
    """Turn a decoded JSON object into a ``Function`` namedtuple.

    Usable as a ``json.loads`` ``object_hook``. The namedtuple class is
    created per call; its fields are the keys of *functiondict* and the
    returned instance holds the matching values.
    """
    field_names = list(functiondict.keys())
    record_type = namedtuple('Function', field_names)
    return record_type(*functiondict.values())
class Measurement:
    """Plain container describing one measurement of the model.

    Attributes mirror the constructor arguments: ``name``, ``description``,
    ``datatype``, ``range``, ``dimension``, ``unit`` and ``value``.
    """

    def __init__(self, name, description, datatype, value, dimension, unit=None, range=None):
        """Store the measurement description.

        Args:
            name: Name of the measurement.
            description: Free-text description.
            datatype: Datatype name; only the string ``"float"`` triggers a
                conversion of *value*.
            value: Initial value. For datatype ``"float"`` it is converted
                with ``float()``; a list or tuple is converted element by
                element into a list of floats. Any other datatype stores
                *value* unchanged.
            dimension: Dimension of the value (stored as given).
            unit: Optional unit (stored as given).
            range: Optional value range (stored as given; no start value is
                derived from it).

        Raises:
            ValueError, TypeError: If a ``"float"`` value cannot be converted.
        """
        self.name = name
        self.description = description
        self.datatype = datatype
        self.range = range
        self.dimension = dimension
        self.unit = unit
        if self.datatype == "float":
            # Multi-dimensional values arrive as sequences; float() on the
            # sequence itself would raise TypeError, so convert per element.
            if isinstance(value, (list, tuple)):
                self.value = [float(item) for item in value]
            else:
                self.value = float(value)
        else:
            self.value = value
class Parameter:
    """Plain container describing one parameter of the model.

    All constructor arguments are stored unchanged. Note that the
    ``rangevar`` argument is exposed as the attribute ``range``.
    """

    def __init__(self, name, description, datatype, dimension, value, rangevar=None, unit=None, default=None, constant=False):
        """Store the parameter description without any conversion.

        Args:
            name: Name of the parameter.
            description: Free-text description.
            datatype: Datatype name.
            dimension: Dimension of the value.
            value: Current value.
            rangevar: Optional value range, stored as ``self.range``.
            unit: Optional unit.
            default: Optional default value.
            constant: Whether the parameter is constant.
        """
        self.name, self.description = name, description
        self.datatype = datatype
        # The keyword is called ``rangevar`` but the attribute is ``range``.
        self.range = rangevar
        self.dimension, self.unit = dimension, unit
        self.default, self.constant = default, constant
        self.value = value
class Function:
    """Plain container describing one function of the model."""

    def __init__(self, name, description, inargsdatatypes=None, outargsdatatypes=None):
        """Store the function description without any conversion.

        Args:
            name: Name of the function.
            description: Free-text description.
            inargsdatatypes: Optional datatypes of the input arguments.
            outargsdatatypes: Optional datatypes of the output arguments.
        """
        self.name, self.description = name, description
        self.inargsdatatypes, self.outargsdatatypes = inargsdatatypes, outargsdatatypes
def dimensionchange(string):
    """Collapse a sequence-valued ``"dimension"`` entry to one number, in place.

    If ``string["dimension"]`` is a list (or tuple), it is replaced by its
    first element, or by ``0`` when the sequence is empty. Any other value is
    left untouched, so calling the function repeatedly on the same dict is
    harmless.

    Args:
        string: Dict describing a model element; despite the name it is not
            a ``str``. It must contain the key ``"dimension"``.

    Raises:
        KeyError: If *string* has no ``"dimension"`` key.
    """
    dimarr = string["dimension"]
    # isinstance (instead of an exact type comparison) also normalises tuples
    # and list subclasses, which would otherwise reach numeric comparisons.
    if isinstance(dimarr, (list, tuple)):
        string["dimension"] = dimarr[0] if dimarr else 0
def getdatatypes(argarray):
    """Collect the ``"datatype"`` entry of every argument description.

    Args:
        argarray: Sized sequence of dicts, each holding a ``"datatype"`` key.

    Returns:
        A list with one datatype per entry, or ``None`` when *argarray* is
        empty.
    """
    if len(argarray) <= 0:
        return None
    return [argument["datatype"] for argument in argarray]
def changerange(string):
    """Normalise the ``"range"`` entry of an element dict to a 2-tuple, in place.

    An existing range is replaced by ``(range[0], range[1])``. A missing
    range, or one that is explicitly ``None`` (JSON ``null``), becomes
    ``(None, None)``.

    Args:
        string: Dict describing a model element; despite the name it is not
            a ``str``.

    Raises:
        IndexError: If an existing range has fewer than two entries.
    """
    rangeval = string.get("range")
    if rangeval is None:
        string["range"] = (None, None)
    else:
        string["range"] = (rangeval[0], rangeval[1])
def getParameters(parametersDict):
    """Convert parameter dicts of a JSON model into ``Parameter`` namedtuples.

    Each dict in *parametersDict* is modified in place first: its
    ``"dimension"`` is normalised via ``dimensionchange`` and its ``"value"``
    is overwritten with the default from ``getdefaultvalue``. The dict is then
    serialised to JSON and parsed back with ``parameterJsonDecod`` as
    ``object_hook``, so nested objects are converted as well.

    Args:
        parametersDict: Sized sequence of parameter dicts (despite the name,
            it is iterated like a list).

    Returns:
        A new list with one namedtuple per input dict (empty for empty input).
    """
    decoded = []
    if len(parametersDict) <= 0:
        return decoded
    for entry in parametersDict:
        dimensionchange(entry)
        entry["value"] = getdefaultvalue(entry["dimension"], entry["datatype"])
        serialized = json.dumps(entry)
        decoded.append(json.loads(serialized, object_hook=parameterJsonDecod))
    return decoded
def getMeasurements(measurementsDict):
    """Convert measurement dicts of a JSON model into ``Measurement`` namedtuples.

    Each dict in *measurementsDict* is modified in place first:
    ``"dimension"`` is normalised via ``dimensionchange``, ``"value"`` is
    overwritten with the default from ``getdefaultvalue``, ``"range"`` is
    normalised via ``changerange`` and a missing ``"unit"`` is set to
    ``"UNITLESS"``. The dict is then serialised to JSON and parsed back with
    ``measurementJsonDecod`` as ``object_hook``; because of that round trip a
    tuple range ends up as a list in the resulting namedtuple.

    Args:
        measurementsDict: Sized sequence of measurement dicts (despite the
            name, it is iterated like a list).

    Returns:
        A new list with one namedtuple per input dict (empty for empty input).
    """
    decoded = []
    if len(measurementsDict) <= 0:
        return decoded
    for entry in measurementsDict:
        dimensionchange(entry)
        entry["value"] = getdefaultvalue(entry["dimension"], entry["datatype"])
        changerange(entry)
        if not iskeythere(entry, "unit"):
            entry["unit"] = "UNITLESS"
        serialized = json.dumps(entry)
        decoded.append(json.loads(serialized, object_hook=measurementJsonDecod))
    return decoded
def searchlists(list1, list2):
    """Resolve uuid references against a list of candidate elements.

    Prints a progress line, then returns every element of *list2* whose
    ``"uuid"`` equals the ``"value"`` of a reference in *list1*. Results are
    ordered by reference first, then by position in *list2*.

    Args:
        list1: Sized sequence of reference dicts with a ``"value"`` key.
        list2: Iterable of element dicts with a ``"uuid"`` key.

    Returns:
        A new list of the matching elements of *list2* (empty when *list1*
        is empty).
    """
    print("I am searching")
    if len(list1) <= 0:
        return []
    return [
        candidate
        for reference in list1
        for candidate in list2
        if reference["value"] == candidate["uuid"]
    ]
def getdefaultvalue(dimension, datatype):
    """Return the placeholder value for an element of the given shape and type.

    Args:
        dimension: Number of entries; only consulted for numeric datatypes.
        datatype: Datatype name.

    Returns:
        For ``"float"``, ``"double"`` and ``"int"``: a list of *dimension*
        zeros when *dimension* is greater than 1, otherwise the scalar ``0``.
        For every other datatype: ``True``.
    """
    if datatype not in ("float", "double", "int"):
        return True
    if dimension > 1:
        return [0] * dimension
    return 0
def iskeythere(list, key):
    """Report whether *key* equals any item produced by iterating *list*.

    Args:
        list: Any iterable; for a dict this iterates its keys.
        key: Value to look for (compared with ``==``).

    Returns:
        ``True`` if at least one item equals *key*, else ``False``.
    """
    return any(keysearch == key for keysearch in list)
def iteratingInterface(obj, dict, componentslist):
    """Write an interface for the base component referenced by *obj*.

    Does nothing unless *obj* has a ``"baseComponent"`` key. Otherwise every
    entry of *componentslist* whose ``"uuid"`` equals that reference is passed
    to ``wf.writeinterface`` together with its name.

    NOTE(review): *componentslist* is both iterated as a sequence of dicts
    and indexed by component name, and the ``dict`` argument is never used.
    This looks like ``dict[...]`` was intended; confirm before relying on
    this function.
    """
    if not iskeythere(list(obj.keys()), "baseComponent"):
        return
    for candidate in componentslist:
        if candidate["uuid"] == obj["baseComponent"]:
            component_name = candidate["name"]
            wf.writeinterface(componentslist[component_name], component_name)
def getInterfaceparam(list1, interface, searched, jsoncomponents, searchinglist):
    """Resolve the references of a component, including inherited ones.

    Starting at *interface*, every reference in ``interface[searched]`` is
    matched (``reference["value"] == element["uuid"]``) against
    *searchinglist* and the matching elements are appended to *list1*. If the
    current component names a ``"baseComponent"``, the first component in
    *jsoncomponents* with that uuid is processed next, and so on up the
    chain.

    Args:
        list1: List that receives the matches; it is modified in place.
        interface: Dict of the component to start from.
        searched: Key of the reference list to resolve, e.g.
            ``"components"``, ``"measurements"`` or ``"parameters"``.
        jsoncomponents: All component dicts, used to follow
            ``"baseComponent"`` links.
        searchinglist: Element dicts the references are resolved against.

    Returns:
        *list1*. It is always returned, even when ``"baseComponent"`` is
        present but matches no component (e.g. it is ``None``), so callers
        never receive ``None``.

    Raises:
        KeyError: If a visited component lacks the *searched* key.
    """
    current = interface
    visited = set()
    # Walk the inheritance chain iteratively; the id() guard stops a cyclic
    # baseComponent chain instead of recursing forever.
    while current is not None and id(current) not in visited:
        visited.add(id(current))
        for reference in current[searched]:
            for candidate in searchinglist:
                if reference["value"] == candidate["uuid"]:
                    list1.append(candidate)
        if "baseComponent" not in current:
            break
        base_uuid = current["baseComponent"]
        current = next(
            (component for component in jsoncomponents if component["uuid"] == base_uuid),
            None,
        )
    return list1
def buildComponents(interfacesreturn, jsoninterfaces, jsoncomponents, jsonmeasurements, jsonparameters, objcomponentsclone):
    """Emit one type object per interface/component and recurse into children.

    For every element of *jsoninterfaces* whose name is not yet a key of
    *interfacesreturn*:

    1. the root component is determined: for ``"interface"`` elements the
       last component whose uuid equals ``rootComponent``, otherwise the
       element itself;
    2. ``wf.openobject`` is called and its result stored under the element
       name;
    3. the measurements and parameters reachable from the root are resolved,
       converted and handed to ``wf.writemeasurement`` /
       ``wf.writeparameter``;
    4. the function recurses into the root's child components.

    If an interface's ``rootComponent`` matches no component, the root stays
    an empty dict and the following lookups raise ``KeyError``.

    Args:
        interfacesreturn: Dict mapping element name to the value returned by
            ``wf.openobject``; filled in place.
        jsoninterfaces: Elements to process at this level.
        jsoncomponents: All component dicts of the model.
        jsonmeasurements: All measurement dicts of the model.
        jsonparameters: All parameter dicts of the model.
        objcomponentsclone: Unused; only passed through on recursion.

    Returns:
        *interfacesreturn*.
    """
    for element in jsoninterfaces:
        if element["elementType"] == "interface":
            root = {}
            for component in jsoncomponents:
                if element["rootComponent"] == component["uuid"]:
                    root = component
        else:
            root = element

        # Each name is emitted once, even if referenced from several places.
        if element["name"] in interfacesreturn:
            continue

        children = getInterfaceparam([], root, "components", jsoncomponents, jsoncomponents)

        type_key = str(element["name"])
        interfacesreturn[type_key] = wf.openobject(element["name"], root["description"])

        referenced_measurements = getInterfaceparam([], root, "measurements", jsoncomponents, jsonmeasurements)
        print(referenced_measurements)
        wf.writemeasurement(getMeasurements(referenced_measurements), interfacesreturn[type_key])

        referenced_parameters = getInterfaceparam([], root, "parameters", jsoncomponents, jsonparameters)
        wf.writeparameter(getParameters(referenced_parameters), interfacesreturn[type_key])

        buildComponents(interfacesreturn, children, jsoncomponents, jsonmeasurements, jsonparameters, objcomponentsclone)
    return interfacesreturn
def instantiateComponents(typesdict, interfaces, jsoncomponents, headnode, instantiatedict):
    """Instantiate the given components below *headnode*, depth first.

    For every element of *interfaces* its child components are resolved,
    ``wf.writeobjects`` is called with the element's type (from *typesdict*),
    its name, the already instantiated parent (looked up by the name of
    *headnode*) and a counter, and the result is stored in *instantiatedict*
    under the element name. Children are then instantiated recursively with
    the element as their head node.

    The counter starts at 0 for each call and is incremented whenever an
    element's name is already present in *instantiatedict*.

    Args:
        typesdict: Dict mapping element name to its type object.
        interfaces: Component dicts to instantiate at this level.
        jsoncomponents: All component dicts of the model.
        headnode: Dict of the parent element; its name must already be a key
            of *instantiatedict*.
        instantiatedict: Dict mapping element name to the value returned by
            the ``wf`` writer; filled in place.

    Returns:
        *instantiatedict*.
    """
    count = 0
    for element in interfaces:
        children = getInterfaceparam([], element, "components", jsoncomponents, jsoncomponents)
        name = element["name"]
        if name in instantiatedict:
            count += 1
        key = str(name)
        instantiatedict[key] = wf.writeobjects(
            typesdict[key],
            name,
            instantiatedict[str(headnode["name"])],
            count,
        )
        if len(children) > 0:
            instantiateComponents(typesdict, children, jsoncomponents, element, instantiatedict)
    return instantiatedict
def instantiate(typesdict, interfaces, jsoncomponents):
    """Instantiate every interface of the model together with its components.

    For each interface the root component is looked up (the last component
    whose uuid equals ``rootComponent``), ``wf.writeinterface`` is called
    with the interface's type, its name and the counter ``0``, and the
    result is stored under the interface name. The components reachable from
    the root are then instantiated below the interface via
    ``instantiateComponents``.

    If ``rootComponent`` matches no component, the root stays an empty dict
    and resolving its components raises ``KeyError``.

    Args:
        typesdict: Dict mapping element name to its type object.
        interfaces: Interface dicts of the model.
        jsoncomponents: All component dicts of the model.

    Returns:
        Dict mapping element name to the value returned by the ``wf`` writer
        (empty when *interfaces* is empty).
    """
    instantiatedict = {}
    for interface in interfaces:
        root = {}
        count = 0
        for component in jsoncomponents:
            if interface["rootComponent"] == component["uuid"]:
                root = component
        components = getInterfaceparam([], root, "components", jsoncomponents, jsoncomponents)
        instantiatedict[str(interface["name"])] = wf.writeinterface(typesdict[str(interface["name"])], interface["name"], count)
        instantiateComponents(typesdict, components, jsoncomponents, interface, instantiatedict)
    # The caller assigns the result, so hand the mapping back instead of None.
    return instantiatedict
def main():
    """Command-line entry point: convert a JSON model via the ``wf`` writer.

    Expects the model file name as first command-line argument. A trailing
    ``.json`` is optional. The elements of the model are grouped by their
    ``"elementType"``, type objects are emitted with ``buildComponents`` and
    instances with ``instantiate``; the output is framed by
    ``wf.writebeginning()`` and ``wf.writend()``.

    Exits with status 1 (after printing a usage hint) when no argument is
    given or the model file is not found, and with status 0 when the model
    contains no elements.
    """
    usage = 'You must provide a file from the folder to be used as root file of the parsing, e.g., "python jsonparser.py model.json".'

    if len(sys.argv) < 2:
        print(usage)
        sys.exit(1)

    # Strip the extension only if it is really there; a blind [:-5] would
    # mangle names given without ".json".
    model_name = sys.argv[1]
    if model_name.endswith('.json'):
        model_name = model_name[:-len('.json')]
    model_file = f'{model_name}.json'

    # NOTE(review): existence is checked under ../SOIL-OPC-UA, but the file is
    # opened relative to the current directory. The two only coincide when the
    # script runs from inside the SOIL-OPC-UA folder - confirm this is intended.
    if not os.path.exists(os.path.join('..', 'SOIL-OPC-UA', model_file)):
        print(usage)
        sys.exit(1)

    with open(model_file, encoding='utf-8') as file:
        wf.writebeginning()
        jsonobj = json.load(file)

    elements = jsonobj["elements"]
    if not elements:
        sys.exit()

    # Group the elements by type; unknown element types are ignored.
    # "function" elements are collected but not used further.
    grouped = {
        "measurement": [],
        "parameter": [],
        "function": [],
        "component": [],
        "interface": [],
    }
    for obj in elements:
        bucket = grouped.get(obj["elementType"])
        if bucket is not None:
            bucket.append(obj)

    jsonmeasurements = grouped["measurement"]
    jsonparameters = grouped["parameter"]
    jsoncomponents = grouped["component"]
    jsoninterfaces = grouped["interface"]

    typesdict = buildComponents({}, jsoninterfaces, jsoncomponents, jsonmeasurements, jsonparameters, [])
    print(typesdict)
    instantiate(typesdict, jsoninterfaces, jsoncomponents)
    wf.writend()
# Run the conversion only when executed as a script, not when imported.
if __name__ == "__main__":
    main()