# -*- coding: utf-8 -*-
import io
import json
from multipledispatch import dispatch
from glaciation.cdu._lib.VarDbAccess import *
class VardbVarWrapper:
    """Wrapper around the vardbvar / VarDbAccess libraries configured by JSON.

    It gives an easy interface to read, write, force and unforce variables
    in different vardbs and containers.

    To be able to connect to the variables it is necessary to previously
    synchronize with the remote CDU server.

    The JSON file is expected to hold a top-level ``"vardbs"`` object. Every
    entry that has a ``"VARIABLES"`` key is loaded and must provide
    ``"VARDB_NAME"``, ``"CONTAINER"``, ``"VARIABLES"["READ"]`` and
    ``"VARIABLES"["WRITE"]``; the last two map a container name to the
    variable names it holds. Entries without ``"VARIABLES"`` are skipped.

    Parameters
    ----------
    jsonFile : string
        Path of a json file

    Attributes
    ----------
    json : string
        Path of the json file given to the constructor
    VARDB_LIST : list
        Keys of the "vardbs" entries that were loaded from the json
    VARDB : dict
        VarDbAccess object of every vardb connected by loadVars, keyed by
        VARDB_NAME
    VARS : list
        One dict per loaded vardb with the keys
        "VARDB_NAME", "CONTAINER", "READ" and "WRITE"
    silence : bool
        If True the class does not print any message

    Methods
    -------
    setSilence(boolValue)
        Activate/deactivate the messages printed by the class
    loadVars(vardbName=None)
        Init a connection to all variables in vardb
        !!!Indispensable to call it before reading or forcing vars!!!
        If no vardbName is given it will connect to all vardbs in the json
    readVar(vdb, vname)
        Read a variable from vardb
    readVarQOk(vdb, vname)
        Same as readVar(vdb, vname) but raising Exception in case of Q=0
    readQuality(vdb, vname)
        Read quality of a variable
    forceVar(vdb, vname, value)
        Force a variable in vardb
    unforceVar(vdb, vname)
        Unforce a variable in vardb
    writeVar(vdb, vname, value)
        Write a variable in vardb

    ** All the read/force/unforce/write functions also accept an alias,
    i.e. a 2-item iterable (vdb, vname), instead of vdb, vname **

    Notes
    -----
    A failure while reading the json file is printed, not raised; the
    object is then left with whatever was loaded before the failure.
    """

    def __init__(self, jsonFile):
        self.json = jsonFile
        self.VARDB_LIST = []  # filled from the json file
        self.VARDB = {}  # filled by loadVars
        self.VARS = []  # filled from the json file
        self.silence = False  # if True will not print any message
        self.__initClass(jsonFile)

    def __initClass(self, jsonFile):
        """Read the json file and fill VARDB_LIST and VARS.

        No connection is opened here; that is done by loadVars.

        Parameters
        ----------
        jsonFile : string
            Path of a json file containing all vardbs information

        Notes
        -----
        Any exception raised while reading or parsing the file is caught
        and reported through a printed message; nothing is raised.
        """
        # Deliberately best-effort (kept for backward compatibility): callers
        # get an object even when the configuration cannot be read.
        try:
            with io.open(jsonFile, 'r', encoding='utf-8') as f:
                data = json.load(f)
            for vardb, vardbData in data["vardbs"].items():
                if "VARIABLES" not in vardbData:
                    continue
                # Build the element first so that a malformed entry cannot
                # leave VARDB_LIST and VARS out of sync.
                elem = {
                    "VARDB_NAME": vardbData["VARDB_NAME"],
                    "CONTAINER": vardbData["CONTAINER"],
                    "READ": vardbData["VARIABLES"]["READ"],
                    "WRITE": vardbData["VARIABLES"]["WRITE"],
                }
                self.VARDB_LIST.append(vardb)
                self.VARS.append(elem)
        except Exception as e:
            self.__printMessage("An exception occurred reading json: " + str(e))

    def setSilence(self, boolValue):
        """Activate/deactivate the messages printed by this class.

        Parameters
        ----------
        boolValue : bool
            If True it will not print when reading/writing
        """
        self.silence = boolValue

    def loadVars(self, vardbName=None):
        """Create a connection to all variables in vardb using VarDbAccess.

        Indispensable to call it before reading or forcing them.
        If no vardbName is given it connects to all vardbs in the json.
        A vardbName that matches no loaded vardb connects nothing.

        Parameters
        ----------
        vardbName : string
            VARDB_NAME of the vardb to connect (default is None, which
            implies all connections defined in the json file)
        """
        for vd in self.VARS:
            if vardbName is not None and vd["VARDB_NAME"] != vardbName:
                continue
            # In PXI or DAQ the main CDU container is an empty string and
            # the variable names are used without container prefix.
            qualify = bool(vd["CONTAINER"])
            readNames = self.__qualifiedNames(vd["READ"], qualify)
            writeNames = self.__qualifiedNames(vd["WRITE"], qualify)
            self.VARDB[vd["VARDB_NAME"]] = VarDbAccess(
                vd["VARDB_NAME"], readNames, writeNames)
            self.__printMessage("VAR LIST OK IN: " + vd["VARDB_NAME"])

    @staticmethod
    def __qualifiedNames(varsByContainer, qualify):
        """Flatten a {container: [names]} mapping into a list of names.

        When qualify is True every name is returned as "container::name",
        otherwise the bare name is returned.
        """
        names = []
        for container, variables in varsByContainer.items():
            for vName in variables:
                names.append("%s::%s" % (container, vName) if qualify else vName)
        return names

    def __readRaw(self, vdb, vname):
        """Read a variable and return (formatted name, error, value, quality).

        :meta private:
        """
        vn = self.__formatVariableName(vdb, vname)
        self.__printMessage("Read: VAR=%s" % (vn))
        error, value, quality = self.VARDB[vdb].value(vn)
        # %s instead of %i: the value is not guaranteed to be an integer and
        # the log line must never be the reason a read fails.
        self.__printMessage("Read: E=%s, V=%s, Q=%s" % (error, value, quality))
        return vn, error, value, quality

    def __readBack(self, vdb, vn, label, failure):
        """Read vn back after an operation, log it and raise on error.

        :meta private:
        """
        error, value, quality = self.VARDB[vdb].value(vn)
        self.__printMessage("%s: E=%s, V=%s, Q=%s" % (label, error, value, quality))
        if error:
            raise Exception(failure + vn)

    @dispatch(str, str)
    def readVar(self, vdb, vname):
        """Read a variable from main container. To read from a different
        container use container::vname Ex: "fwk_internal::CDU_PERIOD"

        Parameters
        ----------
        vdb : string
            vardb name
        vname : string
            variable name

        Returns
        -------
        variable value

        Raises
        ------
        Exception
            vardb name does not exist in the json configuration
            Error reading variable (error code different from 0)
        KeyError
            loadVars was not called for this vardb
        """
        vn, error, value, _quality = self.__readRaw(vdb, vname)
        if error != 0:
            raise Exception('Error reading variable:' + vn)
        return value

    @dispatch(object)
    def readVar(self, alias):
        """Read a variable from main container using alias

        Parameters
        ----------
        alias : 2-item iterable
            (vardb name, variable name)

        Returns
        -------
        variable value

        Raises
        ------
        Exception
            Same as readVar(vdb, vname)
        """
        vdb, vname = alias
        return self.readVar(vdb, vname)

    @dispatch(str, str, int)
    def forceVar(self, vdb, vname, value):
        """Force a variable in the main container.
        To force in a different container use
        container::vname Ex: "fwk_internal::CDU_PERIOD"

        Parameters
        ----------
        vdb : string
            vardb name
        vname : string
            variable name
        value : int or float
            value to force

        Raises
        ------
        Exception
            vardb name does not exist in the json configuration
            Error forcing var (error reported when reading it back)
        KeyError
            loadVars was not called for this vardb
        """
        self.__forceVar(vdb, vname, value)

    @dispatch(str, str, float)
    def forceVar(self, vdb, vname, value):
        """:meta private:"""
        self.__forceVar(vdb, vname, value)

    @dispatch(object, int)
    def forceVar(self, alias, value):
        """Force an alias in the main container

        Parameters
        ----------
        alias : 2-item iterable
            (vardb name, variable name)
        value : int or float
            value to force

        Raises
        ------
        Exception
            Same as forceVar(vdb, vname, value)
        """
        vdb, vname = alias
        self.__forceVar(vdb, vname, value)

    @dispatch(object, float)
    def forceVar(self, alias, value):
        """:meta private:"""
        vdb, vname = alias
        self.__forceVar(vdb, vname, value)

    def __forceVar(self, vdb, vname, value):
        """:meta private:"""
        vn = self.__formatVariableName(vdb, vname)
        # %s keeps float values intact in the log (%i would truncate them).
        self.__printMessage("Force: VAR=%s, VALUE=%s" % (vn, value))
        self.VARDB[vdb].forcevar(vn, value)
        self.__readBack(vdb, vn, "Forced", "Error forcing var: ")

    @dispatch(str, str, int)
    def writeVar(self, vdb, vname, value):
        """Write a variable in the main container.
        To write in a different container use
        container::vname Ex: "fwk_internal::CDU_PERIOD"

        Parameters
        ----------
        vdb : string
            vardb name
        vname : string
            variable name
        value : int or float
            value to write

        Raises
        ------
        Exception
            vardb name does not exist in the json configuration
            Error writing var (error reported when reading it back)
        KeyError
            loadVars was not called for this vardb
        """
        self.__writeVar(vdb, vname, value)

    @dispatch(str, str, float)
    def writeVar(self, vdb, vname, value):
        """:meta private:"""
        self.__writeVar(vdb, vname, value)

    @dispatch(object, int)
    def writeVar(self, alias, value):
        """Write an alias in the main container

        Parameters
        ----------
        alias : 2-item iterable
            (vardb name, variable name)
        value : int or float
            value to write

        Raises
        ------
        Exception
            Same as writeVar(vdb, vname, value)
        """
        vdb, vname = alias
        self.__writeVar(vdb, vname, value)

    @dispatch(object, float)
    def writeVar(self, alias, value):
        """:meta private:"""
        vdb, vname = alias
        self.__writeVar(vdb, vname, value)

    def __writeVar(self, vdb, vname, value):
        """:meta private:"""
        vn = self.__formatVariableName(vdb, vname)
        self.__printMessage("Write: VAR=%s, VALUE=%s" % (vn, value))
        self.VARDB[vdb].writevar(vn, value)
        # Error text kept exactly as before; callers may match on it.
        self.__readBack(vdb, vn, "Written", "Error writting var: ")

    @dispatch(str, str)
    def readVarQOk(self, vdb, vname):
        """Read a variable from main container.
        Raise Exception if Quality is not OK

        Parameters
        ----------
        vdb : string
            vardb name
        vname : string
            variable name

        Raises
        ------
        Exception
            vardb name does not exist in the json configuration
            Error reading variable (error code different from 0)
            Quality of variable not OK (quality equal to 0)
        KeyError
            loadVars was not called for this vardb

        Returns
        -------
        variable value
        """
        vn, error, value, quality = self.__readRaw(vdb, vname)
        if error != 0:
            raise Exception('Error reading variable:' + vn)
        if quality == 0:
            raise Exception('Quality of variable not OK: ' + vn)
        return value

    @dispatch(object)
    def readVarQOk(self, alias):
        """Read a variable from main container using alias.
        Raise Exception if Quality is not OK

        Parameters
        ----------
        alias : 2-item iterable
            (vardb name, variable name)

        Raises
        ------
        Exception
            Same as readVarQOk(vdb, vname)

        Returns
        -------
        variable value
        """
        vdb, vname = alias
        return self.readVarQOk(vdb, vname)

    @dispatch(str, str)
    def readQuality(self, vdb, vname):
        """Read the quality of a variable.

        The error code of the read is logged but not checked: the quality
        is returned even when the read reports an error.

        Parameters
        ----------
        vdb : string
            vardb name
        vname : string
            variable name

        Raises
        ------
        Exception
            vardb name does not exist in the json configuration
        KeyError
            loadVars was not called for this vardb

        Returns
        -------
        variable quality
        """
        _vn, _error, _value, quality = self.__readRaw(vdb, vname)
        return quality

    @dispatch(object)
    def readQuality(self, alias):
        """Read the quality of a variable using alias

        Parameters
        ----------
        alias : 2-item iterable
            (vardb name, variable name)

        Raises
        ------
        Exception
            Same as readQuality(vdb, vname)

        Returns
        -------
        variable quality
        """
        vdb, vname = alias
        return self.readQuality(vdb, vname)

    @dispatch(str, str)
    def unforceVar(self, vdb, vname):
        """Unforce a variable in the main container.
        Raise Exception if error

        Parameters
        ----------
        vdb : string
            vardb name
        vname : string
            variable name

        Raises
        ------
        Exception
            vardb name does not exist in the json configuration
            Error Unforcing var (error reported when reading it back)
        KeyError
            loadVars was not called for this vardb
        """
        vn = self.__formatVariableName(vdb, vname)
        self.__printMessage("Unforce: VAR=%s" % (vn))
        self.VARDB[vdb].unforcevar(vn)
        self.__readBack(vdb, vn, "Unforced", "Error Unforcing var: ")

    @dispatch(object)
    def unforceVar(self, alias):
        """Unforce a variable in the main container using alias.
        Raise Exception if error

        Parameters
        ----------
        alias : 2-item iterable
            (vardb name, variable name)

        Raises
        ------
        Exception
            Same as unforceVar(vdb, vname)
        """
        vdb, vname = alias
        self.unforceVar(vdb, vname)

    def __formatVariableName(self, vardb, variable):
        """Add the container name of the vardb to the variable name.

        @param vardb: vardb name (VARDB_NAME in the json)
        @param variable: variable name
        Return "container::variable", or the variable unchanged when it
        already contains "::" or the vardb container is empty (PXI/DAQ)
        Raise Exception if vardb is not one of the loaded VARDB_NAMEs
        """
        vdl = next((n for n in self.VARS if n["VARDB_NAME"] == vardb), None)
        if vdl is None:
            raise Exception(f"VARDB name {vardb} does not exist")
        # container already in the name. OR
        # is PXI or DAQ (Main CDU container is an empty string)
        if "::" in variable or not vdl["CONTAINER"]:
            return variable
        return "%s::%s" % (vdl["CONTAINER"], variable)

    def __mergeDict(self, dict1, dict2):
        """Merge 2 variable dictionaries of the form {container: [names]}.

        @param dict1: dictionary 1
        @param dict2: dictionary 2
        Returns dict2 if dict1 is empty, dict1 if dict2 is empty, otherwise
        a new dictionary where every container holds the names of dict1
        followed by the names of dict2
        """
        if not dict1:
            return dict2
        if not dict2:
            return dict1
        merged = {}
        # Containers of dict1 first, then the ones only present in dict2.
        for container in list(dict1) + [c for c in dict2 if c not in dict1]:
            merged[container] = (list(dict1.get(container, []))
                                 + list(dict2.get(container, [])))
        return merged

    def __printMessage(self, message):
        """Print message unless the class has been silenced."""
        if not self.silence:
            print(message)