Compare commits

...

1 Commits

Author SHA1 Message Date
Ilya Etingof c54a3f6dc8 WIP: gracefully shutdown asyncio dispatcher 2016-11-05 22:59:31 +01:00
10 changed files with 114 additions and 65 deletions

View File

@ -58,3 +58,15 @@ Sparse notes on major existing problems/plans
* add RowStatus checks when reading MIB tables (LCD) * add RowStatus checks when reading MIB tables (LCD)
* Disallow empty SET value reaching scalar MIB object * Disallow empty SET value reaching scalar MIB object
----
- Make pyasn1 classes calling .super() for MI to work
- Release pyasn1 and make pysnmp dependent on it
- Make all pysnmp classes new-style
- Make pysnmp's TextualConvention leading MI
- Fix pysmi to put TextualConvention at the first position in MI
- Make SNMP NULL objects singletones
- Allow MIB resolution to fail gracefully on constraints violation
- add AES192/256 examples + snmpsim
- docstrings for USM protocol constants not rendered in sphinx docs

View File

@ -27,6 +27,7 @@ from pysnmp.hlapi.asyncio import *
@asyncio.coroutine @asyncio.coroutine
def run(): def run():
snmpEngine = SnmpEngine() snmpEngine = SnmpEngine()
errorIndication, errorStatus, errorIndex, varBinds = yield from sendNotification( errorIndication, errorStatus, errorIndex, varBinds = yield from sendNotification(
snmpEngine, snmpEngine,
CommunityData('public', mpModel=0), CommunityData('public', mpModel=0),
@ -44,7 +45,7 @@ def run():
if errorIndication: if errorIndication:
print(errorIndication) print(errorIndication)
snmpEngine.transportDispatcher.closeDispatcher() yield from unconfigureNtfOrg(snmpEngine)
asyncio.get_event_loop().run_until_complete(run()) asyncio.get_event_loop().run_until_complete(run())

View File

@ -56,7 +56,12 @@ def sendone(snmpEngine, hostname, notifyType):
snmpEngine = SnmpEngine() snmpEngine = SnmpEngine()
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
# send notifications concurrently
loop.run_until_complete( loop.run_until_complete(
asyncio.wait([sendone(snmpEngine, 'demo.snmplabs.com', 'trap'), asyncio.wait([sendone(snmpEngine, 'demo.snmplabs.com', 'trap'),
sendone(snmpEngine, 'demo.snmplabs.com', 'inform')]) sendone(snmpEngine, 'demo.snmplabs.com', 'inform')])
) )
# this will cancel internal timer
loop.run_until_complete(unconfigureNtfOrg(snmpEngine))

View File

@ -22,11 +22,9 @@ from pysnmp.hlapi.asyncio import *
@asyncio.coroutine @asyncio.coroutine
def run(varBinds): def run(snmpEngine, varBinds):
snmpEngine = SnmpEngine()
while True: while True:
errorIndication, errorStatus, errorIndex, \ errorIndication, errorStatus, errorIndex, varBindTable = yield from bulkCmd(
varBindTable = yield from bulkCmd(
snmpEngine, snmpEngine,
UsmUserData('usr-none-none'), UsmUserData('usr-none-none'),
UdpTransportTarget(('demo.snmplabs.com', 161)), UdpTransportTarget(('demo.snmplabs.com', 161)),
@ -38,11 +36,8 @@ def run(varBinds):
print(errorIndication) print(errorIndication)
break break
elif errorStatus: elif errorStatus:
print('%s at %s' % ( print('%s at %s' % (errorStatus.prettyPrint(),
errorStatus.prettyPrint(), errorIndex and varBinds[int(errorIndex) - 1][0] or '?'))
errorIndex and varBinds[int(errorIndex) - 1][0] or '?'
)
)
else: else:
for varBindRow in varBindTable: for varBindRow in varBindTable:
for varBind in varBindRow: for varBind in varBindRow:
@ -52,10 +47,11 @@ def run(varBinds):
if isEndOfMib(varBinds): if isEndOfMib(varBinds):
break break
snmpEngine.transportDispatcher.closeDispatcher() yield from unconfigureCmdGen(snmpEngine)
snmpEngine = SnmpEngine()
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
loop.run_until_complete( loop.run_until_complete(
run([ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr'))]) run(snmpEngine, [ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr'))])
) )

View File

@ -47,8 +47,13 @@ def getone(snmpEngine, hostname):
snmpEngine = SnmpEngine() snmpEngine = SnmpEngine()
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
# run parallel queries
loop.run_until_complete( loop.run_until_complete(
asyncio.wait([getone(snmpEngine, ('demo.snmplabs.com', 1161)), asyncio.wait([getone(snmpEngine, ('demo.snmplabs.com', 1161)),
getone(snmpEngine, ('demo.snmplabs.com', 2161)), getone(snmpEngine, ('demo.snmplabs.com', 2161)),
getone(snmpEngine, ('demo.snmplabs.com', 3161))]) getone(snmpEngine, ('demo.snmplabs.com', 3161))])
) )
# unconfigure SNMP engine
loop.run_until_complete(unconfigureCmdGen(snmpEngine))

View File

@ -37,8 +37,8 @@ def getone(snmpEngine, hostname):
print('%s at %s' % ( print('%s at %s' % (
errorStatus.prettyPrint(), errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex) - 1][0] or '?' errorIndex and varBinds[int(errorIndex) - 1][0] or '?'
)
) )
)
else: else:
for varBind in varBinds: for varBind in varBinds:
print(' = '.join([x.prettyPrint() for x in varBind])) print(' = '.join([x.prettyPrint() for x in varBind]))
@ -48,11 +48,11 @@ def getone(snmpEngine, hostname):
def getall(snmpEngine, hostnames): def getall(snmpEngine, hostnames):
for hostname in hostnames: for hostname in hostnames:
yield from getone(snmpEngine, hostname) yield from getone(snmpEngine, hostname)
yield from unconfigureCmdGen(snmpEngine)
snmpEngine = SnmpEngine() snmpEngine = SnmpEngine()
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
loop.run_until_complete(getall(snmpEngine, [('demo.snmplabs.com', 1161), loop.run_until_complete(getall(snmpEngine, [('demo.snmplabs.com', 1161),
('demo.snmplabs.com', 2161), ('demo.snmplabs.com', 2161),
('demo.snmplabs.com', 3161)])) ('demo.snmplabs.com', 3161)]))

