Wednesday, May 2, 2018

Re: Channels 2: Consumer lifecycle when run in a worker process


Indeed, that is exactly what I am doing: I run the processing in the background with

task = asyncio.ensure_future(database_sync_to_async(self._process)(message))

and then keep track of the running tasks. Actually, I am quite happy with this except for one thing: I would like to limit the number of messages processed at the same time by a single worker. The logic is straightforward: once I start a new task, I check whether the "tasks per worker" limit has been reached, and if so I just invoke

await asyncio.wait(self._tasks, return_when=asyncio.FIRST_COMPLETED)

so that the consumer's message handler is blocked until some task finishes. I have not run any experiments, but from the worker's code I conclude that the worker will still take messages from the channel and put them into the application's queue, which (according to this) has unlimited size. That is what bothers me most. My logic is simple: if a worker has already reached its "tasks per worker" limit, I do not want it to take messages from the channel, because there is probably another worker process willing to process them. Frankly, I do not understand how to achieve this. Sorry for bothering you, but perhaps you have some bright idea?
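The limiting logic described above can be sketched in plain asyncio as follows. This is a minimal illustration, not Channels code: `BoundedProcessor`, `handle`, `drain`, `limit` and the `asyncio.sleep` placeholder are hypothetical stand-ins for the real consumer and `self._process`. It shows the handler suspending itself with an awaited `asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED)` once the limit is reached, so no more than `limit` tasks are ever tracked at once.

```python
import asyncio


class BoundedProcessor:
    """Hypothetical stand-in for the consumer above: each message is processed
    in a background task, and at most `limit` tasks are tracked at once."""

    def __init__(self, limit):
        self._limit = limit   # hypothetical "tasks per worker" limit
        self._tasks = set()   # background tasks not yet known to be finished
        self.peak = 0         # largest number of tracked tasks, for the demo

    async def _process(self, message):
        # Placeholder for database_sync_to_async(self._process)(message).
        await asyncio.sleep(0.01)

    async def handle(self, message):
        # Start the work in the background, as in the snippet above.
        task = asyncio.ensure_future(self._process(message))
        self._tasks.add(task)
        self.peak = max(self.peak, len(self._tasks))
        if len(self._tasks) >= self._limit:
            # Limit reached: suspend this handler until at least one task ends.
            # Note the `await`; without it asyncio.wait() would not block.
            _done, self._tasks = await asyncio.wait(
                self._tasks, return_when=asyncio.FIRST_COMPLETED)

    async def drain(self):
        # Wait for whatever is still running (used only to finish the demo).
        if self._tasks:
            await asyncio.wait(self._tasks)


async def main():
    proc = BoundedProcessor(limit=3)
    for i in range(10):
        await proc.handle(i)
    await proc.drain()
    return proc.peak


print(asyncio.run(main()))  # 3
```

This only suspends the handler coroutine itself; as the message above points out, whether the worker keeps pulling messages off the channel in the meantime depends on the worker, not on this code.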

Anyway, thank you for the explanation you have already given me; it helps, and it is always a pleasure to chat with you :-)

On Wednesday, May 2, 2018 at 22:35:08 UTC+3, Andrew Godwin wrote:
Ah, my apologies: you are entirely right. The scope is the same, so it will reuse a single existing instance, which means that it will process messages one at a time and lose them, as you suggested.

Using sync_to_async won't help because, while it runs the function in a threadpool, it also blocks the calling coroutine until that thread completes.
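A plain-asyncio analogue of that point, assuming `loop.run_in_executor` as a stand-in for `sync_to_async` (both run the function in a thread pool and await its result). `blocking_work`, `handler` and `consume` are hypothetical names. Because the handler awaits the thread, two messages handled in turn by the same instance never overlap:

```python
import asyncio
import time

events = []


def blocking_work(n):
    # Placeholder for synchronous code (e.g. ORM calls) run in a thread.
    time.sleep(0.05)
    return n


async def handler(n):
    loop = asyncio.get_running_loop()
    events.append(f"start {n}")
    # Run the function in the default thread pool and await the result.
    # The thread runs alongside the event loop, but this coroutine stays
    # suspended until the thread has finished.
    await loop.run_in_executor(None, blocking_work, n)
    events.append(f"end {n}")


async def consume(messages):
    # A single instance awaiting its handler for each message in turn.
    for n in messages:
        await handler(n)


asyncio.run(consume([1, 2]))
print(events)  # ['start 1', 'end 1', 'start 2', 'end 2']
```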

Without modifying the underlying worker implementation, the best way to process things in parallel would be to spin them off into their own coroutines within your handler, either manually using EventLoop.create_task, or, I guess, by spreading the work out over differently named channels.
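A minimal sketch of the first option, spinning the work off with `create_task` on the running event loop so that the handler returns at once. `process`, `handler`, `consume` and the `background` set are hypothetical names, and the final `gather` exists only so the demo finishes its work before exiting:

```python
import asyncio

events = []
background = set()  # keep references so the spun-off tasks are not garbage-collected


async def process(n):
    # Placeholder for the real, slow work.
    events.append(f"start {n}")
    await asyncio.sleep(0.05)
    events.append(f"end {n}")


async def handler(n):
    # Spin the work off into its own task and return immediately, so the
    # next message can be handled without waiting for this one.
    task = asyncio.get_running_loop().create_task(process(n))
    background.add(task)
    task.add_done_callback(background.discard)


async def consume(messages):
    # A single instance awaiting its handler for each message in turn.
    for n in messages:
        await handler(n)
    # Demo only: let the spun-off work finish before the event loop closes.
    await asyncio.gather(*background)


asyncio.run(consume([1, 2]))
print(events)  # ['start 1', 'start 2', 'end 1', 'end 2']
```

Keeping the tasks in a set and discarding them when done is one common way to hold references to fire-and-forget tasks; it also gives you something to count if you later want to cap how many run at once.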

Andrew

On Wed, May 2, 2018 at 11:51 AM Alexander Prokhorov <pro...@gmail.com> wrote:
Andrew, thank you for the quick response!

Unfortunately, I see something different. If I raise `StopConsumer` after processing a single message, the next one is lost. From the code at https://github.com/django/asgiref/blob/master/asgiref/server.py#L75 I see that `get_or_create_application_instance` returns the existing application instance, so when a message arrives it is put into the queue of that existing application. If I raise `StopConsumer`, this application gets killed and the message is lost.

Another observation: when a message handler awaits something (like `await some_process()` in my snippet), this application instance does not process new messages. Only when one message handler function exits does the next one start.
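The behaviour described in the two paragraphs above can be reproduced with a toy model. This is not asgiref's implementation: `run_instance`, the local `StopConsumer` class and the message names are hypothetical, and the model assumes only what is described here, namely a single instance fed from one queue that awaits each handler before taking the next message:

```python
import asyncio


class StopConsumer(Exception):
    """Hypothetical local stand-in for channels.exceptions.StopConsumer."""


async def run_instance(queue, handler):
    # Toy model of one application instance: take messages from its queue
    # one at a time, awaiting the handler before taking the next one.
    while True:
        message = await queue.get()
        try:
            await handler(message)
        except StopConsumer:
            return  # instance ends; messages still queued are never handled


async def main():
    handled = []

    async def handler(message):
        handled.append(message)
        await asyncio.sleep(0.01)  # no other message is taken while this awaits
        raise StopConsumer()       # stop after one message, as in the question

    queue = asyncio.Queue()        # unbounded: put_nowait() never blocks
    for message in ["m1", "m2", "m3"]:
        queue.put_nowait(message)  # all three reach the same, single instance
    await run_instance(queue, handler)
    return handled, queue.qsize()


print(asyncio.run(main()))  # (['m1'], 2)
```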

Actually, I was offloading message processing to the threadpool (using `sync_to_async`) and trying to limit the number of messages being processed at the same time. I hoped that if I await some coroutine, the worker would stop accepting messages, so that other workers could process them until this one is free again. Can you give me a clue how to achieve this behaviour?


On Wednesday, May 2, 2018 at 18:39:01 UTC+3, Andrew Godwin wrote:
  • Will `MyConsumer` receive new `wakeup` messages while awaiting `some_process`?
Yes. The worker server runs as many application instances as there are messages coming in, even though they all have the same scope. You can see the main listening loop here: https://github.com/django/channels/blob/master/channels/worker.py#L32
  • When do I need to raise `StopConsumer`? Can I do it after processing each `wakeup` message (as in the code above)? What will happen to all the `pending` messages in that case?
You need to raise it when the specific application instance you have is completed - because there's a different instance for each message, that means "raise when you've finished processing the message". Nothing happens to other messages as they're being handled by different instances.

Andrew

On Wed, May 2, 2018 at 7:50 AM Alexander Prokhorov <pro...@gmail.com> wrote:
Dear Andrew,

I would like to ask a couple of questions about the lifecycle of consumers running in Channels workers and serving custom channels.

Consider a consumer:

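# myconsumer.py
import channels.consumer
import channels.exceptions


class MyConsumer(channels.consumer.AsyncConsumer):
    # Called for every message of type "wakeup" sent to `my_channel`.
    async def wakeup(self, message):
        await some_process()  # the actual work (my own coroutine, not shown)
        # End this application instance once the message is handled.
        raise channels.exceptions.StopConsumer()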

which I "register" to process messages in the channel `my_channel`:

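# routing.py
import channels.routing

from myconsumer import MyConsumer  # assumes myconsumer.py is importable here

application = channels.routing.ProtocolTypeRouter({
    # Messages arriving on named channels are dispatched by channel name.
    'channel': channels.routing.ChannelNameRouter({
        'my_channel': MyConsumer,
    }),
})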

and finally I run a dedicated Channels worker to process `my_channel` messages:

./manage.py runworker my_channel

So the questions are:
  • Will `MyConsumer` receive new `wakeup` messages while awaiting `some_process`?
  • When do I need to raise `StopConsumer`? Can I do it after processing each `wakeup` message (as in the code above)? What will happen to all the `pending` messages in that case?
Actually, I do not raise `StopConsumer` in my current implementation, but this leads to an issue with tests: in tests I need to wait until all workers have finished processing their messages. I tried calling `channels.testing.ApplicationCommunicator.wait()`, but as far as I can see from the code, it waits for the application/consumer to finish, i.e. to raise the `StopConsumer` exception. Perhaps you can share some recommendations. Thanks in advance.
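One possible approach for the tests, assuming the consumer keeps references to its background tasks (as with `self._tasks` at the top of this thread), is a helper that waits for those tasks instead of waiting for the consumer to raise `StopConsumer`. `TrackingProcessor`, `handle`, `drain` and `run_test` are hypothetical names, and this is plain asyncio rather than the `channels.testing` API:

```python
import asyncio


class TrackingProcessor:
    """Hypothetical consumer-like object that processes each message in a
    background task and remembers the tasks, so a test can wait for them."""

    def __init__(self):
        self._tasks = set()
        self.processed = []

    async def _process(self, message):
        await asyncio.sleep(0.01)  # placeholder for the real work
        self.processed.append(message)

    async def handle(self, message):
        task = asyncio.ensure_future(self._process(message))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def drain(self):
        # Test helper: wait until every background task (including any
        # started while waiting) has finished.
        while self._tasks:
            await asyncio.gather(*self._tasks)


async def run_test():
    proc = TrackingProcessor()
    for message in ["a", "b", "c"]:
        await proc.handle(message)
    await proc.drain()
    assert sorted(proc.processed) == ["a", "b", "c"]
    return proc.processed


print(sorted(asyncio.run(run_test())))  # ['a', 'b', 'c']
```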

Best regards,
Alexander.

--
You received this message because you are subscribed to the Google Groups "Django users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to django-users...@googlegroups.com.
To post to this group, send email to django...@googlegroups.com.
Visit this group at https://groups.google.com/group/django-users.
To view this discussion on the web visit https://groups.google.com/d/msgid/django-users/00e6398b-a71f-4509-a95b-3ced88b26ee0%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
