Source code for glaciation.devices.moxa_TN_5508A

from pysnmp import hlapi
from glaciation.snmp import *
from pyasn1.type.univ import Integer,OctetString

def set_port(ip, port, cmd="check"):
    """Enable, disable or report the state of a port of a Moxa Car Switch via SNMP.

    After an "enable" or "disable" the port state is read back and printed,
    so the caller can see the value now reported by the switch.

    Parameters
    ----------
    ip : string
        ip address of moxa switch
    port : int
        port number
    cmd : string
        "disable","enable" or "check". Check prints the port state
        (default is check). Any other value prints an error message and
        returns without contacting the switch.
    """
    # Reject unknown commands before doing any SNMP traffic.
    if cmd not in ("enable", "disable", "check"):
        print("Unknow command, use disable, enable or check")
        return

    # This community must match the one configured in the moxa CSW
    # ('private' is the default value).
    community = hlapi.CommunityData('private')

    # MOXA OIDs for TN5508A CSW: port-enable column
    # (1.3.6.1.4.1.8691.7.30.1.9.1.1.3) indexed by the port number.
    port_oid = "1.3.6.1.4.1.8691.7.30.1.9.1.1.3.{}".format(port)

    if cmd == "enable":
        print("Enabling port {} of {}...".format(port, ip))
        set(ip, {port_oid: Integer(1)}, community)
        report = "Now, the port {} state of CSW {} is: {}"
    elif cmd == "disable":
        print("Disabling port {} of CSW {}...".format(port, ip))
        set(ip, {port_oid: Integer(0)}, community)
        report = "Now, the port {} state of CSW {} is: {}"
    else:
        report = "The port {} state of CSW {} is: {}"

    # Single read of the current state, done after any write so the
    # printed value reflects what the switch reports now.
    p_sts = get(ip, [port_oid], community)
    print(report.format(port, ip, p_sts.values()))
def port_mirror(ip, s_port, d_port, mode=0, cmd="check"):
    """Configure, reset or report port mirroring of a Moxa switch using SNMP.

    Parameters
    ----------
    ip : string
        ip address of moxa switch
    s_port : int
        source (mirrored) port number, 1 to 8. Only validated and sent to
        the switch by "enable".
    d_port : int
        destination (mirroring) port number
    mode : int
        0 -> only incoming traffic, 1 -> only outgoing traffic,
        2 -> incoming and outgoing traffic (default is 0)
    cmd : string
        "disable","enable" or "check". "enable" writes the target port,
        mirroring port and direction; "disable" resets all three to zero;
        "check" prints the current values (default is check). Any other
        value prints an error message.

    Raises
    ------
    ValueError
        If cmd is "enable" and s_port is not a port number from 1 to 8.
        Nothing is written to the switch in that case.
    """
    # MOXA OIDs for TN5508A CSW
    target_port_oid = "1.3.6.1.4.1.8691.7.30.1.13.1.0"        # the mirrored port
    mirroring_port_oid = "1.3.6.1.4.1.8691.7.30.1.13.2.0"     # the mirroring port
    monitor_direction_oid = "1.3.6.1.4.1.8691.7.30.1.13.3.0"  # direction

    # The target port is written as a two-octet bitmap: port 1 is the most
    # significant bit of the first octet, port 8 the least significant.
    port_bitmaps = {
        1: "\x80\x00",
        2: "\x40\x00",
        3: "\x20\x00",
        4: "\x10\x00",
        5: "\x08\x00",
        6: "\x04\x00",
        7: "\x02\x00",
        8: "\x01\x00",
    }

    print("moxa_port_mirror:")
    print("mode = 0 -> only incoming traffic")
    print("mode = 1 -> only outgoing traffic")
    print("mode = 2 -> incoming and outgoing traffic")

    if cmd == "enable":
        # Validate before touching the switch: an unmapped port number
        # would otherwise be sent as a meaningless bitmap.
        try:
            bitmap = port_bitmaps[s_port]
        except (KeyError, TypeError):
            raise ValueError(
                "s_port must be a port number from 1 to 8, got {!r}".format(s_port))

        # Build every SNMP value up front so a bad d_port/mode fails before
        # the first write and the switch is not left half-configured.
        target_value = OctetString(bitmap)
        mirroring_value = Integer(d_port)
        direction_value = Integer(mode)
        community = hlapi.CommunityData('private')

        print("Setting target port {} of {}...".format(s_port, ip))
        set(ip, {target_port_oid: target_value}, community)
        print("Setting mirroring port {} of {}...".format(mirroring_value, ip))
        set(ip, {mirroring_port_oid: mirroring_value}, community)
        print("Setting mirroring mode of {} as {}...".format(ip, direction_value))
        set(ip, {monitor_direction_oid: direction_value}, community)
    elif cmd == "disable":
        reset_direction = Integer(0)
        community = hlapi.CommunityData('private')

        print("Resetting target port {} of {}...".format(s_port, ip))
        set(ip, {target_port_oid: OctetString("\x00\x00")}, community)
        print("Resetting mirroring port {} of {}...".format(d_port, ip))
        set(ip, {mirroring_port_oid: Integer(0)}, community)
        # Report the value actually written (0), not the caller's `mode`.
        print("Setting mirroring mode of {} as {}...".format(ip, reset_direction))
        set(ip, {monitor_direction_oid: reset_direction}, community)
    elif cmd == "check":
        community = hlapi.CommunityData('private')
        t_p_sts = get(ip, [target_port_oid], community)
        m_p_sts = get(ip, [mirroring_port_oid], community)
        m_d_sts = get(ip, [monitor_direction_oid], community)
        print("The port {} is mirrored to port {} in the CSW {}. The mirror mode is : {}".format(
            t_p_sts, m_p_sts, ip, m_d_sts))
    else:
        print("Unknow command, use disable, enable or check")