Mercurial > cosocket
view cosocket.py @ 40:11c4fb8bec3c
Added timestamp to messages, though they're not currently displayed in any way.
author | Atul Varma <varmaa@toolness.com> |
---|---|
date | Mon, 27 Apr 2009 22:26:57 -0700 |
parents | 1683a8fc76b0 |
children | 7e3b3eb57ec2 |
line wrap: on
line source
import sys import socket import asyncore import asynchat import types import traceback class _AsyncChatCoroutineDispatcher(asynchat.async_chat): def __init__(self, coroutine, conn = None): asynchat.async_chat.__init__(self, conn) self.set_terminator(None) self.__coroutine = coroutine self.__data = [] self.__coroutine_stack = [] if conn: self.continue_from_yield() def close_coroutine_and_return_to_caller(self, message): self.__close_coroutine(self.__coroutine) if self.__coroutine_stack: self.__coroutine = self.__coroutine_stack.pop() self.continue_from_yield(message) else: self.__coroutine = None def continue_from_yield(self, message = None, exception_info = None): try: if exception_info: instruction = self.__coroutine.throw(*exception_info) else: instruction = self.__coroutine.send(message) except StopIteration: if self.__coroutine_stack: self.__coroutine = self.__coroutine_stack.pop() self.continue_from_yield() else: self.__coroutine = None self.handle_close() except Exception, e: if self.__coroutine_stack: self.__coroutine = self.__coroutine_stack.pop() self.continue_from_yield(exception_info = sys.exc_info()) else: self.handle_error() else: if type(instruction) == types.GeneratorType: self.__coroutine_stack.append(self.__coroutine) self.__coroutine = instruction self.continue_from_yield() else: instruction.execute(self) def __close_coroutine(self, coroutine): try: coroutine.close() except Exception: self.log_info(traceback.format_exc(), 'error') def handle_close(self): if self.__coroutine: # Pass an exception back into the coroutine to kick # it out of whatever yielding state it's in. self.__close_coroutine(self.__coroutine) self.__coroutine = None while self.__coroutine_stack: self.__close_coroutine(self.__coroutine_stack.pop()) self.close() def log_info(self, message, type='info'): # TODO: Use the logging module here. print '%s: %s' % (type, message) def handle_error(self): self.log_info(traceback.format_exc(), 'error') def handle_connect(self): self.continue_from_yield() def initiate_send(self): asynchat.async_chat.initiate_send(self) if ((not self.ac_out_buffer) and (len(self.producer_fifo) == 0) and self.connected): self.continue_from_yield() def collect_incoming_data(self, data): # TODO: Enforce some maximum data length. self.__data.append(data) def found_terminator(self): self.set_terminator(None) data = ''.join(self.__data) self.__data = [] self.continue_from_yield(data) class CoroutineSocketServer(asyncore.dispatcher): def __init__(self, addr, coroutineFactory): asyncore.dispatcher.__init__(self) self.__coroutineFactory = coroutineFactory self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.set_reuse_addr() self.bind(addr) self.listen(1) def run(self): while 1: asyncore.loop(timeout = 0.5) def handle_accept(self): conn, addr = self.accept() coroutine = self.__coroutineFactory(addr) _AsyncChatCoroutineDispatcher(coroutine, conn) class CoroutineSocketClient(_AsyncChatCoroutineDispatcher): def __init__(self, addr, coroutineFactory): coroutine = coroutineFactory(addr) _AsyncChatCoroutineDispatcher.__init__(self, coroutine) self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.connect(addr) # Instructions that coroutines yield. class until_received(object): def __init__(self, terminator = None, bytes = None): if terminator: self._terminator = terminator elif bytes: self._terminator = bytes else: raise ValueError() def execute(self, dispatcher): dispatcher.set_terminator(self._terminator) class until_sent(object): def __init__(self, content): if not content: raise ValueError(content) self.content = content def execute(self, dispatcher): dispatcher.push(self.content) class return_value(object): def __init__(self, value): self.value = value def execute(self, dispatcher): dispatcher.close_coroutine_and_return_to_caller(self.value)