局域网内通过组播地址以RTP(基于UDP)推流和拉流实现实时喊话

发布于:2025-07-26 ⋅ 阅读:(23) ⋅ 点赞:(0)

应用场景:安卓端在局域网内无需指定对端IP,通过组播地址实现实时对讲功能。

发送端: ffmpeg -f alsa -i hw:1 -acodec aac -ab 64k -ac 2 -ar 16000 -f rtp -sdp_file stream.sdp rtp://224.0.0.1:14556

接收端: ffmpeg -protocol_whitelist file,udp,rtp -i stream.sdp -acodec pcm_s16le -ar 16000 -ac 2 -f alsa default

先在Windows上测试通过,然后在安卓中实现。

# 查询本地可用麦克风设备
ffmpeg -list_devices true -f dshow -i dummy

其中“麦克风 (Realtek(R) Audio)”是我电脑上的麦克风设备名,请替换为上一步查询到的设备名。
# Windows 执行RTP推音频流
ffmpeg -f dshow -i audio="麦克风 (Realtek(R) Audio)" -acodec aac -ab 64k -ac 2 -ar 16000 -f rtp -sdp_file stream.sdp rtp://239.0.0.1:15556

在Windows上调通后,接下来在安卓上实现。

主要用到这个库:implementation("com.arthenica:mobile-ffmpeg-full:4.4.LTS")

推流
package com.xhx.megaphone.tcp

import android.media.AudioFormat
import android.media.AudioRecord
import android.media.MediaRecorder
import com.arthenica.mobileffmpeg.Config
import com.arthenica.mobileffmpeg.FFmpeg
import com.blankj.utilcode.util.LogUtils
import com.xhx.megaphone.App
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import java.io.File
import java.io.FileOutputStream
import java.io.IOException
import java.util.concurrent.atomic.AtomicBoolean

/**
 * Real-time push-streaming helper.
 *
 * Pipeline:
 * 1. [AudioRecord] captures 16-bit mono PCM; a worker thread appends every buffer to
 *    [liveAudioFile] and flushes it immediately.
 * 2. After a short head start, FFmpeg (mobile-ffmpeg) reads that growing file at native rate
 *    (`-re`, `-follow 1`), encodes it to AAC and sends it as RTP to
 *    [MULTICAST_ADDRESS]:[MULTICAST_PORT], writing the session description to [sdpFile].
 *
 * [isStreaming] guards start/stop. The FFmpeg launch and its cancellation are serialized on
 * [ffmpegLock] together with [streamingSession], so a stop that races the delayed FFmpeg
 * launch cannot leave an orphaned FFmpeg session running.
 */
object LiveStreamingHelper {

    private const val TAG = "LiveStreamingHelper"

    // Multicast destination
    private const val MULTICAST_ADDRESS = "239.0.0.1"
    private const val MULTICAST_PORT = 15556

    // Audio parameters
    private const val SAMPLE_RATE = 16000
    private const val CHANNELS = 1
    private const val BIT_RATE = 64000
    private const val AUDIO_FORMAT = AudioFormat.ENCODING_PCM_16BIT

    /** Head start given to the recorder before FFmpeg begins reading the PCM file. */
    private const val FFMPEG_START_DELAY_MS = 300L

    /** Upper bound for waiting on the recording thread while stopping. */
    private const val THREAD_JOIN_TIMEOUT_MS = 500L

    /** MPEG-4 audio object type for AAC-LC. */
    private const val AAC_LC_OBJECT_TYPE = 2

    /** MPEG-4 sampling-frequency-index table; the index of [SAMPLE_RATE] goes into the SDP `config`. */
    private val AAC_SAMPLE_RATES = intArrayOf(
        96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050, 16000, 12000, 11025, 8000, 7350
    )

    // Capture buffer: twice the platform minimum, kept small to reduce latency.
    private val BUFFER_SIZE = AudioRecord.getMinBufferSize(
        SAMPLE_RATE,
        AudioFormat.CHANNEL_IN_MONO,
        AUDIO_FORMAT
    ).let { minSize ->
        minSize * 2
    }

