websocket如何推送最新日志
直接上代码,通过读取日志位置,设置一个子线程(守护线程),每秒读取一次,有新的数据就发送给前端,没有下一秒继续,一旦前端页面关闭websocket之后,这个一秒执行一次的任务也会随着主线程结束结束
class LogConsumer(WebsocketConsumer):"""WebSocket消费者,用于实时发送任务日志支持持续推送新增日志内容"""def websocket_connect(self, message):"""客户端发起WebSocket连接请求时触发"""# 获取任务IDself.task_detail = self.scope['url_route']['kwargs']['task_detail']self.log_file_path = f"media/logs/task_{self.task_detail}.log"# 记录文件读取位置self.last_position = 0# 是否处于流式传输状态self.streaming = True# 接受连接self.accept()# 发送连接成功的消息self.send(text_data=json.dumps({'type': 'connected','message': f'已连接到任务 {self.task_detail} 的日志流'}))# 发送现有日志内容self.send_existing_logs()# 开始监控日志文件变化self.monitor_log_file()def websocket_receive(self, message):"""接收客户端发送的消息"""try:data = json.loads(message['text'])# 处理客户端控制指令if data.get('action') == 'pause':self.streaming = Falseelif data.get('action') == 'resume':self.streaming = Trueself.monitor_log_file()elif data.get('action') == 'ping':self.send(text_data=json.dumps({'type': 'pong','timestamp': time.time()}))except json.JSONDecodeError:passdef websocket_disconnect(self, message):"""客户端断开连接时触发"""self.streaming = Falseprint(f"任务 {self.task_detail} 的日志连接已断开")raise StopConsumer()def send_existing_logs(self):"""发送已存在的日志内容"""try:if os.path.exists(self.log_file_path):with open(self.log_file_path, 'r', encoding='utf-8') as f:content = f.read()self.last_position = f.tell()self.send(text_data=json.dumps({'type': 'existing_log_data','content': content}))else:self.send(text_data=json.dumps({'type': 'error','message': '日志文件不存在'}))except Exception as e:self.send(text_data=json.dumps({'type': 'error','message': f'读取日志文件时出错: {str(e)}'}))def monitor_log_file(self):"""持续监控日志文件变化并推送新增内容"""try:if not self.streaming:returnif os.path.exists(self.log_file_path):with open(self.log_file_path, 'r', encoding='utf-8') as f:f.seek(self.last_position)new_content = f.read()self.last_position = f.tell()if new_content:self.send(text_data=json.dumps({'type': 'new_log_data','content': new_content}))# 使用定时器实现周期性检查import threadingif self.streaming:timer = threading.Timer(1.0, self.monitor_log_file)timer.daemon = Truetimer.start()except Exception as e:self.send(text_data=json.dumps({'type': 'error','message': f'监控日志文件时出错: {str(e)}'}))