changeset 97:0d3dd2ab36cd

Factored out all instruction-specific logic from the dispatcher into individual instructions.
author Atul Varma <varmaa@toolness.com>
date Sat, 02 May 2009 00:14:59 -0700
parents 68598f164855
children 06aa973a54c3
files cosocket.py
diffstat 1 files changed, 41 insertions(+), 33 deletions(-) [+]
line wrap: on
line diff
--- a/cosocket.py	Fri May 01 23:47:06 2009 -0700
+++ b/cosocket.py	Sat May 02 00:14:59 2009 -0700
@@ -95,13 +95,14 @@
                 self.continue_from_yield()
             else:
                 self.__coroutine = None
-                self.__handler.handle_coroutine_complete()
+                self.__handler.handle_coroutine_complete(None)
         except Exception, e:
             if self.__coroutine_stack:
                 self.__coroutine = self.__coroutine_stack.pop()
                 self.continue_from_yield(exception_info = sys.exc_info())
             else:
                 self.__log_error()
+                self.__handler.handle_coroutine_complete(e)
         else:
             if type(instruction) == types.GeneratorType:
                 self.__coroutine_stack.append(self.__coroutine)
@@ -115,18 +116,17 @@
         asynchat.async_chat.__init__(self, conn)
         self.trampoline = _Trampoline(coroutine, self)
         self.set_terminator(None)
-        self.__max_data = DEFAULT_MAX_DATA
-        self.__timeout = 0
-        self.__data = []
-        self.__data_len = 0
         if conn:
             self.trampoline.continue_from_yield()
 
     def handle_coroutine_instruction(self, instruction):
+        self.__instruction = instruction
         instruction.execute(self)
 
-    def handle_coroutine_complete(self):
-        self.handle_close()
+    def handle_coroutine_complete(self, exception):
+        self.__instruction = None
+        if not exception:
+            self.handle_close()
 
     def handle_close(self):
         self.trampoline.close_coroutine_stack()
@@ -146,47 +146,27 @@
                       'error')
 
     def handle_connect(self):
-        self.trampoline.continue_from_yield()
+        self.__instruction.handle_connect()
 
     def initiate_send(self):
         asynchat.async_chat.initiate_send(self)
-        if ((not self.ac_out_buffer) and
-            (len(self.producer_fifo) == 0) and
-            self.connected):
-            self.trampoline.continue_from_yield()
+        self.__instruction.handle_initiate_send()
 
     def __on_timeout(self):
-        self.log_info("Timeout expired (%ss)." % self.__timeout,
-                      'error')
-        self.handle_close()
+        self.__instruction.handle_timeout()
 
     def clear_timeout(self):
-        self.__timeout = 0
         if self.__on_timeout in time_map:
             del time_map[self.__on_timeout]
 
     def set_timeout(self, timeout):
-        self.__timeout = timeout
         time_map[self.__on_timeout] = timeout
 
     def collect_incoming_data(self, data):
-        self.__data.append(data)
-        self.__data_len += len(data)
-        if self.__max_data and self.__data_len > self.__max_data:
-            self.log_info("Max data reached (%s bytes)." % self.__max_data,
-                          'error')
-            self.handle_close()
-
-    def set_max_data(self, amount):
-        self.__max_data = amount
+        self.__instruction.collect_incoming_data(data)
 
     def found_terminator(self):
-        if not (self.__max_data and self.__data_len > self.__max_data):
-            self.set_terminator(None)
-            data = ''.join(self.__data)
-            self.__data = []
-            self.__data_len = 0
-            self.trampoline.continue_from_yield(data)
+        self.__instruction.found_terminator()
 
 class CoroutineSocketServer(asyncore.dispatcher):
     def __init__(self, addr, coroutineFactory):
@@ -223,6 +203,9 @@
         self.dispatcher = dispatcher
         self.do_execute(*self.__args, **self.__kwargs)
 
+    def handle_timeout(self):
+        self.dispatcher.handle_close()
+
 class until_received(CoroutineInstruction):
     def do_execute(self,
                    terminator = None,
@@ -237,13 +220,38 @@
             self.dispatcher.set_terminator(bytes)
         else:
             raise ValueError('Must specify terminator or bytes')
-        self.dispatcher.set_max_data(max_data)
+        self.__max_data = max_data
+        self.__data = []
+        self.__data_len = 0
+
+    def collect_incoming_data(self, data):
+        self.__data.append(data)
+        self.__data_len += len(data)
+        if self.__max_data and self.__data_len > self.__max_data:
+            logging.error("Max data reached (%s bytes)." % self.__max_data)
+            self.dispatcher.handle_close()
+
+    def found_terminator(self):
+        if not (self.__max_data and self.__data_len > self.__max_data):
+            self.dispatcher.set_terminator(None)
+            data = ''.join(self.__data)
+            self.__data = []
+            self.__data_len = 0
+            self.dispatcher.clear_timeout()
+            self.dispatcher.trampoline.continue_from_yield(data)
 
 class until_sent(CoroutineInstruction):
     def do_execute(self, content, timeout = DEFAULT_TIMEOUT):
         self.dispatcher.set_timeout(timeout)
         self.dispatcher.push(content)
 
+    def handle_initiate_send(self):
+        if ((not self.dispatcher.ac_out_buffer) and
+            (len(self.dispatcher.producer_fifo) == 0) and
+            self.dispatcher.connected):
+            self.dispatcher.clear_timeout()
+            self.dispatcher.trampoline.continue_from_yield()
+
 class return_value(CoroutineInstruction):
     def do_execute(self, value):
         self.dispatcher.trampoline.close_coroutine_and_return_to_caller(value)