Mercurial > wiki
diff atulweb.py @ 28:602baadb535a default tip
Added a requesthandler infrastructure to atulweb.
author | Atul Varma <varmaa@toolness.com> |
---|---|
date | Wed, 11 Mar 2009 23:19:18 -0500 |
parents | 1850638c1da5 |
children |
line wrap: on
line diff
--- a/atulweb.py Thu Feb 12 23:00:03 2009 -0800 +++ b/atulweb.py Wed Mar 11 23:19:18 2009 -0500 @@ -5,6 +5,7 @@ import sys import re import os +import types import mimetypes try: @@ -14,6 +15,171 @@ DEFAULT_LISTEN_PORT = 8000 +OPEN_STR = "{" +CLOSE_STR = "}" + +def _parse_request_path_pattern(pattern, current=None): + """ + >>> _parse_request_path_pattern('/blarg/{foob}/narb') + [('s', '/blarg/'), ('n', 'foob'), ('s', '/narb')] + + >>> _parse_request_path_pattern('{foob}/gump') + [('n', 'foob'), ('s', '/gump')] + + >>> _parse_request_path_pattern('{foob}') + [('n', 'foob')] + + >>> _parse_request_path_pattern('gump') + [('s', 'gump')] + + >>> _parse_request_path_pattern('{}') + Traceback (most recent call last): + ... + ValueError: empty noun + + >>> _parse_request_path_pattern('/goop/{regr') + Traceback (most recent call last): + ... + ValueError: unclosed noun + """ + + if current is None: + current = [] + + open_index = pattern.find(OPEN_STR) + if open_index != -1: + string_part = pattern[:open_index] + rest = pattern[open_index+len(OPEN_STR):] + close_index = rest.find(CLOSE_STR) + if close_index == -1: + raise ValueError("unclosed noun") + noun_part = rest[:close_index] + rest = rest[close_index+len(CLOSE_STR):] + if string_part: + current.append(('s', string_part)) + if noun_part: + current.append(('n', noun_part)) + _parse_request_path_pattern(rest, current) + else: + raise ValueError("empty noun") + elif pattern: + current.append(('s', pattern)) + return current + +def make_request_map(requests): + """ + >>> requests = {'blarg': 'GET /blar/{goop}', + ... 'fnarg': 'POST /blarg/goopy'} + >>> req_map = make_request_map(requests) + + >>> req_map['GET']['blarg'] + [('s', '/blar/'), ('n', 'goop')] + + >>> req_map['POST']['fnarg'] + [('s', '/blarg/goopy')] + """ + + request_map = {} + for key in requests: + method, pattern = requests[key].split(' ') + if method not in request_map: + request_map[method] = {} + request_map[method][key] = _parse_request_path_pattern(pattern) + return request_map + +def _match_request_path(path_pattern, path, nouns): + """ + >>> import re + >>> pattern = [('s', '/bla/'), ('n', 'doc')] + >>> nouns = {'doc': re.compile(r'[0-9]+')} + >>> _match_request_path(pattern, '/bla/4532', nouns) + {'doc': '4532'} + + >>> _match_request_path(pattern, '/bla/garg/4532/', nouns) + + >>> _match_request_path(pattern, '/bla/garg/4532', nouns) + + >>> _match_request_path(pattern, '/4532', {'doc': r'.*'}) + + >>> _match_request_path([('s', '/status')], '/status', {}) + {} + + >>> _match_request_path([('n', 'doc')], '345', nouns) + {'doc': '345'} + + >>> _match_request_path([('n', 'doc'), ('s', '/go')], '345/go', nouns) + {'doc': '345'} + + >>> _match_request_path([('n', 'doc'), ('s', '/go')], '345', nouns) + """ + + matches = {} + curr_noun = None + path_pattern = list(path_pattern) + path_pattern.append(('end', None)) + for seg_type, seg_value in path_pattern: + noun_to_check = None + if seg_type == 'n': + curr_noun = seg_value + elif seg_type == 's': + index = path.find(seg_value) + if (index == -1): + return None + elif index == 0: + if curr_noun: + return None + else: + path = path[len(seg_value):] + else: + if curr_noun: + noun_to_check = path[:index] + path = path[index:] + else: + return None + elif seg_type == 'end': + if curr_noun is not None: + noun_to_check = path + else: + raise ValueError('unknown segment type in path pattern') + if noun_to_check is not None: + if nouns[curr_noun].match(noun_to_check): + matches[curr_noun] = noun_to_check + curr_noun = None + else: + return None + return matches + +def map_request(environ, request_map, nouns, mapping_obj): + method = environ['REQUEST_METHOD'] + if method in request_map: + for name in request_map[method]: + match = _match_request_path(request_map[method][name], + environ['PATH_INFO'], + nouns) + if match is not None: + return getattr(mapping_obj, name)(**match) + raise HttpError('not found') + else: + raise HttpError('method not allowed') + +def make_wsgiapp(request_handler): + requests = {} + for name in dir(request_handler): + method = getattr(request_handler, name) + if type(method) == types.MethodType: + requests[name] = method.__doc__.strip() + request_map = make_request_map(requests) + + @wsgiapp + def application(environ, start_response): + request_handler.environ = environ + return map_request(environ, + request_map, + request_handler.NOUNS, + request_handler) + + return application + class HttpError(Exception): def __init__(self, status, body=None, headers=None): if isinstance(status, basestring): @@ -126,6 +292,8 @@ else: raise HttpError('not found') + + def test_request(application, requestline='', input='', **kwargs): environ = {} words = requestline.split() @@ -201,3 +369,7 @@ status_codes[value.lower()] = key __init() + +if __name__ == '__main__': + import doctest + doctest.testmod(verbose=True)