commit
a8d1dc6730
|
@ -20,6 +20,7 @@ import mitogen.ssh
|
|||
import mitogen.utils
|
||||
from mitogen.ansible import helpers
|
||||
|
||||
import ansible.errors
|
||||
import ansible.plugins.connection
|
||||
|
||||
|
||||
|
@ -38,16 +39,18 @@ class Connection(ansible.plugins.connection.ConnectionBase):
|
|||
if self.connected:
|
||||
return
|
||||
self.broker = mitogen.master.Broker()
|
||||
self.router = mitogen.master.Router(self.broker)
|
||||
if self._play_context.remote_addr == 'localhost':
|
||||
self.context = mitogen.master.connect(self.broker)
|
||||
self.context = self.router.connect(mitogen.master.Stream)
|
||||
else:
|
||||
self.context = mitogen.ssh.connect(broker,
|
||||
self._play_context.remote_addr)
|
||||
self.context = self.router.connect(mitogen.ssh.Stream,
|
||||
hostname=self._play_context.remote_addr,
|
||||
)
|
||||
|
||||
def exec_command(self, cmd, in_data=None, sudoable=True):
|
||||
super(Connection, self).exec_command(cmd, in_data=in_data, sudoable=sudoable)
|
||||
if in_data:
|
||||
raise AnsibleError("does not support module pipelining")
|
||||
raise ansible.errors.AnsibleError("does not support module pipelining")
|
||||
|
||||
return self.context.call(helpers.exec_command, cmd, in_data)
|
||||
|
||||
|
|
|
@ -1,4 +1,11 @@
|
|||
try:
|
||||
import ast
|
||||
except ImportError:
|
||||
# ast module is not available in Python 2.4.x, instead we shall use the
|
||||
# the compiler module as a fallback
|
||||
ast = None
|
||||
import commands
|
||||
import compiler
|
||||
import errno
|
||||
import getpass
|
||||
import imp
|
||||
|
@ -563,6 +570,7 @@ class Stream(mitogen.core.Stream):
|
|||
# base64'd and passed to 'python -c'. It forks, dups 0->100, creates a
|
||||
# pipe, then execs a new interpreter with a custom argv. 'CONTEXT_NAME' is
|
||||
# replaced with the context name. Optimized for size.
|
||||
@staticmethod
|
||||
def _first_stage():
|
||||
import os,sys,zlib
|
||||
R,W=os.pipe()
|
||||
|
|
|
@ -70,6 +70,7 @@ class Stream(mitogen.master.Stream):
|
|||
if self.port:
|
||||
self.name += ':%s' % (self.port,)
|
||||
|
||||
auth_incorrect_msg = 'SSH authentication is incorrect'
|
||||
password_incorrect_msg = 'SSH password is incorrect'
|
||||
password_required_msg = 'SSH password was requested, but none specified'
|
||||
|
||||
|
|
|
@ -7,6 +7,8 @@ import socket
|
|||
|
||||
import mitogen.core
|
||||
|
||||
from mitogen.core import LOG
|
||||
|
||||
|
||||
class Listener(mitogen.core.BasicStream):
|
||||
def __init__(self, broker, address=None, backlog=30):
|
||||
|
@ -21,7 +23,7 @@ class Listener(mitogen.core.BasicStream):
|
|||
|
||||
def on_receive(self, broker):
|
||||
sock, addr = self._sock.accept()
|
||||
context = Context(self._broker, name=addr)
|
||||
context = mitogen.core.Context(self._broker, name=addr)
|
||||
stream = mitogen.core.Stream(context)
|
||||
stream.accept(sock.fileno(), sock.fileno())
|
||||
|
||||
|
|
|
@ -45,7 +45,7 @@ def log_to_file(path=None, io=True, level='INFO'):
|
|||
|
||||
|
||||
def run_with_router(func, *args, **kwargs):
|
||||
"""Arrange for `func(broker, *args, **kwargs)` to run with a temporary
|
||||
"""Arrange for `func(router, *args, **kwargs)` to run with a temporary
|
||||
:py:class:`mitogen.master.Router`, ensuring the Router and Broker are
|
||||
correctly shut down during normal or exceptional return."""
|
||||
broker = mitogen.master.Broker()
|
||||
|
@ -58,12 +58,12 @@ def run_with_router(func, *args, **kwargs):
|
|||
|
||||
|
||||
def with_router(func):
|
||||
"""Decorator version of :py:func:`run_with_broker`. Example:
|
||||
"""Decorator version of :py:func:`run_with_router`. Example:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
@with_broker
|
||||
def do_stuff(broker, arg):
|
||||
@with_router
|
||||
def do_stuff(router, arg):
|
||||
pass
|
||||
|
||||
do_stuff(blah, 123)
|
||||
|
|
|
@ -8,9 +8,9 @@ import mitogen.master
|
|||
import mitogen.utils
|
||||
|
||||
|
||||
@mitogen.utils.with_broker
|
||||
def do_stuff(broker):
|
||||
context = mitogen.master.connect(broker)
|
||||
@mitogen.utils.with_router
|
||||
def do_stuff(router):
|
||||
context = router.connect(mitogen.master.Stream)
|
||||
t0 = time.time()
|
||||
ncalls = 1000
|
||||
for x in xrange(ncalls):
|
||||
|
|
|
@ -6,27 +6,27 @@ import mitogen.master
|
|||
import mitogen.utils
|
||||
|
||||
|
||||
def func0(broker):
|
||||
return broker
|
||||
def func0(router):
|
||||
return router
|
||||
|
||||
|
||||
@mitogen.utils.with_broker
|
||||
def func(broker):
|
||||
return broker
|
||||
@mitogen.utils.with_router
|
||||
def func(router):
|
||||
return router
|
||||
|
||||
|
||||
class RunWithBrokerTest(unittest.TestCase):
|
||||
class RunWithRouterTest(unittest.TestCase):
|
||||
# test_shutdown_on_exception
|
||||
# test_shutdown_on_success
|
||||
|
||||
def test_run_with_broker(self):
|
||||
broker = mitogen.utils.run_with_broker(func0)
|
||||
self.assertTrue(isinstance(broker, mitogen.master.Broker))
|
||||
self.assertFalse(broker._thread.isAlive())
|
||||
router = mitogen.utils.run_with_router(func0)
|
||||
self.assertTrue(isinstance(router, mitogen.master.Router))
|
||||
self.assertFalse(router.broker._thread.isAlive())
|
||||
|
||||
|
||||
class WithBrokerTest(unittest.TestCase):
|
||||
class WithRouterTest(unittest.TestCase):
|
||||
def test_with_broker(self):
|
||||
broker = func()
|
||||
self.assertTrue(isinstance(broker, mitogen.master.Broker))
|
||||
self.assertFalse(broker._thread.isAlive())
|
||||
router = func()
|
||||
self.assertTrue(isinstance(router, mitogen.master.Router))
|
||||
self.assertFalse(router.broker._thread.isAlive())
|
||||
|
|
Loading…
Reference in New Issue