changeset 2:935b22e7c601

made separate TokenStore class
author Atul Varma <avarma@mozilla.com>
date Wed, 23 Jun 2010 20:47:57 -0700
parents ac7704e92b02
children f544c4b14fb3
files oauth2_server.py test_server.py
diffstat 2 files changed, 52 insertions(+), 42 deletions(-) [+]
line wrap: on
line diff
--- a/oauth2_server.py	Wed Jun 23 17:19:11 2010 -0700
+++ b/oauth2_server.py	Wed Jun 23 20:47:57 2010 -0700
@@ -37,6 +37,38 @@
             return None
         return obj
 
+class TokenStore(object):
+    def __init__(self, lifetime=None, utcnow=datetime.datetime.utcnow,
+                 gentoken=gentoken):
+        self.utcnow = utcnow
+        self.gentoken = gentoken
+        self.lifetime = lifetime
+        self.__tokens = {}
+
+    def create(self, **contents):
+        token = self.gentoken()
+        while token in self.__tokens:
+            token = self.gentoken()
+        self.__tokens[token] = {
+            'date': self.utcnow(),
+            'contents': contents
+            }
+        return token
+
+    def revoke(self, token):
+        if token in self.__tokens:
+            del self.__tokens[token]
+
+    def get(self, token):
+        if token not in self.__tokens:
+            return None
+        if self.lifetime is not None:
+            age = self.utcnow() - self.__tokens[token]['date']
+            if age > self.lifetime:
+                del self.__tokens[token]
+                return None
+        return self.__tokens[token]['contents']
+
 class Server(object):
     oauth2_authorize_path = '/authorize'
     request_challenge_path = '/challenge/request'
@@ -46,32 +78,14 @@
                  challenge_lifetime=DEFAULT_CHALLENGE_LIFETIME,
                  utcnow=datetime.datetime.utcnow):
         self.send_email = send_email
-        self.challenge_tokens = {}
-        self.auth_tokens = {}
+        self.challenge_tokens = TokenStore(lifetime=challenge_lifetime,
+                                           utcnow=utcnow,
+                                           gentoken=gentoken)
+        self.auth_tokens = TokenStore(utcnow=utcnow, gentoken=gentoken)
         self.emails = emails
-        self.utcnow = utcnow
-        self.gentoken = gentoken
-        self.challenge_lifetime = challenge_lifetime
-
-    def new_challenge_token(self, email):
-        token = self.gentoken()
-        while token in self.challenge_tokens:
-            token = self.gentoken()
-        self.challenge_tokens[token] = {
-            'email': email,
-            'date': self.utcnow()
-            }
-        return token
 
     def new_auth_token(self, email):
-        token = self.gentoken()
-        while token in self.auth_tokens:
-            token = self.gentoken()
-        self.auth_tokens[token] = {
-            'email': email,
-            'date': self.utcnow()
-            }
-        return token
+        return self.auth_tokens.create(email=email)
 
     def wsgi_app(self, environ, start_response):
         req = Request(environ, start_response)
@@ -80,7 +94,7 @@
             body = req.get_body()
             if isinstance(body, dict) and 'email' in body:
                 if body['email'] in self.emails:
-                    token = self.new_challenge_token(body['email'])
+                    token = self.challenge_tokens.create(email=body['email'])
                     self.send_email(body['email'], token)
                     return req.json_response('200 OK',
                                              {'success': True})
@@ -91,20 +105,16 @@
         elif req.path == self.respond_to_challenge_path:
             body = req.get_body()
             if isinstance(body, dict) and 'token' in body:
-                if body['token'] in self.challenge_tokens:
-                    chaltok = self.challenge_tokens[body['token']]
-                    time_since_challenge = self.utcnow() - chaltok['date']
-                    if time_since_challenge > self.challenge_lifetime:
-                        return req.json_response('400 Bad Request',
-                                                 {'error': 'expired token'})
-                    email = chaltok['email']
-                    del self.challenge_tokens[body['token']]
-                    token = self.new_auth_token(email)
-                    return req.json_response('200 OK',
-                                             {'token': token,
-                                              'email': email})
-                return req.json_response('400 Bad Request',
-                                         {'error': 'invalid token'})
+                chaltok = self.challenge_tokens.get(body['token'])
+                if not chaltok:
+                    return req.json_response('400 Bad Request',
+                                             {'error': 'invalid or expired token'})
+                email = chaltok['email']
+                self.challenge_tokens.revoke(body['token'])
+                token = self.auth_tokens.create(email=email)
+                return req.json_response('200 OK',
+                                         {'token': token,
+                                          'email': email})
             return req.json_response('400 Bad Request',
                                      {'error': 'invalid body'})
         start_response('404 Not Found',
--- a/test_server.py	Wed Jun 23 17:19:11 2010 -0700
+++ b/test_server.py	Wed Jun 23 20:47:57 2010 -0700
@@ -90,7 +90,7 @@
 
 @apptest
 def test_respond_to_challenge():
-    token = server.new_challenge_token('bob@foo.com')
+    token = server.challenge_tokens.create(email='bob@foo.com')
     EntropyMachine.next.append('my auth token')
     resp = post_json(server.respond_to_challenge_path,
                      {'token': token})
@@ -99,15 +99,15 @@
 
 @apptest
 def test_respond_to_expired_challenge():
-    token = server.new_challenge_token('bob@foo.com')
-    TimeMachine.travel(server.challenge_lifetime * 2)
+    token = server.challenge_tokens.create(email='bob@foo.com')
+    TimeMachine.travel(server.challenge_tokens.lifetime * 2)
     post_json(server.respond_to_challenge_path,
               {'token': token},
               status=400)
 
 @apptest
 def test_respond_to_challenge_only_works_once():
-    token = server.new_challenge_token('bob@foo.com')
+    token = server.challenge_tokens.create(email='bob@foo.com')
     EntropyMachine.next.append('my auth token')
     resp = post_json(server.respond_to_challenge_path,
                      {'token': token})