手动处理售后太慢?RPA智能处理小红书工单,效率提升1200%[特殊字符]
手动处理售后太慢?RPA智能处理小红书工单,效率提升1200%🚀
每天被海量小红书售后工单淹没?重复回复到手抽筋?别慌!作为影刀RPA的资深布道者,今天我要分享一个硬核技术方案,用RPA+AI智能处理小红书售后工单,让你彻底告别手动处理的噩梦!
一、背景痛点:售后工单处理的"效率黑洞"
在小红书电商运营中,售后工单处理是客户体验的关键环节,但手动操作简直就是一场效率灾难:
手动处理的十大痛点:
重复回复爆炸:每天处理几百个工单,相同问题要重复回答几十遍
响应时间滞后:手动处理导致响应慢,客户满意度直线下降
信息查找困难:在不同系统间切换查找订单信息,效率极低
处理标准不一:不同客服处理标准不同,容易引发客诉
数据统计缺失:手动处理难以统计工单类型和解决率
复杂工单积压:简单工单占用大量时间,复杂工单被拖延
多平台协调难:需要在小红书、ERP、CRM间来回切换
错误处理频发:手动操作容易出错,造成二次客诉
培训成本高昂:新客服需要长时间培训才能独立处理
规模扩展困难:业务增长时,人工客服根本无法应对
数据冲击:按每个工单手动处理平均耗时5分钟计算,每天处理100个工单就要500分钟!这时间本可以用来优化服务流程或分析客户需求。
灵魂拷问:当竞争对手用智能客服系统秒级响应客户问题时,你还要苦哈哈地一个个手动回复吗?今天,我们就用影刀RPA+AI彻底颠覆传统工单处理方式!
二、解决方案:智能工单处理工作流设计
我们的核心思路是构建一个AI赋能的自动化工单处理系统,实现从工单接收到问题解决的全链路自动化。
整体架构:
工单自动获取 → 智能分类识别 → 优先级自动排序 → 标准问题自动回复 → 复杂问题智能推荐 → 多系统数据同步 → 处理结果自动记录 → 客户满意度追踪
技术亮点:
智能意图识别:基于NLP技术自动识别用户真实诉求
多维度优先级排序:根据问题类型、客户等级、紧急程度智能排序
知识库自动匹配:从历史解决方案中自动匹配最优回答
多系统数据整合:自动关联订单系统、物流系统、会员系统
情感分析预警:自动识别用户情绪,紧急问题优先处理
处理质量监控:自动监控处理时效和客户满意度
这个方案不仅开箱即用,还能通过机器学习不断优化处理效果!
三、代码实现:手把手搭建工单处理机器人
下面我用影刀RPA的设计思路和详细代码,带你一步步构建这个智能工单处理系统。
环境准备
工具:影刀RPA社区版 + 浏览器自动化组件
AI服务:自然语言处理API、情感分析API
数据源:小红书商家后台、订单数据库、知识库
集成系统:ERP、CRM、物流跟踪系统
核心代码实现
# 小红书售后工单智能处理系统
from shadowbot import Browser, Excel, Database, AI, System, Email
import pandas as pd
from datetime import datetime, timedelta
import json
import re
from collections import defaultdict
import requestsclass XiaohongshuAfterSalesProcessor:def __init__(self):self.pending_tickets = []self.processed_tickets = []self.knowledge_base = {}self.customer_data = {}def main_flow(self):"""主流程:从工单获取到处理完成"""try:print("🚀 启动小红书售后工单智能处理系统...")# 步骤1:加载基础数据self.load_base_data()# 步骤2:获取待处理工单self.fetch_pending_tickets()# 步骤3:智能工单分类与排序self.classify_and_prioritize_tickets()# 步骤4:批量处理工单self.batch_process_tickets()# 步骤5:生成处理报告self.generate_processing_report()# 步骤6:客户满意度跟进self.follow_up_customer_satisfaction()print(f"🎉 工单处理完成!成功处理 {len(self.processed_tickets)} 个工单")except Exception as e:print(f"❌ 系统执行失败: {e}")self.error_handling(e)def load_base_data(self):"""加载基础数据"""print("📥 加载基础数据...")# 加载知识库self.load_knowledge_base()# 加载客户数据self.load_customer_data()# 加载产品数据self.load_product_data()# 加载处理规则self.load_processing_rules()def load_knowledge_base(self):"""加载智能知识库"""print("🧠 加载智能知识库...")# 从数据库加载知识库try:kb_data = Database.query("""SELECT question_type, question_keywords, standard_answer, solution_steps, related_products, satisfaction_rateFROM after_sales_knowledge_baseWHERE status = 'active'""")for item in kb_data:question_type = item['question_type']if question_type not in self.knowledge_base:self.knowledge_base[question_type] = []self.knowledge_base[question_type].append({'keywords': json.loads(item['question_keywords']),'answer': item['standard_answer'],'steps': json.loads(item['solution_steps']),'products': json.loads(item['related_products']),'satisfaction': item['satisfaction_rate']})print(f"✅ 成功加载 {len(self.knowledge_base)} 类问题知识库")except Exception as e:print(f"⚠️ 知识库加载失败: {e}")self.load_fallback_knowledge_base()def load_fallback_knowledge_base(self):"""加载备用知识库"""# 基础问题类型和解决方案self.knowledge_base = {'物流查询': [{'keywords': ['物流', '快递', '发货', '运输', '配送'],'answer': '您好,已为您查询物流信息:{logistics_info}。预计{estimated_delivery}送达,请耐心等待。','steps': ['查询物流单号', '获取最新物流状态', '计算预计送达时间'],'satisfaction': 0.85}],'退货退款': [{'keywords': ['退货', '退款', '退回', '返还', '退钱'],'answer': '您好,已收到您的退货申请。退货流程:1.填写退货信息 2.寄回商品 3.我们收货后1-3工作日退款。退款将原路返回。','steps': ['验证退货条件', '生成退货地址', '跟踪退货进度', '执行退款操作'],'satisfaction': 0.78}],'商品质量': [{'keywords': ['质量', '瑕疵', '损坏', '破损', '问题'],'answer': '非常抱歉给您带来不便。针对商品质量问题,我们可以为您:1.补发新品 2.全额退款 3.提供优惠补偿。请选择您的处理方式。','steps': ['确认问题详情', '提供解决方案', '执行处理方案', '跟进客户满意度'],'satisfaction': 0.82}],'使用咨询': [{'keywords': ['怎么用', '如何使用', '使用方法', '操作', '教程'],'answer': '感谢您的咨询!使用方法:{usage_steps}。如有其他问题请随时联系我们。','steps': ['识别产品型号', '提供使用说明', '解答具体问题'],'satisfaction': 0.90}]}def load_customer_data(self):"""加载客户数据"""try:customer_df = Database.query("""SELECT customer_id, customer_level, order_count, total_spent, satisfaction_score, recent_complaintsFROM customer_profiles""")self.customer_data = customer_df.set_index('customer_id').to_dict('index')except:print("⚠️ 客户数据加载失败")self.customer_data = {}def load_product_data(self):"""加载产品数据"""try:product_df = Database.query("""SELECT product_id, product_name, category, price, stock_status,return_rate, common_issuesFROM product_info""")self.product_data = product_df.set_index('product_id').to_dict('index')except:print("⚠️ 产品数据加载失败")self.product_data = {}def load_processing_rules(self):"""加载处理规则"""self.processing_rules = {'priority_rules': {'VIP客户': 10,'质量问题': 9,'情绪激动': 8,'普通咨询': 5,'物流查询': 4},'auto_reply_threshold': 0.8, # 匹配度超过80%自动回复'escalation_keywords': ['投诉', '举报', '法律', '媒体', '曝光'],'urgent_keywords': ['紧急', '加急', '立刻', '马上', '尽快']}def fetch_pending_tickets(self):"""获取待处理工单"""print("📨 获取待处理工单...")try:# 初始化浏览器browser = self.init_browser()# 登录小红书商家后台self.login_merchant_center(browser)# 导航到工单页面self.navigate_to_tickets_page(browser)# 获取工单列表self.extract_tickets_list(browser)# 关闭浏览器browser.quit()print(f"✅ 成功获取 {len(self.pending_tickets)} 个待处理工单")except Exception as e:print(f"❌ 工单获取失败: {e}")# 使用备选方案从数据库获取self.fetch_tickets_from_database()def init_browser(self):"""初始化浏览器环境"""browser = Browser.open_browser("chrome")Browser.set_window_size(browser, 1400, 900)Browser.set_timeout(browser, 30)Browser.set_headers(browser, {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36','Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8'})return browserdef login_merchant_center(self, browser):"""登录商家后台"""print("🔐 登录商家后台...")try:Browser.navigate_to(browser, "https://xiaohongshu.com/merchant")System.wait(5)if not self.check_merchant_login(browser):self.perform_merchant_login(browser)print("✅ 商家后台登录成功")except Exception as e:raise Exception(f"商家后台登录失败: {e}")def check_merchant_login(self, browser):"""检查登录状态"""try:dashboard_element = Browser.find_element(browser, "xpath", "//div[contains(text(), '商家后台')]")return dashboard_element is not Noneexcept:return Falsedef perform_merchant_login(self, browser):"""执行登录"""# 简化的登录流程,实际需要根据页面调整username = "your_username"password = "your_password"username_input = Browser.find_element(browser, "xpath", "//input[@placeholder='手机号/邮箱']")Browser.input_text(username_input, username)password_input = Browser.find_element(browser, "xpath", "//input[@type='password']")Browser.input_text(password_input, password)login_btn = Browser.find_element(browser, "xpath", "//button[contains(text(), '登录')]")Browser.click(login_btn)System.wait(8)def navigate_to_tickets_page(self, browser):"""导航到工单页面"""try:# 点击客服工单菜单service_menu = Browser.find_element(browser, "xpath", "//span[contains(text(), '客服工单')]")Browser.click(service_menu)System.wait(3)# 点击售后工单子菜单after_sales_menu = Browser.find_element(browser, "xpath", "//span[contains(text(), '售后工单')]")Browser.click(after_sales_menu)System.wait(3)except Exception as e:raise Exception(f"导航到工单页面失败: {e}")def extract_tickets_list(self, browser):"""提取工单列表"""try:# 查找工单表格tickets_table = Browser.find_element(browser, "xpath", "//table[contains(@class, 'ticket-table')]")# 提取工单行ticket_rows = Browser.find_elements(browser, "xpath", "//tr[contains(@class, 'ticket-row')]")for row in ticket_rows:ticket_data = self.extract_single_ticket(row)if ticket_data:self.pending_tickets.append(ticket_data)except Exception as e:raise Exception(f"提取工单列表失败: {e}")def extract_single_ticket(self, row_element):"""提取单个工单数据"""try:# 提取工单基本信息cells = Browser.find_elements(row_element, "xpath", "./td")if len(cells) < 6:return Noneticket_data = {'ticket_id': Browser.get_text(cells[0]),'customer_id': Browser.get_text(cells[1]),'create_time': Browser.get_text(cells[2]),'product_id': Browser.get_text(cells[3]),'question_type': Browser.get_text(cells[4]),'status': Browser.get_text(cells[5]),'content': '','priority_score': 0,'auto_processable': False,'suggested_solution': ''}# 点击查看详情获取完整内容detail_btn = Browser.find_element(row_element, "xpath", ".//button[contains(text(), '详情')]")Browser.click(detail_btn)System.wait(2)# 提取工单详细内容content_element = Browser.find_element(row_element, "xpath", "//div[contains(@class, 'ticket-content')]")if content_element:ticket_data['content'] = Browser.get_text(content_element)return ticket_dataexcept Exception as e:print(f"提取单个工单失败: {e}")return Nonedef fetch_tickets_from_database(self):"""从数据库获取工单(备选方案)"""try:tickets_df = Database.query("""SELECT ticket_id, customer_id, create_time, product_id, question_type, status, contentFROM after_sales_ticketsWHERE status = 'pending'ORDER BY create_time DESCLIMIT 50""")for _, row in tickets_df.iterrows():self.pending_tickets.append({'ticket_id': row['ticket_id'],'customer_id': row['customer_id'],'create_time': row['create_time'],'product_id': row['product_id'],'question_type': row['question_type'],'status': row['status'],'content': row['content'],'priority_score': 0,'auto_processable': False,'suggested_solution': ''})print(f"✅ 从数据库获取 {len(self.pending_tickets)} 个待处理工单")except Exception as e:print(f"❌ 数据库获取工单失败: {e}")def classify_and_prioritize_tickets(self):"""工单分类与优先级排序"""print("🎯 工单智能分类与排序...")for ticket in self.pending_tickets:# 分析工单内容analysis_result = self.analyze_ticket_content(ticket)# 计算优先级分数priority_score = self.calculate_priority_score(ticket, analysis_result)ticket['priority_score'] = priority_score# 判断是否可自动处理ticket['auto_processable'] = self.can_auto_process(ticket, analysis_result)# 生成建议解决方案ticket['suggested_solution'] = self.generate_suggested_solution(ticket, analysis_result)# 按优先级排序self.pending_tickets.sort(key=lambda x: x['priority_score'], reverse=True)print(f"✅ 完成 {len(self.pending_tickets)} 个工单分类排序")def analyze_ticket_content(self, ticket):"""分析工单内容"""content = ticket['content'].lower()analysis_result = {'question_type': self.identify_question_type(content),'urgency_level': self.detect_urgency(content),'customer_emotion': self.analyze_emotion(content),'keywords': self.extract_keywords(content),'contains_escalation': self.check_escalation_keywords(content),'complexity_score': self.assess_complexity(content)}return analysis_resultdef identify_question_type(self, content):"""识别问题类型"""type_scores = {}for q_type, solutions in self.knowledge_base.items():score = 0for solution in solutions:for keyword in solution['keywords']:if keyword in content:score += 1type_scores[q_type] = score# 返回得分最高的问题类型return max(type_scores.items(), key=lambda x: x[1])[0] if type_scores else '其他'def detect_urgency(self, content):"""检测紧急程度"""urgency_keywords = ['紧急', '立刻', '马上', '赶紧', '快点', 'urgent', 'asap']urgency_count = sum(1 for keyword in urgency_keywords if keyword in content)if urgency_count >= 2:return 'high'elif urgency_count >= 1:return 'medium'else:return 'low'def analyze_emotion(self, content):"""分析用户情绪"""positive_words = ['谢谢', '感谢', '满意', '好的', '不错']negative_words = ['生气', '愤怒', '失望', '垃圾', '骗人', '投诉']positive_count = sum(1 for word in positive_words if word in content)negative_count = sum(1 for word in negative_words if word in content)if negative_count > positive_count:return 'negative'elif positive_count > negative_count:return 'positive'else:return 'neutral'def extract_keywords(self, content):"""提取关键词"""# 使用简单的分词和关键词提取words = re.findall(r'[\u4e00-\u9fa5a-zA-Z0-9]+', content)keyword_counts = defaultdict(int)for word in words:if len(word) > 1: # 过滤单字keyword_counts[word] += 1# 返回出现次数最多的前10个关键词return [word for word, count in sorted(keyword_counts.items(), key=lambda x: x[1], reverse=True)[:10]]def check_escalation_keywords(self, content):"""检查升级关键词"""escalation_keywords = self.processing_rules['escalation_keywords']return any(keyword in content for keyword in escalation_keywords)def assess_complexity(self, content):"""评估问题复杂度"""# 基于内容长度、问题类型、关键词数量等评估complexity = 0# 内容长度(越长可能越复杂)complexity += min(len(content) / 100, 3)# 问题数量(问号数量)question_marks = content.count('?') + content.count('?')complexity += min(question_marks, 2)# 特殊关键词special_keywords = ['怎么', '如何', '为什么', '原因', '解决']complexity += sum(1 for keyword in special_keywords if keyword in content) * 0.5return min(complexity, 10) # 限制在0-10分def calculate_priority_score(self, ticket, analysis_result):"""计算优先级分数"""score = 0# 客户等级(30%)customer_level = self.get_customer_level(ticket['customer_id'])score += customer_level * 0.3# 问题紧急程度(25%)urgency_multiplier = {'high': 1.0, 'medium': 0.7, 'low': 0.4}score += urgency_multiplier[analysis_result['urgency_level']] * 0.25# 用户情绪(20%)emotion_multiplier = {'negative': 1.0, 'neutral': 0.6, 'positive': 0.3}score += emotion_multiplier[analysis_result['customer_emotion']] * 0.2# 是否包含升级关键词(15%)if analysis_result['contains_escalation']:score += 0.15# 问题复杂度(10%)score += (analysis_result['complexity_score'] / 10) * 0.1return round(score * 100, 2) # 转换为百分制def get_customer_level(self, customer_id):"""获取客户等级"""if customer_id in self.customer_data:level_map = {'VIP': 1.0, '高级': 0.8, '普通': 0.5, '新客户': 0.3}return level_map.get(self.customer_data[customer_id]['customer_level'], 0.5)return 0.5 # 默认普通客户def can_auto_process(self, ticket, analysis_result):"""判断是否可自动处理"""# 匹配知识库best_match = self.find_best_knowledge_match(ticket['content'])# 满足以下条件可自动处理:# 1. 知识库匹配度超过阈值# 2. 问题复杂度较低# 3. 不包含升级关键词# 4. 用户情绪不是特别负面return (best_match['confidence'] > self.processing_rules['auto_reply_threshold'] andanalysis_result['complexity_score'] < 5 andnot analysis_result['contains_escalation'] andanalysis_result['customer_emotion'] != 'negative')def find_best_knowledge_match(self, content):"""查找最佳知识库匹配"""best_match = {'answer': '', 'confidence': 0, 'type': ''}for q_type, solutions in self.knowledge_base.items():for solution in solutions:# 计算关键词匹配度matched_keywords = sum(1 for keyword in solution['keywords'] if keyword in content)total_keywords = len(solution['keywords'])if total_keywords > 0:confidence = matched_keywords / total_keywordsif confidence > best_match['confidence']:best_match = {'answer': solution['answer'],'confidence': confidence,'type': q_type,'steps': solution['steps']}return best_matchdef generate_suggested_solution(self, ticket, analysis_result):"""生成建议解决方案"""best_match = self.find_best_knowledge_match(ticket['content'])if best_match['confidence'] > 0.6:# 个性化回答模板personalized_answer = self.personalize_answer(best_match['answer'], ticket)return {'type': 'auto_reply','content': personalized_answer,'confidence': best_match['confidence'],'steps': best_match['steps']}else:# 需要人工处理的建议return {'type': 'manual_review','reason': f"问题复杂度较高({analysis_result['complexity_score']}/10),建议人工处理",'suggested_approach': self.suggest_manual_approach(ticket, analysis_result)}def personalize_answer(self, template, ticket):"""个性化回答模板"""personalized = template# 替换客户姓名(如果有)if ticket['customer_id'] in self.customer_data:# 这里可以添加获取客户姓名的逻辑personalized = personalized.replace('{customer_name}', '亲爱的用户')# 替换产品信息if ticket['product_id'] in self.product_data:product_info = self.product_data[ticket['product_id']]personalized = personalized.replace('{product_name}', product_info['product_name'])# 替换物流信息(如果是物流问题)if '物流' in template:logistics_info = self.get_logistics_info(ticket)personalized = personalized.replace('{logistics_info}', logistics_info)return personalizeddef get_logistics_info(self, ticket):"""获取物流信息"""# 这里可以集成物流查询APIreturn "【物流公司】快递单号:1234567890,最新状态:已发货,预计2-3天送达"def suggest_manual_approach(self, ticket, analysis_result):"""建议人工处理方式"""suggestions = []if analysis_result['customer_emotion'] == 'negative':suggestions.append("客户情绪较为负面,建议先安抚情绪")if analysis_result['contains_escalation']:suggestions.append("涉及投诉升级关键词,建议优先处理并升级至主管")if analysis_result['complexity_score'] > 7:suggestions.append("问题较为复杂,建议详细调查后回复")return ';'.join(suggestions) if suggestions else "标准处理流程"def batch_process_tickets(self):"""批量处理工单"""print("🔄 批量处理工单...")auto_processed = 0manual_required = 0for ticket in self.pending_tickets:if ticket['auto_processable']:result = self.auto_process_ticket(ticket)if result['success']:auto_processed += 1else:manual_required += 1else:manual_required += 1# 记录处理结果self.processed_tickets.append(ticket)# 短暂延迟,避免操作过快System.wait(1)print(f"✅ 自动处理 {auto_processed} 个工单,{manual_required} 个需要人工处理")def auto_process_ticket(self, ticket):"""自动处理工单"""try:# 初始化浏览器browser = self.init_browser()# 登录商家后台self.login_merchant_center(browser)# 导航到工单详情页self.navigate_to_ticket_detail(browser, ticket['ticket_id'])# 输入回复内容self.input_reply_content(browser, ticket['suggested_solution']['content'])# 提交回复self.submit_ticket_reply(browser)# 标记为已处理self.mark_ticket_resolved(browser)# 更新工单状态ticket['status'] = 'resolved'ticket['processed_time'] = datetime.now()ticket['processed_by'] = 'RPA系统'browser.quit()return {'success': True, 'message': '自动处理成功'}except Exception as e:print(f"❌ 工单 {ticket['ticket_id']} 自动处理失败: {e}")return {'success': False, 'error': str(e)}def navigate_to_ticket_detail(self, browser, ticket_id):"""导航到工单详情页"""try:# 搜索工单search_input = Browser.find_element(browser, "xpath", "//input[@placeholder='搜索工单']")Browser.input_text(search_input, ticket_id)search_btn = Browser.find_element(browser, "xpath", "//button[contains(text(), '搜索')]")Browser.click(search_btn)System.wait(3)# 点击工单进入详情ticket_link = Browser.find_element(browser, "xpath", f"//tr[contains(., '{ticket_id}')]")Browser.click(ticket_link)System.wait(3)except Exception as e:raise Exception(f"导航到工单详情失败: {e}")def input_reply_content(self, browser, content):"""输入回复内容"""try:reply_textarea = Browser.find_element(browser, "xpath", "//textarea[@placeholder='输入回复内容']")Browser.input_text(reply_textarea, content)System.wait(1)except Exception as e:raise Exception(f"输入回复内容失败: {e}")def submit_ticket_reply(self, browser):"""提交工单回复"""try:submit_btn = Browser.find_element(browser, "xpath", "//button[contains(text(), '发送')]")Browser.click(submit_btn)System.wait(3)except Exception as e:raise Exception(f"提交工单回复失败: {e}")def mark_ticket_resolved(self, browser):"""标记工单为已解决"""try:resolve_btn = Browser.find_element(browser, "xpath", "//button[contains(text(), '标记解决')]")Browser.click(resolve_btn)System.wait(2)except Exception as e:print(f"⚠️ 标记工单解决失败: {e}")# 这不是关键错误,继续执行def generate_processing_report(self):"""生成处理报告"""print("📊 生成处理报告...")auto_processed = sum(1 for t in self.processed_tickets if t.get('auto_processable'))total_tickets = len(self.processed_tickets)# 分析问题类型分布type_distribution = defaultdict(int)for ticket in self.processed_tickets:type_distribution[ticket.get('question_type', '其他')] += 1report_data = {'report_time': datetime.now(),'total_tickets_processed': total_tickets,'auto_processed': auto_processed,'auto_processing_rate': auto_processed / total_tickets if total_tickets > 0 else 0,'type_distribution': dict(type_distribution),'avg_priority_score': sum(t['priority_score'] for t in self.processed_tickets) / total_tickets if total_tickets > 0 else 0,'processing_details': self.processed_tickets}# 保存详细报告self.save_detailed_report(report_data)# 发送汇总通知self.send_processing_summary(report_data)def save_detailed_report(self, report_data):"""保存详细报告"""excel_path = f"C:\\工单报告\\售后工单处理报告_{datetime.now().strftime('%Y%m%d_%H%M')}.xlsx"# 准备详细数据rows = []for ticket in report_data['processing_details']:rows.append({'工单ID': ticket['ticket_id'],'客户ID': ticket['customer_id'],'创建时间': ticket['create_time'],'问题类型': ticket.get('question_type', ''),'优先级分数': ticket['priority_score'],'处理方式': '自动处理' if ticket.get('auto_processable') else '人工处理','处理状态': ticket.get('status', ''),'处理时间': ticket.get('processed_time', ''),'处理人员': ticket.get('processed_by', ''),'建议方案': ticket.get('suggested_solution', {}).get('type', '')})if rows:Excel.save_dataframe(pd.DataFrame(rows), excel_path)print(f" ✅ 详细报告已保存: {excel_path}")def send_processing_summary(self, report_data):"""发送处理汇总"""auto_rate = report_data['auto_processing_rate'] * 100message = f"""
📢 小红书售后工单处理完成!📊 处理统计:
• 总处理工单: {report_data['total_tickets_processed']}
• 自动处理: {report_data['auto_processed']}
• 自动化率: {auto_rate:.1f}%
• 平均优先级: {report_data['avg_priority_score']:.1f}📈 问题类型分布:
{self.format_type_distribution(report_data['type_distribution'])}⏰ 报告时间: {report_data['report_time'].strftime('%Y-%m-%d %H:%M')}"""print(message)def format_type_distribution(self, distribution):"""格式化问题类型分布"""formatted = []for q_type, count in distribution.items():percentage = (count / sum(distribution.values())) * 100formatted.append(f"• {q_type}: {count}个 ({percentage:.1f}%)")return '\n'.join(formatted)def follow_up_customer_satisfaction(self):"""客户满意度跟进"""print("📞 设置客户满意度跟进...")# 对自动处理的工单设置满意度调查auto_processed_tickets = [t for t in self.processed_tickets if t.get('auto_processable')]for ticket in auto_processed_tickets:self.schedule_satisfaction_survey(ticket)print(f" ✅ 已为 {len(auto_processed_tickets)} 个自动处理工单设置满意度跟进")def schedule_satisfaction_survey(self, ticket):"""安排满意度调查"""# 这里可以集成短信、邮件或消息推送系统survey_data = {'ticket_id': ticket['ticket_id'],'customer_id': ticket['customer_id'],'survey_type': 'auto_processing','scheduled_time': datetime.now() + timedelta(hours=24), # 24小时后发送'survey_questions': ["您对本次客服回复是否满意?","问题是否得到有效解决?","您对我们的服务有什么建议?"]}# 保存调查计划try:Database.execute("""INSERT INTO satisfaction_surveys (ticket_id, customer_id, survey_type, scheduled_time, survey_questions)VALUES (?, ?, ?, ?, ?)""", (survey_data['ticket_id'],survey_data['customer_id'], survey_data['survey_type'],survey_data['scheduled_time'],json.dumps(survey_data['survey_questions'])))except Exception as e:print(f"⚠️ 保存满意度调查计划失败: {e}")def error_handling(self, error):"""错误处理"""print(f"🛠️ 执行错误处理: {er