当前位置: 首页 > news >正文

某Boss直聘数据获取

🚀 Boss直聘AI岗位数据爬取:从零到一的完整方案

📖 前言

在当今AI浪潮席卷全球的时代,了解各大公司的AI岗位分布情况对求职者和行业分析师来说至关重要。本文将详细介绍如何通过Python爬虫技术,自动化获取Boss直聘平台上各大公司的AI相关职位数据,并进行智能分析。

🎯 项目概述

核心功能

  • 🔍 自动爬取指定公司在Boss直聘的职位数据
  • 📊 智能识别AI相关岗位(人工智能、机器学习、深度学习等)
  • 📈 计算各公司AI岗位占比
  • 💾 数据持久化存储与分析报告生成

技术栈

  • Python 3.x - 主要开发语言
  • requests - HTTP请求库
  • pandas - 数据处理与分析
  • lxml - HTML解析
  • fake_useragent - 反爬虫伪装

💻 核心代码实现

依赖库安装

pip install requests pandas lxml fake_useragent matplotlib seaborn scikit-learn celery aiohttp

主要代码结构与分布式架构

# -*- coding: utf-8 -*-
import requests, pandas as pd, time, random, os, re, json
import matplotlib.pyplot as plt
import seaborn as sns
from lxml import etree
from fake_useragent import UserAgent
from urllib.parse import quote
from celery import Celery
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
import schedule
import asyncio
import aiohttp# AI相关关键词配置
KEY_WORDS = ["人工智能", "AI", "算法", "机器学习", "深度学习"]# 数据存储路径
DATA_DIR = "data"
company_csv = os.path.join(DATA_DIR, "company_list.csv")
output_csv = os.path.join(DATA_DIR, "ai_job_ratio_boss.csv")# 分布式任务队列配置
app = Celery('boss_crawler', broker='redis://localhost:6379')

核心爬取函数

def get_boss_count(company, keyword=None):"""从Boss直聘获取职位数量的核心函数"""# 随机User-Agent防反爬ua = UserAgent()headers = {"User-Agent": ua.random,"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8","Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8","Connection": "keep-alive","Referer": "https://www.zhipin.com/","Cache-Control": "no-cache"}# 构建搜索查询search_term = company if not keyword else f"{company} {keyword}"search_term_encoded = quote(search_term)url = f"https://www.zhipin.com/web/geek/search?query={search_term_encoded}"try:# 随机延迟避免频率限制time.sleep(random.uniform(2, 5))response = requests.get(url, headers=headers, timeout=15)if response.status_code == 200:html = etree.HTML(response.text)# 多种方式提取职位数量(增强鲁棒性)# 方法1: 搜索结果计数count_elements = html.xpath('//div[contains(@class,"search-job-result")]//span[@class="search-job-result-count"]/text()')# 方法2: 页面标题提取title = html.xpath('//title/text()')# 方法3: 正则匹配职位数all_text = html.xpath('//text()')for text in all_text:match = re.search(r'(\d+)\s*[个]?职位', text)if match:return int(match.group(1))# 方法4: 直接计算职位卡片数量job_items = html.xpath('//div[contains(@class,"job-card-wrapper")]')if job_items:return len(job_items)return 0else:print(f"请求失败,状态码: {response.status_code}")return 0except Exception as e:print(f"请求Boss直聘数据出错: {e}")return 0# 异步版本实现
async def get_boss_count_async(session, company, keyword=None):"""异步版本的职位数量获取函数,提升并发性能"""ua = UserAgent()headers = {"User-Agent": ua.random,"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8","Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8","Connection": "keep-alive","Referer": "https://www.zhipin.com/","Cache-Control": "no-cache"}search_term = company if not keyword else f"{company} {keyword}"search_term_encoded = quote(search_term)url = f"https://www.zhipin.com/web/geek/search?query={search_term_encoded}"try:await asyncio.sleep(random.uniform(1, 3))  # 异步延迟async with session.get(url, headers=headers, timeout=15) as response:if response.status == 200:text = await response.text()html = etree.HTML(text)# 使用相同的解析逻辑all_text = html.xpath('//text()')for text in all_text:match = re.search(r'(\d+)\s*[个]?职位', text)if match:return int(match.group(1))job_items = html.xpath('//div[contains(@class,"job-card-wrapper")]')return len(job_items) if job_items else 0else:print(f"异步请求失败,状态码: {response.status}")return 0except Exception as e:print(f"异步请求Boss直聘数据出错: {e}")return 0# 分布式任务实现
@app.task
def crawl_company_task(company_name):"""Celery异步任务:爬取单个公司数据"""try:result = get_job_ratio(company_name)return resultexcept Exception as e:print(f"分布式任务执行失败: {e}")return {"company": company_name, "total": -1, "ai_jobs": -1, "ai_ratio": -1}

