Real-time voice announcement over a LAN: RTP push and pull streaming via a multicast address
Use case: Android devices on the same LAN implement a real-time intercom feature through a multicast address, without needing to know each other's IP addresses.
Sender: ffmpeg -f alsa -i hw:1 -acodec aac -ab 64k -ac 2 -ar 16000 -f rtp -sdp_file stream.sdp rtp://224.0.0.1:14556
Receiver: ffmpeg -protocol_whitelist file,udp,rtp -i stream.sdp -acodec pcm_s16le -ar 16000 -ac 2 -f alsa default
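For reference, the stream.sdp that -sdp_file writes is what receivers read to join the multicast session. For the sender command above it looks roughly like this (the payload type and fmtp parameters are chosen by the encoder, so the exact values may differ):

v=0
o=- 0 0 IN IP4 127.0.0.1
s=No Name
c=IN IP4 224.0.0.1
t=0 0
a=tool:libavformat
m=audio 14556 RTP/AVP 97
b=AS:64
a=rtpmap:97 MPEG4-GENERIC/16000/2
a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3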
Once the flow has been verified on Windows, the same approach is implemented on Android.
# List the microphone devices available on this machine
ffmpeg -list_devices true -f dshow -i dummy
"麦克风 (Realtek(R) Audio)" is the device on my machine.
# Windows: push the audio stream over RTP
ffmpeg -f dshow -i audio="麦克风 (Realtek(R) Audio)" -acodec aac -ab 64k -ac 2 -ar 16000 -f rtp -sdp_file stream.sdp rtp://239.0.0.1:15556
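Before moving on, the stream can be sanity-checked from another machine on the same LAN. ffplay accepts the same SDP-based input as the receive command shown earlier (a quick check, assuming ffplay ships with your ffmpeg build):

ffplay -protocol_whitelist file,udp,rtp -fflags nobuffer -i stream.sdp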
With the Windows side working, the next step is the Android implementation.
The Android side mainly relies on this library: implementation("com.arthenica:mobile-ffmpeg-full:4.4.LTS")
package com.xhx.megaphone.tcp

import android.media.AudioFormat
import android.media.AudioRecord
import android.media.MediaRecorder
import com.arthenica.mobileffmpeg.Config
import com.arthenica.mobileffmpeg.FFmpeg
import com.blankj.utilcode.util.LogUtils
import com.xhx.megaphone.App
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import java.io.File
import java.io.FileOutputStream
import java.io.IOException
import java.util.concurrent.atomic.AtomicBoolean

/**
 * Real-time push-streaming helper.
 *
 * How it works:
 * 1. Recorded audio buffers are written to a temporary PCM file in real time.
 * 2. FFmpeg reads that file concurrently and pushes it as RTP to the multicast address.
 * 3. Buffers are kept small to minimize latency.
 */
object LiveStreamingHelper {

    private const val TAG = "LiveStreamingHelper"

    // Multicast configuration
    private const val MULTICAST_ADDRESS = "239.0.0.1"
    private const val MULTICAST_PORT = 15556

    // Audio parameters
    private const val SAMPLE_RATE = 16000
    private const val CHANNELS = 1
    private const val BIT_RATE = 64000
    private const val AUDIO_FORMAT = AudioFormat.ENCODING_PCM_16BIT

    // Buffer size: keep it small to reduce latency
    private val BUFFER_SIZE = AudioRecord.getMinBufferSize(
        SAMPLE_RATE,
        AudioFormat.CHANNEL_IN_MONO,
        AUDIO_FORMAT
    ).let { minSize ->
        // Twice the minimum buffer size is enough while keeping latency low
        minSize * 2
    }

    // Streaming state
    private val isStreaming = AtomicBoolean(false)
    private var audioRecord: AudioRecord? = null
    private var recordingThread: Thread? = null
    private var ffmpegExecutionId: Long = 0

    // File paths
    private val cacheDir = File(App.ctx.cacheDir, "live_streaming")
    private val sdpFile = File(cacheDir, "stream.sdp")
    private val liveAudioFile = File(cacheDir, "live_audio.pcm")

    /**
     * Start recording and pushing the stream.
     */
    fun startStreaming(): Boolean {
        if (isStreaming.get()) {
            LogUtils.w(TAG, "Streaming is already in progress")
            return false
        }
        return try {
            initializeFiles()
            createSdpFile()
            // Set the flag before starting the worker threads; their loops check it
            // and would exit immediately if it were still false.
            isStreaming.set(true)
            startAudioRecording()
            startLiveStreaming()
            LogUtils.i(TAG, "✅ Real-time push streaming started")
            true
        } catch (e: Exception) {
            LogUtils.e(TAG, "❌ Failed to start real-time push streaming", e)
            stopStreaming()
            false
        }
    }

    /**
     * Stop streaming.
     */
    fun stopStreaming() {
        if (!isStreaming.get()) {
            return
        }
        isStreaming.set(false)

        // Stop recording
        audioRecord?.stop()
        audioRecord?.release()
        audioRecord = null

        // Stop the recording thread
        recordingThread?.interrupt()
        recordingThread = null

        // Stop FFmpeg
        if (ffmpegExecutionId != 0L) {
            FFmpeg.cancel(ffmpegExecutionId)
            ffmpegExecutionId = 0
        }
        LogUtils.i(TAG, "🛑 Real-time push streaming stopped")
    }

    /**
     * Current streaming state.
     */
    fun isStreaming(): Boolean = isStreaming.get()

    /**
     * Path of the generated SDP file.
     */
    fun getSdpFilePath(): String = sdpFile.absolutePath

    /**
     * Human-readable multicast/streaming info.
     */
    fun getMulticastInfo(): String {
        val fileSize = if (liveAudioFile.exists()) {
            "${liveAudioFile.length() / 1024}KB"
        } else {
            "0KB"
        }
        return "Multicast address: $MULTICAST_ADDRESS:$MULTICAST_PORT\n" +
                "SDP file: ${sdpFile.absolutePath}\n" +
                "Transport: real-time file stream\n" +
                "Buffer size: $BUFFER_SIZE bytes\n" +
                "Current file size: $fileSize\n" +
                "Streaming: ${if (isStreaming.get()) "running" else "stopped"}"
    }

    /**
     * Initialize files and directories.
     */
    private fun initializeFiles() {
        if (!cacheDir.exists()) {
            cacheDir.mkdirs()
        }
        // Remove any stale file
        if (liveAudioFile.exists()) {
            liveAudioFile.delete()
        }
        // Create a fresh audio file
        liveAudioFile.createNewFile()
    }

    /**
     * Create the SDP file describing the multicast session.
     */
    private fun createSdpFile() {
        val sdpContent = """
            v=0
            o=- 0 0 IN IP4 127.0.0.1
            s=No Name
            c=IN IP4 $MULTICAST_ADDRESS
            t=0 0
            a=tool:libavformat 58.45.100
            m=audio $MULTICAST_PORT RTP/AVP 97
            b=AS:64
            a=rtpmap:97 MPEG4-GENERIC/$SAMPLE_RATE/$CHANNELS
            a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=141056E500
        """.trimIndent()
        sdpFile.writeText(sdpContent)
        LogUtils.i(TAG, "SDP file created: ${sdpFile.absolutePath}")
    }

    /**
     * Start audio recording and write the PCM data to the file in real time.
     */
    private fun startAudioRecording() {
        audioRecord = AudioRecord(
            MediaRecorder.AudioSource.MIC,
            SAMPLE_RATE,
            AudioFormat.CHANNEL_IN_MONO,
            AUDIO_FORMAT,
            BUFFER_SIZE
        )
        if (audioRecord?.state != AudioRecord.STATE_INITIALIZED) {
            throw IOException("AudioRecord initialization failed")
        }
        audioRecord?.startRecording()

        // Recording thread: write every buffer to the file immediately
        recordingThread = Thread {
            val buffer = ByteArray(BUFFER_SIZE)
            var fileOutputStream: FileOutputStream? = null
            var totalBytes = 0
            var lastLogTime = System.currentTimeMillis()
            try {
                fileOutputStream = FileOutputStream(liveAudioFile, false) // overwrite, do not append
                LogUtils.i(TAG, "Recording thread started, writing to: ${liveAudioFile.absolutePath}")
                LogUtils.i(TAG, "Buffer size: $BUFFER_SIZE bytes")
                while (isStreaming.get() && !Thread.currentThread().isInterrupted) {
                    val bytesRead = audioRecord?.read(buffer, 0, buffer.size) ?: 0
                    if (bytesRead > 0) {
                        // Write and flush immediately so FFmpeg can pick the data up
                        fileOutputStream.write(buffer, 0, bytesRead)
                        fileOutputStream.flush()
                        totalBytes += bytesRead
                        // Log the status every 3 seconds
                        val currentTime = System.currentTimeMillis()
                        if (currentTime - lastLogTime > 3000) {
                            LogUtils.d(TAG, "🎙️ Recording: ${totalBytes / 1024}KB, rate: ${(totalBytes / ((currentTime - (lastLogTime - 3000)) / 1000.0) / 1024).toInt()}KB/s")
                            lastLogTime = currentTime
                        }
                    } else if (bytesRead == AudioRecord.ERROR_INVALID_OPERATION) {
                        LogUtils.e(TAG, "AudioRecord read error: ERROR_INVALID_OPERATION")
                        break
                    } else if (bytesRead < 0) {
                        LogUtils.w(TAG, "AudioRecord read returned a negative value: $bytesRead")
                    }
                }
                LogUtils.i(TAG, "Recording thread finished, total: ${totalBytes / 1024}KB")
            } catch (e: Exception) {
                LogUtils.e(TAG, "Failed to write recorded data", e)
            } finally {
                fileOutputStream?.close()
            }
        }
        recordingThread?.start()
        LogUtils.i(TAG, "Audio recording started")
    }

    /**
     * Start the FFmpeg process that pushes the PCM file as RTP.
     */
    private fun startLiveStreaming() {
        GlobalScope.launch(Dispatchers.IO) {
            // Give the recorder a moment to write some audio data first
            Thread.sleep(300)
            // Build the FFmpeg command with small buffers and real-time input pacing
            val command = "-re -f s16le -ar $SAMPLE_RATE -ac $CHANNELS " +
                    "-thread_queue_size 512 " + // larger input thread queue
                    "-i ${liveAudioFile.absolutePath} " +
                    "-acodec aac -ab ${BIT_RATE / 1000}k -ac $CHANNELS -ar $SAMPLE_RATE " +
                    "-f rtp -sdp_file ${sdpFile.absolutePath} " +
                    "rtp://$MULTICAST_ADDRESS:$MULTICAST_PORT"
            LogUtils.i(TAG, "FFmpeg push command: $command")
            ffmpegExecutionId = FFmpeg.executeAsync(command) { executionId, returnCode ->
                LogUtils.i(TAG, "FFmpeg push finished: executionId=$executionId, returnCode=$returnCode")
                when (returnCode) {
                    Config.RETURN_CODE_SUCCESS -> LogUtils.i(TAG, "✅ Push stream finished normally")
                    Config.RETURN_CODE_CANCEL -> LogUtils.i(TAG, "🛑 Push stream cancelled by the user")
                    else -> LogUtils.w(TAG, "⚠️ Push stream ended abnormally, return code: $returnCode")
                }
            }
            LogUtils.i(TAG, "FFmpeg execution id: $ffmpegExecutionId")
        }
    }
}
Pull (playback) side
package com.xhx.megaphone.tcp

import android.media.AudioFormat
import android.media.AudioManager
import android.media.AudioTrack
import com.arthenica.mobileffmpeg.Config
import com.arthenica.mobileffmpeg.FFmpeg
import com.blankj.utilcode.util.LogUtils
import com.xhx.megaphone.App
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import java.io.File
import java.io.RandomAccessFile
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong

/**
 * Low-latency pull/playback helper.
 *
 * Optimization strategy:
 * 1. Minimize FFmpeg buffering.
 * 2. Keep the AudioTrack buffer small.
 * 3. Read the decoded data frequently.
 * 4. Keep file IO light.
 */
object LowLatencyPullHelper {

    private const val TAG = "LowLatencyPullHelper"

    // Audio parameters
    private const val SAMPLE_RATE = 16000
    private const val CHANNELS = 1
    private const val AUDIO_FORMAT = AudioFormat.ENCODING_PCM_16BIT

    // Low-latency parameters
    private const val SMALL_BUFFER_SIZE = 1024 // small read buffer
    private const val READ_INTERVAL_MS = 20    // frequent read interval

    // Pull state
    private val isPulling = AtomicBoolean(false)
    private var ffmpegExecutionId: Long = 0
    private var audioTrack: AudioTrack? = null
    private var playbackThread: Thread? = null

    // Current read position in the decoded file
    private val fileReadPosition = AtomicLong(0)

    // File paths
    private val cacheDir = File(App.ctx.cacheDir, "low_latency_pull")
    private val outputPcmFile = File(cacheDir, "realtime_audio.pcm")

    /**
     * Start low-latency pull playback.
     */
    fun startPulling(sdpFilePath: String): Boolean {
        if (isPulling.get()) {
            LogUtils.w(TAG, "Pulling is already in progress")
            return false
        }
        val sdpFile = File(sdpFilePath)
        if (!sdpFile.exists()) {
            LogUtils.e(TAG, "SDP file does not exist: $sdpFilePath")
            return false
        }
        return try {
            initializeFiles()
            fileReadPosition.set(0)
            // Set the flag before starting the decoder and playback thread; the playback
            // loop checks it and would exit immediately if it were still false.
            isPulling.set(true)
            startLowLatencyDecoding(sdpFilePath)
            startLowLatencyPlayback()
            LogUtils.i(TAG, "✅ Low-latency pull playback started")
            true
        } catch (e: Exception) {
            LogUtils.e(TAG, "❌ Failed to start low-latency pull playback", e)
            stopPulling()
            false
        }
    }

    /**
     * Stop pulling.
     */
    fun stopPulling() {
        if (!isPulling.get()) {
            return
        }
        isPulling.set(false)

        // Stop FFmpeg
        if (ffmpegExecutionId != 0L) {
            FFmpeg.cancel(ffmpegExecutionId)
            ffmpegExecutionId = 0
        }

        // Stop audio playback
        audioTrack?.stop()
        audioTrack?.release()
        audioTrack = null

        // Stop the playback thread
        playbackThread?.interrupt()
        playbackThread = null

        LogUtils.i(TAG, "🛑 Low-latency pulling stopped")
    }

    /**
     * Current pull state.
     */
    fun isPulling(): Boolean = isPulling.get()

    /**
     * Human-readable pull info.
     */
    fun getPullInfo(): String {
        val fileSize = if (outputPcmFile.exists()) {
            "${outputPcmFile.length() / 1024}KB"
        } else {
            "0KB"
        }
        return "Pulling: ${if (isPulling.get()) "running" else "stopped"}\n" +
                "Decoded file: ${outputPcmFile.absolutePath}\n" +
                "File size: $fileSize\n" +
                "Read position: ${fileReadPosition.get() / 1024}KB\n" +
                "Mode: low latency"
    }

    /**
     * Initialize files and directories.
     */
    private fun initializeFiles() {
        if (!cacheDir.exists()) {
            cacheDir.mkdirs()
        }
        // Remove any stale file
        if (outputPcmFile.exists()) {
            outputPcmFile.delete()
        }
    }

    /**
     * Start the FFmpeg process that decodes the RTP stream to a PCM file.
     */
    private fun startLowLatencyDecoding(sdpFilePath: String) {
        GlobalScope.launch(Dispatchers.IO) {
            // Short startup delay
            Thread.sleep(500)
            // Build an FFmpeg decode command tuned for minimal latency
            val command = "-protocol_whitelist file,udp,rtp " +
                    "-fflags +nobuffer+flush_packets " + // disable buffering, flush immediately
                    "-flags low_delay " +                // low-delay mode
                    "-probesize 32 " +                   // minimal probing
                    "-analyzeduration 0 " +              // skip stream analysis
                    "-max_delay 0 " +                    // no added delay
                    "-reorder_queue_size 0 " +           // disable the RTP reorder queue
                    "-rw_timeout 3000000 " +             // 3-second IO timeout
                    "-i $sdpFilePath " +
                    "-acodec pcm_s16le " +
                    "-ar $SAMPLE_RATE " +
                    "-ac $CHANNELS " +
                    "-f s16le " +
                    "-flush_packets 1 " +                // flush output packets immediately
                    outputPcmFile.absolutePath
            LogUtils.i(TAG, "FFmpeg low-latency decode command: $command")
            ffmpegExecutionId = FFmpeg.executeAsync(command) { executionId, returnCode ->
                LogUtils.i(TAG, "FFmpeg decode finished: executionId=$executionId, returnCode=$returnCode")
                when (returnCode) {
                    Config.RETURN_CODE_SUCCESS -> LogUtils.i(TAG, "✅ Decoding finished normally")
                    Config.RETURN_CODE_CANCEL -> LogUtils.i(TAG, "🛑 Decoding cancelled by the user")
                    else -> LogUtils.w(TAG, "⚠️ Decoding ended abnormally, return code: $returnCode")
                }
            }
        }
    }

    /**
     * Start low-latency audio playback from the decoded PCM file.
     */
    private fun startLowLatencyPlayback() {
        // Minimum buffer size for the output configuration
        val minBufferSize = AudioTrack.getMinBufferSize(
            SAMPLE_RATE,
            AudioFormat.CHANNEL_OUT_MONO,
            AUDIO_FORMAT
        )
        // Slightly larger than the minimum, but not too large
        val bufferSize = minBufferSize * 2

        audioTrack = AudioTrack(
            AudioManager.STREAM_MUSIC,
            SAMPLE_RATE,
            AudioFormat.CHANNEL_OUT_MONO,
            AUDIO_FORMAT,
            bufferSize,
            AudioTrack.MODE_STREAM
        )

        // Prefer a low-latency AudioTrack on API 26+
        try {
            if (android.os.Build.VERSION.SDK_INT >= android.os.Build.VERSION_CODES.O) {
                val audioAttributes = android.media.AudioAttributes.Builder()
                    .setUsage(android.media.AudioAttributes.USAGE_MEDIA)
                    .setContentType(android.media.AudioAttributes.CONTENT_TYPE_MUSIC)
                    .setFlags(android.media.AudioAttributes.FLAG_LOW_LATENCY)
                    .build()
                audioTrack = AudioTrack.Builder()
                    .setAudioAttributes(audioAttributes)
                    .setAudioFormat(
                        AudioFormat.Builder()
                            .setEncoding(AUDIO_FORMAT)
                            .setSampleRate(SAMPLE_RATE)
                            .setChannelMask(AudioFormat.CHANNEL_OUT_MONO)
                            .build()
                    )
                    .setBufferSizeInBytes(bufferSize)
                    .setTransferMode(AudioTrack.MODE_STREAM)
                    .build()
            }
        } catch (e: Exception) {
            LogUtils.w(TAG, "Could not create a low-latency AudioTrack, falling back to the default", e)
        }

        audioTrack?.play()
        LogUtils.i(TAG, "AudioTrack initialized, buffer size: $bufferSize")

        // High-frequency playback thread
        playbackThread = Thread {
            val buffer = ByteArray(SMALL_BUFFER_SIZE)
            var totalPlayed = 0
            var lastLogTime = System.currentTimeMillis()
            LogUtils.i(TAG, "Low-latency playback thread started")
            while (isPulling.get() && !Thread.currentThread().isInterrupted) {
                try {
                    if (outputPcmFile.exists()) {
                        val currentFileSize = outputPcmFile.length()
                        val currentReadPos = fileReadPosition.get()
                        // Only read when new data is available
                        if (currentFileSize > currentReadPos) {
                            RandomAccessFile(outputPcmFile, "r").use { randomAccessFile ->
                                randomAccessFile.seek(currentReadPos)
                                val bytesRead = randomAccessFile.read(buffer)
                                if (bytesRead > 0) {
                                    audioTrack?.write(buffer, 0, bytesRead)
                                    totalPlayed += bytesRead
                                    fileReadPosition.addAndGet(bytesRead.toLong())
                                    // Log the status every 2 seconds
                                    val currentTime = System.currentTimeMillis()
                                    if (currentTime - lastLogTime > 2000) {
                                        LogUtils.d(TAG, "🔊 Low-latency playback: ${totalPlayed / 1024}KB, latency: ${(currentFileSize - currentReadPos) / 32}ms")
                                        lastLogTime = currentTime
                                    }
                                }
                            }
                        }
                    }
                    // Poll frequently to keep latency low
                    Thread.sleep(READ_INTERVAL_MS.toLong())
                } catch (e: Exception) {
                    LogUtils.e(TAG, "Low-latency playback error", e)
                    Thread.sleep(100)
                }
            }
            LogUtils.i(TAG, "Low-latency playback thread finished, total played: ${totalPlayed / 1024}KB")
        }
        playbackThread?.start()
    }
}
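On the receiving device the pull side is started the same way. A minimal sketch, assuming the SDP content passed in matches what LiveStreamingHelper writes on the sender, and using the hypothetical MulticastLockHelper shown earlier (function and file names are illustrative):

import android.content.Context
import com.blankj.utilcode.util.LogUtils
import java.io.File

// Minimal usage sketch: hold the multicast lock, write the known SDP locally, start pulling.
fun startReceiving(context: Context, sdpContent: String) {
    MulticastLockHelper.acquire(context) // hypothetical helper shown earlier
    val sdpFile = File(context.cacheDir, "pull_stream.sdp")
    sdpFile.writeText(sdpContent)
    if (!LowLatencyPullHelper.startPulling(sdpFile.absolutePath)) {
        LogUtils.w("Main", "Failed to start pull playback")
        MulticastLockHelper.release()
    }
}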