# HG changeset patch # User Atul Varma # Date 1241221591 25200 # Node ID 43d37495e9d41e6efd1c1a092c682cf92a9ec046 # Parent 408738c3cd5d44cd2ac025e448115cf8a85683ee Added timeout functionality. diff -r 408738c3cd5d -r 43d37495e9d4 channels.py --- a/channels.py Fri May 01 15:37:57 2009 -0700 +++ b/channels.py Fri May 01 16:46:31 2009 -0700 @@ -16,8 +16,9 @@ dispatcher.continue_from_yield() class _until_message_received(object): - def __init__(self, channel_name): + def __init__(self, channel_name, timeout = cosocket.DEFAULT_TIMEOUT): self.channel_name = channel_name + self._timeout = timeout self._fd = None def execute(self, dispatcher): @@ -25,6 +26,7 @@ _channels[self.channel_name] = {} self._fd = dispatcher.socket.fileno() _channels[self.channel_name][self._fd] = dispatcher + dispatcher.set_timeout(self._timeout) def finalize(self): if (self.channel_name in _channels and diff -r 408738c3cd5d -r 43d37495e9d4 cosocket.py --- a/cosocket.py Fri May 01 15:37:57 2009 -0700 +++ b/cosocket.py Fri May 01 16:46:31 2009 -0700 @@ -7,9 +7,11 @@ import time import weakref -coroutine_dispatchers = [] +DEFAULT_TIMEOUT = 90.0 -def loop(timeout): +time_map = {} + +def loop(timeout = DEFAULT_TIMEOUT): start_time = time.time() while 1: asyncore.loop(timeout = timeout, count = 1) @@ -17,25 +19,18 @@ time_elapsed = curr_time - start_time if time_elapsed > timeout: start_time = curr_time - new_coroutine_dispatchers = [] - for ref in coroutine_dispatchers: - dispatcher = ref() - if dispatcher: - new_coroutine_dispatchers.append(ref) - try: - dispatcher.handle_tick(time_elapsed) - except: - dispatcher.handle_error() - coroutine_dispatchers[:] = new_coroutine_dispatchers + for func in time_map.values(): + func(time_elapsed) class _AsyncChatCoroutineDispatcher(asynchat.async_chat): def __init__(self, coroutine, conn = None): - coroutine_dispatchers.append(weakref.ref(self)) asynchat.async_chat.__init__(self, conn) self.set_terminator(None) self.__coroutine = coroutine self.__data = [] self.__coroutine_stack = [] + self.__timeout = 0 + self.__time_passed = 0 if conn: self.continue_from_yield() @@ -49,6 +44,7 @@ def continue_from_yield(self, message = None, exception_info = None): + self.clear_timeout() try: if exception_info: instruction = self.__coroutine.throw(*exception_info) @@ -95,9 +91,6 @@ (filename, lineno, name)) return '\n'.join(lines) - def handle_tick(self, time_elapsed): - pass - def handle_close(self): if self.__coroutine: # Pass an exception back into the coroutine to kick @@ -106,6 +99,7 @@ self.__coroutine = None while self.__coroutine_stack: self.__close_coroutine(self.__coroutine_stack.pop()) + self.clear_timeout() self.close() def log_info(self, message, type='info'): @@ -127,6 +121,22 @@ self.connected): self.continue_from_yield() + def __on_tick(self, time_elapsed): + self.__time_passed += time_elapsed + if self.__time_passed > self.__timeout: + self.handle_close() + + def clear_timeout(self): + self.__timeout = 0 + self.__time_passed = 0 + if self.__on_tick in time_map: + del time_map[self.__on_tick] + + def set_timeout(self, timeout): + self.__timeout = timeout + self.__time_passed = 0 + time_map[self.__on_tick] = self.__on_tick + def collect_incoming_data(self, data): # TODO: Enforce some maximum data length. self.__data.append(data) @@ -146,7 +156,7 @@ self.bind(addr) self.listen(1) - def run(self, timeout): + def run(self, timeout = DEFAULT_TIMEOUT): loop(timeout) def handle_accept(self): @@ -164,7 +174,9 @@ # Instructions that coroutines yield. class until_received(object): - def __init__(self, terminator = None, bytes = None): + def __init__(self, terminator = None, bytes = None, + timeout = DEFAULT_TIMEOUT): + self._timeout = timeout if terminator: self._terminator = terminator elif bytes: @@ -173,15 +185,18 @@ raise ValueError() def execute(self, dispatcher): + dispatcher.set_timeout(self._timeout) dispatcher.set_terminator(self._terminator) class until_sent(object): - def __init__(self, content): + def __init__(self, content, timeout = DEFAULT_TIMEOUT): if not content: raise ValueError(content) + self._timeout = timeout self.content = content def execute(self, dispatcher): + dispatcher.set_timeout(self._timeout) dispatcher.push(self.content) class return_value(object): diff -r 408738c3cd5d -r 43d37495e9d4 openwebchat.py --- a/openwebchat.py Fri May 01 15:37:57 2009 -0700 +++ b/openwebchat.py Fri May 01 16:46:31 2009 -0700 @@ -16,6 +16,9 @@ except ImportError: import simplejson as json +KEEP_ALIVE_MAX_REQUESTS = 99 +KEEP_ALIVE_TIMEOUT = int(DEFAULT_TIMEOUT) + class Conversations(object): def __init__(self): self._convs = weakref.WeakValueDictionary() @@ -84,7 +87,9 @@ additional_headers = None): headers = {'Content-Type': mimetype} if self._is_keep_alive: - headers.update({'Keep-Alive': 'timeout=99, max=99', + headers.update({'Keep-Alive': 'timeout=%d, max=%d' % + (KEEP_ALIVE_TIMEOUT, + KEEP_ALIVE_MAX_REQUESTS), 'Connection': 'Keep-Alive'}) if additional_headers: headers.update(additional_headers) @@ -123,7 +128,7 @@ self._num_connections += 1 try: if self._is_keep_alive: - while 1: + for i in range(KEEP_ALIVE_MAX_REQUESTS): yield self._until_one_request_processed(addr) else: yield self._until_one_request_processed(addr) @@ -231,6 +236,7 @@ if conv_name == 'status': # TODO: Return 404 if page is non-empty. lines = ('open connections : %d' % self._num_connections, + 'open timers : %d' % len(time_map), 'open conversations: %d'% len(self._convs)) yield self._until_http_response_sent('\r\n'.join(lines)) elif conv_name == 'media':