    // Streaming state
    private val isStreaming = AtomicBoolean(false)

    @Volatile
    private var audioRecord: AudioRecord? = null

    @Volatile
    private var recordingThread: Thread? = null

    /** Guards [ffmpegExecutionId] and [streamingSession]. */
    private val ffmpegLock = Any()

    private var ffmpegExecutionId: Long = 0

    /** Bumped on every stop; a delayed FFmpeg launch only proceeds while its session is current. */
    private var streamingSession = 0

    // File paths
    private val cacheDir = File(App.ctx.cacheDir, "live_streaming")
    private val sdpFile = File(cacheDir, "stream.sdp")
    private val liveAudioFile = File(cacheDir, "live_audio.pcm")

    /**
     * Starts recording from the microphone and pushing it to the multicast group.
     *
     * The streaming flag is claimed before any resource is created, so the recording thread
     * never observes a "not streaming" state during start-up. On failure everything that was
     * already created is released through [stopStreaming].
     *
     * @return `true` if streaming was started, `false` if it was already running or start-up failed.
     */
    fun startStreaming(): Boolean {
        if (!isStreaming.compareAndSet(false, true)) {
            LogUtils.w(TAG, "推流已在进行中")
            return false
        }

        return try {
            initializeFiles()
            createSdpFile()
            startAudioRecording()
            startLiveStreaming()

            LogUtils.i(TAG, "✅ 实时推流启动成功")
            true
        } catch (e: Exception) {
            LogUtils.e(TAG, "❌ 实时推流启动失败", e)
            // isStreaming is true here, so stopStreaming() performs the full cleanup.
            stopStreaming()
            false
        }
    }

    /**
     * Stops streaming: cancels (or pre-empts) FFmpeg, stops capture, waits briefly for the
     * recording thread to exit and only then releases the [AudioRecord].
     * Does nothing when streaming is not active.
     */
    fun stopStreaming() {
        if (!isStreaming.compareAndSet(true, false)) {
            return
        }

        // Cancel FFmpeg, or invalidate a launch that is still waiting out its head start.
        val executionId = synchronized(ffmpegLock) {
            streamingSession++
            ffmpegExecutionId.also { ffmpegExecutionId = 0 }
        }
        if (executionId != 0L) {
            FFmpeg.cancel(executionId)
        }

        // stop() unblocks a pending read() so the worker can observe the cleared flag and exit.
        val recorder = audioRecord
        audioRecord = null
        try {
            recorder?.stop()
        } catch (e: IllegalStateException) {
            LogUtils.w(TAG, "AudioRecord.stop() 失败", e)
        }

        recordingThread?.let { thread ->
            thread.interrupt()
            try {
                thread.join(THREAD_JOIN_TIMEOUT_MS)
            } catch (e: InterruptedException) {
                Thread.currentThread().interrupt()
            }
        }
        recordingThread = null

        // Release only after the worker has stopped using the recorder.
        recorder?.release()

        LogUtils.i(TAG, "🛑 实时推流已停止")
    }

    /**
     * @return whether a streaming session is currently active.
     */
    fun isStreaming(): Boolean = isStreaming.get()

    /**
     * @return absolute path of the SDP file that receivers should open.
     */
    fun getSdpFilePath(): String = sdpFile.absolutePath

    /**
     * @return a human-readable, multi-line summary of the multicast target and streaming state.
     */
    fun getMulticastInfo(): String {
        val fileSize = if (liveAudioFile.exists()) {
            "${liveAudioFile.length() / 1024}KB"
        } else {
            "0KB"
        }

        return "组播地址: $MULTICAST_ADDRESS:$MULTICAST_PORT\n" +
                "SDP文件: ${sdpFile.absolutePath}\n" +
                "传输方式: 实时文件流\n" +
                "缓冲区大小: ${BUFFER_SIZE}字节\n" +
                "当前文件大小: $fileSize\n" +
                "推流状态: ${if (isStreaming.get()) "进行中" else "已停止"}"
    }

