Mercurial > summit-idp
view summitidp/app.py @ 10:102514eec446
made summitidp package and tests dir
author | Atul Varma <avarma@mozilla.com> |
---|---|
date | Thu, 24 Jun 2010 17:40:50 -0700 |
parents | oauth2_server.py@53cfbdc7324f |
children | a1c1d2955981 |
line wrap: on
line source
from cgi import parse_qsl from base64 import urlsafe_b64encode from os import urandom import datetime import json DEFAULT_CHALLENGE_LIFETIME = datetime.timedelta(days=1) def gentoken(): # Generate a 256-bit key, but add a byte so we don't have # an annoying '=' in the string. return urlsafe_b64encode(urandom(256/8+1)) class Request(object): def __init__(self, environ, start_response): self.environ = environ self.start_response = start_response self.path = environ['PATH_INFO'] self.method = environ['REQUEST_METHOD'] self.qargs = dict(parse_qsl(self.environ.get('QUERY_STRING', ''))) try: self.length = int(environ.get('CONTENT_LENGTH', '0')) except ValueError: self.length = 0 def json_response(self, status, obj): self.start_response(status, [('Content-Type', 'application/json')]) return [json.dumps(obj)] def get_body(self): f = self.environ['wsgi.input'] try: obj = json.loads(f.read(self.length)) except ValueError: return None if not isinstance(obj, dict): return None return obj class TokenStore(object): def __init__(self, lifetime=None, utcnow=datetime.datetime.utcnow, gentoken=gentoken): self.utcnow = utcnow self.gentoken = gentoken self.lifetime = lifetime self.__tokens = {} def create(self, **contents): token = self.gentoken() while token in self.__tokens: token = self.gentoken() self.__tokens[token] = { 'date': self.utcnow(), 'contents': contents } return token def revoke(self, token): if token in self.__tokens: del self.__tokens[token] def get(self, token): if token not in self.__tokens: return None if self.lifetime is not None: age = self.utcnow() - self.__tokens[token]['date'] if age > self.lifetime: del self.__tokens[token] return None return self.__tokens[token]['contents'] class ProfileStore(object): def __init__(self): self.__profiles = {} def set(self, user_id, contents): self.__profiles[user_id] = contents def get(self): return self.__profiles class Server(object): request_challenge_path = '/challenge/request' respond_to_challenge_path = '/challenge/respond' profile_path = '/profile' def __init__(self, emails, send_email, gentoken=gentoken, challenge_lifetime=DEFAULT_CHALLENGE_LIFETIME, utcnow=datetime.datetime.utcnow, delegate_404s=None): self.send_email = send_email self.challenge_tokens = TokenStore(lifetime=challenge_lifetime, utcnow=utcnow, gentoken=gentoken) self.auth_tokens = TokenStore(utcnow=utcnow, gentoken=gentoken) self.oauth2_consumers = TokenStore(utcnow=utcnow, gentoken=gentoken) self.profiles = ProfileStore() self.emails = emails self.gentoken = gentoken self.delegate_404s = delegate_404s def new_auth_token(self, email): return self.auth_tokens.create(email=email) def wsgi_app(self, environ, start_response): req = Request(environ, start_response) if req.path == self.request_challenge_path: # TODO: check method == 'POST' body = req.get_body() if body and 'email' in body: if body['email'] in self.emails: token = self.challenge_tokens.create(email=body['email']) self.send_email(body['email'], token) return req.json_response('200 OK', {'success': True}) return req.json_response('400 Bad Request', {'error': 'invalid email'}) return req.json_response('400 Bad Request', {'error': 'invalid body'}) elif req.path == self.respond_to_challenge_path: body = req.get_body() if body and 'token' in body: chaltok = self.challenge_tokens.get(body['token']) if not chaltok: return req.json_response( '400 Bad Request', {'error': 'invalid or expired token'} ) email = chaltok['email'] self.challenge_tokens.revoke(body['token']) token = self.auth_tokens.create( email=email, user_id=self.emails.index(email) ) return req.json_response('200 OK', {'token': token, 'email': email}) return req.json_response('400 Bad Request', {'error': 'invalid body'}) elif req.path == self.profile_path: if req.method == 'GET': authtok = self.auth_tokens.get(req.qargs.get('token')) if not authtok: return req.json_response( '400 Bad Request', {'error': 'invalid or expired token'} ) profile = self.profiles.get() return req.json_response('200 OK', {'contents': profile}) elif req.method == 'POST': body = req.get_body() if body and 'token' in body and 'contents' in body: authtok = self.auth_tokens.get(body['token']) if not authtok: return req.json_response( '400 Bad Request', {'error': 'invalid or expired token'} ) self.profiles.set(authtok['user_id'], body['contents']) return req.json_response('200 OK', {'success': True}) return req.json_response('400 Bad Request', {'error': 'invalid body'}) if self.delegate_404s: return self.delegate_404s(environ, start_response) start_response('404 Not Found', [('Content-Type', 'text/plain')]) return ['path not found: %s' % environ['PATH_INFO']] if __name__ == '__main__': from wsgiref.simple_server import make_server from static_file_serving import StaticFileApp def send_email(email, token): print "Please send %s an email with the token %s." % (email, token) server = Server(json.loads(open('attendees.json').read()), send_email, delegate_404s=StaticFileApp('static-files')) port = 8000 httpd = make_server('', port, server.wsgi_app) print 'serving on port %d' % port httpd.serve_forever()