暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

一文带你搭建自己的Websocket服务

Python都知道 2022-08-24
363

哈喽,小伙伴们,我是知道。 

今天为大家带来的分享是如何搭建websocket服务。

一、背景

HTTP 协议是一种无状态的、无连接的、单向的应用层协议。它采用了请求/响应模型。通信请求只能由客户端发起,服务端对请求做出应答处理,HTTP 协议无法实现服务器主动向客户端发起消息。这种单向请求的特点,注定了如果服务器有连续的状态变化,客户端要获知就非常麻烦。

大多数 Web 应用程序将通过频繁的异步 JavaScript 和 XML(AJAX)请求实现长轮询。轮询的效率低,非常浪费资源。Websocket应运而生,WebSocket 连接允许客户端和服务器之间进行全双工通信,以便任一方都可以通过建立的连接将数据推送到另一端。WebSocket 只需要建立一次连接,就可以一直保持连接状态。这相比于轮询方式的不停建立连接显然效率要大大提高。

Django基于Wsgi的,本身只能处理http请求,而channels是基于Asgi,可以处理多种请求,其中就包含WebSocket , 接下来进入主题,给大家介绍如何在Django项目中搭建和应用WebSocket 。

二、安装

首先需要安装搭建服务所需要的依赖包,这里我就直接使用pip安装即可,这里有个坑,需要注意的是安装依赖包的版本,channels_redis、channels的版本需要匹配,本文使用的版本如下,供参考:

pip3 install channels==2.4.0

pip3 install channels_redis==3.3.1

三、配置

安装完依赖,需要先配置:

  1. 需要将channels
    添加到app中;

  2. ASGI_APPLICATION
    配置项配置asgi
    的路由信息,具体的路由信息配置在routing.py
    文件中application

  3. 配置redis
    配置,用于收发数据,详细信息在下节会详细介绍

    配置示例:

  # settings.py 
  INSTALLED_APPS = [  
     'channels',  
   ...  
  ]  
  # 路由配置  
  ASGI_APPLICATION = 'main.routing.application'  
  # redis配置
  CHANNEL_LAYERS = {  
     'default': {  
         'BACKEND''channels_redis.core.RedisChannelLayer',  
         'CONFIG': {  
             "hosts": ["redis://:admin@123456@localhost:6379/0"],  
         },  
     },  
  }

复制

四、ASGI路由配置

接着上面配置,咱们继续聊一聊路由配置,类似于处理HTTP请求,websocket也需要单独配置自己的路由信息,其中包括主路由,以及子路由。

4.1 主路由配置

  • ProtocolTypeRouter:ASGI支持多种不同的协议,在这里可以指定特定协议的路由信息,这里只使用了websocket协议,这里只配置websocket

  • AuthMiddlewareStack:用于WebSocket认证,继承了Cookie Middleware,SessionMiddleware,SessionMiddleware,会给请求封装一个scope,获取请求中的一些信息,类似于request,包含了请求的type、path、header、cookie、session、user等等有用的信息

  • URLRouter:指定路由文件的路径,也可以直接将路由信息写在这里,代码中配置了路由文件的路径,会去对应应用下的routeing.py文件中查找websocket_urlpatterns

  # setting.py同级目录下的routing.py  
  from channels.auth import AuthMiddlewareStack  
  from channels.routing import ProtocolTypeRouter, URLRouter  
  import consumers.routing  
    
  # AuthMiddlewareStack 会给请求封装一个scope,获取请求中的一些信息,类似于request  
  application = ProtocolTypeRouter(  
     {  
         'websocket': AuthMiddlewareStack(  
                 URLRouter(  
                     consumers.routing.websocket_urlpatterns  
                 )  
             )  
     }  
  )

复制

4.2 项目路由

项目路由文件跟Django的url.py功能类似,语法也一样,意思就是访问websocket/notice/都交给notice_consumer.NoticeConsumer处理。

  # consumers.routing.py 项目路由  
  from django.urls import re_path  
  from consumers import notice_consumer  
    
  websocket_urlpatterns = [  
     re_path(r'websocket/notice/(?P<team_id>\w+)', notice_consumer.NoticeConsumer),  
  ]  

复制

五、消费者

现在配置信息和路由均已经配置好了,接下来我们来聊聊处理请求的具体程序吧,channels同时支持异步调用AsyncWebsocketConsumer和同步调用WebsocketConsumer,大家可以根据应用场景选择,这里给大家介绍一下异步实现的使用。先上代码,如下是一个简单的完成的消费者应用,包括建立连接、断开连接、接收和发送消息,下面让我们来逐一了解一下。

# consumers.notice_consumer.py  
import json  
from django.core.cache import cache  
from channels.generic.websocket import AsyncWebsocketConsumer  