AI岗位占比计算与数据可视化

def get_job_ratio(company):"""计算公司AI岗位占比的核心逻辑"""# 获取公司总职位数total = get_boss_count(company)print(f"{company} 总职位数: {total}")# 统计AI相关职位ai_jobs = 0for kw in KEY_WORDS:job_count = get_boss_count(company, kw)print(f"{company} + {kw} 职位数: {job_count}")ai_jobs += job_counttime.sleep(random.uniform(3, 7))  # 防止请求过于频繁# 计算占比ratio = ai_jobs / total if total > 0 else 0return {"company": company, "total": total, "ai_jobs": ai_jobs, "ai_ratio": ratio}async def get_job_ratio_async(companies):"""异步批量处理多个公司数据"""async with aiohttp.ClientSession() as session:tasks = []for company in companies:# 为每个公司创建异步任务task = asyncio.create_task(process_company_async(session, company))tasks.append(task)results = await asyncio.gather(*tasks, return_exceptions=True)return [r for r in results if not isinstance(r, Exception)]async def process_company_async(session, company):"""异步处理单个公司的完整数据"""total = await get_boss_count_async(session, company)ai_jobs = 0for kw in KEY_WORDS:job_count = await get_boss_count_async(session, company, kw)ai_jobs += job_countawait asyncio.sleep(random.uniform(1, 3))ratio = ai_jobs / total if total > 0 else 0return {"company": company, "total": total, "ai_jobs": ai_jobs, "ai_ratio": ratio}def create_visualization(df):"""生成AI岗位占比可视化图表"""# 设置中文字体plt.rcParams['font.sans-serif'] = ['SimHei']plt.rcParams['axes.unicode_minus'] = False# 创建子图fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))# 柱状图:AI岗位占比sns.barplot(data=df.head(10), x='ai_ratio', y='company', ax=ax1, palette='viridis')ax1.set_title('Top 10 公司AI岗位占比', fontsize=14, fontweight='bold')ax1.set_xlabel('AI岗位占比')# 散点图:总岗位数 vs AI岗位数scatter = ax2.scatter(df['total'], df['ai_jobs'], c=df['ai_ratio'], cmap='plasma', s=100, alpha=0.7, edgecolors='black')ax2.set_title('总岗位数 vs AI岗位数分布', fontsize=14, fontweight='bold')ax2.set_xlabel('总岗位数')ax2.set_ylabel('AI岗位数')# 添加颜色条plt.colorbar(scatter, ax=ax2, label='AI岗位占比')plt.tight_layout()plt.savefig('ai_ratio_analysis.png', dpi=300, bbox_inches='tight')plt.show()return figdef predict_ai_trend(historical_data):"""基于历史数据预测AI岗位趋势"""# 特征工程features = ['total', 'company_size_encoded', 'industry_encoded']# 假设我们有这些特征的编码数据if all(col in historical_data.columns for col in features):X = historical_data[features]y = historical_data['ai_ratio']# 数据分割X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)# 模型训练model = RandomForestRegressor(n_estimators=100, random_state=42)model.fit(X_train, y_train)# 预测准确度score = model.score(X_test, y_test)print(f"模型预测准确度: {score:.3f}")# 特征重要性feature_importance = pd.DataFrame({'feature': features,'importance': model.feature_importances_}).sort_values('importance', ascending=False)print("特征重要性排序:")print(feature_importance)return modelelse:print("缺少必要的特征列,无法训练预测模型")return None

🔧 技术与性能

1. 反爬虫策略

  • 随机User-Agent: 使用fake_useragent库模拟真实浏览器
  • 智能延迟: 随机时间间隔避免被识别为机器人
  • 请求头伪装: 完整的浏览器请求头模拟
  • 分布式爬取: 使用Celery任务队列分散请求压力

2. 数据提取的鲁棒性

  • 多重解析策略: 4种不同的数据提取方法
  • 容错机制: 异常处理确保程序稳定运行
  • 重试机制: 最多3次重试提高成功率
  • 异步并发: aiohttp实现高效并发请求

3. 数据持久化与分析

  • 中间结果保存: 每处理5家公司自动保存
  • 时间戳命名: 避免数据覆盖
  • UTF-8编码: 确保中文字符正确显示
  • 可视化分析: matplotlib/seaborn生成专业图表
  • 机器学习预测: 基于历史数据预测趋势

