【无标题】

发布于:2024-12-07 ⋅ 阅读:(104) ⋅ 点赞:(0)

WebRTC 视频通话技术分享

前言

最近在做 KVM 项目,该项目需要使用 webRtc 技术实现视频流的推送和播放,因此在做项目之前我计划通过搭建 demo 的方式好好再复习一下 webRtc 通信技术,顺便给大家做个分享。

下面是最终的demo的实现效果:

WebRTC 简介

WebRTC 是一个开放平台,它允许浏览器之间进行视频、音频、屏幕共享等视频通信,是一种端对端的技术,因此他传输媒体流的时候是不会经过服务器的,服务器的作用只是用于端之间建立连接,一旦连接建立成功,则直接通过 WebRTC 协议进行通信,不需要再经过服务器(这个时候如果没有特殊处理,我们甚至可以停掉服务器,视频流的推送仍然可以继续进行)。它基于 HTML5JavaScriptWebSocketsWebRTC 等技术实现。

本次分享包含的技术信息

  1. WebRTC 搭建视频流推送和播放
  2. 浏览器获取媒体流
  3. 基于 nodejshttpwebsocket 通信服务器搭建。
  4. 前端工程化构建

工程架构概览

因为这个 demo 是前后端的内容都有,因此我将前后端的内容分为两个工程,最后将两个工程进行整合。目录结构如下:

– client
– server
– node_moudules
– package.json

工程搭建

整体工程

创建目录并且初始化仓库

mkdir webrtc-demo
cd webrtc-demo
npm init -y

配置启动脚本

 "scripts": {
    "dev:web": "cd ./client && pnpm dev",
    "dev:server": "cd ./server && pnpm dev",
    "dev": "concurrently \"pnpm dev:web\" \"pnpm dev:server\""
  },

注:concurrently 是一个用于同时运行多个命令的包,这里我们使用它来同时启动两个工程。

前端工程搭建

前端就是使用我们现在熟悉的技术栈 vite + vue3 + antd 搭建,这个完全不需要再赘述。

服务端工程搭建

服务端技术选型是选择我所熟悉的 express 搭建,我们可以使用 express-generater 生成器来直接生成一个 express 项目。但是我这边选择自己手动搭建:

  1. 初始化项目
mkdir server
cd server
npm init -y
tsc --init (初始化ts配置)
  1. 安装依赖包:
pnpm add express
pnpm add ws
pnpm add nodemon -D (用于热更新)
  1. 编写入口文件(通常来说我们应该做很多模块化处理,这里demo为了简单,就将很多逻辑写在一起了。)
import express from 'express'
import { addUser, BaseResponse, getUsers, login, MessageContent, messageHandler } from '.'
import logger from 'morgan'
import { WebSocketServer } from 'ws'
import configWsServer from './wsServer'
import { parseCookies } from './util'

const HTTP_PORT = 4004
const app = express()

app.use(logger('dev'))
app.use(express.json())

app.get('/', (req, res) => {
    res.send('Hello World!')
})

app.get('/api/user', async (req, res) => {
    const users = await getUsers()
    res.send(new BaseResponse(true, users))
})

app.post('/api/user', async (req, res) => {
    try {
        const data = req.body
        await addUser(data)
        res.send(new BaseResponse(true))
    } catch (error) {
        res.status(500).send(new BaseResponse(false, error))
    }
})

app.post('/api/login', async (req, res) => {
    try {
        const data = req.body
        const user = await login(data)
        res.cookie('user', user.username).send(new BaseResponse(true, user))
    } catch (error) {
        res.status(500).send(new BaseResponse(false, error))
    }
})

app.get('/api/logout', async (req, res) => {
    res.clearCookie('user').send(new BaseResponse(true))
})

app.get('/api/msgs', async (req, res) => {
    try {
        await messageHandler.getAllMessages()
        res.send(new BaseResponse(true, messageHandler.getUserRelatedMessages(parseCookies(req.headers.cookie).user)))
    } catch (error) {
        res.status(500).send(new BaseResponse(false, error))
    }
})

const server = app.listen(HTTP_PORT, undefined, () => console.log('listening on: http://localhost:' + HTTP_PORT))
const wsServer = new WebSocketServer({ server })

configWsServer(wsServer)

