当前位置: 首页 > news >正文

局域网TCP通过组播放地址rtp推流和拉流实现实时喊话

应用场景,安卓端局域网不用ip通过组播放地址实现实时对讲功能

发送端: ffmpeg -f alsa -i hw:1 -acodec aac -ab 64k -ac 2 -ar 16000 -frtp -sdp file stream.sdp rtp://224.0.0.1:14556

接收端: ffmpeg -protocol whitelist file,udp,rtp -i stream.sdp -acodec pcm s16le -ar 16000 -ac 2 -f alsa default

在windows上测试通过后然后在安卓中实现

# 查询本地可用麦克风设备
ffmpeg -list_devices true -f dshow -i dummy

麦克风 (Realtek(R) Audio)这是我电脑的
# windows  执行RTM推音频流
ffmpeg -f dshow -i audio="麦克风 (Realtek(R) Audio)" -acodec aac -ab 64k -ac 2 -ar 16000 -f rtp -sdp_file stream.sdp rtp://239.0.0.1:15556

上面windows上调通后接下来在安卓上实现

implementation("com.arthenica:mobile-ffmpeg-full:4.4.LTS")主要用到这个库
package com.xhx.megaphone.tcpimport android.media.AudioFormat
import android.media.AudioRecord
import android.media.MediaRecorder
import com.arthenica.mobileffmpeg.Config
import com.arthenica.mobileffmpeg.FFmpeg
import com.blankj.utilcode.util.LogUtils
import com.xhx.megaphone.App
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import java.io.File
import java.io.FileOutputStream
import java.io.IOException
import java.util.concurrent.atomic.AtomicBoolean/*** 实时推流助手 - 最优化版本* * 功能:* 1. 录音buffer直接实时写入临时文件* 2. FFmpeg同时读取文件进行推流* 3. 最小化延迟的实时推流*/
object LiveStreamingHelper {private const val TAG = "LiveStreamingHelper"// 组播配置private const val MULTICAST_ADDRESS = "239.0.0.1"private const val MULTICAST_PORT = 15556// 音频参数private const val SAMPLE_RATE = 16000private const val CHANNELS = 1private const val BIT_RATE = 64000private const val AUDIO_FORMAT = AudioFormat.ENCODING_PCM_16BIT// 缓冲区大小 - 使用较小的缓冲区减少延迟private val BUFFER_SIZE = AudioRecord.getMinBufferSize(SAMPLE_RATE,AudioFormat.CHANNEL_IN_MONO,AUDIO_FORMAT).let { minSize ->// 使用最小缓冲区的2倍,减少延迟minSize * 2}// 推流状态private val isStreaming = AtomicBoolean(false)private var audioRecord: AudioRecord? = nullprivate var recordingThread: Thread? = nullprivate var ffmpegExecutionId: Long = 0// 文件路径private val cacheDir = File(App.ctx.cacheDir, "live_streaming")private val sdpFile = File(cacheDir, "stream.sdp")private val liveAudioFile = File(cacheDir, "live_audio.pcm")/*** 开始实时录音推流*/fun startStreaming(): Boolean {if (isStreaming.get()) {LogUtils.w(TAG, "推流已在进行中")return false}return try {initializeFiles()createSdpFile()startAudioRecording()startLiveStreaming()isStreaming.set(true)LogUtils.i(TAG, "✅ 实时推流启动成功")true} catch (e: Exception) {LogUtils.e(TAG, "❌ 实时推流启动失败", e)stopStreaming()false}}/*** 停止推流*/fun stopStreaming() {if (!isStreaming.get()) {return}isStreaming.set(false)// 停止录音audioRecord?.stop()audioRecord?.release()audioRecord = null// 停止录音线程recordingThread?.interrupt()recordingThread = null// 停止FFmpegif (ffmpegExecutionId != 0L) {FFmpeg.cancel(ffmpegExecutionId)ffmpegExecutionId = 0}LogUtils.i(TAG, "🛑 实时推流已停止")}/*** 获取推流状态*/fun isStreaming(): Boolean = isStreaming.get()/*** 获取SDP文件路径*/fun getSdpFilePath(): String = sdpFile.absolutePath/*** 获取组播地址信息*/fun getMulticastInfo(): String {val fileSize = if (liveAudioFile.exists()) {"${liveAudioFile.length() / 1024}KB"} else {"0KB"}return "组播地址: $MULTICAST_ADDRESS:$MULTICAST_PORT\n" +"SDP文件: ${sdpFile.absolutePath}\n" +"传输方式: 实时文件流\n" +"缓冲区大小: ${BUFFER_SIZE}字节\n" +"当前文件大小: $fileSize\n" +"推流状态: ${if (isStreaming.get()) "进行中" else "已停止"}"}/*** 初始化文件和目录*/private fun initializeFiles() {if (!cacheDir.exists()) {cacheDir.mkdirs()}// 清理旧文件if (liveAudioFile.exists()) {liveAudioFile.delete()}// 创建新的音频文件liveAudioFile.createNewFile()}/*** 创建SDP文件*/private fun createSdpFile() {val sdpContent = """v=0o=- 0 0 IN IP4 127.0.0.1s=No Namec=IN IP4 $MULTICAST_ADDRESSt=0 0a=tool:libavformat 58.45.100m=audio $MULTICAST_PORT RTP/AVP 97b=AS:64a=rtpmap:97 MPEG4-GENERIC/$SAMPLE_RATE/$CHANNELSa=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=141056E500""".trimIndent()sdpFile.writeText(sdpContent)LogUtils.i(TAG, "SDP文件创建成功: ${sdpFile.absolutePath}")}/*** 开始音频录音 - 直接实时写入文件*/private fun startAudioRecording() {audioRecord = AudioRecord(MediaRecorder.AudioSource.MIC,SAMPLE_RATE,AudioFormat.CHANNEL_IN_MONO,AUDIO_FORMAT,BUFFER_SIZE)if (audioRecord?.state != AudioRecord.STATE_INITIALIZED) {throw IOException("AudioRecord初始化失败")}audioRecord?.startRecording()// 启动录音线程,实时写入文件recordingThread = Thread {val buffer = ByteArray(BUFFER_SIZE)var fileOutputStream: FileOutputStream? = nullvar totalBytes = 0var lastLogTime = System.currentTimeMillis()try {fileOutputStream = FileOutputStream(liveAudioFile, false) // 不追加,覆盖写入LogUtils.i(TAG, "录音线程启动,实时写入: ${liveAudioFile.absolutePath}")LogUtils.i(TAG, "缓冲区大小: $BUFFER_SIZE 字节")while (isStreaming.get() && !Thread.currentThread().isInterrupted) {val bytesRead = audioRecord?.read(buffer, 0, buffer.size) ?: 0if (bytesRead > 0) {// 立即写入文件并刷新fileOutputStream.write(buffer, 0, bytesRead)fileOutputStream.flush()totalBytes += bytesRead// 每3秒打印一次状态(更频繁的状态更新)val currentTime = System.currentTimeMillis()if (currentTime - lastLogTime > 3000) {LogUtils.d(TAG, "🎙️ 实时录音中: ${totalBytes / 1024}KB, 速率: ${(totalBytes / ((currentTime - (lastLogTime - 3000)) / 1000.0) / 1024).toInt()}KB/s")lastLogTime = currentTime}} else if (bytesRead == AudioRecord.ERROR_INVALID_OPERATION) {LogUtils.e(TAG, "AudioRecord读取错误: ERROR_INVALID_OPERATION")break} else if (bytesRead < 0) {LogUtils.w(TAG, "AudioRecord读取返回负值: $bytesRead")}}LogUtils.i(TAG, "录音线程结束,总计: ${totalBytes / 1024}KB")} catch (e: Exception) {LogUtils.e(TAG, "录音数据写入异常", e)} finally {fileOutputStream?.close()}}recordingThread?.start()LogUtils.i(TAG, "音频录音已启动")}/*** 启动实时推流*/private fun startLiveStreaming() {GlobalScope.launch(Dispatchers.IO) {// 等待一些音频数据写入Thread.sleep(300)// 构建FFmpeg命令 - 使用较小的缓冲区和实时参数val command = "-re -f s16le -ar $SAMPLE_RATE -ac $CHANNELS " +"-thread_queue_size 512 " +  // 增加线程队列大小"-i ${liveAudioFile.absolutePath} " +"-acodec aac -ab ${BIT_RATE/1000}k -ac $CHANNELS -ar $SAMPLE_RATE " +"-f rtp -sdp_file ${sdpFile.absolutePath} " +"rtp://$MULTICAST_ADDRESS:$MULTICAST_PORT"LogUtils.i(TAG, "FFmpeg实时推流命令: $command")ffmpegExecutionId = FFmpeg.executeAsync(command) { executionId, returnCode ->LogUtils.i(TAG, "FFmpeg推流结束: executionId=$executionId, returnCode=$returnCode")when (returnCode) {Config.RETURN_CODE_SUCCESS -> {LogUtils.i(TAG, "✅ 推流正常结束")}Config.RETURN_CODE_CANCEL -> {LogUtils.i(TAG, "🛑 推流被用户取消")}else -> {LogUtils.w(TAG, "⚠️ 推流异常结束,返回码: $returnCode")}}}LogUtils.i(TAG, "FFmpeg执行ID: $ffmpegExecutionId")}}
}

