r0c/test/stress.py

746 lines
16 KiB
Python

#!/usr/bin/env python2
# -*- coding: utf-8 -*-
from __future__ import print_function
"""stress.py: retr0chat stress tester"""
__version__ = "0.9"
__author__ = "ed <a@ocv.me>"
__credits__ = ["stackoverflow.com"]
__license__ = "MIT"
__copyright__ = 2018
# config
#
#NUM_CLIENTS = 1
NUM_CLIENTS = 4
#CHANNELS = ['#1']
CHANNELS = ['#1','#2','#3','#4']
EVENT_DELAY = 0.01
EVENT_DELAY = 0.001
EVENT_DELAY = None
ITERATIONS = 1000000
ITERATIONS = 10000000
IMMEDIATE_TX = True
#IMMEDIATE_TX = False
VISUAL_CLIENT = True
#VISUAL_CLIENT = False
TELNET = False
TELNET = True
#
# config end
import multiprocessing
import threading
import asyncore
import socket
import signal
import random
import time
import sys
import os
sys.path.insert(1, os.path.join(sys.path[0], '..'))
from r0c.util import *
import builtins
try: print = __builtin__.print
except: print = builtins.print
PY2 = (sys.version_info[0] == 2)
if PY2:
from Queue import Queue
else:
from queue import Queue
def get_term_size():
"""
https://github.com/chrippa/backports.shutil_get_terminal_size
MIT licensed
"""
import struct
try:
from ctypes import windll, create_string_buffer, WinError
_handle_ids = {
0: -10,
1: -11,
2: -12,
}
def _get_terminal_size(fd):
handle = windll.kernel32.GetStdHandle(_handle_ids[fd])
if handle == 0:
raise OSError('handle cannot be retrieved')
if handle == -1:
raise WinError()
csbi = create_string_buffer(22)
res = windll.kernel32.GetConsoleScreenBufferInfo(handle, csbi)
if res:
res = struct.unpack("hhhhHhhhhhh", csbi.raw)
left, top, right, bottom = res[5:9]
columns = right - left + 1
lines = bottom - top + 1
return [columns, lines]
else:
raise WinError()
except ImportError:
import fcntl
import termios
def _get_terminal_size(fd):
try:
res = fcntl.ioctl(fd, termios.TIOCGWINSZ, b"\x00" * 4)
except IOError as e:
raise OSError(e)
lines, columns = struct.unpack("hh", res)
return [columns, lines]
try:
columns = int(os.environ["COLUMNS"])
except (KeyError, ValueError):
columns = 0
try:
lines = int(os.environ["LINES"])
except (KeyError, ValueError):
lines = 0
# Only query if necessary
if columns <= 0 or lines <= 0:
try:
size = _get_terminal_size(sys.__stdout__.fileno())
except (NameError, OSError):
size = [80,24]
if columns <= 0:
columns = size[0]
if lines <= 0:
lines = size[1]
return [columns, lines]
tsz = get_term_size()
tsz[1] -= 1
tszb = struct.pack('>HH', *tsz)
#print(b2hex(tszb))
#sys.exit(0)
class Client(asyncore.dispatcher):
def __init__(self, core, port, behavior, status_q):
asyncore.dispatcher.__init__(self)
self.core = core
self.port = port
self.behavior = behavior
self.status_q = status_q
self.explain = True
self.explain = False
self.dead = False
self.stopping = False
self.actor_active = False
self.bootup()
def bootup(self):
self.in_text = u''
self.tx_only = False
self.outbox = Queue(1000)
self.backlog = None
self.num_outbox = 0
self.num_sent = 0
self.pkt_sent = 0
self.stage = 'start'
self.nick = 'x'
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.connect(('127.0.0.1', self.port))
thr = threading.Thread(target=self.actor)
thr.daemon = True # for emergency purposes
thr.start()
def send_status(self, txt):
if False:
print(txt)
self.status_q.put(txt)
def actor(self):
print_stages = False
self.actor_active = True
print('actor going up')
while not self.stopping:
time.sleep(0.02)
if self.stage == 'start':
self.send_status('start')
hit = False
if 'verify that your previous config' in self.in_text:
hit = True
self.in_text = u''
self.tx('n')
if 'type the text below, then hit [Enter]:' in self.in_text:
hit = True
self.stage = 'qwer'
self.in_text = u''
for ch in u'qwer asdf\n':
self.tx(ch)
time.sleep(0.1)
if hit and TELNET:
#print('sending telnet termsize\n'*100)
self.txb(b'\xff\xfa\x1f' + tszb + b'\xff\xf0')
continue
if self.stage == 'qwer':
self.send_status('qwer')
test_pass = False
if 'your client is stuck in line-buffered mode' in self.in_text:
print('WARNING: r0c thinks we are linemode')
test_pass = True
self.tx(u'a')
if 'text appeared as you typed' in self.in_text:
test_pass = True
self.tx(u'b')
if test_pass:
self.in_text = u''
self.stage = 'color'
continue
if self.stage == 'color':
self.send_status('color')
if 'does colours work?' in self.in_text:
self.stage = 'codec'
self.in_text = u''
self.tx(u'y')
continue
if self.stage == 'codec':
self.send_status('codec')
if 'which line looks like' in self.in_text:
self.stage = 'getnick'
self.tx(u'a')
continue
if self.stage == 'getnick':
self.send_status('getnick')
ofs1 = self.in_text.find(u'H\033[0;36m')
ofs2 = self.in_text.find(u'>\033[0m ')
if ofs1 >= 0 and ofs2 == ofs1 + 8 + 6:
if not TELNET:
#print('sending vt100 termsize\n'*100)
self.tx(u'\033[{1};{0}R'.format(*tsz))
self.nick = self.in_text[ofs2-6:ofs2]
self.in_text = u''
self.tx(u'/join {0}\n'.format(CHANNELS[0]))
self.stage = 'ready'
self.send_status('{0}:start'.format(self.nick))
if self.behavior == 'flood_single_channel':
return self.flood_single_channel()
if self.behavior == 'jump_channels':
return self.jump_channels()
if self.behavior == 'reconnect_loop':
return self.reconnect_loop()
if self.behavior == 'split_utf8_runes':
return self.split_utf8_runes()
print('u wot')
return
self.actor_active = False
def flood_single_channel(self):
while not self.stopping:
time.sleep(0.02)
if self.stage == 'ready':
if 'fire/' in self.in_text:
self.stage = 'main'
self.in_Text = u''
continue
if self.stage == 'main':
self.stage = 'done'
for n in range(4000):
time.sleep(0.01)
self.tx(u'{0} {1}\n'.format(time.time(), n))
continue
if self.stage == 'done':
time.sleep(1)
self.text = u'' # dont care
if not self.outbox.empty():
continue
self.tx(u'{0} done\n'.format(time.time()))
self.actor_active = False
def split_utf8_runes(self):
charset = u'⢀⣴⣷⣄⠈⠻⡿⠋'
to_send = charset.encode('utf-8')
while False:
#print('tx up')
for n in range(len(to_send)):
#print('tx ch')
self.txb(to_send[n:n+1])
time.sleep(0.2)
self.txb(to_send[0:2])
to_send = to_send[2:] + to_send[0:2]
while True:
for n in range(0, len(to_send), 3):
self.txb(to_send[n:n+3])
time.sleep(0.2)
def reconnect_loop(self):
#print('reconnect_loop here')
channels_avail = CHANNELS
for chan in channels_avail:
self.tx(u'/join {0}\n'.format(chan))
time.sleep(1)
#print('reconnect_loop closing')
self.close()
#print('reconnect_loop booting')
self.bootup()
#print('reconnect_loop sayonara')
def expl(self, msg):
if not self.explain:
return
print(msg)
self.await_continue()
def await_continue(self):
self.in_text = u''
t0 = time.time()
while not self.stopping and not 'zxc mkl' in self.in_text:
time.sleep(0.1)
if time.time() - t0 > 10:
break
self.in_text = u''
def jump_channels(self):
immediate = IMMEDIATE_TX
delay = EVENT_DELAY
#print(immediate)
#sys.exit(0)
self.tx_only = immediate
script = []
active_chan = 0
member_of = [CHANNELS[0]]
channels_avail = CHANNELS
# maps to channels_avail
msg_id = [0]*len(channels_avail)
# ---- acts ----
# next channel
# join a channel
# part a channel
# send a message
chance = [ 10, 5, 4, 18 ]
chance = [ 10, 3, 2, 30 ]
chance = [ 10, 30, 2, 130 ]
for n in range(len(chance)-1):
chance[n+1] += chance[n]
print(chance)
#sys.exit(1)
odds_next, odds_join, odds_part, odds_send = chance
for n in range(ITERATIONS):
if self.stopping:
break
if n % 1000 == 0:
self.send_status('{0}:ev.{1}'.format(self.nick, n))
#self.tx(u'at event {0}\n'.format(n))
while not self.stopping:
if not member_of:
next_act = 13
else:
next_act = random.randrange(sum(chance))
if self.explain:
print('in [{0}], active [{1}:{2}], msgid [{3}], next [{4}]'.format(
','.join(member_of), active_chan, channels_avail[active_chan],
','.join(str(x) for x in msg_id), next_act))
if next_act <= odds_next:
if not member_of:
self.expl('tried to jump channel but we are all alone ;_;')
continue
changed_from_i = active_chan
changed_from_t = member_of[active_chan]
active_chan += 1
script.append(b'\x18')
if active_chan >= len(member_of):
# we do not consider the status channel
script.append(b'\x18')
active_chan = 0
changed_to_i = active_chan
changed_to_t = member_of[active_chan]
if self.explain:
self.expl('switching to next channel from {0} to {1} ({2} to {3})'.format(
changed_from_i, changed_to_i,
changed_from_t, changed_to_t))
for act in script:
self.txb(act)
self.txb(b'hello\n')
self.await_continue()
script = []
break
if next_act <= odds_join:
if len(member_of) == len(channels_avail):
self.expl('tried to join channel but filled {0} of {1} possible'.format(
len(member_of), len(channels_avail)))
# out of channels to join, try a different act
continue
while True:
to_join = random.choice(channels_avail)
if to_join not in member_of:
break
member_of.append(to_join)
active_chan = len(member_of) - 1
self.expl('going to join {0}:{1}, moving from {2}:{3}'.format(
len(member_of), to_join, active_chan, member_of[active_chan]))
script.append(u'/join {0}\n'.format(to_join).encode('utf-8'))
if self.explain:
for act in script:
self.txb(act)
self.txb(b'hello\n')
self.await_continue()
script = []
break
if next_act <= odds_part:
#continue
if not member_of:
self.expl('tried to leave channel but theres nothing to leave')
# out of channels to part, try a different act
continue
to_part = random.choice(member_of)
chan_idx = member_of.index(to_part)
self.expl('gonna leave {0}:{1}, we are in {2}:{3}'.format(
chan_idx, to_part, active_chan, member_of[active_chan]))
# jump to the channel to part from
while active_chan != chan_idx:
self.expl('jumping over from {0} to {1}'.format(
active_chan, active_chan + 1 ))
active_chan += 1
script.append(b'\x18')
if active_chan >= len(member_of):
self.expl('wraparound; dodging the status chan')
# we do not consider the status channel
script.append(b'\x18')
active_chan = 0
if active_chan == len(member_of) - 1:
self.expl('we are at the end of the channel list, decreasing int')
del member_of[active_chan]
active_chan -= 1
else:
self.expl('we are not at the end of the channel list, keeping it')
del member_of[active_chan]
if member_of:
self.expl('we will end up in {0}:{1}'.format(active_chan, member_of[active_chan]))
else:
self.expl('we have now left all our channels')
script.append(b'/part\n')
if self.explain:
for act in script:
self.txb(act)
self.txb(b'hello\n')
self.await_continue()
script = []
break
if not member_of:
# not in any channels, try a different act
continue
chan_name = member_of[active_chan]
chan_idx = channels_avail.index(chan_name)
msg_id[chan_idx] += 1
self.expl('gonna talk to {0}:{1}, msg #{2}'.format(
chan_idx, chan_name, msg_id[chan_idx]))
script.append(u'{0} {1} {2}\n'.format(
chan_name, msg_id[chan_idx], n).encode('utf-8'))
if self.explain:
for act in script:
self.txb(act)
self.txb(b'hello\n')
self.await_continue()
script = []
if immediate:
for action in script:
self.txb(action)
if delay:
time.sleep(delay)
script = []
break
self.tx(u'q\n')
while not self.stopping:
if 'fire/' in self.in_text:
break
time.sleep(0.01)
self.tx_only = True
for n, ev in enumerate(script):
if self.stopping:
break
if n % 100 == 0:
print('at event {0}\n'.format(n))
self.txb(ev)
if delay:
time.sleep(delay)
self.tx(u'done')
print('done')
self.actor_active = False
def handle_close(self):
self.dead = True
def handle_error(self):
whoops()
def tx(self, bv):
self.txb(bv.encode('utf-8'))
def txb(self, bv):
self.num_outbox += len(bv)
self.outbox.put(bv)
def readable(self):
return not self.dead
def writable(self):
return (self.backlog or not self.outbox.empty())
def handle_write(self):
msg = self.backlog
if not msg:
msg = self.outbox.get()
sent = self.send(msg)
self.backlog = msg[sent:]
self.num_sent += sent
self.pkt_sent += 1
if self.pkt_sent % 8192 == 8191:
#print('outbox {0} sent {1} queue {2}'.format(self.num_outbox, self.num_sent, self.num_outbox - self.num_sent))
self.send_status('{0}:s{1},q{2}'.format(self.nick, self.num_sent, self.num_outbox - self.num_sent))
def handle_read(self):
if self.dead:
print('!!! read when dead')
return
data = self.recv(8192)
if not data:
self.dead = True
return
if VISUAL_CLIENT:
print(data.decode('utf-8', 'ignore'))
if not self.tx_only:
self.in_text += data.decode('utf-8', 'ignore')
#if self.explain:
# print(self.in_text)
class SubCore(object):
def __init__(self, port, behavior, cmd_q, stat_q):
self.cmd_q = cmd_q
self.stat_q = stat_q
self.port = port
self.behavior = behavior
self.stopped = False
self.client = Client(self, self.port, self.behavior, stat_q)
def run(self):
timeout = 0.2
while self.cmd_q.empty():
asyncore.loop(timeout, count=2)
self.client.stopping = True
clean_shutdown = False
for n in range(0, 40): # 2sec
if not self.client.actor_active:
clean_shutdown = True
break
time.sleep(0.05)
self.client.close()
self.stopped = True
class ClientAPI(object):
def __init__(self, mproc, cmd_q, stat_q):
self.mproc = mproc
self.cmd_q = cmd_q
self.stat_q = stat_q
self.status = 'not.init'
def recv_status(self):
while not self.stat_q.empty():
self.status = self.stat_q.get()
class Core(object):
def __init__(self):
if len(sys.argv) < 2:
print('need 1 argument: telnet port')
sys.exit(1)
port = int(sys.argv[1])
self.stopping = False
self.asyncore_alive = False
signal.signal(signal.SIGINT, self.signal_handler)
behaviors = ['jump_channels'] * (NUM_CLIENTS)
#behaviors.append('reconnect_loop')
#behaviors = ['split_utf8_runes'] * (NUM_CLIENTS)
self.clients = []
for behavior in behaviors:
cmd_q = multiprocessing.Queue()
stat_q = multiprocessing.Queue()
mproc = multiprocessing.Process(target=self.new_subcore, args=(cmd_q, stat_q))
self.clients.append(ClientAPI(mproc, cmd_q, stat_q))
cmd_q.put(port)
cmd_q.put(behavior)
mproc.start()
def new_subcore(self, cmd_q, stat_q):
subcore = SubCore(cmd_q.get(), cmd_q.get(), cmd_q, stat_q)
subcore.run()
cmd_q.get()
def print_status(self):
msg = u''
for cli in self.clients:
msg += u'{0}, '.format(cli.status)
print(msg)
def run(self):
print(' * test is running')
print_status = not VISUAL_CLIENT
while not self.stopping:
for n in range(0,5):
time.sleep(0.1)
for cli in self.clients:
cli.recv_status()
if self.stopping:
break
if print_status:
self.print_status()
print('\r\n * subcores stopping')
for subcore in self.clients:
subcore.cmd_q.put('x')
for n in range(0, 40): # 2sec
clean_shutdown = True
for subcore in self.clients:
if not subcore.cmd_q.empty():
clean_shutdown = False
break
if clean_shutdown:
print(' * actor stopped')
break
time.sleep(0.05)
if not clean_shutdown:
print(' -X- some subcores are stuck')
for subcore in self.clients:
if not subcore[1].empty():
subcore[0].terminate()
print(' * test ended')
def shutdown(self):
self.stopping = True
def signal_handler(self, signal, frame):
self.shutdown()
if __name__ == '__main__':
core = Core()
core.run()
# cat log | grep -E ' adding msg ' | awk '{printf "%.3f\n", $1-v; v=$1}' | sed -r 's/\.//;s/^0*//;s/^$/0/' | awk 'BEGIN {sum=0} $1<10000 {sum=sum+$1} NR%10==0 {v=sum/32; sum=0; printf "%" v "s\n", "" }' | tr ' ' '#'