这里主要是初始花了一些 http服务和接口websocket 服务, demo为了尽量简单,使用 cookie 做的登录验证。

  1. 用户有关操作

该demo由于没有使用数据库,就在根目录创建了 db 目录,里面存放着 msgs.jsonusers.json 文件,用于存储用户信息以及聊天记录。我们就使用文件读写来代替服务端的数据库读写的功能(有兴趣也可以安装 mySql、postgreSql 等数据库,使用数据库来做持久化操作)。

// 常量:用户信息文件地址和消息文件地址
const userListPath = resolve(__dirname, "../db/users.json")
const msgListPath = resolve(__dirname, "../db/msgs.json")

这里以登录举例,登录就需要我们去数据库查询用户是否存在且密码是否正确,因此我们使用 fs 模块来读取文件,然后使用 JSON.parse 将文件内容转换为对象,然后使用 Array.find 查询用户是否存在,如果存在就返回用户信息,否则就返回 null

// 登录接口
app.post('/api/login', async (req, res) => {
    try {
        const data = req.body
        const user = await login(data)
        res.cookie('user', user.username).send(new BaseResponse(true, user))
    } catch (error) {
        res.status(500).send(new BaseResponse(false, error))
    }
})
// 登录方法
export const login = async (user: UserInfo) => {
    const users = await getUsers()
    const foundUser = users.find(u => u.username === user.username && u.password === user.password)
    if (!foundUser)
        throw "username or password is not correct"
    return foundUser
}
// 从文件读取所有用户
export const getUsers = async () => {
    const data = await fs.readFile(userListPath, "utf-8")
    const user = (JSON.parse(data) || []) as UserInfo[]

    try {
        const allOnlineUser = getAllOnlineUsers()
        user.forEach(item => {
            if (allOnlineUser.includes(item.username)) {
                item.online = true
            } else {
                item.online = false
            }
        })
        return user
    } catch (error) {
        return user
    }
}

总而言之就是使用文件读取代替数据库操作。

  1. websocket相关

使用 ws 这个库老构建 webSocket 基础服务:

const server = app.listen(HTTP_PORT, undefined, () => console.log('listening on: http://localhost:' + HTTP_PORT))
const wsServer = new WebSocketServer({ server })

configWsServer(wsServer)

下面是 configServer 相关的逻辑:

import { Server, type WebSocket } from "ws";
import { parseCookies } from "./util";
import { messageHandler } from ".";
// 消息类型
export enum WsMsgTypes {
    CALL = 'call',
    RECEIVE_CALL = 'receive_call',
    ANSWER = 'answer',
    CONNECT_SUCCESS = 'connect_success',
    USER_ONLINE = 'user_online',
    USER_OFFLINE = 'user_offline',
    CHAT_MSG = 'chat_msg',
    ICE = 'ice',
    SDP = 'sdp',
}

// 这里为了统一规范ws消息的格式,使用该类型来约束
export class WsMsgs<T = any> {
    constructor(public type: WsMsgTypes, public data?: T) {
        this.type = type
        this.data = data
    }

    public get content() {
        return JSON.stringify({ type: this.type, data: this.data })
    }
}

export class CallMsgs {
    constructor(public from: string, public to: string) {
        this.from = from
        this.to = to
    }
}
// 处理消息
export const handleMessage = async (msg: WsMsgs, user: string) => {
    switch (msg.type) {
        case WsMsgTypes.CALL:
            // 发送通话请求
            sendMsgToUser(msg.data.to, new WsMsgs(WsMsgTypes.CALL, msg.data).content)
            break;
        case WsMsgTypes.ANSWER:
            // 接收通话请求
            sendMsgToUser(msg.data.to, new WsMsgs(WsMsgTypes.ANSWER, msg.data).content)
            break;
        case WsMsgTypes.CHAT_MSG:
            // 收到连接成功消息
            await messageHandler.addMessage(msg.data)
            sendMsgToUser(msg.data.to, new WsMsgs(WsMsgTypes.CHAT_MSG, msg.data).content)
            sendMsgToUser(msg.data.from, new WsMsgs(WsMsgTypes.CHAT_MSG, msg.data).content)
            break;
        case WsMsgTypes.SDP:
            sendMsgToUser(msg.data.to, new WsMsgs(WsMsgTypes.SDP, msg.data).content)
            break
        case WsMsgTypes.ICE:
            sendMsgToUser(msg.data.to, new WsMsgs(WsMsgTypes.ICE, msg.data).content)
            break
        default:
            break;
    }
}
// 这里存储所有的通信实例,用于存储所有连接的ws实例和用户名
let allConnections: { user: string; ws: WebSocket }[] = []

