# -*- coding: utf-8 -*-
import numpy
import traceback
from io import open
from json import load
from binascii import unhexlify
from scapy.all import sniff, Raw
class GetOutOfLoop(Exception):
    """Control-flow signal raised to leave several nested loops at once.

    :meta private:
    """
class TRDPReader(Exception):
    """Read TRDP variables straight from the network, without a CDU.

    One packet is captured with scapy and the requested variable is cut
    out of its payload using the multicast address, commId, offset and
    size described in the bus json file.

    NOTE(review): the class derives from ``Exception`` although it is
    never raised in this file; the base class is kept only so existing
    ``isinstance`` / ``except`` checks keep working.

    Parameters
    ----------
    jsonBusFile : string
        Path to bus json file downloaded from Aunia
    interface : string
        Default interface name for sniffing the packet
    """

    # Number of leading payload bytes skipped as TRDP header.
    _HEADER_SIZE = 40

    # Bus-file type name -> numpy dtype used to decode the raw bytes.
    # TODO: Fill/Correct all types
    _NUMPY_TYPES = {
        "INTEGER32": numpy.int32,
        "INTEGER16": numpy.int16,
        "INTEGER8": numpy.int8,
        "UNSIGNED32": numpy.uint32,
        "UNSIGNED16": numpy.uint16,
        "UNSIGNED8": numpy.uint8,
        "REAL32": numpy.single,
        "BOOLEAN8": numpy.bool_,
        "BITSET32": numpy.uint32,
        # Historical misspelling, kept as an alias for backward compatibility.
        "BITSE32": numpy.uint32,
        "BITSET16": numpy.uint16,
        "BITSET8": numpy.uint8,
        "ANTIVALENT8": numpy.uint8,
        "CHARACTER8": numpy.ubyte,
        # NOTE(review): a 48-bit value decoded as bool_ only looks at the
        # first byte - almost certainly a placeholder, confirm the layout.
        "TIMEDATE48": numpy.bool_,
        "ENUM8": numpy.uint8,
        # NOTE(review): presumably an unsigned counter - confirm signedness.
        "HEARTBEAT16": numpy.int16,
    }

    def __init__(self, jsonBusFile, interface):
        """Load the bus description and remember the default interface.

        Parameters
        ----------
        jsonBusFile : string
            Path to bus json file downloaded from Aunia
        interface : string
            Interface name for sniffing the packet
        """
        with open(jsonBusFile, encoding='utf-8') as f:
            self.trdpFile = load(f)
        self.iface = interface

    def readTrdpVar(self, varName, inface=None):
        """Read one variable straight from the network.

        The variable is looked up in the bus json file to obtain its
        multicast ip address, commId, offset and size; then a single
        matching packet is sniffed and the value is parsed out of it.
        The class needs to be initialized with the corresponding bus
        json file (downloaded from Aunia).

        Parameters
        ----------
        varName : string
            Variable name
        inface : string
            Interface name for sniffing the packet (default is None).
            If None, the interface given at class initialization is used

        Returns
        -------
        numpy integer
            Decoded value converted to an integer

        Raises
        ------
        SystemExit
            On any failure (unknown variable, malformed bus file, capture
            or decoding error) the traceback and the message are printed
            and the interpreter is asked to exit with code -1.
        """
        try:
            if inface is None:
                inface = self.iface
            multicastAddr, commId, dataType, offset, size = \
                self.__findVariable(varName)
            # udp[8+8:4] is the 4-byte field at offset 8 of the UDP payload
            # (the first 8 bytes indexed by "udp[...]" are the UDP header);
            # it is compared against the container's commId.
            capture = sniff(iface=inface, filter="udp and dst %s and udp[8+8:4]=%s" % (multicastAddr, commId), count=1)
            trdp = capture[0][Raw].load
            data = trdp[self._HEADER_SIZE:]  # Header 40 Bytes
            return self.__getValue(data[offset:offset + size], dataType)
        except Exception as e:
            print(traceback.format_exc())
            print("An exception occurred: " + str(e))
            # Same effect as the site-provided exit(-1), but does not rely
            # on the ``site`` module having installed that builtin.
            raise SystemExit(-1)

    def __findVariable(self, varName):
        """Return (multicast, commId, type, byte offset, byte size) of varName.

        :meta private:

        Raises
        ------
        LookupError
            If no variable named ``varName`` exists in the bus file.
        """
        for device in self.trdpFile["data"][0]["bus"]["configuration"]["devices"]:
            for container in device["containers"]:
                for variable in container["variables"]:
                    if variable["variable"] != varName:
                        continue
                    # The bus file gives offset and size in bits; the
                    # payload is sliced in whole bytes.
                    # NOTE(review): sub-byte offsets/sizes are truncated.
                    return (
                        container["multicast"],
                        container["address"],
                        variable["type"],
                        int(variable["offset"] / 8),
                        int(variable["sizeBits"] / 8),
                    )
        raise LookupError("Variable %r not found in bus file" % (varName,))

    def __getValue(self, hexValue, varType):
        """Decode the raw bytes of one variable according to its bus type.

        :meta private:

        Unknown type names fall back to ``numpy.float16``. The result is
        always converted with ``astype(int)``, so the fractional part of
        REAL32 values is dropped.

        NOTE(review): the bytes are decoded in the host's native byte
        order; confirm this matches the byte order used on the wire.
        """
        numpyType = self._NUMPY_TYPES.get(varType, numpy.float16)
        return numpy.frombuffer(hexValue, numpyType)[0].astype(int)