    /**
     * Ensures the cache directory exists and replaces any previous PCM file with an empty one.
     */
    private fun initializeFiles() {
        if (!cacheDir.exists()) {
            cacheDir.mkdirs()
        }

        // Remove data from a previous session.
        if (liveAudioFile.exists()) {
            liveAudioFile.delete()
        }

        liveAudioFile.createNewFile()
    }

    /**
     * Writes a placeholder SDP so receivers can open the stream before FFmpeg starts.
     * FFmpeg overwrites the same file through `-sdp_file` once it is running.
     */
    private fun createSdpFile() {
        val sdpContent = """
            v=0
            o=- 0 0 IN IP4 127.0.0.1
            s=No Name
            c=IN IP4 $MULTICAST_ADDRESS
            t=0 0
            a=tool:libavformat 58.45.100
            m=audio $MULTICAST_PORT RTP/AVP 97
            b=AS:64
            a=rtpmap:97 MPEG4-GENERIC/$SAMPLE_RATE/$CHANNELS
            a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=${aacConfigHex()}
        """.trimIndent()

        sdpFile.writeText(sdpContent)
        LogUtils.i(TAG, "SDP文件创建成功: ${sdpFile.absolutePath}")
    }

    /**
     * Builds the hex `config` value for AAC-LC at [SAMPLE_RATE]/[CHANNELS].
     *
     * The first two bytes are the AudioSpecificConfig: 5 bits object type, 4 bits frequency
     * index, 4 bits channel configuration and 3 zero bits. `56E500` is the trailing
     * "SBR absent" sync extension, matching the form of the value previously hard-coded here
     * (`141056E500` is the same layout for 2 channels).
     */
    private fun aacConfigHex(): String {
        val frequencyIndex = AAC_SAMPLE_RATES.indexOf(SAMPLE_RATE)
        check(frequencyIndex >= 0) { "Unsupported AAC sample rate: $SAMPLE_RATE" }
        val audioSpecificConfig =
            (AAC_LC_OBJECT_TYPE shl 11) or (frequencyIndex shl 7) or (CHANNELS shl 3)
        return "%04X56E500".format(audioSpecificConfig)
    }

    /**
     * Creates and starts the [AudioRecord], then spawns the thread that copies captured PCM
     * into [liveAudioFile].
     *
     * @throws IOException if the recorder could not be initialized.
     */
    private fun startAudioRecording() {
        val recorder = AudioRecord(
            MediaRecorder.AudioSource.MIC,
            SAMPLE_RATE,
            AudioFormat.CHANNEL_IN_MONO,
            AUDIO_FORMAT,
            BUFFER_SIZE
        )
        // Published before the state check so stopStreaming() releases it even if init failed.
        audioRecord = recorder

        if (recorder.state != AudioRecord.STATE_INITIALIZED) {
            throw IOException("AudioRecord初始化失败")
        }

        recorder.startRecording()

        recordingThread = Thread { recordToFile(recorder) }.also { it.start() }
        LogUtils.i(TAG, "音频录音已启动")
    }

