Add a tiny utility class for keeping bi-directional mappings.
Use it in websocket and socks.
This commit is contained in:
parent
b7a2fc8553
commit
8086022920
|
@ -2,7 +2,7 @@ from __future__ import (absolute_import, print_function, division)
|
|||
import socket
|
||||
import struct
|
||||
import array
|
||||
from . import tcp
|
||||
from . import tcp, utils
|
||||
|
||||
|
||||
class SocksError(Exception):
|
||||
|
@ -11,40 +11,45 @@ class SocksError(Exception):
|
|||
self.code = code
|
||||
|
||||
|
||||
class VERSION(object):
|
||||
SOCKS4 = 0x04
|
||||
VERSION = utils.BiDi(
|
||||
SOCKS4 = 0x04,
|
||||
SOCKS5 = 0x05
|
||||
)
|
||||
|
||||
|
||||
class CMD(object):
|
||||
CONNECT = 0x01
|
||||
BIND = 0x02
|
||||
CMD = utils.BiDi(
|
||||
CONNECT = 0x01,
|
||||
BIND = 0x02,
|
||||
UDP_ASSOCIATE = 0x03
|
||||
)
|
||||
|
||||
|
||||
class ATYP(object):
|
||||
IPV4_ADDRESS = 0x01
|
||||
DOMAINNAME = 0x03
|
||||
ATYP = utils.BiDi(
|
||||
IPV4_ADDRESS = 0x01,
|
||||
DOMAINNAME = 0x03,
|
||||
IPV6_ADDRESS = 0x04
|
||||
)
|
||||
|
||||
|
||||
class REP(object):
|
||||
SUCCEEDED = 0x00
|
||||
GENERAL_SOCKS_SERVER_FAILURE = 0x01
|
||||
CONNECTION_NOT_ALLOWED_BY_RULESET = 0x02
|
||||
NETWORK_UNREACHABLE = 0x03
|
||||
HOST_UNREACHABLE = 0x04
|
||||
CONNECTION_REFUSED = 0x05
|
||||
TTL_EXPIRED = 0x06
|
||||
COMMAND_NOT_SUPPORTED = 0x07
|
||||
ADDRESS_TYPE_NOT_SUPPORTED = 0x08
|
||||
REP = utils.BiDi(
|
||||
SUCCEEDED = 0x00,
|
||||
GENERAL_SOCKS_SERVER_FAILURE = 0x01,
|
||||
CONNECTION_NOT_ALLOWED_BY_RULESET = 0x02,
|
||||
NETWORK_UNREACHABLE = 0x03,
|
||||
HOST_UNREACHABLE = 0x04,
|
||||
CONNECTION_REFUSED = 0x05,
|
||||
TTL_EXPIRED = 0x06,
|
||||
COMMAND_NOT_SUPPORTED = 0x07,
|
||||
ADDRESS_TYPE_NOT_SUPPORTED = 0x08,
|
||||
)
|
||||
|
||||
|
||||
class METHOD(object):
|
||||
NO_AUTHENTICATION_REQUIRED = 0x00
|
||||
GSSAPI = 0x01
|
||||
USERNAME_PASSWORD = 0x02
|
||||
METHOD = utils.BiDi(
|
||||
NO_AUTHENTICATION_REQUIRED = 0x00,
|
||||
GSSAPI = 0x01,
|
||||
USERNAME_PASSWORD = 0x02,
|
||||
NO_ACCEPTABLE_METHODS = 0xFF
|
||||
)
|
||||
|
||||
|
||||
def _read(f, n):
|
||||
|
@ -146,4 +151,3 @@ class Message(object):
|
|||
"Unknown ATYP: %s" % self.atyp
|
||||
)
|
||||
f.write(struct.pack("!H", self.addr.port))
|
||||
|
||||
|
|
|
@ -65,3 +65,29 @@ def getbit(byte, offset):
|
|||
mask = 1 << offset
|
||||
if byte & mask:
|
||||
return True
|
||||
|
||||
|
||||
class BiDi:
|
||||
"""
|
||||
A wee utility class for keeping bi-directional mappings, like field
|
||||
constants in protocols:
|
||||
|
||||
CONST = BiDi(a=1, b=2)
|
||||
assert CONST.a == 1
|
||||
assert CONST[1] == "a"
|
||||
"""
|
||||
def __init__(self, **kwargs):
|
||||
self.names = kwargs
|
||||
self.values = {}
|
||||
for k, v in kwargs.items():
|
||||
self.values[v] = k
|
||||
if len(self.names) != len(self.values):
|
||||
raise ValueError("Duplicate values not allowed.")
|
||||
|
||||
def __getattr__(self, k):
|
||||
if k in self.names:
|
||||
return self.names[k]
|
||||
raise AttributeError("No such attribute: %s", k)
|
||||
|
||||
def __getitem__(self, k):
|
||||
return self.values[k]
|
||||
|
|
|
@ -25,13 +25,14 @@ MAX_16_BIT_INT = (1 << 16)
|
|||
MAX_64_BIT_INT = (1 << 64)
|
||||
|
||||
|
||||
class OPCODE:
|
||||
CONTINUE = 0x00
|
||||
TEXT = 0x01
|
||||
BINARY = 0x02
|
||||
CLOSE = 0x08
|
||||
PING = 0x09
|
||||
OPCODE = utils.BiDi(
|
||||
CONTINUE = 0x00,
|
||||
TEXT = 0x01,
|
||||
BINARY = 0x02,
|
||||
CLOSE = 0x08,
|
||||
PING = 0x09,
|
||||
PONG = 0x0a
|
||||
)
|
||||
|
||||
|
||||
def apply_mask(message, masking_key):
|
||||
|
@ -160,6 +161,18 @@ class FrameHeader:
|
|||
if self.masking_key and len(self.masking_key) != 4:
|
||||
raise ValueError("Masking key must be 4 bytes.")
|
||||
|
||||
def human_readable(self):
|
||||
return "\n".join([
|
||||
("fin - " + str(self.fin)),
|
||||
("rsv1 - " + str(self.rsv1)),
|
||||
("rsv2 - " + str(self.rsv2)),
|
||||
("rsv3 - " + str(self.rsv3)),
|
||||
("opcode - " + str(self.opcode)),
|
||||
("mask - " + str(self.mask)),
|
||||
("length_code - " + str(self.length_code)),
|
||||
("masking_key - " + repr(str(self.masking_key))),
|
||||
])
|
||||
|
||||
def to_bytes(self):
|
||||
first_byte = utils.setbit(0, 7, self.fin)
|
||||
first_byte = utils.setbit(first_byte, 6, self.rsv1)
|
||||
|
|
|
@ -1,5 +1,14 @@
|
|||
from netlib import utils
|
||||
import socket
|
||||
import tutils
|
||||
|
||||
|
||||
def test_bidi():
|
||||
b = utils.BiDi(a=1, b=2)
|
||||
assert b.a == 1
|
||||
assert b[1] == "a"
|
||||
tutils.raises(AttributeError, getattr, b, "c")
|
||||
tutils.raises(KeyError, b.__getitem__, 5)
|
||||
|
||||
|
||||
def test_hexdump():
|
||||
assert utils.hexdump("one\0"*10)
|
||||
|
|
|
@ -184,6 +184,10 @@ class TestFrameHeader:
|
|||
round(opcode=websockets.OPCODE.PING)
|
||||
round(masking_key="test")
|
||||
|
||||
def test_human_readable(self):
|
||||
f = websockets.FrameHeader(masking_key="test", mask=False)
|
||||
assert f.human_readable()
|
||||
|
||||
def test_funky(self):
|
||||
f = websockets.FrameHeader(masking_key="test", mask=False)
|
||||
bytes = f.to_bytes()
|
||||
|
|
Loading…
Reference in New Issue