view atulweb.py @ 28:602baadb535a default tip

Added a requesthandler infrastructure to atulweb.
author Atul Varma <varmaa@toolness.com>
date Wed, 11 Mar 2009 23:19:18 -0500
parents 1850638c1da5
children
line wrap: on
line source

import StringIO
import httplib
import cgi
import traceback
import sys
import re
import os
import types
import mimetypes

try:
    import json
except ImportError:
    import simplejson as json

# TCP port that the development server started by main() listens on.
DEFAULT_LISTEN_PORT = 8000

# Delimiters that open and close a noun placeholder inside a request path
# pattern, e.g. the braces in '/blarg/{foob}/narb'.
OPEN_STR = "{"
CLOSE_STR = "}"

def _parse_request_path_pattern(pattern, current=None):
    """
    Split a request path pattern into literal and noun segments.

    The result is a list of 2-tuples: ('s', text) for a literal piece
    of the path and ('n', name) for a placeholder written as
    OPEN_STR + name + CLOSE_STR. If a list is passed as current, the
    segments are appended to it and that same list is returned.

    Raises ValueError when a placeholder is never closed or has an
    empty name.

    >>> _parse_request_path_pattern('/blarg/{foob}/narb')
    [('s', '/blarg/'), ('n', 'foob'), ('s', '/narb')]

    >>> _parse_request_path_pattern('{foob}/gump')
    [('n', 'foob'), ('s', '/gump')]

    >>> _parse_request_path_pattern('{foob}')
    [('n', 'foob')]

    >>> _parse_request_path_pattern('gump')
    [('s', 'gump')]

    >>> _parse_request_path_pattern('{}')
    Traceback (most recent call last):
    ...
    ValueError: empty noun

    >>> _parse_request_path_pattern('/goop/{regr')
    Traceback (most recent call last):
    ...
    ValueError: unclosed noun
    """

    segments = current
    if segments is None:
        segments = []

    remaining = pattern
    while True:
        start = remaining.find(OPEN_STR)
        if start == -1:
            # No more placeholders: whatever is left is one literal.
            if remaining:
                segments.append(('s', remaining))
            return segments

        literal = remaining[:start]
        after_open = remaining[start + len(OPEN_STR):]
        end = after_open.find(CLOSE_STR)
        if end == -1:
            raise ValueError("unclosed noun")

        noun = after_open[:end]
        if literal:
            segments.append(('s', literal))
        if not noun:
            raise ValueError("empty noun")
        segments.append(('n', noun))

        # Continue scanning just past the closing delimiter.
        remaining = after_open[end + len(CLOSE_STR):]

def make_request_map(requests):
    """
    Build a per-HTTP-method routing table from request specifications.

    requests maps a handler name to a specification string of the form
    'METHOD /path/pattern'. The method and the pattern may be separated
    by any run of whitespace. The result maps each HTTP method to a
    dict of {handler name: parsed path pattern}, where the parsed
    pattern is the list returned by _parse_request_path_pattern().

    Raises ValueError if a specification does not contain both a method
    and a pattern, or if the pattern itself is malformed.

    >>> requests = {'blarg': 'GET /blar/{goop}',
    ...             'fnarg': 'POST /blarg/goopy'}
    >>> req_map = make_request_map(requests)

    >>> req_map['GET']['blarg']
    [('s', '/blar/'), ('n', 'goop')]

    >>> req_map['POST']['fnarg']
    [('s', '/blarg/goopy')]

    >>> make_request_map({'x': 'GET   /x'})['GET']['x']
    [('s', '/x')]
    """

    request_map = {}
    for key in requests:
        # Split on the first run of whitespace only, so that repeated
        # spaces or tabs between the method and the pattern are tolerated.
        method, pattern = requests[key].split(None, 1)
        if method not in request_map:
            request_map[method] = {}
        request_map[method][key] = _parse_request_path_pattern(pattern)
    return request_map

