Source code for glaciation.cdu.vardb

# -*- coding: utf-8 -*-
import io
import json
from multipledispatch import dispatch
from glaciation.cdu._lib.VarDbAccess import *


class VardbVarWrapper:
    """Wrapper around ``VarDbAccess`` configured from a JSON file.

    It gives an easy interface to read, force, write and unforce variables
    in different vardbs and containers.

    To be able to connect to the variables it is necessary to previously
    synchronize with the remote CDU server.

    The JSON file is expected to hold a top-level ``"vardbs"`` mapping.
    Every entry that has a ``"VARIABLES"`` key must also provide
    ``"VARDB_NAME"``, ``"CONTAINER"`` and, inside ``"VARIABLES"``, the
    ``"READ"`` and ``"WRITE"`` mappings (container name -> list of variable
    names).

    Parameters
    ----------
    jsonFile : string
        Path of a json file

    Attributes
    ----------
    self.json : path of the json file given to the constructor
    self.VARDB_LIST : list of vardb keys loaded from the json file
    self.VARDB : dict, VARDB_NAME -> VarDbAccess connection (see loadVars)
    self.VARS : list with one description dict per loaded vardb
    self.silence : if True no message is printed

    Methods
    -------
    loadVars(vardbName=None)
        Init a connection to all variables in vardb.
        !!!Indispensable to call it before reading or forcing vars!!!
        If no vardbName is given it connects to all vardbs in the json
    readVar(vdb, vname)
        Read a variable from vardb
    forceVar(vdb, vname, value)
        Force a variable in vardb
    writeVar(vdb, vname, value)
        Write a variable in vardb
    readVarQOk(vdb, vname)
        Same as readVar(vdb, vname) but raising Exception in case of Q=0
    readQuality(vdb, vname)
        Read quality of a variable
    unforceVar(vdb, vname)
        Unforce a variable in vardb

    ** All the functions above except loadVars also accept an alias
    instead of vdb, vname. An alias is anything that unpacks into exactly
    two items: (vardb name, variable name) **
    """

    def __init__(self, jsonFile):
        self.json = jsonFile
        self.VARDB_LIST = []  # filled from the json file
        self.VARDB = {}  # filled by loadVars()
        self.VARS = []  # filled from the json file
        self.silence = False  # if True will not print any message
        self.__initClass(jsonFile)

    def __initClass(self, jsonFile):
        """Read the json file and fill ``VARDB_LIST`` and ``VARS``.

        Entries of ``"vardbs"`` without a ``"VARIABLES"`` key are skipped.

        Loading is best effort: any exception raised while opening, parsing
        or walking the file is reported through the message printer and NOT
        re-raised, so the object may end up with empty (or partially filled)
        lists.

        Parameters
        ----------
        jsonFile : string
            Path of a json file containing all vardbs information
        """
        try:
            with io.open(jsonFile, 'r', encoding='utf-8') as f:
                data = json.load(f)
            for vardb, vardbData in data["vardbs"].items():
                if "VARIABLES" not in vardbData:
                    continue
                # Build the whole entry before touching the lists, so a
                # malformed entry cannot leave VARDB_LIST and VARS out of sync.
                elem = {
                    "VARDB_NAME": vardbData["VARDB_NAME"],
                    "CONTAINER": vardbData["CONTAINER"],
                    "READ": vardbData["VARIABLES"]["READ"],
                    "WRITE": vardbData["VARIABLES"]["WRITE"],
                }
                self.VARDB_LIST.append(vardb)
                self.VARS.append(elem)
        except Exception as e:
            # Deliberately broad: construction must not fail on a bad file.
            self.__printMessage("An exception occurred reading json: " + str(e))

    def setSilence(self, boolValue):
        """Activate/deactivate printing in this class.

        Parameters
        ----------
        boolValue : bool
            If True nothing is printed when loading/reading/writing
        """
        self.silence = boolValue

    def loadVars(self, vardbName=None):
        """Create a ``VarDbAccess`` connection for the configured variables.

        Indispensable to call it before reading or forcing variables.
        The connection is stored in ``self.VARDB`` under the vardb name.

        Parameters
        ----------
        vardbName : string
            vardb name to connect to (default is None, which implies all
            connections defined in the json file)
        """
        for vd in self.VARS:
            if vardbName is not None and vd["VARDB_NAME"] != vardbName:
                continue
            # In PXI or DAQ the main CDU container is an empty string and
            # the variable names are used without a container prefix.
            qualify = len(vd["CONTAINER"]) > 0
            vnamesListR = self.__qualifiedNames(vd["READ"], qualify)
            vnamesListW = self.__qualifiedNames(vd["WRITE"], qualify)
            self.VARDB[vd["VARDB_NAME"]] = VarDbAccess(
                vd["VARDB_NAME"], vnamesListR, vnamesListW)
            # Routed through the message printer so that setSilence(True)
            # also silences this line.
            self.__printMessage("VAR LIST OK IN: " + vd["VARDB_NAME"])

    def __qualifiedNames(self, varsByContainer, qualify):
        """Flatten a {container: [names]} mapping into a list of names.

        When ``qualify`` is True every name becomes "container::name".
        """
        names = []
        for container, vNames in varsByContainer.items():
            for vName in vNames:
                if qualify:
                    names.append("%s::%s" % (container, vName))
                else:
                    names.append(vName)
        return names

    def __readRaw(self, vdb, vname):
        """Read a variable and log the result.

        Returns the tuple (formatted name, error, value, quality).
        """
        vn = self.__formatVariableName(vdb, vname)
        self.__printMessage("Read: VAR=%s" % (vn))
        error, value, quality = self.VARDB[vdb].value(vn)
        # %s for the value: forced/written values may be floats, and the
        # log line must never hide the error check done by the caller.
        self.__printMessage("Read: E=%i, V=%s, Q=%i" % (error, value, quality))
        return vn, error, value, quality

    def __checkAfter(self, vdb, vn, label, errorText):
        """Read ``vn`` back after an operation, log it, raise on error."""
        error, readBack, quality = self.VARDB[vdb].value(vn)
        self.__printMessage("%s: E=%i, V=%s, Q=%i" % (label, error, readBack, quality))
        if error:
            raise Exception(errorText + vn)

    @dispatch(str, str)
    def readVar(self, vdb, vname):
        """Read a variable from the main container.

        To read from a different container use container::vname
        Ex: "fwk_internal::CDU_PERIOD"

        Parameters
        ----------
        vdb : string
            vardb name
        vname : string
            variable name

        Returns
        -------
        variable value

        Raises
        ------
        Exception
            vardb name doesn't exist in the loaded configuration
            Error reading variable
        """
        vn, error, value, _quality = self.__readRaw(vdb, vname)
        if error != 0:
            raise Exception('Error reading variable:' + vn)
        return value

    @dispatch(object)
    def readVar(self, alias):
        """Read a variable from the main container using an alias.

        Parameters
        ----------
        alias : pair
            (vardb name, variable name)

        Returns
        -------
        variable value

        Raises
        ------
        Exception
            vardb name doesn't exist in the loaded configuration
            Error reading variable
        """
        vdb, vname = alias
        return self.readVar(vdb, vname)

    @dispatch(str, str, (int, float))
    def forceVar(self, vdb, vname, value):
        """Force a variable in the main container.

        To force in a different container use container::vname
        Ex: "fwk_internal::CDU_PERIOD"

        Parameters
        ----------
        vdb : string
            vardb name
        vname : string
            variable name
        value : int or float
            value to force

        Raises
        ------
        Exception
            vardb name doesn't exist in the loaded configuration
            Error forcing var
        """
        self.__forceVar(vdb, vname, value)

    @dispatch(object, (int, float))
    def forceVar(self, alias, value):
        """Force a variable in the main container using an alias.

        Parameters
        ----------
        alias : pair
            (vardb name, variable name)
        value : int or float
            value to force

        Raises
        ------
        Exception
            vardb name doesn't exist in the loaded configuration
            Error forcing var
        """
        vdb, vname = alias
        self.__forceVar(vdb, vname, value)

    def __forceVar(self, vdb, vname, value):
        """:meta private:"""
        vn = self.__formatVariableName(vdb, vname)
        self.__printMessage("Force: VAR=%s, VALUE=%s" % (vn, value))
        self.VARDB[vdb].forcevar(vn, value)
        self.__checkAfter(vdb, vn, "Forced", "Error forcing var: ")

    @dispatch(str, str, (int, float))
    def writeVar(self, vdb, vname, value):
        """Write a variable in the main container.

        To write in a different container use container::vname
        Ex: "fwk_internal::CDU_PERIOD"

        Parameters
        ----------
        vdb : string
            vardb name
        vname : string
            variable name
        value : int or float
            value to write

        Raises
        ------
        Exception
            vardb name doesn't exist in the loaded configuration
            Error writing var
        """
        self.__writeVar(vdb, vname, value)

    @dispatch(object, (int, float))
    def writeVar(self, alias, value):
        """Write a variable in the main container using an alias.

        Parameters
        ----------
        alias : pair
            (vardb name, variable name)
        value : int or float
            value to write

        Raises
        ------
        Exception
            vardb name doesn't exist in the loaded configuration
            Error writing var
        """
        vdb, vname = alias
        self.__writeVar(vdb, vname, value)

    def __writeVar(self, vdb, vname, value):
        """:meta private:"""
        vn = self.__formatVariableName(vdb, vname)
        self.__printMessage("Write: VAR=%s, VALUE=%s" % (vn, value))
        self.VARDB[vdb].writevar(vn, value)
        self.__checkAfter(vdb, vn, "Written", "Error writting var: ")

    @dispatch(str, str)
    def readVarQOk(self, vdb, vname):
        """Read a variable from the main container, requiring good quality.

        Parameters
        ----------
        vdb : string
            vardb name
        vname : string
            variable name

        Returns
        -------
        variable value

        Raises
        ------
        Exception
            vardb name doesn't exist in the loaded configuration
            Error reading variable
            Quality of variable not OK (quality equal to 0)
        """
        vn, error, value, quality = self.__readRaw(vdb, vname)
        if error != 0:
            raise Exception('Error reading variable:' + vn)
        if quality == 0:
            raise Exception('Quality of variable not OK: ' + vn)
        return value

    @dispatch(object)
    def readVarQOk(self, alias):
        """Read a variable using an alias, requiring good quality.

        Parameters
        ----------
        alias : pair
            (vardb name, variable name)

        Returns
        -------
        variable value

        Raises
        ------
        Exception
            vardb name doesn't exist in the loaded configuration
            Error reading variable
            Quality of variable not OK (quality equal to 0)
        """
        vdb, vname = alias
        return self.readVarQOk(vdb, vname)

    @dispatch(str, str)
    def readQuality(self, vdb, vname):
        """Read the quality of a variable.

        The error code of the read is logged but not checked here.

        Parameters
        ----------
        vdb : string
            vardb name
        vname : string
            variable name

        Returns
        -------
        variable quality

        Raises
        ------
        Exception
            vardb name doesn't exist in the loaded configuration
        """
        _vn, _error, _value, quality = self.__readRaw(vdb, vname)
        return quality

    @dispatch(object)
    def readQuality(self, alias):
        """Read the quality of a variable using an alias.

        Parameters
        ----------
        alias : pair
            (vardb name, variable name)

        Returns
        -------
        variable quality

        Raises
        ------
        Exception
            vardb name doesn't exist in the loaded configuration
        """
        vdb, vname = alias
        return self.readQuality(vdb, vname)

    @dispatch(str, str)
    def unforceVar(self, vdb, vname):
        """Unforce a variable in the main container.

        Parameters
        ----------
        vdb : string
            vardb name
        vname : string
            variable name

        Raises
        ------
        Exception
            vardb name doesn't exist in the loaded configuration
            Error Unforcing var
        """
        vn = self.__formatVariableName(vdb, vname)
        self.__printMessage("Unforce: VAR=%s" % (vn))
        self.VARDB[vdb].unforcevar(vn)
        self.__checkAfter(vdb, vn, "Unforced", "Error Unforcing var: ")

    @dispatch(object)
    def unforceVar(self, alias):
        """Unforce a variable in the main container using an alias.

        Parameters
        ----------
        alias : pair
            (vardb name, variable name)

        Raises
        ------
        Exception
            vardb name doesn't exist in the loaded configuration
            Error Unforcing var
        """
        vdb, vname = alias
        self.unforceVar(vdb, vname)

    def __formatVariableName(self, vardb, variable):
        """Return the variable name in the form expected by the vardb.

        @param vardb: vardb name
        @param variable: variable name
        Return "container::variable", or the name unchanged when it already
        contains "::" or when the vardb container is an empty string
        (PXI/DAQ).
        Raises Exception when no loaded vardb is called ``vardb``.
        """
        vdl = next((n for n in self.VARS if n["VARDB_NAME"] == vardb), None)
        if vdl is None:
            raise Exception(f"VARDB name {vardb} does not exist")
        # Container already in the name, OR
        # PXI/DAQ, where the main CDU container is an empty string.
        if "::" in variable or len(vdl["CONTAINER"]) == 0:
            return variable
        return "%s::%s" % (vdl["CONTAINER"], variable)

    def __mergeDict(self, dict1, dict2):
        """Merge 2 variable dictionaries ({container: [names]}).

        Not called anywhere inside this class at the moment.

        @param dict1: dictionary 1
        @param dict2: dictionary 2
        Returns the other dictionary untouched when one of them is empty,
        otherwise a new dictionary whose lists are dict1's names followed
        by dict2's names for every container.
        """
        if not dict1:
            return dict2
        if not dict2:
            return dict1
        merged = {}
        for source in (dict1, dict2):
            for container, names in source.items():
                merged[container] = merged.get(container, []) + names
        return merged

    def __printMessage(self, message):
        """Print ``message`` unless silence is active."""
        if not self.silence:
            print(message)