synchronous oneliner apps redesigned to offer Python generator-based

API along with more comprehensive set of accepted parameters
pull/45/head
elie 2015-07-06 21:30:04 +00:00
parent 51841e9cba
commit f308702c78
37 changed files with 1095 additions and 998 deletions

View File

@ -19,6 +19,8 @@ Revision 4.3.0
* Promote the use of dedicated classes for dealing with OID-value pairs.
Instances of those classes resemble OBJECT-IDENTITY, OBJECT-TYPE and
NOTIFICATION-TYPE MIB structures.
* Synchronous oneliner apps redesigned to offer Python generator-based
API along with a more comprehensive set of accepted parameters.
* keep backward compatibility for all existing major/documented interfaces
- Execution Observer facility implemented to give app an inside view
of SNMP engine inner workings. This is thought to be a generic

View File

@ -22,29 +22,29 @@ targets = (
# 1-st target (SNMPv1 over IPv4/UDP)
( cmdgen.CommunityData('public', mpModel=0),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 161)),
( cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0),
cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysLocation', 0) ) ),
( cmdgen.ObjectType(cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0)),
cmdgen.ObjectType(cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysLocation', 0) )) ),
# 2-nd target (SNMPv2c over IPv4/UDP)
( cmdgen.CommunityData('public'),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 161)),
( cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0),
cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysLocation', 0) ) ),
( cmdgen.ObjectType(cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0)),
cmdgen.ObjectType(cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysLocation', 0) )) ),
# 3-nd target (SNMPv2c over IPv4/UDP) - same community and
# different transport address.
( cmdgen.CommunityData('public'),
cmdgen.UdpTransportTarget(('localhost', 161)),
( cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysContact', 0),
cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysName', 0) ) ),
( cmdgen.ObjectType(cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysContact', 0)),
cmdgen.ObjectType(cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysName', 0))) ),
# 4-nd target (SNMPv3 over IPv4/UDP)
( cmdgen.UsmUserData('usr-md5-des', 'authkey1', 'privkey1'),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 161)),
( cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0),
cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysLocation', 0) ) ),
( cmdgen.ObjectType(cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0)),
cmdgen.ObjectType(cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysLocation', 0) )) ),
# 5-th target (SNMPv3 over IPv6/UDP)
( cmdgen.UsmUserData('usr-md5-none', 'authkey1'),
cmdgen.Udp6TransportTarget(('::1', 161)),
( cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0),
cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysLocation', 0) ) ),
( cmdgen.ObjectType(cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0)),
cmdgen.ObjectType(cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysLocation', 0) )) ),
# N-th target
# ...
)

View File

@ -21,19 +21,20 @@ targets = (
# 1-st target (SNMPv1 over IPv4/UDP)
( cmdgen.CommunityData('public', mpModel=0),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 161)),
( '1.3.6.1.2.1', '1.3.6.1.3.1') ),
( cmdgen.ObjectType(cmdgen.ObjectIdentity('1.3.6.1.2.1')),
cmdgen.ObjectType(cmdgen.ObjectIdentity('1.3.6.1.3.1')) ) ),
# 2-nd target (SNMPv2c over IPv4/UDP)
( cmdgen.CommunityData('public'),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 161)),
( '1.3.6.1.4.1', ) ),
( cmdgen.ObjectType(cmdgen.ObjectIdentity('1.3.6.1.4.1')), ) ),
# 3-nd target (SNMPv3 over IPv4/UDP)
( cmdgen.UsmUserData('usr-md5-des', 'authkey1', 'privkey1'),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 161)),
( cmdgen.ObjectIdentity('SNMPv2-MIB', 'system'), ) ),
( cmdgen.ObjectType(cmdgen.ObjectIdentity('SNMPv2-MIB', 'system')), ) ),
# 4-th target (SNMPv3 over IPv6/UDP)
( cmdgen.UsmUserData('usr-md5-none', 'authkey1'),
cmdgen.Udp6TransportTarget(('::1', 161)),
( cmdgen.ObjectIdentity('IF-MIB', 'ifTable'), ) )
( cmdgen.ObjectType(cmdgen.ObjectIdentity('IF-MIB', 'ifTable')), ) )
# N-th target
# ...
)
@ -77,10 +78,10 @@ snmpEngine = engine.SnmpEngine()
cmdGen = cmdgen.AsyncCommandGenerator()
# Submit initial GETNEXT requests and wait for responses
for authData, transportTarget, varNames in targets:
varBindHead = cmdGen.makeVarBindsHead(snmpEngine, varNames)
for authData, transportTarget, varBinds in targets:
varBindHead = [ x[0] for x in cmdGen.makeVarBinds(snmpEngine, varBinds ) ]
cmdGen.nextCmd(
snmpEngine, authData, transportTarget, cmdgen.ContextData(), varNames,
snmpEngine, authData, transportTarget, cmdgen.ContextData(), varBinds,
# User-space callback function and its context
(cbFun, (varBindHead, authData, transportTarget)),
lookupNames=True, lookupValues=True

View File

@ -35,18 +35,18 @@ targets = (
# 1-st target (SNMPv1 over IPv4/UDP)
( cmdgen.CommunityData('public', mpModel=0),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 161)),
( cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0),
cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysLocation', 0) ) ),
( cmdgen.ObjectType(cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0)),
cmdgen.ObjectType(cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysLocation', 0) )) ),
# 2-nd target (SNMPv2c over IPv4/UDP)
( cmdgen.CommunityData('public'),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 1161)),
( cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0),
cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysLocation', 0) ) ),
( cmdgen.ObjectType(cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0)),
cmdgen.ObjectType(cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysLocation', 0) )) ),
# 3-nd target (SNMPv3 over IPv4/UDP)
( cmdgen.UsmUserData('usr-md5-des', 'authkey1', 'privkey1'),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 2161)),
( cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0),
cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysLocation', 0) ) )
( cmdgen.ObjectType(cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0)),
cmdgen.ObjectType(cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysLocation', 0) )) )
# N-th target
# ...
)

View File

@ -14,28 +14,33 @@
# requires having a collection of Managed Objects registered under
# the ContextEngineId being used.
#
from pysnmp.entity import engine
from pysnmp.entity.rfc3413 import context
from pysnmp.entity.rfc3413.oneliner import ntforg
from pysnmp.entity.rfc3413.oneliner.ntforg import *
from pysnmp.proto import rfc1902
snmpEngine = engine.SnmpEngine()
ntfOrg = ntforg.NotificationOriginator(snmpEngine)
errorIndication, errorStatus, errorIndex, varBinds = ntfOrg.sendNotification(
ntforg.UsmUserData('usr-md5-none', 'authkey1'),
ntforg.UdpTransportTarget(('localhost', 162)),
'inform',
ntforg.NotificationType(ntforg.ObjectIdentity('1.3.6.1.6.3.1.1.5.2')),
contextEngineId=rfc1902.OctetString(hexValue='8000000004030201')
)
if errorIndication:
print('Notification not sent: %s' % errorIndication)
elif errorStatus:
print('Notification Receiver returned error: %s @%s' %
(errorStatus, errorIndex))
else:
for name, val in varBinds:
print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
for errorIndication, \
errorStatus, errorIndex, \
varBinds in \
sendNotification(SnmpEngine(),
UsmUserData('usr-md5-none', 'authkey1'),
UdpTransportTarget(('localhost', 162)),
ContextData(
rfc1902.OctetString(hexValue='8000000004030201')
),
'inform',
NotificationType(
ObjectIdentity('1.3.6.1.6.3.1.1.5.2')
),
lookupNames=True, lookupValues=True):
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
else:
for varBind in varBinds:
print(' = '.join([ x.prettyPrint() for x in varBind ]))

View File

@ -14,25 +14,30 @@
# requires having a collection of Managed Objects registered under
# the ContextName being used.
#
from pysnmp.entity import engine
from pysnmp.entity.rfc3413.oneliner import ntforg
from pysnmp.entity.rfc3413.oneliner.ntforg import *
snmpEngine = engine.SnmpEngine()
ntfOrg = ntforg.NotificationOriginator(snmpEngine)
errorIndication, errorStatus, errorIndex, varBinds = ntfOrg.sendNotification(
ntforg.UsmUserData('usr-md5-none', 'authkey1'),
ntforg.UdpTransportTarget(('localhost', 162)),
'inform',
ntforg.NotificationType(ntforg.ObjectIdentity('1.3.6.1.6.3.1.1.5.2')),
contextName='my-context'
)
if errorIndication:
print('Notification not sent: %s' % errorIndication)
elif errorStatus:
print('Notification Receiver returned error: %s @%s' % (errorStatus, errorIndex))
else:
for name, val in varBinds:
print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
for errorIndication, \
errorStatus, errorIndex, \
varBinds in \
sendNotification(SnmpEngine(),
UsmUserData('usr-md5-none', 'authkey1'),
UdpTransportTarget(('localhost', 162)),
ContextData(contextName='my-context'),
'inform',
NotificationType(
ObjectIdentity('1.3.6.1.6.3.1.1.5.2')
),
lookupNames=True, lookupValues=True):
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
else:
for varBind in varBinds:
print(' = '.join([ x.prettyPrint() for x in varBind ]))

View File

@ -13,27 +13,24 @@
# * overriding Enterprise OID with 1.3.6.1.4.1.20408.4.1.1.2
# * include managed object information '1.3.6.1.2.1.1.1.0' = 'my system'
#
from pysnmp.entity.rfc3413.oneliner import ntforg
from pysnmp.entity.rfc3413.oneliner.ntforg import *
from pysnmp.proto import rfc1902
#from pysnmp import debug
#debug.setLogger(debug.Debug('mibview'))
ntfOrg = ntforg.NotificationOriginator()
errorIndication = ntfOrg.sendNotification(
ntforg.CommunityData('public', mpModel=0),
ntforg.UdpTransportTarget(('localhost', 162)),
'trap',
ntforg.NotificationType(
ntforg.ObjectIdentity('1.3.6.1.4.1.20408.4.1.1.2.0.432'),
).addVarBinds(
('1.3.6.1.2.1.1.3.0', 12345),
('1.3.6.1.6.3.18.1.3.0', '127.0.0.1'),
('1.3.6.1.6.3.1.1.4.3.0', '1.3.6.1.4.1.20408.4.1.1.2'),
('1.3.6.1.2.1.1.1.0', rfc1902.OctetString('my system'))
)
)
if errorIndication:
print('Notification not sent: %s' % errorIndication)
for errorIndication, \
errorStatus, errorIndex, \
varBinds in \
sendNotification(SnmpEngine(),
CommunityData('public', mpModel=0),
UdpTransportTarget(('localhost', 162)),
ContextData(),
'trap',
NotificationType(
ObjectIdentity('1.3.6.1.4.1.20408.4.1.1.2.0.432'),
).addVarBinds(
('1.3.6.1.2.1.1.3.0', 12345),
('1.3.6.1.6.3.18.1.3.0', '127.0.0.1'),
('1.3.6.1.6.3.1.1.4.3.0', '1.3.6.1.4.1.20408.4.1.1.2'),
('1.3.6.1.2.1.1.1.0', rfc1902.OctetString('my system'))
)):
if errorIndication:
print(errorIndication)

View File

@ -13,22 +13,22 @@
# * with Enterprise OID 1.3.6.1.4.1.20408.4.1.1.2
# * include managed object information '1.3.6.1.2.1.1.1.0' = 'my system'
#
from pysnmp.entity.rfc3413.oneliner import ntforg
from pysnmp.entity.rfc3413.oneliner.ntforg import *
from pysnmp.proto import rfc1902
ntfOrg = ntforg.NotificationOriginator()
errorIndication = ntfOrg.sendNotification(
ntforg.CommunityData('public', mpModel=0),
ntforg.UdpTransportTarget(('localhost', 162)),
'trap',
ntforg.NotificationType(
ntforg.ObjectIdentity('1.3.6.1.6.3.1.1.5.2')
).addVarBinds(
('1.3.6.1.6.3.1.1.4.3.0', '1.3.6.1.4.1.20408.4.1.1.2'),
('1.3.6.1.2.1.1.1.0', rfc1902.OctetString('my system'))
)
)
if errorIndication:
print('Notification not sent: %s' % errorIndication)
for errorIndication, \
errorStatus, errorIndex, \
varBinds in \
sendNotification(SnmpEngine(),
CommunityData('public', mpModel=0),
UdpTransportTarget(('localhost', 162)),
ContextData(),
'trap',
NotificationType(
ObjectIdentity('1.3.6.1.6.3.1.1.5.2')
).addVarBinds(
('1.3.6.1.6.3.1.1.4.3.0', '1.3.6.1.4.1.20408.4.1.1.2'),
('1.3.6.1.2.1.1.1.0', rfc1902.OctetString('my system'))
)):
if errorIndication:
print(errorIndication)

View File

@ -11,29 +11,32 @@
# * include managed object information specified as a MIB symbol
# * perform response OIDs and values resolution at MIB
#
from pysnmp.entity.rfc3413.oneliner import ntforg
ntfOrg = ntforg.NotificationOriginator()
errorIndication, errorStatus, errorIndex, varBinds = ntfOrg.sendNotification(
ntforg.CommunityData('public'),
ntforg.UdpTransportTarget(('localhost', 162)),
'inform',
ntforg.NotificationType(
ntforg.ObjectIdentity('SNMPv2-MIB', 'coldStart')
).addVarBinds(
( ntforg.ObjectIdentity('SNMPv2-MIB', 'sysName', 0), 'my system' )
),
lookupNames=True, lookupValues=True
)
if errorIndication:
print('Notification not sent: %s' % errorIndication)
elif errorStatus:
print('Notification Receiver returned error: %s @%s' %
(errorStatus, errorIndex))
else:
for name, val in varBinds:
print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
from pysnmp.entity.rfc3413.oneliner.ntforg import *
for errorIndication, \
errorStatus, errorIndex, \
varBinds in \
sendNotification(SnmpEngine(),
CommunityData('public'),
UdpTransportTarget(('localhost', 162)),
ContextData(),
'inform',
NotificationType(
ObjectIdentity('SNMPv2-MIB', 'coldStart')
).addVarBinds(
( ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysName', 0), 'my system') )
),
lookupNames=True, lookupValues=True):
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
else:
for varBind in varBinds:
print(' = '.join([ x.prettyPrint() for x in varBind ]))

View File

@ -10,16 +10,18 @@
# * with TRAP ID 'coldStart' specified as a MIB symbol
# * include managed object information specified as a MIB symbol
#
from pysnmp.entity.rfc3413.oneliner import ntforg
from pysnmp.entity.rfc3413.oneliner.ntforg import *
ntfOrg = ntforg.NotificationOriginator()
errorIndication = ntfOrg.sendNotification(
ntforg.CommunityData('public'),
ntforg.UdpTransportTarget(('localhost', 162)),
'trap',
ntforg.NotificationType(ntforg.ObjectIdentity('SNMPv2-MIB', 'coldStart'))
)
if errorIndication:
print('Notification not sent: %s' % errorIndication)
for errorIndication, \
errorStatus, errorIndex, \
varBinds in \
sendNotification(SnmpEngine(),
CommunityData('public'),
UdpTransportTarget(('localhost', 162)),
ContextData(),
'trap',
NotificationType(
ObjectIdentity('SNMPv2-MIB', 'coldStart')
)):
if errorIndication:
print(errorIndication)

View File

@ -10,27 +10,35 @@
# * with TRAP ID 'warmStart' specified as a string OID
# * include managed object information 1.3.6.1.2.1.1.5.0 = 'system name'
#
from pysnmp.entity.rfc3413.oneliner import ntforg
from pysnmp.entity.rfc3413.oneliner.ntforg import *
from pysnmp.proto import rfc1902
ntfOrg = ntforg.NotificationOriginator()
for errorIndication, \
errorStatus, errorIndex, \
varBinds in \
sendNotification(SnmpEngine(),
UsmUserData('usr-md5-des', 'authkey1', 'privkey1'),
UdpTransportTarget(('localhost', 162)),
ContextData(),
'inform',
NotificationType(
ObjectIdentity('1.3.6.1.6.3.1.1.5.2')
).addVarBinds(
( ObjectType(ObjectIdentity('1.3.6.1.2.1.1.5.0'),
rfc1902.OctetString('system name')) )
),
lookupNames=True, lookupValues=True):
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
else:
for varBind in varBinds:
print(' = '.join([ x.prettyPrint() for x in varBind ]))
errorIndication, errorStatus, errorIndex, varBinds = ntfOrg.sendNotification(
ntforg.UsmUserData('usr-md5-des', 'authkey1', 'privkey1'),
ntforg.UdpTransportTarget(('localhost', 162)),
'inform',
ntforg.NotificationType(
ntforg.ObjectIdentity('1.3.6.1.6.3.1.1.5.2')
).addVarBinds(
('1.3.6.1.2.1.1.5.0', rfc1902.OctetString('system name'))
)
)
if errorIndication:
print('Notification not sent: %s' % errorIndication)
elif errorStatus:
print('Notification Receiver returned error: %s @%s' %
(errorStatus, errorIndex))
else:
for name, val in varBinds:
print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))

