Mercurial > bzapi
view bzapi_server.py @ 35:45d84e588d14
massive refactorings to bzapi_server.py
author | Atul Varma <varmaa@toolness.com> |
---|---|
date | Thu, 24 Dec 2009 17:53:19 -0800 |
parents | b4fab248d1eb |
children | 352f4cc55d12 |
line wrap: on
line source
"""Coroutine-based HTTP server exposing message channels and static media.

The module wires a small HTTP/1.1 front end (``AsyncWebServer``) to an
application object (``BugzillaApiApp``).  All network operations are
expressed as generators that yield instructions (``until_sent``,
``until_received``, ...) star-imported from the project's ``cosocket``
module; how those instructions are scheduled is defined there, not here.
"""

import os
import sys
import math
import re
import httplib
import cStringIO
import mimetools
import mimetypes
import weakref
import cgi
import logging

from cosocket import *
import channels

try:
    import json
except ImportError:
    import simplejson as json

# Maximum number of requests served over one kept-alive connection.
KEEP_ALIVE_MAX_REQUESTS = 99
# Advertised keep-alive timeout; DEFAULT_TIMEOUT comes from cosocket.
KEEP_ALIVE_TIMEOUT = int(DEFAULT_TIMEOUT)
KEEP_ALIVE_ENABLED = True
# Chunk size used when streaming files to the client.
BLOCK_SIZE = 8192


def until_http_response_sent(msg = '', mimetype = 'text/plain',
                             length = None, code = 200,
                             additional_headers = None):
    """Send an HTTP/1.1 status line, headers and ``msg`` as the body.

    msg -- body text appended after the blank line that ends the headers.
    mimetype -- value of the Content-Type header.
    length -- value of the Content-Length header; defaults to len(msg).
        Pass it explicitly when the body is sent separately (see
        until_http_file_sent).
    code -- numeric status; the reason phrase is looked up in
        httplib.responses, so an unknown code raises KeyError.
    additional_headers -- optional dict merged over the default headers.
    """
    headers = {'Content-Type': mimetype}
    if KEEP_ALIVE_ENABLED:
        headers.update({'Keep-Alive': 'timeout=%d, max=%d' %
                        (KEEP_ALIVE_TIMEOUT,
                         KEEP_ALIVE_MAX_REQUESTS),
                        'Connection': 'Keep-Alive'})
    if additional_headers:
        headers.update(additional_headers)
    if length is None:
        length = len(msg)
    # Set last so that additional_headers cannot override it.
    headers['Content-Length'] = str(length)

    header_lines = ['HTTP/1.1 %d %s' % (code, httplib.responses[code])]
    header_lines.extend(['%s: %s' % (key, value)
                         for key, value in headers.items()])
    # The empty string produces the blank line separating headers and body.
    header_lines.extend(['', msg])
    content = '\r\n'.join(header_lines)
    yield until_sent(content)


def until_http_file_sent(filename, block_size = BLOCK_SIZE):
    """Send ``filename`` as a 200 response, ``block_size`` bytes at a time.

    The Content-Type is derived from the file extension through
    mimetypes.types_map and falls back to 'text/plain'.  The
    Content-Length is the size reported by os.stat.
    """
    ext = os.path.splitext(filename)[1].lower()
    mimetype = mimetypes.types_map.get(ext, 'text/plain')

    length = os.stat(filename).st_size
    num_blocks = length // block_size
    if length % block_size:
        num_blocks += 1

    # Binary mode: the bytes sent must match the stat()ed length exactly,
    # which text mode does not guarantee on platforms that translate
    # newlines.
    infile = open(filename, 'rb')
    try:
        yield until_http_response_sent(mimetype = mimetype,
                                       length = length)
        for i in range(num_blocks):
            # TODO: This could be bad since we're reading the file
            # synchronously.
            block = infile.read(block_size)
            yield until_sent(block)
    finally:
        # Runs on normal completion, on error, and when the generator is
        # closed early, so the descriptor is never leaked.
        infile.close()


