Wednesday, May 2, 2018

Re: Channels 2: Consumer lifecycle when run in a worker process

  • Will `MyConsumer` receive new `wakeup` messages while awaiting `some_process`?
Yes. The worker server runs as many application instances as there are messages coming in, even though they all have the same scope. You can see the main listening loop here: https://github.com/django/channels/blob/master/channels/worker.py#L32
  • When do I need to raise `StopConsumer`? Can I raise it after processing each `wakeup` message (as in my code)? What happens to all the pending messages in that case?
You need to raise it when the specific application instance you have is completed - because there's a different instance for each message, that means "raise when you've finished processing the message". Nothing happens to other messages as they're being handled by different instances.
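
To illustrate the model described above, here is a minimal sketch in plain asyncio. It is not the actual Channels worker code. `ToyConsumer`, `toy_worker`, `run_instance`, `demo`, the local `StopConsumer` class, and the `None` sentinel are all hypothetical names invented for this illustration. It only shows the idea that each incoming message gets its own instance, and that stopping one instance does not affect the others.

```python
import asyncio


class StopConsumer(Exception):
    """Stand-in for channels.exceptions.StopConsumer (illustration only)."""


class ToyConsumer:
    """Hypothetical consumer: one instance handles exactly one message."""

    def __init__(self, log):
        self.log = log

    async def wakeup(self, message):
        self.log.append(("start", message["id"]))
        await asyncio.sleep(0.01)  # stands in for `await some_process()`
        self.log.append(("done", message["id"]))
        raise StopConsumer()  # this instance has finished its one message


async def run_instance(consumer, message):
    """Run one instance until it signals completion via StopConsumer."""
    try:
        await consumer.wakeup(message)
    except StopConsumer:
        pass  # normal shutdown of this instance only


async def toy_worker(queue, log):
    """Listening loop: start a fresh instance per message, without
    waiting for earlier instances to finish."""
    tasks = []
    while True:
        message = await queue.get()
        if message is None:  # sentinel used only by this sketch
            break
        instance = ToyConsumer(log)
        tasks.append(asyncio.ensure_future(run_instance(instance, message)))
    await asyncio.gather(*tasks)  # wait for every instance to stop


def demo():
    """Feed three wakeup messages to the toy worker and return the event log."""
    async def main():
        log = []
        queue = asyncio.Queue()
        for i in range(3):
            queue.put_nowait({"type": "wakeup", "id": i})
        queue.put_nowait(None)
        await toy_worker(queue, log)
        return log

    return asyncio.run(main())
```

Calling `demo()` returns the event log. In this sketch all three instances should log their "start" entry before any of them logs "done", which mirrors the answer to the first question: new messages are picked up while earlier ones are still awaiting.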

Andrew

On Wed, May 2, 2018 at 7:50 AM Alexander Prokhorov <prokher@gmail.com> wrote:
Dear Andrew,

I would like to ask a couple of questions about the lifecycle of consumers running in Channels workers and serving custom channels.

Consider a consumer:

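# myconsumer.py
import channels.consumer
import channels.exceptions

class MyConsumer(channels.consumer.AsyncConsumer):
    async def wakeup(self, message):
        # `some_process` is a placeholder for the actual long-running coroutine.
        await some_process()
        raise channels.exceptions.StopConsumer()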

which I "register" to process messages in the channel `my_channel`:

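# routing.py
import channels.routing

# Import path is an assumption; adjust it to the project layout.
from myconsumer import MyConsumer

application = channels.routing.ProtocolTypeRouter({
    'channel': channels.routing.ChannelNameRouter({
        'my_channel': MyConsumer,
    }),
})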

and finally I run a dedicated Channels worker to process `my_channel` messages:

./manage.py runworker my_channel

So the questions are:
  • Will `MyConsumer` receive new `wakeup` messages while awaiting `some_process`?
  • When do I need to raise `StopConsumer`? Can I raise it after processing each `wakeup` message (as in the code above)? What happens to all the pending messages in that case?
Actually, I do not raise `StopConsumer` in my current implementation, but this causes a problem in tests: I need to wait until all workers have finished processing their messages. I tried calling `channels.testing.ApplicationCommunicator.wait()`, but as far as I can see from the code, it waits for the application/consumer to finish, i.e. to raise the `StopConsumer` exception. Could you share some recommendations? Thanks in advance.

Best regards,
Alexander.

--
You received this message because you are subscribed to the Google Groups "Django users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to django-users+unsubscribe@googlegroups.com.
To post to this group, send email to django-users@googlegroups.com.
Visit this group at https://groups.google.com/group/django-users.
To view this discussion on the web visit https://groups.google.com/d/msgid/django-users/00e6398b-a71f-4509-a95b-3ced88b26ee0%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

