Monday, April 30, 2018

Re: Architecting a Crypto Market Data Feed using Django Channels


Thanks for your reply Andrew. I've created a custom manage command called `websocket_producer.py` that is responsible for feeding the firehose of market data (price changes, market volatility etc).

It looks like this

from django.core.management.base import BaseCommand, CommandError


from lomond.websocket import WebSocket


import channels.layers
from asgiref.sync import async_to_sync




class Command(BaseCommand):


   
def handle(self, *args, **kwargs):
        channel_layer
= channels.layers.get_channel_layer()
       
while True:
            ws
= WebSocket('wss://<my-url>/ws')
           
# this for loop is effectively infinite
           
for event in ws:
               
if event.name == "text":
                    data
= event.json
                    async_to_sync
(channel_layer.send)(
                       
'abc',
                       
{'type': 'abc', 'message': str(data)}
                   
)



However, this can only run for 1 min before resulting in the following error:


Traceback (most recent call last):
 
File "./manage.py", line 21, in <module>
    execute_from_command_line
(sys.argv)
 
File "/usr/local/lib/python3.6/site-packages/django/core/management/__init__.py", line 371, in execute_from_command_line
    utility
.execute()
 
File "/usr/local/lib/python3.6/site-packages/django/core/management/__init__.py", line 365, in execute
   
self.fetch_command(subcommand).run_from_argv(self.argv)
 
File "/usr/local/lib/python3.6/site-packages/django/core/management/base.py", line 288, in run_from_argv
   
self.execute(*args, **cmd_options)
 
File "/usr/local/lib/python3.6/site-packages/django/core/management/base.py", line 335, in execute
    output
= self.handle(*args, **options)
 
File "/app/savings_apps/accounts/management/commands/wsproducer.py", line 22, in handle
   
{'type': 'action_reports', 'message': str(data)}
 
File "/usr/local/lib/python3.6/site-packages/asgiref/sync.py", line 64, in __call__
   
return call_result.result()
 
File "/usr/local/lib/python3.6/concurrent/futures/_base.py", line 398, in result
   
return self.__get_result()
 
File "/usr/local/lib/python3.6/concurrent/futures/_base.py", line 357, in __get_result
   
raise self._exception
 
File "/usr/local/lib/python3.6/site-packages/asgiref/sync.py", line 78, in main_wrap
    result
= await self.awaitable(*args, **kwargs)
 
File "/usr/local/lib/python3.6/site-packages/channels_redis/core.py", line 136, in send
   
raise ChannelFull()
channels
.exceptions.ChannelFull




Any ideas? Doing a real-time market data feed seems like it would be a pretty common use case, but I can't seem to find any prior art in Django Channels.


On Saturday, April 28, 2018 at 4:22:20 AM UTC-6, Andrew Godwin wrote:
I can't help you with real-time streaming architecture overall - that's a much bigger scope of thing - but I can say that you shouldn't be keeping a synchronous consumer open like that (you're using a whole thread). You should either rewrite it to be async-native, so it doesn't use up a thread and potentially block the server, or rework it to put the feed events onto the channel layer from an external process.

Andrew

On Sat, Apr 28, 2018 at 1:12 AM, Michael <writemicha...@gmail.com> wrote:
Hi,

What is the best way to architect a Django Channels app that provides a very fast infinite stream of market data? This is what I have so far, but I think it's not the best solution.

This data is updated every millisecond so I would prefer to not persist it (unless there is a way of using redis pub/sub without actually saving the data, only for messaging)




class ChatConsumer(WebsocketConsumer):
    def connect(self):
        self.room_name = 'foo'
        self.room_group_name = 'foo'
        async_to_sync(self.channel_layer.group_add)(
            self.room_group_name,
            self.channel_name
        )


        self.accept()
        while True:
          # Imagine this is another WS feed or Zero MQ Feed.
          feed = Feed(....)
          for event in feed:
              if event.name == "text":
                  data = event.json
                  self.send(str(data)

--
You received this message because you are subscribed to the Google Groups "Django users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to django-users...@googlegroups.com.
To post to this group, send email to django...@googlegroups.com.
Visit this group at https://groups.google.com/group/django-users.
To view this discussion on the web visit https://groups.google.com/d/msgid/django-users/01bf458c-ff1a-4cf6-bd58-da9b2f43123c%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

--
You received this message because you are subscribed to the Google Groups "Django users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to django-users+unsubscribe@googlegroups.com.
To post to this group, send email to django-users@googlegroups.com.
Visit this group at https://groups.google.com/group/django-users.
To view this discussion on the web visit https://groups.google.com/d/msgid/django-users/381ae5ed-95c4-4d0e-a194-5df291d77def%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

No comments:

Post a Comment