    /**
     * Recording-thread body: reads PCM from [recorder] and writes and flushes every chunk to
     * [liveAudioFile] until streaming stops, the thread is interrupted, or the recorder reports
     * `ERROR_INVALID_OPERATION`.
     */
    private fun recordToFile(recorder: AudioRecord) {
        val buffer = ByteArray(BUFFER_SIZE)
        var totalBytes = 0L
        var bytesSinceLog = 0L
        var lastLogTime = System.currentTimeMillis()

        try {
            FileOutputStream(liveAudioFile, false).use { output ->
                LogUtils.i(TAG, "录音线程启动,实时写入: ${liveAudioFile.absolutePath}")
                LogUtils.i(TAG, "缓冲区大小: $BUFFER_SIZE 字节")

                while (isStreaming.get() && !Thread.currentThread().isInterrupted) {
                    val bytesRead = recorder.read(buffer, 0, buffer.size)
                    if (bytesRead > 0) {
                        // Flush immediately so FFmpeg sees the data without delay.
                        output.write(buffer, 0, bytesRead)
                        output.flush()
                        totalBytes += bytesRead
                        bytesSinceLog += bytesRead

                        // Status line roughly every 3 seconds; the rate covers only that interval.
                        val now = System.currentTimeMillis()
                        val elapsedMs = now - lastLogTime
                        if (elapsedMs > 3000) {
                            val rateKb = (bytesSinceLog * 1000.0 / elapsedMs / 1024).toInt()
                            LogUtils.d(TAG, "🎙️ 实时录音中: ${totalBytes / 1024}KB, 速率: ${rateKb}KB/s")
                            lastLogTime = now
                            bytesSinceLog = 0
                        }
                    } else if (bytesRead == AudioRecord.ERROR_INVALID_OPERATION) {
                        LogUtils.e(TAG, "AudioRecord读取错误: ERROR_INVALID_OPERATION")
                        break
                    } else if (bytesRead < 0) {
                        LogUtils.w(TAG, "AudioRecord读取返回负值: $bytesRead")
                    }
                }
            }

            LogUtils.i(TAG, "录音线程结束,总计: ${totalBytes / 1024}KB")
        } catch (e: Exception) {
            LogUtils.e(TAG, "录音数据写入异常", e)
        }
    }

    /**
     * Launches FFmpeg after [FFMPEG_START_DELAY_MS]. FFmpeg reads the PCM file, encodes it to
     * AAC and pushes RTP to the multicast group. The launch is skipped if [stopStreaming] ran
     * (or a new session began) during the delay.
     */
    private fun startLiveStreaming() {
        val session = synchronized(ffmpegLock) { streamingSession }

        GlobalScope.launch(Dispatchers.IO) {
            // Give the recorder a head start so FFmpeg finds data when it opens the file.
            Thread.sleep(FFMPEG_START_DELAY_MS)

            val command = "-re -f s16le -ar $SAMPLE_RATE -ac $CHANNELS " +
                    "-thread_queue_size 512 " +
                    "-follow 1 " +  // keep reading at EOF: the PCM file is still being written
                    "-i ${liveAudioFile.absolutePath} " +
                    "-acodec aac -ab ${BIT_RATE / 1000}k -ac $CHANNELS -ar $SAMPLE_RATE " +
                    "-f rtp -sdp_file ${sdpFile.absolutePath} " +
                    "rtp://$MULTICAST_ADDRESS:$MULTICAST_PORT"

            LogUtils.i(TAG, "FFmpeg实时推流命令: $command")

            val executionId = synchronized(ffmpegLock) {
                if (!isStreaming.get() || session != streamingSession) {
                    // Stopped during the head start: never start an FFmpeg nobody will cancel.
                    LogUtils.i(TAG, "推流已停止,跳过FFmpeg启动")
                    return@launch
                }
                FFmpeg.executeAsync(command) { id, returnCode ->
                    onFfmpegFinished(id, returnCode)
                }.also { ffmpegExecutionId = it }
            }

            LogUtils.i(TAG, "FFmpeg执行ID: $executionId")
        }
    }

    /**
     * FFmpeg completion callback: clears the stored execution id if it still refers to this
     * run, then logs how the run ended.
     */
    private fun onFfmpegFinished(executionId: Long, returnCode: Int) {
        synchronized(ffmpegLock) {
            if (ffmpegExecutionId == executionId) {
                ffmpegExecutionId = 0
            }
        }

        LogUtils.i(TAG, "FFmpeg推流结束: executionId=$executionId, returnCode=$returnCode")

        when (returnCode) {
            Config.RETURN_CODE_SUCCESS -> {
                LogUtils.i(TAG, "✅ 推流正常结束")
            }
            Config.RETURN_CODE_CANCEL -> {
                LogUtils.i(TAG, "🛑 推流被用户取消")
            }
            else -> {
                LogUtils.w(TAG, "⚠️ 推流异常结束,返回码: $returnCode")
            }
        }
    }
}

拉流

package com.xhx.megaphone.tcp

