from pysnmp import hlapi
from glaciation.snmp import *
from pyasn1.type.univ import Integer,OctetString
def set_port(ip, port, cmd="check"):
    """Enable, disable or check a port of a Moxa car switch (CSW) via SNMP.

    Parameters
    ----------
    ip : string
        ip address of moxa switch
    port : int
        port number
    cmd : string
        "disable", "enable" or "check" (default is check).
        "check" only prints the port state. "enable" and "disable" write the
        new state and then print the state read back from the switch.
        Any other value prints a usage hint and sends nothing to the switch.
    """
    if cmd not in ("enable", "disable", "check"):
        # Reject bad commands up front so no SNMP request is issued for them.
        print("Unknow command, use disable, enable or check")
        return

    # This community string must match the one configured in the Moxa CSW
    # ('private' is the default value).
    community = hlapi.CommunityData('private')

    # MOXA OIDs for TN5508A CSW:
    #   port index  : 1.3.6.1.4.1.8691.7.30.1.9.1.1.1
    #   port enable : 1.3.6.1.4.1.8691.7.30.1.9.1.1.3.<port>
    port_oid = "1.3.6.1.4.1.8691.7.30.1.9.1.1.3.{}".format(port)

    if cmd == "check":
        p_sts = get(ip, [port_oid], community)
        print("The port {} state of CSW {} is: {}".format(port, ip, p_sts.values()))
        return

    if cmd == "enable":
        print("Enabling port {} of {}...".format(port, ip))
        new_state = Integer(1)
    else:
        print("Disabling port {} of CSW {}...".format(port, ip))
        new_state = Integer(0)

    set(ip, {port_oid: new_state}, community)
    # Read the value back so the printed state is what the switch reports,
    # not merely what was requested.
    p_sts = get(ip, [port_oid], community)
    print("Now, the port {} state of CSW {} is: {}".format(port, ip, p_sts.values()))
def port_mirror(ip, s_port, d_port, mode=0, cmd="check"):
    """Configure, reset or inspect port mirroring on a Moxa switch via SNMP.

    Parameters
    ----------
    ip : string
        ip address of moxa switch
    s_port : int
        source (mirrored) port number, 1 to 8; only written when cmd is
        "enable"
    d_port : int
        destination (mirroring) port number; only written when cmd is
        "enable"
    mode : int
        0 -> only incoming traffic, 1 -> only outgoing traffic,
        2 -> incoming and outgoing traffic (default is 0)
    cmd : string
        "disable", "enable" or "check" (default is check).
        "enable" writes target port, mirroring port and direction.
        "disable" resets all three to zero. "check" prints the current
        values. Any other value prints a usage hint and sends nothing.

    Raises
    ------
    ValueError
        if cmd is "enable" and s_port is not a port number from 1 to 8
    """
    # MOXA OIDS for TN5508A CSW
    target_port_oid = "1.3.6.1.4.1.8691.7.30.1.13.1.0"  # the mirrored port
    mirroring_port_oid = "1.3.6.1.4.1.8691.7.30.1.13.2.0"  # the mirroring port
    monitor_direction_oid = "1.3.6.1.4.1.8691.7.30.1.13.3.0"  # direction

    print("moxa_port_mirror:")
    print("mode = 0 -> only incoming traffic")
    print("mode = 1 -> only outgoing traffic")
    print("mode = 2 -> incoming and outgoing traffic")

    if cmd not in ("enable", "disable", "check"):
        # Reject bad commands before building any SNMP value or request.
        print("Unknow command, use disable, enable or check")
        return

    community = hlapi.CommunityData('private')

    if cmd == "enable":
        # The target port is written as a two-octet bitmask: port 1 is the
        # most significant bit of the first octet, port 8 the least
        # significant; the second octet stays zero.
        port_masks = {n: chr(0x80 >> (n - 1)) + "\x00" for n in range(1, 9)}
        if s_port not in port_masks:
            # Without this check an out-of-range port would be handed to
            # OctetString unconverted and a meaningless mask would be sent.
            raise ValueError(
                "s_port must be a port number from 1 to 8, got {!r}".format(s_port)
            )
        target_mask = OctetString(port_masks[s_port])
        mirror_port = Integer(d_port)
        direction = Integer(mode)

        print("Setting target port {} of {}...".format(s_port, ip))
        set(ip, {target_port_oid: target_mask}, community)
        print("Setting mirroring port {} of {}...".format(mirror_port, ip))
        set(ip, {mirroring_port_oid: mirror_port}, community)
        print("Setting mirroring mode of {} as {}...".format(ip, direction))
        set(ip, {monitor_direction_oid: direction}, community)
    elif cmd == "disable":
        print("Resetting target port {} of {}...".format(s_port, ip))
        set(ip, {target_port_oid: OctetString("\x00\x00")}, community)
        print("Resetting mirroring port {} of {}...".format(d_port, ip))
        set(ip, {mirroring_port_oid: Integer(0)}, community)
        # The direction is reset to 0 regardless of the 'mode' argument, so
        # the message must not claim that 'mode' is being applied.
        print("Resetting mirroring mode of {}...".format(ip))
        set(ip, {monitor_direction_oid: Integer(0)}, community)
    else:  # cmd == "check"
        t_p_sts = get(ip, [target_port_oid], community)
        m_p_sts = get(ip, [mirroring_port_oid], community)
        m_d_sts = get(ip, [monitor_direction_oid], community)
        print("The port {} is mirrored to port {} in the CSW {}. The mirror mode is : {}".format(t_p_sts, m_p_sts, ip, m_d_sts))