View File

@ -4,7 +4,7 @@
# Send SNMP notification using the following options:
#
# * SNMPv3
# * with local snmpEngineId = 0x8000000001020304
# * with local snmpEngineId = 0x8000000001020304 (must configure at Receiver)
# * with user 'usr-sha-aes128', auth: SHA, priv: AES128
# * over IPv4/UDP
# * send TRAP notification
@ -15,25 +15,23 @@
# value of SnmpEngineId with Notification Receiver. To facilitate that
# we will use static (e.g. not autogenerated) version of snmpEngineId.
#
from pysnmp.entity import engine
from pysnmp.entity.rfc3413.oneliner import ntforg
from pysnmp.proto import rfc1902
from pysnmp.entity.rfc3413.oneliner.ntforg import *
from pysnmp.proto.rfc1902 import OctetString
# This SNMP Engine ID value should also be configured to TRAP receiver.
snmpEngineId = rfc1902.OctetString(hexValue='8000000001020304')
ntfOrg = ntforg.NotificationOriginator(engine.SnmpEngine(snmpEngineId))
errorIndication = ntfOrg.sendNotification(
ntforg.UsmUserData('usr-sha-aes128', 'authkey1', 'privkey1',
authProtocol=ntforg.usmHMACSHAAuthProtocol,
privProtocol=ntforg.usmAesCfb128Protocol),
ntforg.UdpTransportTarget(('127.0.0.1', 162)),
'trap',
ntforg.NotificationType(
ntforg.ObjectIdentity('SNMPv2-MIB', 'authenticationFailure')
)
)
if errorIndication:
print('Notification not sent: %s' % errorIndication)
for errorIndication, \
errorStatus, errorIndex, \
varBinds in \
sendNotification(SnmpEngine(
OctetString(hexValue='8000000001020304')
),
UsmUserData('usr-sha-aes128', 'authkey1', 'privkey1',
authProtocol=usmHMACSHAAuthProtocol,
privProtocol=usmAesCfb128Protocol),
UdpTransportTarget(('localhost', 162)),
ContextData(),
'trap',
NotificationType(
ObjectIdentity('SNMPv2-MIB', 'authenticationFailure')
)):
if errorIndication:
print(errorIndication)