class AsyncWebServer(object):
    """Accepts connections and forwards each parsed request to an app.

    ``app`` must provide ``until_request_processed(method, path, headers,
    addr)`` returning a coroutine.
    """

    def __init__(self, addr, app):
        """Start listening on ``addr`` and serve requests through ``app``."""
        self._num_connections = 0
        self._app = app
        AsyncChatCoroutine(self._server_coroutine(addr))

    @property
    def num_connections(self):
        """Number of connection coroutines currently running."""
        return self._num_connections

    def _server_coroutine(self, bind_addr):
        """Listen on ``bind_addr`` and spawn a coroutine per connection."""
        yield until_listening(bind_addr)
        while 1:
            conn, addr = yield until_connection_accepted()
            AsyncChatCoroutine(self._connection_coroutine(addr), conn)

    def _connection_coroutine(self, addr):
        """Serve one connection, keeping the open-connection count."""
        self._num_connections += 1
        try:
            if KEEP_ALIVE_ENABLED:
                for i in range(KEEP_ALIVE_MAX_REQUESTS):
                    yield self._until_one_request_processed(addr)
            else:
                yield self._until_one_request_processed(addr)
        finally:
            logging.info('Closing connection to %r', addr)
            self._num_connections -= 1

    def _until_one_request_processed(self, addr):
        """Read one request head, parse it and hand it to the app.

        A request line with fewer than two tokens is answered with
        400 instead of raising IndexError inside the coroutine.
        """
        request = yield until_received(terminator = '\r\n\r\n')
        lines = request.splitlines()
        if lines:
            request_line = lines[0]
        else:
            request_line = ''
        logging.info("Request from %s: %s", addr, request_line)

        req_parts = request_line.split()
        if len(req_parts) < 2:
            yield until_http_response_sent('bad request', code = 400)
            return

        stringfile = cStringIO.StringIO('\n'.join(lines[1:]))
        headers = mimetools.Message(stringfile)
        yield self._app.until_request_processed(
            method = req_parts[0],
            path = req_parts[1],
            headers = headers,
            addr = addr
            )


