Mercurial > summit-idp
changeset 1:ac7704e92b02
added challenge expiry
author | Atul Varma <avarma@mozilla.com> |
---|---|
date | Wed, 23 Jun 2010 17:19:11 -0700 |
parents | 47b666b85cac |
children | 935b22e7c601 |
files | oauth2_server.py test_server.py |
diffstat | 2 files changed, 18 insertions(+), 1 deletions(-) [+] |
line wrap: on
line diff
--- a/oauth2_server.py Wed Jun 23 17:05:18 2010 -0700 +++ b/oauth2_server.py Wed Jun 23 17:19:11 2010 -0700 @@ -4,6 +4,8 @@ import datetime import json +DEFAULT_CHALLENGE_LIFETIME = datetime.timedelta(days=1) + def gentoken(): # Generate a 256-bit key, but add a byte so we don't have # an annoying '=' in the string. @@ -41,6 +43,7 @@ respond_to_challenge_path = '/challenge/respond' def __init__(self, emails, send_email, gentoken=gentoken, + challenge_lifetime=DEFAULT_CHALLENGE_LIFETIME, utcnow=datetime.datetime.utcnow): self.send_email = send_email self.challenge_tokens = {} @@ -48,6 +51,7 @@ self.emails = emails self.utcnow = utcnow self.gentoken = gentoken + self.challenge_lifetime = challenge_lifetime def new_challenge_token(self, email): token = self.gentoken() @@ -88,7 +92,12 @@ body = req.get_body() if isinstance(body, dict) and 'token' in body: if body['token'] in self.challenge_tokens: - email = self.challenge_tokens[body['token']]['email'] + chaltok = self.challenge_tokens[body['token']] + time_since_challenge = self.utcnow() - chaltok['date'] + if time_since_challenge > self.challenge_lifetime: + return req.json_response('400 Bad Request', + {'error': 'expired token'}) + email = chaltok['email'] del self.challenge_tokens[body['token']] token = self.new_auth_token(email) return req.json_response('200 OK',
--- a/test_server.py Wed Jun 23 17:05:18 2010 -0700 +++ b/test_server.py Wed Jun 23 17:19:11 2010 -0700 @@ -98,6 +98,14 @@ 'token': 'my auth token'} @apptest +def test_respond_to_expired_challenge(): + token = server.new_challenge_token('bob@foo.com') + TimeMachine.travel(server.challenge_lifetime * 2) + post_json(server.respond_to_challenge_path, + {'token': token}, + status=400) + +@apptest def test_respond_to_challenge_only_works_once(): token = server.new_challenge_token('bob@foo.com') EntropyMachine.next.append('my auth token')