View File

@ -8,29 +8,29 @@
# * to an Agent at demo.snmplabs.com:161
# * setting SNMPv2-MIB::sysName.0 to new value (type taken from MIB)
#
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp.entity.rfc3413.oneliner.cmdgen import *
cmdGen = cmdgen.CommandGenerator()
errorIndication, errorStatus, errorIndex, varBinds = cmdGen.setCmd(
cmdgen.CommunityData('public'),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 161)),
cmdgen.ObjectType(
cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysORDescr', 1),
'new system name'
)
)
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
for errorIndication, \
errorStatus, errorIndex, \
varBinds in setCmd(SnmpEngine(),
CommunityData('public'),
UdpTransportTarget(('demo.snmplabs.com', 161)),
ContextData(),
ObjectType(
ObjectIdentity('SNMPv2-MIB', 'sysORDescr', 1),
'new system name'
)):
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
for name, val in varBinds:
print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
else:
for varBind in varBinds:
print(' = '.join([ x.prettyPrint() for x in varBind ]))
break

View File

@ -9,27 +9,29 @@
# * for IF-MIB::ifInOctets.1 MIB object
# * Pass attached MIB compiler non-default ASN.1 MIB source
#
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp.entity.rfc3413.oneliner.cmdgen import *
cmdGen = cmdgen.CommandGenerator()
errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd(
cmdgen.CommunityData('public'),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 161)),
cmdgen.ObjectIdentity('IF-MIB', 'ifInOctets', 1).addAsn1MibSource('file:///usr/share/snmp', 'http://mibs.snmplabs.com/asn1/@mib@'),
lookupNames=True, lookupValues=True
)
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
for errorIndication, \
errorStatus, errorIndex, \
varBinds in getCmd(SnmpEngine(),
CommunityData('public'),
UdpTransportTarget(('demo.snmplabs.com', 161)),
ContextData(),
ObjectType(
ObjectIdentity('IF-MIB', 'ifInOctets', 1).addAsn1MibSource('file:///usr/share/snmp', 'http://mibs.snmplabs.com/asn1/@mib@')
),
lookupNames=True, lookupValues=True):
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
for oid, val in varBinds:
print('%s = %s' % (oid.prettyPrint(), val.prettyPrint()))
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
else:
for varBind in varBinds:
print(' = '.join([ x.prettyPrint() for x in varBind ]))
break

View File

@ -11,32 +11,33 @@
# * contextName 'a172334d7d97871b72241397f713fa12'
# * setting SNMPv2-MIB::sysName.0 to new value (type taken from MIB)
#
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp.entity.rfc3413.oneliner.cmdgen import *
from pysnmp.proto import rfc1902
cmdGen = cmdgen.CommandGenerator()
errorIndication, errorStatus, errorIndex, varBinds = cmdGen.setCmd(
cmdgen.UsmUserData('usr-md5-none', 'authkey1'),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 161)),
cmdgen.ObjectType(
cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysORDescr', 1),
'new system name'
),
contextEngineId=rfc1902.OctetString(hexValue='80004fb805636c6f75644dab22cc'),
contextName='da761cfc8c94d3aceef4f60f049105ba'
)
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
for errorIndication, \
errorStatus, errorIndex, \
varBinds in setCmd(SnmpEngine(),
UsmUserData('usr-md5-none', 'authkey1'),
UdpTransportTarget(('demo.snmplabs.com', 161)),
ContextData(
contextEngineId=rfc1902.OctetString(hexValue='80004fb805636c6f75644dab22cc'),
contextName='da761cfc8c94d3aceef4f60f049105ba'
),
ObjectType(
ObjectIdentity('SNMPv2-MIB', 'sysORDescr', 1),
'new system name'
)):
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
for name, val in varBinds:
print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
else:
for varBind in varBinds:
print(' = '.join([ x.prettyPrint() for x in varBind ]))
break

View File

@ -10,35 +10,35 @@
# * to an Agent at demo.snmplabs.com:161
# * setting SNMPv2-MIB::sysName.0 to new value (type taken from MIB)
#
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp.entity.rfc3413.oneliner.cmdgen import *
from pysnmp.proto import rfc1902
cmdGen = cmdgen.CommandGenerator()
errorIndication, errorStatus, errorIndex, varBinds = cmdGen.setCmd(
cmdgen.UsmUserData(
'usr-md5-des', 'authkey1', 'privkey1',
securityEngineId=rfc1902.OctetString(
hexValue='80004fb805636c6f75644dab22cc'
)
),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 161)),
cmdgen.ObjectType(
cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysORDescr', 1),
'new system name'
)
)
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
for errorIndication, \
errorStatus, errorIndex, \
varBinds in setCmd(SnmpEngine(),
UsmUserData(
'usr-md5-des', 'authkey1', 'privkey1',
securityEngineId=rfc1902.OctetString(
hexValue='80004fb805636c6f75644dab22cc'
)
),
UdpTransportTarget(('demo.snmplabs.com', 161)),
ContextData(),
ObjectType(
ObjectIdentity('SNMPv2-MIB', 'sysORDescr', 1),
'new system name'
)):
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
for name, val in varBinds:
print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
else:
for varBind in varBinds:
print(' = '.join([ x.prettyPrint() for x in varBind ]))
break

View File

@ -12,30 +12,31 @@
# maxCalls == 10 request-response interactions occur
# * ignoring non-increasing OIDs whenever reported by Agent
#
# make sure IF-MIB.py is search path
#
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp.entity.rfc3413.oneliner.cmdgen import *
cmdGen = cmdgen.CommandGenerator()
errorIndication, errorStatus, errorIndex, varBindTable = cmdGen.bulkCmd(
cmdgen.UsmUserData('usr-none-none'),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 161)),
0, 50,
cmdgen.ObjectIdentity('TCP-MIB', 'tcpConnTable').addMibSource('/tmp/mibs'),
lexicographicMode=True, maxRows=100, maxCalls=10,ignoreNonIncreasingOid=True
)
if errorIndication:
print(errorIndication)
else:
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBindTable[-1][int(errorIndex)-1][0] or '?'
)
)
for errorIndication, \
errorStatus, errorIndex, \
varBinds in bulkCmd(SnmpEngine(),
UsmUserData('usr-none-none'),
UdpTransportTarget(('demo.snmplabs.com', 161)),
ContextData(),
0, 50,
ObjectType(ObjectIdentity('TCP-MIB', 'tcpConnTable').addMibSource('/tmp/mibs')),
maxRows=100, maxCalls=10,
lexicographicMode=True,
ignoreNonIncreasingOid=True):
# Check for errors and print out results
if errorIndication:
print(errorIndication)
break
else:
for varBindTableRow in varBindTable:
for name, val in varBindTableRow:
print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
break
else:
for varBind in varBinds:
print(' = '.join([ x.prettyPrint() for x in varBind ]))

View File

@ -22,26 +22,28 @@
# if no response arrives, there will be no retry. Likewise, retries=1
# means one initial request plus one retry.
#
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp.entity.rfc3413.oneliner.cmdgen import *
cmdGen = cmdgen.CommandGenerator()
errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd(
cmdgen.CommunityData('public'),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 161),timeout=1.5,retries=0),
'1.3.6.1.2.1.1.1.0',
)
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
for errorIndication, \
errorStatus, errorIndex, \
varBinds in getCmd(SnmpEngine(),
CommunityData('public'),
UdpTransportTarget(
('demo.snmplabs.com', 161), timeout=1.5, retries=0
),
ContextData(),
ObjectType(ObjectIdentity('1.3.6.1.2.1.1.1.0'))):
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
for name, val in varBinds:
print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
else:
for varBind in varBinds:
print(' = '.join([ x.prettyPrint() for x in varBind ]))
break

View File

@ -14,26 +14,27 @@
# securityName can be made human-readable, also it is not an index in
# usmUserTable, thus duplicate securityName parameters are possible.
#
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp.entity.rfc3413.oneliner.cmdgen import *
cmdGen = cmdgen.CommandGenerator()
errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd(
cmdgen.UsmUserData('usr-md5-none', 'authkey1', securityName='myuser'),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 161)),
'1.3.6.1.2.1.1.1.0'
)
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
for errorIndication, \
errorStatus, errorIndex, \
varBinds in getCmd(SnmpEngine(),
UsmUserData('usr-md5-none', 'authkey1',
securityName='myuser'),
UdpTransportTarget(('demo.snmplabs.com', 161)),
ContextData(),
ObjectType(ObjectIdentity('1.3.6.1.2.1.1.1.0'))):
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
for name, val in varBinds:
print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
else:
for varBind in varBinds:
print(' = '.join([ x.prettyPrint() for x in varBind ]))
break

View File

@ -9,28 +9,28 @@
# * for three OIDs: one passed as a ObjectIdentity object while others are
# * in string form
#
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp.entity.rfc3413.oneliner.cmdgen import *
cmdGen = cmdgen.CommandGenerator()
errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd(
cmdgen.UsmUserData('usr-md5-des', 'authkey1', 'privkey1'),
cmdgen.Udp6TransportTarget(('::1', 161)),
cmdgen.ObjectIdentity('1.3.6.1.2.1.1.1.0'),
'1.3.6.1.2.1.1.2.0',
'1.3.6.1.2.1.1.3.0'
)
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
for errorIndication, \
errorStatus, errorIndex, \
varBinds in getCmd(SnmpEngine(),
UsmUserData('usr-md5-des', 'authkey1', 'privkey1'),
Udp6TransportTarget(('::1', 161)),
ContextData(),
ObjectType(ObjectIdentity('1.3.6.1.2.1.1.1.0')),
ObjectType(ObjectIdentity('1.3.6.1.2.1.1.2.0')),
ObjectType(ObjectIdentity('1.3.6.1.2.1.1.3.0'))):
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
for name, val in varBinds:
print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
else:
for varBind in varBinds:
print(' = '.join([ x.prettyPrint() for x in varBind ]))
break