export const getAllOnlineUsers = () => allConnections.map(item => item.user)

async function handleConnect(ws: WebSocket, user: string) {
    if (user) {
        allConnections.push({ user, ws })
        console.log(user, 'is online!!');
        ws.send(new WsMsgs(WsMsgTypes.CONNECT_SUCCESS).content)
        // 用户上线就发消息给其他用户,通知其他用户有新用户上线
        sendToAllUser(new WsMsgs(WsMsgTypes.USER_ONLINE, user))
    }
    else {
        // 如果用户不存在,就关闭连接
        ws.send(500)
    }
}
// 发送消息给所有用户
async function sendToAllUser(msg: WsMsgs) {
    try {
        allConnections.forEach(item => {
            item.ws.send(msg.content)
        })
    } catch (error) {
        console.log(error);

    }
}
// 发送消息给指定用户
async function sendMsgToUser(user: string, msg: string) {
    const conn = allConnections.find(item => item.user === user)
    if (conn) {
        conn.ws.send(msg)
    }
}

export default function configWsServer(wsServer: Server) {
    wsServer.on('connection', (ws, req) => {
        const user = parseCookies(req.headers.cookie).user
        handleConnect(ws, user)
        ws.on('message', (data) => {
            handleMessage(JSON.parse(data.toString()), user)
        })
        ws.on('close', () => {
            allConnections = allConnections.filter(item => item.user !== user)
            sendToAllUser(new WsMsgs(WsMsgTypes.USER_OFFLINE, user))
        })
    })
    wsServer.on('close', () => {
        console.log('close')
    })
}

webSocket通信

到此为止后端的大致架子就搭建完成了,前端就需要通过 http 和 websocket 协议使用后端服务,首先前端的登录这里就不说了,简单说一下webSocket通信:

export class WebSocketService {
    public ws: WebSocket
    private onSdpReceived: (sdp: RTCSessionDescription) => void
    private onAnswerReceived: (answer: RTCSessionDescriptionInit) => void
    private onIceReceived: (ice: RTCIceCandidate) => void

    constructor(private callback: (ws?: WebSocket) => void) {
        this.init()
        callback(this.ws)
    }

    private init() {
        this.ws = new WebSocket(`ws://${location.hostname}:4004`);
        this.ws.onmessage = (event) => {
            this.handleMessage(JSON.parse(event.data))
        }
        this.ws.onerror = (error) => {
            console.log('error', error)
        }
        this.ws.onclose = (ev) => {
            console.log('close', ev)
        }
    }

    private handleMessage({ data, type }: WsMsgs) {
        const { setUserOnlineStatus } = useUserAction()
        switch (type) {
            case WsMsgTypes.CONNECT_SUCCESS:
                this.callback(this.ws)
                return
            case WsMsgTypes.USER_ONLINE:
                setUserOnlineStatus(data as string, true)
                return
            case WsMsgTypes.USER_OFFLINE:
                setUserOnlineStatus(data as string, false)
                return
            case WsMsgTypes.CHAT_MSG:
                useMsgStore().addChatMsg(data as MessageContent)
                return
            // case WsMsgTypes.CALL:
            //     alert("收到呼叫消息")
            //     // this.onCallReceived?.(data)
            //     return
            case WsMsgTypes.SDP:
                console.log("收到sdp消息")
                this.onSdpReceived?.(data.content as RTCSessionDescription)
                return
            case WsMsgTypes.ANSWER:
                console.log("收到answer消息", data)
                this.onAnswerReceived?.(data.content as RTCSessionDescriptionInit)
                return
            case WsMsgTypes.ICE:
                console.log("收到ice消息", data)
                this.onIceReceived?.(data.content as RTCIceCandidate)
        }
    }

    public on(type: 'sdp_received' | 'answer_received' | 'ice_candidate', callback: (...data: any) => void) {
        switch (type) {
            case 'sdp_received':
                this.onSdpReceived = callback
                break
            case 'answer_received':
                this.onAnswerReceived = callback
                break
            case 'ice_candidate':
                this.onIceReceived = callback
                break
        }
    }

}

