Rework interruptions

This commit is contained in:
Oleksii Shevchuk 2018-03-11 23:33:09 +02:00
parent a9774a3c67
commit 3b54958c59
2 changed files with 123 additions and 78 deletions

View File

@ -601,7 +601,7 @@ class PupyCmd(cmd.Cmd):
j.stop()
self.display_success("job closed")
else:
j.interrupt(wait=False)
j.interrupt()
j.stop()
self.display_success("job killed")
self.pupsrv.del_job(modargs.kill)
@ -763,51 +763,43 @@ class PupyCmd(cmd.Cmd):
modjobs=[x for x in self.pupsrv.jobs.itervalues() if x.pupymodules[0].get_name() == mod.get_name() and x.pupymodules[0].client in l]
pj=None
try:
interactive=False
if mod.daemon and mod.unique_instance and modjobs:
pj=modjobs[0]
else:
pj=PupyJob(self.pupsrv, '{} {}'.format(modargs.module, ' '.join(args)))
if len(l)==1 and not modargs.bg and not mod.daemon:
ps=mod(l[0], pj, stdout=self.stdout, log=modargs.output)
pj.add_module(ps)
interactive=True
else:
for c in l:
ps=mod(c, pj, log=modargs.output)
pj.add_module(ps)
try:
pj.start(args, once=modargs.once)
except Exception as e:
self.display_error(e)
pj.stop()
if not mod.unique_instance:
if modargs.bg:
self.pupsrv.add_job(pj)
self.display_info("job %s started in background !"%pj)
elif mod.daemon:
self.pupsrv.add_job(pj)
self.display_info("job %s started in background !"%pj)
else:
error=pj.interactive_wait()
if error and not modjobs:
pj.stop()
interactive=False
if mod.daemon and mod.unique_instance and modjobs:
pj=modjobs[0]
else:
pj=PupyJob(self.pupsrv, '{} {}'.format(modargs.module, ' '.join(args)))
if len(l)==1 and not modargs.bg and not mod.daemon:
ps=mod(l[0], pj, stdout=self.stdout, log=modargs.output)
pj.add_module(ps)
interactive=True
else:
for c in l:
ps=mod(c, pj, log=modargs.output)
pj.add_module(ps)
try:
pj.start(args, once=modargs.once)
except Exception as e:
self.display_error(e)
pj.stop()
if not mod.unique_instance:
if modargs.bg:
self.pupsrv.add_job(pj)
self.display_info("job %s started in background !"%pj)
elif mod.daemon:
self.pupsrv.add_job(pj)
self.display_info("job %s started in background !"%pj)
else:
if mod.daemon and not modjobs:
self.pupsrv.add_job(pj)
error=pj.interactive_wait()
if error and not modjobs:
pj.stop()
except KeyboardInterrupt:
self.display_warning('interrupting job ... (please wait)')
interrupted = pj.interrupt()
if interrupted:
self.display_warning('job was interrupted')
else:
self.display_error('job was sent to background and may consume resources')
else:
if mod.daemon and not modjobs:
self.pupsrv.add_job(pj)
error=pj.interactive_wait()
if error and not modjobs:
pj.stop()
if not interactive:
summary = pj.result_summary()
@ -1620,6 +1612,25 @@ class PupyCmd(cmd.Cmd):
self.display(PupyCmd.table_format(objects, wl=['IP', 'PORTS']))
session.open_ports = {}
def do_stacktrace(self, arg):
""" Dump current stacks (debug) """
print >> sys.stderr, "\n*** STACKTRACE - START ***\n"
code = []
for threadId, stack in sys._current_frames().items():
code.append("\n# ThreadID: %s" % threadId)
for filename, lineno, name, line in traceback.extract_stack(stack):
code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
if line:
code.append(" %s" % (line.strip()))
for line in code:
print >> sys.stderr, line
print >> sys.stderr, "\n*** STACKTRACE - END ***\n"
def do_exit(self, arg):
""" Quit Pupy Shell """
for job in self.pupsrv.jobs.itervalues():

View File

