Mercurial > summit-idp
view summitidp/app.py @ 20:3197696debbe
refactored server.py a bit
author | Atul Varma <varmaa@toolness.com> |
---|---|
date | Fri, 25 Jun 2010 20:33:01 +0000 |
parents | 667ebf2a5e8b |
children | 03c5651a371e |
line wrap: on
line source
from cgi import parse_qsl from base64 import urlsafe_b64encode from os import urandom import datetime import json DEFAULT_CHALLENGE_LIFETIME = datetime.timedelta(days=1) def datetime_to_iso8601(dt): """ >>> datetime_to_iso8601(datetime.datetime(2010, 4, 11, 19, 16, 59)) '2010-04-11T19:16:59Z' """ return dt.strftime("%Y-%m-%dT%H:%M:%SZ") def iso8601_to_datetime(timestamp): """ >>> iso8601_to_datetime('2010-04-11T19:16:59Z') datetime.datetime(2010, 4, 11, 19, 16, 59) """ return datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%SZ") def gentoken(): # Generate a 256-bit key, but add a byte so we don't have # an annoying '=' in the string. return urlsafe_b64encode(urandom(256/8+1)) class Request(object): def __init__(self, environ, start_response): self.environ = environ self.start_response = start_response self.path = environ['PATH_INFO'] self.method = environ['REQUEST_METHOD'] self.qargs = dict(parse_qsl(self.environ.get('QUERY_STRING', ''))) try: self.length = int(environ.get('CONTENT_LENGTH', '0')) except ValueError: self.length = 0 def json_response(self, status, obj): self.start_response(status, [('Content-Type', 'application/json')]) return [json.dumps(obj)] def get_body(self): f = self.environ['wsgi.input'] try: obj = json.loads(f.read(self.length)) except ValueError: return None if not isinstance(obj, dict): return None return obj class TokenStore(object): def __init__(self, lifetime=None, utcnow=datetime.datetime.utcnow, gentoken=gentoken, mapping=None): if mapping is None: mapping = {} self.utcnow = utcnow self.gentoken = gentoken self.lifetime = lifetime self.__tokens = mapping def create(self, **contents): token = self.gentoken() while token in self.__tokens: token = self.gentoken() self.__tokens[token] = { 'date': datetime_to_iso8601(self.utcnow()), 'contents': contents } return token def revoke(self, token): if token in self.__tokens: del self.__tokens[token] def get(self, token): if token not in self.__tokens: return None if self.lifetime is not None: birth = iso8601_to_datetime(self.__tokens[token]['date']) age = self.utcnow() - birth if age > self.lifetime: del self.__tokens[token] return None return self.__tokens[token]['contents'] class ProfileStore(object): def __init__(self, mapping=None): if mapping is None: mapping = {} self.__profiles = mapping def set(self, user_id, contents): self.__profiles[str(user_id)] = contents def get(self): everything = {} for user_id in self.__profiles: everything[user_id] = self.__profiles[user_id] return everything class Server(object): request_challenge_path = '/challenge/request' respond_to_challenge_path = '/challenge/respond' profile_path = '/profile' def __init__(self, emails, send_email, gentoken=gentoken, challenge_lifetime=DEFAULT_CHALLENGE_LIFETIME, utcnow=datetime.datetime.utcnow, delegate_404s=None, challenge_tokens=None, auth_tokens=None, profiles=None): if challenge_tokens is None: challenge_tokens = TokenStore(lifetime=challenge_lifetime, utcnow=utcnow, gentoken=gentoken) if auth_tokens is None: auth_tokens = TokenStore(utcnow=utcnow, gentoken=gentoken) if profiles is None: profiles = ProfileStore() self.send_email = send_email self.challenge_tokens = challenge_tokens self.auth_tokens = auth_tokens self.profiles = profiles self.emails = emails self.delegate_404s = delegate_404s def new_auth_token(self, email): return self.auth_tokens.create(email=email) def wsgi_app(self, environ, start_response): req = Request(environ, start_response) if req.path == self.request_challenge_path: # TODO: check method == 'POST' body = req.get_body() if body and 'email' in body: if body['email'] in self.emails: token = self.challenge_tokens.create(email=body['email']) self.send_email(body['email'], token) return req.json_response('200 OK', {'success': True}) return req.json_response('400 Bad Request', {'error': 'invalid email'}) return req.json_response('400 Bad Request', {'error': 'invalid body'}) elif req.path == self.respond_to_challenge_path: body = req.get_body() if body and 'token' in body: chaltok = self.challenge_tokens.get(body['token']) if not chaltok: return req.json_response( '400 Bad Request', {'error': 'invalid or expired token'} ) email = chaltok['email'] self.challenge_tokens.revoke(body['token']) token = self.auth_tokens.create( email=email, user_id=self.emails.index(email) ) return req.json_response('200 OK', {'token': token, 'email': email}) return req.json_response('400 Bad Request', {'error': 'invalid body'}) elif req.path == self.profile_path: if req.method == 'GET': authtok = self.auth_tokens.get(req.qargs.get('token')) if not authtok: return req.json_response( '400 Bad Request', {'error': 'invalid or expired token'} ) profile = self.profiles.get() return req.json_response('200 OK', {'contents': profile}) elif req.method == 'POST': body = req.get_body() if body and 'token' in body and 'contents' in body: authtok = self.auth_tokens.get(body['token']) if not authtok: return req.json_response( '400 Bad Request', {'error': 'invalid or expired token'} ) self.profiles.set(authtok['user_id'], body['contents']) return req.json_response('200 OK', {'success': True}) return req.json_response('400 Bad Request', {'error': 'invalid body'}) if self.delegate_404s: return self.delegate_404s(environ, start_response) start_response('404 Not Found', [('Content-Type', 'text/plain')]) return ['path not found: %s' % environ['PATH_INFO']]