View File

@ -9,27 +9,27 @@
# * for IF-MIB::ifInOctets.1 MIB object
# * perform response OIDs and values resolution at MIB
#
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp.entity.rfc3413.oneliner.cmdgen import *
cmdGen = cmdgen.CommandGenerator()
errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd(
cmdgen.UsmUserData('usr-none-none'),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 161)),
cmdgen.ObjectIdentity('IF-MIB', 'ifInOctets', 1),
lookupNames=True, lookupValues=True
)
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
for errorIndication, \
errorStatus, errorIndex, \
varBinds in getCmd(SnmpEngine(),
UsmUserData('usr-none-none'),
UdpTransportTarget(('demo.snmplabs.com', 161)),
ContextData(),
ObjectType(ObjectIdentity('IF-MIB', 'ifInOctets', 1)),
lookupNames=True, lookupValues=True):
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
for name, val in varBinds:
print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
else:
for varBind in varBinds:
print(' = '.join([ x.prettyPrint() for x in varBind ]))
break

View File

@ -8,28 +8,33 @@
# * to an Agent at demo.snmplabs.com:161
# * for TCP-MIB::tcpConnLocalAddress."0.0.0.0".22."0.0.0.0".0 MIB object
#
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp.entity.rfc3413.oneliner.cmdgen import *
cmdGen = cmdgen.CommandGenerator()
errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd(
cmdgen.UsmUserData('usr-sha-aes128', 'authkey1', 'privkey1',
authProtocol=cmdgen.usmHMACSHAAuthProtocol,
privProtocol=cmdgen.usmAesCfb128Protocol ),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 161)),
cmdgen.ObjectIdentity('TCP-MIB', 'tcpConnLocalAddress', '0.0.0.0', 22, '0.0.0.0', 0)
)
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
for errorIndication, \
errorStatus, errorIndex, \
varBinds in getCmd(SnmpEngine(),
UsmUserData('usr-sha-aes128', 'authkey1', 'privkey1',
authProtocol=usmHMACSHAAuthProtocol,
privProtocol=usmAesCfb128Protocol ),
UdpTransportTarget(('demo.snmplabs.com', 161)),
ContextData(),
ObjectType(
ObjectIdentity('TCP-MIB',
'tcpConnLocalAddress',
'0.0.0.0', 22,
'0.0.0.0', 0)
)):
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
for name, val in varBinds:
print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
else:
for varBind in varBinds:
print(' = '.join([ x.prettyPrint() for x in varBind ]))
break

View File

@ -11,32 +11,30 @@
# * stop when response OIDs leave the scopes of the table OR maxRows == 20
# * perform response OIDs and values resolution at MIB
#
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp.entity.rfc3413.oneliner.cmdgen import *
cmdGen = cmdgen.CommandGenerator()
# Send a series of SNMP GETBULK requests
# make sure IF-MIB.py and IP-MIB.py are in search path
errorIndication, errorStatus, errorIndex, varBindTable = cmdGen.bulkCmd(
cmdgen.UsmUserData('usr-md5-des', 'authkey1', 'privkey1'),
cmdgen.Udp6TransportTarget(('::1', 161)),
1, 25,
cmdgen.ObjectIdentity('IP-MIB', 'ipAdEntAddr'),
cmdgen.ObjectIdentity('IF-MIB', 'ifEntry'),
lookupNames=True, lookupValues=True, maxRows=20
)
if errorIndication:
print(errorIndication)
else:
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBindTable[-1][int(errorIndex)-1][0] or '?'
)
)
for errorIndication, \
errorStatus, errorIndex, \
varBinds in bulkCmd(SnmpEngine(),
UsmUserData('usr-md5-des', 'authkey1', 'privkey1'),
Udp6TransportTarget(('::1', 161)),
ContextData(),
1, 25,
ObjectType(ObjectIdentity('IP-MIB', 'ipAdEntAddr')),
ObjectType(ObjectIdentity('IP-MIB', 'ipAddrEntry')),
lookupNames=True, lookupValues=True, maxRows=20):
# Check for errors and print out results
if errorIndication:
print(errorIndication)
break
else:
for varBindTableRow in varBindTable:
for name, val in varBindTableRow:
print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
break
else:
for varBind in varBinds:
print(' = '.join([ x.prettyPrint() for x in varBind ]))

View File

@ -12,30 +12,31 @@
#
# make sure IF-MIB.py is search path
#
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp.entity.rfc3413.oneliner.cmdgen import *
cmdGen = cmdgen.CommandGenerator()
errorIndication, errorStatus, errorIndex, varBindTable = cmdGen.nextCmd(
cmdgen.UsmUserData('usr-sha-aes128', 'authkey1', 'privkey1',
authProtocol=cmdgen.usmHMACSHAAuthProtocol,
privProtocol=cmdgen.usmAesCfb128Protocol),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 161)),
cmdgen.ObjectIdentity('IF-MIB', '').loadMibs(),
lexicographicMode=True, maxRows=100,
ignoreNonIncreasingOid=True
)
if errorIndication:
print(errorIndication)
else:
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBindTable[-1][int(errorIndex)-1][0] or '?'
)
)
for errorIndication, \
errorStatus, errorIndex, \
varBinds in nextCmd(SnmpEngine(),
UsmUserData('usr-sha-aes128', 'authkey1', 'privkey1',
authProtocol=usmHMACSHAAuthProtocol,
privProtocol=usmAesCfb128Protocol),
UdpTransportTarget(('demo.snmplabs.com', 161)),
ContextData(),
ObjectType(ObjectIdentity('IF-MIB', '').loadMibs()),
lexicographicMode=True, maxRows=100,
ignoreNonIncreasingOid=True):
# Check for errors and print out results
if errorIndication:
print(errorIndication)
break
else:
for varBindTableRow in varBindTable:
for name, val in varBindTableRow:
print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
break
else:
for varBind in varBinds:
print(' = '.join([ x.prettyPrint() for x in varBind ]))

View File

@ -9,27 +9,28 @@
# * for two OIDs in string form
# * stop when response OIDs leave the scopes of initial OIDs
#
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp.entity.rfc3413.oneliner.cmdgen import *
cmdGen = cmdgen.CommandGenerator()
errorIndication, errorStatus, errorIndex, varBindTable = cmdGen.nextCmd(
cmdgen.CommunityData('public'),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 161)),
'1.3.6.1.2.1.2.2.1.2',
'1.3.6.1.2.1.2.2.1.3'
)
if errorIndication:
print(errorIndication)
else:
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBindTable[-1][int(errorIndex)-1][0] or '?'
)
)
for errorIndication, \
errorStatus, errorIndex, \
varBinds in nextCmd(SnmpEngine(),
CommunityData('public'),
UdpTransportTarget(('demo.snmplabs.com', 161)),
ContextData(),
ObjectType(ObjectIdentity('1.3.6.1.2.1.2.2.1.2')),
ObjectType(ObjectIdentity('1.3.6.1.2.1.2.2.1.3'))):
# Check for errors and print out results
if errorIndication:
print(errorIndication)
break
else:
for varBindTableRow in varBindTable:
for name, val in varBindTableRow:
print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
break
else:
for varBind in varBinds:
print(' = '.join([ x.prettyPrint() for x in varBind ]))

View File

@ -12,27 +12,28 @@
#
# make sure IF-MIB.py is in search path
#
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp.entity.rfc3413.oneliner.cmdgen import *
cmdGen = cmdgen.CommandGenerator()
errorIndication, errorStatus, errorIndex, varBindTable = cmdGen.nextCmd(
cmdgen.UsmUserData('usr-md5-des', 'authkey1', 'privkey1'),
cmdgen.Udp6TransportTarget(('::1', 161)),
cmdgen.ObjectIdentity('IF-MIB', 'ifEntry'),
lookupNames=True, lookupValues=True
)
if errorIndication:
print(errorIndication)
else:
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBindTable[-1][int(errorIndex)-1][0] or '?'
)
)
for errorIndication, \
errorStatus, errorIndex, \
varBinds in nextCmd(SnmpEngine(),
UsmUserData('usr-md5-des', 'authkey1', 'privkey1'),
Udp6TransportTarget(('::1', 161)),
ContextData(),
ObjectType(ObjectIdentity('IF-MIB', 'ifEntry')),
lookupNames=True, lookupValues=True):
# Check for errors and print out results
if errorIndication:
print(errorIndication)
break
else:
for varBindTableRow in varBindTable:
for name, val in varBindTableRow:
print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
break
else:
for varBind in varBinds:
print(' = '.join([ x.prettyPrint() for x in varBind ]))

View File