import android.media.AudioFormat
import android.media.AudioManager
import android.media.AudioTrack
import com.arthenica.mobileffmpeg.Config
import com.arthenica.mobileffmpeg.FFmpeg
import com.blankj.utilcode.util.LogUtils
import com.xhx.megaphone.App
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import java.io.File
import java.io.RandomAccessFile
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong

/**
 * Low-latency pull-and-play helper.
 *
 * FFmpeg (mobile-ffmpeg) receives the RTP stream described by an SDP file, decodes it to raw
 * 16-bit PCM and writes it to [outputPcmFile]. A playback thread tails that file and feeds
 * whole PCM frames to an [AudioTrack].
 *
 * Latency strategy:
 * 1. Minimal FFmpeg buffering (`nobuffer`, `low_delay`, tiny probe, no reorder queue).
 * 2. Small AudioTrack buffer (2× the platform minimum); low-latency performance mode on API 26+.
 * 3. The playback thread sleeps only when no complete frame is pending.
 * 4. The PCM file stays open instead of being reopened on every poll.
 *
 * The FFmpeg launch and its cancellation are serialized on [ffmpegLock] with [pullSession],
 * so a stop during the start delay never leaves an orphaned FFmpeg session.
 */
object LowLatencyPullHelper {

    private const val TAG = "LowLatencyPullHelper"

    // Audio parameters
    private const val SAMPLE_RATE = 16000
    private const val CHANNELS = 1
    private const val AUDIO_FORMAT = AudioFormat.ENCODING_PCM_16BIT

    // Low-latency parameters
    private const val SMALL_BUFFER_SIZE = 1024  // max bytes read from the PCM file per chunk
    private const val READ_INTERVAL_MS = 20     // poll interval while waiting for new data

    /** Delay before FFmpeg starts decoding. */
    private const val FFMPEG_START_DELAY_MS = 500L

    /** Upper bound for waiting on the playback thread while stopping. */
    private const val THREAD_JOIN_TIMEOUT_MS = 500L

    /** Bytes per PCM frame: 16-bit samples × [CHANNELS]. */
    private const val FRAME_SIZE_BYTES = 2 * CHANNELS

    /** PCM bytes per millisecond of audio; converts a byte backlog into a latency estimate. */
    private const val BYTES_PER_MS = SAMPLE_RATE / 1000 * FRAME_SIZE_BYTES

    // Pull state
    private val isPulling = AtomicBoolean(false)

    @Volatile
    private var audioTrack: AudioTrack? = null

    @Volatile
    private var playbackThread: Thread? = null

    /** Guards [ffmpegExecutionId] and [pullSession]. */
    private val ffmpegLock = Any()

    private var ffmpegExecutionId: Long = 0

    /** Bumped on every stop; a delayed FFmpeg launch only proceeds while its session is current. */
    private var pullSession = 0

    // Byte offset in outputPcmFile up to which audio has been handed to the AudioTrack.
    private val fileReadPosition = AtomicLong(0)

    // File paths
    private val cacheDir = File(App.ctx.cacheDir, "low_latency_pull")
    private val outputPcmFile = File(cacheDir, "realtime_audio.pcm")

    /**
     * Starts pulling the stream described by [sdpFilePath] and playing it.
     *
     * The pulling flag is claimed before any resource is created, so the playback thread never
     * observes a "not pulling" state during start-up. On failure [stopPulling] releases
     * everything created so far.
     *
     * @return `true` on success; `false` if already pulling, the SDP file is missing, or start-up failed.
     */
    fun startPulling(sdpFilePath: String): Boolean {
        if (isPulling.get()) {
            LogUtils.w(TAG, "拉流已在进行中")
            return false
        }

        val sdpFile = File(sdpFilePath)
        if (!sdpFile.exists()) {
            LogUtils.e(TAG, "SDP文件不存在: $sdpFilePath")
            return false
        }

        if (!isPulling.compareAndSet(false, true)) {
            LogUtils.w(TAG, "拉流已在进行中")
            return false
        }

        return try {
            initializeFiles()
            fileReadPosition.set(0)
            startLowLatencyDecoding(sdpFilePath)
            startLowLatencyPlayback()

            LogUtils.i(TAG, "✅ 低延迟拉流播放启动成功")
            true
        } catch (e: Exception) {
            LogUtils.e(TAG, "❌ 低延迟拉流播放启动失败", e)
            // isPulling is true here, so stopPulling() performs the full cleanup.
            stopPulling()
            false
        }
    }

