view cosocket.py @ 46:346573b610cd

Changed port/ip addr binding.
author Atul Varma <varmaa@toolness.com>
date Tue, 28 Apr 2009 17:37:09 +0000
parents 1683a8fc76b0
children 7e3b3eb57ec2
line wrap: on
line source

import sys
import socket
import asyncore
import asynchat
import types
import traceback

class _AsyncChatCoroutineDispatcher(asynchat.async_chat):
    def __init__(self, coroutine, conn = None):
        asynchat.async_chat.__init__(self, conn)
        self.set_terminator(None)
        self.__coroutine = coroutine
        self.__data = []
        self.__coroutine_stack = []
        if conn:
            self.continue_from_yield()

    def close_coroutine_and_return_to_caller(self, message):
        self.__close_coroutine(self.__coroutine)
        if self.__coroutine_stack:
            self.__coroutine = self.__coroutine_stack.pop()
            self.continue_from_yield(message)
        else:
            self.__coroutine = None

    def continue_from_yield(self, message = None,
                            exception_info = None):
        try:
            if exception_info:
                instruction = self.__coroutine.throw(*exception_info)
            else:
                instruction = self.__coroutine.send(message)
        except StopIteration:
            if self.__coroutine_stack:
                self.__coroutine = self.__coroutine_stack.pop()
                self.continue_from_yield()
            else:
                self.__coroutine = None
                self.handle_close()
        except Exception, e:
            if self.__coroutine_stack:
                self.__coroutine = self.__coroutine_stack.pop()
                self.continue_from_yield(exception_info = sys.exc_info())
            else:
                self.handle_error()
        else:
            if type(instruction) == types.GeneratorType:
                self.__coroutine_stack.append(self.__coroutine)
                self.__coroutine = instruction
                self.continue_from_yield()
            else:
                instruction.execute(self)

    def __close_coroutine(self, coroutine):
        try:
            coroutine.close()
        except Exception:
            self.log_info(traceback.format_exc(), 'error')

    def handle_close(self):
        if self.__coroutine:
            # Pass an exception back into the coroutine to kick
            # it out of whatever yielding state it's in.
            self.__close_coroutine(self.__coroutine)
            self.__coroutine = None
            while self.__coroutine_stack:
                self.__close_coroutine(self.__coroutine_stack.pop())
        self.close()

    def log_info(self, message, type='info'):
        # TODO: Use the logging module here.
        print '%s: %s' % (type, message)

    def handle_error(self):
        self.log_info(traceback.format_exc(), 'error')

    def handle_connect(self):
        self.continue_from_yield()

    def initiate_send(self):
        asynchat.async_chat.initiate_send(self)
        if ((not self.ac_out_buffer) and
            (len(self.producer_fifo) == 0) and
            self.connected):
            self.continue_from_yield()

    def collect_incoming_data(self, data):
        # TODO: Enforce some maximum data length.
        self.__data.append(data)

    def found_terminator(self):
        self.set_terminator(None)
        data = ''.join(self.__data)
        self.__data = []
        self.continue_from_yield(data)

class CoroutineSocketServer(asyncore.dispatcher):
    def __init__(self, addr, coroutineFactory):
        asyncore.dispatcher.__init__(self)
        self.__coroutineFactory = coroutineFactory
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind(addr)
        self.listen(1)

    def run(self):
        while 1:
            asyncore.loop(timeout = 0.5)

    def handle_accept(self):
        conn, addr = self.accept()
        coroutine = self.__coroutineFactory(addr)
        _AsyncChatCoroutineDispatcher(coroutine, conn)

class CoroutineSocketClient(_AsyncChatCoroutineDispatcher):
    def __init__(self, addr, coroutineFactory):
        coroutine = coroutineFactory(addr)
        _AsyncChatCoroutineDispatcher.__init__(self, coroutine)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect(addr)

# Instructions that coroutines yield.

class until_received(object):
    def __init__(self, terminator = None, bytes = None):
        if terminator:
            self._terminator = terminator
        elif bytes:
            self._terminator = bytes
        else:
            raise ValueError()

    def execute(self, dispatcher):
        dispatcher.set_terminator(self._terminator)

class until_sent(object):
    def __init__(self, content):
        if not content:
            raise ValueError(content)
        self.content = content

    def execute(self, dispatcher):
        dispatcher.push(self.content)

class return_value(object):
    def __init__(self, value):
        self.value = value

    def execute(self, dispatcher):
        dispatcher.close_coroutine_and_return_to_caller(self.value)