Compare commits

...

2 Commits

Author SHA1 Message Date
ed 13025474bf optimize for busy servers:
if cpu load too high, start combining replies into larger packets
(old target was >= 480 bytes, now aim for >= 4k instead)

and on the contrary, avoid the windows-telnet buffer overflow by
ensuring that packets are never combined during the config wizard,
which is something that could happen until now
2024-07-20 18:25:38 +00:00
ed 188cc14529 use `poll()` to support 1000+ clients;
macos has a long history of bugs so keep `select()` as default, and
just stick to `select()` with its limit of 512 connections on windows
2024-07-20 00:24:00 +00:00
5 changed files with 336 additions and 45 deletions

View File

@ -48,7 +48,7 @@ technical:
* message input with readline-like editing (arrow-left/right, home/end, backspace)
* history of sent messages (arrow-up/down)
* bandwidth-conservative (push/pop lines instead of full redraws; scroll-regions)
* fast enough; 600 clients @ 750 msgs/sec, or 1'000 cli @ 350 msg/s
* fast enough; 600 clients @ 750 msgs/sec, or 1500 cli @ 75 msg/s
* [bridge](#irc) several irc channels from several networks into one r0c channel
## windows clients

View File

@ -20,6 +20,7 @@ if PY2:
else:
unicode = str
MACOS = platform.system() == "Darwin"
WINDOWS = False
if platform.system() == "Windows":
WINDOWS = [int(x) for x in platform.version().split(".")]

View File

@ -2,7 +2,7 @@
# coding: utf-8
from __future__ import print_function
from .__version__ import S_VERSION
from .__init__ import EP, WINDOWS, COLORS, unicode
from .__init__ import COLORS, EP, MACOS, WINDOWS, unicode
from . import util as Util
from . import inetcat as Inetcat
from . import itelnet as Itelnet
@ -68,6 +68,13 @@ def optgen(ap, pwd):
ac.add_argument("--i-rate", metavar="B,R", type=u, default="4,2", help="rate limit; burst of B messages, then R seconds between each")
ac.add_argument("--ctcp-ver", metavar="S", type=u, default="r0c v%s" % (S_VERSION), help="reply to CTCP VERSION")
ac = ap.add_argument_group("limits")
ac.add_argument("-nc", metavar="NUM", type=int, default=0, help="allow NUM simultaneous client connections (will keep OS-default if 0)")
if MACOS:
ac.add_argument("--poll", action="store_true", help="allow more than 1000 simultaneous client connections (WARNING: buggy on many macos versions)")
if hasattr(select, "poll") and not MACOS:
ac.add_argument("--no-poll", action="store_true", help="use the older 'select' API; reduces the max num simultaneous connections to 1000")
ac = ap.add_argument_group("ux")
ac.add_argument("--no-all", action="store_true", help="default-disable @all / @everyone")
ac.add_argument("--motd", metavar="PATH", type=u, default="", help="file to include at the end of the welcome-text (can be edited at runtime)")
@ -107,6 +114,7 @@ class Fargparse(object):
def run_fap(argv, pwd):
ap = Fargparse()
optgen(ap, pwd)
setattr(ap, "no_poll", True)
if u"-h" in unicode(([""] + argv)[-1]):
print()
@ -173,6 +181,37 @@ except:
run_ap = run_fap
def rlimit(bump):
    """Report, and optionally change, the soft limit on open file descriptors.

    Args:
        bump: desired soft RLIMIT_NOFILE value; a falsy value (0) leaves the
            limit untouched and just reports the current one.

    Returns:
        A human-readable description of the resulting limit:
        "Unknown (check ulimit)" if the limit cannot be read,
        "N (os-default)" if no change was requested,
        "N (bump failed)" if the change was rejected,
        otherwise "N", with " due to ulimit" appended if the request
        had to be clamped to the hard limit.
    """
    try:
        # not available on every platform, so import lazily
        import resource

        raw_soft, raw_hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    except Exception:
        return "Unknown (check ulimit)"

    # non-positive values (e.g. an "unlimited" reported as -1) are replaced
    # with a 1Mi placeholder, but only for reporting and clamping
    soft, hard = [int(x) if x > 0 else 1024 * 1024 for x in (raw_soft, raw_hard)]

    if not bump:
        return "%d (os-default)" % (soft,)

    suf = ""
    if bump > hard:
        bump = hard
        suf = " due to ulimit"

    try:
        # hand the hard limit back exactly as the OS reported it; passing the
        # placeholder instead would permanently lower an unlimited hard limit
        resource.setrlimit(resource.RLIMIT_NOFILE, (bump, raw_hard))
    except Exception:
        return "%d (bump failed)" % (soft,)

    return "%d%s" % (bump, suf)
class Core(object):
def __init__(self):
pass
@ -260,6 +299,18 @@ printf '%s\\n' GK . . . . r0c.int . | openssl req -newkey rsa:2048 -sha256 -keyo
else:
print(" * {0} server disabled".format(srv))
if (
hasattr(select, "poll")
and getattr(ar, "poll", True)
and not getattr(ar, "no_poll", False)
):
poll = True
ncli = rlimit(ar.nc)
else:
poll = False
ncli = "500 (os-limit)" if WINDOWS else "1000"
print(" * Max num clients: %s" % (ncli,))
if ar.pw == "hunter2":
print("\033[1;31m")
print(" using default password '{0}'".format(ar.pw))
@ -318,7 +369,7 @@ printf '%s\\n' GK . . . . r0c.int . | openssl req -newkey rsa:2048 -sha256 -keyo
ircn.connect()
print(" * Running")
self.select_thr = Util.Daemon(self.select_worker, "selector")
self.select_thr = Util.Daemon(self.select_worker, "selector", (poll,))
return True
@ -423,10 +474,11 @@ printf '%s\\n' GK . . . . r0c.int . | openssl req -newkey rsa:2048 -sha256 -keyo
print(" * r0c is down")
return True
def select_worker(self):
def select_worker(self, poll):
srvs = {}
for iface in self.servers:
srvs[iface.srv_sck] = iface
s = iface.srv_sck
srvs[s.fileno() if poll else s] = iface
t_fast = 0.5 if self.ar.ircn else 1
@ -437,6 +489,9 @@ printf '%s\\n' GK . . . . r0c.int . | openssl req -newkey rsa:2048 -sha256 -keyo
nfast = 0
dirty_ref = 0
next_slow = 0
want_rx = []
want_tx = []
poller = None
timeout = None
while not self.shutdown_flag.is_set():
nsn = self.world.cserial
@ -452,13 +507,15 @@ printf '%s\\n' GK . . . . r0c.int . | openssl req -newkey rsa:2048 -sha256 -keyo
else:
fast[c.sck] = c
sc[c.sck] = c
sc[c.sck.fileno() if poll else c.sck] = c
timeout = 0.2 if slow else t_fast if fast else 69
want_rx = list(sc) + list(srvs)
if poll:
poller = select.poll()
_ = [poller.register(fd) for fd in want_rx]
want_tx = [s for s, c in fast.items() if c.writable()]
want_rx = [s for s, c in sc.items() if c.readable()]
want_rx += list(srvs.keys())
now = time.time()
if slow and now >= next_slow:
@ -481,8 +538,21 @@ printf '%s\\n' GK . . . . r0c.int . | openssl req -newkey rsa:2048 -sha256 -keyo
ct = 0.09 if nfast < 2 else timeout
try:
# print("sel", len(want_rx), len(want_tx), ct)
rxs, txs, _ = select.select(want_rx, want_tx, [], ct)
if poll:
want_tx = [x.fileno() for x in want_tx]
lut = set(want_tx)
for fd in want_rx:
if fd in lut:
poller.modify(fd, select.POLLIN | select.POLLOUT)
else:
poller.modify(fd, select.POLLIN)
ready = poller.poll(ct * 1000)
rxs = [x[0] for x in ready if x[1] & select.POLLIN]
txs = [x[0] for x in ready if x[1] & select.POLLOUT]
else:
rxs, txs, _ = select.select(want_rx, want_tx, [], ct)
if self.stopping:
break

View File

@ -11,7 +11,6 @@ import re
import time
import zlib
import socket
import threading
import binascii
from datetime import datetime
import operator
@ -704,36 +703,8 @@ class VT100_Client(object):
def say(self, message):
self.outbox.append(message)
def readable(self):
return not self.dead
def writable(self):
# if not self.replies.empty() or self.backlog:
# print('REPLY!!')
# else:
# print('@' if self.backlog or not self.replies.empty() or not self.outbox.empty() else '.', end='')
# sys.stdout.flush()
# if self.slowmo_tx:
# #print('x')
# now = time.time()
# if self.last_tx is not None and now - self.last_tx < 0.01:
# return False
# #print('ooo')
# looks like we might end up here after all,
# TODO: safeguard against similar issues (thanks asyncore)
try:
return not self.dead and (self.backlog or self.replies or self.outbox)
except:
# terrible print-once guard
try:
self.crash_case_1 += 1
except:
self.crash_case_1 = 1
Util.whoops()
if not self.dead:
self.host.part(self)
return (self.backlog or self.replies or self.outbox) and not self.dead
def handle_close(self):
if not self.dead:
@ -745,16 +716,19 @@ class VT100_Client(object):
self.host.part(self)
def handle_write(self):
if not self.writable():
return
slow = self.slowmo_tx or self.wizard_stage
frame_sz = 480 if slow else 3939
msg = self.backlog
self.backlog = b""
for src in [self.replies, self.outbox]:
while len(msg) < 480 and src:
while src and len(msg) < frame_sz:
msg += src.pop(0)
if self.dead or not msg:
return
if self.ar.hex_tx:
if len(msg) < self.ar.hex_lim:
Util.hexdump(msg, "<<--")
@ -767,7 +741,7 @@ class VT100_Client(object):
)
Util.hexdump(msg, "<", self.wire_log)
if self.slowmo_tx:
if slow:
end_pos = next(
(
i
@ -776,7 +750,6 @@ class VT100_Client(object):
),
len(msg),
)
self.backlog = msg[end_pos:]
sent = self.sck.send(msg[:end_pos])
self.backlog = msg[sent:]
self.slowmo_skips = self.slowmo_tx