class NoticeConsumer(AsyncWebsocketConsumer):  
  async def connect(self):  
     pass

 async def disconnect(self, close_code):  
     pass
    
 async def receive(self, text_data=None, bytes_data=None):  
     pass  

 async def send_message(self, event):  
     pass


复制

5.1 建立连接

当请求过来时,在connect方法中执行self.accept() 建立连接,当然在建立连接前可以添加一些鉴权、分组等操作,这里我在建立连接前,通过获取请求URL中的“team_id”,以team_id将消费者进行分组,示例如下:

def connect(self):    
     # scope类似于request,里面包含了请求的信息  
     self.room_name = self.scope['url_route']['kwargs']['team_id']  
     self.room_group_name = 'team_%s' % self.room_name  
     # 添加进一个分组  
     await self.channel_layer.group_add(  
         self.room_group_name, # 分组名,可以自定义来区分不同分组  
         self.channel_name # 每次建连接会随机生成一个channel_name  
     )  
     await self.accept() 

复制

5.2 断开连接

当连接断开时执行disconnect方法,可以执行清除出分组或其它一些操作。

def disconnect(self, close_code):  
     # 断开连接时执行,从分组中移除  
     await self.channel_layer.group_discard(  
         self.room_group_name,  
         self.channel_name  
     )  

复制

5.3 接收消息

receive方法用来接收客户端的消息,并进行处理,这里我只是做了一个简单的处理操作,跟客户端保持心跳。

def receive(self, text_data=None, bytes_data=None):  
     # 接收客户端发送过来的消息  
     event = {  
         'message': dict(type='heartbeat', message=text_data)  
     }  
     await self.send_message(event=event)

复制

5.4 发送消息

def send_message(self, event):  
     # 发送消息给客户端  
     await self.send(text_data=json.dumps({  
         'message': event['message']  
     }))  

复制

5.5 补充说明

用到ORM查询数据时,需要添加这个装饰器函数

from channels.db import database_sync_to_async  
  
@database_sync_to_async  
def get_name():  
  User.objects.all().first().name  
  pass

复制

六、主动推送

经过上述一顿操作后,我们的服务可以接收客户端信息并作回应啦,但是记住websocket协议可是一个全双工通信协议,是可以双向通信的,那么接下来就带着大家一起来看看在业务中如何主动向客户端推送消息吧。

首先需要实例化一个channel_layer对象用来调用消费者中发送消息的方法,然后要构建你的发送数据字典,注意“type”的值send.message要与consumer的send_message方法对应哦(把下划线 "_" 改写成 ".")

from channels.layers import get_channel_layer  
from asgiref.sync import async_to_sync  
   
def push_match_notice(send_type, send_info, sender_list):  
   send_data = {  
       'type': send_type,  
       'message': send_info  
   }  
   channel_layer = get_channel_layer()  
   for room_id in sender_list:  
       send_dict = {  
           'type''send.message'# send.message对应consumer的send_message方法  
           'message': send_data,  
           'room_name':'team_%s' % room_id  
       }  
       # 按分组名去推送消息,如果是异步需要加 async_to_sync 
       # 第一个参数是创建websocket时,加入的分组名  
       async_to_sync(channel_layer.group_send)('team_%s' % room_id, send_dict)  

复制

七、Nginx+supervisor部署

好啦,到这里我们的websocket服务基本功能都实现完了,那要怎样部署呢,我们一起来操作一下吧。

第一步,在wsgi.py同级目录下创建asgi.py文件,配置如下:

# 在Django 3.X 版本中,创建项目后自带改文件了
import os  
import django  
from channels.routing import get_default_application

os.environ.setdefault("DJANGO_SETTINGS_MODULE""main.settings")  
django.setup()  
application = get_default_application() 

复制

第二步,在Supervisord配置文件中创建一个Daphne服务,处理Nginx转发过来的websocket请求

# supervisord.conf  
[program:daphne]
socket=tcp://0.0.0.0:9987
directory=/backend/

command=/usr/local/bin/daphne -b 0.0.0.0 -p 9987 --proxy-headers main.asgi:application

autostart=true
autorestart=true
user=root
stopsignal=QUIT
stout_logfile=/var/logs/asgi.log
redirect_stderr=true

复制

第三步, 在Nginx配置文件中为websocket配置转发策略,示例如下:

# nginx.conf  

upstream wsbackend{
server 0.0.0.0:9987; # 对应Daphne服务地址
}

server {
listen 82;
server_name localhost;
......
# nginx本身无法区分是否是websocket请求,这里是通过路由前缀来区分
location /websocket/ {
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_redirect off;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Host $server_name;
proxy_pass http://wsbackend;
}
}

复制

八、结语

我是知道,今天的分享到这里就结束啦,我们已经一起在一个Django项目上完成了一个基于channels的websocket服务搭建全过程了,希望这些能够为你能够提供一些帮助,有不足的地方还请多多指教。

Bye,Good Luck !


文章转载自Python都知道,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论