当前位置: 首页 > news >正文

伦敦招聘数据管道系统设计与实现

伦敦招聘数据管道系统设计与实现

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家,觉得好请收藏。点击跳转到网站。

1. 项目概述

本项目旨在构建一个自动化数据管道,通过Adzuna API定期获取伦敦地区的招聘数据(如"数据分析师"和"软件工程师"职位),进行必要的数据清洗和转换,并将处理后的数据存储到Snowflake数据仓库中。系统将按周定期运行,确保数据的时效性和连续性。

2. 系统架构设计

2.1 整体架构

系统采用模块化设计,主要包含以下几个组件:

  1. API数据获取模块:负责与Adzuna API交互,获取原始招聘数据
  2. 数据处理模块:对原始数据进行清洗、转换和规范化
  3. 数据存储模块:将处理后的数据加载到Snowflake数据仓库
  4. 调度模块:控制整个管道的定时执行
  5. 监控与日志模块:记录系统运行状态和异常情况
+-------------------+     +-------------------+     +-------------------+
|   Adzuna API      |     |   Data Processing |     |   Snowflake       |
|   (数据源)        |---->|   (数据处理)      |---->|   (数据仓库)      |
+-------------------+     +-------------------+     +-------------------+^|+-------------------+|   Scheduler       ||   (调度器)        |+-------------------+

2.2 技术栈选择

  • 编程语言:Python 3.8+
  • API请求库:requests
  • 数据处理:pandas
  • 数据库连接:snowflake-connector-python + SQLAlchemy
  • 调度工具:可根据环境选择Airflow/Luigi或简单的cron调度
  • 配置管理:dotenv/python-dotenv

2.3 数据流设计

  1. 从Adzuna API获取原始JSON格式的招聘数据
  2. 将JSON数据转换为pandas DataFrame
  3. 执行数据清洗和转换操作
  4. 将处理后的DataFrame写入Snowflake
  5. 记录执行日志和监控指标

3. 详细实现

3.1 环境准备与配置

首先安装必要的Python库:

pip install requests pandas sqlalchemy snowflake-connector-python python-dotenv

创建配置文件.env

ADZUNA_APP_ID=your_app_id
ADZUNA_APP_KEY=your_app_key
SNOWFLAKE_USER=your_username
SNOWFLAKE_PASSWORD=your_password
SNOWFLAKE_ACCOUNT=your_account
SNOWFLAKE_WAREHOUSE=your_warehouse
SNOWFLAKE_DATABASE=your_database
SNOWFLAKE_SCHEMA=your_schema

3.2 Adzuna API数据获取模块

创建adzuna_api.py

import os
import requests
import pandas as pd
from datetime import datetime, timedelta
from dotenv import load_dotenvload_dotenv()class AdzunaAPI:def __init__(self):self.app_id = os.getenv('ADZUNA_APP_ID')self.app_key = os.getenv('ADZUNA_APP_KEY')self.base_url = "https://api.adzuna.com/v1/api/jobs"self.country = "gb"  # 英国self.location = "london"def _build_url(self, endpoint, params=None):url = f"{self.base_url}/{self.country}/{endpoint}"params = params or {}params.update({'app_id': self.app_id,'app_key': self.app_key})return url, paramsdef search_jobs(self, what, days_ago=7, results_per_page=50, max_pages=10):"""搜索职位数据"""all_jobs = []for page in range(1, max_pages + 1):url, params = self._build_url("search",{'what': what,'where': self.location,'results_per_page': results_per_page,'page': page,'date_posted': f"since-{days_ago}d"})response = requests.get(url, params=params)if response.status_code != 200:print(f"Error fetching page {page}: {response.status_code}")breakdata = response.json()jobs = data.get('results', [])if not jobs:breakall_jobs.extend(jobs)print(f"Fetched page {page} with {len(jobs)} jobs")return pd.DataFrame(all_jobs)def get_job_categories(self):"""获取职位分类信息"""url, params = self._build_url("categories")response = requests.get(url, params=params)if response.status_code == 200:return response.json().get('results', [])return []

3.3 数据处理模块

创建data_processor.py