@ -10,29 +10,28 @@
# * stop when response OIDs leave the scopes of the table
# * perform response values resolution at MIB
#
# make sure IF-MIB.py is in search path
#
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp.entity.rfc3413.oneliner.cmdgen import *
cmdGen = cmdgen.CommandGenerator()
errorIndication, errorStatus, errorIndex, varBindTable = cmdGen.nextCmd(
cmdgen.UsmUserData('usr-md5-none', 'authkey1'),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 161)),
cmdgen.ObjectIdentity('IF-MIB', ''),
lookupValues=True
)
if errorIndication:
print(errorIndication)
else:
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBindTable[-1][int(errorIndex)-1][0] or '?'
)
)
for errorIndication, \
errorStatus, errorIndex, \
varBinds in nextCmd(SnmpEngine(),
UsmUserData('usr-md5-none', 'authkey1'),
UdpTransportTarget(('demo.snmplabs.com', 161)),
ContextData(),
ObjectType(ObjectIdentity('IF-MIB', '')),
lookupValues=True):
# Check for errors and print out results
if errorIndication:
print(errorIndication)
break
else:
for varBindTableRow in varBindTable:
for name, val in varBindTableRow:
print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
break
else:
for varBind in varBinds:
print(' = '.join([ x.prettyPrint() for x in varBind ]))

View File

@ -9,32 +9,32 @@
# * for some columns of the IF-MIB::ifEntry table
# * stop when response OIDs leave the scopes of initial OIDs
#
# make sure IF-MIB.py is in search path
#
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp.entity.rfc3413.oneliner.cmdgen import *
cmdGen = cmdgen.CommandGenerator()
errorIndication, errorStatus, errorIndex, varBindTable = cmdGen.nextCmd(
cmdgen.CommunityData('public', mpModel=0),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 161)),
cmdgen.ObjectIdentity('IF-MIB', 'ifDescr'),
cmdgen.ObjectIdentity('IF-MIB', 'ifType'),
cmdgen.ObjectIdentity('IF-MIB', 'ifMtu'),
cmdgen.ObjectIdentity('IF-MIB', 'ifSpeed'),
cmdgen.ObjectIdentity('IF-MIB', 'ifPhysAddress')
)
if errorIndication:
print(errorIndication)
else:
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBindTable[-1][int(errorIndex)-1][0] or '?'
)
)
for errorIndication, \
errorStatus, errorIndex, \
varBinds in nextCmd(SnmpEngine(),
CommunityData('public', mpModel=0),
UdpTransportTarget(('demo.snmplabs.com', 161)),
ContextData(),
ObjectType(ObjectIdentity('IF-MIB', 'ifDescr')),
ObjectType(ObjectIdentity('IF-MIB', 'ifType')),
ObjectType(ObjectIdentity('IF-MIB', 'ifMtu')),
ObjectType(ObjectIdentity('IF-MIB', 'ifSpeed')),
ObjectType(ObjectIdentity('IF-MIB', 'ifPhysAddress')),
ObjectType(ObjectIdentity('IF-MIB', 'ifType'))):
# Check for errors and print out results
if errorIndication:
print(errorIndication)
break
else:
for varBindTableRow in varBindTable:
for name, val in varBindTableRow:
print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
break
else:
for varBind in varBinds:
print(' = '.join([ x.prettyPrint() for x in varBind ]))

View File

@ -8,29 +8,29 @@
# * to an Agent at demo.snmplabs.com:161
# * setting two OIDs to new values (types explicitly specified)
#
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp.entity.rfc3413.oneliner.cmdgen import *
from pysnmp.proto import rfc1902
cmdGen = cmdgen.CommandGenerator()
errorIndication, errorStatus, errorIndex, varBinds = cmdGen.setCmd(
cmdgen.CommunityData('public', mpModel=0),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 161)),
('1.3.6.1.2.1.1.9.1.2.1', rfc1902.ObjectName('1.3.6.1.4.1.20408.1.1')),
('1.3.6.1.2.1.1.9.1.2.1', '1.3.6.1.4.1.20408.1.1'),
('1.3.6.1.2.1.1.9.1.3.1', rfc1902.OctetString('new system name'))
)
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
for errorIndication, \
errorStatus, errorIndex, \
varBinds in setCmd(SnmpEngine(),
CommunityData('public', mpModel=0),
UdpTransportTarget(('demo.snmplabs.com', 161)),
ContextData(),
('1.3.6.1.2.1.1.9.1.2.1', rfc1902.ObjectName('1.3.6.1.4.1.20408.1.1')),
('1.3.6.1.2.1.1.9.1.2.1', '1.3.6.1.4.1.20408.1.1'),
('1.3.6.1.2.1.1.9.1.3.1', rfc1902.OctetString('new system name'))):
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
for name, val in varBinds:
print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
else:
for varBind in varBinds:
print(' = '.join([ x.prettyPrint() for x in varBind ]))
break

View File

@ -13,14 +13,11 @@
# ** to the same SNMP Engine ID
# ** for an OID in text form
#
from pysnmp.entity import engine
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp.entity.rfc3413.oneliner.cmdgen import *
snmpEngine = engine.SnmpEngine()
snmpEngine = SnmpEngine()
cmdGen = cmdgen.CommandGenerator(snmpEngine)
transportTarget = cmdgen.UdpTransportTarget(('demo.snmplabs.com', 161))
transportTarget = UdpTransportTarget(('demo.snmplabs.com', 161))
#
# To discover remote SNMP EngineID we will tap on SNMP engine inner workings
@ -39,11 +36,12 @@ snmpEngine.observer.registerObserver(
# Send probe SNMP request with invalid credentials
authData = cmdgen.UsmUserData('non-existing-user')
authData = UsmUserData('non-existing-user')
errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd(
authData, transportTarget
)
for errorIndication, errorStatus, errorIndex, \
varBinds in getCmd(snmpEngine, authData,
transportTarget, ContextData()):
break
# See if our SNMP engine received REPORT PDU containing securityEngineId
@ -59,12 +57,14 @@ print('Remote securityEngineId = %s' % securityEngineId.prettyPrint())
# Query remote SNMP Engine using usmUserTable entry configured for it
#
authData = cmdgen.UsmUserData('usr-md5-none', 'authkey1',
securityEngineId=securityEngineId)
authData = UsmUserData('usr-md5-none', 'authkey1',
securityEngineId=securityEngineId)
errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd(
authData, transportTarget, '1.3.6.1.2.1.1.1.0'
)
for errorIndication, errorStatus, errorIndex, \
varBinds in getCmd(snmpEngine, authData, \
transportTarget, ContextData(), \
ObjectType(ObjectIdentity('1.3.6.1.2.1.1.1.0'))):
break
# Check for errors and print out results
if errorIndication:

View File

@ -8,26 +8,26 @@
# * to an Agent at demo.snmplabs.com:161
# * for IF-MIB::ifInOctets.1 MIB object
#
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp.entity.rfc3413.oneliner.cmdgen import *
cmdGen = cmdgen.CommandGenerator()
errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd(
cmdgen.UsmUserData('usr-md5-none', 'authkey1'),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 161)),
cmdgen.ObjectIdentity('IF-MIB', 'ifInOctets', 1)
)
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
for errorIndication, \
errorStatus, errorIndex, \
varBinds in getCmd(SnmpEngine(),
UsmUserData('usr-md5-none', 'authkey1'),
UdpTransportTarget(('demo.snmplabs.com', 161)),
ContextData(),
ObjectType(ObjectIdentity('IF-MIB', 'ifInOctets', 1))):
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
for name, val in varBinds:
print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
else:
for varBind in varBinds:
print(' = '.join([ x.prettyPrint() for x in varBind ]))
break

View File

@ -9,28 +9,27 @@
# * for two instances of SNMPv2-MIB::sysDescr.0 MIB object,
# * one in label and another in MIB symbol form
#
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp.entity.rfc3413.oneliner.cmdgen import *
cmdGen = cmdgen.CommandGenerator()
errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd(
cmdgen.CommunityData('public', mpModel=0),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 161)),
cmdgen.ObjectIdentity('iso.org.dod.internet.mgmt.mib-2.system.sysDescr.0'),
cmdgen.ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0)
)
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
for errorIndication, \
errorStatus, errorIndex, \
varBinds in getCmd(SnmpEngine(),
CommunityData('public', mpModel=0),
UdpTransportTarget(('demo.snmplabs.com', 161)),
ContextData(),
ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0)),
lookupNames=True, lookupValues=True):
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
for name, val in varBinds:
print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
else:
for varBind in varBinds:
print(' = '.join([ x.prettyPrint() for x in varBind ]))
break

View File

@ -8,27 +8,27 @@
# * to an Agent at demo.snmplabs.com:161
# * for two OIDs in string form
#
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp.entity.rfc3413.oneliner.cmdgen import *
cmdGen = cmdgen.CommandGenerator()
errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd(
cmdgen.CommunityData('public'),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 161)),
'1.3.6.1.2.1.1.1.0',
'1.3.6.1.2.1.1.6.0'
)
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
for errorIndication, \
errorStatus, errorIndex, \
varBinds in getCmd(SnmpEngine(),
CommunityData('public'),
UdpTransportTarget(('demo.snmplabs.com', 161)),
ContextData(),
ObjectType(ObjectIdentity('1.3.6.1.2.1.1.1.0')),
ObjectType(ObjectIdentity('1.3.6.1.2.1.1.6.0'))):
# Check for errors and print out results
if errorIndication:
print(errorIndication)
else:
for name, val in varBinds:
print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
else:
for varBind in varBinds:
print(' = '.join([ x.prettyPrint() for x in varBind ]))
break

View File

