Source code for socketio.transports

import gevent
import urllib
import urlparse
from geventwebsocket import WebSocketError
from gevent.queue import Empty


[docs]class BaseTransport(object): """Base class for all transports. Mostly wraps handler class functions.""" def __init__(self, handler, config, **kwargs): """Base transport class. :param config: dict Should contain the config keys, like ``heartbeat_interval``, ``heartbeat_timeout`` and ``close_timeout``. """ self.content_type = ("Content-Type", "text/plain; charset=UTF-8") self.headers = [ ("Access-Control-Allow-Origin", "*"), ("Access-Control-Allow-Credentials", "true"), ("Access-Control-Allow-Methods", "POST, GET, OPTIONS"), ("Access-Control-Max-Age", 3600), ] self.handler = handler self.config = config
[docs] def write(self, data=""): # Gevent v 0.13 if hasattr(self.handler, 'response_headers_list'): if 'Content-Length' not in self.handler.response_headers_list: self.handler.response_headers.append(('Content-Length', len(data))) self.handler.response_headers_list.append('Content-Length') elif not hasattr(self.handler, 'provided_content_length') or self.handler.provided_content_length is None: # Gevent 1.0bX l = len(data) self.handler.provided_content_length = l self.handler.response_headers.append(('Content-Length', l)) self.handler.write_smart(data)
[docs] def start_response(self, status, headers, **kwargs): if "Content-Type" not in [x[0] for x in headers]: headers.append(self.content_type) headers.extend(self.headers) self.handler.start_response(status, headers, **kwargs)
[docs]class XHRPollingTransport(BaseTransport): def __init__(self, *args, **kwargs): super(XHRPollingTransport, self).__init__(*args, **kwargs)
[docs] def options(self): self.start_response("200 OK", ()) self.write() return []
[docs] def get(self, socket): socket.heartbeat() heartbeat_interval = self.config['heartbeat_interval'] payload = self.get_messages_payload(socket, timeout=heartbeat_interval) if not payload: payload = "8::" # NOOP self.start_response("200 OK", []) self.write(payload)
def _request_body(self): return self.handler.wsgi_input.readline()
[docs] def post(self, socket): for message in self.decode_payload(self._request_body()): socket.put_server_msg(message) self.start_response("200 OK", [ ("Connection", "close"), ("Content-Type", "text/plain") ]) self.write("1")
[docs] def get_messages_payload(self, socket, timeout=None): """This will fetch the messages from the Socket's queue, and if there are many messes, pack multiple messages in one payload and return """ try: msgs = socket.get_multiple_client_msgs(timeout=timeout) data = self.encode_payload(msgs) except Empty: data = "" return data
[docs] def encode_payload(self, messages): """Encode list of messages. Expects messages to be unicode. ``messages`` - List of raw messages to encode, if necessary """ if not messages or messages[0] is None: return '' if len(messages) == 1: return messages[0].encode('utf-8') payload = u''.join([(u'\ufffd%d\ufffd%s' % (len(p), p)) for p in messages if p is not None]) # FIXME: why is it so that we must filter None from here ? How # is it even possible that a None gets in there ? return payload.encode('utf-8')
[docs] def decode_payload(self, payload): """This function can extract multiple messages from one HTTP payload. Some times, the XHR/JSONP/.. transports can pack more than one message on a single packet. They are encoding following the WebSocket semantics, which need to be reproduced here to unwrap the messages. The semantics are: \ufffd + [length as a string] + \ufffd + [payload as a unicode string] This function returns a list of messages, even though there is only one. Inspired by socket.io/lib/transports/http.js """ payload = payload.decode('utf-8') if payload[0] == u"\ufffd": ret = [] while len(payload) != 0: len_end = payload.find(u"\ufffd", 1) length = int(payload[1:len_end]) msg_start = len_end + 1 msg_end = length + msg_start message = payload[msg_start:msg_end] ret.append(message) payload = payload[msg_end:] return ret return [payload]
[docs] def do_exchange(self, socket, request_method): if not socket.connection_established: # Runs only the first time we get a Socket opening self.start_response("200 OK", [ ("Connection", "close"), ]) self.write("1::") # 'connect' packet return elif request_method in ("GET", "POST", "OPTIONS"): return getattr(self, request_method.lower())(socket) else: raise Exception("No support for the method: " + request_method)
[docs]class JSONPolling(XHRPollingTransport): def __init__(self, handler, config): super(JSONPolling, self).__init__(handler, config) self.content_type = ("Content-Type", "text/javascript; charset=UTF-8") def _request_body(self): data = super(JSONPolling, self)._request_body() # resolve %20%3F's, take out wrapping d="...", etc.. data = urllib.unquote_plus(data)[3:-1] \ .replace(r'\"', '"') \ .replace(r"\\", "\\") # For some reason, in case of multiple messages passed in one # query, IE7 sends it escaped, not utf-8 encoded. This dirty # hack handled it if data[0] == "\\": data = data.decode("unicode_escape").encode("utf-8") return data
[docs] def write(self, data): """Just quote out stuff before sending it out""" args = urlparse.parse_qs(self.handler.environ.get("QUERY_STRING")) if "i" in args: i = args["i"] else: i = "0" # TODO: don't we need to quote this data in here ? super(JSONPolling, self).write("io.j[%s]('%s');" % (i, data))
[docs]class XHRMultipartTransport(XHRPollingTransport): def __init__(self, handler): super(JSONPolling, self).__init__(handler) self.content_type = ( "Content-Type", "multipart/x-mixed-replace;boundary=\"socketio\"" )
[docs] def do_exchange(self, socket, request_method): if request_method == "GET": return self.get(socket) elif request_method == "POST": return self.post(socket) else: raise Exception("No support for such method: " + request_method)
[docs] def get(self, socket): header = "Content-Type: text/plain; charset=UTF-8\r\n\r\n" self.start_response("200 OK", [("Connection", "keep-alive")]) self.write_multipart("--socketio\r\n") self.write_multipart(header) self.write_multipart(str(socket.sessid) + "\r\n") self.write_multipart("--socketio\r\n") def chunk(): while True: payload = self.get_messages_payload(socket) if not payload: # That would mean the call to Queue.get() returned Empty, # so it was in fact killed, since we pass no timeout=.. return # See below else: try: self.write_multipart(header) self.write_multipart(payload) self.write_multipart("--socketio\r\n") except socket.error: # The client might try to reconnect, even with a socket # error, so let's just let it go, and not kill the # socket completely. Other processes will ensure # we kill everything if the user expires the timeouts. # # WARN: this means that this payload is LOST, unless we # decide to re-inject it into the queue. return socket.spawn(chunk)
[docs]class WebsocketTransport(BaseTransport):
[docs] def do_exchange(self, socket, request_method): websocket = self.handler.environ['wsgi.websocket'] websocket.send("1::") # 'connect' packet def send_into_ws(): while True: message = socket.get_client_msg() if message is None: break try: websocket.send(message) except (WebSocketError, TypeError): # We can't send a message on the socket # it is dead, let the other sockets know socket.disconnect() def read_from_ws(): while True: message = websocket.receive() if message is None: break else: if message is not None: socket.put_server_msg(message) socket.spawn(send_into_ws) socket.spawn(read_from_ws)
[docs]class FlashSocketTransport(WebsocketTransport): pass
[docs]class HTMLFileTransport(XHRPollingTransport): """Not tested at all!""" def __init__(self, handler, config): super(HTMLFileTransport, self).__init__(handler, config) self.content_type = ("Content-Type", "text/html")
[docs] def write_packed(self, data): self.write("<script>_('%s');</script>" % data)
[docs] def write(self, data): l = 1024 * 5 super(HTMLFileTransport, self).write("%d\r\n%s%s\r\n" % (l, data, " " * (l - len(data))))
[docs] def do_exchange(self, socket, request_method): return super(HTMLFileTransport, self).do_exchange(socket, request_method)
[docs] def get(self, socket): self.start_response("200 OK", [ ("Connection", "keep-alive"), ("Content-Type", "text/html"), ("Transfer-Encoding", "chunked"), ]) self.write("<html><body><script>var _ = function (msg) { parent.s._(msg, document); };</script>") self.write_packed("1::") # 'connect' packet def chunk(): while True: payload = self.get_messages_payload(socket) if not payload: # That would mean the call to Queue.get() returned Empty, # so it was in fact killed, since we pass no timeout=.. return else: try: self.write_packed(payload) except socket.error: # See comments for XHRMultipart return socket.spawn(chunk)