WIP: gracefully shutdown asyncio dispatcher

asyncio-dispatcher-fixes
Ilya Etingof 2016-11-05 22:59:31 +01:00
parent a0ef4b6ce8
commit c54a3f6dc8
10 changed files with 114 additions and 65 deletions

View File

@ -58,3 +58,15 @@ Sparse notes on major existing problems/plans
* add RowStatus checks when reading MIB tables (LCD)
* Disallow empty SET value reaching scalar MIB object
----
- Make pyasn1 classes calling .super() for MI to work
- Release pyasn1 and make pysnmp dependent on it
- Make all pysnmp classes new-style
- Make pysnmp's TextualConvention leading MI
- Fix pysmi to put TextualConvention at the first position in MI
- Make SNMP NULL objects singletones
- Allow MIB resolution to fail gracefully on constraints violation
- add AES192/256 examples + snmpsim
- docstrings for USM protocol constants not rendered in sphinx docs

View File

@ -27,6 +27,7 @@ from pysnmp.hlapi.asyncio import *
@asyncio.coroutine
def run():
snmpEngine = SnmpEngine()
errorIndication, errorStatus, errorIndex, varBinds = yield from sendNotification(
snmpEngine,
CommunityData('public', mpModel=0),
@ -44,7 +45,7 @@ def run():
if errorIndication:
print(errorIndication)
snmpEngine.transportDispatcher.closeDispatcher()
yield from unconfigureNtfOrg(snmpEngine)
asyncio.get_event_loop().run_until_complete(run())

View File

@ -56,7 +56,12 @@ def sendone(snmpEngine, hostname, notifyType):
snmpEngine = SnmpEngine()
loop = asyncio.get_event_loop()
# send notifications concurrently
loop.run_until_complete(
asyncio.wait([sendone(snmpEngine, 'demo.snmplabs.com', 'trap'),
sendone(snmpEngine, 'demo.snmplabs.com', 'inform')])
)
# this will cancel internal timer
loop.run_until_complete(unconfigureNtfOrg(snmpEngine))

View File

@ -22,11 +22,9 @@ from pysnmp.hlapi.asyncio import *
@asyncio.coroutine
def run(varBinds):
snmpEngine = SnmpEngine()
def run(snmpEngine, varBinds):
while True:
errorIndication, errorStatus, errorIndex, \
varBindTable = yield from bulkCmd(
errorIndication, errorStatus, errorIndex, varBindTable = yield from bulkCmd(
snmpEngine,
UsmUserData('usr-none-none'),
UdpTransportTarget(('demo.snmplabs.com', 161)),
@ -38,11 +36,8 @@ def run(varBinds):
print(errorIndication)
break
elif errorStatus:
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex) - 1][0] or '?'
)
)
print('%s at %s' % (errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex) - 1][0] or '?'))
else:
for varBindRow in varBindTable:
for varBind in varBindRow:
@ -52,10 +47,11 @@ def run(varBinds):
if isEndOfMib(varBinds):
break
snmpEngine.transportDispatcher.closeDispatcher()
yield from unconfigureCmdGen(snmpEngine)
snmpEngine = SnmpEngine()
loop = asyncio.get_event_loop()
loop.run_until_complete(
run([ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr'))])
run(snmpEngine, [ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr'))])
)

View File

@ -47,8 +47,13 @@ def getone(snmpEngine, hostname):
snmpEngine = SnmpEngine()
loop = asyncio.get_event_loop()
# run parallel queries
loop.run_until_complete(
asyncio.wait([getone(snmpEngine, ('demo.snmplabs.com', 1161)),
getone(snmpEngine, ('demo.snmplabs.com', 2161)),
getone(snmpEngine, ('demo.snmplabs.com', 3161))])
)
# unconfigure SNMP engine
loop.run_until_complete(unconfigureCmdGen(snmpEngine))

View File

@ -37,8 +37,8 @@ def getone(snmpEngine, hostname):
print('%s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex) - 1][0] or '?'
)
)
)
else:
for varBind in varBinds:
print(' = '.join([x.prettyPrint() for x in varBind]))
@ -48,11 +48,11 @@ def getone(snmpEngine, hostname):
def getall(snmpEngine, hostnames):
for hostname in hostnames:
yield from getone(snmpEngine, hostname)
yield from unconfigureCmdGen(snmpEngine)
snmpEngine = SnmpEngine()
loop = asyncio.get_event_loop()
loop.run_until_complete(getall(snmpEngine, [('demo.snmplabs.com', 1161),
('demo.snmplabs.com', 2161),
('demo.snmplabs.com', 3161)]))
('demo.snmplabs.com', 3161)]))

