当前位置: 首页 > wzjs >正文

山东平台网站建设多少钱好的建站网站

山东平台网站建设多少钱,好的建站网站,玄武区网站建设,南昌哪里有网站建设文本语音 rtsp适时播放叫号系统的底层逻辑 这两天在弄这个,前2篇是通过虚拟声卡,达到了最简单的一个逻辑,播放文本就从声卡发声,不播无所谓,自动忙音。 那个工作在windows平台, 而今天的这个相似功能的代码…

文本语音 rtsp适时播放叫号系统的底层逻辑

这两天在弄这个,前2篇是通过虚拟声卡,达到了最简单的一个逻辑,播放文本就从声卡发声,不播无所谓,自动忙音。 那个工作在windows平台,
而今天的这个相似功能的代码是mac os,理论支持windows,和linux,依赖ffmpeg和xiu(一个rust流服务器)的rtsp服务。
今天的难点有点多

  1. asyncio的任务 async def _tts_worker(self, text: str) 运行中有各种错误, engine runAndWait是不行的。 内部有它的event loop。所以init和endLoop,是暂时找到的解决办法。同时经历了,这个,和调用 ffmpeg 外部指令,并直接获取- 代表的stdout。 会遇到各种问题。做了捕获和处理。但是查找的时候,不是太容易。
  2. self._start_ffmpeg() 他需要, create socket 或pipe完成以后,才能运行。 调试我都手工在外部启动。 作用就是,输出到rtsp服务器,以备播放。
  3. input handle,等都是ai生成的,因为有好多种循环,这是比较省心在。
  4. 最紧急隐蔽在是, async def _heartbeat(self) 他需要计算播放静音的时间,长了不行,短了不行。 这个最初在测试代码,就几个函数。然后AI,生成了三个theading的版本,两个Queue。 然后转到了异步版本,明显快了很多。
  5. 在windows上使用win32pipen可以达到unix socket的效果很相似, 记得还有FIFO是linux专用的,当然还有stdin,和stdout。对于ffmpeg,这是一些程序内部的传送机制
  6. rtsp是需要一个后台的服务的,xiu是开源的rust项目,可以使。另外window推荐metamtx,双击运行,什么也不管。
    在这里插入图片描述

音画同步应该是另个问题了,几天前,鼓捣了一下图片。让编辑后的,马上 在视频中显示。 这个另外一个话题了。做的这些就为了,让报号和点单,有个界面。

ffmpeg -re -framerate 30 -f image2 -loop 1 -i "image1.jpg" -c:v libx264 -preset ultrafast -tune zerolatency -pix_fmt rgba  -f rtsp -rtsp_transport tcp rtsp://localhost:8554/live

合并的代码,就当成剩下的作业,有空再来做。

对于刚接触的,最好是慢慢和AI调试着来,一些功能就做出来。

启动
接收text
FIFOunix..
同步语音传输
语音 推送
文本转语音
rtsp流服务器

语音推送使用ffmpeg独立进程,实现了前后中断后自动重启。

程序主体

可独立运行,也可以结合ffmg管理推送进程
main.py