class BugzillaApiApp(object):
    """Routes requests to the index page, media files, status or channels.

    Routes:
      /                    index file
      /robots.txt          disallow-all robots file
      /<name>              301 redirect to /<name>/
      /status/...          connection and timer counters
      /media/<path>        static file below media_dir
      /<channel>/listen    wait for and return the next channel message
      /<channel>/send      post a JSON message to the channel
    """

    QUERYSTRING_TEMPLATE = re.compile(r'([^\?]*)\?(.*)')
    REDIRECT_TEMPLATE = re.compile(r'\/([A-Za-z0-9_]+)$')
    URL_TEMPLATE = re.compile(r'\/([A-Za-z0-9_]+)/(.*)')
    # Upper bound, in bytes, on the body accepted by the 'send' page.
    MAX_MESSAGE_SIZE = 8192
    ROBOTS_TXT = "User-agent: *\r\nDisallow: /"

    def __init__(self, media_dir, index_filename):
        """Remember the media root and the file served for '/'."""
        self._media_dir = media_dir
        self._index_filename = index_filename
        # Set by attach_server(); the status page reads the connection
        # count from it.
        self._server = None

    def attach_server(self, server):
        """Register the server whose ``num_connections`` /status/ reports."""
        self._server = server

    def until_request_processed(self, method, path, headers, addr):
        """Dispatch one request according to ``path``."""
        match = self.URL_TEMPLATE.match(path)
        if path == '/':
            yield until_http_file_sent(self._index_filename)
        elif not match:
            match = self.REDIRECT_TEMPLATE.match(path)
            if match:
                newpath = path + '/'
                yield until_http_response_sent(
                    newpath,
                    code = 301,
                    additional_headers = {'Location': newpath}
                    )
            elif path == '/robots.txt':
                yield until_http_response_sent(self.ROBOTS_TXT)
            else:
                yield until_http_response_sent('not found', code = 404)
        else:
            conv_name = match.group(1)
            page = match.group(2)
            if conv_name == 'status':
                yield self._until_status_sent()
            elif conv_name == 'media':
                yield self._until_media_sent(page)
            else:
                yield self._until_conv_request_processed(addr, headers,
                                                         method,
                                                         conv_name, page)

    def _until_status_sent(self):
        """Send the plain-text status page."""
        # TODO: Return 404 if page is non-empty.
        # The counter lives on the server, not on this object; report
        # 'unknown' when no server has been attached rather than failing.
        if self._server is None:
            connections = 'unknown'
        else:
            connections = '%d' % self._server.num_connections
        lines = ('open connections : %s' % connections,
                 'open timers      : %d' % len(time_map))
        yield until_http_response_sent('\r\n'.join(lines))

    def _resolve_media_path(self, page):
        """Map ``page`` to a file below the media root, or return None.

        The candidate must lie strictly inside the root: comparing
        against the root plus a path separator stops '..' segments from
        reaching a sibling such as '<root>_private'.
        """
        root = os.path.normpath(self._media_dir)
        candidate = os.path.normpath(os.path.join(root, *page.split('/')))
        if root.endswith(os.sep):
            prefix = root
        else:
            prefix = root + os.sep
        if candidate.startswith(prefix) and os.path.isfile(candidate):
            return candidate
        return None

    def _until_media_sent(self, page):
        """Send the media file named by ``page``, or a 404."""
        filename = self._resolve_media_path(page)
        if filename is None:
            yield until_http_response_sent('not found', code = 404)
        else:
            yield until_http_file_sent(filename)

    def _parse_qs(self, querystring):
        """Return a dict mapping each query key to its first value."""
        querydict = {}
        for key, values in cgi.parse_qs(querystring).items():
            querydict[key] = values[0]
        return querydict

    def _until_conv_request_processed(self, addr, headers, method,
                                      conv_name, page):
        """Handle the 'listen' and 'send' pages of channel ``conv_name``."""
        match = self.QUERYSTRING_TEMPLATE.match(page)
        querydict = {}
        if match:
            querydict.update(self._parse_qs(match.group(2)))
            page = match.group(1)

        if page == 'listen':
            logging.info("Waiting for message on channel '%s' for %s",
                         conv_name, addr)
            msg = yield channels.until_message_received(conv_name)
            yield until_http_response_sent(
                json.dumps(msg),
                mimetype = 'application/json'
                )
        elif page == 'send':
            yield self._until_message_posted(headers, conv_name)
        else:
            yield until_http_response_sent('not found', code = 404)

    def _until_message_posted(self, headers, conv_name):
        """Read a JSON body and publish it on channel ``conv_name``.

        Responds 400 for an unparsable or negative Content-Length and
        for a body that is not valid JSON, 413 for a missing/zero or
        oversized length (unchanged from the previous behaviour), and
        'sent.' on success.
        """
        try:
            length = int(headers.getheader('Content-Length', 0))
        except ValueError:
            length = -1

        if length < 0:
            yield until_http_response_sent('bad request', code = 400)
            return
        if length == 0 or length > self.MAX_MESSAGE_SIZE:
            yield until_http_response_sent('message too large',
                                           code = 413)
            return

        msg = yield until_received(bytes = length)
        try:
            json_msg = json.loads(msg)
            is_valid = True
        except ValueError:
            # Both json and simplejson signal malformed input with
            # ValueError (or a subclass).
            json_msg = None
            is_valid = False

        if not is_valid:
            yield until_http_response_sent('invalid json', code = 400)
            return
        yield channels.until_message_sent(conv_name, json_msg)
        yield until_http_response_sent('sent.')


if __name__ == '__main__':
    args = dict(ip = '127.0.0.1',
                port = 8071,
                logfile = '',
                loglevel = 'info',
                media_dir = os.path.abspath('media'))

    args['index_filename'] = os.path.join(args['media_dir'], 'html',
                                          'index.html')
    args['loglevel'] = getattr(logging, args['loglevel'].upper())

    if args['logfile']:
        logging.basicConfig(filename = args['logfile'],
                            level = args['loglevel'])
    else:
        logging.basicConfig(stream = sys.stdout,
                            level = args['loglevel'])

    app = BugzillaApiApp(media_dir = args['media_dir'],
                         index_filename = args['index_filename'])
    server = AsyncWebServer(addr = (args['ip'], args['port']),
                            app = app)
    # Lets the /status/ page report the server's open-connection count.
    app.attach_server(server)

    logging.info("Starting server with configuration: %s" % args)
    loop()