Implement the hybi-10 version of the WebSocket protocol.

This commit is contained in:
Florian Diebold 2011-07-13 21:12:48 +02:00
parent 48e60770c5
commit d227f3bade
1 changed files with 188 additions and 3 deletions

View File

@ -17,6 +17,7 @@ import hashlib
import logging
import struct
import time
import base64
import tornado.escape
import tornado.web
@ -73,11 +74,20 @@ class WebSocketHandler(tornado.web.RequestHandler):
def _execute(self, transforms, *args, **kwargs):
self.open_args = args
self.open_kwargs = kwargs
if ("Sec-WebSocket-Version" in self.request.headers and
self.request.headers["Sec-WebSocket-Version"] == "8"):
logging.error("WebSocket protocol 8 request!")
if self.request.headers.get("Sec-WebSocket-Version") == "8":
self.ws_connection = WebSocketProtocol8(self)
self.ws_connection.accept_connection()
elif self.request.headers.get("Sec-WebSocket-Version"):
self.stream.write(tornado.escape.utf8(
"HTTP/1.1 426 Upgrade Required\r\n"
"Sec-WebSocket-Version: 8\r\n\r\n"))
self.stream.close()
else:
self.ws_connection = WebSocketProtocol76(self)
self.ws_connection.accept_connection()
def write_message(self, message):
"""Sends the given message to the client of this Web Socket."""
@ -175,6 +185,8 @@ class WebSocketProtocol76(WebSocketProtocol):
WebSocketProtocol.__init__(self, handler)
self.challenge = None
self._waiting = None
def accept_connection(self):
try:
self._handle_websocket_headers()
except ValueError:
@ -306,3 +318,176 @@ class WebSocketProtocol76(WebSocketProtocol):
self.stream.write("\xff\x00")
self._waiting = tornado.ioloop.IOLoop.instance().add_timeout(
time.time() + 5, self._abort)
class WebSocketProtocol8(WebSocketProtocol):
"""Implementation of the WebSocket protocol, version 8 (draft version 10).
Compare
http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-10
"""
def __init__(self, handler):
WebSocketProtocol.__init__(self, handler)
self._final_frame = False
self._frame_opcode = None
self._frame_mask = None
self._frame_length = None
self._fragmented_message_buffer = None
self._fragmented_message_opcode = None
self._started_closing_handshake = False
def accept_connection(self):
try:
self._handle_websocket_headers()
self._accept_connection()
except ValueError:
logging.debug("Malformed WebSocket request received")
self._abort()
return
def _handle_websocket_headers(self):
"""Verifies all invariant- and required headers
If a header is missing or have an incorrect value ValueError will be
raised
"""
headers = self.request.headers
fields = ("Host", "Sec-Websocket-Key", "Sec-Websocket-Version")
connection = map(lambda s: s.strip().lower(), headers.get("Connection", '').split(","))
if (self.request.method != "GET" or
headers.get("Upgrade", '').lower() != "websocket" or
"upgrade" not in connection or
not all(map(lambda f: self.request.headers.get(f), fields))):
raise ValueError("Missing/Invalid WebSocket headers")
def _challenge_response(self):
sha1 = hashlib.sha1()
sha1.update(self.request.headers.get("Sec-Websocket-Key"))
sha1.update("258EAFA5-E914-47DA-95CA-C5AB0DC85B11") # Magic value
return base64.b64encode(sha1.digest())
def _accept_connection(self):
self.stream.write(tornado.escape.utf8(
"HTTP/1.1 101 Switching Protocols\r\n"
"Upgrade: websocket\r\n"
"Connection: Upgrade\r\n"
"Sec-WebSocket-Accept: %s\r\n\r\n" % self._challenge_response()))
self.async_callback(self.handler.open)(*self.handler.open_args, **self.handler.open_kwargs)
self._receive_frame()
def _write_frame(self, fin, opcode, data):
if fin:
finbit = 0b10000000
else:
finbit = 0
frame = struct.pack("B", finbit | opcode)
l = len(data)
if l <= 126:
frame += struct.pack("B", l)
elif l <= 0xFFFF:
frame += struct.pack("!BH", 126, l)
else:
frame += struct.pack("!BQ", 127, l)
frame += data
self.stream.write(frame)
def write_message(self, message, binary=False):
"""Sends the given message to the client of this Web Socket."""
if isinstance(message, dict):
message = tornado.escape.json_encode(message)
if isinstance(message, unicode):
message = message.encode("utf-8")
assert isinstance(message, bytes_type)
if not binary:
opcode = 0x1
else:
opcode = 0x2
self._write_frame(True, opcode, message)
def _receive_frame(self):
self.stream.read_bytes(2, self._on_frame_start)
def _on_frame_start(self, data):
header, payloadlen = struct.unpack("BB", data)
self._final_frame = header & 0b10000000
self._frame_opcode = header & 0b1111
if not (payloadlen & 0b10000000):
# Unmasked frame -> abort connection
self._abort()
payloadlen = payloadlen & 0b1111111
if payloadlen < 126:
self._frame_length = payloadlen
self.stream.read_bytes(4, self._on_masking_key)
elif payloadlen == 126:
self.stream.read_bytes(2, self._on_frame_length_16)
elif payloadlen == 127:
self.stream.read_bytes(8, self._on_frame_length_64)
def _on_frame_length_16(self, data):
self._frame_length = struct.unpack("!H", data)[0];
self.stream.read_bytes(4, self._on_masking_key);
def _on_frame_length_64(self, data):
self._frame_length = struct.unpack("!Q", data)[0];
self.stream.read_bytes(4, self._on_masking_key);
def _on_masking_key(self, data):
self._frame_mask = bytearray(data)
self.stream.read_bytes(self._frame_length, self._on_frame_data)
def _on_frame_data(self, data):
unmasked = bytearray(data)
for i in xrange(len(data)):
unmasked[i] = unmasked[i] ^ self._frame_mask[i % 4]
if not self._final_frame:
if self._fragmented_message_buffer:
self._fragmented_message_buffer += unmasked
else:
self._fragmented_message_opcode = self._frame_opcode
self._fragmented_message_buffer = unmasked
else:
if self._frame_opcode == 0:
unmasked = self._fragmented_message_buffer + unmasked
opcode = self._fragmented_message_opcode
self._fragmented_message_buffer = None
else:
opcode = self._frame_opcode
self._handle_message(opcode, bytes_type(unmasked))
if not self.client_terminated:
self._receive_frame()
def _handle_message(self, opcode, data):
if self.client_terminated: return
if opcode == 0x1:
# UTF-8 data
self.async_callback(self.handler.on_message)(data.decode("utf-8", "replace"))
elif opcode == 0x2:
# Binary data
self.async_callback(self.handler.on_message)(data)
elif opcode == 0x8:
# Close
self.client_terminated = True
if not self._started_closing_handshake:
self._write_frame(True, 0x8, b(""))
self.stream.close()
elif opcode == 0x9:
# Ping
self._write_frame(True, 0xA, b(""))
elif opcode == 0xA:
# Pong
pass
else:
self._abort()
def close(self):
"""Closes the WebSocket connection."""
self._write_frame(True, 0x8, b(""))
self._started_closing_handshake = True
self._waiting = tornado.ioloop.IOLoop.instance().add_timeout(time.time() + 5, self._abort)