    /**
     * Stops pulling: cancels (or pre-empts) FFmpeg, waits briefly for the playback thread to
     * finish its current write and only then stops and releases the [AudioTrack].
     * Does nothing when pulling is not active.
     */
    fun stopPulling() {
        if (!isPulling.compareAndSet(true, false)) {
            return
        }

        // Cancel FFmpeg, or invalidate a launch that is still waiting out its start delay.
        val executionId = synchronized(ffmpegLock) {
            pullSession++
            ffmpegExecutionId.also { ffmpegExecutionId = 0 }
        }
        if (executionId != 0L) {
            FFmpeg.cancel(executionId)
        }

        // The playback thread must stop writing before the track is released.
        playbackThread?.let { thread ->
            thread.interrupt()
            try {
                thread.join(THREAD_JOIN_TIMEOUT_MS)
            } catch (e: InterruptedException) {
                Thread.currentThread().interrupt()
            }
        }
        playbackThread = null

        val track = audioTrack
        audioTrack = null
        if (track != null) {
            try {
                track.stop()
            } catch (e: IllegalStateException) {
                LogUtils.w(TAG, "AudioTrack.stop() 失败", e)
            }
            track.release()
        }

        LogUtils.i(TAG, "🛑 低延迟拉流已停止")
    }

    /**
     * @return whether a pull session is currently active.
     */
    fun isPulling(): Boolean = isPulling.get()

    /**
     * @return a human-readable, multi-line summary of the pull state, decoded-file size and
     * playback position.
     */
    fun getPullInfo(): String {
        val fileSize = if (outputPcmFile.exists()) {
            "${outputPcmFile.length() / 1024}KB"
        } else {
            "0KB"
        }

        return "拉流状态: ${if (isPulling.get()) "进行中" else "已停止"}\n" +
                "解码文件: ${outputPcmFile.absolutePath}\n" +
                "文件大小: $fileSize\n" +
                "读取位置: ${fileReadPosition.get() / 1024}KB\n" +
                "优化模式: 低延迟"
    }

    /**
     * Ensures the cache directory exists and deletes the PCM file from a previous session.
     */
    private fun initializeFiles() {
        if (!cacheDir.exists()) {
            cacheDir.mkdirs()
        }

        if (outputPcmFile.exists()) {
            outputPcmFile.delete()
        }
    }

    /**
     * Launches FFmpeg after [FFMPEG_START_DELAY_MS] to decode the RTP stream described by
     * [sdpFilePath] into raw s16le PCM in [outputPcmFile]. The launch is skipped if
     * [stopPulling] ran (or a new session began) during the delay.
     */
    private fun startLowLatencyDecoding(sdpFilePath: String) {
        val session = synchronized(ffmpegLock) { pullSession }

        GlobalScope.launch(Dispatchers.IO) {
            Thread.sleep(FFMPEG_START_DELAY_MS)

            val command = "-y " +                                 // never block on an overwrite prompt
                    "-protocol_whitelist file,udp,rtp " +
                    "-fflags +nobuffer+flush_packets " +      // no input buffering, flush immediately
                    "-flags low_delay " +                     // low-delay decoding
                    "-probesize 32 " +                        // minimal probing
                    "-analyzeduration 0 " +                   // skip stream analysis
                    "-max_delay 0 " +                         // no demuxer delay
                    "-reorder_queue_size 0 " +                // disable RTP reordering queue
                    "-rw_timeout 3000000 " +                  // 3 s I/O timeout
                    "-i $sdpFilePath " +
                    "-acodec pcm_s16le " +
                    "-ar $SAMPLE_RATE " +
                    "-ac $CHANNELS " +
                    "-f s16le " +
                    "-flush_packets 1 " +                     // write each packet out immediately
                    "${outputPcmFile.absolutePath}"

            LogUtils.i(TAG, "低延迟FFmpeg解码命令: $command")

            synchronized(ffmpegLock) {
                if (!isPulling.get() || session != pullSession) {
                    // Stopped during the delay: never start an FFmpeg nobody will cancel.
                    LogUtils.i(TAG, "拉流已停止,跳过FFmpeg启动")
                    return@launch
                }
                ffmpegExecutionId = FFmpeg.executeAsync(command) { id, returnCode ->
                    onFfmpegFinished(id, returnCode)
                }
            }
        }
    }

