Python数据分析实战:利用CDC 28年流感数据监测,构建疫情预警系统【数据集可下载】
[ 核心资源 ]
本文将通过一个完整的数据分析流程,展示如何使用Python处理CDC流感监测数据集,构建预警模型。文末提供了免费的CDC流感数据集及完整的Python源代码,帮助读者复现整个分析过程。
【数据集下载链接】 : 文章最后面获取
引言:从公共卫生问题到技术解决方案
在数据分析领域,将复杂的公共卫生数据转化为可执行的决策支持系统,是检验技术能力的重要场景。本文不是简单的数据可视化教程,而是展示如何构建一个多维度、多层次的流感监测分析框架。
我们将基于美国CDC长达28年的波动监测数据,不仅展示数据处理技巧,更重要的是提供如何设计分析逻辑、构建预警指标、实现自动化监测的系统性。
一、数据加载与结构理解
目标:加载疾病预防控制中心监测数据,了解其多文件结构和字段信息,为后续分析打良好基础。
Python
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')# 配置中文显示和图表样式
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
sns.set_style("whitegrid")# 加载门诊监测数据(ILI Network)
ili_data = pd.read_csv('ILI_National_Summary.csv')
# 加载实验室检测数据(Laboratory Data)
lab_data = pd.read_csv('WHO_NREVSS_Clinical_Labs.csv')# 数据结构探索
print("门诊监测数据维度:", ili_data.shape)
print("实验室检测数据维度:", lab_data.shape)
print("\n门诊监测数据前5行:")
print(ili_data.head())
二、数据浪费与序列构建时间
目标:将原始数据转化为可分析的时间序列格式,处理恢复值,创建统一的时间索引。
Python
# 创建标准化的日期列
def create_date_column(df, year_col='YEAR', week_col='WEEK'):"""将年份和周数转换为标准日期格式"""df['date'] = df.apply(lambda row: datetime.strptime(f"{int(row[year_col])}-W{int(row[week_col])}-1", "%Y-W%W-%w"), axis=1)return df# 应用日期转换
ili_data = create_date_column(ili_data)
lab_data = create_date_column(lab_data)# 数据清洗:处理缺失值和异常值
print(f"缺失值统计:\n{ili_data.isnull().sum()}")# 填充缺失的百分比数据
ili_data['%WEIGHTED ILI'].fillna(ili_data['%WEIGHTED ILI'].rolling(4, min_periods=1).mean(), inplace=True)# 创建时间索引
ili_data.set_index('date', inplace=True)
lab_data.set_index('date', inplace=True)print("数据预处理完成!")
三、核心分析:主动度的多维度评估
3.1 构建综合活跃度指标
目标:融合监测症状和实验室定位数据,构建更可靠的无效活跃度综合指标。
Python
# 合并门诊和实验室数据
merged_data = pd.merge(ili_data[['%WEIGHTED ILI', 'ILITOTAL']], lab_data[['PERCENT POSITIVE', 'TOTAL SPECIMENS']], left_index=True, right_index=True, how='inner'
)# 构建综合流感活跃度指标
def calculate_flu_activity_index(df):"""综合流感活跃度指标 = 症状就诊率 × 实验室阳性率 × 规模系数"""# 标准化各指标df['ili_norm'] = (df['%WEIGHTED ILI'] - df['%WEIGHTED ILI'].min()) / \(df['%WEIGHTED ILI'].max() - df['%WEIGHTED ILI'].min())df['lab_norm'] = (df['PERCENT POSITIVE'] - df['PERCENT POSITIVE'].min()) / \(df['PERCENT POSITIVE'].max() - df['PERCENT POSITIVE'].min())# 计算综合指标(权重可调整)df['flu_activity_index'] = (0.5 * df['ili_norm'] + 0.5 * df['lab_norm']) * 100return dfmerged_data = calculate_flu_activity_index(merged_data)# 可视化流感活跃度时间序列
plt.figure(figsize=(15, 6))
plt.plot(merged_data.index, merged_data['flu_activity_index'], linewidth=2, color='red', alpha=0.7)
plt.fill_between(merged_data.index, merged_data['flu_activity_index'], alpha=0.3, color='red')
plt.title('流感综合活跃度指数 (1997-2025)', fontsize=16)
plt.xlabel('时间', fontsize=12)
plt.ylabel('活跃度指数 (0-100)', fontsize=12)
plt.grid(True, alpha=0.3)
plt.show()
3.2 特殊模式识别与预测
目标:识别波动的规律,建立基准线,实现异常检测。
Python
# 提取季节性特征
merged_data['month'] = merged_data.index.month
merged_data['week_of_year'] = merged_data.index.isocalendar().week# 计算历史同期基准线
seasonal_baseline = merged_data.groupby('week_of_year')['flu_activity_index'].agg(['mean', 'std', lambda x: x.quantile(0.25), # Q1lambda x: x.quantile(0.75) # Q3
]).rename(columns={'<lambda_0>': 'Q1', '<lambda_1>': 'Q3'})# 定义预警阈值
seasonal_baseline['warning_threshold'] = seasonal_baseline['mean'] + 2 * seasonal_baseline['std']
seasonal_baseline['alert_threshold'] = seasonal_baseline['mean'] + 3 * seasonal_baseline['std']# 可视化季节性模式
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 10))# 子图1:季节性基准线
ax1.plot(seasonal_baseline.index, seasonal_baseline['mean'], 'b-', linewidth=2, label='历史平均')
ax1.fill_between(seasonal_baseline.index, seasonal_baseline['Q1'], seasonal_baseline['Q3'], alpha=0.3, color='blue', label='正常区间(Q1-Q3)')
ax1.plot(seasonal_baseline.index, seasonal_baseline['warning_threshold'], 'y--', label='预警线')
ax1.plot(seasonal_baseline.index, seasonal_baseline['alert_threshold'], 'r--', label='警戒线')
ax1.set_xlabel('周数', fontsize=12)
ax1.set_ylabel('流感活跃度', fontsize=12)
ax1.set_title('流感季节性基准线与预警阈值', fontsize=14)
ax1.legend()
ax1.grid(True, alpha=0.3)# 子图2:年度对比热力图
pivot_data = merged_data.pivot_table(values='flu_activity_index', index=merged_data.index.year, columns='week_of_year'
)
sns.heatmap(pivot_data, cmap='RdYlBu_r', center=30, cbar_kws={'label': '活跃度指数'}, ax=ax2)
ax2.set_title('流感活跃度年度热力图 (1997-2025)', fontsize=14)
ax2.set_xlabel('周数', fontsize=12)
ax2.set_ylabel('年份', fontsize=12)plt.tight_layout()
plt.show()
四、高级应用:病毒亚型建立分析与预警系统
4.1 病毒亚型竞争动态分析
目标:追踪不同流行病毒亚型的流行趋势,预测主导毒株变化。
Python
# 分析病毒亚型分布
virus_subtypes = ['A (H1N1)', 'A (H3)', 'A (Subtyping not Performed)', 'B', 'BVic', 'BYam']# 计算各亚型的市场份额
for subtype in virus_subtypes:if subtype in lab_data.columns:lab_data[f'{subtype}_share'] = lab_data[subtype] / lab_data['TOTAL SPECIMENS'] * 100# 创建病毒演化追踪图
fig, axes = plt.subplots(2, 1, figsize=(15, 10))# 堆积面积图:展示病毒亚型演化
subtype_shares = lab_data[[col for col in lab_data.columns if '_share' in col]]
subtype_shares.plot.area(stacked=True, ax=axes[0], alpha=0.7)
axes[0].set_title('流感病毒亚型演化趋势 (1997-2025)', fontsize=14)
axes[0].set_xlabel('时间', fontsize=12)
axes[0].set_ylabel('占比 (%)', fontsize=12)
axes[0].legend(bbox_to_anchor=(1.05, 1), loc='upper left')# 主导毒株识别
lab_data['dominant_subtype'] = subtype_shares.idxmax(axis=1)
dominant_changes = lab_data['dominant_subtype'].ne(lab_data['dominant_subtype'].shift()).cumsum()
lab_data['regime'] = dominant_changes# 统计各时期主导毒株
regime_stats = lab_data.groupby('regime')['dominant_subtype'].agg([('主导毒株', 'first'),('开始时间', lambda x: x.index[0]),('结束时间', lambda x: x.index[-1]),('持续周数', 'count')
]).reset_index(drop=True)print("\n主导毒株变迁历史:")
print(regime_stats.head(10))
4.2 构建自动化预警系统
目标:实现实时监测和自动预警功能,为决策提供支持。
Python
class FluAlertSystem:"""流感监测预警系统"""def __init__(self, baseline_data, current_data):self.baseline = baseline_dataself.current = current_dataself.alert_levels = {'NORMAL': {'threshold': 0, 'color': 'green', 'message': '流感活动水平正常'},'ELEVATED': {'threshold': 50, 'color': 'yellow', 'message': '流感活动略有升高'},'WARNING': {'threshold': 70, 'color': 'orange', 'message': '流感进入活跃期,建议加强防护'},'ALERT': {'threshold': 85, 'color': 'red', 'message': '流感高峰期,强烈建议采取防护措施'}}def calculate_risk_score(self, week):"""计算当前风险评分"""current_index = self.current.loc[self.current.index.isocalendar().week == week, 'flu_activity_index'].iloc[-1]baseline_mean = self.baseline.loc[week, 'mean']baseline_std = self.baseline.loc[week, 'std']# Z-score标准化z_score = (current_index - baseline_mean) / baseline_std if baseline_std > 0 else 0# 转换为0-100风险评分risk_score = min(100, max(0, 50 + z_score * 15))return risk_scoredef get_alert_level(self, risk_score):"""根据风险评分确定预警级别"""for level, config in reversed(list(self.alert_levels.items())):if risk_score >= config['threshold']:return level, configreturn 'NORMAL', self.alert_levels['NORMAL']def generate_report(self, week):"""生成预警报告"""risk_score = self.calculate_risk_score(week)alert_level, alert_config = self.get_alert_level(risk_score)report = {'监测周': week,'风险评分': round(risk_score, 1),'预警级别': alert_level,'预警颜色': alert_config['color'],'建议': alert_config['message'],'趋势': self.analyze_trend(week)}return reportdef analyze_trend(self, week):"""分析近期趋势"""recent_weeks = range(max(1, week-4), week+1)recent_scores = [self.calculate_risk_score(w) for w in recent_weeks if w <= 52]if len(recent_scores) < 2:return '数据不足'trend = np.polyfit(range(len(recent_scores)), recent_scores, 1)[0]if trend > 5:return '快速上升↑'elif trend > 1:return '缓慢上升↗'elif trend < -5:return '快速下降↓'elif trend < -1:return '缓慢下降↘'else:return '基本稳定→'# 实例化预警系统
alert_system = FluAlertSystem(seasonal_baseline, merged_data)# 生成最近10周的预警报告
current_week = datetime.now().isocalendar()[1]
print("\n=== 流感预警系统报告 ===")
for week in range(max(1, current_week-5), min(53, current_week+5)):report = alert_system.generate_report(week)print(f"\n第{report['监测周']}周: {report['预警级别']} "f"(风险:{report['风险评分']}) {report['趋势']}")print(f" └─ {report['建议']}")
五、实用工具:个性化风险评估器
目标:根据用户特征(年龄、地区、健康状况)提供个性化的风险风险评估。
Python
class PersonalizedFluRiskAssessor:"""个性化流感风险评估工具"""def __init__(self, age_data):self.age_data = age_dataself.age_groups = {'0-4': {'risk_multiplier': 2.0, 'group_col': 'AGE 0-4'},'5-24': {'risk_multiplier': 0.8, 'group_col': 'AGE 5-24'},'25-49': {'risk_multiplier': 1.0, 'group_col': 'AGE 25-49'},'50-64': {'risk_multiplier': 1.3, 'group_col': 'AGE 50-64'},'65+': {'risk_multiplier': 2.5, 'group_col': 'AGE 65+'}}def assess_personal_risk(self, age, has_chronic_disease=False, is_pregnant=False, week=None):"""评估个人风险参数:- age: 年龄- has_chronic_disease: 是否有慢性疾病- is_pregnant: 是否怀孕- week: 当前周数"""# 确定年龄组age_group = self._get_age_group(age)base_risk = self.age_groups[age_group]['risk_multiplier']# 调整风险因子if has_chronic_disease:base_risk *= 1.5if is_pregnant:base_risk *= 1.3# 获取当前流感活跃度if week:current_activity = self._get_current_activity(week, age_group)final_risk = base_risk * current_activity / 50 # 标准化到0-10else:final_risk = base_risk# 生成建议recommendations = self._generate_recommendations(final_risk)return {'年龄组': age_group,'基础风险系数': round(base_risk, 2),'综合风险评分': round(final_risk, 1),'风险等级': self._get_risk_level(final_risk),'个性化建议': recommendations}def _get_age_group(self, age):"""确定年龄组"""if age <= 4:return '0-4'elif age <= 24:return '5-24'elif age <= 49:return '25-49'elif age <= 64:return '50-64'else:return '65+'def _get_current_activity(self, week, age_group):"""获取特定年龄组的当前活跃度"""col_name = self.age_groups[age_group]['group_col']if col_name in self.age_data.columns:week_data = self.age_data[self.age_data.index.isocalendar().week == week]if not week_data.empty:return week_data[col_name].iloc[-1]return 50 # 默认中等活跃度def _get_risk_level(self, risk_score):"""确定风险等级"""if risk_score < 2:return '低风险'elif risk_score < 4:return '中低风险'elif risk_score < 6:return '中等风险'elif risk_score < 8:return '中高风险'else:return '高风险'def _generate_recommendations(self, risk_score):"""生成个性化建议"""recommendations = []if risk_score >= 8:recommendations.extend(["强烈建议立即接种流感疫苗","避免前往人群密集场所","外出务必佩戴口罩","增加手部消毒频率"])elif risk_score >= 6:recommendations.extend(["建议接种流感疫苗","在室内人多场所佩戴口罩","保持良好的手部卫生习惯"])elif risk_score >= 4:recommendations.extend(["考虑接种流感疫苗","避免接触有流感症状的人","保持正常的卫生习惯"])else:recommendations.extend(["保持正常的预防措施即可","关注流感流行趋势"])return recommendations# 使用示例
risk_assessor = PersonalizedFluRiskAssessor(ili_data)# 测试不同人群
test_cases = [{'age': 3, 'has_chronic_disease': False, 'is_pregnant': False, 'desc': '健康幼儿'},{'age': 30, 'has_chronic_disease': False, 'is_pregnant': True, 'desc': '孕妇'},{'age': 70, 'has_chronic_disease': True, 'is_pregnant': False, 'desc': '有慢性病的老人'},{'age': 25, 'has_chronic_disease': False, 'is_pregnant': False, 'desc': '健康成年人'}
]print("\n=== 个性化风险评估结果 ===")
for case in test_cases:result = risk_assessor.assess_personal_risk(age=case['age'],has_chronic_disease=case['has_chronic_disease'],is_pregnant=case['is_pregnant'],week=current_week)print(f"\n{case['desc']}({case['age']}岁):")print(f" 风险等级: {result['风险等级']} (评分: {result['综合风险评分']})")print(f" 建议:")for rec in result['个性化建议']:print(f" - {rec}")
六、进阶思考:构建可扩展的监测框架
通过这个完整的分析案例,我们不仅处理了CDC的流感数据,更重要的是构建了一个可扩展、可复用的疫情监测分析框架。该框架可以评估:
-
其他传染病监测:将采用相同的方法论评估COVID-19、手足口病等
-
多地区对比分析:分区州级、城市级的精细化监测
-
实时预警系统:结合流处理技术实现准实时监测
-
机器学习预测:引入LSTM、Prophet等模型提升预测精度
总结
本文展示了如何将公共卫生数据转化为实用的技术解决方案。通过数据共享→指标构建→模式识别→预警系统→个性化评估的完整序列,我们构建了一个多层次的流感监测分析系统。
关键技术要点:
-
时间序列数据的标准化处理
-
多源数据融合的综合指标设计
-
基于历史基准的异常检测算法
-
面向对象的预警系统架构
-
个性化风险评估模型

如果这篇文章对你有帮助,欢迎点赞👍收藏⭐关注🔔
