From 80860229209b4c6eb8384e1bca3cabdbe062fe6e Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Thu, 30 Apr 2015 09:04:22 +1200 Subject: [PATCH] Add a tiny utility class for keeping bi-directional mappings. Use it in websocket and socks. --- netlib/socks.py | 52 ++++++++++++++++++++++------------------- netlib/utils.py | 26 +++++++++++++++++++++ netlib/websockets.py | 25 +++++++++++++++----- test/test_utils.py | 11 ++++++++- test/test_websockets.py | 4 ++++ 5 files changed, 87 insertions(+), 31 deletions(-) diff --git a/netlib/socks.py b/netlib/socks.py index a3c4e9a23..497b8eef3 100644 --- a/netlib/socks.py +++ b/netlib/socks.py @@ -2,7 +2,7 @@ from __future__ import (absolute_import, print_function, division) import socket import struct import array -from . import tcp +from . import tcp, utils class SocksError(Exception): @@ -11,40 +11,45 @@ class SocksError(Exception): self.code = code -class VERSION(object): - SOCKS4 = 0x04 +VERSION = utils.BiDi( + SOCKS4 = 0x04, SOCKS5 = 0x05 +) -class CMD(object): - CONNECT = 0x01 - BIND = 0x02 +CMD = utils.BiDi( + CONNECT = 0x01, + BIND = 0x02, UDP_ASSOCIATE = 0x03 +) -class ATYP(object): - IPV4_ADDRESS = 0x01 - DOMAINNAME = 0x03 +ATYP = utils.BiDi( + IPV4_ADDRESS = 0x01, + DOMAINNAME = 0x03, IPV6_ADDRESS = 0x04 +) -class REP(object): - SUCCEEDED = 0x00 - GENERAL_SOCKS_SERVER_FAILURE = 0x01 - CONNECTION_NOT_ALLOWED_BY_RULESET = 0x02 - NETWORK_UNREACHABLE = 0x03 - HOST_UNREACHABLE = 0x04 - CONNECTION_REFUSED = 0x05 - TTL_EXPIRED = 0x06 - COMMAND_NOT_SUPPORTED = 0x07 - ADDRESS_TYPE_NOT_SUPPORTED = 0x08 +REP = utils.BiDi( + SUCCEEDED = 0x00, + GENERAL_SOCKS_SERVER_FAILURE = 0x01, + CONNECTION_NOT_ALLOWED_BY_RULESET = 0x02, + NETWORK_UNREACHABLE = 0x03, + HOST_UNREACHABLE = 0x04, + CONNECTION_REFUSED = 0x05, + TTL_EXPIRED = 0x06, + COMMAND_NOT_SUPPORTED = 0x07, + ADDRESS_TYPE_NOT_SUPPORTED = 0x08, +) -class METHOD(object): - NO_AUTHENTICATION_REQUIRED = 0x00 - GSSAPI = 0x01 - USERNAME_PASSWORD = 0x02 +METHOD = utils.BiDi( + NO_AUTHENTICATION_REQUIRED = 0x00, + GSSAPI = 0x01, + USERNAME_PASSWORD = 0x02, NO_ACCEPTABLE_METHODS = 0xFF +) def _read(f, n): @@ -146,4 +151,3 @@ class Message(object): "Unknown ATYP: %s" % self.atyp ) f.write(struct.pack("!H", self.addr.port)) - diff --git a/netlib/utils.py b/netlib/utils.py index 44bed43ab..905d948fd 100644 --- a/netlib/utils.py +++ b/netlib/utils.py @@ -65,3 +65,29 @@ def getbit(byte, offset): mask = 1 << offset if byte & mask: return True + + +class BiDi: + """ + A wee utility class for keeping bi-directional mappings, like field + constants in protocols: + + CONST = BiDi(a=1, b=2) + assert CONST.a == 1 + assert CONST[1] == "a" + """ + def __init__(self, **kwargs): + self.names = kwargs + self.values = {} + for k, v in kwargs.items(): + self.values[v] = k + if len(self.names) != len(self.values): + raise ValueError("Duplicate values not allowed.") + + def __getattr__(self, k): + if k in self.names: + return self.names[k] + raise AttributeError("No such attribute: %s", k) + + def __getitem__(self, k): + return self.values[k] diff --git a/netlib/websockets.py b/netlib/websockets.py index 493bb18a8..d358ed535 100644 --- a/netlib/websockets.py +++ b/netlib/websockets.py @@ -25,13 +25,14 @@ MAX_16_BIT_INT = (1 << 16) MAX_64_BIT_INT = (1 << 64) -class OPCODE: - CONTINUE = 0x00 - TEXT = 0x01 - BINARY = 0x02 - CLOSE = 0x08 - PING = 0x09 +OPCODE = utils.BiDi( + CONTINUE = 0x00, + TEXT = 0x01, + BINARY = 0x02, + CLOSE = 0x08, + PING = 0x09, PONG = 0x0a +) def apply_mask(message, masking_key): @@ -160,6 +161,18 @@ class FrameHeader: if self.masking_key and len(self.masking_key) != 4: raise ValueError("Masking key must be 4 bytes.") + def human_readable(self): + return "\n".join([ + ("fin - " + str(self.fin)), + ("rsv1 - " + str(self.rsv1)), + ("rsv2 - " + str(self.rsv2)), + ("rsv3 - " + str(self.rsv3)), + ("opcode - " + str(self.opcode)), + ("mask - " + str(self.mask)), + ("length_code - " + str(self.length_code)), + ("masking_key - " + repr(str(self.masking_key))), + ]) + def to_bytes(self): first_byte = utils.setbit(0, 7, self.fin) first_byte = utils.setbit(first_byte, 6, self.rsv1) diff --git a/test/test_utils.py b/test/test_utils.py index 971e5076c..0cdd3faed 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -1,5 +1,14 @@ from netlib import utils -import socket +import tutils + + +def test_bidi(): + b = utils.BiDi(a=1, b=2) + assert b.a == 1 + assert b[1] == "a" + tutils.raises(AttributeError, getattr, b, "c") + tutils.raises(KeyError, b.__getitem__, 5) + def test_hexdump(): assert utils.hexdump("one\0"*10) diff --git a/test/test_websockets.py b/test/test_websockets.py index 4b286b6fe..9266d93ef 100644 --- a/test/test_websockets.py +++ b/test/test_websockets.py @@ -184,6 +184,10 @@ class TestFrameHeader: round(opcode=websockets.OPCODE.PING) round(masking_key="test") + def test_human_readable(self): + f = websockets.FrameHeader(masking_key="test", mask=False) + assert f.human_readable() + def test_funky(self): f = websockets.FrameHeader(masking_key="test", mask=False) bytes = f.to_bytes()