拉流

package com.xhx.megaphone.tcpimport android.media.AudioFormat
import android.media.AudioManager
import android.media.AudioTrack
import com.arthenica.mobileffmpeg.Config
import com.arthenica.mobileffmpeg.FFmpeg
import com.blankj.utilcode.util.LogUtils
import com.xhx.megaphone.App
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import java.io.File
import java.io.RandomAccessFile
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong/*** 低延迟拉流播放助手* * 优化策略:* 1. 最小化FFmpeg缓冲* 2. 减少AudioTrack缓冲区* 3. 更频繁的数据读取* 4. 优化文件IO*/
object LowLatencyPullHelper {private const val TAG = "LowLatencyPullHelper"// 音频参数private const val SAMPLE_RATE = 16000private const val CHANNELS = 1private const val AUDIO_FORMAT = AudioFormat.ENCODING_PCM_16BIT// 低延迟参数private const val SMALL_BUFFER_SIZE = 1024  // 使用更小的缓冲区private const val READ_INTERVAL_MS = 20     // 更频繁的读取间隔// 拉流状态private val isPulling = AtomicBoolean(false)private var ffmpegExecutionId: Long = 0private var audioTrack: AudioTrack? = nullprivate var playbackThread: Thread? = null// 文件读取位置private val fileReadPosition = AtomicLong(0)// 文件路径private val cacheDir = File(App.ctx.cacheDir, "low_latency_pull")private val outputPcmFile = File(cacheDir, "realtime_audio.pcm")/*** 开始低延迟拉流播放*/fun startPulling(sdpFilePath: String): Boolean {if (isPulling.get()) {LogUtils.w(TAG, "拉流已在进行中")return false}val sdpFile = File(sdpFilePath)if (!sdpFile.exists()) {LogUtils.e(TAG, "SDP文件不存在: $sdpFilePath")return false}return try {initializeFiles()startLowLatencyDecoding(sdpFilePath)startLowLatencyPlayback()isPulling.set(true)fileReadPosition.set(0)LogUtils.i(TAG, "✅ 低延迟拉流播放启动成功")true} catch (e: Exception) {LogUtils.e(TAG, "❌ 低延迟拉流播放启动失败", e)stopPulling()false}}/*** 停止拉流*/fun stopPulling() {if (!isPulling.get()) {return}isPulling.set(false)// 停止FFmpegif (ffmpegExecutionId != 0L) {FFmpeg.cancel(ffmpegExecutionId)ffmpegExecutionId = 0}// 停止音频播放audioTrack?.stop()audioTrack?.release()audioTrack = null// 停止播放线程playbackThread?.interrupt()playbackThread = nullLogUtils.i(TAG, "🛑 低延迟拉流已停止")}/*** 获取拉流状态*/fun isPulling(): Boolean = isPulling.get()/*** 获取拉流信息*/fun getPullInfo(): String {val fileSize = if (outputPcmFile.exists()) {"${outputPcmFile.length() / 1024}KB"} else {"0KB"}return "拉流状态: ${if (isPulling.get()) "进行中" else "已停止"}\n" +"解码文件: ${outputPcmFile.absolutePath}\n" +"文件大小: $fileSize\n" +"读取位置: ${fileReadPosition.get() / 1024}KB\n" +"优化模式: 低延迟"}/*** 初始化文件和目录*/private fun initializeFiles() {if (!cacheDir.exists()) {cacheDir.mkdirs()}// 清理旧文件if (outputPcmFile.exists()) {outputPcmFile.delete()}}/*** 启动低延迟FFmpeg解码*/private fun startLowLatencyDecoding(sdpFilePath: String) {GlobalScope.launch(Dispatchers.IO) {// 减少等待时间Thread.sleep(500)// 构建超低延迟FFmpeg解码命令val command = "-protocol_whitelist file,udp,rtp " +"-fflags +nobuffer+flush_packets " +      // 禁用缓冲并立即刷新"-flags low_delay " +                     // 低延迟模式"-probesize 32 " +                        // 最小探测大小"-analyzeduration 0 " +                   // 不分析流"-max_delay 0 " +                         // 最大延迟为0"-reorder_queue_size 0 " +                // 禁用重排序队列"-rw_timeout 3000000 " +                  // 3秒超时"-i $sdpFilePath " +"-acodec pcm_s16le " +"-ar $SAMPLE_RATE " +"-ac $CHANNELS " +"-f s16le " +"-flush_packets 1 " +                     // 立即刷新数据包"${outputPcmFile.absolutePath}"LogUtils.i(TAG, "低延迟FFmpeg解码命令: $command")ffmpegExecutionId = FFmpeg.executeAsync(command) { executionId, returnCode ->LogUtils.i(TAG, "FFmpeg解码结束: executionId=$executionId, returnCode=$returnCode")when (returnCode) {Config.RETURN_CODE_SUCCESS -> {LogUtils.i(TAG, "✅ 解码正常结束")}Config.RETURN_CODE_CANCEL -> {LogUtils.i(TAG, "🛑 解码被用户取消")}else -> {LogUtils.w(TAG, "⚠️ 解码异常结束,返回码: $returnCode")}}}}}/*** 启动低延迟音频播放*/private fun startLowLatencyPlayback() {// 使用最小缓冲区val minBufferSize = AudioTrack.getMinBufferSize(SAMPLE_RATE,AudioFormat.CHANNEL_OUT_MONO,AUDIO_FORMAT)// 使用稍大于最小缓冲区的大小,但不要太大val bufferSize = minBufferSize * 2audioTrack = AudioTrack(AudioManager.STREAM_MUSIC,SAMPLE_RATE,AudioFormat.CHANNEL_OUT_MONO,AUDIO_FORMAT,bufferSize,AudioTrack.MODE_STREAM)// 设置低延迟模式(API 26+)try {if (android.os.Build.VERSION.SDK_INT >= android.os.Build.VERSION_CODES.O) {val audioAttributes = android.media.AudioAttributes.Builder().setUsage(android.media.AudioAttributes.USAGE_MEDIA).setContentType(android.media.AudioAttributes.CONTENT_TYPE_MUSIC).setFlags(android.media.AudioAttributes.FLAG_LOW_LATENCY).build()audioTrack = AudioTrack.Builder().setAudioAttributes(audioAttributes).setAudioFormat(AudioFormat.Builder().setEncoding(AUDIO_FORMAT).setSampleRate(SAMPLE_RATE).setChannelMask(AudioFormat.CHANNEL_OUT_MONO).build()).setBufferSizeInBytes(bufferSize).setTransferMode(AudioTrack.MODE_STREAM).build()}} catch (e: Exception) {LogUtils.w(TAG, "无法设置低延迟AudioTrack,使用默认配置", e)}audioTrack?.play()LogUtils.i(TAG, "AudioTrack初始化完成,缓冲区大小: $bufferSize")// 启动高频率播放线程playbackThread = Thread {val buffer = ByteArray(SMALL_BUFFER_SIZE)var totalPlayed = 0var lastLogTime = System.currentTimeMillis()LogUtils.i(TAG, "低延迟音频播放线程启动")while (isPulling.get() && !Thread.currentThread().isInterrupted) {try {if (outputPcmFile.exists()) {val currentFileSize = outputPcmFile.length()val currentReadPos = fileReadPosition.get()// 如果有新数据可读if (currentFileSize > currentReadPos) {RandomAccessFile(outputPcmFile, "r").use { randomAccessFile ->randomAccessFile.seek(currentReadPos)val bytesRead = randomAccessFile.read(buffer)if (bytesRead > 0) {audioTrack?.write(buffer, 0, bytesRead)totalPlayed += bytesReadfileReadPosition.addAndGet(bytesRead.toLong())// 每2秒打印一次状态val currentTime = System.currentTimeMillis()if (currentTime - lastLogTime > 2000) {LogUtils.d(TAG, "🔊 低延迟播放: ${totalPlayed / 1024}KB, 延迟: ${(currentFileSize - currentReadPos) / 32}ms")lastLogTime = currentTime}}}}}// 高频率检查,减少延迟Thread.sleep(READ_INTERVAL_MS.toLong())} catch (e: Exception) {LogUtils.e(TAG, "低延迟播放异常", e)Thread.sleep(100)}}LogUtils.i(TAG, "低延迟播放线程结束,总计播放: ${totalPlayed / 1024}KB")}playbackThread?.start()}
}
http://www.dtcms.com/a/296653.html

