Source code for glaciation.trdp.reader

# -*- coding: utf-8 -*
import numpy
import traceback
from io import open
from json import load
from binascii import unhexlify
from scapy.all import sniff, Raw


class GetOutOfLoop( Exception ):
    """
    :meta private:
    """
    pass

[docs]class TRDPReader( Exception ): def __init__(self, jsonBusFile, interface): """Class to read variables without CDU using scapy to capture and and parse Parameters ---------- jsonBusFile : string Path to bus json file downloaded from Aunia iface : string Interface name for sniffing the packet """ with open(jsonBusFile, encoding='utf-8') as f: self.trdpFile = load(f) self.iface = interface
[docs] def readTrdpVar(self, varName, inface=None): """This function reads given variable straigth forward from the network, sniffing one packet and parsing its corresponding multicast ip address, commId and offset (read from the json file). \b Self Class need to be initialized with corresponding bus json file (downloaded from Aunia) Parameters ---------- varName : string Variable name iface : string Interface name for sniffing the packet (default is None). \b If None interface from class initialization is taken Raises ------ Exception """ try: if inface is None: inface = self.iface multicastAddr = commId = found = False try: for devices in self.trdpFile["data"][0]["bus"]["configuration"]["devices"]: for container in devices["containers"]: for variable in container["variables"]: if variable["variable"] == varName: multicastAddr = container["multicast"] commId = container["address"] dataType = variable["type"] offset = int(variable["offset"] / 8 ) # Hex string size = int(variable["sizeBits"] / 8 ) # Hex string found = True raise GetOutOfLoop except GetOutOfLoop: pass capture = sniff(iface=inface, filter="udp and dst %s and udp[8+8:4]=%s" % (multicastAddr, commId), count=1) trdp = capture[0][Raw].load data = trdp[40:] # Header 40 Bytes return self.__getValue(data[offset:offset+size], dataType) except Exception as e: print(traceback.format_exc()) print("An exception occurred: "+ str(e)) exit(-1)
# TODO: Fill/Correct all types def __getValue(self, hexValue, varType): """ :meta private: """ if varType == "INTEGER32": numpyType = numpy.int32 elif varType == "INTEGER16": numpyType = numpy.int16 elif varType == "INTEGER8": numpyType = numpy.int8 elif varType == "UNSIGNED32": numpyType = numpy.uint32 elif varType == "UNSIGNED16": numpyType = numpy.int16 elif varType == "UNSIGNED8": numpyType = numpy.int8 elif varType == "REAL32": numpyType = numpy.single elif varType == "BOOLEAN8": numpyType = numpy.bool_ elif varType == "BITSE32": numpyType = numpy.uint32 elif varType == "BITSET16": numpyType = numpy.uint16 elif varType == "BITSET8": numpyType = numpy.uint8 elif varType == "ANTIVALENT8": numpyType = numpy.uint8 elif varType == "CHARACTER8": numpyType = numpy.ubyte elif varType == "TIMEDATE48": numpyType = numpy.bool_ elif varType == "ENUM8": numpyType = numpy.uint8 elif varType == "HEARTBEAT16": numpyType = numpy.int16 else: numpyType = numpy.float16 return numpy.frombuffer(hexValue, numpyType)[0].astype(int)