4. 实时监控系统

def job_monitor():"""定时监控任务"""print("开始执行AI岗位数据监控...")# 执行主爬取函数results = main()# 生成可视化报告if results:df = pd.DataFrame(results)create_visualization(df)# 发送报告(可集成钉钉/企业微信)send_report_notification(df)def send_report_notification(df):"""发送监控报告通知"""summary = {"total_companies": len(df),"avg_ai_ratio": df['ai_ratio'].mean(),"top_company": df.loc[df['ai_ratio'].idxmax(), 'company']}message = f"""📊 AI岗位监控日报✅ 成功分析公司: {summary['total_companies']}家📈 平均AI占比: {summary['avg_ai_ratio']:.2%}🏆 占比最高: {summary['top_company']}"""# 这里可以集成钉钉/企业微信APIprint(message)# 定时任务设置
schedule.every().day.at("09:00").do(job_monitor)
schedule.every().week.do(lambda: create_visualization(pd.read_csv(output_csv)))def run_scheduler():"""运行定时调度器"""while True:schedule.run_pending()time.sleep(60)

📊 数据分析功能

输出数据格式

company,total,ai_jobs,ai_ratio
百度,1250,180,0.144
阿里巴巴,2100,245,0.117
腾讯,1890,201,0.106

统计分析

  • 成功获取数据的公司数量
  • 平均AI岗位占比
  • AI岗位占比最高的公司

🚨 重要注意事项

法律合规

  • ⚠️ 仅供学习研究使用
  • ⚠️ 禁止商业用途
  • ⚠️ 遵守robots.txt协议
  • ⚠️ 控制爬取频率

技术限制

  • 网站结构变化可能影响数据提取
  • 反爬虫机制可能导致IP被封
  • 数据准确性依赖于页面结构稳定性

🚀 完整主程序实现

def main():"""主程序:整合所有功能模块"""# 重试机制MAX_RETRIES = 3results = []# 读取公司列表try:COMPANY_LIST = pd.read_csv(company_csv)["company_name"].tolist()except Exception as e:print(f"读取公司列表失败: {e}")COMPANY_LIST = ["百度", "阿里巴巴", "腾讯", "字节跳动", "华为"]# 随机打乱公司列表random_companies = COMPANY_LIST.copy()random.shuffle(random_companies)# 选择执行模式use_async = len(random_companies) > 10  # 超过10家公司使用异步模式if use_async:print("使用异步模式处理大量公司数据...")results = asyncio.run(get_job_ratio_async(random_companies))else:print("使用同步模式处理...")for company in random_companies:retries = 0success = Falsewhile retries < MAX_RETRIES and not success:try:time.sleep(random.uniform(3, 7))result = get_job_ratio(company)results.append(result)print(f"✅ {company}: 总职位 {result['total']}, AI职位 {result['ai_jobs']}, 占比 {result['ai_ratio']:.2%}")success = Trueexcept Exception as e:retries += 1print(f"❌ {company} 失败 ({retries}/{MAX_RETRIES}): {str(e)}")time.sleep(random.uniform(10, 15))if not success:results.append({"company": company, "total": -1, "ai_jobs": -1, "ai_ratio": -1})# 每处理5家公司保存中间结果if len(results) % 5 == 0:interim_df = pd.DataFrame(results)interim_csv = os.path.join(DATA_DIR, f"interim_{int(time.time())}.csv")interim_df.to_csv(interim_csv, index=False, encoding="utf_8_sig")# 保存最终结果df = pd.DataFrame(results)df.to_csv(output_csv, index=False, encoding="utf_8_sig")# 生成分析报告success_df = df[df['total'] >= 0]if not success_df.empty:print(f"\n📊 分析报告:")print(f"成功分析: {len(success_df)}/{len(COMPANY_LIST)} 家公司")print(f"平均AI占比: {success_df['ai_ratio'].mean():.2%}")# 生成可视化图表create_visualization(success_df)# 尝试预测模型if len(success_df) >= 10:  # 数据足够时训练预测模型model = predict_ai_trend(success_df)if model:print("✅ AI趋势预测模型训练完成")return resultsif __name__ == "__main__":# 确保数据目录存在os.makedirs(DATA_DIR, exist_ok=True)# 运行主程序main()# 可选:启动定时监控# run_scheduler()

🔧 持久化

Redis缓存与数据库存储

