python爬虫——气象数据爬取
一、导入库与全局配置
python
运行
import json
import datetime
import time
import requests
from sqlalchemy import create_engine
import csv
import pandas as pd
作用:
- 引入数据解析、网络请求、时间处理、数据库操作等所需库。
requests
:发送 HTTP 请求获取网页数据。sqlalchemy
:连接和操作 MySQL 数据库。pandas
:处理 CSV 文件和数据清洗。
潜在问题:
- 未处理
requests
的超时(可能导致程序卡死)。 - 数据库密码直接写死在代码中(存在安全风险)。
二、核心爬取函数 scraw(code)
python
运行
def scraw(code):url = f'http://www.nmc.cn/rest/weather?stationid={code}&_=1675259309000'response = requests.get(url, headers=headers)try:data = json.loads(response.text)info = data['data']passed = data['data']['passedchart']real = data['data']['real']tempchart = data['data']['tempchart']predict = data['data']['predict']['detail']# 解析24小时天气数据并写入CSVfor i in passed:csv.writer(csv_obj).writerow([names[inx], ...])# 解析实时天气数据并写入CSVcsv.writer(csv_obj2).writerow([names[inx], ...])# 解析7天温度数据并写入CSVfor i in tempchart:csv.writer(csv_obj3).writerow([names[inx], ...])# 解析预报数据并写入CSVfor i in predict:csv.writer(csv_obj4).writerow([names[inx], ...])except:print(f'{code}爬取失败')
功能拆解:
-
URL 构造:
- 拼接城市代码(
stationid
)和时间戳参数(_
),可能用于防止缓存。 - 问题:时间戳硬编码(
1675259309000
),未动态生成,可能导致请求失效。
- 拼接城市代码(
-
数据解析:
- 通过
json.loads()
解析 JSON 响应,提取passedchart
(历史数据)、real
(实时数据)等字段。 - 风险:假设 JSON 结构固定,若网站接口变更会导致解析失败(需添加容错处理)。
- 通过
-
CSV 写入:
- 循环写入不同类型数据到 4 个 CSV 文件(
data24h.csv
、dataday.csv
等)。 - 问题:
names[inx]
依赖全局变量inx
,多线程环境下可能引发线程安全问题。
- 循环写入不同类型数据到 4 个 CSV 文件(
三、降雨量爬取函数 scraw_rain24h()
& scraw_rain1h()
python
运行
def scraw_rain24h():url = f'http://www.nmc.cn/rest/real/rain/hour24/{date}?_={times}'csv_obj5 = open('csv/rain24h.csv', 'w', ...)response = requests.get(url, headers=headers)data = json.loads(response.text)raindata = data['data']['data']for i in raindata:csv.writer(csv_obj5).writerow([i[0]+i[1], i[5]])csv_obj5.close()def scraw_rain1h():# 逻辑与scraw_rain24h()类似,仅URL和CSV文件不同
关键细节:
- URL 参数:
date
由主程序生成(格式为YYYYMMDD08
),times
为当前时间戳(动态生成)。 - 数据结构:降雨量数据通过
i[0]+i[1]
拼接城市名(假设i[0]
为省,i[1]
为市),i[5]
为降雨量。 - 问题:未处理城市名重复或异常数据(如
i[0]
或i[1]
为空)。
四、数据库存储函数 save()
python
运行
def save():DB_STRING = 'mysql+pymysql://root:mysql@127.0.0.1:3306/tianqi'engine = create_engine(DB_STRING)# 读取CSV文件df = pd.read_csv("csv/data24h.csv")df2 = pd.read_csv("csv/dataday.csv")# ... 读取其他CSV文件# 数据清洗df = df.drop('24h降雨量', axis=1)df2 = df2[df2['体感温度'] != 9999]df3 = df3[df3['最高温度'] != 9999]# 写入数据库df.to_sql('24h', con=engine, if_exists='replace', index=False)# ... 写入其他DataFrame
功能说明:
-
数据库连接:
- 使用 SQLAlchemy 创建数据库引擎,连接本地 MySQL 的
tianqi
数据库。 - 风险:密码
mysql
硬编码,需通过环境变量或配置文件管理。
- 使用 SQLAlchemy 创建数据库引擎,连接本地 MySQL 的
-
数据清洗:
- 删除无效列(如
24h降雨量
)和值为9999
的行(假设9999
为错误值)。 - 问题:清洗逻辑分散,未统一处理(如其他 CSV 文件可能也存在无效值)。
- 删除无效列(如
-
数据写入:
- 使用
to_sql
批量写入,if_exists='replace'
会覆盖表数据(可能导致历史数据丢失)。
- 使用
五、主程序逻辑(if __name__ == '__main__'
)
python
运行
if __name__ == '__main__':df = pd.read_csv('csv/citycode.csv')codes = df.code.tolist()names = df.城市.tolist()date = time.strftime('%Y%m%d', time.gmtime()) + '08'times = int(time.time() * 1000)headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; ...)'}# 初始化CSV文件csv_obj = open('csv/data24h.csv', 'w', ...)# ... 初始化其他CSV文件for inx, i in enumerate(codes):scraw(i)print(f"{names[inx]}爬取完毕")# 关闭CSV文件csv_obj.close()# ... 关闭其他CSV文件scraw_rain24h()scraw_rain1h()save()
流程分析:
-
准备阶段:
- 读取城市代码表(
citycode.csv
),获取codes
(城市代码)和names
(城市名)。 - 生成
date
(当前日期 +08
,可能为北京时间时区调整)和times
(毫秒级时间戳)。
- 读取城市代码表(
-
爬取阶段:
- 循环调用
scraw(i)
爬取每个城市的数据,依赖全局变量inx
和names
。 - 问题:未控制爬取频率(可能触发网站反爬机制),建议添加
time.sleep()
。
- 循环调用
-
收尾阶段:
- 关闭 CSV 文件句柄(需确保在异常情况下也能关闭,建议用
with
语句)。 - 爬取降雨量数据并保存到数据库。
- 关闭 CSV 文件句柄(需确保在异常情况下也能关闭,建议用
六、整体问题总结与改进方向
模块 | 问题 | 改进建议 |
---|---|---|
爬取逻辑 | 硬编码时间戳、未处理反爬 | 动态生成时间戳,添加请求头(如Referer )、限制爬取频率 |
异常处理 | 全局except 捕获,无详细日志 | 细化异常类型,使用logging 模块记录错误信息 |
资源管理 | CSV 文件未用with 语句,可能泄漏资源 | 改用with open(...) as f 管理文件 |
数据安全 | 数据库密码硬编码 | 使用环境变量(如os.getenv() )或配置文件 |
代码可维护性 | 全局变量耦合严重,逻辑分散 | 将功能封装为类,分离爬取、解析、存储逻辑 |
扩展性 | 难以为新城市或数据类型扩展 | 设计可配置的爬取规则和字段映射 |
通过分块优化,可显著提升代码的健壮性、可维护性和安全性,同时降低对目标网站的影响。
完整代码:
import json
import datetime
import time
import requests
from sqlalchemy import create_engine
import csv
import pandas as pddef scraw(code):# 发送 HTTP 请求,获取网页内容url = f'http://www.nmc.cn/rest/weather?stationid={code}&_=1675259309000'response = requests.get(url, headers=headers)try:data = json.loads(response.text)info = data['data']# 24小时天气情况passed = data['data']['passedchart']# 一天real = data['data']['real']# 最近七天最高低温度tempchart = data['data']['tempchart']# 预测predict = data['data']['predict']['detail']for i in passed:humidity = i['humidity'] # 相对湿度pressure = i['pressure'] # 空气压力rain1h = i['rain1h'] #rain24h = i['rain24h'] #temperature = i['temperature'] # 温度windDirection = i['windDirection']windSpeed = i['windSpeed']time = i['time']tempDiff = i['tempDiff'] # 体感温度csv.writer(csv_obj).writerow([names[inx],humidity, pressure, rain1h, rain24h, temperature, windDirection, windSpeed, time, tempDiff])csv.writer(csv_obj2).writerow([names[inx],datetime.datetime.now().date(), real['weather']['airpressure'], real['weather']['feelst'],real['weather']['humidity'], real['weather']['info'], real['weather']['rain'], real['weather']['temperature'],real['wind']['direct'], real['wind']['power'], real['wind']['speed']])for i in tempchart:time = i['time']max_temp = i['max_temp']min_temp = i['min_temp']csv.writer(csv_obj3).writerow([names[inx],time, max_temp, min_temp])for i in predict:date = i['date']temperatureday = i['day']['weather']['temperature']temperaturenight = i['night']['weather']['temperature']wind = i['day']['wind']['direct']csv.writer(csv_obj4).writerow([names[inx],date, temperatureday, temperaturenight, wind])except:print(f'{code}爬取失败')def scraw_rain24h():url = f'http://www.nmc.cn/rest/real/rain/hour24/{date}?_={times}'csv_obj5 = open('csv/rain24h.csv', 'w', encoding="utf-8",newline='')response = requests.get(url, headers=headers)data = json.loads(response.text)print(data)raindata = data['data']['data']csv.writer(csv_obj5).writerow(["城市",'降雨量'])for i in raindata:csv.writer(csv_obj5).writerow([i[0] +i[1], i[5]])print('爬取数据完毕')csv_obj5.close()def scraw_rain1h():url = f'http://www.nmc.cn/rest/real/rain/hour1/{date}?_={times}'csv_obj6 = open('csv/rain1h.csv', 'w', encoding="utf-8", newline='')response = requests.get(url, headers=headers)data = json.loads(response.text)raindata = data['data']['data']csv.writer(csv_obj6).writerow(["城市", '降雨量'])for i in raindata:csv.writer(csv_obj6).writerow([i[0] + i[1], i[5]])print('爬取数据完毕')csv_obj6.close()def save():# 存入数据库DB_STRING = 'mysql+pymysql://root:mysql@127.0.0.1:3306/tianqi'engine = create_engine(DB_STRING)df = pd.read_csv("csv/data24h.csv")df2 = pd.read_csv("csv/dataday.csv")df3 = pd.read_csv("csv/tempchart.csv")df4 = pd.read_csv("csv/predict.csv")df5 = pd.read_csv("csv/rain24h.csv")df6 = pd.read_csv("csv/rain1h.csv")#删除不正常值# 删除部分列值等于9999的行df = df.drop('24h降雨量',axis=1)df2 = df2[df2['体感温度'] != 9999]df3 = df3[df3['最高温度'] != 9999]df.to_sql('24h', con=engine, if_exists='replace',index=False)df2.to_sql('day', con=engine, if_exists='replace',index=False)df3.to_sql('tempchart', con=engine, if_exists='replace',index=False)df4.to_sql('predict', con=engine, if_exists='replace',index=False)df5.to_sql('rain24h', con=engine, if_exists='replace',index=False)df6.to_sql('rain1h', con=engine, if_exists='replace',index=False)print('保存数据库完毕')if __name__ == '__main__':df = pd.read_csv('csv/citycode.csv')codes = df.code.tolist()names = df.城市.tolist()#北京# codes = [54511]# names = ['北京']date = time.strftime('%Y%m%d', time.gmtime()) +'08'times = int(time.time() * 1000)# # 设置请求头部信息,避免被识别为爬虫headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'}csv_obj = open('csv/data24h.csv', 'w', encoding="utf-8",newline='')csv_obj2 = open('csv/dataday.csv', 'w', encoding="utf-8", newline='')csv_obj3 = open('csv/tempchart.csv', 'w', encoding="utf-8", newline='')csv_obj4 = open('csv/predict.csv', 'w', encoding="utf-8", newline='')csv.writer(csv_obj).writerow(["城市","相对湿度", "气压", "一小时降雨量","24h降雨量", "温度", "风向", "风速","时间",'体感温度'])csv.writer(csv_obj2).writerow(["城市","日期","气压", '体感温度',"相对湿度","天气情况","一小时降雨量","温度", "风向", "风强度","风速"])csv.writer(csv_obj3).writerow(["城市","日期","最高温度", '最低温度'])csv.writer(csv_obj4).writerow(["城市","日期","白天温度", '夜晚温度',"风向"])for inx,i in enumerate(codes):scraw(i)print(f"{names[inx]}爬取完毕")csv_obj.close()csv_obj2.close()csv_obj3.close()csv_obj4.close()scraw_rain24h()scraw_rain1h()save()
import csv
import json
import requests
# 设置请求头部信息,避免被识别为爬虫headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'}# 发送 HTTP 请求,获取网页内容url = 'http://www.nmc.cn/rest/province/all?_=1678112903659'
response = requests.get(url, headers=headers)
data = json.loads(response.text)
csv_obj = open('allcsv/citycode.csv', 'w', encoding="utf-8", newline='')
csv.writer(csv_obj).writerow(['城市','code'])
for i in data:code = i['code']url = f'http://www.nmc.cn/rest/province/{code}?_=1677854971362'response = requests.get(url, headers=headers)data = json.loads(response.text)for x in data:csv.writer(csv_obj).writerow([x['city'], x['code']])csv_obj.close()