Mercurial > cosocket
changeset 97:0d3dd2ab36cd
Factored out all instruction-specific logic from the dispatcher into individual instructions.
author | Atul Varma <varmaa@toolness.com> |
---|---|
date | Sat, 02 May 2009 00:14:59 -0700 |
parents | 68598f164855 |
children | 06aa973a54c3 |
files | cosocket.py |
diffstat | 1 files changed, 41 insertions(+), 33 deletions(-) [+] |
line wrap: on
line diff
--- a/cosocket.py Fri May 01 23:47:06 2009 -0700 +++ b/cosocket.py Sat May 02 00:14:59 2009 -0700 @@ -95,13 +95,14 @@ self.continue_from_yield() else: self.__coroutine = None - self.__handler.handle_coroutine_complete() + self.__handler.handle_coroutine_complete(None) except Exception, e: if self.__coroutine_stack: self.__coroutine = self.__coroutine_stack.pop() self.continue_from_yield(exception_info = sys.exc_info()) else: self.__log_error() + self.__handler.handle_coroutine_complete(e) else: if type(instruction) == types.GeneratorType: self.__coroutine_stack.append(self.__coroutine) @@ -115,18 +116,17 @@ asynchat.async_chat.__init__(self, conn) self.trampoline = _Trampoline(coroutine, self) self.set_terminator(None) - self.__max_data = DEFAULT_MAX_DATA - self.__timeout = 0 - self.__data = [] - self.__data_len = 0 if conn: self.trampoline.continue_from_yield() def handle_coroutine_instruction(self, instruction): + self.__instruction = instruction instruction.execute(self) - def handle_coroutine_complete(self): - self.handle_close() + def handle_coroutine_complete(self, exception): + self.__instruction = None + if not exception: + self.handle_close() def handle_close(self): self.trampoline.close_coroutine_stack() @@ -146,47 +146,27 @@ 'error') def handle_connect(self): - self.trampoline.continue_from_yield() + self.__instruction.handle_connect() def initiate_send(self): asynchat.async_chat.initiate_send(self) - if ((not self.ac_out_buffer) and - (len(self.producer_fifo) == 0) and - self.connected): - self.trampoline.continue_from_yield() + self.__instruction.handle_initiate_send() def __on_timeout(self): - self.log_info("Timeout expired (%ss)." % self.__timeout, - 'error') - self.handle_close() + self.__instruction.handle_timeout() def clear_timeout(self): - self.__timeout = 0 if self.__on_timeout in time_map: del time_map[self.__on_timeout] def set_timeout(self, timeout): - self.__timeout = timeout time_map[self.__on_timeout] = timeout def collect_incoming_data(self, data): - self.__data.append(data) - self.__data_len += len(data) - if self.__max_data and self.__data_len > self.__max_data: - self.log_info("Max data reached (%s bytes)." % self.__max_data, - 'error') - self.handle_close() - - def set_max_data(self, amount): - self.__max_data = amount + self.__instruction.collect_incoming_data(data) def found_terminator(self): - if not (self.__max_data and self.__data_len > self.__max_data): - self.set_terminator(None) - data = ''.join(self.__data) - self.__data = [] - self.__data_len = 0 - self.trampoline.continue_from_yield(data) + self.__instruction.found_terminator() class CoroutineSocketServer(asyncore.dispatcher): def __init__(self, addr, coroutineFactory): @@ -223,6 +203,9 @@ self.dispatcher = dispatcher self.do_execute(*self.__args, **self.__kwargs) + def handle_timeout(self): + self.dispatcher.handle_close() + class until_received(CoroutineInstruction): def do_execute(self, terminator = None, @@ -237,13 +220,38 @@ self.dispatcher.set_terminator(bytes) else: raise ValueError('Must specify terminator or bytes') - self.dispatcher.set_max_data(max_data) + self.__max_data = max_data + self.__data = [] + self.__data_len = 0 + + def collect_incoming_data(self, data): + self.__data.append(data) + self.__data_len += len(data) + if self.__max_data and self.__data_len > self.__max_data: + logging.error("Max data reached (%s bytes)." % self.__max_data) + self.dispatcher.handle_close() + + def found_terminator(self): + if not (self.__max_data and self.__data_len > self.__max_data): + self.dispatcher.set_terminator(None) + data = ''.join(self.__data) + self.__data = [] + self.__data_len = 0 + self.dispatcher.clear_timeout() + self.dispatcher.trampoline.continue_from_yield(data) class until_sent(CoroutineInstruction): def do_execute(self, content, timeout = DEFAULT_TIMEOUT): self.dispatcher.set_timeout(timeout) self.dispatcher.push(content) + def handle_initiate_send(self): + if ((not self.dispatcher.ac_out_buffer) and + (len(self.dispatcher.producer_fifo) == 0) and + self.dispatcher.connected): + self.dispatcher.clear_timeout() + self.dispatcher.trampoline.continue_from_yield() + class return_value(CoroutineInstruction): def do_execute(self, value): self.dispatcher.trampoline.close_coroutine_and_return_to_caller(value)