def _match_request_path(path_pattern, path, nouns):
    """
    >>> import re
    >>> pattern = [('s', '/bla/'), ('n', 'doc')]
    >>> nouns = {'doc': re.compile(r'[0-9]+')}
    >>> _match_request_path(pattern, '/bla/4532', nouns)
    {'doc': '4532'}

    >>> _match_request_path(pattern, '/bla/garg/4532/', nouns)

    >>> _match_request_path(pattern, '/bla/garg/4532', nouns)

    >>> _match_request_path(pattern, '/4532', {'doc': r'.*'})

    >>> _match_request_path([('s', '/status')], '/status', {})
    {}

    >>> _match_request_path([('n', 'doc')], '345', nouns)
    {'doc': '345'}

    >>> _match_request_path([('n', 'doc'), ('s', '/go')], '345/go', nouns)
    {'doc': '345'}

    >>> _match_request_path([('n', 'doc'), ('s', '/go')], '345', nouns)
    """

    matches = {}
    curr_noun = None
    path_pattern = list(path_pattern)
    path_pattern.append(('end', None))
    for seg_type, seg_value in path_pattern:
        noun_to_check = None
        if seg_type == 'n':
            curr_noun = seg_value
        elif seg_type == 's':
            index = path.find(seg_value)
            if (index == -1):
                return None
            elif index == 0:
                if curr_noun:
                    return None
                else:
                    path = path[len(seg_value):]
            else:
                if curr_noun:
                    noun_to_check = path[:index]
                    path = path[index:]
                else:
                    return None
        elif seg_type == 'end':
            if curr_noun is not None:
                noun_to_check = path
        else:
            raise ValueError('unknown segment type in path pattern')
        if noun_to_check is not None:
            if nouns[curr_noun].match(noun_to_check):
                matches[curr_noun] = noun_to_check
                curr_noun = None
            else:
                return None
    return matches

def map_request(environ, request_map, nouns, mapping_obj):
    """
    Dispatch a WSGI request to the matching method of mapping_obj.

    The routes registered in request_map for environ['REQUEST_METHOD']
    are tried against environ['PATH_INFO']; the first one that matches
    selects the attribute of mapping_obj with the same name, which is
    called with the captured nouns as keyword arguments. Its return
    value is returned unchanged.

    Raises HttpError('method not allowed') when no route exists for the
    HTTP method, and HttpError('not found') when no route for that
    method matches the path.
    """

    http_method = environ['REQUEST_METHOD']
    if http_method not in request_map:
        raise HttpError('method not allowed')

    for handler_name, path_pattern in request_map[http_method].items():
        captured = _match_request_path(path_pattern,
                                       environ['PATH_INFO'],
                                       nouns)
        if captured is None:
            continue
        handler = getattr(mapping_obj, handler_name)
        return handler(**captured)

    raise HttpError('not found')

def make_wsgiapp(request_handler):
    """
    Build a WSGI application that routes requests to request_handler.

    Every bound method of request_handler that has a non-empty
    docstring is registered as a route; the stripped docstring is the
    request specification ('METHOD /path/pattern') handed to
    make_request_map(). Methods without a docstring are skipped, since
    they cannot describe a route.

    On each request the WSGI environ is stored on
    request_handler.environ, and request_handler.NOUNS supplies the
    noun validators used for path matching.
    """

    requests = {}
    for name in dir(request_handler):
        method = getattr(request_handler, name)
        if not isinstance(method, types.MethodType):
            continue
        # __doc__ is None for undocumented methods (e.g. a plain
        # __init__), which must not abort application construction.
        spec = (method.__doc__ or '').strip()
        if spec:
            requests[name] = spec
    request_map = make_request_map(requests)

    @wsgiapp
    def application(environ, start_response):
        request_handler.environ = environ
        return map_request(environ,
                           request_map,
                           request_handler.NOUNS,
                           request_handler)

    return application

class HttpError(Exception):
    """
    Exception that carries a complete HTTP error response.

    status may be:

      * an integer status code, e.g. 404;
      * a string of digits, e.g. '404';
      * a reason phrase in any letter case, e.g. 'not found', which is
        looked up in status_codes (KeyError if it is unknown);
      * a full status line starting with a digit, e.g. '404 Not Found',
        which is used verbatim.

    Integer codes are expanded to 'CODE Reason' using the responses
    table (KeyError if the code is unknown). Any other kind of status
    raises ValueError.

    body defaults to the status line and headers to a text/plain
    Content-Type. The resulting status, body and headers attributes are
    what wsgiapp() sends to the client.
    """

    def __init__(self, status, body=None, headers=None):
        if isinstance(status, basestring):
            if not status:
                raise ValueError("bad status '%s'" % (status,))
            if status.isdigit():
                status = int(status)
            elif not status[0].isdigit():
                status = status_codes[status.lower()]
            # Otherwise the string is already a full status line and is
            # passed through unchanged.
        elif not isinstance(status, int):
            raise ValueError("bad status '%s'" % (status,))

        if isinstance(status, int):
            status = '%d %s' % (status, responses[status])

        # Give the exception a useful str()/repr().
        Exception.__init__(self, status)

        if not body:
            body = status
        if not headers:
            headers = [('Content-Type', 'text/plain')]
        self.status = status
        self.body = body
        self.headers = headers