@ -10,28 +10,29 @@
# * for two OIDs in string form
# * stop when response OIDs leave the scopes of initial OIDs
#
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp.entity.rfc3413.oneliner.cmdgen import *
cmdGen = cmdgen.CommandGenerator()
errorIndication, errorStatus, errorIndex, varBindTable = cmdGen.bulkCmd(
cmdgen.CommunityData('public'),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 161)),
0, 25,
'1.3.6.1.2.1.2.2',
'1.3.6.1.2.1.2.3',
)
if errorIndication:
print(errorIndication)
else:
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBindTable[-1][int(errorIndex)-1][0] or '?'
)
)
for errorIndication, \
errorStatus, errorIndex, \
varBinds in bulkCmd(SnmpEngine(),
CommunityData('public'),
UdpTransportTarget(('demo.snmplabs.com', 161)),
ContextData(),
0, 25,
ObjectType(ObjectIdentity('1.3.6.1.2.1.2.2')),
ObjectType(ObjectIdentity('1.3.6.1.2.1.2.3'))):
# Check for errors and print out results
if errorIndication:
print(errorIndication)
break
else:
for varBindTableRow in varBindTable:
for name, val in varBindTableRow:
print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
break
else:
for varBind in varBinds:
print(' = '.join([ x.prettyPrint() for x in varBind ]))

View File

@ -20,27 +20,28 @@
# currently loaded MIBs, unresolved OIDs and values will still be
# returned.
#
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp.entity.rfc3413.oneliner.cmdgen import *
cmdGen = cmdgen.CommandGenerator()
errorIndication, errorStatus, errorIndex, varBindTable = cmdGen.nextCmd(
cmdgen.CommunityData('public'),
cmdgen.UdpTransportTarget(('demo.snmplabs.com', 161)),
'1.3.6.1.2.1.1',
lookupNames=True, lookupValues=True
)
if errorIndication:
print(errorIndication)
else:
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBindTable[-1][int(errorIndex)-1] or '?'
)
)
for errorIndication, \
errorStatus, errorIndex, \
varBinds in nextCmd(SnmpEngine(),
CommunityData('public'),
UdpTransportTarget(('demo.snmplabs.com', 161)),
ContextData(),
ObjectType(ObjectIdentity('1.3.6.1.2.1.1')),
lookupNames=True, lookupValues=True):
# Check for errors and print out results
if errorIndication:
print(errorIndication)
break
else:
for varBindTableRow in varBindTable:
for name, val in varBindTableRow:
print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
if errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex)-1][0] or '?'
)
)
break
else:
for varBind in varBinds:
print(' = '.join([ x.prettyPrint() for x in varBind ]))

View File