View File

@ -51,13 +51,14 @@ class AsyncioDispatcher(AbstractTransportDispatcher):
self.__transportCount = 0
if 'timeout' in kwargs:
self.setTimerResolution(kwargs['timeout'])
self.loopingcall = None
self._futureTimer = None
@asyncio.coroutine
def handle_timeout(self):
while True:
yield asyncio.From(asyncio.sleep(self.getTimerResolution()))
self.handleTimerTick(loop.time())
def fireTimer(self):
yield asyncio.From(asyncio.sleep(self.getTimerResolution()))
self.handleTimerTick(loop.time())
if self._futureTimer:
self._futureTimer = asyncio.async(self.fireTimer())
def runDispatcher(self, timeout=0.0):
if not loop.is_running():
@ -69,8 +70,8 @@ class AsyncioDispatcher(AbstractTransportDispatcher):
raise PySnmpError(';'.join(traceback.format_exception(*sys.exc_info())))
def registerTransport(self, tDomain, transport):
if self.loopingcall is None and self.getTimerResolution() > 0:
self.loopingcall = asyncio.async(self.handle_timeout())
if not self._futureTimer and self.getTimerResolution() > 0:
self._futureTimer = asyncio.async(self.fireTimer())
AbstractTransportDispatcher.registerTransport(
self, tDomain, transport
)
@ -83,18 +84,20 @@ class AsyncioDispatcher(AbstractTransportDispatcher):
self.__transportCount -= 1
# The last transport has been removed, stop the timeout
if self.__transportCount == 0 and not self.loopingcall.done():
self.loopingcall.cancel()
self.loopingcall = None
if self.__transportCount == 0:
if self._futureTimer:
self._futureTimer.cancel()
self._futureTimer = None
# Trollius or Tulip?
if not hasattr(asyncio, "From"):
exec ("""\
exec("""\
@asyncio.coroutine
def handle_timeout(self):
while True:
yield from asyncio.sleep(self.getTimerResolution())
self.handleTimerTick(loop.time())
AsyncioDispatcher.handle_timeout = handle_timeout\
def fireTimer(self):
yield from asyncio.sleep(self.getTimerResolution())
self.handleTimerTick(loop.time())
if self._futureTimer:
self._futureTimer = asyncio.async(self.fireTimer())
AsyncioDispatcher.fireTimer = fireTimer\
""")

View File

@ -44,7 +44,7 @@ try:
except ImportError:
import trollius as asyncio
__all__ = ['getCmd', 'nextCmd', 'setCmd', 'bulkCmd', 'isEndOfMib']
__all__ = ['getCmd', 'nextCmd', 'setCmd', 'bulkCmd', 'isEndOfMib', 'unconfigureCmdGen']
vbProcessor = CommandGeneratorVarBinds()
lcd = CommandGeneratorLcdConfigurator()
@ -52,6 +52,21 @@ lcd = CommandGeneratorLcdConfigurator()
isEndOfMib = lambda x: not cmdgen.getNextVarBinds(x)[1]
@asyncio.coroutine
def unconfigureCmdGen(snmpEngine, authData=None):
"""Remove LCD configuration entry.
If `authData` is not given, all currently configured LCD entries will be
removed.
Note
----
Configuration entry removal may have a side effect of removing unused transport
and shutting down unused transport dispatcher.
"""
lcd.unconfigure(snmpEngine, authData)
@asyncio.coroutine
def getCmd(snmpEngine, authData, transportTarget, contextData,
*varBinds, **options):

View File

@ -21,12 +21,27 @@ try:
except ImportError:
import trollius as asyncio
__all__ = ['sendNotification']
__all__ = ['sendNotification', 'unconfigureNtfOrg']
vbProcessor = NotificationOriginatorVarBinds()
lcd = NotificationOriginatorLcdConfigurator()
@asyncio.coroutine
def unconfigureNtfOrg(snmpEngine, authData=None):
"""Remove LCD configuration entry.
If `authData` is not given, all currently configured LCD entries will be
removed.
Note
----
Configuration entry removal may have a side effect of removing unused transport
and shutting down unused transport dispatcher.
"""
lcd.unconfigure(snmpEngine, authData)
@asyncio.coroutine
def sendNotification(snmpEngine, authData, transportTarget, contextData,
notifyType, varBinds, **options):

View File

@ -37,7 +37,8 @@ class CommandGeneratorLcdConfigurator(AbstractLcdConfigurator):
def configure(self, snmpEngine, authData, transportTarget, *options):
cache = self._getCache(snmpEngine)
if isinstance(authData, CommunityData):
if authData.communityIndex not in cache['auth']:
authDataKey = authData.communityIndex
if authDataKey not in cache['auth']:
config.addV1System(
snmpEngine,
authData.communityIndex,
@ -66,21 +67,26 @@ class CommandGeneratorLcdConfigurator(AbstractLcdConfigurator):
paramsKey = (authData.securityName,
authData.securityLevel,
authData.mpModel)
if paramsKey in cache['parm']:
paramsName, useCount = cache['parm'][paramsKey]
cache['parm'][paramsKey] = paramsName, useCount + 1
useCount.add(authDataKey)
else:
paramsName = 'p%s' % self.nextID()
config.addTargetParams(
snmpEngine, paramsName,
authData.securityName, authData.securityLevel, authData.mpModel
)
cache['parm'][paramsKey] = paramsName, 1
cache['parm'][paramsKey] = paramsName, set([authDataKey])
transportKey = (paramsName, transportTarget.transportDomain,
transportTarget.transportAddr,
transportTarget.tagList)
if transportTarget.transportDomain in cache['tran']:
transport, useCount = cache['tran'][transportTarget.transportDomain]
transportTarget.verifyDispatcherCompatibility(snmpEngine)
cache['tran'][transportTarget.transportDomain] = transport, useCount + 1
useCount.add(transportKey)
elif config.getTransport(snmpEngine, transportTarget.transportDomain):
transportTarget.verifyDispatcherCompatibility(snmpEngine)
else:
@ -90,15 +96,10 @@ class CommandGeneratorLcdConfigurator(AbstractLcdConfigurator):
transportTarget.transportDomain,
transport
)
cache['tran'][transportTarget.transportDomain] = transport, 1
transportKey = (paramsName, transportTarget.transportDomain,
transportTarget.transportAddr,
transportTarget.tagList)
cache['tran'][transportTarget.transportDomain] = transport, set([transportKey])
if transportKey in cache['addr']:
addrName, useCount = cache['addr'][transportKey]
cache['addr'][transportKey] = addrName, useCount + 1
addrName = cache['addr'][transportKey]
else:
addrName = 'a%s' % self.nextID()
config.addTargetAddr(
@ -110,7 +111,7 @@ class CommandGeneratorLcdConfigurator(AbstractLcdConfigurator):
transportTarget.retries,
transportTarget.tagList
)
cache['addr'][transportKey] = addrName, 1
cache['addr'][transportKey] = addrName
return addrName, paramsName
@ -152,12 +153,12 @@ class CommandGeneratorLcdConfigurator(AbstractLcdConfigurator):
paramsKey = (authDataX.securityName,
authDataX.securityLevel,
authDataX.mpModel)
if paramsKey in cache['parm']:
paramsName, useCount = cache['parm'][paramsKey]
useCount -= 1
if useCount:
cache['parm'][paramsKey] = paramsName, useCount
else:
if authDataKey in useCount:
useCount.remove(authDataKey)
if not useCount:
del cache['parm'][paramsKey]
config.delTargetParams(
snmpEngine, paramsName
@ -169,24 +170,20 @@ class CommandGeneratorLcdConfigurator(AbstractLcdConfigurator):
addrKeys = [x for x in cache['addr'] if x[0] == paramsName]
for addrKey in addrKeys:
addrName, useCount = cache['addr'][addrKey]
useCount -= 1
if useCount:
cache['addr'][addrKey] = addrName, useCount
else:
config.delTargetAddr(snmpEngine, addrName)
addrName = cache['addr'][addrKey]
addrNames.add(addrKey)
config.delTargetAddr(snmpEngine, addrName)
if addrKey[1] in cache['tran']:
transport, useCount = cache['tran'][addrKey[1]]
if useCount > 1:
useCount -= 1
cache['tran'][addrKey[1]] = transport, useCount
else:
config.delTransport(snmpEngine, addrKey[1])
transport.closeTransport()
del cache['tran'][addrKey[1]]
addrNames.add(addrKey)
if addrKey[1] in cache['tran']:
transport, useCount = cache['tran'][addrKey[1]]
if addrKey in useCount:
useCount.remove(addrKey)
if not useCount:
config.delTransport(snmpEngine, addrKey[1])
transport.closeTransport()
del cache['tran'][addrKey[1]]
return addrNames, paramsNames