import pandas as pd
import numpy as np
from datetime import datetimeclass DataProcessor:def __init__(self):self.required_columns = ['id', 'title', 'location', 'company', 'salary_min', 'salary_max', 'category', 'created', 'description']def clean_data(self, df):"""执行数据清洗和转换"""# 选择需要的列df = self._select_columns(df)# 处理日期时间df = self._process_datetime(df)# 处理薪资数据df = self._process_salary(df)# 处理分类数据df = self._process_category(df)# 处理位置数据df = self._process_location(df)# 处理公司数据df = self._process_company(df)# 处理描述数据df = self._process_description(df)return dfdef _select_columns(self, df):"""选择需要的列并重命名"""column_mapping = {'id': 'job_id','title': 'title','location': 'location','company': 'company','salary_min': 'salary_min','salary_max': 'salary_max','category': 'category','created': 'created','description': 'description'}# 确保所有需要的列都存在for col in self.required_columns:if col not in df.columns:df[col] = np.nanreturn df[list(column_mapping.keys())].rename(columns=column_mapping)def _process_datetime(self, df):"""处理日期时间字段"""if 'created' in df.columns:df['created'] = pd.to_datetime(df['created'])df['created_date'] = df['created'].dt.datedf['created_year'] = df['created'].dt.yeardf['created_month'] = df['created'].dt.monthdf['created_week'] = df['created'].dt.isocalendar().weekreturn dfdef _process_salary(self, df):"""处理薪资数据"""# 确保薪资为数值类型for col in ['salary_min', 'salary_max']:if col in df.columns:df[col] = pd.to_numeric(df[col], errors='coerce')# 计算平均薪资df['salary_avg'] = df[['salary_min', 'salary_max']].mean(axis=1)return dfdef _process_category(self, df):"""处理分类数据"""if 'category' in df.columns:# 如果category是字典,提取labelif isinstance(df['category'].iloc[0], dict):df['category'] = df['category'].apply(lambda x: x.get('label', ''))df['category'] = df['category'].str.lower().str.strip()return dfdef _process_location(self, df):"""处理位置数据"""if 'location' in df.columns:# 如果location是字典,提取display_nameif isinstance(df['location'].iloc[0], dict):df['location'] = df['location'].apply(lambda x: x.get('display_name', ''))df['location'] = df['location'].str.lower().str.strip()return dfdef _process_company(self, df):"""处理公司数据"""if 'company' in df.columns:# 如果company是字典,提取display_nameif isinstance(df['company'].iloc[0], dict):df['company'] = df['company'].apply(lambda x: x.get('display_name', ''))df['company'] = df['company'].str.lower().str.strip()return dfdef _process_description(self, df):"""处理描述数据"""if 'description' in df.columns:# 清理HTML标签和多余空格df['description'] = df['description'].str.replace('<[^<]+?>', '', regex=True)df['description'] = df['description'].str.replace('\s+', ' ', regex=True).str.strip()# 计算描述长度df['description_length'] = df['description'].str.len()return df

3.4 Snowflake数据存储模块

创建snowflake_loader.py

import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import text
import pandas as pd
from dotenv import load_dotenvload_dotenv()class SnowflakeLoader:def __init__(self):self.user = os.getenv('SNOWFLAKE_USER')self.password = os.getenv('SNOWFLAKE_PASSWORD')self.account = os.getenv('SNOWFLAKE_ACCOUNT')self.warehouse = os.getenv('SNOWFLAKE_WAREHOUSE')self.database = os.getenv('SNOWFLAKE_DATABASE')self.schema = os.getenv('SNOWFLAKE_SCHEMA')self.engine = self._create_engine()self.session = self._create_session()def _create_engine(self):"""创建SQLAlchemy引擎"""conn_string = (f"snowflake://{self.user}:{self.password}@{self.account}/"f"{self.database}/{self.schema}?warehouse={self.warehouse}")return create_engine(conn_string)def _create_session(self):"""创建数据库会话"""Session = sessionmaker(bind=self.engine)return Session()def create_schema_and_table(self):"""创建schema和表结构"""create_schema_sql = f"""CREATE SCHEMA IF NOT EXISTS {self.schema};"""create_table_sql = f"""CREATE TABLE IF NOT EXISTS {self.schema}.jobs_london (job_id VARCHAR(255) PRIMARY KEY,title VARCHAR(255),location VARCHAR(255),company VARCHAR(255),salary_min FLOAT,salary_max FLOAT,salary_avg FLOAT,category VARCHAR(255),created TIMESTAMP_NTZ,created_date DATE,created_year NUMBER,created_month NUMBER,created_week NUMBER,description TEXT,description_length NUMBER,load_timestamp TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP());"""with self.engine.connect() as conn:conn.execute(text(create_schema_sql))conn.execute(text(create_table_sql))conn.commit()def load_data(self, df):"""将数据加载到Snowflake"""# 添加加载时间戳df['load_timestamp'] = pd.Timestamp.now()# 写入数据df.to_sql('jobs_london',self.engine,schema=self.schema,if_exists='append',index=False,chunksize=1000,method='multi')def close(self):"""关闭连接"""self.session.close()self.engine.dispose()