View File

@ -51,13 +51,14 @@ class AsyncioDispatcher(AbstractTransportDispatcher):
self.__transportCount = 0 self.__transportCount = 0
if 'timeout' in kwargs: if 'timeout' in kwargs:
self.setTimerResolution(kwargs['timeout']) self.setTimerResolution(kwargs['timeout'])
self.loopingcall = None self._futureTimer = None
@asyncio.coroutine @asyncio.coroutine
def handle_timeout(self): def fireTimer(self):
while True: yield asyncio.From(asyncio.sleep(self.getTimerResolution()))
yield asyncio.From(asyncio.sleep(self.getTimerResolution())) self.handleTimerTick(loop.time())
self.handleTimerTick(loop.time()) if self._futureTimer:
self._futureTimer = asyncio.async(self.fireTimer())
def runDispatcher(self, timeout=0.0): def runDispatcher(self, timeout=0.0):
if not loop.is_running(): if not loop.is_running():
@ -69,8 +70,8 @@ class AsyncioDispatcher(AbstractTransportDispatcher):
raise PySnmpError(';'.join(traceback.format_exception(*sys.exc_info()))) raise PySnmpError(';'.join(traceback.format_exception(*sys.exc_info())))
def registerTransport(self, tDomain, transport): def registerTransport(self, tDomain, transport):
if self.loopingcall is None and self.getTimerResolution() > 0: if not self._futureTimer and self.getTimerResolution() > 0:
self.loopingcall = asyncio.async(self.handle_timeout()) self._futureTimer = asyncio.async(self.fireTimer())
AbstractTransportDispatcher.registerTransport( AbstractTransportDispatcher.registerTransport(
self, tDomain, transport self, tDomain, transport
) )
@ -83,18 +84,20 @@ class AsyncioDispatcher(AbstractTransportDispatcher):
self.__transportCount -= 1 self.__transportCount -= 1
# The last transport has been removed, stop the timeout # The last transport has been removed, stop the timeout
if self.__transportCount == 0 and not self.loopingcall.done(): if self.__transportCount == 0:
self.loopingcall.cancel() if self._futureTimer:
self.loopingcall = None self._futureTimer.cancel()
self._futureTimer = None
# Trollius or Tulip? # Trollius or Tulip?
if not hasattr(asyncio, "From"): if not hasattr(asyncio, "From"):
exec ("""\ exec("""\
@asyncio.coroutine @asyncio.coroutine
def handle_timeout(self): def fireTimer(self):
while True: yield from asyncio.sleep(self.getTimerResolution())
yield from asyncio.sleep(self.getTimerResolution()) self.handleTimerTick(loop.time())
self.handleTimerTick(loop.time()) if self._futureTimer:
AsyncioDispatcher.handle_timeout = handle_timeout\ self._futureTimer = asyncio.async(self.fireTimer())
AsyncioDispatcher.fireTimer = fireTimer\
""") """)

