# HG changeset patch # User Atul Varma # Date 1240006699 25200 # Node ID f1159b9ec8237cf44a7488e2d3288f8a4831b46c # Parent fb5e84a8eb8affaac25f3865835f004c015e8736 Initial implementation of coroutine-based server. diff -r fb5e84a8eb8a -r f1159b9ec823 taw.py --- a/taw.py Fri Apr 17 11:28:28 2009 -0700 +++ b/taw.py Fri Apr 17 15:18:19 2009 -0700 @@ -1,43 +1,87 @@ import time import socket -import select +import asyncore +import asynchat + +class ChattyCoroutineServer(asyncore.dispatcher): + def __init__(self, addr, coroutineFactory): + asyncore.dispatcher.__init__(self) + self.__coroutineFactory = coroutineFactory + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.set_reuse_addr() + self.bind(addr) + self.listen(1) + + def run(self): + while 1: + asyncore.loop(timeout = 0.5) + + def handle_accept(self): + conn, addr = self.accept() + AsyncChatCoroutineBridge(conn, addr, self.__coroutineFactory()) + +class AsyncChatCoroutineBridge(asynchat.async_chat): + def __init__(self, conn, addr, coroutine): + asynchat.async_chat.__init__(self, conn) + self.set_terminator(None) + self.__coroutine = coroutine(addr) + self.__data = [] + self.__process_next_instruction() + + def __process_next_instruction(self, feedback = None): + try: + instruction = self.__coroutine.send(feedback) + except StopIteration: + instruction = {'op': 'close'} + + if instruction['op'] == 'receive': + if instruction['terminator']: + self.set_terminator(instruction['terminator']) + elif instruction['length']: + self.set_terminator(instruction['length']) + else: + raise ValueError(instruction) + elif instruction['op'] == 'send': + self.push(instruction['content']) + elif instruction['op'] == 'close': + self.close() + + def initiate_send(self): + asynchat.async_chat.initiate_send(self) + if ((not self.ac_out_buffer) and + (len(self.producer_fifo) == 0) and + self.connected): + self.__process_next_instruction() + + def collect_incoming_data(self, data): + # TODO: Enforce some maximum data length. + self.__data.append(data) + + def found_terminator(self): + self.set_terminator(None) + data = ''.join(self.__data) + self.__process_next_instruction(data) + +class ChattyCoroutine(object): + def receive(self, terminator = None, length = None): + return {'op': 'receive', + 'terminator': terminator, + 'length': length} + + def send(self, content): + return {'op': 'send', + 'content': content} + +class LameHttpCoroutine(ChattyCoroutine): + def __call__(self, addr): + req = yield self.receive(terminator = '\r\n\r\n') + msg = 'hello %s.' % addr[0] + yield self.send('HTTP/1.1 200 OK\r\n' + + 'Content-Length: %d\r\n' % len(msg) + + 'Content-Type: text/plain\r\n\r\n' + + msg) if __name__ == '__main__': - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind(('127.0.0.1', 8071)) - sock.setblocking(0) - sock.listen(1) - print "waiting for connection." - reading_list = [sock] - countdowns = [] - last_time = time.time() - while True: - rlist, wlist, xlist = select.select(reading_list, [], [], 0.25) - new_countdowns = [] - now = time.time() - time_elapsed = now - last_time - last_time = now - for read_socket, timeleft in countdowns: - timeleft -= time_elapsed - if timeleft <= 0: - msg = "go away." - read_socket.send('HTTP/1.1 200 OK\r\n' + - 'Content-Length: %d\r\n' % len(msg) + - 'Content-Type: text/plain\r\n\r\n' + - msg) - read_socket.close() - else: - new_countdowns.append((read_socket, timeleft)) - countdowns = new_countdowns - for read_socket in rlist: - if read_socket == sock: - conn, addr = sock.accept() - print "got conn: %s" % conn - reading_list.append(conn) - else: - data = read_socket.recv(4096) - if '\r\n\r\n' in data: - reading_list.remove(read_socket) - countdowns.append((read_socket, 5.0)) - print "bye." + server = ChattyCoroutineServer(('127.0.0.1', 8071), + LameHttpCoroutine) + server.run()