import asyncio
import struct
import pyttsx3
import tempfile
import os
import socket
from aioconsole import ainput
from contextlib import suppress
from typing import Optionalclass AsyncTTSController:def __init__(self):# 使用Unix域套接字self.socket_path = "/tmp/tts_audio.sock"self.server_socket: Optional[socket.socket] = Noneself.client_socket: Optional[socket.socket] = None# 进程控制self.ffmpeg_process: Optional[asyncio.subprocess.Process] = Noneself.running = False# TTS引擎self.engine = pyttsx3.init()self.engine.setProperty('rate', 180)self.engine.setProperty('volume', 1.0)# 音频参数self.sample_rate = 24000self.channels = 1self.bits_per_sample = 16self.silence = self._generate_silence(0.2)self.wav_header = self._generate_wav_header()# 状态管理self.connection_active = Falseself.last_heartbeat = 0.0self.heartbeat_interval = 2.0self.sending_audio = 0def _generate_wav_header(self) -> bytes:"""生成WAV文件头"""byte_rate = self.sample_rate * self.channels * self.bits_per_sample // 8block_align = self.channels * self.bits_per_sample // 8return struct.pack('<4sI4s4sIHHIIHH4sI',b'RIFF', 36, b'WAVE', b'fmt ', 16, 1, self.channels,self.sample_rate, byte_rate, block_align, self.bits_per_sample,b'data', 0)def _generate_silence(self, duration: float) -> bytes:"""生成静音数据"""samples = int(self.sample_rate * duration)return bytes(samples * self.channels * (self.bits_per_sample // 8))async def _async_create_socket(self) -> None:"""创建Unix域套接字"""with suppress(Exception):if os.path.exists(self.socket_path):os.unlink(self.socket_path)self.server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)self.server_socket.setblocking(False)self.server_socket.bind(self.socket_path)self.server_socket.listen(1)loop = asyncio.get_running_loop()while self.running and not self.connection_active:try:self.client_socket, _ = await loop.sock_accept(self.server_socket)self.connection_active = Trueprint("客户端已连接")await loop.sock_sendall(self.client_socket, self.wav_header)except (BlockingIOError, InterruptedError):await asyncio.sleep(0.1)except Exception as e:print(f"连接错误: {str(e)}")self.connection_active = Falseawait asyncio.sleep(1)async def _start_ffmpeg(self) -> None:"""启动FFmpeg进程"""with suppress(Exception):if self.ffmpeg_process:self.ffmpeg_process.terminate()await self.ffmpeg_process.wait()socketid='unix:'+self.socket_pathself.ffmpeg_process = await asyncio.create_subprocess_exec('ffmpeg','-f', 's16le','-ar', str(self.sample_rate),'-ac', str(self.channels),'-i', socketid,  # 修改输入源为套接字路径'-c:a', 'aac','-f', 'rtsp','-rtsp_transport', 'tcp','rtsp://localhost:8554/mystream',stdout=asyncio.subprocess.DEVNULL,stdin=asyncio.subprocess.DEVNULL,stderr=asyncio.subprocess.PIPE)asyncio.create_task(self._monitor_ffmpeg_errors())async def _monitor_ffmpeg_errors(self) -> None:"""监控FFmpeg错误输出"""while self.running and self.ffmpeg_process:line = await self.ffmpeg_process.stderr.readline()if not line:break#   print(f"[FFmpeg Error] {line.decode().strip()}")async def _async_write_socket(self, data: bytes) -> None:"""安全写入套接字"""try:if self.client_socket and self.connection_active:loop = asyncio.get_running_loop()await loop.sock_sendall(self.client_socket, data)except (BrokenPipeError, ConnectionResetError):print("连接已断开,尝试重连...")await self._reconnect_pipeline()except Exception as e:print(f"写入错误: {str(e)}")self.connection_active = Falseasync def _reconnect_pipeline(self) -> None:"""完整重连流程"""print("启动重连流程...")self.connection_active = Falseif self.client_socket:self.client_socket.close()task1=asyncio.create_task(self._async_create_socket()),task2=asyncio.create_task( self._start_ffmpeg()),    await task2await task1# await asyncio.gather(task1, task2)#await self._async_create_socket()#await self._start_ffmpeg()# 剩余的heartbeat、tts_worker、input_handler等方法保持相同...async def stop(self) -> None:"""安全关闭"""self.running = Falsewith suppress(Exception):if self.ffmpeg_process:self.ffmpeg_process.terminate()await self.ffmpeg_process.wait()if self.client_socket:self.client_socket.close()if self.server_socket:self.server_socket.close()if os.path.exists(self.socket_path):os.unlink(self.socket_path)print("所有资源已释放")async def _heartbeat(self) -> None:"""心跳维持机制"""while self.running:if self.connection_active :for i in range(10):if   self.sending_audio<0:await self._async_write_socket(self.silence)else :self.sending_audio-= 2await asyncio.sleep(0.2)   #     print(self.sending_audio,"slend")#  await asyncio.sleep(self.heartbeat_interval)else:await asyncio.sleep(0.5)def _sync_tts(self,text,tmp_filename):eng=pyttsx3.init()#  eng.say(text)eng.save_to_file(text, 'temp3.wav')eng.runAndWait()eng.endLoop()async def _tts_worker(self, text: str) -> None:"""异步TTS处理核心"""tmp_filename = None#with open('audio1.raw','rb') as chunkf:# data=chunkf.read()# secdd=len(data)/48000# self.sending_audio=int(secdd*10) # await self._async_write_socket(data)# #await asyncio.sleep(secdd)# print (secdd,len(data) )   # 创建临时文件with tempfile.NamedTemporaryFile(delete=False) as tmp:tmp_filename = tmp.name# # 同步TTS操作转异步执行loop = asyncio.get_running_loop()await loop.run_in_executor(None, self._sync_tts, *(text, 'temp3.wav',))# 转换音频格式# await asyncio.sleep(1.3)# self._sync_tts(text,tmp_filename)try: proc = await asyncio.create_subprocess_exec('ffmpeg','-hide_banner','-loglevel', 'error','-y','-i', 'temp3.wav',       # 输入文件路径'-f', 's16le',            # 强制输出格式为PCM s16le'-acodec', 'pcm_s16le',   # 明确指定音频编解码器 👈 关键修复'-ar', str(self.sample_rate),'-ac', str(self.channels),'-',                     # 输出到标准输出stdout=asyncio.subprocess.PIPE
)# 流式发送音频数据sum=0while chunk := await proc.stdout.read(4096):sum+=len(chunk)await self._async_write_socket(chunk)self.sending_audio=int(sum*10/48000) print("write data x0.1s:",self.sending_audio)finally:if tmp_filename and os.path.exists(tmp_filename):1#  os.unlink(tmp_filename)async def _input_handler(self) -> None:"""异步输入处理"""while self.running:try:text = await ainput("请输入文本(输入q退出): ")if text.lower() == 'q':self.running = Falsebreakif text.strip():await self._tts_worker(text)except Exception as e:print(f"输入错误: {str(e)}")async def run(self) -> None:"""主运行循环"""self.running = True# #await  self._start_ffmpeg()tasks = [asyncio.create_task(self._async_create_socket()),asyncio.create_task( self._start_ffmpeg()),asyncio.create_task(self._input_handler()),asyncio.create_task(self._heartbeat()),]await asyncio.gather(*tasks)# 以下保持不变...
if __name__ == "__main__":controller = AsyncTTSController()try:asyncio.run(controller.run())except KeyboardInterrupt:asyncio.run(controller.stop())
"""
ffmpeg -y -i temp.wav -f s16le -acodec pcm_s16le  -ar 24000  -ac 1   audio.raw
ffmpeg  -ar 24000 -ac 1 -f s16le  -i unix:/tmp/tts_audio.sock -f rtsp  rtsp://localhost:8554/mystream
"""