这是我封装的 websocketService,用于前后端之间的双向通信,这里主要实现了 on 方法,用于监听 sdp_receivedanswer_receivedice_candidate 三个事件,分别对应 sdpanswerice 三个消息,这里就不一一实现了,这里只是简单的打印一下,实际开发中可以根据自己的需求进行扩展。其他的消息我这边直接存在了全局状态store里面,store里面来处理各种不同类型的消息,这来也不再赘述这些处理逻辑了,毕竟此次的分享主要是分享webrtc相关的技术内容,前面的铺垫已经非常多了。

webrtc视频通话

webrtc视频通话的实现主要分为以下几步:

  1. 获取视频流(屏幕或者摄像头之类的)
  2. 创建 PeerConnection 连接
  3. 给对方发送推流请求
  4. 推流

这个步骤是大致的一个步骤,接下来我们需要详细讨论:

创建 PeerConnection 连接

PeerConnection 是 webrtc 的核心,它封装了 WebSocketRTCDataChannel 等协议,我们可以通过 PeerConnection 来实现视频通话,步骤如下:
1.首先我们就是需要实例化一个 PeerConnection

export const createPeerConnection = () => {
    const pc = new RTCPeerConnection(iceConfiguration)
    return pc
}
  1. 获取视频流:我们以摄像头举例,浏览器给我们提供了一个 api 用于获取用户的摄像头视频流: navigator.mediaDevices.getUserMedia,这个 api 接收一个参数,是一个对象,里面可以设置 videoaudio 的参数,比如 widthheightfacingMode 等等,具体可以参考文档。这是一个异步函数,我们需要等待异步结束,拿到这个视频流,然后在本地播放(毕竟视频通话是双向的视频流推送和播放):
export const getMediaStream = async (constraint: MediaStreamConstraints = {
    video: true,
    audio: true
}) => {
    const stream = await navigator.mediaDevices.getUserMedia(constraint)
    return stream
}

// 本地播放
currentVideoRef.value.srcObject = localStream;
  1. 将视频流推送到对端:
 localStream.getTracks().forEach(track => {
    pc.addTrack(track, localStream)
})
  1. 事件监听:我们至少需要监听 pc.onTrack和pc.onicecandidate 事件:

    1. pc.onTrack:当对端发送视频流时,会触发这个事件,我们可以通过这个事件来获取对端视频流,并将其播放到本地:
    2. pc.onicecandidate:有新的ice候选的时候,会触发这个事件,我们可以通过这个事件来获取对端 ice 消息,并将其发送给对端:
  2. 创建请求(offer)并将其设置为pc.localDescription

 const offer = await pc.createOffer()
    await pc.setLocalDescription(offer)
  1. offer 发送给对端:
    new WsMsgs(WsMsgTypes.SDP, new CallMsgs(targetUser, pc.localDescription)).send()

至此:发送端的操作就完成了,下面是接收端应该做的事情:

接收端通过 websocket 接收到 offer 之后,如果统一这一通视频通话那么就进行如下操作:

  1. 首先和发送端一样,都需要获取本地摄像头视频流,并且在本地视频流对应的 video 元素进行播放。
  2. 接着创建 PeerConnection,并且将视频流推到该 PeerConnection 中:
    const localStream = await getMediaStream()
    // 将本地媒体流添加到pc中
    localStream.getTracks().forEach(track => {
        pc.addTrack(track, localStream)
    })
    return localStream
  1. 同样的:我们至少需要监听 pc.onTrack和pc.onicecandidate 事件。
  2. 接收端需要将 offer 设置为 pc.remoteDescription
    await pc.setRemoteDescription(new RTCSessionDescription(sdp))
  1. 接收端需要创建 answer 并且将其设置为 pc.localDescription
    const answer = await pc.createAnswer()
    await pc.setLocalDescription(answer)
  1. answer 发送给发送端:
    new WsMsgs(WsMsgTypes.ANSWER, new CallMsgs(targetUser, answer)).send()

至此:使用webRtc进行端对端的视频通话就完成了。

本次分享的demo的github地址

https://github.com/rondout/webrtc-demo


网站公告

今日签到

点亮在社区的每一天
去签到