View File

@ -44,7 +44,7 @@ try:
except ImportError: except ImportError:
import trollius as asyncio import trollius as asyncio
__all__ = ['getCmd', 'nextCmd', 'setCmd', 'bulkCmd', 'isEndOfMib'] __all__ = ['getCmd', 'nextCmd', 'setCmd', 'bulkCmd', 'isEndOfMib', 'unconfigureCmdGen']
vbProcessor = CommandGeneratorVarBinds() vbProcessor = CommandGeneratorVarBinds()
lcd = CommandGeneratorLcdConfigurator() lcd = CommandGeneratorLcdConfigurator()
@ -52,6 +52,21 @@ lcd = CommandGeneratorLcdConfigurator()
isEndOfMib = lambda x: not cmdgen.getNextVarBinds(x)[1] isEndOfMib = lambda x: not cmdgen.getNextVarBinds(x)[1]
@asyncio.coroutine
def unconfigureCmdGen(snmpEngine, authData=None):
"""Remove LCD configuration entry.
If `authData` is not given, all currently configured LCD entries will be
removed.
Note
----
Configuration entry removal may have a side effect of removing unused transport
and shutting down unused transport dispatcher.
"""
lcd.unconfigure(snmpEngine, authData)
@asyncio.coroutine @asyncio.coroutine
def getCmd(snmpEngine, authData, transportTarget, contextData, def getCmd(snmpEngine, authData, transportTarget, contextData,
*varBinds, **options): *varBinds, **options):

View File

@ -21,12 +21,27 @@ try:
except ImportError: except ImportError:
import trollius as asyncio import trollius as asyncio
__all__ = ['sendNotification'] __all__ = ['sendNotification', 'unconfigureNtfOrg']
vbProcessor = NotificationOriginatorVarBinds() vbProcessor = NotificationOriginatorVarBinds()
lcd = NotificationOriginatorLcdConfigurator() lcd = NotificationOriginatorLcdConfigurator()
@asyncio.coroutine
def unconfigureNtfOrg(snmpEngine, authData=None):
"""Remove LCD configuration entry.
If `authData` is not given, all currently configured LCD entries will be
removed.
Note
----
Configuration entry removal may have a side effect of removing unused transport
and shutting down unused transport dispatcher.
"""
lcd.unconfigure(snmpEngine, authData)
@asyncio.coroutine @asyncio.coroutine
def sendNotification(snmpEngine, authData, transportTarget, contextData, def sendNotification(snmpEngine, authData, transportTarget, contextData,
notifyType, varBinds, **options): notifyType, varBinds, **options):

View File

