Mercurial > universal-identity-relyer
changeset 1:379d5d9a5f34
added stupid tests
author | Atul Varma <avarma@mozilla.com> |
---|---|
date | Sat, 12 Jun 2010 18:16:23 -0700 |
parents | 7b42ce648fe5 |
children | a880317e3100 |
files | oauth_experiment.py test_twitter_client.py twitter_client.py |
diffstat | 3 files changed, 152 insertions(+), 72 deletions(-) [+] |
line wrap: on
line diff
--- a/oauth_experiment.py Sat Jun 12 17:06:05 2010 -0700 +++ b/oauth_experiment.py Sat Jun 12 18:16:23 2010 -0700 @@ -1,80 +1,10 @@ -import os import json -import urlparse -import urllib import oauth2 as oauth - -request_token_url = 'https://api.twitter.com/oauth/request_token' -access_token_url = 'https://api.twitter.com/oauth/access_token' -authorize_url = 'https://api.twitter.com/oauth/authorize' +import twitter_client config = json.loads(open("config.json").read()) consumer = oauth.Consumer(config['consumer_key'], config['consumer_secret']) -def app(environ, start_response): - path = environ['PATH_INFO'] - qs = environ['QUERY_STRING'] - - if path == '/request': - # Step 1: Get a request token. This is a temporary token that is used for - # having the user authorize an access token and to sign the request to obtain - # said access token. - - url = '%s?%s' % ( - request_token_url, - urllib.urlencode({'oauth_callback': 'http://localhost:8000/callback'}) - ) - print "url is %s" % url - client = oauth.Client(consumer) - resp, content = client.request(url, "GET") - if resp['status'] != '200': - raise Exception("Invalid response %s." % resp['status']) - - request_token = dict(urlparse.parse_qsl(content)) - - open('request-token.json', 'w').write(json.dumps(request_token)) - - print "Request Token:" - print " - oauth_token = %s" % request_token['oauth_token'] - print " - oauth_token_secret = %s" % request_token['oauth_token_secret'] - print - - # Step 2: Redirect to the provider. Since this is a CLI script we do not - # redirect. In a web application you would redirect the user to the URL - # below. - - redirect_url = "%s?oauth_token=%s" % (authorize_url, - request_token['oauth_token']) - start_response('302 Found', - [('Location', redirect_url)]) - return [] - elif path == '/callback': - qsdict = dict(urlparse.parse_qsl(qs)) - - # TODO: Ensure request_token['oauth_token'] - # is the same as the one in qsdict['oauth_token']. - - request_token = json.loads(open('request-token.json').read()) - token = oauth.Token(request_token['oauth_token'], - request_token['oauth_token_secret']) - token.set_verifier(qsdict['oauth_verifier']) - client = oauth.Client(consumer, token) - resp, content = client.request(access_token_url, "POST") - access_token = dict(urlparse.parse_qsl(content)) - print "Access Token:" - print " - oauth_token = %s" % access_token['oauth_token'] - print " - oauth_token_secret = %s" % access_token['oauth_token_secret'] - print " - user_id = %s" % access_token['user_id'] - print " - screen_name = %s" % access_token['screen_name'] - print - print "You may now access protected resources using the access tokens above." - print - start_response('200 OK', - [('Content-Type', 'text/plain')]) - return [json.dumps(access_token, indent=2)] - - start_response('404 Not Found', - [('Content-Type', 'text/plain')]) - return ['path not found: %s' % path] +app = twitter_client.TwitterOauthClientApp(consumer, oauth)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test_twitter_client.py Sat Jun 12 18:16:23 2010 -0700 @@ -0,0 +1,84 @@ +from minimock import Mock + +from twitter_client import TwitterOauthClientApp + +def app(request_tokens=None): + if request_tokens is None: + request_tokens = {} + consumer = 'mock consumer' + oauth = Mock('oauth') + toc = TwitterOauthClientApp(consumer, oauth, 'http://foo.com/oauth_callback', + request_tokens) + return (consumer, oauth, toc) + +def test_404(): + """ + >>> _, _, toc = app() + >>> environ = dict(PATH_INFO='/', QUERY_STRING='') + >>> toc(environ, Mock('start_response')) + Called start_response('404 Not Found', [('Content-Type', 'text/plain')]) + ['path not found: /'] + """ + + pass + +def test_request_redirect(self): + """ + >>> consumer, oauth, toc = app() + >>> client = Mock('client') + >>> client.request.mock_returns = ( + ... {'status': '200'}, + ... 'oauth_token=token&oauth_token_secret=secret&' + ... 'oauth_callback_confirmed=true' + ... ) + >>> oauth.Client.mock_returns = client + >>> environ = dict(PATH_INFO='/request', QUERY_STRING='') + >>> toc(environ, Mock('start_response')) + Called oauth.Client('mock consumer') + Called client.request( + 'https://api.twitter.com/oauth/request_token?oauth_callback=http%3A%2F%2Ffoo.com%2Foauth_callback', + 'GET') + Called start_response( + '302 Found', + [('Location', 'https://api.twitter.com/oauth/authorize?oauth_token=token')]) + [] + """ + + pass + +class MockToken(Mock): + def __repr__(self): + return "<Mock token>" + +def test_callback(self): + """ + >>> storage = {'token': {'oauth_token': 'token', 'oauth_token_secret': 'secret'}} + >>> consumer, oauth, toc = app(storage) + >>> client = Mock('client') + >>> client.request.mock_returns = ( + ... {'status': '200'}, + ... 'oauth_token=token&oauth_token_secret=secret&' + ... 'user_id=userid&screen_name=bob' + ... ) + >>> oauth.Client.mock_returns = client + >>> token = MockToken('token') + >>> oauth.Token.mock_returns = token + >>> environ = dict( + ... PATH_INFO='/callback', + ... QUERY_STRING='oauth_token=token&oauth_verifier=verifier' + ... ) + >>> toc(environ, Mock('start_response')) + Called oauth.Token(u'token', u'secret') + Called token.set_verifier('verifier') + Called oauth.Client('mock consumer', <Mock token>) + Called client.request('https://api.twitter.com/oauth/access_token', 'POST') + Called start_response('200 OK', [('Content-Type', 'text/plain')]) + ['success'] + """ + + pass + +if __name__ == '__main__': + import doctest + doctest.testmod() + print "all tests passed."
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/twitter_client.py Sat Jun 12 18:16:23 2010 -0700 @@ -0,0 +1,66 @@ +import json +import urlparse +import urllib + +class TwitterOauthClientApp(object): + request_token_url = 'https://api.twitter.com/oauth/request_token' + access_token_url = 'https://api.twitter.com/oauth/access_token' + authorize_url = 'https://api.twitter.com/oauth/authorize' + + def __init__(self, consumer, oauth, oauth_callback, request_tokens): + self.oauth_callback = oauth_callback + self.oauth = oauth + self.consumer = consumer + self.request_tokens = request_tokens + + def __call__(self, environ, start_response): + path = environ['PATH_INFO'] + qs = environ['QUERY_STRING'] + + if path == '/request': + # Step 1: Get a request token. This is a temporary token that is used for + # having the user authorize an access token and to sign the request to obtain + # said access token. + + url = '%s?%s' % ( + self.request_token_url, + urllib.urlencode({'oauth_callback': self.oauth_callback}) + ) + client = self.oauth.Client(self.consumer) + resp, content = client.request(url, "GET") + if resp['status'] != '200': + raise Exception("Invalid response %s." % resp['status']) + + request_token = dict(urlparse.parse_qsl(content)) + + self.request_tokens[request_token['oauth_token']] = request_token + + # Step 2: Redirect to the provider. + + redirect_url = "%s?oauth_token=%s" % (self.authorize_url, + request_token['oauth_token']) + start_response('302 Found', + [('Location', redirect_url)]) + return [] + elif path == '/callback': + qsdict = dict(urlparse.parse_qsl(qs)) + + if qsdict['oauth_token'] not in self.request_tokens: + raise Exception('invalid token: %s' % self.request_tokens) + + request_token = self.request_tokens[qsdict['oauth_token']] + del self.request_tokens[qsdict['oauth_token']] + + token = self.oauth.Token(request_token['oauth_token'], + request_token['oauth_token_secret']) + token.set_verifier(qsdict['oauth_verifier']) + client = self.oauth.Client(self.consumer, token) + resp, content = client.request(self.access_token_url, "POST") + access_token = dict(urlparse.parse_qsl(content)) + start_response('200 OK', + [('Content-Type', 'text/plain')]) + return ['success'] + + start_response('404 Not Found', + [('Content-Type', 'text/plain')]) + return ['path not found: %s' % path]