    /**
     * FFmpeg completion callback: clears the stored execution id if it still refers to this
     * run, then logs how the run ended.
     */
    private fun onFfmpegFinished(executionId: Long, returnCode: Int) {
        synchronized(ffmpegLock) {
            if (ffmpegExecutionId == executionId) {
                ffmpegExecutionId = 0
            }
        }

        LogUtils.i(TAG, "FFmpeg解码结束: executionId=$executionId, returnCode=$returnCode")

        when (returnCode) {
            Config.RETURN_CODE_SUCCESS -> {
                LogUtils.i(TAG, "✅ 解码正常结束")
            }
            Config.RETURN_CODE_CANCEL -> {
                LogUtils.i(TAG, "🛑 解码被用户取消")
            }
            else -> {
                LogUtils.w(TAG, "⚠️ 解码异常结束,返回码: $returnCode")
            }
        }
    }

    /**
     * Creates and starts a single [AudioTrack] with a buffer of twice the platform minimum,
     * then spawns the playback thread.
     *
     * @throws IllegalStateException if the track could not be initialized.
     */
    private fun startLowLatencyPlayback() {
        val minBufferSize = AudioTrack.getMinBufferSize(
            SAMPLE_RATE,
            AudioFormat.CHANNEL_OUT_MONO,
            AUDIO_FORMAT
        )

        // Slightly above the minimum: enough to avoid underruns, small enough for low latency.
        val bufferSize = minBufferSize * 2

        val track = createAudioTrack(bufferSize)
        // Published before the state check so stopPulling() releases it even if init failed.
        audioTrack = track
        check(track.state == AudioTrack.STATE_INITIALIZED) { "AudioTrack初始化失败" }

        track.play()
        LogUtils.i(TAG, "AudioTrack初始化完成,缓冲区大小: $bufferSize")

        playbackThread = Thread { playbackLoop(track) }.also { it.start() }
    }

    /**
     * Builds exactly one streaming [AudioTrack]. On API 26+ it uses [AudioTrack.Builder] with
     * the low-latency performance mode. If that fails, or on older APIs, it falls back to the
     * legacy constructor. Only one track is ever created, so nothing leaks.
     */
    @Suppress("DEPRECATION")
    private fun createAudioTrack(bufferSize: Int): AudioTrack {
        if (android.os.Build.VERSION.SDK_INT >= android.os.Build.VERSION_CODES.O) {
            try {
                val audioAttributes = android.media.AudioAttributes.Builder()
                    .setUsage(android.media.AudioAttributes.USAGE_MEDIA)
                    .setContentType(android.media.AudioAttributes.CONTENT_TYPE_MUSIC)
                    .build()

                return AudioTrack.Builder()
                    .setAudioAttributes(audioAttributes)
                    .setAudioFormat(
                        AudioFormat.Builder()
                            .setEncoding(AUDIO_FORMAT)
                            .setSampleRate(SAMPLE_RATE)
                            .setChannelMask(AudioFormat.CHANNEL_OUT_MONO)
                            .build()
                    )
                    .setBufferSizeInBytes(bufferSize)
                    .setTransferMode(AudioTrack.MODE_STREAM)
                    .setPerformanceMode(AudioTrack.PERFORMANCE_MODE_LOW_LATENCY)
                    .build()
            } catch (e: Exception) {
                LogUtils.w(TAG, "无法设置低延迟AudioTrack,使用默认配置", e)
            }
        }

        return AudioTrack(
            AudioManager.STREAM_MUSIC,
            SAMPLE_RATE,
            AudioFormat.CHANNEL_OUT_MONO,
            AUDIO_FORMAT,
            bufferSize,
            AudioTrack.MODE_STREAM
        )
    }