@ -26,6 +26,9 @@ usmAesCfb192Protocol = config.usmAesCfb192Protocol
usmAesCfb256Protocol = config.usmAesCfb256Protocol
usmNoPrivProtocol = config.usmNoPrivProtocol
# SNMP engine
SnmpEngine = engine.SnmpEngine
nextID = nextid.Integer(0xffffffff)
class AsyncCommandGenerator:
@ -241,7 +244,7 @@ class AsyncCommandGenerator:
# Async SNMP apps
def getCmd(self, snmpEngine, authData, transportTarget, contextData,
varNames, cbInfo, lookupNames=False, lookupValues=False):
varBinds, cbInfo, lookupNames=False, lookupValues=False):
def __cbFun(snmpEngine, sendRequestHandle,
errorIndication, errorStatus, errorIndex,
varBinds, cbCtx):
@ -268,7 +271,7 @@ class AsyncCommandGenerator:
addrName,
contextData.contextEngineId,
contextData.contextName,
self.makeVarBinds(snmpEngine, [(x, self._null) for x in varNames]),
self.makeVarBinds(snmpEngine, varBinds),
__cbFun,
(lookupNames, lookupValues, cbFun, cbCtx)
)
@ -307,7 +310,7 @@ class AsyncCommandGenerator:
)
def nextCmd(self, snmpEngine, authData, transportTarget, contextData,
varNames, cbInfo, lookupNames=False, lookupValues=False):
varBinds, cbInfo, lookupNames=False, lookupValues=False):
def __cbFun(snmpEngine, sendRequestHandle,
errorIndication, errorStatus, errorIndex,
varBindTable, cbCtx):
@ -330,13 +333,13 @@ class AsyncCommandGenerator:
snmpEngine,
addrName,
contextData.contextEngineId, contextData.contextName,
self.makeVarBinds(snmpEngine, [(x, self._null) for x in varNames]),
self.makeVarBinds(snmpEngine, varBinds),
__cbFun,
(lookupNames, lookupValues, cbFun, cbCtx)
)
def bulkCmd(self, snmpEngine, authData, transportTarget, contextData,
nonRepeaters, maxRepetitions, varNames, cbInfo,
nonRepeaters, maxRepetitions, varBinds, cbInfo,
lookupNames=False, lookupValues=False):
def __cbFun(snmpEngine, sendRequestHandle,
errorIndication, errorStatus, errorIndex,
@ -362,12 +365,246 @@ class AsyncCommandGenerator:
contextData.contextEngineId,
contextData.contextName,
nonRepeaters, maxRepetitions,
self.makeVarBinds(snmpEngine, [(x, self._null) for x in varNames]),
self.makeVarBinds(snmpEngine, varBinds),
__cbFun,
(lookupNames, lookupValues, cbFun, cbCtx)
)
# compatibility implementation, never use this class for new applications
# Synchronous one-liner SNMP apps
def getCmd(snmpEngine, authData, transportTarget, contextData,
*varBinds, **kwargs):
def cbFun(snmpEngine, sendRequestHandle,
errorIndication, errorStatus, errorIndex,
varBinds, cbCtx):
cbCtx['errorIndication'] = errorIndication
cbCtx['errorStatus'] = errorStatus
cbCtx['errorIndex'] = errorIndex
cbCtx['varBinds'] = varBinds
cbCtx = {}
AsyncCommandGenerator().getCmd(
snmpEngine,
authData,
transportTarget,
contextData,
varBinds,
(cbFun, cbCtx),
kwargs.get('lookupNames'),
kwargs.get('lookupValues')
)
snmpEngine.transportDispatcher.runDispatcher()
yield cbCtx['errorIndication'], \
cbCtx['errorStatus'], cbCtx['errorIndex'], \
cbCtx['varBinds']
def setCmd(snmpEngine, authData, transportTarget, contextData,
*varBinds, **kwargs):
def cbFun(snmpEngine, sendRequestHandle,
errorIndication, errorStatus, errorIndex,
varBinds, cbCtx):
cbCtx['errorIndication'] = errorIndication
cbCtx['errorStatus'] = errorStatus
cbCtx['errorIndex'] = errorIndex
cbCtx['varBinds'] = varBinds
cbCtx = {}
AsyncCommandGenerator().setCmd(
snmpEngine,
authData,
transportTarget,
contextData,
varBinds,
(cbFun, cbCtx),
kwargs.get('lookupNames'),
kwargs.get('lookupValues')
)
snmpEngine.transportDispatcher.runDispatcher()
yield cbCtx['errorIndication'], \
cbCtx['errorStatus'], cbCtx['errorIndex'], \
cbCtx['varBinds']
def nextCmd(snmpEngine, authData, transportTarget, contextData,
*varBinds, **kwargs):
def cbFun(snmpEngine, sendRequestHandle,
errorIndication, errorStatus, errorIndex,
varBindTable, cbCtx):
cbCtx['errorIndication'] = errorIndication
cbCtx['errorStatus'] = errorStatus
cbCtx['errorIndex'] = errorIndex
cbCtx['varBindTable'] = varBindTable
lookupNames = kwargs.get('lookupNames', False)
lookupValues = kwargs.get('lookupValues', False)
lexicographicMode = kwargs.get('lexicographicMode', False)
ignoreNonIncreasingOid = kwargs.get('ignoreNonIncreasingOid', False)
maxRows = kwargs.get('maxRows', 0)
maxCalls = kwargs.get('maxCalls', 0)
cbCtx = {}
cmdGen = AsyncCommandGenerator()
initialVars = [ x[0] for x in cmdGen.makeVarBinds(snmpEngine, varBinds) ]
totalRows = totalCalls = 0
while True:
cmdGen.nextCmd(snmpEngine,
authData,
transportTarget,
contextData,
[ (x[0], univ.Null()) for x in varBinds ],
(cbFun, cbCtx),
kwargs.get('lookupNames'),
kwargs.get('lookupValues'))
snmpEngine.transportDispatcher.runDispatcher()
errorIndication = cbCtx['errorIndication']
errorStatus = cbCtx['errorStatus']
errorIndex = cbCtx['errorIndex']
if ignoreNonIncreasingOid and errorIndication and \
isinstance(errorIndication, errind.OidNotIncreasing):
errorIndication = None
if errorStatus or errorIndication:
if errorStatus == 2:
# Hide SNMPv1 noSuchName error which leaks in here
# from SNMPv1 Agent through internal pysnmp proxy.
errorStatus = errorStatus.clone(0)
errorIndex = errorIndex.clone(0)
yield errorIndication, errorStatus, errorIndex, varBinds
continue
else:
varBinds = cbCtx['varBindTable'] and cbCtx['varBindTable'][0]
for idx, varBind in enumerate(varBinds):
name, val = varBind
if not isinstance(val, univ.Null):
if lexicographicMode or initialVars[idx].isPrefixOf(name):
break
else:
return
totalRows += 1
totalCalls += 1
yield errorIndication, errorStatus, errorIndex, varBinds
if maxRows and totalRows >= maxRows or \
maxCalls and totalCalls >= maxCalls:
return
def bulkCmd(snmpEngine, authData, transportTarget, contextData,
nonRepeaters, maxRepetitions, *varBinds, **kwargs):
def cbFun(snmpEngine, sendRequestHandle,
errorIndication, errorStatus, errorIndex,
varBindTable, cbCtx):
cbCtx['errorIndication'] = errorIndication
cbCtx['errorStatus'] = errorStatus
cbCtx['errorIndex'] = errorIndex
cbCtx['varBindTable'] = varBindTable
lookupNames = kwargs.get('lookupNames', False)
lookupValues = kwargs.get('lookupValues', False)
lexicographicMode = kwargs.get('lexicographicMode', False)
ignoreNonIncreasingOid = kwargs.get('ignoreNonIncreasingOid', False)
maxRows = kwargs.get('maxRows', 0)
maxCalls = kwargs.get('maxCalls', 0)
cbCtx = {}
cmdGen = AsyncCommandGenerator()
initialVars = [ x[0] for x in cmdGen.makeVarBinds(snmpEngine, varBinds) ]
nullVarBinds = [ False ] * len(initialVars)
totalRows = totalCalls = 0
stopFlag = False
while not stopFlag:
if maxRows and totalRows < maxRows:
maxRepetitions = min(maxRepetitions, maxRows-totalRows)
cmdGen.bulkCmd(snmpEngine,
authData,
transportTarget,
contextData,
nonRepeaters, maxRepetitions,
[ (x[0], univ.Null()) for x in varBinds ],
(cbFun, cbCtx),
kwargs.get('lookupNames'),
kwargs.get('lookupValues'))
snmpEngine.transportDispatcher.runDispatcher()
errorIndication = cbCtx['errorIndication']
errorStatus = cbCtx['errorStatus']
errorIndex = cbCtx['errorIndex']
varBindTable = cbCtx['varBindTable']
if ignoreNonIncreasingOid and errorIndication and \
isinstance(errorIndication, errind.OidNotIncreasing):
errorIndication = None
if errorStatus or errorIndication:
if errorStatus == 2:
# Hide SNMPv1 noSuchName error which leaks in here
# from SNMPv1 Agent through internal pysnmp proxy.
errorStatus = errorStatus.clone(0)
errorIndex = errorIndex.clone(0)
yield errorIndication, errorStatus, errorIndex, varBindTable and varBindTable[0] or []
continue
else:
for i in range(len(varBindTable)):
stopFlag = True
if len(varBindTable[i]) != len(initialVars):
varBindTable = i and varBindTable[:i-1] or []
break
for j in range(len(varBindTable[i])):
name, val = varBindTable[i][j]
if nullVarBinds[j]:
varBindTable[i][j] = name, rfc1905.endOfMibView
continue
stopFlag = False
if isinstance(val, univ.Null):
nullVarBinds[j] = True
elif not lexicographicMode and \
not initialVars[j].isPrefixOf(name):
varBindTable[i][j] = name, rfc1905.endOfMibView
nullVarBinds[j] = True
if stopFlag:
varBindTable = i and varBindTable[:i-1] or []
break
totalRows += len(varBindTable)
totalCalls += 1
if maxRows and totalRows >= maxRows:
if totalRows > maxRows:
varBindTable = varBindTable[:-(totalRows-maxRows)]
stopFlag = True
if maxCalls and totalCalls >= maxCalls:
stopFlag = True
for varBinds in varBindTable:
yield errorIndication, errorStatus, errorIndex, varBinds
#
# The rest of code in this file belongs to obsolete, compatibility wrappers.
# Never use interfaces below for new applications!
#
class AsynCommandGenerator:
def __init__(self, snmpEngine=None):
if snmpEngine is None:
@ -428,7 +665,9 @@ class AsynCommandGenerator:
return self.__asyncCmdGen.getCmd(
self.snmpEngine,
authData, transportTarget,
ContextData(contextEngineId, contextName), varNames, cbInfo,
ContextData(contextEngineId, contextName),
[(x, self._null) for x in varNames],
cbInfo,
lookupNames, lookupValues
)
@ -482,7 +721,9 @@ class AsynCommandGenerator:
return self.__asyncCmdGen.nextCmd(
self.snmpEngine,
authData, transportTarget,
ContextData(contextEngineId, contextName), varNames, cbInfo,
ContextData(contextEngineId, contextName),
[(x, self._null) for x in varNames],
cbInfo,
lookupNames, lookupValues
)
@ -512,7 +753,8 @@ class AsynCommandGenerator:
authData, transportTarget,
ContextData(contextEngineId, contextName),
nonRepeaters, maxRepetitions,
varNames, cbInfo,
[(x, self._null) for x in varNames],
cbInfo,
lookupNames, lookupValues
)
@ -520,262 +762,57 @@ class AsynCommandGenerator:
class CommandGenerator:
def __init__(self, snmpEngine=None, asynCmdGen=None):
if asynCmdGen is None:
self.__asynCmdGen = AsynCommandGenerator(snmpEngine)
else:
self.__asynCmdGen = asynCmdGen
# compatibility attributes
self.snmpEngine = self.__asynCmdGen.snmpEngine
self.mibViewController = self.__asynCmdGen.mibViewController
self.snmpEngine = snmpEngine or SnmpEngine()
self.mibViewController = AsyncCommandGenerator().getMibViewController(self.snmpEngine)
def getCmd(self, authData, transportTarget, *varNames, **kwargs):
def __cbFun(sendRequestHandle,
errorIndication, errorStatus, errorIndex,
varBinds, appReturn):
appReturn['errorIndication'] = errorIndication
appReturn['errorStatus'] = errorStatus
appReturn['errorIndex'] = errorIndex
appReturn['varBinds'] = varBinds
appReturn = {}
self.__asynCmdGen.getCmd(
authData,
transportTarget,
varNames,
(__cbFun, appReturn),
kwargs.get('lookupNames', False),
kwargs.get('lookupValues', False),
kwargs.get('contextEngineId'),
kwargs.get('contextName', null)
)
self.snmpEngine.transportDispatcher.runDispatcher()
return (
appReturn['errorIndication'],
appReturn['errorStatus'],
appReturn['errorIndex'],
appReturn['varBinds']
)
for x in getCmd(self.snmpEngine, authData, transportTarget,
ContextData(kwargs.get('contextEngineId'),
kwargs.get('contextName', null)),
*[ (x, univ.Null()) for x in varNames ],
**kwargs):
return x
def setCmd(self, authData, transportTarget, *varBinds, **kwargs):
def __cbFun(sendRequestHandle,
errorIndication, errorStatus, errorIndex,
varBinds, appReturn):
appReturn['errorIndication'] = errorIndication
appReturn['errorStatus'] = errorStatus
appReturn['errorIndex'] = errorIndex
appReturn['varBinds'] = varBinds
appReturn = {}
self.__asynCmdGen.setCmd(
authData,
transportTarget,
varBinds,
(__cbFun, appReturn),
kwargs.get('lookupNames', False),
kwargs.get('lookupValues', False),
kwargs.get('contextEngineId'),
kwargs.get('contextName', null)
)
self.snmpEngine.transportDispatcher.runDispatcher()
return (
appReturn['errorIndication'],
appReturn['errorStatus'],
appReturn['errorIndex'],
appReturn['varBinds']
)
for x in setCmd(self.snmpEngine, authData, transportTarget,
ContextData(kwargs.get('contextEngineId'),
kwargs.get('contextName', null)),
*varBinds,
**kwargs):
return x
def nextCmd(self, authData, transportTarget, *varNames, **kwargs):
def __cbFun(sendRequestHandle, errorIndication,
errorStatus, errorIndex, varBindTable, cbCtx):
(self, varBindHead, varBindTotalTable, appReturn) = cbCtx
if (ignoreNonIncreasingOid or \
hasattr(self, 'ignoreNonIncreasingOid') and \
self.ignoreNonIncreasingOid ) and \
errorIndication and \
isinstance(errorIndication, errind.OidNotIncreasing):
errorIndication = None
if errorStatus or errorIndication:
appReturn['errorIndication'] = errorIndication
if errorStatus == 2:
# Hide SNMPv1 noSuchName error which leaks in here
# from SNMPv1 Agent through internal pysnmp proxy.
appReturn['errorStatus'] = errorStatus.clone(0)
appReturn['errorIndex'] = errorIndex.clone(0)
else:
appReturn['errorStatus'] = errorStatus
appReturn['errorIndex'] = errorIndex
appReturn['varBindTable'] = varBindTotalTable
return
else:
varBindTableRow = varBindTable and varBindTable[-1] or varBindTable
for idx in range(len(varBindTableRow)):
name, val = varBindTableRow[idx]
# XXX extra rows
if not isinstance(val, univ.Null):
if lexicographicMode or \
hasattr(self, 'lexicographicMode') and \
self.lexicographicMode: # obsolete
if varBindHead[idx] <= name:
break
else:
if varBindHead[idx].isPrefixOf(name):
break
else:
appReturn['errorIndication'] = errorIndication
appReturn['errorStatus'] = errorStatus
appReturn['errorIndex'] = errorIndex
appReturn['varBindTable'] = varBindTotalTable
return
varBindTotalTable.extend(varBindTable)
varBindTable = []
for errorIndication, \
errorStatus, errorIndex, \
varBinds in nextCmd(self.snmpEngine, authData, transportTarget,
ContextData(kwargs.get('contextEngineId'),
kwargs.get('contextName', null)),
*[ (x, univ.Null()) for x in varNames ],
**kwargs):
if errorIndication or errorStatus:
return errorIndication, errorStatus, errorIndex, varBinds
if maxRows and len(varBindTotalTable) >= maxRows or \
hasattr(self, 'maxRows') and self.maxRows and \
len(varBindTotalTable) >= self.maxRows:
appReturn['errorIndication'] = errorIndication
appReturn['errorStatus'] = errorStatus
appReturn['errorIndex'] = errorIndex
if hasattr(self, 'maxRows'):
appReturn['varBindTable'] = varBindTotalTable[:self.maxRows]
else:
appReturn['varBindTable'] = varBindTotalTable[:maxRows]
return
if maxCalls[0] > 0:
maxCalls[0] -= 1
if maxCalls[0] == 0:
appReturn['errorIndication'] = errorIndication
appReturn['errorStatus'] = errorStatus
appReturn['errorIndex'] = errorIndex
appReturn['varBindTable'] = varBindTotalTable
return
varBindTable.append(varBinds)
return 1 # continue table retrieval
lookupNames = kwargs.get('lookupNames', False)
lookupValues = kwargs.get('lookupValues', False)
contextEngineId = kwargs.get('contextEngineId')
contextName = kwargs.get('contextName', null)
lexicographicMode = kwargs.get('lexicographicMode', False)
maxRows = kwargs.get('maxRows', 0)
maxCalls = [ kwargs.get('maxCalls', 0) ]
ignoreNonIncreasingOid = kwargs.get('ignoreNonIncreasingOid', False)
varBindHead = [ univ.ObjectIdentifier(x[0]) for x in self.__asynCmdGen.makeReadVarBinds(varNames) ]
appReturn = {}
self.__asynCmdGen.nextCmd(
authData,
transportTarget,
varNames,
(__cbFun, (self, varBindHead, [], appReturn)),
lookupNames, lookupValues,
contextEngineId, contextName
)
self.snmpEngine.transportDispatcher.runDispatcher()
return (
appReturn['errorIndication'],
appReturn['errorStatus'],
appReturn['errorIndex'],
appReturn['varBindTable']
)
return errorIndication, errorStatus, errorIndex, varBindTable
def bulkCmd(self, authData, transportTarget,
nonRepeaters, maxRepetitions, *varNames, **kwargs):
def __cbFun(sendRequestHandle, errorIndication,
errorStatus, errorIndex, varBindTable, cbCtx):
(self, varBindHead, nullVarBinds, varBindTotalTable, appReturn) = cbCtx
if (ignoreNonIncreasingOid or \
hasattr(self, 'ignoreNonIncreasingOid') and \
self.ignoreNonIncreasingOid ) and \
errorIndication and \
isinstance(errorIndication, errind.OidNotIncreasing):
errorIndication = None
if errorStatus or errorIndication:
appReturn['errorIndication'] = errorIndication
appReturn['errorStatus'] = errorStatus
appReturn['errorIndex'] = errorIndex
appReturn['varBindTable'] = varBindTable
return
else:
stopFlag = False
if not lexicographicMode: # cut possible extra OIDs
stopFlag = True
for i in range(len(varBindTable)):
stopFlag = True
if len(varBindTable[i]) != len(varBindHead):
varBindTable = i and varBindTable[:i-1] or []
break
for j in range(len(varBindTable[i])): # dichotomy?
name, val = varBindTable[i][j]
if nullVarBinds[j]:
varBindTable[i][j] = name, rfc1905.endOfMibView
continue
stopFlag = False
if not isinstance(val, univ.Null):
if not varBindHead[j].isPrefixOf(name):
varBindTable[i][j] = name, rfc1905.endOfMibView
nullVarBinds[j] = True
if stopFlag:
varBindTable = i and varBindTable[:i-1] or []
break
varBindTable = []
for errorIndication, \
errorStatus, errorIndex, \
varBinds in bulkCmd(self.snmpEngine, authData, transportTarget,
ContextData(kwargs.get('contextEngineId'),
kwargs.get('contextName', null)),
nonRepeaters, maxRepetitions,
*[ (x, univ.Null()) for x in varNames ],
**kwargs):
if errorIndication or errorStatus:
return errorIndication, errorStatus, errorIndex, varBinds
varBindTotalTable.extend(varBindTable)
varBindTable.append(varBinds)
appReturn['errorIndication'] = errorIndication
appReturn['errorStatus'] = errorStatus
appReturn['errorIndex'] = errorIndex
appReturn['varBindTable'] = varBindTotalTable
return errorIndication, errorStatus, errorIndex, varBindTable
if maxCalls[0] > 0:
maxCalls[0] -= 1
if maxCalls[0] == 0:
return
if maxRows and len(varBindTotalTable) >= maxRows or \
hasattr(self, 'maxRows') and self.maxRows and \
len(varBindTotalTable) >= self.maxRows: # obsolete
if hasattr(self, 'maxRows'):
appReturn['varBindTable'] = varBindTotalTable[:self.maxRows]
else:
appReturn['varBindTable'] = varBindTotalTable[:maxRows]
return
return not stopFlag # continue table retrieval
lookupNames = kwargs.get('lookupNames', False)
lookupValues = kwargs.get('lookupValues', False)
contextEngineId = kwargs.get('contextEngineId')
contextName = kwargs.get('contextName', null)
lexicographicMode = kwargs.get('lexicographicMode', False)
if not lexicographicMode: # obsolete
if hasattr(self, 'lexicographicMode') and self.lexicographicMode:
lexicographicMode = True
maxRows = kwargs.get('maxRows', 0)
maxCalls = [ kwargs.get('maxCalls', 0) ]
ignoreNonIncreasingOid = kwargs.get('ignoreNonIncreasingOid', False)
varBindHead = [ univ.ObjectIdentifier(x[0]) for x in self.__asynCmdGen.makeReadVarBinds(varNames) ]
nullVarBinds = [ False ] * len(varBindHead)
appReturn = {}
self.__asynCmdGen.bulkCmd(
authData,
transportTarget,
nonRepeaters, maxRepetitions,
varNames,
(__cbFun, (self, varBindHead, nullVarBinds, [], appReturn)),
lookupNames, lookupValues,
contextEngineId, contextName
)
self.snmpEngine.transportDispatcher.runDispatcher()
return (
appReturn['errorIndication'],
appReturn['errorStatus'],
appReturn['errorIndex'],
appReturn['varBindTable']
)

