逐时nc数据批量处理为日平均
现有一系列逐时的 nc 数据,每个文件的格式如下:
需要将这些数据批量处理为日平均的 tif 文件,可使用下列脚本:
import os
import xarray as xr
import pandas as pd
import multiprocessing as mul
import numpy as np
from rasterio.transform import from_origin
import rasterio
from datetime import datetime


def grid_geometry(lons, lats):
    """Derive GeoTIFF georeferencing values from grid-cell centre coordinates.

    The coordinates are treated as the centres of a regularly spaced grid.
    N centres span N - 1 intervals, so the cell size is
    ``(max - min) / (N - 1)``, and the raster origin is the outer corner of
    the north-west cell, i.e. half a cell beyond the extreme centres.

    Args:
        lons: 1-D sequence of longitude cell centres (any order).
        lats: 1-D sequence of latitude cell centres (any order).

    Returns:
        Tuple ``(west, north, xres, yres)`` of floats: the west and north
        edges of the raster and the cell size along each axis.

    Raises:
        ValueError: if either axis is empty, or if both axes hold a single
            value so that no cell size can be derived.
    """
    lon_centres = np.asarray(lons, dtype=float)
    lat_centres = np.asarray(lats, dtype=float)
    if lon_centres.size == 0 or lat_centres.size == 0:
        raise ValueError("lons and lats must each contain at least one value")

    def spacing(centres):
        # A single centre carries no spacing information.
        if centres.size < 2:
            return None
        return float(centres.max() - centres.min()) / (centres.size - 1)

    xres = spacing(lon_centres)
    yres = spacing(lat_centres)
    if xres is None and yres is None:
        raise ValueError("cannot derive a cell size from a single grid point")
    # With only one row or column, fall back to square cells.
    if xres is None:
        xres = yres
    if yres is None:
        yres = xres

    west = float(lon_centres.min()) - xres / 2
    north = float(lat_centres.max()) + yres / 2
    return west, north, xres, yres


def save_as_geotiff(data_array, lons, lats, filename):
    """Write a 2-D array to a single-band GeoTIFF in EPSG:4326 (WGS84).

    Args:
        data_array: 2-D numpy array laid out as (len(lats), len(lons)).
        lons: 1-D array of longitude cell centres matching the columns.
        lats: 1-D array of latitude cell centres matching the rows.
        filename: Path of the GeoTIFF file to create.
    """
    west, north, xres, yres = grid_geometry(lons, lats)
    transform = from_origin(west, north, xres, yres)

    # The transform places row 0 at the northern edge and column 0 at the
    # western edge, so reorder the data if the coordinates run the other way.
    flipped = False
    if len(lats) > 1 and lats[0] < lats[-1]:
        data_array = data_array[::-1, :]
        flipped = True
    if len(lons) > 1 and lons[0] > lons[-1]:
        data_array = data_array[:, ::-1]
        flipped = True
    if flipped:
        # Reversed views have negative strides; hand the writer a plain copy.
        data_array = np.ascontiguousarray(data_array)

    with rasterio.open(
        filename,
        'w',
        driver='GTiff',
        height=len(lats),
        width=len(lons),
        count=1,
        dtype=data_array.dtype,
        crs='EPSG:4326',  # WGS84 coordinate system
        transform=transform,
    ) as dst:
        dst.write(data_array, 1)


def process_snow_daily_average(args):
    """Convert monthly hourly snow-depth NetCDF files into daily-mean GeoTIFFs.

    For every year in ``[start_year, end_year]`` and every month, the file
    ``snow_{year}_{month:02d}.nc`` under ``inpath`` is opened, its ``sde``
    variable is averaged over ``valid_time`` per calendar day (NaNs skipped),
    and each daily mean is written to
    ``{outpath}/y{year}/snow_{YYYYMMDD}.tif``.

    Missing files are reported and skipped. Any error while processing a file
    is reported and the loop moves on to the next file.

    Args:
        args: Tuple ``(inpath, outpath, start_year, end_year)``; packed into
            one argument so the function can be used with ``Pool.map``.
    """
    inpath, outpath, start_year, end_year = args

    for year in range(start_year, end_year + 1):
        for month in range(1, 13):
            filename = f"snow_{year}_{month:02d}.nc"
            file_path = os.path.join(inpath, filename)

            if not os.path.exists(file_path):
                print(f"Warning: File {file_path} does not exist, skipping...")
                continue

            try:
                # The context manager closes the dataset even when a step
                # below raises.
                with xr.open_dataset(file_path) as ds:
                    snow_data = ds['sde']

                    # Truncate every timestamp to midnight to get its day.
                    time_values = pd.to_datetime(snow_data.valid_time.values)
                    day_stamps = time_values.normalize()
                    unique_days = day_stamps.unique().sort_values()

                    output_year_path = os.path.join(outpath, f"y{year}")
                    os.makedirs(output_year_path, exist_ok=True)

                    # The grid is the same for every day in the file.
                    lons = snow_data.longitude.values
                    lats = snow_data.latitude.values

                    for day in unique_days:
                        # Select by dimension name so the result does not
                        # depend on valid_time being the first axis.
                        day_index = np.flatnonzero(day_stamps == day)
                        daily_data = snow_data.isel(valid_time=day_index)
                        daily_avg = daily_data.mean(dim='valid_time', skipna=True)

                        outname = os.path.join(
                            output_year_path,
                            f"snow_{day.strftime('%Y%m%d')}.tif",
                        )
                        save_as_geotiff(daily_avg.values, lons, lats, outname)
                        print(f"{outname} has been converted!")
            except Exception as e:
                # Best-effort batch job: report the failure and keep going.
                print(f"Error processing file {file_path}: {str(e)}")
                continue


if __name__ == "__main__":
    inpath = r"data/era5_sd_2025/1981_1989"
    outpath = r"data/era5_sd_2025/daily_1981_1989"
    start_year = 1981
    end_year = 1989

    # One task per year: the pool hands years to whichever worker is free,
    # so no worker is stuck with a larger remainder chunk.
    args_list = [
        (inpath, outpath, year, year)
        for year in range(start_year, end_year + 1)
    ]

    # os.cpu_count() may return None; never start more workers than tasks.
    num_processes = max(1, min(8, os.cpu_count() or 1, len(args_list)))

    with mul.Pool(processes=num_processes) as pool:
        pool.map(process_snow_daily_average, args_list)

    print('--' * 50)
    print('all jobs have finished!!!')