Mercurial > encryptobin
view atulweb.py @ 0:1c8dbbdce596
Origination.
author | Atul Varma <varmaa@toolness.com> |
---|---|
date | Tue, 07 Apr 2009 16:07:54 -0700 |
parents | |
children |
line wrap: on
line source
import StringIO import httplib import cgi import traceback import sys import re import os import types import mimetypes try: import json except ImportError: import simplejson as json DEFAULT_LISTEN_PORT = 8000 OPEN_STR = "{" CLOSE_STR = "}" def _parse_request_path_pattern(pattern, current=None): """ >>> _parse_request_path_pattern('/blarg/{foob}/narb') [('s', '/blarg/'), ('n', 'foob'), ('s', '/narb')] >>> _parse_request_path_pattern('{foob}/gump') [('n', 'foob'), ('s', '/gump')] >>> _parse_request_path_pattern('{foob}') [('n', 'foob')] >>> _parse_request_path_pattern('gump') [('s', 'gump')] >>> _parse_request_path_pattern('{}') Traceback (most recent call last): ... ValueError: empty noun >>> _parse_request_path_pattern('/goop/{regr') Traceback (most recent call last): ... ValueError: unclosed noun """ if current is None: current = [] open_index = pattern.find(OPEN_STR) if open_index != -1: string_part = pattern[:open_index] rest = pattern[open_index+len(OPEN_STR):] close_index = rest.find(CLOSE_STR) if close_index == -1: raise ValueError("unclosed noun") noun_part = rest[:close_index] rest = rest[close_index+len(CLOSE_STR):] if string_part: current.append(('s', string_part)) if noun_part: current.append(('n', noun_part)) _parse_request_path_pattern(rest, current) else: raise ValueError("empty noun") elif pattern: current.append(('s', pattern)) return current def make_request_map(requests): """ >>> requests = {'blarg': 'GET /blar/{goop}', ... 'fnarg': 'POST /blarg/goopy'} >>> req_map = make_request_map(requests) >>> req_map['GET']['blarg'] [('s', '/blar/'), ('n', 'goop')] >>> req_map['POST']['fnarg'] [('s', '/blarg/goopy')] """ request_map = {} for key in requests: method, pattern = requests[key].split(' ') if method not in request_map: request_map[method] = {} request_map[method][key] = _parse_request_path_pattern(pattern) return request_map def _match_request_path(path_pattern, path, nouns): """ >>> import re >>> pattern = [('s', '/bla/'), ('n', 'doc')] >>> nouns = {'doc': re.compile(r'[0-9]+')} >>> _match_request_path(pattern, '/bla/4532', nouns) {'doc': '4532'} >>> _match_request_path(pattern, '/bla/garg/4532/', nouns) >>> _match_request_path(pattern, '/bla/garg/4532', nouns) >>> _match_request_path(pattern, '/4532', {'doc': r'.*'}) >>> _match_request_path([('s', '/status')], '/status', {}) {} >>> _match_request_path([('n', 'doc')], '345', nouns) {'doc': '345'} >>> _match_request_path([('n', 'doc'), ('s', '/go')], '345/go', nouns) {'doc': '345'} >>> _match_request_path([('n', 'doc'), ('s', '/go')], '345', nouns) """ matches = {} curr_noun = None path_pattern = list(path_pattern) path_pattern.append(('end', None)) for seg_type, seg_value in path_pattern: noun_to_check = None if seg_type == 'n': curr_noun = seg_value elif seg_type == 's': index = path.find(seg_value) if (index == -1): return None elif index == 0: if curr_noun: return None else: path = path[len(seg_value):] else: if curr_noun: noun_to_check = path[:index] path = path[index:] else: return None elif seg_type == 'end': if curr_noun is not None: noun_to_check = path else: raise ValueError('unknown segment type in path pattern') if noun_to_check is not None: if nouns[curr_noun].match(noun_to_check): matches[curr_noun] = noun_to_check curr_noun = None else: return None return matches def map_request(environ, request_map, nouns, mapping_obj): method = environ['REQUEST_METHOD'] if method in request_map: for name in request_map[method]: match = _match_request_path(request_map[method][name], environ['PATH_INFO'], nouns) if match is not None: return getattr(mapping_obj, name)(**match) raise HttpError('not found') else: raise HttpError('method not allowed') def make_wsgiapp(request_handler): requests = {} for name in dir(request_handler): method = getattr(request_handler, name) if type(method) == types.MethodType: requests[name] = method.__doc__.strip() request_map = make_request_map(requests) @wsgiapp def application(environ, start_response): request_handler.environ = environ return map_request(environ, request_map, request_handler.NOUNS, request_handler) return application class HttpError(Exception): def __init__(self, status, body=None, headers=None): if isinstance(status, basestring): if not status[0].isdigit(): status = status_codes[status.lower()] if isinstance(status, int): status = '%d %s' % (status, responses[status]) else: raise ValueError("bad status '%s'" % status) if not body: body = status if not headers: headers = [('Content-Type','text/plain')] self.status = status self.body = body self.headers = headers class Response(object): def __init__(self, body, mimetype='text/plain', headers=None): if isinstance(body, basestring): self.body = body elif mimetype == 'application/json': self.body = json.dumps(body) else: raise ValueError('Cannot process body: %s' % body) self.status = '200 OK' self.headers = [('Content-Type', mimetype), ('Content-Length', str(len(self.body)))] if headers: self.headers += headers class Request(object): def __init__(self, environ): self._environ = environ def get_input(self, mimetype='text/plain'): length = int(self._environ['CONTENT_LENGTH']) content = self._environ['wsgi.input'].read(length) if mimetype == 'application/json': content = json.loads(content) return content def parse_qs(querystring): querydict = {} cgi_querydict = cgi.parse_qs(querystring) for key, value in cgi_querydict.items(): querydict[key] = cgi_querydict[key][0] return querydict def wsgiapp(application): def wrapped_application(environ, start_response, *args, **kwargs): def make_response(response): start_response(response.status, response.headers) return [response.body] environ['atulweb.request'] = Request(environ) try: retval = application(environ, start_response, *args, **kwargs) except HttpError, e: return make_response(e) except Exception, e: tb = traceback.format_exc() sys.stderr.write(tb) return make_response(HttpError('internal server error', tb)) if isinstance(retval, Response): return make_response(retval) return retval for attr in ['__doc__', '__name__', '__module__']: setattr(wrapped_application, attr, getattr(application, attr)) return wrapped_application def main(application): if len(sys.argv) == 2 and sys.argv[1] == 'test': import doctest module = __import__('__main__') doctest.testmod(module, verbose=True) else: from wsgiref.simple_server import make_server port = DEFAULT_LISTEN_PORT @wsgiapp def parent_application(environ, start_response): match = re.match(r'\/static-files(\/.*)', environ['PATH_INFO']) if match: environ['PATH_INFO'] = match.group(1) return static_file_wsgi_app(environ, start_response, base_dir='static-files') else: return application(environ, start_response) httpd = make_server('127.0.0.1', port, parent_application) print 'Listening on localhost port %d.' % port print "(Run '%s test' to execute test suite.)" % sys.argv[0] # Respond to requests until process is killed httpd.serve_forever() @wsgiapp def static_file_wsgi_app(environ, start_response, base_dir): path = environ['PATH_INFO'][1:] filename = os.path.join(base_dir, path) if os.path.exists(filename): if os.path.isdir(filename): return Response('\n'.join(os.listdir(filename)), mimetype = 'text/plain') else: return Response(open(filename, 'r').read(), mimetype = mimetypes.guess_type(filename)[0]) else: raise HttpError('not found') def test_request(application, requestline='', input='', **kwargs): environ = {} words = requestline.split() if words: environ['REQUEST_METHOD'] = words[0] if len(words) > 1: environ['PATH_INFO'] = words[1] environ['wsgi.input'] = StringIO.StringIO(input) environ['CONTENT_LENGTH'] = str(len(input)) environ.update(kwargs) def mock_start_response(status, headers): print "HTTP/1.1 %s" % status for name, value in headers: print "%s: %s" % (name, value) result = application(environ, mock_start_response) print '.' print ''.join(result) # Mapping status codes to official W3C names. This is included in the httplib # module in Python 2.5, but we want this module to work in 2.4. responses = { 100: 'Continue', 101: 'Switching Protocols', 200: 'OK', 201: 'Created', 202: 'Accepted', 203: 'Non-Authoritative Information', 204: 'No Content', 205: 'Reset Content', 206: 'Partial Content', 300: 'Multiple Choices', 301: 'Moved Permanently', 302: 'Found', 303: 'See Other', 304: 'Not Modified', 305: 'Use Proxy', 306: '(Unused)', 307: 'Temporary Redirect', 400: 'Bad Request', 401: 'Unauthorized', 402: 'Payment Required', 403: 'Forbidden', 404: 'Not Found', 405: 'Method Not Allowed', 406: 'Not Acceptable', 407: 'Proxy Authentication Required', 408: 'Request Timeout', 409: 'Conflict', 410: 'Gone', 411: 'Length Required', 412: 'Precondition Failed', 413: 'Request Entity Too Large', 414: 'Request-URI Too Long', 415: 'Unsupported Media Type', 416: 'Requested Range Not Satisfiable', 417: 'Expectation Failed', 500: 'Internal Server Error', 501: 'Not Implemented', 502: 'Bad Gateway', 503: 'Service Unavailable', 504: 'Gateway Timeout', 505: 'HTTP Version Not Supported', } status_codes = {} def __init(): for key, value in responses.items(): status_codes[value.lower()] = key __init() if __name__ == '__main__': import doctest doctest.testmod(verbose=True)