Mercurial > wiki
diff atulweb.py @ 21:1850638c1da5
Added simple python WSGI server.
author | Atul Varma <varmaa@toolness.com> |
---|---|
date | Thu, 12 Feb 2009 19:21:11 -0800 |
parents | |
children | 602baadb535a |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/atulweb.py Thu Feb 12 19:21:11 2009 -0800 @@ -0,0 +1,203 @@ +import StringIO +import httplib +import cgi +import traceback +import sys +import re +import os +import mimetypes + +try: + import json +except ImportError: + import simplejson as json + +DEFAULT_LISTEN_PORT = 8000 + +class HttpError(Exception): + def __init__(self, status, body=None, headers=None): + if isinstance(status, basestring): + if not status[0].isdigit(): + status = status_codes[status.lower()] + + if isinstance(status, int): + status = '%d %s' % (status, responses[status]) + else: + raise ValueError("bad status '%s'" % status) + + if not body: + body = status + if not headers: + headers = [('Content-Type','text/plain')] + self.status = status + self.body = body + self.headers = headers + +class Response(object): + def __init__(self, body, mimetype='text/plain', headers=None): + if isinstance(body, basestring): + self.body = body + elif mimetype == 'application/json': + self.body = json.dumps(body) + else: + raise ValueError('Cannot process body: %s' % body) + self.status = '200 OK' + self.headers = [('Content-Type', mimetype), + ('Content-Length', str(len(self.body)))] + if headers: + self.headers += headers + +class Request(object): + def __init__(self, environ): + self._environ = environ + + def get_input(self, mimetype='text/plain'): + length = int(self._environ['CONTENT_LENGTH']) + content = self._environ['wsgi.input'].read(length) + if mimetype == 'application/json': + content = json.loads(content) + return content + +def parse_qs(querystring): + querydict = {} + cgi_querydict = cgi.parse_qs(querystring) + for key, value in cgi_querydict.items(): + querydict[key] = cgi_querydict[key][0] + return querydict + +def wsgiapp(application): + def wrapped_application(environ, start_response, *args, **kwargs): + def make_response(response): + start_response(response.status, response.headers) + return [response.body] + environ['atulweb.request'] = Request(environ) + try: + retval = application(environ, start_response, *args, **kwargs) + except HttpError, e: + return make_response(e) + except Exception, e: + tb = traceback.format_exc() + sys.stderr.write(tb) + return make_response(HttpError('internal server error', tb)) + if isinstance(retval, Response): + return make_response(retval) + return retval + for attr in ['__doc__', '__name__', '__module__']: + setattr(wrapped_application, attr, getattr(application, attr)) + return wrapped_application + +def main(application): + if len(sys.argv) == 2 and sys.argv[1] == 'test': + import doctest + module = __import__('__main__') + doctest.testmod(module, verbose=True) + else: + from wsgiref.simple_server import make_server + port = DEFAULT_LISTEN_PORT + + @wsgiapp + def parent_application(environ, start_response): + match = re.match(r'\/static-files(\/.*)', + environ['PATH_INFO']) + if match: + environ['PATH_INFO'] = match.group(1) + return static_file_wsgi_app(environ, start_response, + base_dir='static-files') + else: + return application(environ, start_response) + + httpd = make_server('127.0.0.1', port, parent_application) + print 'Listening on localhost port %d.' % port + print "(Run '%s test' to execute test suite.)" % sys.argv[0] + # Respond to requests until process is killed + httpd.serve_forever() + +@wsgiapp +def static_file_wsgi_app(environ, start_response, base_dir): + path = environ['PATH_INFO'][1:] + filename = os.path.join(base_dir, path) + if os.path.exists(filename): + if os.path.isdir(filename): + return Response('\n'.join(os.listdir(filename)), + mimetype = 'text/plain') + else: + return Response(open(filename, 'r').read(), + mimetype = mimetypes.guess_type(filename)[0]) + else: + raise HttpError('not found') + +def test_request(application, requestline='', input='', **kwargs): + environ = {} + words = requestline.split() + if words: + environ['REQUEST_METHOD'] = words[0] + if len(words) > 1: + environ['PATH_INFO'] = words[1] + environ['wsgi.input'] = StringIO.StringIO(input) + environ['CONTENT_LENGTH'] = str(len(input)) + environ.update(kwargs) + def mock_start_response(status, headers): + print "HTTP/1.1 %s" % status + for name, value in headers: + print "%s: %s" % (name, value) + result = application(environ, mock_start_response) + print '.' + print ''.join(result) + +# Mapping status codes to official W3C names. This is included in the httplib +# module in Python 2.5, but we want this module to work in 2.4. +responses = { + 100: 'Continue', + 101: 'Switching Protocols', + + 200: 'OK', + 201: 'Created', + 202: 'Accepted', + 203: 'Non-Authoritative Information', + 204: 'No Content', + 205: 'Reset Content', + 206: 'Partial Content', + + 300: 'Multiple Choices', + 301: 'Moved Permanently', + 302: 'Found', + 303: 'See Other', + 304: 'Not Modified', + 305: 'Use Proxy', + 306: '(Unused)', + 307: 'Temporary Redirect', + + 400: 'Bad Request', + 401: 'Unauthorized', + 402: 'Payment Required', + 403: 'Forbidden', + 404: 'Not Found', + 405: 'Method Not Allowed', + 406: 'Not Acceptable', + 407: 'Proxy Authentication Required', + 408: 'Request Timeout', + 409: 'Conflict', + 410: 'Gone', + 411: 'Length Required', + 412: 'Precondition Failed', + 413: 'Request Entity Too Large', + 414: 'Request-URI Too Long', + 415: 'Unsupported Media Type', + 416: 'Requested Range Not Satisfiable', + 417: 'Expectation Failed', + + 500: 'Internal Server Error', + 501: 'Not Implemented', + 502: 'Bad Gateway', + 503: 'Service Unavailable', + 504: 'Gateway Timeout', + 505: 'HTTP Version Not Supported', +} + +status_codes = {} + +def __init(): + for key, value in responses.items(): + status_codes[value.lower()] = key + +__init()