# Redis缓存实现
import redis
import json
from datetime import timedeltaredis_client = redis.Redis(host='localhost', port=6379, db=0)def get_cached_result(company, keyword=None):"""从Redis获取缓存结果"""cache_key = f"boss:{company}:{keyword or 'total'}"cached = redis_client.get(cache_key)if cached:return json.loads(cached)return Nonedef set_cache_result(company, keyword, result, expire_hours=24):"""设置Redis缓存"""cache_key = f"boss:{company}:{keyword or 'total'}"redis_client.setex(cache_key, timedelta(hours=expire_hours), json.dumps(result))# 数据库存储实现
import sqlite3def init_database():"""初始化SQLite数据库"""conn = sqlite3.connect('boss_data.db')cursor = conn.cursor()cursor.execute('''CREATE TABLE IF NOT EXISTS job_analysis (id INTEGER PRIMARY KEY AUTOINCREMENT,company TEXT NOT NULL,total_jobs INTEGER,ai_jobs INTEGER,ai_ratio REAL,created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')conn.commit()conn.close()def save_to_database(results):"""保存结果到数据库"""conn = sqlite3.connect('boss_data.db')cursor = conn.cursor()for result in results:cursor.execute('''INSERT INTO job_analysis (company, total_jobs, ai_jobs, ai_ratio)VALUES (?, ?, ?, ?)''', (result['company'], result['total'], result['ai_jobs'], result['ai_ratio']))conn.commit()conn.close()

NLP智能关键词识别

import jieba
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeansdef extract_ai_keywords(job_descriptions):"""使用NLP技术提取AI相关关键词"""# 分词处理segmented_texts = [' '.join(jieba.cut(desc)) for desc in job_descriptions]# TF-IDF向量化vectorizer = TfidfVectorizer(max_features=1000, stop_words=['的', '和', '在', '有'])tfidf_matrix = vectorizer.fit_transform(segmented_texts)# 聚类分析kmeans = KMeans(n_clusters=5, random_state=42)clusters = kmeans.fit_predict(tfidf_matrix)# 提取关键词feature_names = vectorizer.get_feature_names_out()ai_keywords = []for cluster_id in range(5):cluster_center = kmeans.cluster_centers_[cluster_id]top_indices = cluster_center.argsort()[-10:][::-1]cluster_keywords = [feature_names[i] for i in top_indices]ai_keywords.extend(cluster_keywords)return list(set(ai_keywords))

本文介绍的Boss直聘AI岗位数据爬取方案,不仅提供了完整的技术实现,还包含了丰富的技术拓展内容。通过合理的反爬虫策略、鲁棒的数据提取机制和完善的错误处理,能够稳定地获取有价值的行业数据。

希望这个项目能够帮助大家更好地了解AI行业的就业趋势,同时也为Python爬虫技术的学习提供参考。记住,技术无罪,关键在于如何合理合法地使用这些技术为社会创造价值。


http://www.dtcms.com/a/499300.html

相关文章:

  • Spring Boot 3零基础教程,WEB 开发 默认欢迎页 笔记28
  • Redis极简入门 整合springboot
  • 漫蛙漫画官网入口 - 免费漫画在线看|防走失页入口
  • MySQL中的约束详解
  • 服务流程企业网站东莞市建设安监监督网站
  • leetcode 206. 反转链表 python
  • 【C语言】自定义类型(附源码与图片分析)
  • 用户头像文件存储功能是如何实现的?
  • 网站设计大概在什么价位渠道销售
  • C++竞赛递推算法-斐波那契数列常见题型与例题详解
  • 单元测试-例子
  • 网站顶部素材山西制作网站
  • PHP 高效 JSON 库 JsonMachine
  • 网站建设内部因素百度站长平台有哪些功能
  • Linux内核IPoIB驱动深度解析:在InfiniBand上跑IP网络的高性能之道
  • 275TOPS算力边缘计算盒子的价值洞察与市场定位---视程空间
  • 对话 MoonBit 张宏波:为 AI 重构编程语言
  • QGIS制图专题4:缓冲区分析与服务半径专题图制作
  • IP 资源会枯竭吗?IPv6 能解决代理市场的矛盾吗?
  • 物联网运维中的边缘计算任务调度优化策略
  • TensorFlow2 Python深度学习 - 循环神经网络(LSTM)示例
  • C++第二十三课:猜数字游戏等练习
  • 河南省建设厅网站中州杯企业网站推广怎么做
  • 【数论】最大公因数 (gcd) 与最小公倍数 (lcm)
  • rocky linux MariaDB安装过程
  • git的 Rebase
  • 第8篇 QT联合halcon12在vs2019搭建环境开发图像处理
  • 【小白笔记】最大交换 (Maximum Swap)问题
  • CentOS安装Node.js
  • 深入解析MCP:从基础配置到高级应用指南