🚀 Boss直聘 AI Job Data Scraping: A Complete Solution from Zero to One
📖 Introduction
In an era when the AI wave is sweeping the globe, knowing how AI positions are distributed across major companies matters a great deal to job seekers and industry analysts alike. This article walks through how to use Python scraping techniques to automatically collect AI-related job data for major companies from the Boss直聘 platform and analyze it.
🎯 Project Overview
Core Features
- 🔍 Automatically scrape job postings for specified companies on Boss直聘
- 📊 Identify AI-related positions (artificial intelligence, machine learning, deep learning, etc.)
- 📈 Compute each company's share of AI positions
- 💾 Persist the data and generate analysis reports
Tech Stack
- Python 3.x - main development language
- requests - HTTP client
- pandas - data processing and analysis
- lxml - HTML parsing
- fake_useragent - User-Agent spoofing for anti-bot evasion
💻 Core Code Implementation
Installing Dependencies

```bash
pip install requests pandas lxml fake_useragent matplotlib seaborn scikit-learn celery aiohttp schedule redis jieba
```

Main Code Structure and Distributed Architecture
```python
# -*- coding: utf-8 -*-
import requests, pandas as pd, time, random, os, re, json
import matplotlib.pyplot as plt
import seaborn as sns
from lxml import etree
from fake_useragent import UserAgent
from urllib.parse import quote
from celery import Celery
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
import schedule
import asyncio
import aiohttp

# AI-related keyword configuration
KEY_WORDS = ["人工智能", "AI", "算法", "机器学习", "深度学习"]

# Data storage paths
DATA_DIR = "data"
company_csv = os.path.join(DATA_DIR, "company_list.csv")
output_csv = os.path.join(DATA_DIR, "ai_job_ratio_boss.csv")

# Distributed task queue configuration
# (a result backend is added here so that callers can read the values the tasks return)
app = Celery('boss_crawler',
             broker='redis://localhost:6379',
             backend='redis://localhost:6379/1')
```
Core Scraping Function
```python
def get_boss_count(company, keyword=None):
    """Fetch the number of postings on Boss直聘 for a company,
    optionally narrowed by a keyword."""
    # Randomize the User-Agent on every request to evade basic fingerprinting
    ua = UserAgent()
    headers = {
        "User-Agent": ua.random,
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8",
        "Connection": "keep-alive",
        "Referer": "https://www.zhipin.com/",
        "Cache-Control": "no-cache",
    }
    # Build the search query
    search_term = company if not keyword else f"{company} {keyword}"
    url = f"https://www.zhipin.com/web/geek/search?query={quote(search_term)}"
    try:
        # Random delay to stay under the rate limit
        time.sleep(random.uniform(2, 5))
        response = requests.get(url, headers=headers, timeout=15)
        if response.status_code != 200:
            print(f"Request failed, status code: {response.status_code}")
            return 0
        html = etree.HTML(response.text)
        # Several extraction strategies, tried in order, for robustness.
        # Strategy 1: the dedicated search-result counter element
        count_elements = html.xpath(
            '//div[contains(@class,"search-job-result")]'
            '//span[@class="search-job-result-count"]/text()')
        for text in count_elements:
            match = re.search(r'(\d+)', text)
            if match:
                return int(match.group(1))
        # Strategy 2: regex over all page text for patterns like "123 个职位"
        for text in html.xpath('//text()'):
            match = re.search(r'(\d+)\s*[个]?职位', text)
            if match:
                return int(match.group(1))
        # Strategy 3: fall back to counting the job cards on the page
        job_items = html.xpath('//div[contains(@class,"job-card-wrapper")]')
        return len(job_items) if job_items else 0
    except Exception as e:
        print(f"Error fetching Boss直聘 data: {e}")
        return 0
```

Async Implementation
```python
async def get_boss_count_async(session, company, keyword=None):
    """Async version of the job-count fetcher, for higher concurrency."""
    ua = UserAgent()
    headers = {
        "User-Agent": ua.random,
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8",
        "Connection": "keep-alive",
        "Referer": "https://www.zhipin.com/",
        "Cache-Control": "no-cache",
    }
    search_term = company if not keyword else f"{company} {keyword}"
    url = f"https://www.zhipin.com/web/geek/search?query={quote(search_term)}"
    try:
        await asyncio.sleep(random.uniform(1, 3))  # non-blocking delay
        async with session.get(url, headers=headers, timeout=15) as response:
            if response.status != 200:
                print(f"Async request failed, status code: {response.status}")
                return 0
            text = await response.text()
            html = etree.HTML(text)
            # Same parsing logic as the synchronous version
            for node_text in html.xpath('//text()'):
                match = re.search(r'(\d+)\s*[个]?职位', node_text)
                if match:
                    return int(match.group(1))
            job_items = html.xpath('//div[contains(@class,"job-card-wrapper")]')
            return len(job_items) if job_items else 0
    except Exception as e:
        print(f"Async request to Boss直聘 failed: {e}")
        return 0
```

Distributed Task Implementation
```python
@app.task
def crawl_company_task(company_name):
    """Celery task: crawl a single company's data on a worker."""
    try:
        return get_job_ratio(company_name)
    except Exception as e:
        print(f"Distributed task failed: {e}")
        return {"company": company_name, "total": -1, "ai_jobs": -1, "ai_ratio": -1}
```
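With a worker started via `celery -A boss_crawler worker` (assuming the code above lives in a module named `boss_crawler`) and the result backend configured as shown, dispatching looks roughly like the sketch below; the company list is illustrative:

```python
# Queue one task per company, then block on the results.
companies = ["百度", "阿里巴巴", "腾讯"]  # illustrative sample

pending = [crawl_company_task.delay(name) for name in companies]
results = [task.get(timeout=600) for task in pending]  # .get() needs a result backend
print(results)
```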
Computing the AI Job Ratio and Visualizing the Data
```python
def get_job_ratio(company):
    """Core logic: compute the share of AI-related postings for one company."""
    # Total number of postings for the company
    total = get_boss_count(company)
    print(f"{company} total jobs: {total}")
    # Count AI-related postings keyword by keyword
    ai_jobs = 0
    for kw in KEY_WORDS:
        job_count = get_boss_count(company, kw)
        print(f"{company} + {kw} jobs: {job_count}")
        ai_jobs += job_count
        time.sleep(random.uniform(3, 7))  # keep the request rate low
    # Compute the ratio
    ratio = ai_jobs / total if total > 0 else 0
    return {"company": company, "total": total, "ai_jobs": ai_jobs, "ai_ratio": ratio}


async def get_job_ratio_async(companies):
    """Process several companies concurrently."""
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.create_task(process_company_async(session, company))
                 for company in companies]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return [r for r in results if not isinstance(r, Exception)]


async def process_company_async(session, company):
    """Full per-company pipeline, async version."""
    total = await get_boss_count_async(session, company)
    ai_jobs = 0
    for kw in KEY_WORDS:
        job_count = await get_boss_count_async(session, company, kw)
        ai_jobs += job_count
        await asyncio.sleep(random.uniform(1, 3))
    ratio = ai_jobs / total if total > 0 else 0
    return {"company": company, "total": total, "ai_jobs": ai_jobs, "ai_ratio": ratio}


def create_visualization(df):
    """Render the AI-ratio charts and save them to disk."""
    # Use a font that can display the Chinese labels
    plt.rcParams['font.sans-serif'] = ['SimHei']
    plt.rcParams['axes.unicode_minus'] = False
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
    # Bar chart: top 10 companies by AI job ratio (sort before taking the head)
    top10 = df.sort_values('ai_ratio', ascending=False).head(10)
    sns.barplot(data=top10, x='ai_ratio', y='company', ax=ax1, palette='viridis')
    ax1.set_title('Top 10 公司AI岗位占比', fontsize=14, fontweight='bold')
    ax1.set_xlabel('AI岗位占比')
    # Scatter plot: total jobs vs AI jobs, colored by ratio
    scatter = ax2.scatter(df['total'], df['ai_jobs'], c=df['ai_ratio'],
                          cmap='plasma', s=100, alpha=0.7, edgecolors='black')
    ax2.set_title('总岗位数 vs AI岗位数分布', fontsize=14, fontweight='bold')
    ax2.set_xlabel('总岗位数')
    ax2.set_ylabel('AI岗位数')
    # Color bar keyed to the AI ratio
    plt.colorbar(scatter, ax=ax2, label='AI岗位占比')
    plt.tight_layout()
    plt.savefig('ai_ratio_analysis.png', dpi=300, bbox_inches='tight')
    plt.show()
    return fig


def predict_ai_trend(historical_data):
    """Fit a simple model to predict the AI ratio from historical data."""
    # Feature engineering: these encoded columns must be prepared upstream
    features = ['total', 'company_size_encoded', 'industry_encoded']
    if not all(col in historical_data.columns for col in features):
        print("Required feature columns are missing; skipping the prediction model.")
        return None
    X = historical_data[features]
    y = historical_data['ai_ratio']
    # Train/test split
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42)
    # Model training
    model = RandomForestRegressor(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)
    # score() returns R^2 on the held-out set, not classification accuracy
    print(f"Model R^2 on the test set: {model.score(X_test, y_test):.3f}")
    # Feature importance ranking
    importance = pd.DataFrame({
        'feature': features,
        'importance': model.feature_importances_,
    }).sort_values('importance', ascending=False)
    print("Feature importance ranking:")
    print(importance)
    return model
```
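For a quick end-to-end run of the async path, something like this works (the company names are just a sample):

```python
sample = ["百度", "阿里巴巴", "腾讯"]  # illustrative
batch = asyncio.run(get_job_ratio_async(sample))
print(pd.DataFrame(batch))
```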
🔧 Engineering Techniques and Performance
1. Anti-Scraping Strategies
- Random User-Agent: the fake_useragent library mimics real browsers
- Smart delays: randomized intervals make the traffic look less bot-like
- Request-header spoofing: a complete set of browser-like headers
- Distributed crawling: a Celery task queue spreads the request load
2. Robust Data Extraction
- Multiple parsing strategies: several independent fallbacks for reading the job count
- Fault tolerance: exception handling keeps the run alive
- Retry mechanism: up to 3 retries per company to improve the success rate (see the backoff sketch after this list)
- Async concurrency: aiohttp for efficient concurrent requests
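The retry loop itself lives in `main()` further below; a reusable alternative is a small decorator with randomized backoff, sketched here (the decorator is an addition, not part of the original pipeline):

```python
import functools
import random
import time

def with_retries(max_retries=3, base_delay=5.0):
    """Wrap a flaky function with retries and randomized backoff."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    print(f"Attempt {attempt}/{max_retries} failed: {exc}")
                    if attempt == max_retries:
                        raise
                    # Back off a little longer each time, with jitter
                    time.sleep(base_delay * attempt + random.uniform(0, 3))
        return wrapper
    return decorator

# e.g. robust_count = with_retries()(get_boss_count)
```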
3. Data Persistence and Analysis
- Interim saves: results are flushed automatically every 5 companies
- Timestamped filenames: prevent earlier data from being overwritten
- UTF-8 encoding: Chinese characters are stored and displayed correctly
- Visual analysis: professional charts via matplotlib/seaborn
- ML forecasting: trend prediction from historical data
4. Real-Time Monitoring System
```python
def job_monitor():
    """Scheduled monitoring job."""
    print("Starting the AI job-data monitoring run...")
    # Run the main crawl
    results = main()
    # Produce the visual report
    if results:
        df = pd.DataFrame(results)
        create_visualization(df)
        # Push the report (DingTalk / WeCom integration goes here)
        send_report_notification(df)


def send_report_notification(df):
    """Assemble and send the monitoring report."""
    summary = {
        "total_companies": len(df),
        "avg_ai_ratio": df['ai_ratio'].mean(),
        "top_company": df.loc[df['ai_ratio'].idxmax(), 'company'],
    }
    message = f"""📊 AI岗位监控日报
✅ 成功分析公司: {summary['total_companies']}家
📈 平均AI占比: {summary['avg_ai_ratio']:.2%}
🏆 占比最高: {summary['top_company']}"""
    # Hook a DingTalk / WeCom API call in here
    print(message)


# Schedule setup
schedule.every().day.at("09:00").do(job_monitor)
schedule.every().week.do(lambda: create_visualization(pd.read_csv(output_csv)))


def run_scheduler():
    """Run the scheduler loop."""
    while True:
        schedule.run_pending()
        time.sleep(60)
```
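`send_report_notification` above only prints. As one concrete option, DingTalk's custom-bot webhook accepts a plain JSON POST; this is a sketch assuming you have created such a bot (the access token is a placeholder):

```python
import requests

def send_dingtalk_message(message, webhook_url):
    """POST a text message to a DingTalk group via a custom-bot webhook."""
    payload = {"msgtype": "text", "text": {"content": message}}
    try:
        resp = requests.post(webhook_url, json=payload, timeout=10)
        print(f"DingTalk push status: {resp.status_code}")
    except Exception as e:
        print(f"DingTalk push failed: {e}")

# The webhook URL is issued when you create the bot; the token below is a placeholder.
# send_dingtalk_message("test", "https://oapi.dingtalk.com/robot/send?access_token=<your-token>")
```

Depending on the bot's security settings, a keyword filter or signature may also be required on the DingTalk side.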
📊 Data Analysis Features
Output Data Format
```csv
company,total,ai_jobs,ai_ratio
百度,1250,180,0.144
阿里巴巴,2100,245,0.117
腾讯,1890,201,0.106
```
Statistical Analysis
- Number of companies whose data was fetched successfully
- Average AI job ratio
- Company with the highest AI job ratio
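All three statistics fall straight out of the output CSV; a minimal sketch, assuming the file layout shown above:

```python
import pandas as pd

df = pd.read_csv("data/ai_job_ratio_boss.csv")
ok = df[df["total"] >= 0]  # drop companies whose crawl failed (marked -1)

print(f"Companies successfully analyzed: {len(ok)}")
print(f"Average AI job ratio: {ok['ai_ratio'].mean():.2%}")
print(f"Highest AI ratio: {ok.loc[ok['ai_ratio'].idxmax(), 'company']}")
```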
🚨 Important Notes
Legal Compliance
- ⚠️ For learning and research only
- ⚠️ No commercial use
- ⚠️ Respect the robots.txt protocol (see the check sketch after this list)
- ⚠️ Keep the crawl rate low
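To make the robots.txt item actionable, the standard library's parser can check a URL before it is fetched; a minimal sketch (treat the verdict as advisory, since the file can change at any time):

```python
from urllib.robotparser import RobotFileParser

def is_allowed(url, user_agent="*"):
    """Check a URL against the site's robots.txt before fetching it."""
    rp = RobotFileParser()
    rp.set_url("https://www.zhipin.com/robots.txt")
    rp.read()
    return rp.can_fetch(user_agent, url)

# if not is_allowed("https://www.zhipin.com/web/geek/search?query=AI"):
#     print("Path disallowed by robots.txt; skipping.")
```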
Technical Limitations
- Changes to the site's structure can break data extraction
- Anti-scraping measures may get your IP banned
- Data accuracy depends on the page structure staying stable
🚀 Complete Main Program
```python
def main():
    """Main program: ties all the modules together."""
    MAX_RETRIES = 3
    results = []
    # Load the company list, falling back to a small default set
    try:
        COMPANY_LIST = pd.read_csv(company_csv)["company_name"].tolist()
    except Exception as e:
        print(f"Failed to read the company list: {e}")
        COMPANY_LIST = ["百度", "阿里巴巴", "腾讯", "字节跳动", "华为"]
    # Shuffle so the request pattern is less predictable
    random_companies = COMPANY_LIST.copy()
    random.shuffle(random_companies)
    # Pick an execution mode: async for batches of more than 10 companies
    use_async = len(random_companies) > 10
    if use_async:
        print("Large batch: switching to async mode...")
        results = asyncio.run(get_job_ratio_async(random_companies))
    else:
        print("Small batch: running synchronously...")
        for company in random_companies:
            retries = 0
            success = False
            while retries < MAX_RETRIES and not success:
                try:
                    time.sleep(random.uniform(3, 7))
                    result = get_job_ratio(company)
                    results.append(result)
                    print(f"✅ {company}: total {result['total']}, "
                          f"AI jobs {result['ai_jobs']}, ratio {result['ai_ratio']:.2%}")
                    success = True
                except Exception as e:
                    retries += 1
                    print(f"❌ {company} failed ({retries}/{MAX_RETRIES}): {e}")
                    time.sleep(random.uniform(10, 15))
            if not success:
                results.append({"company": company, "total": -1,
                                "ai_jobs": -1, "ai_ratio": -1})
            # Flush interim results every 5 companies
            if len(results) % 5 == 0:
                interim_df = pd.DataFrame(results)
                interim_csv = os.path.join(DATA_DIR, f"interim_{int(time.time())}.csv")
                interim_df.to_csv(interim_csv, index=False, encoding="utf_8_sig")
    # Save the final results
    df = pd.DataFrame(results)
    df.to_csv(output_csv, index=False, encoding="utf_8_sig")
    # Print the analysis report
    success_df = df[df['total'] >= 0]
    if not success_df.empty:
        print("\n📊 Analysis report:")
        print(f"Analyzed successfully: {len(success_df)}/{len(COMPANY_LIST)} companies")
        print(f"Average AI ratio: {success_df['ai_ratio'].mean():.2%}")
        # Generate the charts
        create_visualization(success_df)
        # Train the trend model once there is enough data
        if len(success_df) >= 10:
            model = predict_ai_trend(success_df)
            if model:
                print("✅ AI trend prediction model trained")
    return results


if __name__ == "__main__":
    # Make sure the data directory exists
    os.makedirs(DATA_DIR, exist_ok=True)
    # Run the main program
    main()
    # Optional: start the scheduled monitor
    # run_scheduler()
```
🔧 Persistence
Redis Caching and Database Storage
```python
# Redis cache implementation
import redis
import json
from datetime import timedelta

redis_client = redis.Redis(host='localhost', port=6379, db=0)


def get_cached_result(company, keyword=None):
    """Read a cached job count from Redis, if one exists."""
    cache_key = f"boss:{company}:{keyword or 'total'}"
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)
    return None


def set_cache_result(company, keyword, result, expire_hours=24):
    """Cache a job count in Redis with an expiry."""
    cache_key = f"boss:{company}:{keyword or 'total'}"
    redis_client.setex(cache_key, timedelta(hours=expire_hours), json.dumps(result))
```
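One way to wire the cache into the fetch path is a thin wrapper that consults Redis before touching the site. The wrapper below is a sketch layered on the helpers above, not part of the original flow:

```python
def get_boss_count_cached(company, keyword=None):
    """Serve the job count from Redis when possible; fetch and cache otherwise."""
    cached = get_cached_result(company, keyword)
    if cached is not None:
        return cached
    count = get_boss_count(company, keyword)
    if count > 0:  # avoid caching failed or empty lookups
        set_cache_result(company, keyword, count, expire_hours=24)
    return count
```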
Database Storage Implementation

```python
import sqlite3


def init_database():
    """Initialize the SQLite database."""
    conn = sqlite3.connect('boss_data.db')
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS job_analysis (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            company TEXT NOT NULL,
            total_jobs INTEGER,
            ai_jobs INTEGER,
            ai_ratio REAL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    conn.commit()
    conn.close()


def save_to_database(results):
    """Persist the crawl results to the database."""
    conn = sqlite3.connect('boss_data.db')
    cursor = conn.cursor()
    for result in results:
        cursor.execute('''
            INSERT INTO job_analysis (company, total_jobs, ai_jobs, ai_ratio)
            VALUES (?, ?, ?, ?)
        ''', (result['company'], result['total'],
              result['ai_jobs'], result['ai_ratio']))
    conn.commit()
    conn.close()
```
NLP-Based Smart Keyword Recognition
```python
import jieba
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans


def extract_ai_keywords(job_descriptions):
    """Mine AI-related keywords from job descriptions with TF-IDF + KMeans."""
    # Tokenize with jieba
    segmented_texts = [' '.join(jieba.cut(desc)) for desc in job_descriptions]
    # TF-IDF vectorization
    vectorizer = TfidfVectorizer(max_features=1000,
                                 stop_words=['的', '和', '在', '有'])
    tfidf_matrix = vectorizer.fit_transform(segmented_texts)
    # Cluster the descriptions
    kmeans = KMeans(n_clusters=5, random_state=42)
    kmeans.fit_predict(tfidf_matrix)
    # Pull the top terms from each cluster center
    feature_names = vectorizer.get_feature_names_out()
    ai_keywords = []
    for cluster_id in range(5):
        cluster_center = kmeans.cluster_centers_[cluster_id]
        top_indices = cluster_center.argsort()[-10:][::-1]
        ai_keywords.extend(feature_names[i] for i in top_indices)
    return list(set(ai_keywords))
```
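A quick smoke test with toy inputs (real input would be the scraped job-description text; note that KMeans needs at least as many documents as clusters):

```python
sample_descriptions = [
    "负责机器学习模型的训练与上线部署",
    "参与深度学习算法的研究与优化",
    "搭建推荐系统,熟悉大规模数据处理",
    "从事自然语言处理相关的NLP项目开发",
    "负责计算机视觉方向的图像识别算法",
]  # illustrative toy inputs
print(extract_ai_keywords(sample_descriptions))
```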
The Boss直聘 AI job scraping solution presented here covers a complete technical implementation along with a number of extensions. With sensible anti-scraping countermeasures, robust data extraction, and thorough error handling, it can reliably collect valuable industry data.
Hopefully this project helps you get a better read on hiring trends in the AI industry, and serves as a useful reference for learning Python web scraping. Remember: technology itself is neutral; what matters is using it legally and responsibly to create value.