伦敦招聘数据管道系统设计与实现
伦敦招聘数据管道系统设计与实现
前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家,觉得好请收藏。点击跳转到网站。
1. 项目概述
本项目旨在构建一个自动化数据管道,通过Adzuna API定期获取伦敦地区的招聘数据(如"数据分析师"和"软件工程师"职位),进行必要的数据清洗和转换,并将处理后的数据存储到Snowflake数据仓库中。系统将按周定期运行,确保数据的时效性和连续性。
2. 系统架构设计
2.1 整体架构
系统采用模块化设计,主要包含以下几个组件:
- API数据获取模块:负责与Adzuna API交互,获取原始招聘数据
- 数据处理模块:对原始数据进行清洗、转换和规范化
- 数据存储模块:将处理后的数据加载到Snowflake数据仓库
- 调度模块:控制整个管道的定时执行
- 监控与日志模块:记录系统运行状态和异常情况
+-------------------+ +-------------------+ +-------------------+
| Adzuna API | | Data Processing | | Snowflake |
| (数据源) |---->| (数据处理) |---->| (数据仓库) |
+-------------------+ +-------------------+ +-------------------+^|+-------------------+| Scheduler || (调度器) |+-------------------+
2.2 技术栈选择
- 编程语言:Python 3.8+
- API请求库:requests
- 数据处理:pandas
- 数据库连接:snowflake-connector-python + SQLAlchemy
- 调度工具:可根据环境选择Airflow/Luigi或简单的cron调度
- 配置管理:dotenv/python-dotenv
2.3 数据流设计
- 从Adzuna API获取原始JSON格式的招聘数据
- 将JSON数据转换为pandas DataFrame
- 执行数据清洗和转换操作
- 将处理后的DataFrame写入Snowflake
- 记录执行日志和监控指标
3. 详细实现
3.1 环境准备与配置
首先安装必要的Python库:
pip install requests pandas sqlalchemy snowflake-connector-python python-dotenv
创建配置文件.env
:
ADZUNA_APP_ID=your_app_id
ADZUNA_APP_KEY=your_app_key
SNOWFLAKE_USER=your_username
SNOWFLAKE_PASSWORD=your_password
SNOWFLAKE_ACCOUNT=your_account
SNOWFLAKE_WAREHOUSE=your_warehouse
SNOWFLAKE_DATABASE=your_database
SNOWFLAKE_SCHEMA=your_schema
3.2 Adzuna API数据获取模块
创建adzuna_api.py
:
import os
import requests
import pandas as pd
from datetime import datetime, timedelta
from dotenv import load_dotenvload_dotenv()class AdzunaAPI:def __init__(self):self.app_id = os.getenv('ADZUNA_APP_ID')self.app_key = os.getenv('ADZUNA_APP_KEY')self.base_url = "https://api.adzuna.com/v1/api/jobs"self.country = "gb" # 英国self.location = "london"def _build_url(self, endpoint, params=None):url = f"{self.base_url}/{self.country}/{endpoint}"params = params or {}params.update({'app_id': self.app_id,'app_key': self.app_key})return url, paramsdef search_jobs(self, what, days_ago=7, results_per_page=50, max_pages=10):"""搜索职位数据"""all_jobs = []for page in range(1, max_pages + 1):url, params = self._build_url("search",{'what': what,'where': self.location,'results_per_page': results_per_page,'page': page,'date_posted': f"since-{days_ago}d"})response = requests.get(url, params=params)if response.status_code != 200:print(f"Error fetching page {page}: {response.status_code}")breakdata = response.json()jobs = data.get('results', [])if not jobs:breakall_jobs.extend(jobs)print(f"Fetched page {page} with {len(jobs)} jobs")return pd.DataFrame(all_jobs)def get_job_categories(self):"""获取职位分类信息"""url, params = self._build_url("categories")response = requests.get(url, params=params)if response.status_code == 200:return response.json().get('results', [])return []
3.3 数据处理模块
创建data_processor.py
:
import pandas as pd
import numpy as np
from datetime import datetimeclass DataProcessor:def __init__(self):self.required_columns = ['id', 'title', 'location', 'company', 'salary_min', 'salary_max', 'category', 'created', 'description']def clean_data(self, df):"""执行数据清洗和转换"""# 选择需要的列df = self._select_columns(df)# 处理日期时间df = self._process_datetime(df)# 处理薪资数据df = self._process_salary(df)# 处理分类数据df = self._process_category(df)# 处理位置数据df = self._process_location(df)# 处理公司数据df = self._process_company(df)# 处理描述数据df = self._process_description(df)return dfdef _select_columns(self, df):"""选择需要的列并重命名"""column_mapping = {'id': 'job_id','title': 'title','location': 'location','company': 'company','salary_min': 'salary_min','salary_max': 'salary_max','category': 'category','created': 'created','description': 'description'}# 确保所有需要的列都存在for col in self.required_columns:if col not in df.columns:df[col] = np.nanreturn df[list(column_mapping.keys())].rename(columns=column_mapping)def _process_datetime(self, df):"""处理日期时间字段"""if 'created' in df.columns:df['created'] = pd.to_datetime(df['created'])df['created_date'] = df['created'].dt.datedf['created_year'] = df['created'].dt.yeardf['created_month'] = df['created'].dt.monthdf['created_week'] = df['created'].dt.isocalendar().weekreturn dfdef _process_salary(self, df):"""处理薪资数据"""# 确保薪资为数值类型for col in ['salary_min', 'salary_max']:if col in df.columns:df[col] = pd.to_numeric(df[col], errors='coerce')# 计算平均薪资df['salary_avg'] = df[['salary_min', 'salary_max']].mean(axis=1)return dfdef _process_category(self, df):"""处理分类数据"""if 'category' in df.columns:# 如果category是字典,提取labelif isinstance(df['category'].iloc[0], dict):df['category'] = df['category'].apply(lambda x: x.get('label', ''))df['category'] = df['category'].str.lower().str.strip()return dfdef _process_location(self, df):"""处理位置数据"""if 'location' in df.columns:# 如果location是字典,提取display_nameif isinstance(df['location'].iloc[0], dict):df['location'] = df['location'].apply(lambda x: x.get('display_name', ''))df['location'] = df['location'].str.lower().str.strip()return dfdef _process_company(self, df):"""处理公司数据"""if 'company' in df.columns:# 如果company是字典,提取display_nameif isinstance(df['company'].iloc[0], dict):df['company'] = df['company'].apply(lambda x: x.get('display_name', ''))df['company'] = df['company'].str.lower().str.strip()return dfdef _process_description(self, df):"""处理描述数据"""if 'description' in df.columns:# 清理HTML标签和多余空格df['description'] = df['description'].str.replace('<[^<]+?>', '', regex=True)df['description'] = df['description'].str.replace('\s+', ' ', regex=True).str.strip()# 计算描述长度df['description_length'] = df['description'].str.len()return df
3.4 Snowflake数据存储模块
创建snowflake_loader.py
:
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import text
import pandas as pd
from dotenv import load_dotenvload_dotenv()class SnowflakeLoader:def __init__(self):self.user = os.getenv('SNOWFLAKE_USER')self.password = os.getenv('SNOWFLAKE_PASSWORD')self.account = os.getenv('SNOWFLAKE_ACCOUNT')self.warehouse = os.getenv('SNOWFLAKE_WAREHOUSE')self.database = os.getenv('SNOWFLAKE_DATABASE')self.schema = os.getenv('SNOWFLAKE_SCHEMA')self.engine = self._create_engine()self.session = self._create_session()def _create_engine(self):"""创建SQLAlchemy引擎"""conn_string = (f"snowflake://{self.user}:{self.password}@{self.account}/"f"{self.database}/{self.schema}?warehouse={self.warehouse}")return create_engine(conn_string)def _create_session(self):"""创建数据库会话"""Session = sessionmaker(bind=self.engine)return Session()def create_schema_and_table(self):"""创建schema和表结构"""create_schema_sql = f"""CREATE SCHEMA IF NOT EXISTS {self.schema};"""create_table_sql = f"""CREATE TABLE IF NOT EXISTS {self.schema}.jobs_london (job_id VARCHAR(255) PRIMARY KEY,title VARCHAR(255),location VARCHAR(255),company VARCHAR(255),salary_min FLOAT,salary_max FLOAT,salary_avg FLOAT,category VARCHAR(255),created TIMESTAMP_NTZ,created_date DATE,created_year NUMBER,created_month NUMBER,created_week NUMBER,description TEXT,description_length NUMBER,load_timestamp TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP());"""with self.engine.connect() as conn:conn.execute(text(create_schema_sql))conn.execute(text(create_table_sql))conn.commit()def load_data(self, df):"""将数据加载到Snowflake"""# 添加加载时间戳df['load_timestamp'] = pd.Timestamp.now()# 写入数据df.to_sql('jobs_london',self.engine,schema=self.schema,if_exists='append',index=False,chunksize=1000,method='multi')def close(self):"""关闭连接"""self.session.close()self.engine.dispose()
3.5 主程序与调度
创建main.py
:
import logging
from datetime import datetime, timedelta
from adzuna_api import AdzunaAPI
from data_processor import DataProcessor
from snowflake_loader import SnowflakeLoader# 配置日志
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',handlers=[logging.FileHandler('job_pipeline.log'),logging.StreamHandler()]
)
logger = logging.getLogger(__name__)def main():# 初始化各组件adzuna = AdzunaAPI()processor = DataProcessor()snowflake = SnowflakeLoader()try:logger.info("Starting job data pipeline")# 创建表结构snowflake.create_schema_and_table()# 定义要搜索的职位关键词job_keywords = ['data analyst', 'software engineer', 'data scientist']# 获取过去7天的数据days_ago = 7for keyword in job_keywords:logger.info(f"Fetching data for: {keyword}")# 从API获取数据raw_df = adzuna.search_jobs(keyword, days_ago=days_ago)if raw_df.empty:logger.warning(f"No data found for keyword: {keyword}")continue# 处理数据clean_df = processor.clean_data(raw_df)# 加载到Snowflakesnowflake.load_data(clean_df)logger.info(f"Successfully loaded {len(clean_df)} records for {keyword}")except Exception as e:logger.error(f"Error in job pipeline: {str(e)}", exc_info=True)raisefinally:snowflake.close()logger.info("Pipeline execution completed")if __name__ == "__main__":main()
3.6 调度设置
可以使用cron(Linux/macOS)或任务计划程序(Windows)设置每周运行:
Linux/macOS crontab示例:
0 0 * * 1 python /path/to/main.py >> /path/to/pipeline.log 2>&1
或者使用Python调度库如schedule:
import schedule
import time
from main import main# 每周一早上运行
schedule.every().monday.at("09:00").do(main)while True:schedule.run_pending()time.sleep(60)
4. Snowflake表结构与数据模型
4.1 Schema设计
在Snowflake中创建专门的schema来存储招聘数据:
CREATE SCHEMA IF NOT EXISTS jobs_data;
4.2 表结构设计
jobs_london
表结构设计如下:
CREATE OR REPLACE TABLE jobs_data.jobs_london (job_id VARCHAR(255) PRIMARY KEY, -- 职位唯一IDtitle VARCHAR(255), -- 职位标题location VARCHAR(255), -- 工作地点company VARCHAR(255), -- 公司名称salary_min FLOAT, -- 最低薪资salary_max FLOAT, -- 最高薪资salary_avg FLOAT, -- 平均薪资(计算字段)category VARCHAR(255), -- 职位分类created TIMESTAMP_NTZ, -- 职位创建时间created_date DATE, -- 职位创建日期(派生字段)created_year NUMBER, -- 创建年份(派生字段)created_month NUMBER, -- 创建月份(派生字段)created_week NUMBER, -- 创建周数(派生字段)description TEXT, -- 职位描述description_length NUMBER, -- 描述长度(字符数)load_timestamp TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP() -- 数据加载时间
);-- 添加索引以提高查询性能
CREATE INDEX IF NOT EXISTS idx_jobs_london_company ON jobs_data.jobs_london(company);
CREATE INDEX IF NOT EXISTS idx_jobs_london_category ON jobs_data.jobs_london(category);
CREATE INDEX IF NOT EXISTS idx_jobs_london_created ON jobs_data.jobs_london(created);
CREATE INDEX IF NOT EXISTS idx_jobs_london_salary_avg ON jobs_data.jobs_london(salary_avg);
4.3 数据样例
从Snowflake表中查询的样例数据:
job_id | title | location | company | salary_min | salary_max | salary_avg | category | created | description | … |
---|---|---|---|---|---|---|---|---|---|---|
12345 | Senior Data Analyst | london | tech corp | 60000 | 80000 | 70000 | it jobs | 2023-05-01 09:00:00 | We are looking for… | … |
12346 | Software Engineer | london | data systems | 70000 | 90000 | 80000 | it jobs | 2023-05-02 10:30:00 | Join our team of… | … |
5. 数据质量与监控
5.1 数据质量检查
在数据加载前后实施数据质量检查:
def run_data_quality_checks(df):"""执行数据质量检查"""# 检查缺失值missing_values = df.isnull().sum()logger.info(f"Missing values:\n{missing_values}")# 检查重复的job_idduplicate_ids = df['job_id'].duplicated().sum()if duplicate_ids > 0:logger.warning(f"Found {duplicate_ids} duplicate job IDs")# 检查薪资合理性salary_issues = df[df['salary_min'] > df['salary_max']]if not salary_issues.empty:logger.warning(f"Found {len(salary_issues)} records with min salary > max salary")# 检查日期合理性future_dates = df[df['created'] > pd.Timestamp.now()]if not future_dates.empty:logger.warning(f"Found {len(future_dates)} records with future creation dates")
5.2 监控指标
跟踪关键指标以监控系统健康状态:
-
API调用指标:
- 成功/失败的API调用次数
- 每个关键词获取的记录数
- API响应时间
-
数据处理指标:
- 原始记录数
- 处理后记录数
- 数据清洗转换的异常数
-
数据库指标:
- 成功加载的记录数
- 加载时间
- 表大小增长
6. 系统局限性分析
6.1 当前系统的局限性
-
API限制:
- Adzuna API可能有请求频率限制
- 免费账户可能只能获取部分数据
- 历史数据获取可能受限
-
数据覆盖范围:
- 仅覆盖Adzuna平台的数据,不包含其他招聘网站
- 伦敦地区以外的数据未包含
- 某些行业或小众职位可能覆盖不足
-
数据处理:
- 薪资数据可能因货币和薪资周期不同而难以比较
- 职位描述的非结构化数据利用不足
- 缺乏高级的自然语言处理能力
-
系统架构:
- 简单的调度机制缺乏容错能力
- 没有实现增量更新机制
- 缺乏完善的数据版本控制
-
存储与分析:
- 未实现数据分区,随着数据量增长可能影响查询性能
- 缺乏预聚合的指标和物化视图
- 未设置数据保留策略
6.2 未来优化方向
-
数据获取增强:
- 集成多个招聘数据源(如Indeed、LinkedIn等)
- 实现更智能的增量数据获取机制
- 添加API调用失败的重试机制
-
数据处理改进:
- 实现更精细的薪资标准化(处理不同货币和薪资周期)
- 添加自然语言处理分析职位描述
- 实现公司名称标准化(处理同一公司的不同名称变体)
-
系统架构升级:
- 采用Airflow等专业调度工具
- 实现管道各阶段的监控和告警
- 添加数据版本控制和历史跟踪
-
存储与分析优化:
- 在Snowflake中实现数据分区(按时间、职位类别等)
- 创建物化视图加速常见查询
- 实现自动化的数据质量报告
-
扩展功能:
- 添加数据API层供其他系统消费
- 开发可视化仪表板展示趋势和分析结果
- 实现异常检测(如薪资异常波动、招聘数量变化等)
7. 部署与维护
7.1 部署指南
-
环境准备:
- Python 3.8+环境
- Snowflake账户配置
- Adzuna API密钥申请
-
配置步骤:
- 克隆代码仓库
- 安装依赖包:
pip install -r requirements.txt
- 创建并配置
.env
文件 - 测试Snowflake连接
-
初始化数据库:
- 运行
snowflake_loader.py
中的create_schema_and_table
方法 - 验证表结构是否正确创建
- 运行
-
首次运行:
- 执行
main.py
获取历史数据 - 验证数据是否成功加载到Snowflake
- 执行
-
设置调度:
- 配置cron作业或使用调度工具
- 设置适当的运行频率(如每周一早上)
7.2 维护计划
-
日常监控:
- 检查日志文件中的错误和警告
- 监控API调用配额使用情况
- 跟踪数据加载记录数变化
-
定期维护:
- 每月检查Snowflake存储使用情况
- 每季度审查API密钥和访问权限
- 定期更新依赖库版本
-
数据维护:
- 实施数据保留策略(如保留2年数据)
- 定期检查和修复数据质量问题
- 根据需要调整表结构和索引
8. 总结与展望
本系统实现了一个完整的招聘数据管道,从Adzuna API获取伦敦地区的招聘数据,经过清洗和转换后存储到Snowflake数据仓库。系统采用模块化设计,便于维护和扩展,并考虑了基本的数据质量监控。
未来,系统可以从以下几个方面进一步发展:
- 多数据源集成:整合更多招聘平台的数据,提供更全面的市场视角
- 高级分析功能:添加机器学习模型进行薪资预测、职位分类等
- 实时数据处理:从批处理转向近实时处理,提供更及时的市场洞察
- 自动化报告:生成定期市场趋势报告,自动发送给相关利益方
- 异常检测:识别招聘市场中的异常模式,如突然增加的某类职位或薪资变化
通过持续迭代和优化,该系统可以发展成为功能完善的招聘市场分析平台,为人力资源决策、职业规划等提供数据支持。
附录A:Snowflake账号设置链接
Snowflake账号设置指南
附录B:完整代码仓库
GitHub仓库链接 (示例链接,实际使用时替换为真实仓库)