view taw.py @ 4:4e6ce85226f4

Fixed a bug in receiving of data.
author Atul Varma <varmaa@toolness.com>
date Fri, 17 Apr 2009 16:23:31 -0700
parents 9e819377ce9f
children 4ec829dc7d1d
line wrap: on
line source

import time
import socket
import asyncore
import asynchat

class AsyncChatCoroutineBridge(asynchat.async_chat):
    """Drives a generator-based coroutine from asynchat channel events.

    The coroutine yields instruction dicts keyed by 'op':

      * 'receive' -- collect incoming data until instruction['terminator']
        is found or, when no terminator is given, until
        instruction['length'] bytes have arrived.  The collected data is
        sent back into the coroutine as the value of its yield.
      * 'send' -- push instruction['content'] onto the channel; the
        coroutine is resumed once nothing is left queued for sending.
      * 'close' -- close the channel.

    When the coroutine finishes (StopIteration) the channel is closed.
    """

    def __init__(self, coroutine, conn = None):
        """Wrap `coroutine`, optionally around an existing socket `conn`.

        If `conn` is given, the coroutine is started immediately;
        otherwise it is started from handle_connect().
        """
        asynchat.async_chat.__init__(self, conn)
        # No terminator until the coroutine asks to receive something.
        self.set_terminator(None)
        self.__coroutine = coroutine
        self.__data = []
        if conn is not None:
            self.__process_next_instruction()

    def __process_next_instruction(self, feedback = None):
        """Resume the coroutine with `feedback` and act on what it yields.

        Raises ValueError if a 'receive' instruction has neither a
        terminator nor a length, or if the instruction's 'op' is not one
        of 'receive', 'send' or 'close'.
        """
        try:
            instruction = self.__coroutine.send(feedback)
        except StopIteration:
            # The coroutine has run to completion; hang up.
            instruction = {'op': 'close'}

        op = instruction['op']
        if op == 'receive':
            if instruction['terminator']:
                self.set_terminator(instruction['terminator'])
            elif instruction['length']:
                # asynchat accepts an integer terminator as a byte count.
                self.set_terminator(instruction['length'])
            else:
                raise ValueError(instruction)
        elif op == 'send':
            self.push(instruction['content'])
        elif op == 'close':
            self.close()
        else:
            # An unknown op used to be ignored silently, which left the
            # channel waiting forever; fail loudly instead.
            raise ValueError(instruction)

    def handle_connect(self):
        """Start the coroutine once the channel reports it is connected."""
        self.__process_next_instruction()

    def initiate_send(self):
        """Send pending data, then resume the coroutine if all was sent.

        After delegating to the base class, the coroutine is resumed only
        when the output buffer and the producer queue are both empty and
        the channel is connected.
        """
        asynchat.async_chat.initiate_send(self)
        if ((not self.ac_out_buffer) and
            (len(self.producer_fifo) == 0) and
            self.connected):
            self.__process_next_instruction()

    def collect_incoming_data(self, data):
        """Accumulate a chunk of received data for the pending receive."""
        # TODO: Enforce some maximum data length.
        self.__data.append(data)

    def found_terminator(self):
        """Hand everything collected so far to the coroutine."""
        # Clear the terminator before resuming: the next instruction may
        # install a new one, and must not be overwritten afterwards.
        self.set_terminator(None)
        data = ''.join(self.__data)
        self.__data = []
        self.__process_next_instruction(data)

class ChattyCoroutineServer(asyncore.dispatcher):
    """Listening socket that starts one coroutine per accepted connection.

    `coroutineFactory` is called as coroutineFactory(chat, addr), where
    `chat` is a fresh ChattyInstructions and `addr` is the peer address;
    the coroutine it returns is handed to an AsyncChatCoroutineBridge
    wrapping the accepted socket.
    """

    def __init__(self, addr, coroutineFactory):
        """Create a TCP socket, bind it to `addr` and start listening."""
        asyncore.dispatcher.__init__(self)
        self.__coroutineFactory = coroutineFactory
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind(addr)
        self.listen(1)

    def run(self):
        """Run the asyncore event loop forever; never returns normally."""
        # Re-enter asyncore.loop() whenever it returns.
        while 1:
            asyncore.loop(timeout = 0.5)

    def handle_accept(self):
        """Accept a pending connection and bridge it to a new coroutine."""
        accepted = self.accept()
        if accepted is None:
            # dispatcher.accept() may return None when no connection is
            # actually ready; unpacking it would raise TypeError.
            return
        conn, addr = accepted
        coroutine = self.__coroutineFactory(ChattyInstructions(), addr)
        AsyncChatCoroutineBridge(coroutine, conn)

class ChattyCoroutineClient(AsyncChatCoroutineBridge):
    """Bridge that opens its own outgoing TCP connection to `addr`."""

    def __init__(self, addr, coroutineFactory):
        """Build the coroutine for `addr`, then start connecting to it."""
        instructions = ChattyInstructions()
        AsyncChatCoroutineBridge.__init__(
            self,
            coroutineFactory(instructions, addr)
            )
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect(addr)

class ChattyInstructions(object):
    """Builds the instruction dicts that a coroutine yields."""

    def receive(self, terminator = None, length = None):
        """Return a 'receive' instruction carrying both arguments as given."""
        instruction = {'op': 'receive'}
        instruction['terminator'] = terminator
        instruction['length'] = length
        return instruction

    def send(self, content):
        """Return a 'send' instruction carrying `content`."""
        instruction = {'op': 'send'}
        instruction['content'] = content
        return instruction

def lame_http_server_coroutine(chat, addr):
    """Minimal HTTP responder: greets the peer by the first item of `addr`.

    Yields a receive instruction for the request head (up to the blank
    line), then a send instruction with a plain-text 200 response.
    """
    # The request itself is read but its contents are not inspected.
    _request = yield chat.receive(terminator = '\r\n\r\n')
    body = 'hello %s.' % addr[0]
    response_parts = [
        'HTTP/1.1 200 OK\r\n',
        'Content-Length: %d\r\n' % len(body),
        'Content-Type: text/plain\r\n\r\n',
        body,
        ]
    yield chat.send(''.join(response_parts))

def lame_http_client_coroutine(chat, addr):
    """Minimal HTTP client: GET /, skip the headers, print two body chunks.

    Yields a send instruction for the request, a receive instruction for
    the response head (up to the blank line), and then two receive
    instructions of 20 bytes each, printing the repr of each chunk.
    `addr` is accepted for the coroutine-factory signature but not used.
    """
    yield chat.send('GET / HTTP/1.1\r\n\r\n')
    # The response headers are read and discarded.
    yield chat.receive(terminator = '\r\n\r\n')
    # Pass the byte count as `length`: positionally it would bind to
    # `terminator` and only work because an integer terminator is
    # treated as a byte count.
    response = yield chat.receive(length = 20)
    # Single-argument parenthesised print is valid in Python 2 and 3.
    print('first response: %s' % repr(response))
    response = yield chat.receive(length = 20)
    print('second response: %s' % repr(response))

if __name__ == '__main__':
    # Demo: serve the toy HTTP responder locally and, in the same event
    # loop, run the toy HTTP client against a remote web server.
    listen_addr = ('127.0.0.1', 8071)
    remote_addr = ('www.google.com', 80)

    server = ChattyCoroutineServer(listen_addr, lame_http_server_coroutine)
    client = ChattyCoroutineClient(remote_addr, lame_http_client_coroutine)

    # Drives both the server and the client channels.
    server.run()