view oauth2_server.py @ 3:f544c4b14fb3

minor formatting change
author Atul Varma <avarma@mozilla.com>
date Wed, 23 Jun 2010 20:48:19 -0700
parents 935b22e7c601
children 36e8f795b2c2
line wrap: on
line source

from cgi import parse_qsl
from base64 import urlsafe_b64encode
from os import urandom
import datetime
import json

# Default validity window for e-mail challenge tokens; used as the default
# value of the ``challenge_lifetime`` argument of ``Server``.
DEFAULT_CHALLENGE_LIFETIME = datetime.timedelta(days=1)

def gentoken():
    """Return a fresh random token encoded as URL-safe base64.

    The token is built from 33 bytes read from ``os.urandom`` and is
    44 characters long once encoded.
    """
    # Generate a 256-bit key, but add a byte so we don't have an annoying
    # '=' in the string: 33 bytes is a multiple of three, so the base64
    # encoding needs no padding.  Floor division keeps the byte count an
    # integer even when true division is in effect.
    return urlsafe_b64encode(urandom(256 // 8 + 1))

class Request(object):
    """Convenience wrapper around a single WSGI request.

    Attributes:
        environ: the WSGI environment dict passed by the server.
        start_response: the WSGI ``start_response`` callable.
        path: value of ``PATH_INFO`` ('' when the server omits it).
        method: value of ``REQUEST_METHOD``.
        qargs: dict of query-string parameters; for a repeated name the
            last value wins.
        length: declared body length in bytes; 0 when the header is
            missing, not an integer, or negative.
    """

    def __init__(self, environ, start_response):
        """Capture the request line details from a WSGI *environ*."""
        self.environ = environ
        self.start_response = start_response
        # PEP 3333 allows a server to omit PATH_INFO when it is empty, so
        # a missing key must not blow up request construction.
        self.path = environ.get('PATH_INFO', '')
        self.method = environ['REQUEST_METHOD']
        self.qargs = dict(parse_qsl(self.environ.get('QUERY_STRING', '')))

        try:
            length = int(environ.get('CONTENT_LENGTH', '0'))
        except ValueError:
            length = 0
        # A negative length would make read() consume the input stream
        # until EOF, which can block forever on a socket; treat it as
        # "no body" instead.
        self.length = max(length, 0)

    def json_response(self, status, obj):
        """Start a JSON response and return its body iterable.

        Args:
            status: the HTTP status line, e.g. ``'200 OK'``.
            obj: a JSON-serializable object to send as the body.

        Returns:
            A one-element list holding the serialized body.
        """
        self.start_response(status,
                            [('Content-Type', 'application/json')])
        return [json.dumps(obj)]

    def get_body(self):
        """Read ``self.length`` bytes of input and decode them as JSON.

        Returns:
            The decoded object, or None when the body is not valid JSON
            (which includes an empty body).
        """
        stream = self.environ['wsgi.input']
        try:
            return json.loads(stream.read(self.length))
        except ValueError:
            return None

class TokenStore(object):
    """In-memory mapping from opaque tokens to keyword contents.

    Each entry remembers when it was created.  When a ``lifetime`` is
    configured, entries older than that are dropped the next time they
    are looked up; with ``lifetime=None`` entries never expire.
    """

    def __init__(self, lifetime=None, utcnow=datetime.datetime.utcnow,
                 gentoken=gentoken):
        """Create an empty store.

        Args:
            lifetime: maximum entry age as a ``timedelta``, or None for
                no expiry.
            utcnow: zero-argument callable returning the current time.
            gentoken: zero-argument callable returning a new token.
        """
        self.__tokens = {}
        self.lifetime = lifetime
        self.gentoken = gentoken
        self.utcnow = utcnow

    def create(self, **contents):
        """Store *contents* under a newly generated token and return it."""
        # Keep drawing until the generator yields a token not yet in use.
        while True:
            candidate = self.gentoken()
            if candidate not in self.__tokens:
                break
        record = {'date': self.utcnow(), 'contents': contents}
        self.__tokens[candidate] = record
        return candidate

    def revoke(self, token):
        """Forget *token*; unknown tokens are ignored."""
        self.__tokens.pop(token, None)

    def get(self, token):
        """Return the contents stored for *token*.

        Returns None when the token is unknown or has outlived the
        configured lifetime; an expired entry is removed from the store.
        """
        record = self.__tokens.get(token)
        if record is None:
            return None
        expired = (self.lifetime is not None and
                   self.utcnow() - record['date'] > self.lifetime)
        if expired:
            del self.__tokens[token]
            return None
        return record['contents']

class Server(object):
    """WSGI application implementing an e-mail challenge/response login.

    A client first posts ``{"email": ...}`` to ``request_challenge_path``;
    if the address is in ``emails`` a challenge token is created and
    handed to ``send_email``.  The client then posts ``{"token": ...}`` to
    ``respond_to_challenge_path`` and, when the challenge token is still
    valid, receives an auth token in exchange.  The challenge token is
    revoked as soon as it has been redeemed.

    ``oauth2_authorize_path`` is declared here but is not routed by
    ``wsgi_app``.
    """

    oauth2_authorize_path = '/authorize'
    request_challenge_path = '/challenge/request'
    respond_to_challenge_path = '/challenge/respond'

    def __init__(self, emails, send_email, gentoken=gentoken,
                 challenge_lifetime=DEFAULT_CHALLENGE_LIFETIME,
                 utcnow=datetime.datetime.utcnow):
        """Set up the token stores.

        Args:
            emails: container of addresses allowed to request a challenge
                (tested with ``in``).
            send_email: callable invoked as ``send_email(email, token)``
                for every challenge that is issued.
            gentoken: zero-argument callable producing new tokens.
            challenge_lifetime: how long a challenge token stays valid.
            utcnow: zero-argument callable returning the current time.
        """
        self.send_email = send_email
        self.challenge_tokens = TokenStore(lifetime=challenge_lifetime,
                                           utcnow=utcnow,
                                           gentoken=gentoken)
        # Auth tokens are created without a lifetime, so they never expire.
        self.auth_tokens = TokenStore(utcnow=utcnow, gentoken=gentoken)
        self.emails = emails

    def new_auth_token(self, email):
        """Create and return a new auth token bound to *email*."""
        return self.auth_tokens.create(email=email)

    @staticmethod
    def _is_hashable(value):
        """Return True if *value* can be used as a dict key or set member."""
        try:
            hash(value)
        except TypeError:
            return False
        return True

    def _request_challenge(self, req):
        """Handle a challenge request: issue a token for a known e-mail."""
        # TODO: check method == 'POST'
        body = req.get_body()
        if not isinstance(body, dict) or 'email' not in body:
            return req.json_response('400 Bad Request',
                                     {'error': 'invalid body'})
        email = body['email']
        # The body is untrusted JSON, so the value may be a list or dict;
        # testing such a value against a dict/set of emails would raise
        # TypeError instead of producing a clean 400.
        if not self._is_hashable(email) or email not in self.emails:
            return req.json_response('400 Bad Request',
                                     {'error': 'invalid email'})
        token = self.challenge_tokens.create(email=email)
        self.send_email(email, token)
        return req.json_response('200 OK', {'success': True})

    def _respond_to_challenge(self, req):
        """Handle a challenge response: trade a challenge token for an
        auth token."""
        body = req.get_body()
        if not isinstance(body, dict) or 'token' not in body:
            return req.json_response('400 Bad Request',
                                     {'error': 'invalid body'})
        challenge = body['token']
        # An unhashable JSON value (list/dict) can never be a stored
        # token; skip the lookup, which would otherwise raise TypeError.
        chaltok = None
        if self._is_hashable(challenge):
            chaltok = self.challenge_tokens.get(challenge)
        if not chaltok:
            return req.json_response(
                '400 Bad Request',
                {'error': 'invalid or expired token'}
                )
        email = chaltok['email']
        # Challenge tokens are single-use.
        self.challenge_tokens.revoke(challenge)
        token = self.new_auth_token(email)
        return req.json_response('200 OK',
                                 {'token': token,
                                  'email': email})

    def wsgi_app(self, environ, start_response):
        """WSGI entry point: dispatch on the request path.

        Returns the response body iterable; unknown paths get a
        plain-text 404.
        """
        req = Request(environ, start_response)
        if req.path == self.request_challenge_path:
            return self._request_challenge(req)
        if req.path == self.respond_to_challenge_path:
            return self._respond_to_challenge(req)
        start_response('404 Not Found',
                       [('Content-Type', 'text/plain')])
        return ['path not found: %s' % req.path]

if __name__ == '__main__':
    # Development entry point: serve static files for GET requests and
    # route every other method to the challenge/response server, on
    # port 8000 using wsgiref's simple server.
    from wsgiref.simple_server import make_server

    from static_file_serving import StaticFileApp

    def send_email(email, token):
        # Stand-in mailer: just tell the operator what to send.
        print("Please send %s an email with the token %s." % (email, token))

    def app(environ, start_response):
        # GET requests go to the static file app, everything else to the
        # token server.
        if environ['REQUEST_METHOD'] == 'GET':
            return staticfiles(environ, start_response)
        else:
            return server.wsgi_app(environ, start_response)

    staticfiles = StaticFileApp('static-files')
    # Load the allowed e-mail addresses, closing the file once parsed.
    with open('attendees.json') as attendees_file:
        attendees = json.load(attendees_file)
    server = Server(attendees, send_email)
    port = 8000
    httpd = make_server('', port, app)
    print('serving on port %d' % port)
    httpd.serve_forever()