comparison cosocket.py @ 97:0d3dd2ab36cd

Factored out all instruction-specific logic from the dispatcher into individual instructions.
author Atul Varma <varmaa@toolness.com>
date Sat, 02 May 2009 00:14:59 -0700
parents 68598f164855
children 06aa973a54c3
comparison
equal deleted inserted replaced
96:68598f164855 97:0d3dd2ab36cd
93 if self.__coroutine_stack: 93 if self.__coroutine_stack:
94 self.__coroutine = self.__coroutine_stack.pop() 94 self.__coroutine = self.__coroutine_stack.pop()
95 self.continue_from_yield() 95 self.continue_from_yield()
96 else: 96 else:
97 self.__coroutine = None 97 self.__coroutine = None
98 self.__handler.handle_coroutine_complete() 98 self.__handler.handle_coroutine_complete(None)
99 except Exception, e: 99 except Exception, e:
100 if self.__coroutine_stack: 100 if self.__coroutine_stack:
101 self.__coroutine = self.__coroutine_stack.pop() 101 self.__coroutine = self.__coroutine_stack.pop()
102 self.continue_from_yield(exception_info = sys.exc_info()) 102 self.continue_from_yield(exception_info = sys.exc_info())
103 else: 103 else:
104 self.__log_error() 104 self.__log_error()
105 self.__handler.handle_coroutine_complete(e)
105 else: 106 else:
106 if type(instruction) == types.GeneratorType: 107 if type(instruction) == types.GeneratorType:
107 self.__coroutine_stack.append(self.__coroutine) 108 self.__coroutine_stack.append(self.__coroutine)
108 self.__coroutine = instruction 109 self.__coroutine = instruction
109 self.continue_from_yield() 110 self.continue_from_yield()
113 class _AsyncChatCoroutineDispatcher(asynchat.async_chat): 114 class _AsyncChatCoroutineDispatcher(asynchat.async_chat):
114 def __init__(self, coroutine, conn = None): 115 def __init__(self, coroutine, conn = None):
115 asynchat.async_chat.__init__(self, conn) 116 asynchat.async_chat.__init__(self, conn)
116 self.trampoline = _Trampoline(coroutine, self) 117 self.trampoline = _Trampoline(coroutine, self)
117 self.set_terminator(None) 118 self.set_terminator(None)
118 self.__max_data = DEFAULT_MAX_DATA
119 self.__timeout = 0
120 self.__data = []
121 self.__data_len = 0
122 if conn: 119 if conn:
123 self.trampoline.continue_from_yield() 120 self.trampoline.continue_from_yield()
124 121
125 def handle_coroutine_instruction(self, instruction): 122 def handle_coroutine_instruction(self, instruction):
123 self.__instruction = instruction
126 instruction.execute(self) 124 instruction.execute(self)
127 125
128 def handle_coroutine_complete(self): 126 def handle_coroutine_complete(self, exception):
129 self.handle_close() 127 self.__instruction = None
128 if not exception:
129 self.handle_close()
130 130
131 def handle_close(self): 131 def handle_close(self):
132 self.trampoline.close_coroutine_stack() 132 self.trampoline.close_coroutine_stack()
133 self.clear_timeout() 133 self.clear_timeout()
134 self.close() 134 self.close()
144 self.log_info(traceback.format_exc() + 144 self.log_info(traceback.format_exc() +
145 self.trampoline.get_formatted_coroutine_traceback(), 145 self.trampoline.get_formatted_coroutine_traceback(),
146 'error') 146 'error')
147 147
148 def handle_connect(self): 148 def handle_connect(self):
149 self.trampoline.continue_from_yield() 149 self.__instruction.handle_connect()
150 150
151 def initiate_send(self): 151 def initiate_send(self):
152 asynchat.async_chat.initiate_send(self) 152 asynchat.async_chat.initiate_send(self)
153 if ((not self.ac_out_buffer) and 153 self.__instruction.handle_initiate_send()
154 (len(self.producer_fifo) == 0) and
155 self.connected):
156 self.trampoline.continue_from_yield()
157 154
158 def __on_timeout(self): 155 def __on_timeout(self):
159 self.log_info("Timeout expired (%ss)." % self.__timeout, 156 self.__instruction.handle_timeout()
160 'error')
161 self.handle_close()
162 157
163 def clear_timeout(self): 158 def clear_timeout(self):
164 self.__timeout = 0
165 if self.__on_timeout in time_map: 159 if self.__on_timeout in time_map:
166 del time_map[self.__on_timeout] 160 del time_map[self.__on_timeout]
167 161
168 def set_timeout(self, timeout): 162 def set_timeout(self, timeout):
169 self.__timeout = timeout
170 time_map[self.__on_timeout] = timeout 163 time_map[self.__on_timeout] = timeout
171 164
172 def collect_incoming_data(self, data): 165 def collect_incoming_data(self, data):
173 self.__data.append(data) 166 self.__instruction.collect_incoming_data(data)
174 self.__data_len += len(data)
175 if self.__max_data and self.__data_len > self.__max_data:
176 self.log_info("Max data reached (%s bytes)." % self.__max_data,
177 'error')
178 self.handle_close()
179
180 def set_max_data(self, amount):
181 self.__max_data = amount
182 167
183 def found_terminator(self): 168 def found_terminator(self):
184 if not (self.__max_data and self.__data_len > self.__max_data): 169 self.__instruction.found_terminator()
185 self.set_terminator(None)
186 data = ''.join(self.__data)
187 self.__data = []
188 self.__data_len = 0
189 self.trampoline.continue_from_yield(data)
190 170
191 class CoroutineSocketServer(asyncore.dispatcher): 171 class CoroutineSocketServer(asyncore.dispatcher):
192 def __init__(self, addr, coroutineFactory): 172 def __init__(self, addr, coroutineFactory):
193 asyncore.dispatcher.__init__(self) 173 asyncore.dispatcher.__init__(self)
194 self.__coroutineFactory = coroutineFactory 174 self.__coroutineFactory = coroutineFactory
220 self.__kwargs = kwargs 200 self.__kwargs = kwargs
221 201
222 def execute(self, dispatcher): 202 def execute(self, dispatcher):
223 self.dispatcher = dispatcher 203 self.dispatcher = dispatcher
224 self.do_execute(*self.__args, **self.__kwargs) 204 self.do_execute(*self.__args, **self.__kwargs)
205
206 def handle_timeout(self):
207 self.dispatcher.handle_close()
225 208
226 class until_received(CoroutineInstruction): 209 class until_received(CoroutineInstruction):
227 def do_execute(self, 210 def do_execute(self,
228 terminator = None, 211 terminator = None,
229 bytes = None, 212 bytes = None,
235 self.dispatcher.set_terminator(terminator) 218 self.dispatcher.set_terminator(terminator)
236 elif bytes: 219 elif bytes:
237 self.dispatcher.set_terminator(bytes) 220 self.dispatcher.set_terminator(bytes)
238 else: 221 else:
239 raise ValueError('Must specify terminator or bytes') 222 raise ValueError('Must specify terminator or bytes')
240 self.dispatcher.set_max_data(max_data) 223 self.__max_data = max_data
224 self.__data = []
225 self.__data_len = 0
226
227 def collect_incoming_data(self, data):
228 self.__data.append(data)
229 self.__data_len += len(data)
230 if self.__max_data and self.__data_len > self.__max_data:
231 logging.error("Max data reached (%s bytes)." % self.__max_data)
232 self.dispatcher.handle_close()
233
234 def found_terminator(self):
235 if not (self.__max_data and self.__data_len > self.__max_data):
236 self.dispatcher.set_terminator(None)
237 data = ''.join(self.__data)
238 self.__data = []
239 self.__data_len = 0
240 self.dispatcher.clear_timeout()
241 self.dispatcher.trampoline.continue_from_yield(data)
241 242
242 class until_sent(CoroutineInstruction): 243 class until_sent(CoroutineInstruction):
243 def do_execute(self, content, timeout = DEFAULT_TIMEOUT): 244 def do_execute(self, content, timeout = DEFAULT_TIMEOUT):
244 self.dispatcher.set_timeout(timeout) 245 self.dispatcher.set_timeout(timeout)
245 self.dispatcher.push(content) 246 self.dispatcher.push(content)
246 247
248 def handle_initiate_send(self):
249 if ((not self.dispatcher.ac_out_buffer) and
250 (len(self.dispatcher.producer_fifo) == 0) and
251 self.dispatcher.connected):
252 self.dispatcher.clear_timeout()
253 self.dispatcher.trampoline.continue_from_yield()
254
247 class return_value(CoroutineInstruction): 255 class return_value(CoroutineInstruction):
248 def do_execute(self, value): 256 def do_execute(self, value):
249 self.dispatcher.trampoline.close_coroutine_and_return_to_caller(value) 257 self.dispatcher.trampoline.close_coroutine_and_return_to_caller(value)