Andrew, thank you for quick response!
среда, 2 мая 2018 г., 18:39:01 UTC+3 пользователь Andrew Godwin написал:
-- Unfortunately I see something different. If I raise `StopConsumer` after processing a single message, the next one is lost. From the code https://github.com/django/asgiref/blob/master/asgiref/server.py#L75 I see that `get_or_create_application_instance` returns existing application instance, so when message arrives it is put into the queue of the existing application. If I raise `StopConsumer` this application got killed and the message is lost.
Another observation: when message handler awaits something (like in my snippet `await some_process()`) this application instance does not process new messages. When one message handler function exits - the next one starts.
Actually, I was offloading message processing to the threadpool (using `sync_to_async`) and trying to limit the number of messages being processed at the same time. I hoped that if I await for some coroutine the worker will stop accepting messages, so other workers can process them until this one gets free again. Can you give me a clue how to achieve this bevaviour?
среда, 2 мая 2018 г., 18:39:01 UTC+3 пользователь Andrew Godwin написал:
- Will `MyConsumer` receive new `wakeup` messages while awaiting `some_process`?
Yes. The worker server runs as many application instances as there are messages coming in, even though they all have the same scope. You can see the main listening loop here: https://github.com/django/channels/blob/master/ channels/worker.py#L32 You need to raise it when the specific application instance you have is completed - because there's a different instance for each message, that means "raise when you've finished processing the message". Nothing happens to other messages as they're being handled by different instances.
- When do I need to raise `StopConsumer`? I can do it after each processing of `wakeup` message (like in the code above) is that correct? What will happen with all the `pending` messages in such case?
AndrewOn Wed, May 2, 2018 at 7:50 AM Alexander Prokhorov <pro...@gmail.com> wrote:Dear Andrew,--I would like to ask couple of questions about the lifecycle of consumers running in Channels workers and serving custom channels.Consider a consumer:
# myconsumer.py
class MyConsumer(channels.consumer.AsyncConsumer ):
async def wakeup(self, message):
await some_process()
raise channels.exceptions.StopConsumer ()which I "register" to process messages in the channel `my_channel`:
# routing.py
application = channels.routing.ProtocolTypeRouter ({
'channel': channels.routing.ChannelNameRouter ({
'my_channel': MyConsumer
})
})
and eventually I run designated Channels worker to process `my_channel` messages.
./manage.py runworker my_channelSo the questions are:
- Will `MyConsumer` receive new `wakeup` messages while awaiting `some_process`?
- When do I need to raise `StopConsumer`? I can do it after each processing of `wakeup` message (like in the code above) is that correct? What will happen with all the `pending` messages in such case?
Actually, I do not raise "StopConsumer" in the implementation I currently have, but this leads to an issue with tests. In tests I need to somehow wait until all workers finish processing their messages. I tried calling `channels.testing.ApplicationCommunicator.wait() ` but as I see it from the code it waits the application/consumer to finish, i.e. to raise `StopConsumer` exception. Probably you can share some recommendations. Thanks in advance. Best regards,Alexander.
You received this message because you are subscribed to the Google Groups "Django users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to django-users...@googlegroups.com .
To post to this group, send email to django...@googlegroups.com.
Visit this group at https://groups.google.com/group/django-users .
To view this discussion on the web visit https://groups.google.com/d/msgid/django-users/00e6398b- .a71f-4509-a95b-3ced88b26ee0% 40googlegroups.com
For more options, visit https://groups.google.com/d/optout .
You received this message because you are subscribed to the Google Groups "Django users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to django-users+unsubscribe@googlegroups.com.
To post to this group, send email to django-users@googlegroups.com.
Visit this group at https://groups.google.com/group/django-users.
To view this discussion on the web visit https://groups.google.com/d/msgid/django-users/d991b5d9-5cb7-46f0-9fb9-749e65443122%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
No comments:
Post a Comment