Mercurial > bzapi
view test_cosocket.py @ 34:b4fab248d1eb
Added a bunch of files from http://hg.toolness.com/cosocket
author | Atul Varma <varmaa@toolness.com> |
---|---|
date | Thu, 24 Dec 2009 15:40:39 -0800 |
parents | |
children |
line wrap: on
line source
import unittest

import cosocket


class Tests(unittest.TestCase):
    """Loopback echo test for the cosocket coroutine API."""

    # Address the listening coroutine is given and the client connects to.
    PORT = 38424
    IP = '127.0.0.1'

    def testSimple(self):
        """Send one line to an echo handler and check it comes back."""
        # Each coroutine increments its own counter as its final statement,
        # so a counter still at 0 after the loop means that coroutine never
        # ran to completion.
        done = dict(client = 0, server = 0, connection = 0)

        def server_coroutine():
            yield cosocket.until_listening((self.IP, self.PORT))
            # The client is created only after the listening step has
            # resumed this coroutine, so it is never started first.
            cosocket.AsyncChatCoroutine(client_coroutine())
            conn, addr = yield cosocket.until_connection_accepted()
            # Hand the accepted connection to its own echo coroutine.
            cosocket.AsyncChatCoroutine(connection_coroutine(addr), conn)
            done['server'] += 1

        def client_coroutine():
            yield cosocket.until_connected((self.IP, self.PORT))
            yield cosocket.until_sent('hai2u\r\n')
            data = yield cosocket.until_received(terminator = '\r\n')
            # The expected value has no '\r\n', i.e. the received data is
            # expected to arrive without the terminator.
            # NOTE(review): this assertion runs inside the event loop; if
            # cosocket swallows exceptions raised by coroutines, a failure
            # here would not reach unittest -- confirm. The counter check
            # at the end of the test covers that case, since
            # done['client'] would stay at 0.
            self.assertEqual(data, 'hai2u')
            done['client'] += 1

        def connection_coroutine(addr):
            # addr is accepted but unused; the handler only echoes.
            data = yield cosocket.until_received('\r\n')
            yield cosocket.until_sent(data + '\r\n')
            done['connection'] += 1

        cosocket.AsyncChatCoroutine(server_coroutine())
        cosocket.loop()

        # Verify all three coroutines finished, not just the server: the
        # server counter alone says nothing about whether the echo round
        # trip actually happened.
        self.assertEqual(done, dict(client = 1, server = 1, connection = 1))


if __name__ == '__main__':
    unittest.main()