Mercurial > summit-idp
changeset 10:102514eec446
made summitidp package and tests dir
author | Atul Varma <avarma@mozilla.com> |
---|---|
date | Thu, 24 Jun 2010 17:40:50 -0700 |
parents | 53cfbdc7324f |
children | a1c1d2955981 |
files | oauth2_server.py static_file_serving.py summitidp/__init__.py summitidp/app.py summitidp/static_file_serving.py test_server.py tests/test_app.py |
diffstat | 6 files changed, 421 insertions(+), 421 deletions(-) [+] |
line wrap: on
line diff
--- a/oauth2_server.py Thu Jun 24 17:34:25 2010 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,188 +0,0 @@ -from cgi import parse_qsl -from base64 import urlsafe_b64encode -from os import urandom -import datetime -import json - -DEFAULT_CHALLENGE_LIFETIME = datetime.timedelta(days=1) - -def gentoken(): - # Generate a 256-bit key, but add a byte so we don't have - # an annoying '=' in the string. - return urlsafe_b64encode(urandom(256/8+1)) - -class Request(object): - def __init__(self, environ, start_response): - self.environ = environ - self.start_response = start_response - self.path = environ['PATH_INFO'] - self.method = environ['REQUEST_METHOD'] - self.qargs = dict(parse_qsl(self.environ.get('QUERY_STRING', ''))) - - try: - self.length = int(environ.get('CONTENT_LENGTH', '0')) - except ValueError: - self.length = 0 - - def json_response(self, status, obj): - self.start_response(status, - [('Content-Type', 'application/json')]) - return [json.dumps(obj)] - - def get_body(self): - f = self.environ['wsgi.input'] - try: - obj = json.loads(f.read(self.length)) - except ValueError: - return None - if not isinstance(obj, dict): - return None - return obj - -class TokenStore(object): - def __init__(self, lifetime=None, utcnow=datetime.datetime.utcnow, - gentoken=gentoken): - self.utcnow = utcnow - self.gentoken = gentoken - self.lifetime = lifetime - self.__tokens = {} - - def create(self, **contents): - token = self.gentoken() - while token in self.__tokens: - token = self.gentoken() - self.__tokens[token] = { - 'date': self.utcnow(), - 'contents': contents - } - return token - - def revoke(self, token): - if token in self.__tokens: - del self.__tokens[token] - - def get(self, token): - if token not in self.__tokens: - return None - if self.lifetime is not None: - age = self.utcnow() - self.__tokens[token]['date'] - if age > self.lifetime: - del self.__tokens[token] - return None - return self.__tokens[token]['contents'] - -class ProfileStore(object): - def __init__(self): - self.__profiles = {} - - def set(self, user_id, contents): - self.__profiles[user_id] = contents - - def get(self): - return self.__profiles - -class Server(object): - request_challenge_path = '/challenge/request' - respond_to_challenge_path = '/challenge/respond' - profile_path = '/profile' - - def __init__(self, emails, send_email, gentoken=gentoken, - challenge_lifetime=DEFAULT_CHALLENGE_LIFETIME, - utcnow=datetime.datetime.utcnow, - delegate_404s=None): - self.send_email = send_email - self.challenge_tokens = TokenStore(lifetime=challenge_lifetime, - utcnow=utcnow, - gentoken=gentoken) - self.auth_tokens = TokenStore(utcnow=utcnow, gentoken=gentoken) - self.oauth2_consumers = TokenStore(utcnow=utcnow, gentoken=gentoken) - self.profiles = ProfileStore() - self.emails = emails - self.gentoken = gentoken - self.delegate_404s = delegate_404s - - def new_auth_token(self, email): - return self.auth_tokens.create(email=email) - - def wsgi_app(self, environ, start_response): - req = Request(environ, start_response) - if req.path == self.request_challenge_path: - # TODO: check method == 'POST' - body = req.get_body() - if body and 'email' in body: - if body['email'] in self.emails: - token = self.challenge_tokens.create(email=body['email']) - self.send_email(body['email'], token) - return req.json_response('200 OK', - {'success': True}) - return req.json_response('400 Bad Request', - {'error': 'invalid email'}) - return req.json_response('400 Bad Request', - {'error': 'invalid body'}) - elif req.path == self.respond_to_challenge_path: - body = req.get_body() - if body and 'token' in body: - chaltok = self.challenge_tokens.get(body['token']) - if not chaltok: - return req.json_response( - '400 Bad Request', - {'error': 'invalid or expired token'} - ) - email = chaltok['email'] - self.challenge_tokens.revoke(body['token']) - token = self.auth_tokens.create( - email=email, - user_id=self.emails.index(email) - ) - return req.json_response('200 OK', - {'token': token, - 'email': email}) - return req.json_response('400 Bad Request', - {'error': 'invalid body'}) - elif req.path == self.profile_path: - if req.method == 'GET': - authtok = self.auth_tokens.get(req.qargs.get('token')) - if not authtok: - return req.json_response( - '400 Bad Request', - {'error': 'invalid or expired token'} - ) - profile = self.profiles.get() - return req.json_response('200 OK', - {'contents': profile}) - elif req.method == 'POST': - body = req.get_body() - if body and 'token' in body and 'contents' in body: - authtok = self.auth_tokens.get(body['token']) - if not authtok: - return req.json_response( - '400 Bad Request', - {'error': 'invalid or expired token'} - ) - self.profiles.set(authtok['user_id'], body['contents']) - return req.json_response('200 OK', - {'success': True}) - return req.json_response('400 Bad Request', - {'error': 'invalid body'}) - - if self.delegate_404s: - return self.delegate_404s(environ, start_response) - - start_response('404 Not Found', - [('Content-Type', 'text/plain')]) - return ['path not found: %s' % environ['PATH_INFO']] - -if __name__ == '__main__': - from wsgiref.simple_server import make_server - from static_file_serving import StaticFileApp - - def send_email(email, token): - print "Please send %s an email with the token %s." % (email, token) - - server = Server(json.loads(open('attendees.json').read()), - send_email, - delegate_404s=StaticFileApp('static-files')) - port = 8000 - httpd = make_server('', port, server.wsgi_app) - print 'serving on port %d' % port - httpd.serve_forever()
--- a/static_file_serving.py Thu Jun 24 17:34:25 2010 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,42 +0,0 @@ -#! /usr/bin/env python - -import mimetypes -import os -import wsgiref.util - -class StaticFileApp(object): - def __init__(self, root_dir): - self.root_dir = root_dir - - def __call__(self, environ, start_response): - path = environ['PATH_INFO'] - - def error_404(): - start_response('404 Not Found', - [('Content-Type', 'text/plain')]) - return ['Not Found: %s' % path] - - if path == '/': - path = '/index.html' - - if path.startswith('/') and environ['REQUEST_METHOD'] == 'GET': - filename = path.split('/')[1] - if filename in os.listdir(self.root_dir): - mimetype, enc = mimetypes.guess_type(filename) - f = open(os.path.join(self.root_dir, filename)) - start_response('200 OK', - [('Content-Type', mimetype)]) - return wsgiref.util.FileWrapper(f) - else: - return error_404() - - return error_404() - -if __name__ == '__main__': - from wsgiref.simple_server import make_server - - dirname = os.getcwd() - port = 8000 - httpd = make_server('', port, StaticFileApp(dirname)) - print "Serving files on port %d at %s." % (port, dirname) - httpd.serve_forever()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/summitidp/app.py Thu Jun 24 17:40:50 2010 -0700 @@ -0,0 +1,188 @@ +from cgi import parse_qsl +from base64 import urlsafe_b64encode +from os import urandom +import datetime +import json + +DEFAULT_CHALLENGE_LIFETIME = datetime.timedelta(days=1) + +def gentoken(): + # Generate a 256-bit key, but add a byte so we don't have + # an annoying '=' in the string. + return urlsafe_b64encode(urandom(256/8+1)) + +class Request(object): + def __init__(self, environ, start_response): + self.environ = environ + self.start_response = start_response + self.path = environ['PATH_INFO'] + self.method = environ['REQUEST_METHOD'] + self.qargs = dict(parse_qsl(self.environ.get('QUERY_STRING', ''))) + + try: + self.length = int(environ.get('CONTENT_LENGTH', '0')) + except ValueError: + self.length = 0 + + def json_response(self, status, obj): + self.start_response(status, + [('Content-Type', 'application/json')]) + return [json.dumps(obj)] + + def get_body(self): + f = self.environ['wsgi.input'] + try: + obj = json.loads(f.read(self.length)) + except ValueError: + return None + if not isinstance(obj, dict): + return None + return obj + +class TokenStore(object): + def __init__(self, lifetime=None, utcnow=datetime.datetime.utcnow, + gentoken=gentoken): + self.utcnow = utcnow + self.gentoken = gentoken + self.lifetime = lifetime + self.__tokens = {} + + def create(self, **contents): + token = self.gentoken() + while token in self.__tokens: + token = self.gentoken() + self.__tokens[token] = { + 'date': self.utcnow(), + 'contents': contents + } + return token + + def revoke(self, token): + if token in self.__tokens: + del self.__tokens[token] + + def get(self, token): + if token not in self.__tokens: + return None + if self.lifetime is not None: + age = self.utcnow() - self.__tokens[token]['date'] + if age > self.lifetime: + del self.__tokens[token] + return None + return self.__tokens[token]['contents'] + +class ProfileStore(object): + def __init__(self): + self.__profiles = {} + + def set(self, user_id, contents): + self.__profiles[user_id] = contents + + def get(self): + return self.__profiles + +class Server(object): + request_challenge_path = '/challenge/request' + respond_to_challenge_path = '/challenge/respond' + profile_path = '/profile' + + def __init__(self, emails, send_email, gentoken=gentoken, + challenge_lifetime=DEFAULT_CHALLENGE_LIFETIME, + utcnow=datetime.datetime.utcnow, + delegate_404s=None): + self.send_email = send_email + self.challenge_tokens = TokenStore(lifetime=challenge_lifetime, + utcnow=utcnow, + gentoken=gentoken) + self.auth_tokens = TokenStore(utcnow=utcnow, gentoken=gentoken) + self.oauth2_consumers = TokenStore(utcnow=utcnow, gentoken=gentoken) + self.profiles = ProfileStore() + self.emails = emails + self.gentoken = gentoken + self.delegate_404s = delegate_404s + + def new_auth_token(self, email): + return self.auth_tokens.create(email=email) + + def wsgi_app(self, environ, start_response): + req = Request(environ, start_response) + if req.path == self.request_challenge_path: + # TODO: check method == 'POST' + body = req.get_body() + if body and 'email' in body: + if body['email'] in self.emails: + token = self.challenge_tokens.create(email=body['email']) + self.send_email(body['email'], token) + return req.json_response('200 OK', + {'success': True}) + return req.json_response('400 Bad Request', + {'error': 'invalid email'}) + return req.json_response('400 Bad Request', + {'error': 'invalid body'}) + elif req.path == self.respond_to_challenge_path: + body = req.get_body() + if body and 'token' in body: + chaltok = self.challenge_tokens.get(body['token']) + if not chaltok: + return req.json_response( + '400 Bad Request', + {'error': 'invalid or expired token'} + ) + email = chaltok['email'] + self.challenge_tokens.revoke(body['token']) + token = self.auth_tokens.create( + email=email, + user_id=self.emails.index(email) + ) + return req.json_response('200 OK', + {'token': token, + 'email': email}) + return req.json_response('400 Bad Request', + {'error': 'invalid body'}) + elif req.path == self.profile_path: + if req.method == 'GET': + authtok = self.auth_tokens.get(req.qargs.get('token')) + if not authtok: + return req.json_response( + '400 Bad Request', + {'error': 'invalid or expired token'} + ) + profile = self.profiles.get() + return req.json_response('200 OK', + {'contents': profile}) + elif req.method == 'POST': + body = req.get_body() + if body and 'token' in body and 'contents' in body: + authtok = self.auth_tokens.get(body['token']) + if not authtok: + return req.json_response( + '400 Bad Request', + {'error': 'invalid or expired token'} + ) + self.profiles.set(authtok['user_id'], body['contents']) + return req.json_response('200 OK', + {'success': True}) + return req.json_response('400 Bad Request', + {'error': 'invalid body'}) + + if self.delegate_404s: + return self.delegate_404s(environ, start_response) + + start_response('404 Not Found', + [('Content-Type', 'text/plain')]) + return ['path not found: %s' % environ['PATH_INFO']] + +if __name__ == '__main__': + from wsgiref.simple_server import make_server + from static_file_serving import StaticFileApp + + def send_email(email, token): + print "Please send %s an email with the token %s." % (email, token) + + server = Server(json.loads(open('attendees.json').read()), + send_email, + delegate_404s=StaticFileApp('static-files')) + port = 8000 + httpd = make_server('', port, server.wsgi_app) + print 'serving on port %d' % port + httpd.serve_forever()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/summitidp/static_file_serving.py Thu Jun 24 17:40:50 2010 -0700 @@ -0,0 +1,42 @@ +#! /usr/bin/env python + +import mimetypes +import os +import wsgiref.util + +class StaticFileApp(object): + def __init__(self, root_dir): + self.root_dir = root_dir + + def __call__(self, environ, start_response): + path = environ['PATH_INFO'] + + def error_404(): + start_response('404 Not Found', + [('Content-Type', 'text/plain')]) + return ['Not Found: %s' % path] + + if path == '/': + path = '/index.html' + + if path.startswith('/') and environ['REQUEST_METHOD'] == 'GET': + filename = path.split('/')[1] + if filename in os.listdir(self.root_dir): + mimetype, enc = mimetypes.guess_type(filename) + f = open(os.path.join(self.root_dir, filename)) + start_response('200 OK', + [('Content-Type', mimetype)]) + return wsgiref.util.FileWrapper(f) + else: + return error_404() + + return error_404() + +if __name__ == '__main__': + from wsgiref.simple_server import make_server + + dirname = os.getcwd() + port = 8000 + httpd = make_server('', port, StaticFileApp(dirname)) + print "Serving files on port %d at %s." % (port, dirname) + httpd.serve_forever()
--- a/test_server.py Thu Jun 24 17:34:25 2010 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,191 +0,0 @@ -import simplejson as json -import datetime - -from webtest import TestApp - -from oauth2_server import Server, gentoken - -server = None -app = None - -def apptest(func): - def wrapper(): - g = globals() - - EmailMachine.emails = [] - EntropyMachine.next = [] - TimeMachine.now = datetime.datetime(2010, 6, 17, 0, 32, 33, 985904) - - g['server'] = Server(['bob@foo.com', 'jane@bar.com'], - send_email=EmailMachine.send_email, - utcnow=TimeMachine.utcnow, - gentoken=EntropyMachine.gentoken) - g['app'] = TestApp(server.wsgi_app) - - func() - - wrapper.__name__ = func.__name__ - return wrapper - -class EmailMachine(object): - emails = [] - - @classmethod - def send_email(klass, email, token): - klass.emails.append((email, token)) - -class EntropyMachine(object): - next = [] - - @classmethod - def gentoken(klass): - if klass.next: - return klass.next.pop() - return gentoken() - -class TimeMachine(object): - now = None - - @classmethod - def travel(klass, timedelta=None, *args, **kwargs): - if timedelta is None: - timedelta = datetime.timedelta(*args, **kwargs) - - klass.now += timedelta - - @classmethod - def utcnow(klass): - return klass.now - -def post_json(url, obj, **kwargs): - return app.post(url, json.dumps(obj), - {'Content-Type': 'application/json'}, - **kwargs) - -@apptest -def test_request_challenge(): - EntropyMachine.next.append('a token') - resp = post_json(server.request_challenge_path, - {'email': 'bob@foo.com'}) - assert resp.json == {'success': True} - assert EmailMachine.emails == [('bob@foo.com', 'a token')] - -@apptest -def test_request_challenge_with_invalid_email(): - post_json(server.request_challenge_path, - {'email': 'invalid@foo.com'}, - status=400) - -@apptest -def test_request_challenge_with_invalid_body1(): - post_json(server.request_challenge_path, - [], - status=400) - -@apptest -def test_request_challenge_with_invalid_body2(): - post_json(server.request_challenge_path, - {'blah': 'narg'}, - status=400) - -@apptest -def test_respond_to_challenge(): - token = server.challenge_tokens.create(email='bob@foo.com') - EntropyMachine.next.append('my auth token') - resp = post_json(server.respond_to_challenge_path, - {'token': token}) - assert resp.json == {'email': 'bob@foo.com', - 'token': 'my auth token'} - assert server.auth_tokens.get('my auth token') == { - 'email': 'bob@foo.com', - 'user_id': 0 - } - -@apptest -def test_respond_to_expired_challenge(): - token = server.challenge_tokens.create(email='bob@foo.com') - TimeMachine.travel(server.challenge_tokens.lifetime * 2) - post_json(server.respond_to_challenge_path, - {'token': token}, - status=400) - -@apptest -def test_respond_to_challenge_only_works_once(): - token = server.challenge_tokens.create(email='bob@foo.com') - EntropyMachine.next.append('my auth token') - resp = post_json(server.respond_to_challenge_path, - {'token': token}) - assert resp.json == {'email': 'bob@foo.com', - 'token': 'my auth token'} - post_json(server.respond_to_challenge_path, - {'token': token}, - status=400) - -@apptest -def test_respond_to_challenge_with_invalid_token(): - post_json(server.respond_to_challenge_path, - {'token': 'hello there'}, - status=400) - -@apptest -def test_respond_to_challenge_with_invalid_body(): - post_json(server.respond_to_challenge_path, - [], - status=400) - -@apptest -def test_basic_404(): - app.get('/blarg', status=404) - -@apptest -def test_set_profile_with_bad_token(): - post_json(server.profile_path, - {'token': 'blap', 'contents': {}}, - status=400) - -@apptest -def test_set_profile_with_no_contents(): - token = server.auth_tokens.create(email='bob@foo.com', - user_id=0) - post_json(server.profile_path, - {'token': token}, - status=400) - -@apptest -def test_get_profile_with_bad_token(): - app.get('%s?token=%s' % (server.profile_path, 'blap'), - status=400) - -@apptest -def test_get_profile_with_no_token(): - app.get('%s?foo=bar' % server.profile_path, - status=400) - -@apptest -def test_set_and_get_profile(): - token = server.auth_tokens.create(email='bob@foo.com', - user_id=0) - post_json(server.profile_path, - {'token': token, - 'contents': {'name': 'bob jones'}}) - resp = app.get('%s?token=%s' % (server.profile_path, token)) - assert resp.json == {"contents": {"0": {"name": "bob jones"}}} - - # Make sure it works twice. - - post_json(server.profile_path, - {'token': token, - 'contents': {'alias': 'blah'}}) - resp = app.get('%s?token=%s' % (server.profile_path, token)) - assert resp.json == {"contents": {"0": {"alias": "blah"}}} - - # Add another user's profile. - - token2 = server.auth_tokens.create(email='jane@bar.com', - user_id=1) - post_json(server.profile_path, - {'token': token2, - 'contents': {'name': 'jane person'}}) - resp = app.get('%s?token=%s' % (server.profile_path, token2)) - assert resp.json == {"contents": {"0": {"alias": "blah"}, - "1": {"name": "jane person"}}}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/test_app.py Thu Jun 24 17:40:50 2010 -0700 @@ -0,0 +1,191 @@ +import simplejson as json +import datetime + +from webtest import TestApp + +from summitidp.app import Server, gentoken + +server = None +app = None + +def apptest(func): + def wrapper(): + g = globals() + + EmailMachine.emails = [] + EntropyMachine.next = [] + TimeMachine.now = datetime.datetime(2010, 6, 17, 0, 32, 33, 985904) + + g['server'] = Server(['bob@foo.com', 'jane@bar.com'], + send_email=EmailMachine.send_email, + utcnow=TimeMachine.utcnow, + gentoken=EntropyMachine.gentoken) + g['app'] = TestApp(server.wsgi_app) + + func() + + wrapper.__name__ = func.__name__ + return wrapper + +class EmailMachine(object): + emails = [] + + @classmethod + def send_email(klass, email, token): + klass.emails.append((email, token)) + +class EntropyMachine(object): + next = [] + + @classmethod + def gentoken(klass): + if klass.next: + return klass.next.pop() + return gentoken() + +class TimeMachine(object): + now = None + + @classmethod + def travel(klass, timedelta=None, *args, **kwargs): + if timedelta is None: + timedelta = datetime.timedelta(*args, **kwargs) + + klass.now += timedelta + + @classmethod + def utcnow(klass): + return klass.now + +def post_json(url, obj, **kwargs): + return app.post(url, json.dumps(obj), + {'Content-Type': 'application/json'}, + **kwargs) + +@apptest +def test_request_challenge(): + EntropyMachine.next.append('a token') + resp = post_json(server.request_challenge_path, + {'email': 'bob@foo.com'}) + assert resp.json == {'success': True} + assert EmailMachine.emails == [('bob@foo.com', 'a token')] + +@apptest +def test_request_challenge_with_invalid_email(): + post_json(server.request_challenge_path, + {'email': 'invalid@foo.com'}, + status=400) + +@apptest +def test_request_challenge_with_invalid_body1(): + post_json(server.request_challenge_path, + [], + status=400) + +@apptest +def test_request_challenge_with_invalid_body2(): + post_json(server.request_challenge_path, + {'blah': 'narg'}, + status=400) + +@apptest +def test_respond_to_challenge(): + token = server.challenge_tokens.create(email='bob@foo.com') + EntropyMachine.next.append('my auth token') + resp = post_json(server.respond_to_challenge_path, + {'token': token}) + assert resp.json == {'email': 'bob@foo.com', + 'token': 'my auth token'} + assert server.auth_tokens.get('my auth token') == { + 'email': 'bob@foo.com', + 'user_id': 0 + } + +@apptest +def test_respond_to_expired_challenge(): + token = server.challenge_tokens.create(email='bob@foo.com') + TimeMachine.travel(server.challenge_tokens.lifetime * 2) + post_json(server.respond_to_challenge_path, + {'token': token}, + status=400) + +@apptest +def test_respond_to_challenge_only_works_once(): + token = server.challenge_tokens.create(email='bob@foo.com') + EntropyMachine.next.append('my auth token') + resp = post_json(server.respond_to_challenge_path, + {'token': token}) + assert resp.json == {'email': 'bob@foo.com', + 'token': 'my auth token'} + post_json(server.respond_to_challenge_path, + {'token': token}, + status=400) + +@apptest +def test_respond_to_challenge_with_invalid_token(): + post_json(server.respond_to_challenge_path, + {'token': 'hello there'}, + status=400) + +@apptest +def test_respond_to_challenge_with_invalid_body(): + post_json(server.respond_to_challenge_path, + [], + status=400) + +@apptest +def test_basic_404(): + app.get('/blarg', status=404) + +@apptest +def test_set_profile_with_bad_token(): + post_json(server.profile_path, + {'token': 'blap', 'contents': {}}, + status=400) + +@apptest +def test_set_profile_with_no_contents(): + token = server.auth_tokens.create(email='bob@foo.com', + user_id=0) + post_json(server.profile_path, + {'token': token}, + status=400) + +@apptest +def test_get_profile_with_bad_token(): + app.get('%s?token=%s' % (server.profile_path, 'blap'), + status=400) + +@apptest +def test_get_profile_with_no_token(): + app.get('%s?foo=bar' % server.profile_path, + status=400) + +@apptest +def test_set_and_get_profile(): + token = server.auth_tokens.create(email='bob@foo.com', + user_id=0) + post_json(server.profile_path, + {'token': token, + 'contents': {'name': 'bob jones'}}) + resp = app.get('%s?token=%s' % (server.profile_path, token)) + assert resp.json == {"contents": {"0": {"name": "bob jones"}}} + + # Make sure it works twice. + + post_json(server.profile_path, + {'token': token, + 'contents': {'alias': 'blah'}}) + resp = app.get('%s?token=%s' % (server.profile_path, token)) + assert resp.json == {"contents": {"0": {"alias": "blah"}}} + + # Add another user's profile. + + token2 = server.auth_tokens.create(email='jane@bar.com', + user_id=1) + post_json(server.profile_path, + {'token': token2, + 'contents': {'name': 'jane person'}}) + resp = app.get('%s?token=%s' % (server.profile_path, token2)) + assert resp.json == {"contents": {"0": {"alias": "blah"}, + "1": {"name": "jane person"}}}