changeset 0:47b666b85cac

origination
author Atul Varma <avarma@mozilla.com>
date Wed, 23 Jun 2010 17:05:18 -0700
parents
children ac7704e92b02
files oauth2_server.py test_server.py
diffstat 2 files changed, 251 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/oauth2_server.py	Wed Jun 23 17:05:18 2010 -0700
@@ -0,0 +1,125 @@
+from cgi import parse_qsl
+from base64 import urlsafe_b64encode
+from os import urandom
+import datetime
+import json
+
+def gentoken():
+    # Generate a 256-bit key, but add a byte so we don't have
+    # an annoying '=' in the string.
+    return urlsafe_b64encode(urandom(256/8+1))
+
+class Request(object):
+    def __init__(self, environ, start_response):
+        self.environ = environ
+        self.start_response = start_response
+        self.path = environ['PATH_INFO']
+        self.method = environ['REQUEST_METHOD']
+        self.qargs = dict(parse_qsl(self.environ.get('QUERY_STRING', '')))
+
+        try:
+            self.length = int(environ.get('CONTENT_LENGTH', '0'))
+        except ValueError:
+            self.length = 0
+
+    def json_response(self, status, obj):
+        self.start_response(status,
+                            [('Content-Type', 'application/json')])
+        return [json.dumps(obj)]
+
+    def get_body(self):
+        f = self.environ['wsgi.input']
+        try:
+            obj = json.loads(f.read(self.length))
+        except ValueError:
+            return None
+        return obj
+
+class Server(object):
+    oauth2_authorize_path = '/authorize'
+    request_challenge_path = '/challenge/request'
+    respond_to_challenge_path = '/challenge/respond'
+
+    def __init__(self, emails, send_email, gentoken=gentoken,
+                 utcnow=datetime.datetime.utcnow):
+        self.send_email = send_email
+        self.challenge_tokens = {}
+        self.auth_tokens = {}
+        self.emails = emails
+        self.utcnow = utcnow
+        self.gentoken = gentoken
+
+    def new_challenge_token(self, email):
+        token = self.gentoken()
+        while token in self.challenge_tokens:
+            token = self.gentoken()
+        self.challenge_tokens[token] = {
+            'email': email,
+            'date': self.utcnow()
+            }
+        return token
+
+    def new_auth_token(self, email):
+        token = self.gentoken()
+        while token in self.auth_tokens:
+            token = self.gentoken()
+        self.auth_tokens[token] = {
+            'email': email,
+            'date': self.utcnow()
+            }
+        return token
+
+    def wsgi_app(self, environ, start_response):
+        req = Request(environ, start_response)
+        if req.path == self.request_challenge_path:
+            # TODO: check method == 'POST'
+            body = req.get_body()
+            if isinstance(body, dict) and 'email' in body:
+                if body['email'] in self.emails:
+                    token = self.new_challenge_token(body['email'])
+                    self.send_email(body['email'], token)
+                    return req.json_response('200 OK',
+                                             {'success': True})
+                return req.json_response('400 Bad Request',
+                                         {'error': 'invalid email'})
+            return req.json_response('400 Bad Request',
+                                     {'error': 'invalid body'})
+        elif req.path == self.respond_to_challenge_path:
+            body = req.get_body()
+            if isinstance(body, dict) and 'token' in body:
+                if body['token'] in self.challenge_tokens:
+                    email = self.challenge_tokens[body['token']]['email']
+                    del self.challenge_tokens[body['token']]
+                    token = self.new_auth_token(email)
+                    return req.json_response('200 OK',
+                                             {'token': token,
+                                              'email': email})
+                return req.json_response('400 Bad Request',
+                                         {'error': 'invalid token'})
+            return req.json_response('400 Bad Request',
+                                     {'error': 'invalid body'})
+        start_response('404 Not Found',
+                       [('Content-Type', 'text/plain')])
+        return ['path not found: %s' % environ['PATH_INFO']]
+
+if __name__ == '__main__':
+    from wsgiref.simple_server import make_server
+
+    from static_file_serving import StaticFileApp
+
+    def send_email(email, token):
+        print "Please send %s an email with the token %s." % (email, token)
+
+    def app(environ, start_response):
+        if environ['REQUEST_METHOD'] == 'GET':
+            return staticfiles(environ, start_response)
+        else:
+            return server.wsgi_app(environ, start_response)
+
+    staticfiles = StaticFileApp('static-files')
+    server = Server(json.loads(open('attendees.json').read()),
+                    send_email)
+    port = 8000
+    httpd = make_server('', port, app)
+    print 'serving on port %d' % port
+    httpd.serve_forever()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test_server.py	Wed Jun 23 17:05:18 2010 -0700
@@ -0,0 +1,126 @@
+import simplejson as json
+import datetime
+
+from webtest import TestApp
+
+from oauth2_server import Server, gentoken
+
+server = None
+app = None
+
+def apptest(func):
+    def wrapper():
+        g = globals()
+
+        EmailMachine.emails = []
+        EntropyMachine.next = []
+        TimeMachine.now = datetime.datetime(2010, 6, 17, 0, 32, 33, 985904)
+
+        g['server'] = Server(['bob@foo.com', 'jane@bar.com'],
+                             send_email=EmailMachine.send_email,
+                             utcnow=TimeMachine.utcnow,
+                             gentoken=EntropyMachine.gentoken)
+        g['app'] = TestApp(server.wsgi_app)
+
+        func()
+
+    wrapper.__name__ = func.__name__
+    return wrapper
+
+class EmailMachine(object):
+    emails = []
+
+    @classmethod
+    def send_email(klass, email, token):
+        klass.emails.append((email, token))
+
+class EntropyMachine(object):
+    next = []
+
+    @classmethod
+    def gentoken(klass):
+        if klass.next:
+            return klass.next.pop()
+        return gentoken()
+
+class TimeMachine(object):
+    now = None
+
+    @classmethod
+    def travel(klass, timedelta=None, *args, **kwargs):
+        if timedelta is None:
+            timedelta = datetime.timedelta(*args, **kwargs)
+
+        klass.now += timedelta
+
+    @classmethod
+    def utcnow(klass):
+        return klass.now
+
+def post_json(url, obj, **kwargs):
+    return app.post(url, json.dumps(obj),
+                    {'Content-Type': 'application/json'},
+                    **kwargs)
+
+@apptest
+def test_request_challenge():
+    EntropyMachine.next.append('a token')
+    resp = post_json(server.request_challenge_path,
+                     {'email': 'bob@foo.com'})
+    assert resp.json == {'success': True}
+    assert EmailMachine.emails == [('bob@foo.com', 'a token')]
+
+@apptest
+def test_request_challenge_with_invalid_email():
+    post_json(server.request_challenge_path,
+              {'email': 'invalid@foo.com'},
+              status=400)
+
+@apptest
+def test_request_challenge_with_invalid_body1():
+    post_json(server.request_challenge_path,
+              [],
+              status=400)
+
+@apptest
+def test_request_challenge_with_invalid_body2():
+    post_json(server.request_challenge_path,
+              {'blah': 'narg'},
+              status=400)
+
+@apptest
+def test_respond_to_challenge():
+    token = server.new_challenge_token('bob@foo.com')
+    EntropyMachine.next.append('my auth token')
+    resp = post_json(server.respond_to_challenge_path,
+                     {'token': token})
+    assert resp.json == {'email': 'bob@foo.com',
+                         'token': 'my auth token'}
+
+@apptest
+def test_respond_to_challenge_only_works_once():
+    token = server.new_challenge_token('bob@foo.com')
+    EntropyMachine.next.append('my auth token')
+    resp = post_json(server.respond_to_challenge_path,
+                     {'token': token})
+    assert resp.json == {'email': 'bob@foo.com',
+                         'token': 'my auth token'}
+    post_json(server.respond_to_challenge_path,
+              {'token': token},
+              status=400)
+
+@apptest
+def test_respond_to_challenge_with_invalid_token():
+    post_json(server.respond_to_challenge_path,
+              {'token': 'hello there'},
+              status=400)
+
+@apptest
+def test_respond_to_challenge_with_invalid_body():
+    post_json(server.respond_to_challenge_path,
+              [],
+              status=400)
+
+@apptest
+def test_basic_404():
+    app.get('/blarg', status=404)