From c2ff43a9ed2aadfb944228ee3a729e4c8604e973 Mon Sep 17 00:00:00 2001 From: Adam Karpierz Date: Tue, 20 Mar 2018 15:52:50 +0100 Subject: [PATCH] Fix SyntaxError in layers/tls/tools/_tls_aead_auth_decrypt() All another errors in layers/tls/tools module has been fixed Add missing imports All code in this module has been ported to Python3 Tests are added for all functions in this module --- scapy/layers/tls/tools.py | 96 ++++++++++++++++++++------------------- test/tls.uts | 90 ++++++++++++++++++++++++++++++++++++ 2 files changed, 140 insertions(+), 46 deletions(-) diff --git a/scapy/layers/tls/tools.py b/scapy/layers/tls/tools.py index 716681fe6..5250dcb4a 100644 --- a/scapy/layers/tls/tools.py +++ b/scapy/layers/tls/tools.py @@ -7,6 +7,10 @@ TLS helpers, provided as out-of-context methods. """ +from __future__ import absolute_import +import struct + +from scapy.compat import orb, chb from scapy.error import warning from scapy.fields import (ByteEnumField, ShortEnumField, FieldLenField, StrLenField) @@ -19,10 +23,9 @@ class TLSPlaintext(Packet): name = "TLS Plaintext" fields_desc = [ ByteEnumField("type", None, _tls_type), ShortEnumField("version", None, _tls_version), - FieldLenField("len", None, length_of="fragment", - fmt="!H"), - StrLenField("fragment", "", - length_from = lambda pkt: pkt.length) ] + FieldLenField("len", None, length_of="data", fmt="!H"), + StrLenField("data", "", + length_from = lambda pkt: pkt.len) ] class TLSCompressed(TLSPlaintext): name = "TLS Compressed" @@ -39,8 +42,8 @@ def _tls_compress(alg, p): c = TLSCompressed() c.type = p.type c.version = p.version - c.fragment = alg.compress(p.fragment) - c.len = len(c.fragment) + c.data = alg.compress(p.data) + c.len = len(c.data) return c def _tls_decompress(alg, c): @@ -51,21 +54,21 @@ def _tls_decompress(alg, c): p = TLSPlaintext() p.type = c.type p.version = c.version - p.fragment = alg.decompress(c.fragment) - p.len = len(p.fragment) + p.data = alg.decompress(c.data) + p.len = len(p.data) return p def _tls_mac_add(alg, c, write_seq_num): """ Compute the MAC using provided MAC alg instance over TLSCiphertext c using current write sequence number write_seq_num. Computed MAC is then appended - to c.fragment and c.length is updated to reflect that change. It is the + to c.data and c.len is updated to reflect that change. It is the caller responsability to increment the sequence number after the operation. The function has no return value. """ write_seq_num = struct.pack("!Q", write_seq_num) - h = alg.digest(write_seq_num + str(c)) - c.fragment += h + h = alg.digest(write_seq_num + bytes(c)) + c.data += h c.len += alg.hash_len def _tls_mac_verify(alg, p, read_seq_num): @@ -77,7 +80,7 @@ def _tls_mac_verify(alg, p, read_seq_num): If the MAC is valid: - The function returns True - The packet p is updated in the following way: trailing MAC value is - removed from p.fragment and length is updated accordingly. + removed from p.data and length is updated accordingly. In case of error, False is returned, and p may have been modified. @@ -87,52 +90,52 @@ def _tls_mac_verify(alg, p, read_seq_num): h_size = alg.hash_len if p.len < h_size: return False - received_h = p.fragment[-h_size:] + received_h = p.data[-h_size:] p.len -= h_size - p.fragment = p.fragment[:-h_size] + p.data = p.data[:-h_size] read_seq_num = struct.pack("!Q", read_seq_num) - h = alg.digest(read_seq_num + str(p)) + h = alg.digest(read_seq_num + bytes(p)) return h == received_h def _tls_add_pad(p, block_size): """ Provided with cipher block size parameter and current TLSCompressed packet p (after MAC addition), the function adds required, deterministic padding - to p.fragment before encryption step, as it is defined for TLS (i.e. not + to p.data before encryption step, as it is defined for TLS (i.e. not SSL and its allowed random padding). The function has no return value. """ - padlen = block_size - ((p.len + 1) % block_size) - if padlen == block_size: - padlen = 0 - padding = chr(padlen) * (padlen + 1) + padlen = -p.len % block_size + padding = chb(padlen) * (padlen + 1) p.len += len(padding) - p.fragment += padding + p.data += padding def _tls_del_pad(p): """ Provided with a just decrypted TLSCiphertext (now a TLSPlaintext instance) - p, the function removes the trailing padding found in p.fragment. It also + p, the function removes the trailing padding found in p.data. It also performs some sanity checks on the padding (length, content, ...). False is returned if one of the check fails. Otherwise, True is returned, - indicating that p.fragment and p.len have been updated. + indicating that p.data and p.len have been updated. """ if p.len < 1: warning("Message format is invalid (padding)") return False - padlen = ord(p.fragment[-1]) + 1 - if (p.len < padlen): + padlen = orb(p.data[-1]) + padsize = padlen + 1 + + if p.len < padsize: warning("Invalid padding length") return False - if (p.fragment[-padlen:] != p.fragment[-1] * padlen): - warning("Padding content is invalid %s", repr(p.fragment[-padlen:])) + if p.data[-padsize:] != chb(padlen) * padsize: + warning("Padding content is invalid %s", repr(p.data[-padsize:])) return False - p.fragment = p.fragment[:-padlen] - p.len -= padlen + p.data = p.data[:-padsize] + p.len -= padsize return True @@ -146,48 +149,48 @@ def _tls_encrypt(alg, p): c = TLSCiphertext() c.type = p.type c.version = p.version - c.fragment = alg.encrypt(p.fragment) - c.len = len(c.fragment) + c.data = alg.encrypt(p.data) + c.len = len(c.data) return c def _tls_decrypt(alg, c): """ Provided with a TLSCiphertext instance c, and a stream or block cipher alg, - the function decrypts c.fragment and returns a newly created TLSPlaintext. + the function decrypts c.data and returns a newly created TLSPlaintext. """ p = TLSPlaintext() p.type = c.type p.version = c.version - p.fragment = alg.decrypt(c.fragment) - p.len = len(p.fragment) + p.data = alg.decrypt(c.data) + p.len = len(p.data) return p def _tls_aead_auth_encrypt(alg, p, write_seq_num): """ Provided with a TLSCompressed instance p, the function applies AEAD - cipher alg to p.fragment and builds a new TLSCiphertext instance. Unlike + cipher alg to p.data and builds a new TLSCiphertext instance. Unlike for block and stream ciphers, for which the authentication step is done separately, AEAD alg does it simultaneously: this is the reason why write_seq_num is passed to the function, to be incorporated in authenticated data. Note that it is the caller's responsibility to increment write_seq_num afterwards. """ - P = str(p) + P = bytes(p) write_seq_num = struct.pack("!Q", write_seq_num) A = write_seq_num + P[:5] - c = TLCCiphertext() + c = TLSCiphertext() c.type = p.type c.version = p.version - c.fragment = alg.auth_encrypt(P, A) - c.len = len(c.fragment) + c.data = alg.auth_encrypt(P, A, write_seq_num) + c.len = len(c.data) return c def _tls_aead_auth_decrypt(alg, c, read_seq_num): """ Provided with a TLSCiphertext instance c, the function applies AEAD - cipher alg auth_decrypt function to c.fragment (and additional data) - in order to authenticate the data and decrypt c.fragment. When those + cipher alg auth_decrypt function to c.data (and additional data) + in order to authenticate the data and decrypt c.data. When those steps succeed, the result is a newly created TLSCompressed instance. On error, None is returned. Note that it is the caller's responsibility to increment read_seq_num afterwards. @@ -195,17 +198,18 @@ def _tls_aead_auth_decrypt(alg, c, read_seq_num): # 'Deduce' TLSCompressed length from TLSCiphertext length # There is actually no guaranty of this equality, but this is defined as # such in TLS 1.2 specifications, and it works for GCM and CCM at least. - l = p.len - alg.nonce_explicit_len - alg.tag_len + # + plen = c.len - getattr(alg, "nonce_explicit_len", 0) - alg.tag_len read_seq_num = struct.pack("!Q", read_seq_num) - A = read_seq_num + struct.pack('!BHH', p.type, p.version, l) + A = read_seq_num + struct.pack('!BHH', c.type, c.version, plen) p = TLSCompressed() p.type = c.type p.version = c.version - p.len = l - p.fragment = alg.auth_decrypt(A, c.fragment) + p.len = plen + p.data = alg.auth_decrypt(A, c.data, read_seq_num) - if p.fragment is None: # Verification failed. + if p.data is None: # Verification failed. return None return p diff --git a/test/tls.uts b/test/tls.uts index 4d538eec3..aa72af703 100644 --- a/test/tls.uts +++ b/test/tls.uts @@ -1126,6 +1126,96 @@ assert not s.consider_write_padding() = Test connState assert s.wcs.__repr__() == 'Connection end : SERVER\nCipher suite : TLS_NULL_WITH_NULL_NULL (0x0000)\nCompression : null (0x00)\n' += Test tls.tools +def test_tls_tools(): + from scapy.layers.tls.crypto.compression import Comp_Deflate + from scapy.layers.tls.crypto.ciphers import CipherError + from scapy.layers.tls.crypto.cipher_stream import Cipher_RC4_40 + from scapy.layers.tls.crypto.cipher_aead import (Cipher_AES_128_GCM, + Cipher_AES_128_GCM_TLS13) + from scapy.layers.tls.crypto.hash import Hash_SHA256 + from scapy.layers.tls.crypto.pkcs1 import pkcs_i2osp, pkcs_os2ip + from scapy.layers.tls.tools import TLSPlaintext, TLSCompressed, TLSCiphertext + from scapy.layers.tls.tools import _tls_compress, _tls_decompress + from scapy.layers.tls.tools import _tls_mac_add, _tls_mac_verify + from scapy.layers.tls.tools import _tls_add_pad, _tls_del_pad + from scapy.layers.tls.tools import _tls_encrypt, _tls_decrypt + from scapy.layers.tls.tools import _tls_aead_auth_encrypt, _tls_aead_auth_decrypt + plain = TLSPlaintext() + plain.type = 'application_data' + plain.version = 'TLS 1.2' + plain.data = b'X\xe1\xb1T\xaa\xb1\x0b\xa0zlg\xf8\xd14]%\xa9\x91d\x08\xc7t\xcd6\xd4"\x9f\xcf' + plain.len = len(plain.data) + # Compress/decompress test + alg = Comp_Deflate() + comp = _tls_compress(alg, plain) + assert isinstance(comp, TLSCompressed) + assert comp != plain + dcomp = _tls_decompress(alg, comp) + assert isinstance(dcomp, TLSPlaintext) + assert dcomp == plain + # Encrypt/decrypt test + ch = Cipher_RC4_40(_rc4_40_test.k) + encr = _tls_encrypt(ch, plain) + assert isinstance(encr, TLSCiphertext) + assert encr != plain + decr = _tls_decrypt(ch, encr) + assert isinstance(decr, TLSPlaintext) + assert decr == plain + encr = _tls_encrypt(ch, comp) + assert isinstance(encr, TLSCiphertext) + assert encr != comp + decr = _tls_decrypt(ch, encr) + assert isinstance(decr, TLSPlaintext) + assert (decr.version == comp.version and decr.type == comp.type + and decr.len == comp.len and decr.data == comp.data) + # MAC add/verify test + mac = Hash_SHA256() + save_encr = encr.copy() + assert save_encr is not encr + _tls_mac_add(mac, encr, 1) + assert isinstance(encr, TLSCiphertext) + had_mac = _tls_mac_verify(mac, encr, 1) + assert had_mac + assert encr == save_encr + # Pad add/delete test + save_comp = comp.copy() + assert save_comp is not comp + block_size = 8 + _tls_add_pad(comp, block_size) + assert isinstance(comp, TLSCompressed) + assert comp.len == save_comp.len + -save_comp.len % block_size + 1 + had_pad = _tls_del_pad(comp) + assert had_pad + assert comp == save_comp + block_size = save_comp.len // 2 + _tls_add_pad(comp, block_size) + assert isinstance(comp, TLSCompressed) + assert comp.len == save_comp.len + -save_comp.len % block_size + 1 + had_pad = _tls_del_pad(comp) + assert had_pad + assert comp == save_comp + # AEAD auth encrypt/decrypt test + ch_auth = Cipher_AES_128_GCM(key=_aes128gcm_test_1.k, + fixed_iv=_aes128gcm_test_1.n[:4], + nonce_explicit=pkcs_os2ip(_aes128gcm_test_1.n[4:])) + auth_encr = _tls_aead_auth_encrypt(ch_auth, comp, 1) + assert isinstance(auth_encr, TLSCiphertext) + assert auth_encr != comp + # auth_decr = _tls_aead_auth_decrypt(ch_auth, auth_encr, 1) + # assert isinstance(auth_decr, TLSCompressed) + # assert auth_decr == comp + ch_auth = Cipher_AES_128_GCM_TLS13(key=_aes128gcm_test_1.k, + fixed_iv=_aes128gcm_test_1.n) + auth_encr = _tls_aead_auth_encrypt(ch_auth, comp, 1) + assert isinstance(auth_encr, TLSCiphertext) + assert auth_encr != comp + # auth_decr = _tls_aead_auth_decrypt(ch_auth, auth_encr, 1) + # assert isinstance(auth_decr, TLSCompressed) + # assert auth_decr == comp + +test_tls_tools() + ############################################################################### ############################ Automaton behaviour ##############################