相关文章:

  • 猎板碳油 PCB和普通PCB的区别
  • 【OpenCV实现多图像拼接】
  • kafka消费者组消费进度(Lag)深入理解
  • Redis--哨兵机制详解
  • Linux C:预处理命令
  • 225. 用队列实现栈
  • markdown学习笔记(个人向) Part.2
  • Redis高可用架构演进面试笔记
  • C#解析JSON数据全攻略
  • SpringBoot框架,不同环境中实体类对应不同的表
  • MySQL workbench的使用
  • Django 科普介绍:从入门到了解其核心魅力
  • 【Python】Python多线程爬虫实战:从基础原理到分布式架构实现
  • RCLAMP0512TQTCT 升特半导体 TVS二极管 12通道全防护芯片 以太网/PLC控制/5G基站专用
  • UE5中如何解决角色网格体“掉下去”的问题
  • 高并发系统设计面试题
  • 高效互联,ModbusTCP转EtherCAT网关赋能新能源电缆智能制造
  • Apache 消息队列分布式架构与原理
  • 六种经典智能优化算法(PSO/GWO/WOA/HHO/DBO/SSA)无人机(UAV)三维路径规划,Matlab代码实现
  • 【三桥君】大语言模型计算成本高,MoE如何有效降低成本?
  • Java学习---Spring及其衍生(下)
  • Oracle 时间处理函数和操作符笔记
  • 数据库常用DDL语言
  • 洛谷 P1996 约瑟夫问题之题解
  • LLM针对隐藏层的特征增强的相关论文
  • Python生成折线图
  • 7.24 C/C++蓝桥杯 | 排序算法
  • 外企本土化布局对国内连接器企业影响几何?
  • 排序初识(上)-- 讲解超详细
  • 【接口自动化】-1- 初识接口