Mercurial > cosocket
comparison cosocket.py @ 97:0d3dd2ab36cd
Factored out all instruction-specific logic from the dispatcher into individual instructions.
author | Atul Varma <varmaa@toolness.com> |
---|---|
date | Sat, 02 May 2009 00:14:59 -0700 |
parents | 68598f164855 |
children | 06aa973a54c3 |
comparison
equal
deleted
inserted
replaced
96:68598f164855 | 97:0d3dd2ab36cd |
---|---|
93 if self.__coroutine_stack: | 93 if self.__coroutine_stack: |
94 self.__coroutine = self.__coroutine_stack.pop() | 94 self.__coroutine = self.__coroutine_stack.pop() |
95 self.continue_from_yield() | 95 self.continue_from_yield() |
96 else: | 96 else: |
97 self.__coroutine = None | 97 self.__coroutine = None |
98 self.__handler.handle_coroutine_complete() | 98 self.__handler.handle_coroutine_complete(None) |
99 except Exception, e: | 99 except Exception, e: |
100 if self.__coroutine_stack: | 100 if self.__coroutine_stack: |
101 self.__coroutine = self.__coroutine_stack.pop() | 101 self.__coroutine = self.__coroutine_stack.pop() |
102 self.continue_from_yield(exception_info = sys.exc_info()) | 102 self.continue_from_yield(exception_info = sys.exc_info()) |
103 else: | 103 else: |
104 self.__log_error() | 104 self.__log_error() |
| | 105 self.__handler.handle_coroutine_complete(e) |
105 else: | 106 else: |
106 if type(instruction) == types.GeneratorType: | 107 if type(instruction) == types.GeneratorType: |
107 self.__coroutine_stack.append(self.__coroutine) | 108 self.__coroutine_stack.append(self.__coroutine) |
108 self.__coroutine = instruction | 109 self.__coroutine = instruction |
109 self.continue_from_yield() | 110 self.continue_from_yield() |
113 class _AsyncChatCoroutineDispatcher(asynchat.async_chat): | 114 class _AsyncChatCoroutineDispatcher(asynchat.async_chat): |
114 def __init__(self, coroutine, conn = None): | 115 def __init__(self, coroutine, conn = None): |
115 asynchat.async_chat.__init__(self, conn) | 116 asynchat.async_chat.__init__(self, conn) |
116 self.trampoline = _Trampoline(coroutine, self) | 117 self.trampoline = _Trampoline(coroutine, self) |
117 self.set_terminator(None) | 118 self.set_terminator(None) |
118 self.__max_data = DEFAULT_MAX_DATA | |
119 self.__timeout = 0 | |
120 self.__data = [] | |
121 self.__data_len = 0 | |
122 if conn: | 119 if conn: |
123 self.trampoline.continue_from_yield() | 120 self.trampoline.continue_from_yield() |
124 | 121 |
125 def handle_coroutine_instruction(self, instruction): | 122 def handle_coroutine_instruction(self, instruction): |
| | 123 self.__instruction = instruction |
126 instruction.execute(self) | 124 instruction.execute(self) |
127 | 125 |
128 def handle_coroutine_complete(self): | 126 def handle_coroutine_complete(self, exception): |
129 self.handle_close() | 127 self.__instruction = None |
| | 128 if not exception: |
| | 129 self.handle_close() |
130 | 130 |
131 def handle_close(self): | 131 def handle_close(self): |
132 self.trampoline.close_coroutine_stack() | 132 self.trampoline.close_coroutine_stack() |
133 self.clear_timeout() | 133 self.clear_timeout() |
134 self.close() | 134 self.close() |
144 self.log_info(traceback.format_exc() + | 144 self.log_info(traceback.format_exc() + |
145 self.trampoline.get_formatted_coroutine_traceback(), | 145 self.trampoline.get_formatted_coroutine_traceback(), |
146 'error') | 146 'error') |
147 | 147 |
148 def handle_connect(self): | 148 def handle_connect(self): |
149 self.trampoline.continue_from_yield() | 149 self.__instruction.handle_connect() |
150 | 150 |
151 def initiate_send(self): | 151 def initiate_send(self): |
152 asynchat.async_chat.initiate_send(self) | 152 asynchat.async_chat.initiate_send(self) |
153 if ((not self.ac_out_buffer) and | 153 self.__instruction.handle_initiate_send() |
154 (len(self.producer_fifo) == 0) and | |
155 self.connected): | |
156 self.trampoline.continue_from_yield() | |
157 | 154 |
158 def __on_timeout(self): | 155 def __on_timeout(self): |
159 self.log_info("Timeout expired (%ss)." % self.__timeout, | 156 self.__instruction.handle_timeout() |
160 'error') | |
161 self.handle_close() | |
162 | 157 |
163 def clear_timeout(self): | 158 def clear_timeout(self): |
164 self.__timeout = 0 | |
165 if self.__on_timeout in time_map: | 159 if self.__on_timeout in time_map: |
166 del time_map[self.__on_timeout] | 160 del time_map[self.__on_timeout] |
167 | 161 |
168 def set_timeout(self, timeout): | 162 def set_timeout(self, timeout): |
169 self.__timeout = timeout | |
170 time_map[self.__on_timeout] = timeout | 163 time_map[self.__on_timeout] = timeout |
171 | 164 |
172 def collect_incoming_data(self, data): | 165 def collect_incoming_data(self, data): |
173 self.__data.append(data) | 166 self.__instruction.collect_incoming_data(data) |
174 self.__data_len += len(data) | |
175 if self.__max_data and self.__data_len > self.__max_data: | |
176 self.log_info("Max data reached (%s bytes)." % self.__max_data, | |
177 'error') | |
178 self.handle_close() | |
179 | |
180 def set_max_data(self, amount): | |
181 self.__max_data = amount | |
182 | 167 |
183 def found_terminator(self): | 168 def found_terminator(self): |
184 if not (self.__max_data and self.__data_len > self.__max_data): | 169 self.__instruction.found_terminator() |
185 self.set_terminator(None) | |
186 data = ''.join(self.__data) | |
187 self.__data = [] | |
188 self.__data_len = 0 | |
189 self.trampoline.continue_from_yield(data) | |
190 | 170 |
191 class CoroutineSocketServer(asyncore.dispatcher): | 171 class CoroutineSocketServer(asyncore.dispatcher): |
192 def __init__(self, addr, coroutineFactory): | 172 def __init__(self, addr, coroutineFactory): |
193 asyncore.dispatcher.__init__(self) | 173 asyncore.dispatcher.__init__(self) |
194 self.__coroutineFactory = coroutineFactory | 174 self.__coroutineFactory = coroutineFactory |
220 self.__kwargs = kwargs | 200 self.__kwargs = kwargs |
221 | 201 |
222 def execute(self, dispatcher): | 202 def execute(self, dispatcher): |
223 self.dispatcher = dispatcher | 203 self.dispatcher = dispatcher |
224 self.do_execute(*self.__args, **self.__kwargs) | 204 self.do_execute(*self.__args, **self.__kwargs) |
| | 205 |
| | 206 def handle_timeout(self): |
| | 207 self.dispatcher.handle_close() |
225 | 208 |
226 class until_received(CoroutineInstruction): | 209 class until_received(CoroutineInstruction): |
227 def do_execute(self, | 210 def do_execute(self, |
228 terminator = None, | 211 terminator = None, |
229 bytes = None, | 212 bytes = None, |
235 self.dispatcher.set_terminator(terminator) | 218 self.dispatcher.set_terminator(terminator) |
236 elif bytes: | 219 elif bytes: |
237 self.dispatcher.set_terminator(bytes) | 220 self.dispatcher.set_terminator(bytes) |
238 else: | 221 else: |
239 raise ValueError('Must specify terminator or bytes') | 222 raise ValueError('Must specify terminator or bytes') |
240 self.dispatcher.set_max_data(max_data) | 223 self.__max_data = max_data |
| | 224 self.__data = [] |
| | 225 self.__data_len = 0 |
| | 226 |
| | 227 def collect_incoming_data(self, data): |
| | 228 self.__data.append(data) |
| | 229 self.__data_len += len(data) |
| | 230 if self.__max_data and self.__data_len > self.__max_data: |
| | 231 logging.error("Max data reached (%s bytes)." % self.__max_data) |
| | 232 self.dispatcher.handle_close() |
| | 233 |
| | 234 def found_terminator(self): |
| | 235 if not (self.__max_data and self.__data_len > self.__max_data): |
| | 236 self.dispatcher.set_terminator(None) |
| | 237 data = ''.join(self.__data) |
| | 238 self.__data = [] |
| | 239 self.__data_len = 0 |
| | 240 self.dispatcher.clear_timeout() |
| | 241 self.dispatcher.trampoline.continue_from_yield(data) |
241 | 242 |
242 class until_sent(CoroutineInstruction): | 243 class until_sent(CoroutineInstruction): |
243 def do_execute(self, content, timeout = DEFAULT_TIMEOUT): | 244 def do_execute(self, content, timeout = DEFAULT_TIMEOUT): |
244 self.dispatcher.set_timeout(timeout) | 245 self.dispatcher.set_timeout(timeout) |
245 self.dispatcher.push(content) | 246 self.dispatcher.push(content) |
246 | 247 |
| | 248 def handle_initiate_send(self): |
| | 249 if ((not self.dispatcher.ac_out_buffer) and |
| | 250 (len(self.dispatcher.producer_fifo) == 0) and |
| | 251 self.dispatcher.connected): |
| | 252 self.dispatcher.clear_timeout() |
| | 253 self.dispatcher.trampoline.continue_from_yield() |
| | 254 |
247 class return_value(CoroutineInstruction): | 255 class return_value(CoroutineInstruction): |
248 def do_execute(self, value): | 256 def do_execute(self, value): |
249 self.dispatcher.trampoline.close_coroutine_and_return_to_caller(value) | 257 self.dispatcher.trampoline.close_coroutine_and_return_to_caller(value) |