3.5 主程序与调度

创建main.py

import logging
from datetime import datetime, timedelta
from adzuna_api import AdzunaAPI
from data_processor import DataProcessor
from snowflake_loader import SnowflakeLoader# 配置日志
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',handlers=[logging.FileHandler('job_pipeline.log'),logging.StreamHandler()]
)
logger = logging.getLogger(__name__)def main():# 初始化各组件adzuna = AdzunaAPI()processor = DataProcessor()snowflake = SnowflakeLoader()try:logger.info("Starting job data pipeline")# 创建表结构snowflake.create_schema_and_table()# 定义要搜索的职位关键词job_keywords = ['data analyst', 'software engineer', 'data scientist']# 获取过去7天的数据days_ago = 7for keyword in job_keywords:logger.info(f"Fetching data for: {keyword}")# 从API获取数据raw_df = adzuna.search_jobs(keyword, days_ago=days_ago)if raw_df.empty:logger.warning(f"No data found for keyword: {keyword}")continue# 处理数据clean_df = processor.clean_data(raw_df)# 加载到Snowflakesnowflake.load_data(clean_df)logger.info(f"Successfully loaded {len(clean_df)} records for {keyword}")except Exception as e:logger.error(f"Error in job pipeline: {str(e)}", exc_info=True)raisefinally:snowflake.close()logger.info("Pipeline execution completed")if __name__ == "__main__":main()

3.6 调度设置

可以使用cron(Linux/macOS)或任务计划程序(Windows)设置每周运行:

Linux/macOS crontab示例:

0 0 * * 1 python /path/to/main.py >> /path/to/pipeline.log 2>&1

或者使用Python调度库如schedule:

import schedule
import time
from main import main# 每周一早上运行
schedule.every().monday.at("09:00").do(main)while True:schedule.run_pending()time.sleep(60)

4. Snowflake表结构与数据模型

4.1 Schema设计

在Snowflake中创建专门的schema来存储招聘数据:

CREATE SCHEMA IF NOT EXISTS jobs_data;

4.2 表结构设计

jobs_london表结构设计如下:

CREATE OR REPLACE TABLE jobs_data.jobs_london (job_id VARCHAR(255) PRIMARY KEY,          -- 职位唯一IDtitle VARCHAR(255),                      -- 职位标题location VARCHAR(255),                   -- 工作地点company VARCHAR(255),                    -- 公司名称salary_min FLOAT,                        -- 最低薪资salary_max FLOAT,                        -- 最高薪资salary_avg FLOAT,                        -- 平均薪资(计算字段)category VARCHAR(255),                   -- 职位分类created TIMESTAMP_NTZ,                   -- 职位创建时间created_date DATE,                       -- 职位创建日期(派生字段)created_year NUMBER,                     -- 创建年份(派生字段)created_month NUMBER,                    -- 创建月份(派生字段)created_week NUMBER,                     -- 创建周数(派生字段)description TEXT,                        -- 职位描述description_length NUMBER,               -- 描述长度(字符数)load_timestamp TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()  -- 数据加载时间
);-- 添加索引以提高查询性能
CREATE INDEX IF NOT EXISTS idx_jobs_london_company ON jobs_data.jobs_london(company);
CREATE INDEX IF NOT EXISTS idx_jobs_london_category ON jobs_data.jobs_london(category);
CREATE INDEX IF NOT EXISTS idx_jobs_london_created ON jobs_data.jobs_london(created);
CREATE INDEX IF NOT EXISTS idx_jobs_london_salary_avg ON jobs_data.jobs_london(salary_avg);