class Response(object):
    """
    A successful ('200 OK') HTTP response.

    body must be a string, unless mimetype is 'application/json', in
    which case any other object is serialized with json.dumps().
    Anything else raises ValueError. Unicode bodies are encoded as
    UTF-8 so that the body is a byte string and Content-Length counts
    bytes rather than characters.

    headers, if given, is a list of (name, value) tuples appended after
    the generated Content-Type and Content-Length headers.
    """

    def __init__(self, body, mimetype='text/plain', headers=None):
        if isinstance(body, basestring):
            payload = body
        elif mimetype == 'application/json':
            payload = json.dumps(body)
        else:
            # Wrap in a tuple so that tuple bodies format correctly.
            raise ValueError('Cannot process body: %s' % (body,))
        if not isinstance(payload, str):
            # A basestring that is not a str is a unicode object; WSGI
            # servers expect byte strings.
            payload = payload.encode('utf-8')
        self.body = payload
        self.status = '200 OK'
        self.headers = [('Content-Type', mimetype),
                        ('Content-Length', str(len(self.body)))]
        if headers:
            self.headers += headers

class Request(object):
    """
    Thin convenience wrapper around a WSGI environ dictionary.
    """

    def __init__(self, environ):
        self._environ = environ

    def get_input(self, mimetype='text/plain'):
        """
        Read and return the request body from environ['wsgi.input'].

        CONTENT_LENGTH bytes are read; a missing or empty
        CONTENT_LENGTH is treated as a zero-length body. When mimetype
        is 'application/json' the body is decoded with json.loads() and
        the resulting object is returned; otherwise the raw content is
        returned.

        Raises ValueError if CONTENT_LENGTH is present but is not an
        integer, or if JSON decoding fails.
        """
        # The CGI/WSGI specifications allow CONTENT_LENGTH to be absent
        # or an empty string when there is no request body.
        raw_length = self._environ.get('CONTENT_LENGTH')
        length = 0
        if raw_length:
            length = int(raw_length)
        content = self._environ['wsgi.input'].read(length)
        if mimetype == 'application/json':
            content = json.loads(content)
        return content

def parse_qs(querystring):
    """
    Parse a URL query string into a flat dictionary.

    The string is parsed with cgi.parse_qs(); for every parameter name
    only the first value is kept, so each key maps to a single string
    rather than to a list of strings.
    """
    parsed = cgi.parse_qs(querystring)
    return dict([(name, values[0]) for name, values in parsed.items()])

def wsgiapp(application):
    def wrapped_application(environ, start_response, *args, **kwargs):
        def make_response(response):
            start_response(response.status, response.headers)
            return [response.body]
        environ['atulweb.request'] = Request(environ)
        try:
            retval = application(environ, start_response, *args, **kwargs)
        except HttpError, e:
            return make_response(e)
        except Exception, e:
            tb = traceback.format_exc()
            sys.stderr.write(tb)
            return make_response(HttpError('internal server error', tb))
        if isinstance(retval, Response):
            return make_response(retval)
        return retval
    for attr in ['__doc__', '__name__', '__module__']:
        setattr(wrapped_application, attr, getattr(application, attr))
    return wrapped_application

def main(application):
    if len(sys.argv) == 2 and sys.argv[1] == 'test':
        import doctest
        module = __import__('__main__')
        doctest.testmod(module, verbose=True)
    else:
        from wsgiref.simple_server import make_server
        port = DEFAULT_LISTEN_PORT

        @wsgiapp
        def parent_application(environ, start_response):
            match = re.match(r'\/static-files(\/.*)',
                             environ['PATH_INFO'])
            if match:
                environ['PATH_INFO'] = match.group(1)
                return static_file_wsgi_app(environ, start_response,
                                            base_dir='static-files')
            else:
                return application(environ, start_response)

        httpd = make_server('127.0.0.1', port, parent_application)
        print 'Listening on localhost port %d.' % port
        print "(Run '%s test' to execute test suite.)" % sys.argv[0]
        # Respond to requests until process is killed
        httpd.serve_forever()