ffmpeg启动和监控的独立代码

验证了一下rtsp断线重建连结,也验证了 上面 的main.py的socket server退出后,ffmpeg自动重启连接。 要使用这个稳健程序,需要注释main.py run中的asyncio.create_task( self._start_ffmpeg()),
在这里插入图片描述
ffmg,py

import asyncio
from contextlib import suppress
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')class FFmpegManager:def __init__(self):self.ffmpeg_process = Noneself._retry_count = 0self._max_retries = 5self._retry_lock = asyncio.Lock()self._is_running = Falseself.sample_rate=24000self.channels=1self.socket_path = "/tmp/tts_audio.sock"async def _start_ffmpeg(self) -> None:"""带自动重试的FFmpeg启动函数"""async with self._retry_lock:await self._safe_terminate()try:socketid = 'unix:' + self.socket_pathself.ffmpeg_process =await asyncio.create_subprocess_exec('ffmpeg','-f', 's16le','-ar', str(self.sample_rate),'-ac', str(self.channels),'-i', socketid,  # 修改输入源为套接字路径'-c:a', 'aac','-f', 'rtsp','-rtsp_transport', 'tcp','rtsp://localhost:8554/mystream',stdout=asyncio.subprocess.DEVNULL,stdin=asyncio.subprocess.DEVNULL,stderr=asyncio.subprocess.PIPE)self._retry_count = 0  # 重置重试计数器asyncio.create_task(self._monitor_ffmpeg_errors())self._is_running = Trueexcept Exception as e:logging.error(f"FFmpeg启动失败: {str(e)}")await self._handle_retry()async def _monitor_ffmpeg_errors(self):"""增强型进程监控"""while self._is_running:logging.info("loop  error cathch")stderr = await self.ffmpeg_process.stderr.readline()if stderr:logging.error(f"FFmpeg错误输出: {stderr.decode().strip()}")# 检测进程状态return_code = self.ffmpeg_process.returncodeif return_code is not None:logging.warning(f"FFmpeg异常退出,返回码: {return_code}")self._is_running = Falseawait self._handle_retry()breakasync def _handle_retry(self):"""智能重试策略"""if self._retry_count >= self._max_retries:logging.critical("达到最大重试次数,放弃重启")return# 指数退避算法delay = min(2 ** self._retry_count, 30)  # 最大间隔30秒self._retry_count += 1logging.info(f"将在 {delay} 秒后尝试第 {self._retry_count} 次重启")await asyncio.sleep(delay)await self._start_ffmpeg()async def _safe_terminate(self):"""安全终止现有进程"""if self.ffmpeg_process:with suppress(Exception):self.ffmpeg_process.terminate()await self.ffmpeg_process.wait()self.ffmpeg_process = None
# 以下保持不变...
async def main():controller=FFmpegManager()try:await controller._start_ffmpeg()logging.info('rung')await asyncio.sleep(1160)except KeyboardInterrupt:logging.info(3)asyncio.run(controller._safe_terminate())
if __name__ == "__main__":asyncio.run(main())
http://www.dtcms.com/wzjs/453559.html