    /**
     * Playback-thread body: tails [outputPcmFile] and plays new whole frames on [track] until
     * pulling stops or the thread is interrupted. It sleeps [READ_INTERVAL_MS] only when no
     * complete frame is available; otherwise the blocking [AudioTrack.write] paces the loop.
     */
    private fun playbackLoop(track: AudioTrack) {
        val buffer = ByteArray(SMALL_BUFFER_SIZE)
        var totalPlayed = 0L
        var lastLogTime = System.currentTimeMillis()
        var reader: RandomAccessFile? = null

        LogUtils.i(TAG, "低延迟音频播放线程启动")

        try {
            while (isPulling.get() && !Thread.currentThread().isInterrupted) {
                try {
                    // FFmpeg creates the file only once decoding starts; open it lazily, once.
                    if (reader == null && outputPcmFile.exists()) {
                        reader = RandomAccessFile(outputPcmFile, "r")
                    }

                    val played = reader?.let { playNextChunk(it, track, buffer) } ?: 0
                    if (played > 0) {
                        totalPlayed += played

                        // Status line roughly every 2 seconds.
                        val now = System.currentTimeMillis()
                        if (now - lastLogTime > 2000) {
                            val backlogBytes = (reader?.length() ?: 0L) - fileReadPosition.get()
                            LogUtils.d(TAG, "🔊 低延迟播放: ${totalPlayed / 1024}KB, 延迟: ${backlogBytes / BYTES_PER_MS}ms")
                            lastLogTime = now
                        }
                    } else {
                        // Nothing complete to play yet: poll again shortly.
                        Thread.sleep(READ_INTERVAL_MS.toLong())
                    }
                } catch (e: InterruptedException) {
                    break
                } catch (e: Exception) {
                    LogUtils.e(TAG, "低延迟播放异常", e)
                    try {
                        Thread.sleep(100)
                    } catch (ie: InterruptedException) {
                        break
                    }
                }
            }
        } finally {
            runCatching { reader?.close() }
                .onFailure { LogUtils.w(TAG, "关闭PCM文件失败", it) }
        }

        LogUtils.i(TAG, "低延迟播放线程结束,总计播放: ${totalPlayed / 1024}KB")
    }

    /**
     * Reads at most one buffer of whole PCM frames past [fileReadPosition] and writes them to
     * [track]. The position advances only by what the track accepted, so a partially written
     * 16-bit sample is never skipped. If the track reports an error, the chunk is dropped so
     * the same data is not re-fed forever.
     *
     * @return number of bytes played; 0 when no complete frame is available or the write failed.
     */
    private fun playNextChunk(reader: RandomAccessFile, track: AudioTrack, buffer: ByteArray): Int {
        val readPos = fileReadPosition.get()
        val available = reader.length() - readPos
        val wanted = alignToFrame(minOf(available, buffer.size.toLong()).toInt())
        if (wanted <= 0) return 0

        reader.seek(readPos)
        val usable = alignToFrame(reader.read(buffer, 0, wanted))
        if (usable <= 0) return 0

        val written = track.write(buffer, 0, usable)
        if (written < 0) {
            LogUtils.w(TAG, "AudioTrack写入失败: $written")
            fileReadPosition.addAndGet(usable.toLong())
            return 0
        }

        fileReadPosition.addAndGet(written.toLong())
        return written
    }

    /** Rounds [bytes] down to a whole number of PCM frames (non-positive input stays non-positive). */
    private fun alignToFrame(bytes: Int): Int = bytes - bytes % FRAME_SIZE_BYTES
}

网站公告

今日签到

点亮在社区的每一天
去签到