当前位置: 首页 > news >正文

文本转语音-音画适时推送rtsp并播放

文本语音 rtsp适时播放叫号系统的底层逻辑

这两天在弄这个,前2篇是通过虚拟声卡,达到了最简单的一个逻辑,播放文本就从声卡发声,不播无所谓,自动忙音。 那个工作在windows平台,
而今天的这个相似功能的代码是mac os,理论支持windows,和linux,依赖ffmpeg和xiu(一个rust流服务器)的rtsp服务。
今天的难点有点多

  1. asyncio的任务 async def _tts_worker(self, text: str) 运行中有各种错误, engine runAndWait是不行的。 内部有它的event loop。所以init和endLoop,是暂时找到的解决办法。同时经历了,这个,和调用 ffmpeg 外部指令,并直接获取- 代表的stdout。 会遇到各种问题。做了捕获和处理。但是查找的时候,不是太容易。
  2. self._start_ffmpeg() 他需要, create socket 或pipe完成以后,才能运行。 调试我都手工在外部启动。 作用就是,输出到rtsp服务器,以备播放。
  3. input handle,等都是ai生成的,因为有好多种循环,这是比较省心在。
  4. 最紧急隐蔽在是, async def _heartbeat(self) 他需要计算播放静音的时间,长了不行,短了不行。 这个最初在测试代码,就几个函数。然后AI,生成了三个theading的版本,两个Queue。 然后转到了异步版本,明显快了很多。
  5. 在windows上使用win32pipen可以达到unix socket的效果很相似, 记得还有FIFO是linux专用的,当然还有stdin,和stdout。对于ffmpeg,这是一些程序内部的传送机制
  6. rtsp是需要一个后台的服务的,xiu是开源的rust项目,可以使。另外window推荐metamtx,双击运行,什么也不管。
    在这里插入图片描述

音画同步应该是另个问题了,几天前,鼓捣了一下图片。让编辑后的,马上 在视频中显示。 这个另外一个话题了。做的这些就为了,让报号和点单,有个界面。

ffmpeg -re -framerate 30 -f image2 -loop 1 -i "image1.jpg" -c:v libx264 -preset ultrafast -tune zerolatency -pix_fmt rgba  -f rtsp -rtsp_transport tcp rtsp://localhost:8554/live

合并的代码,就当成剩下的作业,有空再来做。

对于刚接触的,最好是慢慢和AI调试着来,一些功能就做出来。

启动
接收text
FIFOunix..
同步语音传输
语音 推送
文本转语音
rtsp流服务器

语音推送使用ffmpeg独立进程,实现了前后中断后自动重启。

程序主体

可独立运行,也可以结合ffmg管理推送进程
main.py

import asyncio
import struct
import pyttsx3
import tempfile
import os
import socket
from aioconsole import ainput
from contextlib import suppress
from typing import Optional

