changeset 87:43d37495e9d4

Added timeout functionality.
author Atul Varma <varmaa@toolness.com>
date Fri, 01 May 2009 16:46:31 -0700
parents 408738c3cd5d
children ce5060140af5
files channels.py cosocket.py openwebchat.py
diffstat 3 files changed, 45 insertions(+), 22 deletions(-) [+]
line wrap: on
line diff
--- a/channels.py	Fri May 01 15:37:57 2009 -0700
+++ b/channels.py	Fri May 01 16:46:31 2009 -0700
@@ -16,8 +16,9 @@
         dispatcher.continue_from_yield()
 
 class _until_message_received(object):
-    def __init__(self, channel_name):
+    def __init__(self, channel_name, timeout = cosocket.DEFAULT_TIMEOUT):
         self.channel_name = channel_name
+        self._timeout = timeout
         self._fd = None
 
     def execute(self, dispatcher):
@@ -25,6 +26,7 @@
             _channels[self.channel_name] = {}
         self._fd = dispatcher.socket.fileno()
         _channels[self.channel_name][self._fd] = dispatcher
+        dispatcher.set_timeout(self._timeout)
 
     def finalize(self):
         if (self.channel_name in _channels and
--- a/cosocket.py	Fri May 01 15:37:57 2009 -0700
+++ b/cosocket.py	Fri May 01 16:46:31 2009 -0700
@@ -7,9 +7,11 @@
 import time
 import weakref
 
-coroutine_dispatchers = []
+DEFAULT_TIMEOUT = 90.0
 
-def loop(timeout):
+time_map = {}
+
+def loop(timeout = DEFAULT_TIMEOUT):
     start_time = time.time()
     while 1:
         asyncore.loop(timeout = timeout, count = 1)
@@ -17,25 +19,18 @@
         time_elapsed = curr_time - start_time
         if time_elapsed > timeout:
             start_time = curr_time
-            new_coroutine_dispatchers = []
-            for ref in coroutine_dispatchers:
-                dispatcher = ref()
-                if dispatcher:
-                    new_coroutine_dispatchers.append(ref)
-                    try:
-                        dispatcher.handle_tick(time_elapsed)
-                    except:
-                        dispatcher.handle_error()
-            coroutine_dispatchers[:] = new_coroutine_dispatchers
+            for func in time_map.values():
+                func(time_elapsed)
 
 class _AsyncChatCoroutineDispatcher(asynchat.async_chat):
     def __init__(self, coroutine, conn = None):
-        coroutine_dispatchers.append(weakref.ref(self))
         asynchat.async_chat.__init__(self, conn)
         self.set_terminator(None)
         self.__coroutine = coroutine
         self.__data = []
         self.__coroutine_stack = []
+        self.__timeout = 0
+        self.__time_passed = 0
         if conn:
             self.continue_from_yield()
 
@@ -49,6 +44,7 @@
 
     def continue_from_yield(self, message = None,
                             exception_info = None):
+        self.clear_timeout()
         try:
             if exception_info:
                 instruction = self.__coroutine.throw(*exception_info)
@@ -95,9 +91,6 @@
                           (filename, lineno, name))
         return '\n'.join(lines)
 
-    def handle_tick(self, time_elapsed):
-        pass
-
     def handle_close(self):
         if self.__coroutine:
             # Pass an exception back into the coroutine to kick
@@ -106,6 +99,7 @@
             self.__coroutine = None
             while self.__coroutine_stack:
                 self.__close_coroutine(self.__coroutine_stack.pop())
+        self.clear_timeout()
         self.close()
 
     def log_info(self, message, type='info'):
@@ -127,6 +121,22 @@
             self.connected):
             self.continue_from_yield()
 
+    def __on_tick(self, time_elapsed):
+        self.__time_passed += time_elapsed
+        if self.__time_passed > self.__timeout:
+            self.handle_close()
+
+    def clear_timeout(self):
+        self.__timeout = 0
+        self.__time_passed = 0
+        if self.__on_tick in time_map:
+            del time_map[self.__on_tick]
+
+    def set_timeout(self, timeout):
+        self.__timeout = timeout
+        self.__time_passed = 0
+        time_map[self.__on_tick] = self.__on_tick
+
     def collect_incoming_data(self, data):
         # TODO: Enforce some maximum data length.
         self.__data.append(data)
@@ -146,7 +156,7 @@
         self.bind(addr)
         self.listen(1)
 
-    def run(self, timeout):
+    def run(self, timeout = DEFAULT_TIMEOUT):
         loop(timeout)
 
     def handle_accept(self):
@@ -164,7 +174,9 @@
 # Instructions that coroutines yield.
 
 class until_received(object):
-    def __init__(self, terminator = None, bytes = None):
+    def __init__(self, terminator = None, bytes = None,
+                 timeout = DEFAULT_TIMEOUT):
+        self._timeout = timeout
         if terminator:
             self._terminator = terminator
         elif bytes:
@@ -173,15 +185,18 @@
             raise ValueError()
 
     def execute(self, dispatcher):
+        dispatcher.set_timeout(self._timeout)
         dispatcher.set_terminator(self._terminator)
 
 class until_sent(object):
-    def __init__(self, content):
+    def __init__(self, content, timeout = DEFAULT_TIMEOUT):
         if not content:
             raise ValueError(content)
+        self._timeout = timeout
         self.content = content
 
     def execute(self, dispatcher):
+        dispatcher.set_timeout(self._timeout)
         dispatcher.push(self.content)
 
 class return_value(object):
--- a/openwebchat.py	Fri May 01 15:37:57 2009 -0700
+++ b/openwebchat.py	Fri May 01 16:46:31 2009 -0700
@@ -16,6 +16,9 @@
 except ImportError:
     import simplejson as json
 
+KEEP_ALIVE_MAX_REQUESTS = 99
+KEEP_ALIVE_TIMEOUT = int(DEFAULT_TIMEOUT)
+
 class Conversations(object):
     def __init__(self):
         self._convs = weakref.WeakValueDictionary()
@@ -84,7 +87,9 @@
                                   additional_headers = None):
         headers = {'Content-Type': mimetype}
         if self._is_keep_alive:
-            headers.update({'Keep-Alive': 'timeout=99, max=99',
+            headers.update({'Keep-Alive': 'timeout=%d, max=%d' %
+                            (KEEP_ALIVE_TIMEOUT,
+                             KEEP_ALIVE_MAX_REQUESTS),
                             'Connection': 'Keep-Alive'})
         if additional_headers:
             headers.update(additional_headers)
@@ -123,7 +128,7 @@
         self._num_connections += 1
         try:
             if self._is_keep_alive:
-                while 1:
+                for i in range(KEEP_ALIVE_MAX_REQUESTS):
                     yield self._until_one_request_processed(addr)
             else:
                 yield self._until_one_request_processed(addr)
@@ -231,6 +236,7 @@
             if conv_name == 'status':
                 # TODO: Return 404 if page is non-empty.
                 lines = ('open connections  : %d' % self._num_connections,
+                         'open timers       : %d' % len(time_map),
                          'open conversations: %d'% len(self._convs))
                 yield self._until_http_response_sent('\r\n'.join(lines))
             elif conv_name == 'media':