Mercurial > summit-idp
changeset 0:47b666b85cac
origination
author | Atul Varma <avarma@mozilla.com> |
---|---|
date | Wed, 23 Jun 2010 17:05:18 -0700 |
parents | |
children | ac7704e92b02 |
files | oauth2_server.py test_server.py |
diffstat | 2 files changed, 251 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/oauth2_server.py Wed Jun 23 17:05:18 2010 -0700 @@ -0,0 +1,125 @@ +from cgi import parse_qsl +from base64 import urlsafe_b64encode +from os import urandom +import datetime +import json + +def gentoken(): + # Generate a 256-bit key, but add a byte so we don't have + # an annoying '=' in the string. + return urlsafe_b64encode(urandom(256/8+1)) + +class Request(object): + def __init__(self, environ, start_response): + self.environ = environ + self.start_response = start_response + self.path = environ['PATH_INFO'] + self.method = environ['REQUEST_METHOD'] + self.qargs = dict(parse_qsl(self.environ.get('QUERY_STRING', ''))) + + try: + self.length = int(environ.get('CONTENT_LENGTH', '0')) + except ValueError: + self.length = 0 + + def json_response(self, status, obj): + self.start_response(status, + [('Content-Type', 'application/json')]) + return [json.dumps(obj)] + + def get_body(self): + f = self.environ['wsgi.input'] + try: + obj = json.loads(f.read(self.length)) + except ValueError: + return None + return obj + +class Server(object): + oauth2_authorize_path = '/authorize' + request_challenge_path = '/challenge/request' + respond_to_challenge_path = '/challenge/respond' + + def __init__(self, emails, send_email, gentoken=gentoken, + utcnow=datetime.datetime.utcnow): + self.send_email = send_email + self.challenge_tokens = {} + self.auth_tokens = {} + self.emails = emails + self.utcnow = utcnow + self.gentoken = gentoken + + def new_challenge_token(self, email): + token = self.gentoken() + while token in self.challenge_tokens: + token = self.gentoken() + self.challenge_tokens[token] = { + 'email': email, + 'date': self.utcnow() + } + return token + + def new_auth_token(self, email): + token = self.gentoken() + while token in self.auth_tokens: + token = self.gentoken() + self.auth_tokens[token] = { + 'email': email, + 'date': self.utcnow() + } + return token + + def wsgi_app(self, environ, start_response): + req = Request(environ, start_response) + if req.path == self.request_challenge_path: + # TODO: check method == 'POST' + body = req.get_body() + if isinstance(body, dict) and 'email' in body: + if body['email'] in self.emails: + token = self.new_challenge_token(body['email']) + self.send_email(body['email'], token) + return req.json_response('200 OK', + {'success': True}) + return req.json_response('400 Bad Request', + {'error': 'invalid email'}) + return req.json_response('400 Bad Request', + {'error': 'invalid body'}) + elif req.path == self.respond_to_challenge_path: + body = req.get_body() + if isinstance(body, dict) and 'token' in body: + if body['token'] in self.challenge_tokens: + email = self.challenge_tokens[body['token']]['email'] + del self.challenge_tokens[body['token']] + token = self.new_auth_token(email) + return req.json_response('200 OK', + {'token': token, + 'email': email}) + return req.json_response('400 Bad Request', + {'error': 'invalid token'}) + return req.json_response('400 Bad Request', + {'error': 'invalid body'}) + start_response('404 Not Found', + [('Content-Type', 'text/plain')]) + return ['path not found: %s' % environ['PATH_INFO']] + +if __name__ == '__main__': + from wsgiref.simple_server import make_server + + from static_file_serving import StaticFileApp + + def send_email(email, token): + print "Please send %s an email with the token %s." % (email, token) + + def app(environ, start_response): + if environ['REQUEST_METHOD'] == 'GET': + return staticfiles(environ, start_response) + else: + return server.wsgi_app(environ, start_response) + + staticfiles = StaticFileApp('static-files') + server = Server(json.loads(open('attendees.json').read()), + send_email) + port = 8000 + httpd = make_server('', port, app) + print 'serving on port %d' % port + httpd.serve_forever()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test_server.py Wed Jun 23 17:05:18 2010 -0700 @@ -0,0 +1,126 @@ +import simplejson as json +import datetime + +from webtest import TestApp + +from oauth2_server import Server, gentoken + +server = None +app = None + +def apptest(func): + def wrapper(): + g = globals() + + EmailMachine.emails = [] + EntropyMachine.next = [] + TimeMachine.now = datetime.datetime(2010, 6, 17, 0, 32, 33, 985904) + + g['server'] = Server(['bob@foo.com', 'jane@bar.com'], + send_email=EmailMachine.send_email, + utcnow=TimeMachine.utcnow, + gentoken=EntropyMachine.gentoken) + g['app'] = TestApp(server.wsgi_app) + + func() + + wrapper.__name__ = func.__name__ + return wrapper + +class EmailMachine(object): + emails = [] + + @classmethod + def send_email(klass, email, token): + klass.emails.append((email, token)) + +class EntropyMachine(object): + next = [] + + @classmethod + def gentoken(klass): + if klass.next: + return klass.next.pop() + return gentoken() + +class TimeMachine(object): + now = None + + @classmethod + def travel(klass, timedelta=None, *args, **kwargs): + if timedelta is None: + timedelta = datetime.timedelta(*args, **kwargs) + + klass.now += timedelta + + @classmethod + def utcnow(klass): + return klass.now + +def post_json(url, obj, **kwargs): + return app.post(url, json.dumps(obj), + {'Content-Type': 'application/json'}, + **kwargs) + +@apptest +def test_request_challenge(): + EntropyMachine.next.append('a token') + resp = post_json(server.request_challenge_path, + {'email': 'bob@foo.com'}) + assert resp.json == {'success': True} + assert EmailMachine.emails == [('bob@foo.com', 'a token')] + +@apptest +def test_request_challenge_with_invalid_email(): + post_json(server.request_challenge_path, + {'email': 'invalid@foo.com'}, + status=400) + +@apptest +def test_request_challenge_with_invalid_body1(): + post_json(server.request_challenge_path, + [], + status=400) + +@apptest +def test_request_challenge_with_invalid_body2(): + post_json(server.request_challenge_path, + {'blah': 'narg'}, + status=400) + +@apptest +def test_respond_to_challenge(): + token = server.new_challenge_token('bob@foo.com') + EntropyMachine.next.append('my auth token') + resp = post_json(server.respond_to_challenge_path, + {'token': token}) + assert resp.json == {'email': 'bob@foo.com', + 'token': 'my auth token'} + +@apptest +def test_respond_to_challenge_only_works_once(): + token = server.new_challenge_token('bob@foo.com') + EntropyMachine.next.append('my auth token') + resp = post_json(server.respond_to_challenge_path, + {'token': token}) + assert resp.json == {'email': 'bob@foo.com', + 'token': 'my auth token'} + post_json(server.respond_to_challenge_path, + {'token': token}, + status=400) + +@apptest +def test_respond_to_challenge_with_invalid_token(): + post_json(server.respond_to_challenge_path, + {'token': 'hello there'}, + status=400) + +@apptest +def test_respond_to_challenge_with_invalid_body(): + post_json(server.respond_to_challenge_path, + [], + status=400) + +@apptest +def test_basic_404(): + app.get('/blarg', status=404)