angular

前端

  • 写一个websocket服务
    src/app/service/websocket.service.ts
import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';
import { Subject } from 'rxjs';

@Injectable({
  providedIn: 'root'
})
export class WebsocketService {

  private subject = new Subject<any>();

  // 定义websocket服务地址 
  // 也可ws://localhost:8080,看后端怎么定义
  private wsUrl: string = 'ws://localhost/ws?id=myid';
  public ws!: WebSocket;

  constructor() {
    this.wssWSServer(this.subject);
  }

  wssWSServer(sub:any){
    console.log("WebSocket");
    // 创建websocket对象
    // 申请一个WebSocket对象,参数是服务端地址,同http协议使用http://开头一样,WebSocket协议的url使用ws://开头,另外安全的WebSocket协议使用wss://开头
    this.ws = new WebSocket(this.wsUrl);

    this.ws.onopen = function(){
      //当WebSocket创建成功时,触发onopen事件
       console.log("open创建连接:");
      //  this.send("angular: onopen msg form websocket service")
    }

    this.ws.onmessage = function(e:any){
      console.log("收到消息:", e);
      //当客户端收到服务端发来的消息时,触发onmessage事件,参数e.data包含server传递过来的数据
      // var message = eval("("+e.data+")");
      // sub.next(JSON.stringify(message));
    }

    this.ws.onclose = function(e:any){
      //当客户端收到服务端发送的关闭连接请求时,触发onclose事件
      console.log("close");
    }

    this.ws.onerror = function(e:any){
      //如果出现连接、处理、接收、发送数据失败的时候触发onerror事件
      console.log(e);
    }

  }

  wssSendMsg(id:string, type:string, msg: string){
    console.log("wsSendMsg enter: " + msg);
    // 给每个订阅者推送数据。对方可以实时获得
    //this.subject.next("push a msg");
    this.ws.send(id + type + msg);
  }

  wssGetMsg(): Observable <string> {
    console.log("wsGetMsg called");
    return this.subject.asObservable();
  }
}
  • 其他组件中使用
    src/app/service/share.service.ts
    任意组件的ts文件,这里在服务里边写了
import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';
import { WebsocketService } from './websocket.service';

@Injectable({
  providedIn: 'root'
})
export class ShareService {

  private incomingMsg!: Observable<any>;

  constructor(private wss: WebsocketService) {
    // 获得可观察值
    this.incomingMsg = this.wss.wssGetMsg();

    // 订阅可观察值
    this.incomingMsg.subscribe(message => {
      var dataObj = eval("(" + message + ")");
      console.log(dataObj)
    })
  }
}

问题:长时间不交互会断连
解决办法:心跳定时器,间隔一段时间向服务器发送消息;连接关闭的时候清除定时器;发生错误的时候重连;

   import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';
import { Subject } from 'rxjs';

@Injectable({
    providedIn: 'root'
})
export class WebsocketService {

    heartStart() {
        let _this = this;
        this.wsHeart = setInterval(() => {
            _this.ws.send('ping');
        }, 10 * 1000);
    }

    reconnect() {
        const _this = this
        setTimeout(function () {
            _this.wssWSServer(_this.subject);
        }, 2000);
    }

    private subject = new Subject<any>();


    // 定义websocket服务地址 
    // 也可ws://localhost:8080,看后端怎么定义
    private wsUrl: string = 'ws://localhost:8000/ws?id=control-console';

    public ws!: WebSocket;

    // websocket心跳定时器,防止断连
    private wsHeart: any

    constructor() {
        this.wssWSServer(this.subject);
    }

    wssWSServer(sub: any) {
        const _this = this
        console.log("WebSocket");
        // 创建websocket对象
        // 申请一个WebSocket对象,参数是服务端地址,同http协议使用http://开头一样,WebSocket协议的url使用ws://开头,另外安全的WebSocket协议使用wss://开头
        this.ws = new WebSocket(this.wsUrl);

        this.ws.onopen = function () {
            //当WebSocket创建成功时,触发onopen事件
            console.log("open创建连接:");
            //  this.send("angular: onopen msg form websocket service")

            // 启动心跳定时器
            _this.heartStart()
        }

        this.ws.onmessage = function (e: any) {
            console.log("收到消息:", e);
            //当客户端收到服务端发来的消息时,触发onmessage事件,参数e.data包含server传递过来的数据
            // var message = eval("("+e.data+")");
            // sub.next(JSON.stringify(message));
        }

        this.ws.onclose = function (e: any) {
            //当客户端收到服务端发送的关闭连接请求时,触发onclose事件
            console.log("close");
            clearInterval(_this.wsHeart)
        }

        this.ws.onerror = function (e: any) {
            //如果出现连接、处理、接收、发送数据失败的时候触发onerror事件
            console.log(e);
            clearInterval(_this.wsHeart)
            _this.reconnect()
        }

    }

    wssSendMsg(id: string, type: string, msg: string) {
        console.log("wsSendMsg enter: " + msg);
        // 给每个订阅者推送数据。对方可以实时获得
        //this.subject.next("push a msg");
        this.ws.send(id + type + msg);
    }

    wssGetMsg(): Observable<string> {
        console.log("wsGetMsg called");
        return this.subject.asObservable();
    }
}


python后端

main.py

from abc import ABC

import tornado.ioloop
import tornado.web
from tornado import httpclient
from tornado import gen
from websocket.ws_server import WsHandler

settings = {
    'template_path': 'DIAVS/tornado_web',
    'static_path': 'DIAVS/tornado_web/static',
    'static_url_prefix': '/static/',
}

application = tornado.web.Application([
    (r"/ws", WsHandler),
], **settings)

if __name__ == "__main__":
    application.listen(80)
    print("tornado启动成功,监听端口:80")
    tornado.ioloop.IOLoop.instance().start()

websocket/ws_server.py

import json
import random
from time import sleep
from tornado.websocket import WebSocketHandler
import datetime
import asyncio
import websockets

users = dict()  # 用来存放在线用户的容器
pyload = {}  # 网络传输负载

def wsSendMsgAll(payload):
    print("users=?", users)
    for key in users:
        users[key].write_message(payload)

def wsSendMsgbyId(id):
    pass
    # for key in users:
    #     users[key].write_message(payload)


class WsHandler(WebSocketHandler):

    def open(self):
        print("---open: 新连接---")
        print(self)
        user = self.get_argument("id")
        users[user] = self
        print("当前用户:", user)
        print("所有用户:", users)


    def on_message(self, message):
        print("---onmessage---")
        # for key in users:  # 向在线用户广播消息
        #     users[key].write_message(u"[%s]-[%s]-说:%s" % (
        #         self.request.remote_ip, datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), message))
        print("收到Web端消息:", message)
        pyload.clear()


    def on_close(self):
        print("---on close---")
        users.remove(self)  # 用户关闭连接后从容器中移除用户


    def check_origin(self, origin):
        return True  # 允许WebSocket的跨域请求