@ -37,7 +37,8 @@ class CommandGeneratorLcdConfigurator(AbstractLcdConfigurator):
def configure(self, snmpEngine, authData, transportTarget, *options): def configure(self, snmpEngine, authData, transportTarget, *options):
cache = self._getCache(snmpEngine) cache = self._getCache(snmpEngine)
if isinstance(authData, CommunityData): if isinstance(authData, CommunityData):
if authData.communityIndex not in cache['auth']: authDataKey = authData.communityIndex
if authDataKey not in cache['auth']:
config.addV1System( config.addV1System(
snmpEngine, snmpEngine,
authData.communityIndex, authData.communityIndex,
@ -66,21 +67,26 @@ class CommandGeneratorLcdConfigurator(AbstractLcdConfigurator):
paramsKey = (authData.securityName, paramsKey = (authData.securityName,
authData.securityLevel, authData.securityLevel,
authData.mpModel) authData.mpModel)
if paramsKey in cache['parm']: if paramsKey in cache['parm']:
paramsName, useCount = cache['parm'][paramsKey] paramsName, useCount = cache['parm'][paramsKey]
cache['parm'][paramsKey] = paramsName, useCount + 1 useCount.add(authDataKey)
else: else:
paramsName = 'p%s' % self.nextID() paramsName = 'p%s' % self.nextID()
config.addTargetParams( config.addTargetParams(
snmpEngine, paramsName, snmpEngine, paramsName,
authData.securityName, authData.securityLevel, authData.mpModel authData.securityName, authData.securityLevel, authData.mpModel
) )
cache['parm'][paramsKey] = paramsName, 1 cache['parm'][paramsKey] = paramsName, set([authDataKey])
transportKey = (paramsName, transportTarget.transportDomain,
transportTarget.transportAddr,
transportTarget.tagList)
if transportTarget.transportDomain in cache['tran']: if transportTarget.transportDomain in cache['tran']:
transport, useCount = cache['tran'][transportTarget.transportDomain] transport, useCount = cache['tran'][transportTarget.transportDomain]
transportTarget.verifyDispatcherCompatibility(snmpEngine) transportTarget.verifyDispatcherCompatibility(snmpEngine)
cache['tran'][transportTarget.transportDomain] = transport, useCount + 1 useCount.add(transportKey)
elif config.getTransport(snmpEngine, transportTarget.transportDomain): elif config.getTransport(snmpEngine, transportTarget.transportDomain):
transportTarget.verifyDispatcherCompatibility(snmpEngine) transportTarget.verifyDispatcherCompatibility(snmpEngine)
else: else:
@ -90,15 +96,10 @@ class CommandGeneratorLcdConfigurator(AbstractLcdConfigurator):
transportTarget.transportDomain, transportTarget.transportDomain,
transport transport
) )
cache['tran'][transportTarget.transportDomain] = transport, 1 cache['tran'][transportTarget.transportDomain] = transport, set([transportKey])
transportKey = (paramsName, transportTarget.transportDomain,
transportTarget.transportAddr,
transportTarget.tagList)
if transportKey in cache['addr']: if transportKey in cache['addr']:
addrName, useCount = cache['addr'][transportKey] addrName = cache['addr'][transportKey]
cache['addr'][transportKey] = addrName, useCount + 1
else: else:
addrName = 'a%s' % self.nextID() addrName = 'a%s' % self.nextID()
config.addTargetAddr( config.addTargetAddr(
@ -110,7 +111,7 @@ class CommandGeneratorLcdConfigurator(AbstractLcdConfigurator):
transportTarget.retries, transportTarget.retries,
transportTarget.tagList transportTarget.tagList
) )
cache['addr'][transportKey] = addrName, 1 cache['addr'][transportKey] = addrName
return addrName, paramsName return addrName, paramsName
@ -152,12 +153,12 @@ class CommandGeneratorLcdConfigurator(AbstractLcdConfigurator):
paramsKey = (authDataX.securityName, paramsKey = (authDataX.securityName,
authDataX.securityLevel, authDataX.securityLevel,
authDataX.mpModel) authDataX.mpModel)
if paramsKey in cache['parm']: if paramsKey in cache['parm']:
paramsName, useCount = cache['parm'][paramsKey] paramsName, useCount = cache['parm'][paramsKey]
useCount -= 1 if authDataKey in useCount:
if useCount: useCount.remove(authDataKey)
cache['parm'][paramsKey] = paramsName, useCount if not useCount:
else:
del cache['parm'][paramsKey] del cache['parm'][paramsKey]
config.delTargetParams( config.delTargetParams(
snmpEngine, paramsName snmpEngine, paramsName
@ -169,24 +170,20 @@ class CommandGeneratorLcdConfigurator(AbstractLcdConfigurator):
addrKeys = [x for x in cache['addr'] if x[0] == paramsName] addrKeys = [x for x in cache['addr'] if x[0] == paramsName]
for addrKey in addrKeys: for addrKey in addrKeys:
addrName, useCount = cache['addr'][addrKey] addrName = cache['addr'][addrKey]
useCount -= 1
if useCount:
cache['addr'][addrKey] = addrName, useCount
else:
config.delTargetAddr(snmpEngine, addrName)
addrNames.add(addrKey) config.delTargetAddr(snmpEngine, addrName)
if addrKey[1] in cache['tran']: addrNames.add(addrKey)
transport, useCount = cache['tran'][addrKey[1]]
if useCount > 1: if addrKey[1] in cache['tran']:
useCount -= 1 transport, useCount = cache['tran'][addrKey[1]]
cache['tran'][addrKey[1]] = transport, useCount if addrKey in useCount:
else: useCount.remove(addrKey)
config.delTransport(snmpEngine, addrKey[1]) if not useCount:
transport.closeTransport() config.delTransport(snmpEngine, addrKey[1])
del cache['tran'][addrKey[1]] transport.closeTransport()
del cache['tran'][addrKey[1]]
return addrNames, paramsNames return addrNames, paramsNames