diff atulweb.py @ 0:1c8dbbdce596

Origination.
author Atul Varma <varmaa@toolness.com>
date Tue, 07 Apr 2009 16:07:54 -0700
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/atulweb.py	Tue Apr 07 16:07:54 2009 -0700
@@ -0,0 +1,375 @@
+import StringIO
+import httplib
+import cgi
+import traceback
+import sys
+import re
+import os
+import types
+import mimetypes
+
+try:
+    import json
+except ImportError:
+    import simplejson as json
+
+DEFAULT_LISTEN_PORT = 8000
+
+OPEN_STR = "{"
+CLOSE_STR = "}"
+
+def _parse_request_path_pattern(pattern, current=None):
+    """
+    >>> _parse_request_path_pattern('/blarg/{foob}/narb')
+    [('s', '/blarg/'), ('n', 'foob'), ('s', '/narb')]
+
+    >>> _parse_request_path_pattern('{foob}/gump')
+    [('n', 'foob'), ('s', '/gump')]
+
+    >>> _parse_request_path_pattern('{foob}')
+    [('n', 'foob')]
+
+    >>> _parse_request_path_pattern('gump')
+    [('s', 'gump')]
+
+    >>> _parse_request_path_pattern('{}')
+    Traceback (most recent call last):
+    ...
+    ValueError: empty noun
+
+    >>> _parse_request_path_pattern('/goop/{regr')
+    Traceback (most recent call last):
+    ...
+    ValueError: unclosed noun
+    """
+
+    if current is None:
+        current = []
+
+    open_index = pattern.find(OPEN_STR)
+    if open_index != -1:
+        string_part = pattern[:open_index]
+        rest = pattern[open_index+len(OPEN_STR):]
+        close_index = rest.find(CLOSE_STR)
+        if close_index == -1:
+            raise ValueError("unclosed noun")
+        noun_part = rest[:close_index]
+        rest = rest[close_index+len(CLOSE_STR):]
+        if string_part:
+            current.append(('s', string_part))
+        if noun_part:
+            current.append(('n', noun_part))
+            _parse_request_path_pattern(rest, current)
+        else:
+            raise ValueError("empty noun")
+    elif pattern:
+        current.append(('s', pattern))
+    return current
+
+def make_request_map(requests):
+    """
+    >>> requests = {'blarg': 'GET /blar/{goop}',
+    ...             'fnarg': 'POST /blarg/goopy'}
+    >>> req_map = make_request_map(requests)
+
+    >>> req_map['GET']['blarg']
+    [('s', '/blar/'), ('n', 'goop')]
+
+    >>> req_map['POST']['fnarg']
+    [('s', '/blarg/goopy')]
+    """
+
+    request_map = {}
+    for key in requests:
+        method, pattern = requests[key].split(' ')
+        if method not in request_map:
+            request_map[method] = {}
+        request_map[method][key] = _parse_request_path_pattern(pattern)
+    return request_map
+
+def _match_request_path(path_pattern, path, nouns):
+    """
+    >>> import re
+    >>> pattern = [('s', '/bla/'), ('n', 'doc')]
+    >>> nouns = {'doc': re.compile(r'[0-9]+')}
+    >>> _match_request_path(pattern, '/bla/4532', nouns)
+    {'doc': '4532'}
+
+    >>> _match_request_path(pattern, '/bla/garg/4532/', nouns)
+
+    >>> _match_request_path(pattern, '/bla/garg/4532', nouns)
+
+    >>> _match_request_path(pattern, '/4532', {'doc': r'.*'})
+
+    >>> _match_request_path([('s', '/status')], '/status', {})
+    {}
+
+    >>> _match_request_path([('n', 'doc')], '345', nouns)
+    {'doc': '345'}
+
+    >>> _match_request_path([('n', 'doc'), ('s', '/go')], '345/go', nouns)
+    {'doc': '345'}
+
+    >>> _match_request_path([('n', 'doc'), ('s', '/go')], '345', nouns)
+    """
+
+    matches = {}
+    curr_noun = None
+    path_pattern = list(path_pattern)
+    path_pattern.append(('end', None))
+    for seg_type, seg_value in path_pattern:
+        noun_to_check = None
+        if seg_type == 'n':
+            curr_noun = seg_value
+        elif seg_type == 's':
+            index = path.find(seg_value)
+            if (index == -1):
+                return None
+            elif index == 0:
+                if curr_noun:
+                    return None
+                else:
+                    path = path[len(seg_value):]
+            else:
+                if curr_noun:
+                    noun_to_check = path[:index]
+                    path = path[index:]
+                else:
+                    return None
+        elif seg_type == 'end':
+            if curr_noun is not None:
+                noun_to_check = path
+        else:
+            raise ValueError('unknown segment type in path pattern')
+        if noun_to_check is not None:
+            if nouns[curr_noun].match(noun_to_check):
+                matches[curr_noun] = noun_to_check
+                curr_noun = None
+            else:
+                return None
+    return matches
+
+def map_request(environ, request_map, nouns, mapping_obj):
+    method = environ['REQUEST_METHOD']
+    if method in request_map:
+        for name in request_map[method]:
+            match = _match_request_path(request_map[method][name],
+                                       environ['PATH_INFO'],
+                                       nouns)
+            if match is not None:
+                return getattr(mapping_obj, name)(**match)
+        raise HttpError('not found')
+    else:
+        raise HttpError('method not allowed')
+
+def make_wsgiapp(request_handler):
+    requests = {}
+    for name in dir(request_handler):
+        method = getattr(request_handler, name)
+        if type(method) == types.MethodType:
+            requests[name] = method.__doc__.strip()
+    request_map = make_request_map(requests)
+
+    @wsgiapp
+    def application(environ, start_response):
+        request_handler.environ = environ
+        return map_request(environ,
+                           request_map,
+                           request_handler.NOUNS,
+                           request_handler)
+
+    return application
+
+class HttpError(Exception):
+    def __init__(self, status, body=None, headers=None):
+        if isinstance(status, basestring):
+            if not status[0].isdigit():
+                status = status_codes[status.lower()]
+
+        if isinstance(status, int):
+            status = '%d %s' % (status, responses[status])
+        else:
+            raise ValueError("bad status '%s'" % status)
+
+        if not body:
+            body = status
+        if not headers:
+            headers = [('Content-Type','text/plain')]
+        self.status = status
+        self.body = body
+        self.headers = headers
+
+class Response(object):
+    def __init__(self, body, mimetype='text/plain', headers=None):
+        if isinstance(body, basestring):
+            self.body = body
+        elif mimetype == 'application/json':
+            self.body = json.dumps(body)
+        else:
+            raise ValueError('Cannot process body: %s' % body)
+        self.status = '200 OK'
+        self.headers = [('Content-Type', mimetype),
+                        ('Content-Length', str(len(self.body)))]
+        if headers:
+            self.headers += headers
+
+class Request(object):
+    def __init__(self, environ):
+        self._environ = environ
+
+    def get_input(self, mimetype='text/plain'):
+        length = int(self._environ['CONTENT_LENGTH'])
+        content = self._environ['wsgi.input'].read(length)
+        if mimetype == 'application/json':
+            content = json.loads(content)
+        return content
+
+def parse_qs(querystring):
+    querydict = {}
+    cgi_querydict = cgi.parse_qs(querystring)
+    for key, value in cgi_querydict.items():
+        querydict[key] = cgi_querydict[key][0]
+    return querydict
+
+def wsgiapp(application):
+    def wrapped_application(environ, start_response, *args, **kwargs):
+        def make_response(response):
+            start_response(response.status, response.headers)
+            return [response.body]
+        environ['atulweb.request'] = Request(environ)
+        try:
+            retval = application(environ, start_response, *args, **kwargs)
+        except HttpError, e:
+            return make_response(e)
+        except Exception, e:
+            tb = traceback.format_exc()
+            sys.stderr.write(tb)
+            return make_response(HttpError('internal server error', tb))
+        if isinstance(retval, Response):
+            return make_response(retval)
+        return retval
+    for attr in ['__doc__', '__name__', '__module__']:
+        setattr(wrapped_application, attr, getattr(application, attr))
+    return wrapped_application
+
+def main(application):
+    if len(sys.argv) == 2 and sys.argv[1] == 'test':
+        import doctest
+        module = __import__('__main__')
+        doctest.testmod(module, verbose=True)
+    else:
+        from wsgiref.simple_server import make_server
+        port = DEFAULT_LISTEN_PORT
+
+        @wsgiapp
+        def parent_application(environ, start_response):
+            match = re.match(r'\/static-files(\/.*)',
+                             environ['PATH_INFO'])
+            if match:
+                environ['PATH_INFO'] = match.group(1)
+                return static_file_wsgi_app(environ, start_response,
+                                            base_dir='static-files')
+            else:
+                return application(environ, start_response)
+
+        httpd = make_server('127.0.0.1', port, parent_application)
+        print 'Listening on localhost port %d.' % port
+        print "(Run '%s test' to execute test suite.)" % sys.argv[0]
+        # Respond to requests until process is killed
+        httpd.serve_forever()
+
+@wsgiapp
+def static_file_wsgi_app(environ, start_response, base_dir):
+    path = environ['PATH_INFO'][1:]
+    filename = os.path.join(base_dir, path)
+    if os.path.exists(filename):
+        if os.path.isdir(filename):
+            return Response('\n'.join(os.listdir(filename)),
+                            mimetype = 'text/plain')
+        else:
+            return Response(open(filename, 'r').read(),
+                            mimetype = mimetypes.guess_type(filename)[0])
+    else:
+        raise HttpError('not found')
+
+
+
+def test_request(application, requestline='', input='', **kwargs):
+    environ = {}
+    words = requestline.split()
+    if words:
+        environ['REQUEST_METHOD'] = words[0]
+        if len(words) > 1:
+            environ['PATH_INFO'] = words[1]
+    environ['wsgi.input'] = StringIO.StringIO(input)
+    environ['CONTENT_LENGTH'] = str(len(input))
+    environ.update(kwargs)
+    def mock_start_response(status, headers):
+        print "HTTP/1.1 %s" % status
+        for name, value in headers:
+            print "%s: %s" % (name, value)
+    result = application(environ, mock_start_response)
+    print '.'
+    print ''.join(result)
+
+# Mapping status codes to official W3C names. This is included in the httplib
+# module in Python 2.5, but we want this module to work in 2.4.
+responses = {
+    100: 'Continue',
+    101: 'Switching Protocols',
+
+    200: 'OK',
+    201: 'Created',
+    202: 'Accepted',
+    203: 'Non-Authoritative Information',
+    204: 'No Content',
+    205: 'Reset Content',
+    206: 'Partial Content',
+
+    300: 'Multiple Choices',
+    301: 'Moved Permanently',
+    302: 'Found',
+    303: 'See Other',
+    304: 'Not Modified',
+    305: 'Use Proxy',
+    306: '(Unused)',
+    307: 'Temporary Redirect',
+
+    400: 'Bad Request',
+    401: 'Unauthorized',
+    402: 'Payment Required',
+    403: 'Forbidden',
+    404: 'Not Found',
+    405: 'Method Not Allowed',
+    406: 'Not Acceptable',
+    407: 'Proxy Authentication Required',
+    408: 'Request Timeout',
+    409: 'Conflict',
+    410: 'Gone',
+    411: 'Length Required',
+    412: 'Precondition Failed',
+    413: 'Request Entity Too Large',
+    414: 'Request-URI Too Long',
+    415: 'Unsupported Media Type',
+    416: 'Requested Range Not Satisfiable',
+    417: 'Expectation Failed',
+
+    500: 'Internal Server Error',
+    501: 'Not Implemented',
+    502: 'Bad Gateway',
+    503: 'Service Unavailable',
+    504: 'Gateway Timeout',
+    505: 'HTTP Version Not Supported',
+}
+
+status_codes = {}
+
+def __init():
+    for key, value in responses.items():
+        status_codes[value.lower()] = key
+
+__init()
+
+if __name__ == '__main__':
+    import doctest
+    doctest.testmod(verbose=True)