4.3 数据样例

从Snowflake表中查询的样例数据:

job_idtitlelocationcompanysalary_minsalary_maxsalary_avgcategorycreateddescription
12345Senior Data Analystlondontech corp600008000070000it jobs2023-05-01 09:00:00We are looking for…
12346Software Engineerlondondata systems700009000080000it jobs2023-05-02 10:30:00Join our team of…

5. 数据质量与监控

5.1 数据质量检查

在数据加载前后实施数据质量检查:

def run_data_quality_checks(df):"""执行数据质量检查"""# 检查缺失值missing_values = df.isnull().sum()logger.info(f"Missing values:\n{missing_values}")# 检查重复的job_idduplicate_ids = df['job_id'].duplicated().sum()if duplicate_ids > 0:logger.warning(f"Found {duplicate_ids} duplicate job IDs")# 检查薪资合理性salary_issues = df[df['salary_min'] > df['salary_max']]if not salary_issues.empty:logger.warning(f"Found {len(salary_issues)} records with min salary > max salary")# 检查日期合理性future_dates = df[df['created'] > pd.Timestamp.now()]if not future_dates.empty:logger.warning(f"Found {len(future_dates)} records with future creation dates")

5.2 监控指标

跟踪关键指标以监控系统健康状态:

  1. API调用指标

    • 成功/失败的API调用次数
    • 每个关键词获取的记录数
    • API响应时间
  2. 数据处理指标

    • 原始记录数
    • 处理后记录数
    • 数据清洗转换的异常数
  3. 数据库指标

    • 成功加载的记录数
    • 加载时间
    • 表大小增长

6. 系统局限性分析

6.1 当前系统的局限性

  1. API限制

    • Adzuna API可能有请求频率限制
    • 免费账户可能只能获取部分数据
    • 历史数据获取可能受限
  2. 数据覆盖范围

    • 仅覆盖Adzuna平台的数据,不包含其他招聘网站
    • 伦敦地区以外的数据未包含
    • 某些行业或小众职位可能覆盖不足
  3. 数据处理

    • 薪资数据可能因货币和薪资周期不同而难以比较
    • 职位描述的非结构化数据利用不足
    • 缺乏高级的自然语言处理能力
  4. 系统架构

    • 简单的调度机制缺乏容错能力
    • 没有实现增量更新机制
    • 缺乏完善的数据版本控制
  5. 存储与分析

    • 未实现数据分区,随着数据量增长可能影响查询性能
    • 缺乏预聚合的指标和物化视图
    • 未设置数据保留策略

6.2 未来优化方向

  1. 数据获取增强

    • 集成多个招聘数据源(如Indeed、LinkedIn等)
    • 实现更智能的增量数据获取机制
    • 添加API调用失败的重试机制
  2. 数据处理改进

    • 实现更精细的薪资标准化(处理不同货币和薪资周期)
    • 添加自然语言处理分析职位描述
    • 实现公司名称标准化(处理同一公司的不同名称变体)
  3. 系统架构升级

    • 采用Airflow等专业调度工具
    • 实现管道各阶段的监控和告警
    • 添加数据版本控制和历史跟踪
  4. 存储与分析优化

    • 在Snowflake中实现数据分区(按时间、职位类别等)
    • 创建物化视图加速常见查询
    • 实现自动化的数据质量报告
  5. 扩展功能

    • 添加数据API层供其他系统消费
    • 开发可视化仪表板展示趋势和分析结果
    • 实现异常检测(如薪资异常波动、招聘数量变化等)

7. 部署与维护