@wsgiapp
def static_file_wsgi_app(environ, start_response, base_dir):
    """
    Serve files and plain-text directory listings from base_dir.

    environ['PATH_INFO'], minus its leading slash, is resolved relative
    to base_dir. Paths that resolve outside base_dir (for example via
    '..' components or an absolute path) and paths that do not exist
    raise HttpError('not found').

    A directory is answered with a newline-separated listing of its
    entries as text/plain. A file is answered with its raw contents and
    the Content-Type guessed from its name, falling back to
    'application/octet-stream' when the type cannot be guessed.
    """
    path = environ['PATH_INFO'][1:]
    root = os.path.abspath(base_dir)
    filename = os.path.abspath(os.path.join(root, path))

    # Refuse anything that escapes base_dir. The request path is
    # untrusted input and os.path.join() happily accepts '..' components
    # and absolute paths.
    root_prefix = root.rstrip(os.sep) + os.sep
    if filename != root and not filename.startswith(root_prefix):
        raise HttpError('not found')

    if not os.path.exists(filename):
        raise HttpError('not found')

    if os.path.isdir(filename):
        return Response('\n'.join(os.listdir(filename)),
                        mimetype='text/plain')

    # Read in binary mode so that non-text files are served intact, and
    # always release the file handle.
    static_file = open(filename, 'rb')
    try:
        content = static_file.read()
    finally:
        static_file.close()

    # guess_type() returns None as the type for unknown extensions.
    mimetype = mimetypes.guess_type(filename)[0]
    if mimetype is None:
        mimetype = 'application/octet-stream'
    return Response(content, mimetype=mimetype)



def test_request(application, requestline='', input='', **kwargs):
    environ = {}
    words = requestline.split()
    if words:
        environ['REQUEST_METHOD'] = words[0]
        if len(words) > 1:
            environ['PATH_INFO'] = words[1]
    environ['wsgi.input'] = StringIO.StringIO(input)
    environ['CONTENT_LENGTH'] = str(len(input))
    environ.update(kwargs)
    def mock_start_response(status, headers):
        print "HTTP/1.1 %s" % status
        for name, value in headers:
            print "%s: %s" % (name, value)
    result = application(environ, mock_start_response)
    print '.'
    print ''.join(result)

# Maps numeric HTTP status codes to their reason phrases. HttpError uses
# this table to build status lines such as '404 Not Found', and
# status_codes (defined below) is derived from it as the lowercased
# reverse lookup. This is included in the httplib module in Python 2.5,
# but we want this module to work in 2.4.
responses = {
    100: 'Continue',
    101: 'Switching Protocols',

    200: 'OK',
    201: 'Created',
    202: 'Accepted',
    203: 'Non-Authoritative Information',
    204: 'No Content',
    205: 'Reset Content',
    206: 'Partial Content',

    300: 'Multiple Choices',
    301: 'Moved Permanently',
    302: 'Found',
    303: 'See Other',
    304: 'Not Modified',
    305: 'Use Proxy',
    306: '(Unused)',
    307: 'Temporary Redirect',

    400: 'Bad Request',
    401: 'Unauthorized',
    402: 'Payment Required',
    403: 'Forbidden',
    404: 'Not Found',
    405: 'Method Not Allowed',
    406: 'Not Acceptable',
    407: 'Proxy Authentication Required',
    408: 'Request Timeout',
    409: 'Conflict',
    410: 'Gone',
    411: 'Length Required',
    412: 'Precondition Failed',
    413: 'Request Entity Too Large',
    414: 'Request-URI Too Long',
    415: 'Unsupported Media Type',
    416: 'Requested Range Not Satisfiable',
    417: 'Expectation Failed',

    500: 'Internal Server Error',
    501: 'Not Implemented',
    502: 'Bad Gateway',
    503: 'Service Unavailable',
    504: 'Gateway Timeout',
    505: 'HTTP Version Not Supported',
}

# Reverse lookup: lowercased reason phrase -> numeric status code.
status_codes = {}

def __init():
    """Populate status_codes from the responses table at import time."""
    for code in responses:
        status_codes[responses[code].lower()] = code

__init()

# When this module is executed directly, run the doctests embedded in its
# docstrings and report every example verbosely.
if __name__ == '__main__':
    import doctest
    doctest.testmod(verbose=True)