class AsyncTTSController:
    def __init__(self):
        # 使用Unix域套接字
        self.socket_path = "/tmp/tts_audio.sock"
        self.server_socket: Optional[socket.socket] = None
        self.client_socket: Optional[socket.socket] = None
        
        # 进程控制
        self.ffmpeg_process: Optional[asyncio.subprocess.Process] = None
        self.running = False
        
        # TTS引擎
        self.engine = pyttsx3.init()
        self.engine.setProperty('rate', 180)
        self.engine.setProperty('volume', 1.0)
        
        # 音频参数
        self.sample_rate = 24000
        self.channels = 1
        self.bits_per_sample = 16
        self.silence = self._generate_silence(0.2)
        self.wav_header = self._generate_wav_header()
        
        # 状态管理
        self.connection_active = False
        self.last_heartbeat = 0.0
        self.heartbeat_interval = 2.0
        self.sending_audio = 0
    def _generate_wav_header(self) -> bytes:
        """生成WAV文件头"""
        byte_rate = self.sample_rate * self.channels * self.bits_per_sample // 8
        block_align = self.channels * self.bits_per_sample // 8
        return struct.pack(
            '<4sI4s4sIHHIIHH4sI',
            b'RIFF', 36, b'WAVE', b'fmt ', 16, 1, self.channels,
            self.sample_rate, byte_rate, block_align, self.bits_per_sample,
            b'data', 0
        )

    def _generate_silence(self, duration: float) -> bytes:
        """生成静音数据"""
        samples = int(self.sample_rate * duration)
        return bytes(samples * self.channels * (self.bits_per_sample // 8))

    async def _async_create_socket(self) -> None:
        """创建Unix域套接字"""
        with suppress(Exception):
            if os.path.exists(self.socket_path):
                os.unlink(self.socket_path)
                
            self.server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            self.server_socket.setblocking(False)
            self.server_socket.bind(self.socket_path)
            self.server_socket.listen(1)
            
            loop = asyncio.get_running_loop()
            while self.running and not self.connection_active:
                try:
                    self.client_socket, _ = await loop.sock_accept(self.server_socket)
                    self.connection_active = True
                    print("客户端已连接")
                    await loop.sock_sendall(self.client_socket, self.wav_header)
                except (BlockingIOError, InterruptedError):
                    await asyncio.sleep(0.1)
                except Exception as e:
                    print(f"连接错误: {str(e)}")
                    self.connection_active = False
                    await asyncio.sleep(1)

    async def _start_ffmpeg(self) -> None:
        """启动FFmpeg进程"""
        with suppress(Exception):
            if self.ffmpeg_process:
                self.ffmpeg_process.terminate()
                await self.ffmpeg_process.wait()
            socketid='unix:'+self.socket_path

            self.ffmpeg_process = await asyncio.create_subprocess_exec(
                'ffmpeg',
                '-f', 's16le',
                '-ar', str(self.sample_rate),
                '-ac', str(self.channels),
                '-i', socketid,  # 修改输入源为套接字路径
                '-c:a', 'aac',
                '-f', 'rtsp',
                '-rtsp_transport', 'tcp',
                'rtsp://localhost:8554/mystream',
                stdout=asyncio.subprocess.DEVNULL,
                stdin=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.PIPE
            )
            asyncio.create_task(self._monitor_ffmpeg_errors())

    async def _monitor_ffmpeg_errors(self) -> None:
        """监控FFmpeg错误输出"""
        while self.running and self.ffmpeg_process:
            line = await self.ffmpeg_process.stderr.readline()
            if not line:
                break
         #   print(f"[FFmpeg Error] {line.decode().strip()}")

    async def _async_write_socket(self, data: bytes) -> None:
        """安全写入套接字"""
        try:
            if self.client_socket and self.connection_active:
                loop = asyncio.get_running_loop()
                await loop.sock_sendall(self.client_socket, data)
        except (BrokenPipeError, ConnectionResetError):
            print("连接已断开,尝试重连...")
            await self._reconnect_pipeline()
        except Exception as e:
            print(f"写入错误: {str(e)}")
            self.connection_active = False

    async def _reconnect_pipeline(self) -> None:
        """完整重连流程"""
        print("启动重连流程...")
        self.connection_active = False
        if self.client_socket:
            self.client_socket.close()
        task1=asyncio.create_task(self._async_create_socket()),
        task2=asyncio.create_task( self._start_ffmpeg()),    
        await task2
        await task1
       # await asyncio.gather(task1, task2)
        #await self._async_create_socket()
        #await self._start_ffmpeg()

    # 剩余的heartbeat、tts_worker、input_handler等方法保持相同...

    async def stop(self) -> None:
        """安全关闭"""
        self.running = False
        with suppress(Exception):
            if self.ffmpeg_process:
                self.ffmpeg_process.terminate()
                await self.ffmpeg_process.wait()
            if self.client_socket:
                self.client_socket.close()
            if self.server_socket:
                self.server_socket.close()
            if os.path.exists(self.socket_path):
                os.unlink(self.socket_path)
            print("所有资源已释放")
    async def _heartbeat(self) -> None:
        """心跳维持机制"""
        while self.running:
            if self.connection_active :
                for i in range(10):
                    if   self.sending_audio<0:
                       await self._async_write_socket(self.silence)
                    else :
                        self.sending_audio-= 2
                    await asyncio.sleep(0.2)   
                    
           #     print(self.sending_audio,"slend")
              #  await asyncio.sleep(self.heartbeat_interval)
            else:
                await asyncio.sleep(0.5)
    def _sync_tts(self,text,tmp_filename):
        eng=pyttsx3.init()
      #  eng.say(text)
        eng.save_to_file(text, 'temp3.wav')
        eng.runAndWait()
        eng.endLoop()
            
    async def _tts_worker(self, text: str) -> None:
        """异步TTS处理核心"""
        tmp_filename = None
        #with open('audio1.raw','rb') as chunkf:
                # data=chunkf.read()
                # secdd=len(data)/48000
                # self.sending_audio=int(secdd*10) 
                # await self._async_write_socket(data)
               
              
                # #await asyncio.sleep(secdd)
                
 
                # print (secdd,len(data) )   
        
            # 创建临时文件
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
                tmp_filename = tmp.name

            # # 同步TTS操作转异步执行
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._sync_tts, *(text, 'temp3.wav',))
            # 转换音频格式
           # await asyncio.sleep(1.3)
           # self._sync_tts(text,tmp_filename)
        try: 
            proc = await asyncio.create_subprocess_exec(
    'ffmpeg',
    '-hide_banner',
    '-loglevel', 'error',
    '-y',
    '-i', 'temp3.wav',       # 输入文件路径
    '-f', 's16le',            # 强制输出格式为PCM s16le
    '-acodec', 'pcm_s16le',   # 明确指定音频编解码器 👈 关键修复
    '-ar', str(self.sample_rate),
    '-ac', str(self.channels),
    '-',                     # 输出到标准输出
    stdout=asyncio.subprocess.PIPE
)

           # 流式发送音频数据
            sum=0
            while chunk := await proc.stdout.read(4096):
              
               sum+=len(chunk)
               await self._async_write_socket(chunk)
            self.sending_audio=int(sum*10/48000) 
            print("write data x0.1s:",self.sending_audio)


        finally:
            if tmp_filename and os.path.exists(tmp_filename):
              1
              #  os.unlink(tmp_filename)

    async def _input_handler(self) -> None:
        """异步输入处理"""
        while self.running:
            try:
                text = await ainput("请输入文本(输入q退出): ")
                if text.lower() == 'q':
                    self.running = False
                    break
                if text.strip():
                  await self._tts_worker(text)
            except Exception as e:
                print(f"输入错误: {str(e)}")

    async def run(self) -> None:
        """主运行循环"""
        self.running = True
       # 
        #await  self._start_ffmpeg()

        tasks = [
            asyncio.create_task(self._async_create_socket()),
            asyncio.create_task( self._start_ffmpeg()),
            asyncio.create_task(self._input_handler()),
            asyncio.create_task(self._heartbeat()),


        ]
       
        await asyncio.gather(*tasks)
       
    

# 以下保持不变...
if __name__ == "__main__":
    controller = AsyncTTSController()
    try:
        asyncio.run(controller.run())
    except KeyboardInterrupt:
        asyncio.run(controller.stop())
"""
ffmpeg -y -i temp.wav -f s16le -acodec pcm_s16le  -ar 24000  -ac 1   audio.raw
ffmpeg  -ar 24000 -ac 1 -f s16le  -i unix:/tmp/tts_audio.sock -f rtsp  rtsp://localhost:8554/mystream
"""

ffmpeg启动和监控的独立代码

验证了一下rtsp断线重建连结,也验证了 上面 的main.py的socket server退出后,ffmpeg自动重启连接。 要使用这个稳健程序,需要注释main.py run中的asyncio.create_task( self._start_ffmpeg()),
在这里插入图片描述
ffmg,py

import asyncio
from contextlib import suppress
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class FFmpegManager:
    def __init__(self):
        self.ffmpeg_process = None
        self._retry_count = 0
        self._max_retries = 5
        self._retry_lock = asyncio.Lock()
        self._is_running = False
        self.sample_rate=24000
        self.channels=1
        self.socket_path = "/tmp/tts_audio.sock"

    async def _start_ffmpeg(self) -> None:
        """带自动重试的FFmpeg启动函数"""
        async with self._retry_lock:
            await self._safe_terminate()
            
            try:
                socketid = 'unix:' + self.socket_path
                self.ffmpeg_process =await asyncio.create_subprocess_exec(
                'ffmpeg',
                '-f', 's16le',
                '-ar', str(self.sample_rate),
                '-ac', str(self.channels),
                '-i', socketid,  # 修改输入源为套接字路径
                '-c:a', 'aac',
                '-f', 'rtsp',
                '-rtsp_transport', 'tcp',
                'rtsp://localhost:8554/mystream',
                stdout=asyncio.subprocess.DEVNULL,
                stdin=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.PIPE
            )
                self._retry_count = 0  # 重置重试计数器
                asyncio.create_task(self._monitor_ffmpeg_errors())
                self._is_running = True
            except Exception as e:
                logging.error(f"FFmpeg启动失败: {str(e)}")
                await self._handle_retry()

    async def _monitor_ffmpeg_errors(self):
        """增强型进程监控"""
        while self._is_running:
            logging.info("loop  error cathch")
            stderr = await self.ffmpeg_process.stderr.readline()
            if stderr:
                logging.error(f"FFmpeg错误输出: {stderr.decode().strip()}")
            
            # 检测进程状态
            return_code = self.ffmpeg_process.returncode
            if return_code is not None:
                logging.warning(f"FFmpeg异常退出,返回码: {return_code}")
                self._is_running = False
                await self._handle_retry()
                break

    async def _handle_retry(self):
        """智能重试策略"""
        if self._retry_count >= self._max_retries:
            logging.critical("达到最大重试次数,放弃重启")
            return

        # 指数退避算法
        delay = min(2 ** self._retry_count, 30)  # 最大间隔30秒
        self._retry_count += 1
        logging.info(f"将在 {delay} 秒后尝试第 {self._retry_count} 次重启")

        await asyncio.sleep(delay)
        await self._start_ffmpeg()

    async def _safe_terminate(self):
        """安全终止现有进程"""
        if self.ffmpeg_process:
            with suppress(Exception):
                self.ffmpeg_process.terminate()
                await self.ffmpeg_process.wait()
                self.ffmpeg_process = None
# 以下保持不变...
async def main():
    controller=FFmpegManager()
    try:
        await controller._start_ffmpeg()
        logging.info('rung')
        await asyncio.sleep(1160)
    except KeyboardInterrupt:
        logging.info(3)
        asyncio.run(controller._safe_terminate())
if __name__ == "__main__":
     
     asyncio.run(main())

相关文章:

  • 静态路由实验
  • Spring Boot/Spring Cloud 整合 ELK(Elasticsearch、Logstash、Kibana)详细避坑指南
  • 【CSS3】元婴篇
  • [数据结构]并查集
  • 【 <一> 炼丹初探:JavaWeb 的起源与基础】之 JavaWeb 项目的部署:从开发环境到生产环境
  • 智能焊机监测系统:打造工业安全的数字化盾牌
  • Git合并工具在开发中的使用指南
  • 常用中文开源embedding模型应用
  • printk相关说明
  • 谷歌AI最新发布的可微分逻辑元胞自动机(DiffLogic CA)
  • ubuntu-学习笔记-nginx+php
  • MATLAB表格Table与时间序列Timetable的高效操作方法
  • MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)
  • L1-088 静静的推荐
  • QT中委托QStyledItemDelegate的使用
  • 6-langchang多模态输入和自定义输出
  • Apache POI详解
  • 保安员考试:巧用记忆术,攻克理论知识堡垒
  • 目标检测YOLO实战应用案例100讲-基于毫米波雷达的多目标检测 (续)
  • protobuf, rpc, 火焰图
  • 李公明 | 一周画记:生活就是抵抗
  • 申活观察|演出场次破纪录、入境游导游档期忙,上海文旅商“热力”拉满
  • 国家能源局:鼓励各地探索深远海、沙戈荒等可再生能源制氢场景
  • 中央气象台:未来三天北方地区有大风沙尘,江南等地有强降水
  • 安徽安庆市委书记张祥安调研假日经济和旅游安全工作
  • 菏泽家长“付费查成绩”风波调查:免费功能被误读