Using WebSocket to Stream Audio Across Multiple Servers, with Real-Time Speech Recognition

Published: 2025-05-16

Here is my project setup:

Project architecture:
Project A (WebSocket client / React frontend) => Project B (WebSocket server facing A, WebSocket client facing C / Java backend) => Project C (WebSocket server / Node.js backend)

Project functionality:
Project A has a voice-input feature: when the user starts speaking, it captures the microphone input and streams the data to project B, and B returns a piece of simulated recognition text (randomly generated) to A. (In the real flow, project C would generate the recognition text, send it to B, and B would pass it on to A; I took a shortcut here.) B also forwards the microphone data to project C, which accumulates the audio in an in-memory buffer. When project A stops speaking, C writes the buffered data out as a .wav audio file and then converts that .wav file to .mp3 with ffmpeg. The wire protocol is sketched just below.
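
Concretely, every talk session on the wire is framed by two JSON text messages, with raw binary PCM frames in between. A minimal sketch of what A sends (and B relays to C), in order; the two JSON objects appear verbatim in the project A code below, while pcmChunk is a placeholder name for one encoded audio packet:

ws.send(JSON.stringify({ is_speaking: true, mode: "2pass", wav_name: "h5" }));  // start of speech
ws.send(pcmChunk); // ...repeated binary frames of 16 kHz / 16-bit mono PCM, at most 1024 bytes each
ws.send(JSON.stringify({ is_speaking: false, mode: "2pass", wav_name: "h5" })); // end of speech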


On to the code:

The code has been uploaded to Gitee.

Gitee repository URLs:

Projects A and C: https://gitee.com/tylerzhong/websocket_frontend
Project B: https://gitee.com/tylerzhong/websocket_middleware

Project A code:

AudioWebSocket (file path: src\pages\AudioWebSocket\index.jsx)

import { useState, useRef, useEffect } from 'react';

const AudioWebSocket = () => {
  const [resText, setResText] = useState('');
  const ws = useRef(null);
  const record = useRef(null);
  const waveformCanvas = useRef(null);
  const audioContextRef = useRef(null);

  // Data structures marking the start/end of a talk session
  const startData = { is_speaking: true, mode: "2pass", wav_name: "h5" };
  const endData = { is_speaking: false, mode: "2pass", wav_name: "h5" };

  // Recorder class (complete fix: fills in the full audioData structure)
  class Recorder {
    constructor(stream) {
      this.sampleBits = 16; // output bit depth
      this.sampleRate = 16000; // output sample rate

      // Full definition of the audioData object (including the input method)
      this.audioData = {
        type: "wav",
        size: 0, // recorded length so far, in samples
        buffer: [], // recording cache
        inputSampleRate: 48000, // input sample rate (hard-coded; see the note after this listing)
        inputSampleBits: 16, // input bit depth
        outputSampleRate: this.sampleRate, // output sample rate
        outputSampleBits: this.sampleBits, // output bit depth
        
        // Clear the cache
        clear: function () {
          this.buffer = [];
          this.size = 0;
        },
        
        // Append data to the cache
        input: function (data) {
          this.buffer.push(new Float32Array(data));
          this.size += data.length;
        },
        
        // Merge the cached chunks and downsample
        compress: function () {
          const mergedData = new Float32Array(this.size);
          let offset = 0;
          this.buffer.forEach(chunk => {
            mergedData.set(chunk, offset);
            offset += chunk.length;
          });
          
          // Downsample by the ratio (input sample rate / output sample rate)
          const compressionRatio = this.inputSampleRate / this.outputSampleRate;
          const compressedLength = Math.floor(mergedData.length / compressionRatio);
          const compressedData = new Float32Array(compressedLength);
          
          for (let i = 0; i < compressedLength; i++) {
            // Math.floor guards against non-integer ratios (e.g. 44100 / 16000)
            compressedData[i] = mergedData[Math.floor(i * compressionRatio)];
          }
          return compressedData;
        },
        
        // Encode to 16-bit PCM
        encodePCM: function () {
          const pcmData = this.compress();
          const byteLength = pcmData.length * (this.outputSampleBits / 8);
          const buffer = new ArrayBuffer(byteLength);
          const dataView = new DataView(buffer);
          
          for (let i = 0; i < pcmData.length; i++) {
            const value = pcmData[i];
            const intValue = value < 0 ? 
              Math.max(-1, value) * 0x8000 :  // negative range: -32768 to 0
              Math.min(1, value) * 0x7FFF;    // positive range: 0 to 32767
            dataView.setInt16(i * 2, intValue, true); // little-endian
          }
          return new Blob([dataView], { type: 'audio/wav' });
        }
      };

      this.recording = true; // recording state flag
      
      // Initialize the audio context (kept as a singleton)
      audioContextRef.current = audioContextRef.current || new AudioContext();
      this.audioInput = audioContextRef.current.createMediaStreamSource(stream);
      // Note: createScriptProcessor is deprecated in favor of AudioWorklet, but still widely supported
      this.recorderNode = audioContextRef.current.createScriptProcessor(4096, 1, 1);

      // Bind the audio-processing callback (an arrow function keeps `this` pointing at the Recorder instance)
      this.recorderNode.onaudioprocess = (e) => {
        const inputBuffer = e.inputBuffer.getChannelData(0);
        this.audioData.input(inputBuffer);
        this.sendData();
        this.updateWaveform(inputBuffer);
      };
    }

    // Start recording (connect the audio nodes)
    start() {
      this.audioInput.connect(this.recorderNode);
      this.recorderNode.connect(audioContextRef.current.destination);
    }

    // Stop recording (disconnect and flag the state)
    stop() {
      this.recording = false;
      this.recorderNode.disconnect();
    }

    // Send data over the WebSocket
    sendData() {
      if (!this.recording) return;
      // Guard: don't send before the socket is open (or after it has closed)
      if (!ws.current || ws.current.readyState !== WebSocket.OPEN) return;
      
      const reader = new FileReader();
      reader.onload = (e) => {
        const rawData = e.target.result;
        const byteArray = new Int8Array(rawData);
        
        // Send in packets of 1024 bytes each
        for (let i = 0; i < byteArray.length; i += 1024) {
          const chunk = byteArray.slice(i, i + 1024);
          ws.current.send(chunk);
        }
      };
      
      // Read the encoded PCM data
      reader.readAsArrayBuffer(this.audioData.encodePCM());
      this.audioData.clear(); // clear the cache
    }

    // Update the waveform visualization
    updateWaveform(inputBuffer) {
      const canvas = waveformCanvas.current;
      if (!canvas) return;
      
      const ctx = canvas.getContext('2d');
      const width = canvas.width;
      const height = canvas.height;
      
      ctx.clearRect(0, 0, width, height);
      ctx.fillStyle = '#106AE8';
      
      const numBars = 20;
      const barWidth = width / (numBars * 3);
      let x = 15;

      for (let i = 0; i < numBars; i++) {
        const sampleIndex = Math.floor(i * (inputBuffer.length / numBars));
        const barHeight = (Math.abs(inputBuffer[sampleIndex]) * height * 6) / 2;
        ctx.fillRect(x, height / 2 - barHeight, barWidth, barHeight * 2);
        x += barWidth + 4;
      }
    }
  }

  // Initialize the WebSocket
  const initWebSocket = () => {
    ws.current = new WebSocket('ws://localhost:8081/ws/a'); // replace with your actual address
    ws.current.binaryType = 'arraybuffer';
    
    ws.current.onopen = () => {
      ws.current.send(JSON.stringify(startData));
      console.log('WebSocket connected');
      if (record.current) {
        record.current.start(); // start recording
      }
    };

    ws.current.onmessage = (msg) => {
      const res = JSON.parse(msg.data);
      setResText(res.text); // update the recognized text
    };

    ws.current.onerror = (err) => {
      console.error('WebSocket error:', err);
    };
  };

  // Start talking
  const startIntercom = async () => {
    try {
      // Request microphone permission
      const mediaStream = await navigator.mediaDevices.getUserMedia({ audio: true });
      record.current = new Recorder(mediaStream); // create the Recorder instance
      initWebSocket(); // initialize the WebSocket
    } catch (error) {
      console.error('Microphone permission request failed:', error);
    }
  };

  // Stop talking
  const endIntercom = () => {
    if (ws.current) {
      ws.current.send(JSON.stringify(endData)); // send the end marker
      record.current?.stop(); // stop recording
    }
  };

  // Cleanup when the component unmounts
  useEffect(() => {
    return () => {
      if (ws.current) {
        ws.current.close(); // close the WebSocket
      }
      if (audioContextRef.current) {
        audioContextRef.current.close(); // close the audio context
      }
    };
  }, []);

  return (
    <div className="mainContent">
      <button onClick={startIntercom}>Start Talking</button>
      <button onClick={endIntercom}>Stop Talking</button>
      <div>Recognized text: {resText || '--'}</div>
      <canvas ref={waveformCanvas} width="200" height="20" />
    </div>
  );
};

export default AudioWebSocket;
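
One caveat on the numbers above: inputSampleRate is hard-coded to 48000, but browsers may open the AudioContext at other rates (44100 is common), so a more robust version would read audioContextRef.current.sampleRate instead; with a mismatched rate the saved audio plays back at the wrong pitch. Assuming the 48 kHz input, the per-callback arithmetic works out as follows (a sketch, not part of the project code):

// Rough arithmetic for one onaudioprocess callback, assuming 48 kHz input:
const inputSamples = 4096;                              // ScriptProcessor buffer size
const ratio = 48000 / 16000;                            // = 3, i.e. keep every 3rd sample
const outputSamples = Math.floor(inputSamples / ratio); // 1365 samples at 16 kHz
const pcmBytes = outputSamples * 2;                     // 16-bit samples => 2730 bytes
const packets = Math.ceil(pcmBytes / 1024);             // => 3 WebSocket packets per callback
console.log({ outputSamples, pcmBytes, packets });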

Project B code:

WebSocketConfig (file path: src/main/java/com/tyler/config/WebSocketConfig.java)

package com.tyler.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
public class WebSocketConfig {

    // Register a ServerEndpointExporter, which scans for all classes annotated with @ServerEndpoint
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

WebSocketClient (file path: src/main/java/com/tyler/voice/WebSocketClient.java)

package com.tyler.voice;

import jakarta.websocket.Endpoint;
import jakarta.websocket.EndpointConfig;
import jakarta.websocket.Session;
import org.glassfish.tyrus.client.ClientManager;

import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;

public class WebSocketClient {
    private String cServerUrl;
    private Session cSession;
    private CountDownLatch connectLatch = new CountDownLatch(1);

    public WebSocketClient(String cServerUrl) {
        this.cServerUrl = cServerUrl;
    }

    // Connect
    public void connect() {
        new Thread(() -> {
            ClientManager client = ClientManager.createClient();
            try {
                client.connectToServer(new Endpoint() {
                    @Override
                    public void onOpen(Session session, EndpointConfig config){
                        cSession = session;
                        connectLatch.countDown();
                        System.out.println("Connected to the project C WebSocket server");
                    }
                }, new URI(cServerUrl));
                connectLatch.await(); // wait for the connection to be established
            } catch (Exception e) {
                System.out.println("Failed to connect to project C: " + e.getMessage());
            }
        }).start();
    }

    // Send a text message (newly added method)
    public void sendText(String text) {
        if (cSession != null && cSession.isOpen()) {
            try {
                cSession.getBasicRemote().sendText(text); // send the text directly
            } catch (IOException e) {
                System.out.println("Failed to send text to project C: " + e.getMessage());
            }
        }
    }

    // Original binary-send method, kept for the audio data
    public void send(byte[] data) {
        if (cSession != null && cSession.isOpen()) {
            try {
                cSession.getBasicRemote().sendBinary(ByteBuffer.wrap(data));
            } catch (IOException e) {
                System.out.println("Failed to send data to project C: " + e.getMessage());
            }
        }
    }

    // Disconnect from project C
    public void disconnect() {
        if (cSession != null) {
            try {
                cSession.close();
            } catch (IOException e) {
                System.out.println("Failed to disconnect from project C: " + e.getMessage());
            }
        }
    }

}

WebSocketServer (file path: src/main/java/com/tyler/voice/WebSocketServer.java)

package com.tyler.voice;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import jakarta.websocket.*;
import jakarta.websocket.server.ServerEndpoint;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Component
@ServerEndpoint(value = "/ws/a")
public class WebSocketServer {
    private static final ConcurrentHashMap<String, Session> aSessions = new ConcurrentHashMap<>();
    private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private WebSocketClient cClient = new WebSocketClient("ws://localhost:8082/ws/c");

    @OnOpen
    public void onOpen(Session session) {
        String sessionId = session.getId();
        aSessions.put(sessionId, session);
        System.out.println("Project A connection established, SessionID: " + sessionId);
        cClient.connect(); // connect to project C
    }

    @OnMessage
    public void onMessage(Session session, String controlMsg) {
        System.out.println("Received control message: " + controlMsg);
        try {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("text", "---------------- data received ----------------");
            session.getBasicRemote().sendText(JSON.toJSONString(jsonObject));
        } catch (IOException e) {
            System.out.println("Failed to send data to project A");
        }
        cClient.sendText(controlMsg);
    }

    @OnMessage
    public void onMessage(Session session, byte[] audioData) {
        System.out.println("Received audio data from project A, length: " + audioData.length + " bytes");
        try {
            JSONObject jsonObject = new JSONObject();
            Random random = new Random();
            // Simulated recognition result: a random number (Random.nextInt(origin, bound) needs Java 17+)
            jsonObject.put("text", random.nextInt(10, 100));
            session.getBasicRemote().sendText(JSON.toJSONString(jsonObject));
        } catch (IOException e) {
            System.out.println("Failed to send data to project A");
        }
        cClient.send(audioData);
    }

    @OnClose
    public void onClose(Session session) {
        String sessionId = session.getId();
        scheduler.schedule(() -> {
            cClient.disconnect();
            aSessions.remove(sessionId);
            System.out.println("Project A connection closed, SessionID: " + sessionId);
        }, 500, TimeUnit.MILLISECONDS); // was MICROSECONDS, which made the intended delay effectively zero
    }

    @OnError
    public void onError(Session session, Throwable error) {
        // Ignore the IOException raised when the connection is aborted (key); the string below is
        // the Chinese-locale Windows message for "aborted an established connection"
        if (!(error instanceof IOException && error.getMessage() != null && error.getMessage().contains("中止了一个已建立的连接"))) {
            System.out.println("Project A connection error: " + error.getMessage());
        }
    }

}

application.yml (file path: src/main/resources/application.yml)

server:
  port: 8081

Project C code:

websocket-node.js (file path: websocket-node.js)

const WebSocket = require('ws');
const fs = require('fs');
const { exec } = require('child_process');
const path = require('path');

const wss = new WebSocket.Server({ port: 8082 });
const clientBuffers = new Map(); // stores { buffer: Buffer[], isEnd: boolean } per client

// Generate a unique client ID
const generateClientId = () => {
    return `client-${Date.now()}-${Math.floor(Math.random() * 1000)}`;
};

// Total length of a client's buffered audio
const sumBufferLength = (clientId) => {
    return clientBuffers.get(clientId).buffer.reduce((acc, chunk) => acc + chunk.length, 0);
};

// FFmpeg transcoding (WAV → MP3)
const convertToMp3 = (wavPath, mp3Path, callback) => {
    // -vn: no video, -ab 128k: audio bitrate, -ar 44100: resample to 44.1 kHz, -y: overwrite output
    const ffmpegCmd = `ffmpeg -i "${wavPath}" -vn -ab 128k -ar 44100 -y "${mp3Path}"`;
    exec(ffmpegCmd, (error, stdout, stderr) => {
        if (error) {
            callback(`Transcoding failed: ${error.message}`);
            return;
        }
        callback(null, mp3Path);
    });
};

wss.on('connection', (ws) => {
    const clientId = generateClientId();
    clientBuffers.set(clientId, { buffer: [], isEnd: false });
    console.log(`New client connected: ${clientId}`);

    ws.on('message', (data) => {
        let controlMsg = null;
        // Quick heuristic for "might be JSON" (ws >= 8 also passes an isBinary flag, which would be more robust)
        const isJson = data.toString().startsWith('{') && data.toString().endsWith('}');

        try {
            if (isJson) {
                controlMsg = JSON.parse(data.toString());
            }
        } catch (error) {
            // Intentionally empty: leave controlMsg as null
        }

        if (controlMsg !== null) {
            // Handle a control message
            console.log(`[${clientId}] Received control message:`, controlMsg);
            if (controlMsg.is_speaking === false && clientBuffers.has(clientId)) {
                clientBuffers.get(clientId).isEnd = true;
                processAudio(clientId);
            }
        } else {
            // Handle audio data (supports Buffer / Uint8Array / ArrayBuffer)
            let bufferData;
            if (data instanceof Buffer) {
                bufferData = data;
            } else if (data instanceof Uint8Array) {
                bufferData = Buffer.from(data);
            } else if (data instanceof ArrayBuffer) {
                bufferData = Buffer.from(data);
            } else {
                console.warn(`[${clientId}] Unsupported data type:`, typeof data);
                return;
            }

            const entry = clientBuffers.get(clientId);
            if (!entry) return; // buffer already finalized (audio arriving after the end marker)
            entry.buffer.push(bufferData);
            console.log(`[${clientId}] Received audio chunk, total length so far: ${sumBufferLength(clientId)}`);
        }
    });

    ws.on('close', () => {
        console.log(`[${clientId}] Connection closed, processing any remaining data`);
        processAudio(clientId); // handle leftover data on close
    });

    ws.on('error', (error) => {
        console.error(`[${clientId}] Connection error:`, error);
        processAudio(clientId); // force processing on error
    });
});
});

// Core audio-processing function
const processAudio = (clientId) => {
    const clientData = clientBuffers.get(clientId);
    if (!clientData || (clientData.buffer.length === 0 && !clientData.isEnd)) {
        clientBuffers.delete(clientId);
        return;
    }

    const audioBuffer = Buffer.concat(clientData.buffer);
    clientBuffers.delete(clientId);

    if (audioBuffer.length === 0) {
        console.log(`[${clientId}] No valid audio data`);
        return;
    }

    // Save a temporary WAV file (with the WAV header prepended)
    const wavBuffer = addWavHeader(audioBuffer);
    const tempWavPath = path.join(__dirname, 'temp', `${clientId}.wav`);
    const outputMp3Path = path.join(__dirname, 'output', `${clientId}.mp3`);

    fs.writeFile(tempWavPath, wavBuffer, (err) => {
        if (err) {
            console.error(`[${clientId}] Failed to write temporary file:`, err);
            return;
        }

        convertToMp3(tempWavPath, outputMp3Path, (error, mp3Path) => {
            if (error) {
                console.error(`[${clientId}] ${error}`);
            } else {
                console.log(`[${clientId}] MP3 saved successfully: ${mp3Path}`);
            }
            // fs.unlink(tempWavPath, () => {}); // clean up the temporary WAV file
        });
    });
};

// Prepend the WAV header (matching project A's output: 16000 Hz, 16-bit, mono)
const addWavHeader = (pcmBuffer) => {
    const sampleRate = 16000; // the sample rate set in project A's Recorder
    const numChannels = 1; // mono
    const bitDepth = 16; // 16-bit samples

    // The WAV header is a fixed 44 bytes, followed by the PCM data
    const wavBuffer = Buffer.alloc(44 + pcmBuffer.length);

    // RIFF chunk descriptor
    wavBuffer.write('RIFF', 0);
    wavBuffer.writeUInt32LE(36 + pcmBuffer.length, 4); // chunk size (file size minus the 8 bytes for 'RIFF' and this field)
    wavBuffer.write('WAVE', 8);

    // fmt sub-chunk (24 bytes)
    wavBuffer.write('fmt ', 12);
    wavBuffer.writeUInt32LE(16, 16); // fmt sub-chunk size (fixed at 16 for PCM)
    wavBuffer.writeUInt16LE(1, 20); // audio format (PCM = 1)
    wavBuffer.writeUInt16LE(numChannels, 22); // number of channels
    wavBuffer.writeUInt32LE(sampleRate, 24); // sample rate
    wavBuffer.writeUInt32LE((sampleRate * numChannels * bitDepth) / 8, 28); // byte rate (sample rate × channels × bit depth / 8)
    wavBuffer.writeUInt16LE((numChannels * bitDepth) / 8, 32); // block align (channels × bit depth / 8)
    wavBuffer.writeUInt16LE(bitDepth, 34); // bit depth

    // data sub-chunk (8 bytes + PCM data)
    wavBuffer.write('data', 36);
    wavBuffer.writeUInt32LE(pcmBuffer.length, 40); // PCM data size

    // Copy the PCM payload in after the 44-byte header
    pcmBuffer.copy(wavBuffer, 44);

    return wavBuffer;
};

// Initialize the temp and output directories
fs.mkdirSync(path.join(__dirname, 'temp'), { recursive: true });
fs.mkdirSync(path.join(__dirname, 'output'), { recursive: true });
console.log('Project C started on port 8082');
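
To exercise project C in isolation (or the whole chain, by pointing the URL at project B's ws://localhost:8081/ws/a instead), here is a minimal test-client sketch. It mimics A's wire protocol; the sine-tone generator is made up purely for illustration:

// test-client.js — minimal sketch of a client speaking A's wire protocol
const WebSocket = require('ws');

const ws = new WebSocket('ws://localhost:8082/ws/c');

ws.on('open', () => {
    ws.send(JSON.stringify({ is_speaking: true, mode: '2pass', wav_name: 'h5' }));

    // One second of a 440 Hz sine tone as 16 kHz / 16-bit mono PCM (illustrative stand-in for microphone data)
    const sampleRate = 16000;
    const pcm = Buffer.alloc(sampleRate * 2);
    for (let i = 0; i < sampleRate; i++) {
        pcm.writeInt16LE(Math.round(Math.sin(2 * Math.PI * 440 * i / sampleRate) * 0x7fff * 0.3), i * 2);
    }

    // Send in 1024-byte chunks, like the A project does
    for (let i = 0; i < pcm.length; i += 1024) {
        ws.send(pcm.subarray(i, i + 1024));
    }

    ws.send(JSON.stringify({ is_speaking: false, mode: '2pass', wav_name: 'h5' }));
    setTimeout(() => ws.close(), 500); // give the server a moment before closing
});

ws.on('message', (msg) => console.log('server says:', msg.toString()));

Running node test-client.js against project C should leave a .wav file in temp/ and an .mp3 in output/.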