相关文章:

  • 太仓建设局网站域名历史查询工具
  • 乡镇门户网站建设seo的优点和缺点
  • 用c 做毕业设计的音乐网站云南新闻最新消息今天
  • 连云港网站关键词优化服务深圳网站做优化哪家公司好
  • 做网站需要什么语言连云港seo优化
  • 想给公司做个网站怎么做找客户资源的软件哪个最靠谱
  • 做网站必须用域名吗电商代运营公司十强
  • 阆中住房城乡建设委官方网站在线服务器网站
  • 吴江网站建设关键词搜索站长工具
  • 关于汽车的网站cba最新消息
  • 自主建站系统电商关键词seo排名
  • 贵港网站建设媒体发布公司
  • wordpress主题视频站无锡网络优化推广公司
  • 台州的网站建设怎么优化自己网站
  • 外包做网站价格郑州网站排名优化公司
  • 怎么自己做淘宝客网站如何进行网站宣传推广
  • 日本做受视频网站万能搜索 引擎
  • 互联网行业怎么赚钱seo优化排名推广
  • 有哪些建设网站公司专业做加盟推广的公司
  • 汇中建设 官方网站网络营销师是做什么的
  • 扁平化网站源码网站建设定制
  • 个人响应式网站建设网络营销的职能有哪些
  • 佛山市和城乡建设局网站首页中国2022年重大新闻
  • 怎么用joomla做网站谷歌chrome手机版
  • 百度云加速西安seo经理
  • 网站建设与制作百度软件中心
  • 东莞网站开发推荐百度广告投放技巧
  • wordpress xydowngoogle seo 优化
  • 东莞建外贸网站好seo引擎搜索网站
  • 自建个网站怎么做360推广官网