Mercurial > encryptobin
diff atulweb.py @ 0:1c8dbbdce596
Origination.
author | Atul Varma <varmaa@toolness.com> |
---|---|
date | Tue, 07 Apr 2009 16:07:54 -0700 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/atulweb.py Tue Apr 07 16:07:54 2009 -0700 @@ -0,0 +1,375 @@ +import StringIO +import httplib +import cgi +import traceback +import sys +import re +import os +import types +import mimetypes + +try: + import json +except ImportError: + import simplejson as json + +DEFAULT_LISTEN_PORT = 8000 + +OPEN_STR = "{" +CLOSE_STR = "}" + +def _parse_request_path_pattern(pattern, current=None): + """ + >>> _parse_request_path_pattern('/blarg/{foob}/narb') + [('s', '/blarg/'), ('n', 'foob'), ('s', '/narb')] + + >>> _parse_request_path_pattern('{foob}/gump') + [('n', 'foob'), ('s', '/gump')] + + >>> _parse_request_path_pattern('{foob}') + [('n', 'foob')] + + >>> _parse_request_path_pattern('gump') + [('s', 'gump')] + + >>> _parse_request_path_pattern('{}') + Traceback (most recent call last): + ... + ValueError: empty noun + + >>> _parse_request_path_pattern('/goop/{regr') + Traceback (most recent call last): + ... + ValueError: unclosed noun + """ + + if current is None: + current = [] + + open_index = pattern.find(OPEN_STR) + if open_index != -1: + string_part = pattern[:open_index] + rest = pattern[open_index+len(OPEN_STR):] + close_index = rest.find(CLOSE_STR) + if close_index == -1: + raise ValueError("unclosed noun") + noun_part = rest[:close_index] + rest = rest[close_index+len(CLOSE_STR):] + if string_part: + current.append(('s', string_part)) + if noun_part: + current.append(('n', noun_part)) + _parse_request_path_pattern(rest, current) + else: + raise ValueError("empty noun") + elif pattern: + current.append(('s', pattern)) + return current + +def make_request_map(requests): + """ + >>> requests = {'blarg': 'GET /blar/{goop}', + ... 'fnarg': 'POST /blarg/goopy'} + >>> req_map = make_request_map(requests) + + >>> req_map['GET']['blarg'] + [('s', '/blar/'), ('n', 'goop')] + + >>> req_map['POST']['fnarg'] + [('s', '/blarg/goopy')] + """ + + request_map = {} + for key in requests: + method, pattern = requests[key].split(' ') + if method not in request_map: + request_map[method] = {} + request_map[method][key] = _parse_request_path_pattern(pattern) + return request_map + +def _match_request_path(path_pattern, path, nouns): + """ + >>> import re + >>> pattern = [('s', '/bla/'), ('n', 'doc')] + >>> nouns = {'doc': re.compile(r'[0-9]+')} + >>> _match_request_path(pattern, '/bla/4532', nouns) + {'doc': '4532'} + + >>> _match_request_path(pattern, '/bla/garg/4532/', nouns) + + >>> _match_request_path(pattern, '/bla/garg/4532', nouns) + + >>> _match_request_path(pattern, '/4532', {'doc': r'.*'}) + + >>> _match_request_path([('s', '/status')], '/status', {}) + {} + + >>> _match_request_path([('n', 'doc')], '345', nouns) + {'doc': '345'} + + >>> _match_request_path([('n', 'doc'), ('s', '/go')], '345/go', nouns) + {'doc': '345'} + + >>> _match_request_path([('n', 'doc'), ('s', '/go')], '345', nouns) + """ + + matches = {} + curr_noun = None + path_pattern = list(path_pattern) + path_pattern.append(('end', None)) + for seg_type, seg_value in path_pattern: + noun_to_check = None + if seg_type == 'n': + curr_noun = seg_value + elif seg_type == 's': + index = path.find(seg_value) + if (index == -1): + return None + elif index == 0: + if curr_noun: + return None + else: + path = path[len(seg_value):] + else: + if curr_noun: + noun_to_check = path[:index] + path = path[index:] + else: + return None + elif seg_type == 'end': + if curr_noun is not None: + noun_to_check = path + else: + raise ValueError('unknown segment type in path pattern') + if noun_to_check is not None: + if nouns[curr_noun].match(noun_to_check): + matches[curr_noun] = noun_to_check + curr_noun = None + else: + return None + return matches + +def map_request(environ, request_map, nouns, mapping_obj): + method = environ['REQUEST_METHOD'] + if method in request_map: + for name in request_map[method]: + match = _match_request_path(request_map[method][name], + environ['PATH_INFO'], + nouns) + if match is not None: + return getattr(mapping_obj, name)(**match) + raise HttpError('not found') + else: + raise HttpError('method not allowed') + +def make_wsgiapp(request_handler): + requests = {} + for name in dir(request_handler): + method = getattr(request_handler, name) + if type(method) == types.MethodType: + requests[name] = method.__doc__.strip() + request_map = make_request_map(requests) + + @wsgiapp + def application(environ, start_response): + request_handler.environ = environ + return map_request(environ, + request_map, + request_handler.NOUNS, + request_handler) + + return application + +class HttpError(Exception): + def __init__(self, status, body=None, headers=None): + if isinstance(status, basestring): + if not status[0].isdigit(): + status = status_codes[status.lower()] + + if isinstance(status, int): + status = '%d %s' % (status, responses[status]) + else: + raise ValueError("bad status '%s'" % status) + + if not body: + body = status + if not headers: + headers = [('Content-Type','text/plain')] + self.status = status + self.body = body + self.headers = headers + +class Response(object): + def __init__(self, body, mimetype='text/plain', headers=None): + if isinstance(body, basestring): + self.body = body + elif mimetype == 'application/json': + self.body = json.dumps(body) + else: + raise ValueError('Cannot process body: %s' % body) + self.status = '200 OK' + self.headers = [('Content-Type', mimetype), + ('Content-Length', str(len(self.body)))] + if headers: + self.headers += headers + +class Request(object): + def __init__(self, environ): + self._environ = environ + + def get_input(self, mimetype='text/plain'): + length = int(self._environ['CONTENT_LENGTH']) + content = self._environ['wsgi.input'].read(length) + if mimetype == 'application/json': + content = json.loads(content) + return content + +def parse_qs(querystring): + querydict = {} + cgi_querydict = cgi.parse_qs(querystring) + for key, value in cgi_querydict.items(): + querydict[key] = cgi_querydict[key][0] + return querydict + +def wsgiapp(application): + def wrapped_application(environ, start_response, *args, **kwargs): + def make_response(response): + start_response(response.status, response.headers) + return [response.body] + environ['atulweb.request'] = Request(environ) + try: + retval = application(environ, start_response, *args, **kwargs) + except HttpError, e: + return make_response(e) + except Exception, e: + tb = traceback.format_exc() + sys.stderr.write(tb) + return make_response(HttpError('internal server error', tb)) + if isinstance(retval, Response): + return make_response(retval) + return retval + for attr in ['__doc__', '__name__', '__module__']: + setattr(wrapped_application, attr, getattr(application, attr)) + return wrapped_application + +def main(application): + if len(sys.argv) == 2 and sys.argv[1] == 'test': + import doctest + module = __import__('__main__') + doctest.testmod(module, verbose=True) + else: + from wsgiref.simple_server import make_server + port = DEFAULT_LISTEN_PORT + + @wsgiapp + def parent_application(environ, start_response): + match = re.match(r'\/static-files(\/.*)', + environ['PATH_INFO']) + if match: + environ['PATH_INFO'] = match.group(1) + return static_file_wsgi_app(environ, start_response, + base_dir='static-files') + else: + return application(environ, start_response) + + httpd = make_server('127.0.0.1', port, parent_application) + print 'Listening on localhost port %d.' % port + print "(Run '%s test' to execute test suite.)" % sys.argv[0] + # Respond to requests until process is killed + httpd.serve_forever() + +@wsgiapp +def static_file_wsgi_app(environ, start_response, base_dir): + path = environ['PATH_INFO'][1:] + filename = os.path.join(base_dir, path) + if os.path.exists(filename): + if os.path.isdir(filename): + return Response('\n'.join(os.listdir(filename)), + mimetype = 'text/plain') + else: + return Response(open(filename, 'r').read(), + mimetype = mimetypes.guess_type(filename)[0]) + else: + raise HttpError('not found') + + + +def test_request(application, requestline='', input='', **kwargs): + environ = {} + words = requestline.split() + if words: + environ['REQUEST_METHOD'] = words[0] + if len(words) > 1: + environ['PATH_INFO'] = words[1] + environ['wsgi.input'] = StringIO.StringIO(input) + environ['CONTENT_LENGTH'] = str(len(input)) + environ.update(kwargs) + def mock_start_response(status, headers): + print "HTTP/1.1 %s" % status + for name, value in headers: + print "%s: %s" % (name, value) + result = application(environ, mock_start_response) + print '.' + print ''.join(result) + +# Mapping status codes to official W3C names. This is included in the httplib +# module in Python 2.5, but we want this module to work in 2.4. +responses = { + 100: 'Continue', + 101: 'Switching Protocols', + + 200: 'OK', + 201: 'Created', + 202: 'Accepted', + 203: 'Non-Authoritative Information', + 204: 'No Content', + 205: 'Reset Content', + 206: 'Partial Content', + + 300: 'Multiple Choices', + 301: 'Moved Permanently', + 302: 'Found', + 303: 'See Other', + 304: 'Not Modified', + 305: 'Use Proxy', + 306: '(Unused)', + 307: 'Temporary Redirect', + + 400: 'Bad Request', + 401: 'Unauthorized', + 402: 'Payment Required', + 403: 'Forbidden', + 404: 'Not Found', + 405: 'Method Not Allowed', + 406: 'Not Acceptable', + 407: 'Proxy Authentication Required', + 408: 'Request Timeout', + 409: 'Conflict', + 410: 'Gone', + 411: 'Length Required', + 412: 'Precondition Failed', + 413: 'Request Entity Too Large', + 414: 'Request-URI Too Long', + 415: 'Unsupported Media Type', + 416: 'Requested Range Not Satisfiable', + 417: 'Expectation Failed', + + 500: 'Internal Server Error', + 501: 'Not Implemented', + 502: 'Bad Gateway', + 503: 'Service Unavailable', + 504: 'Gateway Timeout', + 505: 'HTTP Version Not Supported', +} + +status_codes = {} + +def __init(): + for key, value in responses.items(): + status_codes[value.lower()] = key + +__init() + +if __name__ == '__main__': + import doctest + doctest.testmod(verbose=True)