7.1 部署指南

  1. 环境准备

    • Python 3.8+环境
    • Snowflake账户配置
    • Adzuna API密钥申请
  2. 配置步骤

    • 克隆代码仓库
    • 安装依赖包:pip install -r requirements.txt
    • 创建并配置.env文件
    • 测试Snowflake连接
  3. 初始化数据库

    • 运行snowflake_loader.py中的create_schema_and_table方法
    • 验证表结构是否正确创建
  4. 首次运行

    • 执行main.py获取历史数据
    • 验证数据是否成功加载到Snowflake
  5. 设置调度

    • 配置cron作业或使用调度工具
    • 设置适当的运行频率(如每周一早上)

7.2 维护计划

  1. 日常监控

    • 检查日志文件中的错误和警告
    • 监控API调用配额使用情况
    • 跟踪数据加载记录数变化
  2. 定期维护

    • 每月检查Snowflake存储使用情况
    • 每季度审查API密钥和访问权限
    • 定期更新依赖库版本
  3. 数据维护

    • 实施数据保留策略(如保留2年数据)
    • 定期检查和修复数据质量问题
    • 根据需要调整表结构和索引

8. 总结与展望

本系统实现了一个完整的招聘数据管道,从Adzuna API获取伦敦地区的招聘数据,经过清洗和转换后存储到Snowflake数据仓库。系统采用模块化设计,便于维护和扩展,并考虑了基本的数据质量监控。

未来,系统可以从以下几个方面进一步发展:

  1. 多数据源集成:整合更多招聘平台的数据,提供更全面的市场视角
  2. 高级分析功能:添加机器学习模型进行薪资预测、职位分类等
  3. 实时数据处理:从批处理转向近实时处理,提供更及时的市场洞察
  4. 自动化报告:生成定期市场趋势报告,自动发送给相关利益方
  5. 异常检测:识别招聘市场中的异常模式,如突然增加的某类职位或薪资变化

通过持续迭代和优化,该系统可以发展成为功能完善的招聘市场分析平台,为人力资源决策、职业规划等提供数据支持。

附录A:Snowflake账号设置链接

Snowflake账号设置指南

附录B:完整代码仓库

GitHub仓库链接 (示例链接,实际使用时替换为真实仓库)

http://www.dtcms.com/a/306390.html

相关文章:

  • android-PMS-常见定制场景
  • 【文章浏览 I】
  • 【7】串口编程三种模式(查询/中断/DMA)韦东山老师学习笔记(课程听不懂的话试着来看看我的学习笔记吧)
  • luoguP13511 [KOI P13511 [KOI 2025 #1] 等腰直角三角形
  • S3、SFTP、FTP、FTPS 协议的概念、对比与应用场景
  • vulhub ica1靶场攻略
  • AI框架工具FastRTC快速上手2——整体框架及Stream类详解
  • 浏览器pdf、image显示
  • MaxKB+MinerU:通过API实现PDF文档解析并存储至知识库
  • 虚幻基础:旋转体
  • 在java开发中,错误信息类中定义一个errMap,为什么要在static{}中,put键值对?这是为什么?好处是什么?
  • 嵌入式 C 语言入门:分支结构(if/switch)的用法与硬件控制实践
  • [ java IO ] 文件传输中的输入输出(流)
  • 算法能力提升之快速矩阵
  • PSO-TCN-BiLSTM-MATT粒子群优化算法优化时间卷积神经网络-双向长短期记忆神经网络融合多头注意力机制多特征分类预测/故障诊断Matlab实现
  • 电动车充电桩能耗实时监测解决方案
  • 【Java】批量生成Excel放入文件夹并打zip压缩包
  • LangChain 完全入门:5分钟搭建你的第一个AI智能体
  • 河南萌新联赛2025第(三)场:河南理工大学【补题】
  • 氯碱废水除钙镁金属离子
  • 无人机在复杂气流中,IMU 如何精准捕捉姿态变化以维持稳定?
  • WPFC#超市管理系统(3)商品管理
  • 今日行情明日机会——20250730
  • 【LeetCode】链表反转实现与测试
  • ansible巡检脚本
  • 2025年7月28日–7月29日 · AI 今日头条
  • 串口接收数据包(协议带帧头帧尾)的编程实现方法:1、数据包格式定义结构体2、使用队列进行数据接收、校验解包
  • centos7 aarch64上安装PostgreSQL14.3
  • 如何在生成式引擎优化(GEO)中取得成功
  • Java:高频面试知识分享1