Mercurial > summit-idp
view oauth2_server.py @ 3:f544c4b14fb3
minor formatting change
author | Atul Varma <avarma@mozilla.com> |
---|---|
date | Wed, 23 Jun 2010 20:48:19 -0700 |
parents | 935b22e7c601 |
children | 36e8f795b2c2 |
line wrap: on
line source
"""Minimal email-challenge identity provider exposed as a WSGI application.

A client POSTs an email address to ``/challenge/request``; if the address is
known, a short-lived challenge token is handed to a ``send_email`` callback.
The client then POSTs that token to ``/challenge/respond`` and receives a
long-lived auth token in exchange.

The module is written to run unchanged on Python 2.6+ and Python 3.
"""

import datetime
import json
from base64 import urlsafe_b64encode
from os import urandom

try:  # Python 3
    from urllib.parse import parse_qsl
except ImportError:
    try:  # Python 2.6+
        from urlparse import parse_qsl
    except ImportError:  # very old Python 2
        from cgi import parse_qsl

DEFAULT_CHALLENGE_LIFETIME = datetime.timedelta(days=1)


def gentoken():
    """Return a fresh random URL-safe token as a native ``str``."""
    # Generate a 256-bit key, but add a byte so we don't have
    # an annoying '=' in the string.  Floor division keeps the byte count
    # an int even when ``/`` means true division.
    token = urlsafe_b64encode(urandom(256 // 8 + 1))
    if not isinstance(token, str):
        # Python 3 returns bytes, which json.dumps() cannot serialise.
        token = token.decode('ascii')
    return token


def _to_bytes(text):
    """Encode *text* as UTF-8 unless it is already a byte string."""
    if isinstance(text, bytes):
        return text
    return text.encode('utf-8')


class Request(object):
    """Thin convenience wrapper around a WSGI ``environ``/``start_response``.

    Attributes:
        environ: the WSGI environment dict passed in.
        start_response: the WSGI ``start_response`` callable passed in.
        path: value of ``PATH_INFO`` ('' when the server omitted it).
        method: value of ``REQUEST_METHOD``.
        qargs: dict built from the parsed ``QUERY_STRING``.
        length: non-negative request body length taken from
            ``CONTENT_LENGTH`` (0 when missing, malformed or negative).
    """

    def __init__(self, environ, start_response):
        self.environ = environ
        self.start_response = start_response
        # PATH_INFO may legitimately be omitted when it is empty.
        self.path = environ.get('PATH_INFO', '')
        self.method = environ['REQUEST_METHOD']
        self.qargs = dict(parse_qsl(environ.get('QUERY_STRING', '')))
        try:
            length = int(environ.get('CONTENT_LENGTH', '0'))
        except ValueError:
            length = 0
        # A negative length would turn read(n) into "read until EOF".
        self.length = max(length, 0)

    def json_response(self, status, obj):
        """Start a JSON response with *status* and return its body iterable.

        *obj* is serialised with ``json.dumps``; the body is a one-element
        list holding the encoded byte string.
        """
        self.start_response(status, [('Content-Type', 'application/json')])
        return [_to_bytes(json.dumps(obj))]

    def get_body(self):
        """Return the request body decoded from JSON, or None if invalid."""
        stream = self.environ['wsgi.input']
        try:
            return json.loads(stream.read(self.length))
        except ValueError:
            return None


class TokenStore(object):
    """In-memory mapping from opaque tokens to keyword "contents".

    Args:
        lifetime: maximum token age; ``None`` (the default) means tokens
            never expire.  It is compared against ``utcnow() - created``.
        utcnow: zero-argument callable returning the current time.
        gentoken: zero-argument callable returning a new token.
    """

    def __init__(self, lifetime=None, utcnow=datetime.datetime.utcnow,
                 gentoken=gentoken):
        self.utcnow = utcnow
        self.gentoken = gentoken
        self.lifetime = lifetime
        self.__tokens = {}

    def create(self, **contents):
        """Store *contents* under a newly generated token and return it."""
        token = self.gentoken()
        # Regenerate on the (unlikely) event of a collision.
        while token in self.__tokens:
            token = self.gentoken()
        self.__tokens[token] = {
            'date': self.utcnow(),
            'contents': contents,
        }
        return token

    def revoke(self, token):
        """Forget *token*; unknown or unhashable tokens are ignored."""
        try:
            self.__tokens.pop(token, None)
        except TypeError:
            # Unhashable value (e.g. a list decoded from JSON): it can
            # never have been stored, so there is nothing to revoke.
            pass

    def get(self, token):
        """Return the contents stored for *token*, or None.

        None is returned when the token is unknown, unhashable, or older
        than ``lifetime``; an expired token is removed from the store.
        """
        try:
            entry = self.__tokens[token]
        except (KeyError, TypeError):
            # TypeError: unhashable token supplied by the client.
            return None
        if self.lifetime is not None:
            age = self.utcnow() - entry['date']
            if age > self.lifetime:
                del self.__tokens[token]
                return None
        return entry['contents']


class Server(object):
    """WSGI application implementing the email challenge/response flow.

    Args:
        emails: container of accepted email addresses (tested with ``in``).
        send_email: callable invoked as ``send_email(email, token)`` when a
            challenge is issued.
        gentoken: token factory shared by both token stores.
        challenge_lifetime: lifetime of challenge tokens.
        utcnow: zero-argument callable returning the current time.
    """

    oauth2_authorize_path = '/authorize'
    request_challenge_path = '/challenge/request'
    respond_to_challenge_path = '/challenge/respond'

    def __init__(self, emails, send_email, gentoken=gentoken,
                 challenge_lifetime=DEFAULT_CHALLENGE_LIFETIME,
                 utcnow=datetime.datetime.utcnow):
        self.send_email = send_email
        self.challenge_tokens = TokenStore(lifetime=challenge_lifetime,
                                           utcnow=utcnow,
                                           gentoken=gentoken)
        # Auth tokens are created without a lifetime, so they never expire.
        self.auth_tokens = TokenStore(utcnow=utcnow, gentoken=gentoken)
        self.emails = emails

    def new_auth_token(self, email):
        """Create and return an auth token bound to *email*."""
        return self.auth_tokens.create(email=email)

    def _is_known_email(self, email):
        """Return True if *email* is one of the accepted addresses."""
        try:
            return email in self.emails
        except TypeError:
            # Unhashable JSON value checked against a dict/set of emails.
            return False

    def _handle_challenge_request(self, req):
        """Issue a challenge token for the email named in the request."""
        # TODO: check method == 'POST'
        body = req.get_body()
        if not (isinstance(body, dict) and 'email' in body):
            return req.json_response('400 Bad Request',
                                     {'error': 'invalid body'})
        email = body['email']
        if not self._is_known_email(email):
            return req.json_response('400 Bad Request',
                                     {'error': 'invalid email'})
        token = self.challenge_tokens.create(email=email)
        self.send_email(email, token)
        return req.json_response('200 OK', {'success': True})

    def _handle_challenge_response(self, req):
        """Exchange a valid challenge token for a new auth token."""
        body = req.get_body()
        if not (isinstance(body, dict) and 'token' in body):
            return req.json_response('400 Bad Request',
                                     {'error': 'invalid body'})
        chaltok = self.challenge_tokens.get(body['token'])
        if not chaltok:
            return req.json_response(
                '400 Bad Request',
                {'error': 'invalid or expired token'}
            )
        email = chaltok['email']
        # A challenge token is single-use.
        self.challenge_tokens.revoke(body['token'])
        token = self.auth_tokens.create(email=email)
        return req.json_response('200 OK',
                                 {'token': token, 'email': email})

    def wsgi_app(self, environ, start_response):
        """WSGI entry point: dispatch on the request path.

        Unknown paths produce a plain-text 404 response.
        """
        req = Request(environ, start_response)
        if req.path == self.request_challenge_path:
            return self._handle_challenge_request(req)
        if req.path == self.respond_to_challenge_path:
            return self._handle_challenge_response(req)
        start_response('404 Not Found',
                       [('Content-Type', 'text/plain')])
        return [_to_bytes('path not found: %s' % req.path)]


if __name__ == '__main__':
    from wsgiref.simple_server import make_server

    from static_file_serving import StaticFileApp

    def send_email(email, token):
        """Stand-in mailer: print what should be sent."""
        print("Please send %s an email with the token %s."
              % (email, token))

    def app(environ, start_response):
        """Serve static files for GET; everything else goes to the server."""
        if environ['REQUEST_METHOD'] == 'GET':
            return staticfiles(environ, start_response)
        return server.wsgi_app(environ, start_response)

    staticfiles = StaticFileApp('static-files')

    # Close the attendee list promptly instead of leaking the handle.
    with open('attendees.json') as attendees_file:
        attendees = json.loads(attendees_file.read())
    server = Server(attendees, send_email)

    port = 8000
    httpd = make_server('', port, app)
    print('serving on port %d' % port)
    httpd.serve_forever()