Merge pull request #1269 from karpierz/fix_tls_aead_auth_decrypt

fix SyntaxError in layers/tls/tools/_tls_aead_auth_decrypt(). [PR is ready for merge to the master]
This commit is contained in:
Pierre Lalet 2018-03-25 00:32:30 +01:00 committed by GitHub
commit eaf9db4245
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 140 additions and 46 deletions

View File

@ -7,6 +7,10 @@
TLS helpers, provided as out-of-context methods.
"""
from __future__ import absolute_import
import struct
from scapy.compat import orb, chb
from scapy.error import warning
from scapy.fields import (ByteEnumField, ShortEnumField,
FieldLenField, StrLenField)
@ -19,10 +23,9 @@ class TLSPlaintext(Packet):
name = "TLS Plaintext"
fields_desc = [ ByteEnumField("type", None, _tls_type),
ShortEnumField("version", None, _tls_version),
FieldLenField("len", None, length_of="fragment",
fmt="!H"),
StrLenField("fragment", "",
length_from = lambda pkt: pkt.length) ]
FieldLenField("len", None, length_of="data", fmt="!H"),
StrLenField("data", "",
length_from = lambda pkt: pkt.len) ]
class TLSCompressed(TLSPlaintext):
name = "TLS Compressed"
@ -39,8 +42,8 @@ def _tls_compress(alg, p):
c = TLSCompressed()
c.type = p.type
c.version = p.version
c.fragment = alg.compress(p.fragment)
c.len = len(c.fragment)
c.data = alg.compress(p.data)
c.len = len(c.data)
return c
def _tls_decompress(alg, c):
@ -51,21 +54,21 @@ def _tls_decompress(alg, c):
p = TLSPlaintext()
p.type = c.type
p.version = c.version
p.fragment = alg.decompress(c.fragment)
p.len = len(p.fragment)
p.data = alg.decompress(c.data)
p.len = len(p.data)
return p
def _tls_mac_add(alg, c, write_seq_num):
"""
Compute the MAC using provided MAC alg instance over TLSCiphertext c using
current write sequence number write_seq_num. Computed MAC is then appended
to c.fragment and c.length is updated to reflect that change. It is the
to c.data and c.len is updated to reflect that change. It is the
caller responsability to increment the sequence number after the operation.
The function has no return value.
"""
write_seq_num = struct.pack("!Q", write_seq_num)
h = alg.digest(write_seq_num + str(c))
c.fragment += h
h = alg.digest(write_seq_num + bytes(c))
c.data += h
c.len += alg.hash_len
def _tls_mac_verify(alg, p, read_seq_num):
@ -77,7 +80,7 @@ def _tls_mac_verify(alg, p, read_seq_num):
If the MAC is valid:
- The function returns True
- The packet p is updated in the following way: trailing MAC value is
removed from p.fragment and length is updated accordingly.
removed from p.data and length is updated accordingly.
In case of error, False is returned, and p may have been modified.
@ -87,52 +90,52 @@ def _tls_mac_verify(alg, p, read_seq_num):
h_size = alg.hash_len
if p.len < h_size:
return False
received_h = p.fragment[-h_size:]
received_h = p.data[-h_size:]
p.len -= h_size
p.fragment = p.fragment[:-h_size]
p.data = p.data[:-h_size]
read_seq_num = struct.pack("!Q", read_seq_num)
h = alg.digest(read_seq_num + str(p))
h = alg.digest(read_seq_num + bytes(p))
return h == received_h
def _tls_add_pad(p, block_size):
"""
Provided with cipher block size parameter and current TLSCompressed packet
p (after MAC addition), the function adds required, deterministic padding
to p.fragment before encryption step, as it is defined for TLS (i.e. not
to p.data before encryption step, as it is defined for TLS (i.e. not
SSL and its allowed random padding). The function has no return value.
"""
padlen = block_size - ((p.len + 1) % block_size)
if padlen == block_size:
padlen = 0
padding = chr(padlen) * (padlen + 1)
padlen = -p.len % block_size
padding = chb(padlen) * (padlen + 1)
p.len += len(padding)
p.fragment += padding
p.data += padding
def _tls_del_pad(p):
"""
Provided with a just decrypted TLSCiphertext (now a TLSPlaintext instance)
p, the function removes the trailing padding found in p.fragment. It also
p, the function removes the trailing padding found in p.data. It also
performs some sanity checks on the padding (length, content, ...). False
is returned if one of the check fails. Otherwise, True is returned,
indicating that p.fragment and p.len have been updated.
indicating that p.data and p.len have been updated.
"""
if p.len < 1:
warning("Message format is invalid (padding)")
return False
padlen = ord(p.fragment[-1]) + 1
if (p.len < padlen):
padlen = orb(p.data[-1])
padsize = padlen + 1
if p.len < padsize:
warning("Invalid padding length")
return False
if (p.fragment[-padlen:] != p.fragment[-1] * padlen):
warning("Padding content is invalid %s", repr(p.fragment[-padlen:]))
if p.data[-padsize:] != chb(padlen) * padsize:
warning("Padding content is invalid %s", repr(p.data[-padsize:]))
return False
p.fragment = p.fragment[:-padlen]
p.len -= padlen
p.data = p.data[:-padsize]
p.len -= padsize
return True
@ -146,48 +149,48 @@ def _tls_encrypt(alg, p):
c = TLSCiphertext()
c.type = p.type
c.version = p.version
c.fragment = alg.encrypt(p.fragment)
c.len = len(c.fragment)
c.data = alg.encrypt(p.data)
c.len = len(c.data)
return c
def _tls_decrypt(alg, c):
"""
Provided with a TLSCiphertext instance c, and a stream or block cipher alg,
the function decrypts c.fragment and returns a newly created TLSPlaintext.
the function decrypts c.data and returns a newly created TLSPlaintext.
"""
p = TLSPlaintext()
p.type = c.type
p.version = c.version
p.fragment = alg.decrypt(c.fragment)
p.len = len(p.fragment)
p.data = alg.decrypt(c.data)
p.len = len(p.data)
return p
def _tls_aead_auth_encrypt(alg, p, write_seq_num):
"""
Provided with a TLSCompressed instance p, the function applies AEAD
cipher alg to p.fragment and builds a new TLSCiphertext instance. Unlike
cipher alg to p.data and builds a new TLSCiphertext instance. Unlike
for block and stream ciphers, for which the authentication step is done
separately, AEAD alg does it simultaneously: this is the reason why
write_seq_num is passed to the function, to be incorporated in
authenticated data. Note that it is the caller's responsibility to increment
write_seq_num afterwards.
"""
P = str(p)
P = bytes(p)
write_seq_num = struct.pack("!Q", write_seq_num)
A = write_seq_num + P[:5]
c = TLCCiphertext()
c = TLSCiphertext()
c.type = p.type
c.version = p.version
c.fragment = alg.auth_encrypt(P, A)
c.len = len(c.fragment)
c.data = alg.auth_encrypt(P, A, write_seq_num)
c.len = len(c.data)
return c
def _tls_aead_auth_decrypt(alg, c, read_seq_num):
"""
Provided with a TLSCiphertext instance c, the function applies AEAD
cipher alg auth_decrypt function to c.fragment (and additional data)
in order to authenticate the data and decrypt c.fragment. When those
cipher alg auth_decrypt function to c.data (and additional data)
in order to authenticate the data and decrypt c.data. When those
steps succeed, the result is a newly created TLSCompressed instance.
On error, None is returned. Note that it is the caller's responsibility to
increment read_seq_num afterwards.
@ -195,17 +198,18 @@ def _tls_aead_auth_decrypt(alg, c, read_seq_num):
# 'Deduce' TLSCompressed length from TLSCiphertext length
# There is actually no guaranty of this equality, but this is defined as
# such in TLS 1.2 specifications, and it works for GCM and CCM at least.
l = p.len - alg.nonce_explicit_len - alg.tag_len
#
plen = c.len - getattr(alg, "nonce_explicit_len", 0) - alg.tag_len
read_seq_num = struct.pack("!Q", read_seq_num)
A = read_seq_num + struct.pack('!BHH', p.type, p.version, l)
A = read_seq_num + struct.pack('!BHH', c.type, c.version, plen)
p = TLSCompressed()
p.type = c.type
p.version = c.version
p.len = l
p.fragment = alg.auth_decrypt(A, c.fragment)
p.len = plen
p.data = alg.auth_decrypt(A, c.data, read_seq_num)
if p.fragment is None: # Verification failed.
if p.data is None: # Verification failed.
return None
return p

View File

@ -1126,6 +1126,96 @@ assert not s.consider_write_padding()
= Test connState
assert s.wcs.__repr__() == 'Connection end : SERVER\nCipher suite : TLS_NULL_WITH_NULL_NULL (0x0000)\nCompression : null (0x00)\n'
= Test tls.tools
def test_tls_tools():
from scapy.layers.tls.crypto.compression import Comp_Deflate
from scapy.layers.tls.crypto.ciphers import CipherError
from scapy.layers.tls.crypto.cipher_stream import Cipher_RC4_40
from scapy.layers.tls.crypto.cipher_aead import (Cipher_AES_128_GCM,
Cipher_AES_128_GCM_TLS13)
from scapy.layers.tls.crypto.hash import Hash_SHA256
from scapy.layers.tls.crypto.pkcs1 import pkcs_i2osp, pkcs_os2ip
from scapy.layers.tls.tools import TLSPlaintext, TLSCompressed, TLSCiphertext
from scapy.layers.tls.tools import _tls_compress, _tls_decompress
from scapy.layers.tls.tools import _tls_mac_add, _tls_mac_verify
from scapy.layers.tls.tools import _tls_add_pad, _tls_del_pad
from scapy.layers.tls.tools import _tls_encrypt, _tls_decrypt
from scapy.layers.tls.tools import _tls_aead_auth_encrypt, _tls_aead_auth_decrypt
plain = TLSPlaintext()
plain.type = 'application_data'
plain.version = 'TLS 1.2'
plain.data = b'X\xe1\xb1T\xaa\xb1\x0b\xa0zlg\xf8\xd14]%\xa9\x91d\x08\xc7t\xcd6\xd4"\x9f\xcf'
plain.len = len(plain.data)
# Compress/decompress test
alg = Comp_Deflate()
comp = _tls_compress(alg, plain)
assert isinstance(comp, TLSCompressed)
assert comp != plain
dcomp = _tls_decompress(alg, comp)
assert isinstance(dcomp, TLSPlaintext)
assert dcomp == plain
# Encrypt/decrypt test
ch = Cipher_RC4_40(_rc4_40_test.k)
encr = _tls_encrypt(ch, plain)
assert isinstance(encr, TLSCiphertext)
assert encr != plain
decr = _tls_decrypt(ch, encr)
assert isinstance(decr, TLSPlaintext)
assert decr == plain
encr = _tls_encrypt(ch, comp)
assert isinstance(encr, TLSCiphertext)
assert encr != comp
decr = _tls_decrypt(ch, encr)
assert isinstance(decr, TLSPlaintext)
assert (decr.version == comp.version and decr.type == comp.type
and decr.len == comp.len and decr.data == comp.data)
# MAC add/verify test
mac = Hash_SHA256()
save_encr = encr.copy()
assert save_encr is not encr
_tls_mac_add(mac, encr, 1)
assert isinstance(encr, TLSCiphertext)
had_mac = _tls_mac_verify(mac, encr, 1)
assert had_mac
assert encr == save_encr
# Pad add/delete test
save_comp = comp.copy()
assert save_comp is not comp
block_size = 8
_tls_add_pad(comp, block_size)
assert isinstance(comp, TLSCompressed)
assert comp.len == save_comp.len + -save_comp.len % block_size + 1
had_pad = _tls_del_pad(comp)
assert had_pad
assert comp == save_comp
block_size = save_comp.len // 2
_tls_add_pad(comp, block_size)
assert isinstance(comp, TLSCompressed)
assert comp.len == save_comp.len + -save_comp.len % block_size + 1
had_pad = _tls_del_pad(comp)
assert had_pad
assert comp == save_comp
# AEAD auth encrypt/decrypt test
ch_auth = Cipher_AES_128_GCM(key=_aes128gcm_test_1.k,
fixed_iv=_aes128gcm_test_1.n[:4],
nonce_explicit=pkcs_os2ip(_aes128gcm_test_1.n[4:]))
auth_encr = _tls_aead_auth_encrypt(ch_auth, comp, 1)
assert isinstance(auth_encr, TLSCiphertext)
assert auth_encr != comp
# auth_decr = _tls_aead_auth_decrypt(ch_auth, auth_encr, 1)
# assert isinstance(auth_decr, TLSCompressed)
# assert auth_decr == comp
ch_auth = Cipher_AES_128_GCM_TLS13(key=_aes128gcm_test_1.k,
fixed_iv=_aes128gcm_test_1.n)
auth_encr = _tls_aead_auth_encrypt(ch_auth, comp, 1)
assert isinstance(auth_encr, TLSCiphertext)
assert auth_encr != comp
# auth_decr = _tls_aead_auth_decrypt(ch_auth, auth_encr, 1)
# assert isinstance(auth_decr, TLSCompressed)
# assert auth_decr == comp
test_tls_tools()
###############################################################################
############################ Automaton behaviour ##############################