View File

@ -23,6 +23,7 @@ usmAesCfb192Protocol = config.usmAesCfb192Protocol
usmAesCfb256Protocol = config.usmAesCfb256Protocol
usmNoPrivProtocol = config.usmNoPrivProtocol
SnmpEngine = engine.SnmpEngine
ContextData = cmdgen.ContextData
nextID = nextid.Integer(0xffffffff)
@ -177,6 +178,47 @@ class AsyncNotificationOriginator:
return ntforg.NotificationOriginator().sendVarBinds(snmpEngine, notifyName, contextData.contextEngineId, contextData.contextName, self.makeVarBinds(snmpEngine, varBinds), __cbFun, (lookupNames, lookupValues, cbFun, cbCtx))
#
# Synchronous one-liner Notification Originator application
#
def sendNotification(snmpEngine, authData, transportTarget, contextData,
notifyType, notificationType, **kwargs):
def cbFun(snmpEngine, sendRequestHandle,
errorIndication, errorStatus, errorIndex,
varBinds, cbCtx):
cbCtx['errorIndication'] = errorIndication
cbCtx['errorStatus'] = errorStatus
cbCtx['errorIndex'] = errorIndex
cbCtx['varBinds'] = varBinds
cbCtx = {}
AsyncNotificationOriginator().sendNotification(
snmpEngine,
authData,
transportTarget,
contextData,
notifyType,
notificationType,
(cbFun, cbCtx),
kwargs.get('lookupNames'),
kwargs.get('lookupValues')
)
snmpEngine.transportDispatcher.runDispatcher()
if cbCtx:
yield cbCtx['errorIndication'], \
cbCtx['errorStatus'], cbCtx['errorIndex'], \
cbCtx['varBinds']
#
# The rest of code in this file belongs to obsolete, compatibility wrappers.
# Never use interfaces below for new applications!
#
# substitute sendNotification return object for backward compatibility
class ErrorIndicationReturn:
def __init__(self, *vars): self.__vars = vars
@ -185,9 +227,6 @@ class ErrorIndicationReturn:
def __bool__(self): return bool(self.__vars[0])
def __str__(self): return str(self.__vars[0])
#
# Compatibility implementation, never use this class for new applications
#
class AsynNotificationOriginator:
def __init__(self, snmpEngine=None, snmpContext=None):
if snmpEngine is None:
@ -274,31 +313,18 @@ class AsynNotificationOriginator:
class NotificationOriginator:
def __init__(self, snmpEngine=None, snmpContext=None, asynNtfOrg=None):
if asynNtfOrg is None:
self.__asynNtfOrg = AsynNotificationOriginator(
snmpEngine, snmpContext
)
else:
self.__asynNtfOrg = asynNtfOrg
# compatibility attributes
self.snmpEngine = snmpEngine or SnmpEngine()
self.mibViewController = AsyncNotificationOriginator().getMibViewController(self.snmpEngine)
# the varBinds parameter is legacy, use NotificationType instead
def sendNotification(self, authData, transportTarget, notifyType,
notificationType, *varBinds, **kwargs):
def __cbFun(sendRequestHandle, errorIndication,
errorStatus, errorIndex, varBinds, appReturn):
appReturn[0] = ErrorIndicationReturn(
errorIndication, errorStatus, errorIndex, varBinds
)
appReturn = { 0: ErrorIndicationReturn(None, 0, 0, ()) }
self.__asynNtfOrg.sendNotification(
authData, transportTarget, notifyType, notificationType,
varBinds, (__cbFun, appReturn),
kwargs.get('lookupNames', False),
kwargs.get('lookupValues', False),
kwargs.get('contextEngineId'),
kwargs.get('contextName', null)
)
self.__asynNtfOrg.snmpEngine.transportDispatcher.runDispatcher()
return appReturn[0]
for x in sendNotification(self.snmpEngine, authData, transportTarget,
ContextData(kwargs.get('contextEngineId'),
kwargs.get('contextName', null)),
notifyType,
notificationType.addVarBinds(*varBinds),
**kwargs):
return x