
Refactoring a Multi-Task Crawler

data_config.py

import random
import time


class Config:
    def __init__(self):
        self.headers = {}
        self.cookies = {}
        self.key = "鼓浪屿"
        self._fxpcqlniredt = "09031065313454484706"
        self.x_traceID = self.generate_string()
        self.code_nums = 1  # how many poiIds to fetch; 1 <= code_nums <= pages * 10
        self.pages = 5  # how many pages of POI links to fetch
        self.comment_nums = 20  # total comments = code_nums * comment_nums

    def generate_string(self):
        # Current timestamp in milliseconds
        timestamp = int(time.time() * 1000)
        # Random integer below 1e7
        random_num = random.randint(0, 10 ** 7 - 1)
        # Concatenate into the x-traceID string
        result = f"{self._fxpcqlniredt}-{timestamp}-{random_num}"
        return result

    def run(self):
        print(self.headers)
        print(self.cookies)


if __name__ == '__main__':
    config = Config()
    config.run()
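Note that headers and cookies ship as empty dicts here (the real values were presumably stripped before posting), and none of the requests below will succeed without them. A minimal sketch of filling them in, assuming you copy the values from your browser's DevTools; everything below is a placeholder:

config = Config()
config.headers = {
    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) ...",  # placeholder
    "content-type": "application/json",
}
config.cookies = {
    "GUID": "xxxxxxxx",  # placeholder - copy from a real browser session
}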

data_crawl.py

"""
数据采集器 - 封装原有的爬虫功能,提供更友好的接口
"""
import sys
import os
from datetime import datetime
from loguru import logger

# Add the parent directory to the path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from Xiechen import data_config, get_url, get_PoiId, get_comments


class DataCrawler:
    def __init__(self):
        self.real_keyword = ''
        self.real_count = ''
        self.real_pages = ''
        self.config = data_config.Config()
        self.comments_crawler = get_comments.Comments()

    def search_pois(self, keyword, count, pages):
        try:
            # Update the configuration with the user-supplied values
            self.real_keyword = keyword
            self.real_count = count
            self.real_pages = pages
            self.config.key = self.real_keyword
            self.config.pages = self.real_pages
            # Fetch the POI list. Get_Url reloads data_config on its own,
            # which would overwrite the user-supplied settings, so we hand it
            # the config object we have already updated.
            url_crawler = get_url.Get_Url(self.real_keyword)
            print("self.config:", self.config)
            url_crawler.config = self.config
            poi_list = url_crawler.clear_data()
            logger.info(f"Found {len(poi_list)} POIs")
            return poi_list
        except Exception as e:
            logger.error(f"POI search failed: {e}")
            return []

    def get_poi_comments(self, poi_id):
        try:
            # Update the configuration
            self.config.poiIds = poi_id
            self.config.comment_nums = self.real_count
            # Re-initialize the comments crawler
            self.comments_crawler = get_comments.Comments()
            print(self.comments_crawler)
            # Fetch the comment data
            comments_data = self.comments_crawler.get_comments(self.real_pages, self.config.comment_nums)
            if not comments_data:
                return []
            # Convert to a standard record format
            comments = []
            for i in range(len(comments_data.get('content', []))):
                comment = {
                    'poi_id': poi_id,
                    'content': comments_data['content'][i] if i < len(comments_data['content']) else '',
                    'score': comments_data['score'][i] if i < len(comments_data['score']) else None,
                    'publish_date': comments_data['publishTypeTag'][i] if i < len(comments_data['publishTypeTag']) else None,
                    'ip_location': comments_data['ipLocatedName'][i] if i < len(comments_data['ipLocatedName']) else '',
                    'user_id': comments_data['userId'][i] if i < len(comments_data['userId']) else '',
                    'user_image': comments_data['userImage'][i] if i < len(comments_data['userImage']) else '',
                    'client_auth': comments_data['clientAuth'][i] if i < len(comments_data['clientAuth']) else '',
                    'crawl_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                }
                comments.append(comment)
            logger.info(f"Fetched {len(comments)} comments")
            return comments
        except Exception as e:
            logger.error(f"Fetching comments failed: {e}")
            return []
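A quick usage sketch of the wrapper, assuming the headers/cookies in data_config.py have been filled in. Note that get_poi_comments reuses the count and pages recorded by search_pois, so the calls must come in this order (keyword and slicing are illustrative):

from Xiechen.data_crawler import DataCrawler

crawler = DataCrawler()
# keyword, comments per POI, search-result pages
poi_list = crawler.search_pois('鼓浪屿', 20, 1)
for poi in poi_list[:1]:
    comments = crawler.get_poi_comments(poi['id'])
    for c in comments[:3]:
        print(c['score'], c['publish_date'], c['content'][:30])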

get_comments.py

from datetime import datetime
import pymysql
from pymysql.cursors import DictCursor
import requests
from loguru import logger
from Xiechen import data_config, get_PoiId, db_config


class Comments:
    def __init__(self):
        self.config = data_config.Config()
        self.url = "https://m.ctrip.com/restapi/soa2/13444/json/getCommentCollapseList"
        self.poiIds = get_PoiId.Get_PoiId().distribute_logic()
        self.pages = int(self.config.comment_nums / 10)  # number of pages to crawl
        self.dic = {'content': [], 'publishTypeTag': [], 'score': [], 'ipLocatedName': [],
                    'userId': [], 'userImage': [], 'clientAuth': []}
        self.params = {
            "_fxpcqlniredt": self.config._fxpcqlniredt,
            "x-traceID": self.config.x_traceID
        }
        self.data = {
            "arg": {
                "channelType": 2,
                "collapseType": 0,
                "commentTagId": 0,
                "pageSize": 10,
                "pageIndex": '',
                "poiId": '',
                "sourceType": 1,
                "sortType": 3,
                "starType": 0
            },
            "head": {
                "cid": "09031065313454484706",
                "ctok": "",
                "cver": "1.0",
                "lang": "01",
                "sid": "8888",
                "syscode": "09",
                "auth": "",
                "xsid": "",
                "extension": []
            }
        }
        # Database connection settings
        self.db_config = db_config.DB_CONFIG
        # Initialize the database connection
        self.init_database()

    def init_database(self):
        """Initialize the database connection"""
        try:
            self.connection = pymysql.connect(**self.db_config, cursorclass=DictCursor)
            self.cursor = self.connection.cursor()
            logger.success("Database connected")
        except Exception as e:
            logger.error(f"Database connection failed: {e}")
            self.connection = None
            self.cursor = None

    def close_database(self):
        """Close the database connection"""
        if self.cursor:
            self.cursor.close()
        if self.connection:
            self.connection.close()
            logger.info("Database connection closed")

    def insert_comment_to_db(self, poi_id, comment_data):
        """Insert one comment record into the database"""
        if not self.connection or not self.cursor:
            logger.error("Database not connected; cannot insert data")
            return False
        try:
            sql = """
            INSERT INTO ctrip_comments (poi_id, content, score, publish_date, ip_location, user_id, user_image, client_auth)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            """
            # Normalize the data types before inserting
            content = comment_data.get('content', '')
            score = float(comment_data.get('score', 0)) if comment_data.get('score') else None
            publish_date = comment_data.get('publishTypeTag')
            ip_location = comment_data.get('ipLocatedName', '')
            user_id = comment_data.get('userId', '')
            user_image = comment_data.get('userImage', '')
            client_auth = comment_data.get('clientAuth', '')
            self.cursor.execute(sql, (poi_id, content, score, publish_date,
                                      ip_location, user_id, user_image, client_auth))
            self.connection.commit()
            logger.debug(f"Inserted comment: poi_id={poi_id}, score={score}, date={publish_date}")
            return True
        except Exception as e:
            logger.error(f"Inserting comment failed: {e}")
            self.connection.rollback()
            return False

    def insert_poi_info_to_db(self, poi_info):
        """Insert or update one POI record in the database"""
        if not self.connection or not self.cursor:
            logger.error("Database not connected; cannot insert data")
            return False
        try:
            sql = """
            INSERT INTO ctrip_poi_info (poi_id, poi_code, name, location, district, english_name, score, score_text, comment_count, comment_count_text, image_url, detail_url)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                score = VALUES(score),
                comment_count = VALUES(comment_count),
                update_time = CURRENT_TIMESTAMP
            """
            # Insert or update
            self.cursor.execute(sql, (
                poi_info.get('id'), poi_info.get('code'), poi_info.get('name'),
                poi_info.get('location'), poi_info.get('district'), poi_info.get('english_name'),
                poi_info.get('score'), poi_info.get('score_text'), poi_info.get('comment_count'),
                poi_info.get('comment_count_text'), poi_info.get('image_url'), poi_info.get('url')))
            self.connection.commit()
            logger.success(f"Inserted/updated POI info: {poi_info.get('name')}")
            return True
        except Exception as e:
            logger.error(f"Inserting POI info failed: {e}")
            self.connection.rollback()
            return False

    def check_poiId(self):
        print(self.poiIds)
        if isinstance(self.poiIds, list):
            return True
        return False

    def reset_data_dict(self):
        """Reset the data dictionary"""
        self.dic = {'content': [], 'publishTypeTag': [], 'score': [], 'ipLocatedName': [],
                    'userId': [], 'userImage': [], 'clientAuth': []}

    def get_comments(self, real_pages, counts):
        if self.check_poiId() or real_pages * 10 < counts:
            print(real_pages)
            # many_url_comments is a generator: it only runs when iterated,
            # so consume it here and merge the per-page dicts into one result
            merged = {key: [] for key in self.dic}
            for page_dic in self.many_url_comments(counts):
                for key, values in page_dic.items():
                    merged[key].extend(values)
            return merged
        self.pages = real_pages
        for i in range(self.pages):
            self.data['arg']['pageIndex'] = i
            self.data['arg']['poiId'] = self.poiIds
            # Key point: send the payload via the json parameter, not data
            response = requests.post(self.url, headers=self.config.headers, cookies=self.config.cookies,
                                     params=self.params, json=self.data)
            logger.info(f"Response status: {response.status_code}, pageIndex={i}")
            try:
                items = response.json()['result']['items']
            except ValueError as e:
                logger.error(f"Could not parse JSON response: {e}")
                return None
            if not items:
                logger.warning("No comments on this page; skipping insert")
            # Remember where this page's entries start so only the new rows
            # are inserted (self.dic accumulates across pages)
            start = len(self.dic['content'])
            for key in self.dic.keys():
                for item in items:
                    if key in ('userId', 'userImage', 'clientAuth'):
                        try:
                            self.dic[key].append(item['userInfo'][key])
                        except Exception:
                            self.dic[key].append('N/A')
                    elif key == 'publishTypeTag':
                        try:
                            str_time = item[key][:-5]
                            self.dic[key].append(datetime.strptime(str_time, "%Y-%m-%d").date())
                        except Exception:
                            self.dic[key].append(datetime.now().date())
                    else:
                        try:
                            self.dic[key].append(item[key])
                        except Exception:
                            self.dic[key].append('N/A')
            # Insert this page's comments into the database (single-POI mode)
            inserted = 0
            if self.connection and self.cursor:
                for j in range(start, len(self.dic['content'])):
                    comment_item = {
                        'content': self.dic['content'][j],
                        'score': self.dic['score'][j] if j < len(self.dic['score']) else None,
                        'publishTypeTag': self.dic['publishTypeTag'][j] if j < len(self.dic['publishTypeTag']) else None,
                        'ipLocatedName': self.dic['ipLocatedName'][j] if j < len(self.dic['ipLocatedName']) else '',
                        'userId': self.dic['userId'][j] if j < len(self.dic['userId']) else '',
                        'userImage': self.dic['userImage'][j] if j < len(self.dic['userImage']) else '',
                        'clientAuth': self.dic['clientAuth'][j] if j < len(self.dic['clientAuth']) else ''
                    }
                    if self.insert_comment_to_db(self.poiIds, comment_item):
                        inserted += 1
            else:
                logger.error("Database not connected; skipping comment insert")
            logger.info(f"Page {i}: parsed {len(self.dic['content']) - start}, inserted {inserted}")
        # Return the accumulated data only after all pages have been fetched
        return self.dic

    def many_url_comments(self, counts):
        self.pages = round(counts / 10)
        logger.info(f"Actual pages: {self.pages}")
        logger.info(f"poiIds: {self.poiIds}")
        # Accept a single poiId as well as a list
        poi_ids = self.poiIds if isinstance(self.poiIds, list) else [self.poiIds]
        for poiId in poi_ids:
            for i in range(self.pages):
                logger.success("Fetching next page")
                # Reset the data dict so pages do not accumulate
                self.reset_data_dict()
                self.data['arg']['pageIndex'] = i
                self.data['arg']['poiId'] = poiId
                # Key point: send the payload via the json parameter, not data
                response = requests.post(self.url, headers=self.config.headers, cookies=self.config.cookies,
                                         params=self.params, json=self.data)
                logger.info(f"Response status: {response.status_code}, poiId={poiId}, pageIndex={i}")
                try:
                    items = response.json()['result']['items']
                except ValueError as e:
                    logger.error(f"Could not parse JSON response: {e}")
                    logger.error(f"Raw content: {response.text}")
                    continue
                if not items:
                    logger.warning(f"poiId={poiId} page {i} has no comments")
                for key in self.dic.keys():
                    for item in items:
                        try:
                            if key in ('userId', 'userImage', 'clientAuth'):
                                try:
                                    self.dic[key].append(item['userInfo'][key])
                                except Exception:
                                    self.dic[key].append('N/A')
                            elif key == 'publishTypeTag':
                                try:
                                    str_time = item[key][:-5]
                                    self.dic[key].append(datetime.strptime(str_time, "%Y-%m-%d").date())
                                except Exception:
                                    self.dic[key].append(datetime.now().date())
                            else:
                                try:
                                    self.dic[key].append(item[key])
                                except Exception:
                                    self.dic[key].append('N/A')
                        except Exception as e:
                            logger.error(e)
                            continue
                # Insert this page's comments into the database
                inserted = 0
                if self.connection and self.cursor:
                    for j in range(len(self.dic['content'])):
                        comment_item = {
                            'content': self.dic['content'][j],
                            'score': self.dic['score'][j] if j < len(self.dic['score']) else None,
                            'publishTypeTag': self.dic['publishTypeTag'][j] if j < len(self.dic['publishTypeTag']) else None,
                            'ipLocatedName': self.dic['ipLocatedName'][j] if j < len(self.dic['ipLocatedName']) else '',
                            'userId': self.dic['userId'][j] if j < len(self.dic['userId']) else '',
                            'userImage': self.dic['userImage'][j] if j < len(self.dic['userImage']) else '',
                            'clientAuth': self.dic['clientAuth'][j] if j < len(self.dic['clientAuth']) else ''
                        }
                        if self.insert_comment_to_db(poiId, comment_item):
                            inserted += 1
                else:
                    logger.error("Database not connected; skipping comment insert")
                logger.info(f"poiId={poiId} page {i}: parsed {len(self.dic['content'])}, inserted {inserted}")
                yield self.dic


if __name__ == '__main__':
    comments = Comments()
    try:
        # print(comments.check_poiId())
        comments.get_comments(1, 20)
        comments.close_database()
    except Exception:
        from NLPapi.Xiechen import get_url
        poi_list = get_url.Get_Url().clear_data()
        for poi_info in poi_list[:comments.config.code_nums]:
            comments.insert_poi_info_to_db(poi_info)
        # Fetch comment data and insert it into the database
        generator = comments.many_url_comments(20)
        for result in generator:
            print(result)
    finally:
        # Make sure the database connection is closed
        comments.close_database()
        logger.info("Run finished; database connection closed")
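db_config.py is imported above but never shown in the post. A minimal sketch of what it presumably contains; every value is a placeholder, and the keys follow pymysql.connect's keyword arguments:

# db_config.py - connection settings (all values are placeholders)
DB_CONFIG = {
    'host': 'localhost',
    'port': 3306,
    'user': 'root',
    'password': 'your_password',
    'database': 'ctrip',
    'charset': 'utf8mb4',
}

The two tables are not defined in the post either. A DDL sketch inferred from the INSERT statements above; the column types are guesses, and ctrip_poi_info needs poi_id as a primary or unique key for the ON DUPLICATE KEY UPDATE clause to fire:

CREATE TABLE IF NOT EXISTS ctrip_comments (
    id           BIGINT PRIMARY KEY AUTO_INCREMENT,
    poi_id       BIGINT,
    content      TEXT,
    score        DECIMAL(3, 1),
    publish_date DATE,
    ip_location  VARCHAR(64),
    user_id      VARCHAR(64),
    user_image   VARCHAR(512),
    client_auth  VARCHAR(64),
    create_time  TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS ctrip_poi_info (
    poi_id             BIGINT PRIMARY KEY,
    poi_code           VARCHAR(64),
    name               VARCHAR(128),
    location           VARCHAR(128),
    district           VARCHAR(128),
    english_name       VARCHAR(128),
    score              DECIMAL(3, 1),
    score_text         VARCHAR(32),
    comment_count      INT,
    comment_count_text VARCHAR(32),
    image_url          VARCHAR(512),
    detail_url         VARCHAR(512),
    update_time        TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);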

get_PoiId.py

import re
import requests
from loguru import logger
from Xiechen import get_url
from Xiechen import data_config
from lxml import etree
import json
import random, time


class Get_PoiId:
    def __init__(self):
        self.url = "https://m.ctrip.com/restapi/soa2/{}/json/getCommentCollapseList"

    def init_config(self):
        """Initialize configuration - fetch the latest settings on every call"""
        self.config = data_config.Config()
        self.get_main_page_obj = get_url.Get_Url()

    # Resolve as many poiIds as the config asks for
    def distribute_logic(self):
        # Initialize the configuration
        self.init_config()
        if self.config.code_nums == 1:
            main_page = self.get_main_page_obj.clear_data()[self.config.code_nums - 1]['url']
            print(main_page)
            return self.request_main_page(main_page)
        else:
            poiId_ls = []
            # Fetch the POI list once instead of re-requesting it per iteration
            poi_list = self.get_main_page_obj.clear_data()
            for i in range(self.config.code_nums):
                main_page = poi_list[i]['url']
                logger.success(f"main_page:{main_page}")
                poiId_ls.append(self.request_main_page(main_page))
            logger.success(f"poiIds resolved: {poiId_ls}")
            return poiId_ls

    # Request a POI detail page and pull the poiId out of its embedded state
    def request_main_page(self, url):
        full_url = f"{url}?renderPlatform="
        response = requests.get(full_url, headers=self.config.headers, cookies=self.config.cookies).text
        time.sleep(random.randint(1, 2))
        xml_doc = etree.HTML(response)
        script_code = xml_doc.xpath('//div[@id="hp_container"]/div[@id="content"]/script[@id="__NEXT_DATA__"]')[0].text
        poiId = json.loads(script_code)['props']['pageProps']['initialState']['poiDetail']['poiId']
        return poiId


if __name__ == '__main__':
    service = Get_PoiId()
    print(service.distribute_logic())
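request_main_page relies on the poiId being embedded in the detail page's Next.js __NEXT_DATA__ script tag. A toy illustration of the JSON path it walks; the structure is trimmed and the poiId value is made up:

import json

sample = '{"props": {"pageProps": {"initialState": {"poiDetail": {"poiId": 75633}}}}}'
print(json.loads(sample)['props']['pageProps']['initialState']['poiDetail']['poiId'])  # -> 75633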

get_url.py

import requests
import json
from Xiechen import data_config
import random, time


class Get_Url:
    def __init__(self, new_keyword=''):
        self.new_keyword = new_keyword
        # Query the POI search endpoint to get detail-page URLs
        self.url = "https://m.ctrip.com/restapi/soa2/20591/getGsOnlineResult"

    def request_url(self):
        # Reuse a config assigned from outside (e.g. by DataCrawler); otherwise
        # load a fresh one so each request picks up the latest settings
        if getattr(self, 'config', None) is None:
            self.config = data_config.Config()
        if self.new_keyword:
            self.config.key = self.new_keyword
        for i in range(self.config.pages):
            try:
                params = {
                    "_fxpcqlniredt": self.config._fxpcqlniredt,
                    "x-traceID": self.config.x_traceID
                }
                data = {
                    "keyword": self.config.key,
                    "pageIndex": i,
                    "pageSize": 12,
                    "tab": "all",
                    "sourceFrom": "",
                    "profile": False,
                    "head": {
                        "cid": self.config._fxpcqlniredt,
                        "ctok": "",
                        "cver": "1.0",
                        "lang": "01",
                        "sid": "8888",
                        "syscode": "09",
                        "auth": "",
                        "xsid": "",
                        "extension": []
                    }
                }
                data = json.dumps(data, separators=(',', ':'))
                response = requests.post(self.url, headers=self.config.headers, cookies=self.config.cookies,
                                         params=params, data=data)
                time.sleep(random.randint(1, 2))
                # Returns on the first successful page; the remaining loop
                # indices only come into play as retries after a failure
                return response.json()
            except Exception as e:
                print(e)
                continue

    def clear_data(self):
        data = self.request_url()
        ls = []
        for i in range(len(data['homeItems'][1]['items'])):
            item = data['homeItems'][1]['items'][i]
            dic = {
                "id": item.get("id"),
                "code": item.get("code"),
                "name": item.get("word", "").split(",")[0],  # extract the POI name
                "location": item.get("word", "").split(",")[-1].strip() if "," in item.get("word", "") else "",
                "url": item.get("url"),
                "image_url": item.get("imageUrl"),
                "district": item.get("districtName"),
                "english_name": item.get("eName"),
                "score": item.get("commentScore"),
                "score_text": item.get("commentScoreText"),
                "comment_count": item.get("commentCount"),
                "comment_count_text": item.get("commentCountText")
            }
            ls.append(dic)
        return ls


if __name__ == '__main__':
    get_url_object = Get_Url()
    print(get_url_object.clear_data())
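clear_data() hands back a list of flattened search hits. A usage sketch, again assuming valid headers/cookies in data_config (the keyword is illustrative; the printed fields are the dict keys built above):

pois = Get_Url('鼓浪屿').clear_data()
for poi in pois[:3]:
    print(poi['name'], poi['score'], poi['comment_count'], poi['url'])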

app.py

from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
import os
import sys
import pandas as pd
from datetime import datetime
import threading
import time
import json
from loguru import logger

# Add the current directory to the Python path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from Xiechen.data_crawler import DataCrawler
from Xiechen import data_config, get_comments

app = Flask(__name__)
CORS(app)  # allow cross-origin requests

# Global crawl state
crawl_status = {
    'is_running': False,
    'progress': 0,
    'current_task': '',
    'total_tasks': 0,
    'completed_tasks': 0,
    'error_message': '',
    'results': []
}


@app.route('/')
def hello_world():
    return 'Data collection service is running!'


@app.route('/api/crawl/start', methods=['POST'])
def start_crawl():
    """Start a crawl job"""
    global crawl_status
    if crawl_status['is_running']:
        return jsonify({
            'success': False,
            'message': 'A crawl is already in progress; please wait for it to finish'
        })
    try:
        data = request.get_json()
        keyword = data.get('keyword', '')
        count = data.get('count', 10)
        pages = data.get('pages', 1)
        print(keyword, count, pages)
        if not keyword:
            return jsonify({'success': False, 'message': 'Please provide a search keyword'})
        if count <= 0 or count > 1000:
            return jsonify({'success': False, 'message': 'Count must be between 1 and 1000'})
        # Reset the state
        crawl_status.update({
            'is_running': True,
            'progress': 0,
            'current_task': 'Initializing crawler...',
            'total_tasks': 0,
            'completed_tasks': 0,
            'error_message': '',
            'results': []
        })
        # Run the crawler in a background thread
        thread = threading.Thread(target=run_crawler, args=(keyword, count, pages))
        thread.daemon = True
        thread.start()
        return jsonify({
            'success': True,
            'message': 'Crawl started',
            'task_id': int(time.time())
        })
    except Exception as e:
        crawl_status['is_running'] = False
        crawl_status['error_message'] = str(e)
        return jsonify({'success': False, 'message': f'Start failed: {str(e)}'})


@app.route('/api/crawl/status', methods=['GET'])
def get_crawl_status():
    """Report crawl progress"""
    return jsonify({'success': True, 'data': crawl_status})


@app.route('/api/crawl/stop', methods=['POST'])
def stop_crawl():
    """Stop the crawl"""
    global crawl_status
    crawl_status['is_running'] = False
    crawl_status['current_task'] = 'Stopped'
    return jsonify({'success': True, 'message': 'Crawl stopped'})


@app.route('/api/crawl/download/<filename>', methods=['GET'])
def download_file(filename):
    """Download the generated Excel file"""
    try:
        file_path = os.path.join('downloads', filename)
        if os.path.exists(file_path):
            return send_file(file_path, as_attachment=True)
        else:
            return jsonify({'success': False, 'message': 'File does not exist'})
    except Exception as e:
        return jsonify({'success': False, 'message': f'Download failed: {str(e)}'})


def run_crawler(keyword, count, pages):
    """Background worker that drives the crawl"""
    global crawl_status
    try:
        # Create the download directory
        os.makedirs('downloads', exist_ok=True)
        # Initialize the crawler
        crawler = DataCrawler()
        # Update the state
        crawl_status['current_task'] = f'Searching keyword: {keyword}'
        crawl_status['progress'] = 10
        # Search POIs
        poi_list = crawler.search_pois(keyword, count, pages)
        logger.warning(f"User input resolved to: {poi_list}")
        if not poi_list:
            crawl_status['error_message'] = 'No matching POIs found'
            crawl_status['is_running'] = False
            return
        crawl_status['total_tasks'] = len(poi_list)
        crawl_status['current_task'] = f'Collecting comments for {len(poi_list)} POIs'
        crawl_status['progress'] = 20
        # Collect comment data
        all_comments = []
        for i, poi_info in enumerate(poi_list):
            logger.warning(f"Current poi_info: {poi_info}")
            if not crawl_status['is_running']:
                break
            crawl_status['current_task'] = f'Collecting: {poi_info.get("name", "unknown POI")}'
            # Fetch this POI's comments
            comments = crawler.get_poi_comments(poi_info['id'])
            if comments:
                all_comments.extend(comments)
            crawl_status['completed_tasks'] = i + 1
            crawl_status['progress'] = 20 + (i + 1) * 60 // len(poi_list)
            # Pause between POIs to avoid getting blocked
            time.sleep(2)
        if not all_comments:
            crawl_status['error_message'] = 'No comments collected'
            crawl_status['is_running'] = False
            return
        # Generate the Excel file
        crawl_status['current_task'] = 'Generating Excel file...'
        crawl_status['progress'] = 85
        filename = f"ctrip_comments_{keyword}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
        file_path = os.path.join('downloads', filename)
        # Build the DataFrame
        df = pd.DataFrame(all_comments)
        # Save to Excel
        with pd.ExcelWriter(file_path, engine='openpyxl') as writer:
            df.to_excel(writer, sheet_name='Comments', index=False)
        crawl_status['current_task'] = 'Crawl finished'
        crawl_status['progress'] = 100
        crawl_status['results'] = {
            'filename': filename,
            'total_comments': len(all_comments),
            'file_path': file_path
        }
    except Exception as e:
        crawl_status['error_message'] = str(e)
        crawl_status['current_task'] = 'Crawl failed'
    finally:
        crawl_status['is_running'] = False


if __name__ == '__main__':
    # Create the necessary directories
    os.makedirs('downloads', exist_ok=True)
    app.run(host='0.0.0.0', port=5000, debug=False)
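Once app.py is running, the whole pipeline can be driven over HTTP. A minimal client sketch, assuming the default host and port from app.run() above (the keyword is just an example):

import time

import requests

BASE = "http://localhost:5000"

# Kick off a crawl
resp = requests.post(f"{BASE}/api/crawl/start",
                     json={"keyword": "鼓浪屿", "count": 20, "pages": 1})
print(resp.json())

# Poll until the background thread finishes
while True:
    status = requests.get(f"{BASE}/api/crawl/status").json()['data']
    print(status['progress'], status['current_task'])
    if not status['is_running']:
        break
    time.sleep(2)

# Download the Excel file named in the results
if status.get('results'):
    filename = status['results']['filename']
    excel = requests.get(f"{BASE}/api/crawl/download/{filename}")
    with open(filename, 'wb') as f:
        f.write(excel.content)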
