Mercurial > cosocket
view channels.py @ 99:b9cdccd5fbe4 default tip
Added unit test (doctest).
author | Atul Varma <varmaa@toolness.com> |
---|---|
date | Sat, 02 May 2009 08:13:45 -0700 |
parents | 57681224a62a |
children |
line wrap: on
line source
import cosocket _channels = {} class until_message_sent(cosocket.CoroutineInstruction): def do_execute(self, channel_name, message): if channel_name in _channels: receivers = _channels[channel_name].values() del _channels[channel_name] for receiver in receivers: receiver.trampoline.continue_from_yield(message) self.dispatcher.trampoline.continue_from_yield() class _until_message_received(cosocket.CoroutineInstruction): def do_execute(self, channel_name, timeout = cosocket.DEFAULT_TIMEOUT): if channel_name not in _channels: _channels[channel_name] = {} fd = self.dispatcher.socket.fileno() _channels[channel_name][fd] = self.dispatcher self.dispatcher.set_timeout(timeout) self.channel_name = channel_name self._fd = fd def finalize(self): if (self.channel_name in _channels and self._fd in _channels[self.channel_name]): del _channels[self.channel_name][self._fd] if not _channels[self.channel_name]: del _channels[self.channel_name] def until_message_received(channel_name): instruction = _until_message_received(channel_name) try: message = yield instruction yield cosocket.return_value(message) finally: instruction.finalize()