Mercurial > summit-idp
changeset 2:935b22e7c601
made separate TokenStore class
author | Atul Varma <avarma@mozilla.com> |
---|---|
date | Wed, 23 Jun 2010 20:47:57 -0700 |
parents | ac7704e92b02 |
children | f544c4b14fb3 |
files | oauth2_server.py test_server.py |
diffstat | 2 files changed, 52 insertions(+), 42 deletions(-) [+] |
line wrap: on
line diff
--- a/oauth2_server.py Wed Jun 23 17:19:11 2010 -0700 +++ b/oauth2_server.py Wed Jun 23 20:47:57 2010 -0700 @@ -37,6 +37,38 @@ return None return obj +class TokenStore(object): + def __init__(self, lifetime=None, utcnow=datetime.datetime.utcnow, + gentoken=gentoken): + self.utcnow = utcnow + self.gentoken = gentoken + self.lifetime = lifetime + self.__tokens = {} + + def create(self, **contents): + token = self.gentoken() + while token in self.__tokens: + token = self.gentoken() + self.__tokens[token] = { + 'date': self.utcnow(), + 'contents': contents + } + return token + + def revoke(self, token): + if token in self.__tokens: + del self.__tokens[token] + + def get(self, token): + if token not in self.__tokens: + return None + if self.lifetime is not None: + age = self.utcnow() - self.__tokens[token]['date'] + if age > self.lifetime: + del self.__tokens[token] + return None + return self.__tokens[token]['contents'] + class Server(object): oauth2_authorize_path = '/authorize' request_challenge_path = '/challenge/request' @@ -46,32 +78,14 @@ challenge_lifetime=DEFAULT_CHALLENGE_LIFETIME, utcnow=datetime.datetime.utcnow): self.send_email = send_email - self.challenge_tokens = {} - self.auth_tokens = {} + self.challenge_tokens = TokenStore(lifetime=challenge_lifetime, + utcnow=utcnow, + gentoken=gentoken) + self.auth_tokens = TokenStore(utcnow=utcnow, gentoken=gentoken) self.emails = emails - self.utcnow = utcnow - self.gentoken = gentoken - self.challenge_lifetime = challenge_lifetime - - def new_challenge_token(self, email): - token = self.gentoken() - while token in self.challenge_tokens: - token = self.gentoken() - self.challenge_tokens[token] = { - 'email': email, - 'date': self.utcnow() - } - return token def new_auth_token(self, email): - token = self.gentoken() - while token in self.auth_tokens: - token = self.gentoken() - self.auth_tokens[token] = { - 'email': email, - 'date': self.utcnow() - } - return token + return self.auth_tokens.create(email=email) def wsgi_app(self, environ, start_response): req = Request(environ, start_response) @@ -80,7 +94,7 @@ body = req.get_body() if isinstance(body, dict) and 'email' in body: if body['email'] in self.emails: - token = self.new_challenge_token(body['email']) + token = self.challenge_tokens.create(email=body['email']) self.send_email(body['email'], token) return req.json_response('200 OK', {'success': True}) @@ -91,20 +105,16 @@ elif req.path == self.respond_to_challenge_path: body = req.get_body() if isinstance(body, dict) and 'token' in body: - if body['token'] in self.challenge_tokens: - chaltok = self.challenge_tokens[body['token']] - time_since_challenge = self.utcnow() - chaltok['date'] - if time_since_challenge > self.challenge_lifetime: - return req.json_response('400 Bad Request', - {'error': 'expired token'}) - email = chaltok['email'] - del self.challenge_tokens[body['token']] - token = self.new_auth_token(email) - return req.json_response('200 OK', - {'token': token, - 'email': email}) - return req.json_response('400 Bad Request', - {'error': 'invalid token'}) + chaltok = self.challenge_tokens.get(body['token']) + if not chaltok: + return req.json_response('400 Bad Request', + {'error': 'invalid or expired token'}) + email = chaltok['email'] + self.challenge_tokens.revoke(body['token']) + token = self.auth_tokens.create(email=email) + return req.json_response('200 OK', + {'token': token, + 'email': email}) return req.json_response('400 Bad Request', {'error': 'invalid body'}) start_response('404 Not Found',
--- a/test_server.py Wed Jun 23 17:19:11 2010 -0700 +++ b/test_server.py Wed Jun 23 20:47:57 2010 -0700 @@ -90,7 +90,7 @@ @apptest def test_respond_to_challenge(): - token = server.new_challenge_token('bob@foo.com') + token = server.challenge_tokens.create(email='bob@foo.com') EntropyMachine.next.append('my auth token') resp = post_json(server.respond_to_challenge_path, {'token': token}) @@ -99,15 +99,15 @@ @apptest def test_respond_to_expired_challenge(): - token = server.new_challenge_token('bob@foo.com') - TimeMachine.travel(server.challenge_lifetime * 2) + token = server.challenge_tokens.create(email='bob@foo.com') + TimeMachine.travel(server.challenge_tokens.lifetime * 2) post_json(server.respond_to_challenge_path, {'token': token}, status=400) @apptest def test_respond_to_challenge_only_works_once(): - token = server.new_challenge_token('bob@foo.com') + token = server.challenge_tokens.create(email='bob@foo.com') EntropyMachine.next.append('my auth token') resp = post_json(server.respond_to_challenge_path, {'token': token})