247
test/astress.py Normal file
View File

@ -0,0 +1,247 @@
#!/usr/bin/env python3
# coding: utf-8
import asyncio
import random
import struct
import signal
import time
import sys
import os
sys.path.insert(1, os.path.join(sys.path[0], ".."))
from r0c import util # noqa: E402
"""astress.py: retr0chat stress tester (async edition)"""
__version__ = "0.9"
__author__ = "ed <a@ocv.me>"
__credits__ = ["stackoverflow.com"]
__license__ = "MIT"
__copyright__ = 2024
## config
##

# how many simulated clients to spawn (set to 1 when debugging)
NUM_CLIENTS = 750

# channels available to the clients; a multi-channel alternative
# would be ["#1", "#2", "#3", "#4"]
CHANNELS = ["#1"]

# seconds each client waits between messages;
# flood-testing alternatives are 0.01 and 0.005
EVENT_DELAY = 20

# when enabled, the periodic status printout is suppressed
VISUAL_CLIENT = True

# when enabled, the vt100 cursor-position termsize reply is skipped
TELNET = True

##
## config end

# terminal size announced to the server, and its big-endian
# 2x uint16 encoding used in the telnet termsize response
tsz = [80, 24]
tszb = struct.pack(">HH", *tsz)
class Client(object):
    """One simulated chat user.

    Connects to the server on localhost, answers the config-wizard prompts,
    joins the first configured channel, and then runs the given behavior.
    """

    def __init__(self, core, port, behavior, n):
        """
        Args:
            core: the owning Core instance.
            port: TCP port of the server on 127.0.0.1.
            behavior: name of the behavior to run once the client is ready.
            n: sequence number of this client, used in log output.
        """
        self.core = core
        self.port = port
        self.behavior = behavior
        self.n = n
        self.explain = False
        self.dead = False
        self.stopping = False
        self.actor_active = False
        self.in_text = ""
        self.tx_only = False
        self.num_sent = 0
        self.pkt_sent = 0
        self.stage = "start"
        self.status = ""
        self.nick = "x"

        # assigned by run() once the connection is established
        self.sck_rd = None
        self.sck_wr = None

    def close(self):
        """Ask all loops to stop and close the connection (if any)."""
        self.stopping = True
        # asyncio.StreamReader has no close(); closing the writer
        # shuts down the transport shared by both directions
        if self.sck_wr is not None:
            self.sck_wr.close()

    async def tx(self, txt):
        """Send txt as utf-8 and wait for the write buffer to drain."""
        self.sck_wr.write(txt.encode("utf-8"))
        await self.sck_wr.drain()

    async def readloop(self):
        """Keep consuming server output until stopped or disconnected.

        Received text is appended to in_text unless tx_only is set,
        in which case it is discarded.
        """
        rd = self.sck_rd
        while not self.stopping:
            buf = await rd.read(8192)
            if not buf:
                # EOF; further reads would return b"" instantly without
                # yielding, which would starve every other task
                self.dead = True
                break

            if self.tx_only:
                continue

            self.in_text += buf.decode("utf-8", "ignore")

    async def run(self):
        """Connect, walk through the config wizard, then run the behavior."""
        rd, wr = await asyncio.open_connection("127.0.0.1", self.port)
        self.sck_rd = rd
        self.sck_wr = wr

        self.actor_active = True
        print("client %d going up" % (self.n,))
        while not self.stopping:
            buf = await rd.read(8192)
            if not buf:
                # server hung up before we got through the wizard
                self.dead = True
                break

            self.in_text += buf.decode("utf-8", "ignore")

            if self.stage == "start":
                # telnet subnegotiation carrying our window size
                termsize_rsp = b"\xff\xfa\x1f" + tszb + b"\xff\xf0"

                if "verify that your config" in self.in_text:
                    self.in_text = ""
                    wr.write(termsize_rsp)
                    await self.tx("n\n\n")

                if "type the text below, then hit [Enter] [Enter]:" in self.in_text:
                    self.stage = "qwer"
                    self.in_text = ""
                    wr.write(termsize_rsp)
                    # one character at a time, like a human typing
                    for ch in "qwer asdf\n\n":
                        await self.tx(ch)
                        await asyncio.sleep(0.1)

                continue

            if self.stage == "qwer":
                test_pass = False

                if "your client is stuck in line-buffered mode" in self.in_text:
                    print("WARNING: r0c thinks we are linemode")
                    test_pass = True
                    await self.tx("a")

                if "text appeared as you typed" in self.in_text:
                    test_pass = True
                    await self.tx("b")

                if test_pass:
                    self.in_text = ""
                    self.stage = "color"

                continue

            if self.stage == "color":
                if "does colours work?" in self.in_text:
                    self.stage = "codec"
                    self.in_text = ""
                    await self.tx("y")

                continue

            if self.stage == "codec":
                if "which line looks like" in self.in_text:
                    self.stage = "getnick"
                    await self.tx("a")

                continue

            if self.stage == "getnick":
                # the prompt contains the 6-character nick between these markers
                ofs1 = self.in_text.find("H\033[0;36m")
                ofs2 = self.in_text.find(">\033[0m ")
                if ofs1 >= 0 and ofs2 == ofs1 + 8 + 6:
                    if not TELNET:
                        # print('sending vt100 termsize\n'*100)
                        # tsz is a list; %-formatting needs a tuple
                        await self.tx("\033[%d;%dR" % tuple(tsz))

                    self.nick = self.in_text[ofs2 - 6 : ofs2]
                    self.in_text = ""
                    await self.tx("/join %s\n" % (CHANNELS[0],))
                    self.stage = "ready"
                    self.status = "%s:start" % (self.nick,)
                    self.task_rd = asyncio.create_task(self.readloop())

                    if self.behavior == "flood_single_channel":
                        await self.flood_single_channel()
                        break

                    # NOTE(review): placement reconstructed from a listing
                    # without indentation; taken to be the fallback for an
                    # unknown behavior name -- confirm against the repo
                    print("u wot")
                    break

        self.actor_active = False

    async def flood_single_channel(self):
        """Wait for "ok go" to appear in the channel, then send timestamps.

        One line is sent every EVENT_DELAY seconds until stopped.
        """
        while "ok go" not in self.in_text and not self.stopping:
            self.in_text = ""
            await asyncio.sleep(3)

        # random offset so the clients do not all fire at the same instant
        await asyncio.sleep(5 + EVENT_DELAY * random.random())
        self.tx_only = True
        while not self.stopping:
            await self.tx("%s\n" % (time.time(),))
            await asyncio.sleep(EVENT_DELAY)
class Core(object):
    """Spawns the simulated clients and keeps the test alive until ctrl-c."""

    def __init__(self):
        """Validate the command line and install the ctrl-c handler.

        Exits with status 1 if the telnet port argument is missing
        or is not a number.
        """
        try:
            self.port = int(sys.argv[1])
        except (IndexError, ValueError):
            print("need 1 argument: telnet port")
            sys.exit(1)

        self.stopping = False
        self.clients = []
        self.ctasks = []
        signal.signal(signal.SIGINT, self.signal_handler)

    def print_status(self):
        """Print the status of every client on a single line."""
        print(", ".join(x.status for x in self.clients))

    async def run(self):
        """Start all clients, then idle until a shutdown is requested."""
        port = int(sys.argv[1])
        behaviors = ["flood_single_channel"] * int(NUM_CLIENTS)
        for n, behavior in enumerate(behaviors):
            cli = Client(self, port, behavior, n)
            self.ctasks.append(asyncio.create_task(cli.run()))
            self.clients.append(cli)
            # stagger the connections slightly
            await asyncio.sleep(0.07)
            if self.stopping:
                break

        print(" * test is running")
        print_status = not VISUAL_CLIENT
        while not self.stopping:
            # sleep in 1-second slices so that ctrl-c is noticed quickly
            for _ in range(5):
                await asyncio.sleep(1)
                if self.stopping:
                    break

            if print_status:
                self.print_status()

        print(" * test ended")

    def shutdown(self):
        """Request a graceful stop of the main loop."""
        self.stopping = True

    def signal_handler(self, signal, frame):
        """First ctrl-c requests a graceful stop; the second one exits.

        The `signal` parameter shadows the module of the same name,
        which is fine since the module is not needed in here.
        """
        if self.stopping:
            sys.exit(0)

        self.shutdown()
if __name__ == "__main__":
    core = Core()
    asyncio.run(core.run())


# usage notes: example commands which pin the server and
# two instances of this stress tester to separate cpu cores
r_ = """
taskset -c 3 python3 -m r0c --bench -nc 9191
taskset -c 1 python3 test/astress.py 2323
taskset -c 2 python3 test/astress.py 2323
"""