@ -69,8 +69,8 @@ class ThreadPool(object):
self.thread_pool=[]
def apply_async(self, func, args):
t=Thread(target=func, args=args)
t.daemon=True
t = Thread(target=func, args=args)
t.daemon = True
self.thread_pool.append(t)
t.start()
@ -79,25 +79,39 @@ class ThreadPool(object):
if t.isAlive():
t.stop()
def join(self):
def join(self, timeout=5, on_interrupt=None):
giveup = False
allok = True
interrupt_sent = False
while True:
try:
allok=True
allok = True
for t in self.thread_pool:
if t.isAlive():
t.join(0.5)
allok=False
t.join(timeout)
allok = False
if allok:
break
except KeyboardInterrupt:
if not giveup:
print "{ Press ^C once again to give up on waiting }"
giveup = True
if not interrupt_sent:
if on_interrupt:
if not on_interrupt():
break
else:
print "{ Press ^C once again to give up on waiting }"
interrupt_sent = True
else:
giveup = True
else:
break
return not giveup and allok
def all_finished(self):
for t in self.thread_pool:
if t.isAlive():
@ -115,10 +129,11 @@ class PupyJob(object):
self.pupymodules = []
self.worker_pool = ThreadPool()
self.started = threading.Event()
self.error_happened = threading.Event()
self.error = None
self.jid = None
self.destroyed = False
self.id = None
self.interrupted = False
def add_module(self, mod):
self.pupymodules.append(mod)
@ -130,31 +145,43 @@ class PupyJob(object):
self.interrupt()
def module_worker(self, module, cmdline, args, once):
e = None
try:
module.import_dependencies()
module.init(cmdline, args)
module.run(args)
except PupyModuleExit as e:
self.error = e
return
except PupyModuleError as e:
self.error_happened.set()
module.error(str(e))
self.error = e
if not self.interrupted:
module.error(str(e))
except KeyboardInterrupt:
pass
except Exception as e:
self.error_happened.set()
module.error(str(e))
import traceback
traceback.print_exc(e, file=module.stdout)
self.error = e
if not self.interrupted:
module.error(str(e))
import traceback
traceback.print_exc(e, file=module.stdout)
finally:
if once:
if not self.interrupted and once:
module.clean_dependencies()
if self.id is not None:
self.pupsrv.handler.display_srvinfo('{}: {} - {}'.format(
self.id,
self.name,
'failed' if self.error_happened.is_set() else 'success'))
display = self.pupsrv.handler.display_warning if \
self.interrupted else self.pupsrv.handler.display_srvinfo
display(
'{}: {} - {}'.format(
self.id, self.name, 'failed' if e else 'success'))
def start(self, args, once=False):
#if self.started.is_set():
@ -167,59 +194,66 @@ class PupyJob(object):
margs.unknown_args = unknown_args
else:
margs = m.arg_parser.parse_args(args)
except PupyModuleExit as e:
m.error("Arguments parse error : %s"%e)
continue
res = m.is_compatible()
if type(res) is tuple:
comp, comp_exp=res
comp, comp_exp = res
elif res is None:
comp=True
comp_exp=""
comp = True
comp_exp = ""
else:
comp=res
comp_exp="reason not precised"
comp_exp = "reason not precised"
if not comp:
m.error("Compatibility error : %s"%comp_exp)
continue
self.worker_pool.apply_async(self.module_worker, (m, args, margs, once))
self.started.set()
def interrupt(self, wait=True):
def interrupt(self):
if not self.started:
raise RuntimeError("can't interrupt. job %s has not been started"%str(self))
if self.interrupted:
return True
self.interrupted = True
#calling the interrupt method is one is defined for the module instead of killing the thread
if hasattr(self.pupymodules[0],'interrupt'):
if hasattr(self.pupymodules[0], 'interrupt'):
self.pupsrv.handler.display_info('Sending interrupt request')
for m in self.pupymodules:
m.interrupt()
return True
else:
self.pupsrv.handler.display_warning('Module does not support interrupts. Resources may leak!')
self.worker_pool.interrupt_all()
if wait:
self.wait()
self.check()
return False
def interactive_wait(self):
while True:
if self.is_finished():
break
time.sleep(0.1)
if self.error_happened.is_set():
self.worker_pool.join(on_interrupt=self.interrupt)
if self.error:
return True
return False
def wait(self):
self.worker_pool.join()
def check(self):
for m in self.pupymodules:
while True:
if not m.client:
break
try:
m.client.conn._conn.ping(timeout=2, block=True)
logging.info('connection {} alive'.format(m))
m.client.conn._conn.ping(timeout=2)
break
except KeyboardInterrupt: