cpython/Demo/pysvr/pysvr.py

125 lines
2.8 KiB
Python
Raw Normal View History

1997-07-19 21:00:47 +00:00
#! /usr/bin/env python
"""A multi-threaded telnet-like server that gives a Python prompt.
This is really a prototype for the same thing in C.
Usage: pysvr.py [port]
For security reasons, it only accepts requests from the current host.
This can still be insecure, but restricts violations from people who
can log in on your machine. Use with caution!
1997-07-19 21:00:47 +00:00
"""
import sys, os, string, getopt, thread, socket, traceback
PORT = 4000 # Default port
1997-07-19 21:00:47 +00:00
def main():
try:
opts, args = getopt.getopt(sys.argv[1:], "")
if len(args) > 1:
raise getopt.error, "Too many arguments."
except getopt.error, msg:
usage(msg)
for o, a in opts:
pass
if args:
try:
port = string.atoi(args[0])
except ValueError, msg:
usage(msg)
else:
port = PORT
main_thread(port)
def usage(msg=None):
sys.stdout = sys.stderr
if msg:
print msg
print "\n", __doc__,
sys.exit(2)
def main_thread(port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(("", port))
sock.listen(5)
print "Listening on port", port, "..."
1997-07-19 21:00:47 +00:00
while 1:
(conn, addr) = sock.accept()
if addr[0] != conn.getsockname()[0]:
conn.close()
print "Refusing connection from non-local host", addr[0], "."
continue
1997-07-19 21:00:47 +00:00
thread.start_new_thread(service_thread, (conn, addr))
del conn, addr
def service_thread(conn, addr):
(caddr, cport) = addr
print "Thread %s has connection from %s.\n" % (str(thread.get_ident()),
caddr),
1997-07-19 21:00:47 +00:00
stdin = conn.makefile("r")
stdout = conn.makefile("w", 0)
run_interpreter(stdin, stdout)
print "Thread %s is done.\n" % str(thread.get_ident()),
def run_interpreter(stdin, stdout):
globals = {}
try:
str(sys.ps1)
except:
sys.ps1 = ">>> "
source = ""
while 1:
stdout.write(sys.ps1)
line = stdin.readline()
if line[:2] == '\377\354':
line = ""
if not line and not source:
break
if line[-2:] == '\r\n':
line = line[:-2] + '\n'
source = source + line
try:
code = compile_command(source)
except SyntaxError, err:
source = ""
traceback.print_exception(SyntaxError, err, None, file=stdout)
continue
if not code:
continue
source = ""
try:
run_command(code, stdin, stdout, globals)
except SystemExit, how:
if how:
try:
how = str(how)
except:
how = ""
stdout.write("Exit %s\n" % how)
break
stdout.write("\nGoodbye.\n")
def run_command(code, stdin, stdout, globals):
save = sys.stdin, sys.stdout, sys.stderr
try:
sys.stdout = sys.stderr = stdout
sys.stdin = stdin
try:
exec code in globals
except SystemExit, how:
raise SystemExit, how, sys.exc_info()[2]
except:
type, value, tb = sys.exc_info()
if tb: tb = tb.tb_next
traceback.print_exception(type, value, tb)
del tb
finally:
sys.stdin, sys.stdout, sys.stderr = save
from code import compile_command
main()