转为nc
# -*- coding: utf-8 -*-
import os
import xarray as xr
import pandas as pd
import multiprocessing as muldef era5_hourly_wind_compose_array(args):inpath, outpath, variable_name1, variable_name2, ini_num, end_num = argsfor i in range(ini_num, end_num):year = 1981 + iyear_dir = "y" + str(year)year_path = os.path.join(inpath, year_dir)# 读取文件并按名称排序nc_files = sorted(os.listdir(year_path))# 创建输出年份子文件夹output_year_path = os.path.join(outpath, year_dir)os.makedirs(output_year_path, exist_ok=True)for j in range(0, len(nc_files), 2):# 确保成对读取文件file_u = os.path.join(year_path, nc_files[j])file_v = os.path.join(year_path, nc_files[j+1])data1 = xr.open_dataset(file_u)data2 = xr.open_dataset(file_v)time = data1['valid_time']time_pd = time.to_pandas()data_var1 = data1[variable_name1]data_var2 = data2[variable_name2]for k in range(len(data_var1)):# 计算矢量速度data_arr1 = data_var1[k]data_arr2 = data_var2[k]data_arr = (data_arr1.data ** 2 + data_arr2.data ** 2) ** 0.5 # Added .data here# 创建新 Dataset 并命名变量为 'vector'ds = xr.Dataset({'vector': (['latitude', 'longitude'], data_arr)}, coords={'latitude': data_arr1.latitude.data, # Added .data here'longitude': data_arr1.longitude.data, # Added .data here'time': time[k].data # Added .data here})# 构建输出文件名outname = os.path.join(output_year_path, f"wind_{str(time_pd.index[k])[:10]}_{str(time_pd.index[k])[11:13]}.nc")# 保存为 NetCDF 文件ds.to_netcdf(outname)print(f"{outname} has been converted!")if __name__ == "__main__":inpath = r"inpath/"outpath = r"outpath/"variable_name1 = "u10"variable_name2 = "v10"ini_num = 34 end_num = 35 num_processes = 50 #era5_hourly_wind_compose_array((inpath, outpath, variable_name1, variable_name2, ini_num, end_num))num_processes = min(50, os.cpu_count()) total_years = end_num - ini_numyears_per_process = total_years // num_processesargs_list = []for i in range(num_processes):start = ini_num + i * years_per_processend = start + years_per_process if i < num_processes - 1 else end_numargs_list.append((inpath, outpath, variable_name1, variable_name2, start, end))# 使用进程池并行处理with mul.Pool(processes=num_processes) as pool:pool.map(era5_hourly_wind_compose_array, args_list)
转tif
import os
import xarray as xr
import pandas as pd
import multiprocessing as mul
import numpy as np
from rasterio.transform import from_origin
import rasteriodef save_as_geotiff(data_array, lons, lats, filename):"""将数据保存为 GeoTIFF 文件"""# 计算仿射变换参数transform = from_origin(lons.min(), # 左边界经度lats.max(), # 上边界纬度(lons.max() - lons.min()) / len(lons), # 经度分辨率(lats.max() - lats.min()) / len(lats) # 纬度分辨率)# 保存为 GeoTIFFwith rasterio.open(filename,'w',driver='GTiff',height=len(lats),width=len(lons),count=1,dtype=data_array.dtype,crs='EPSG:4326', # WGS84 坐标系transform=transform,) as dst:dst.write(data_array, 1) # 写入数据到第1波段def era5_hourly_wind_compose_array(args):inpath, outpath, variable_name1, variable_name2, start_year, end_year = argsfor year in range(start_year, end_year + 1):year_dir = "y" + str(year)year_path = os.path.join(inpath, year_dir)# 检查年份目录是否存在if not os.path.exists(year_path):print(f"Warning: Directory {year_path} does not exist, skipping...")continue# 读取文件并按名称排序nc_files = sorted([f for f in os.listdir(year_path) if f.endswith('.nc')])# 创建输出年份子文件夹output_year_path = os.path.join(outpath, year_dir)os.makedirs(output_year_path, exist_ok=True)for j in range(0, len(nc_files), 2):# 确保成对读取文件if j + 1 >= len(nc_files):print(f"Warning: Missing pair for file {nc_files[j]} in {year_path}")continuefile_u = os.path.join(year_path, nc_files[j])file_v = os.path.join(year_path, nc_files[j+1])try:data1 = xr.open_dataset(file_u)data2 = xr.open_dataset(file_v)time = data1['valid_time']time_pd = time.to_pandas()data_var1 = data1[variable_name1]data_var2 = data2[variable_name2]for k in range(len(data_var1)):# 计算矢量风速data_arr1 = data_var1[k].valuesdata_arr2 = data_var2[k].valueswind_speed = np.sqrt(data_arr1**2 + data_arr2**2)# 获取经纬度坐标lons = data_var1.longitude.valueslats = data_var1.latitude.values# 构建输出文件名outname = os.path.join(output_year_path, f"wind_{str(time_pd.index[k])[:10]}_{str(time_pd.index[k])[11:13]}.tif")# 保存为 GeoTIFFsave_as_geotiff(wind_speed, lons, lats, outname)print(f"{outname} has been converted!")except Exception as e:print(f"Error processing files {file_u} and {file_v}: {str(e)}")continueif __name__ == "__main__":inpath = r"inpath/"outpath = r"outpath/tif"variable_name1 = "u10"variable_name2 = "v10"start_year = 2015 end_year = 2016 num_processes = min(50, os.cpu_count()) # 使用CPU核心数或50中的较小值# 计算每个进程处理的年数total_years = end_year - start_year + 1years_per_process = max(1, total_years // num_processes)# 创建参数列表args_list = []for i in range(num_processes):current_start = start_year + i * years_per_processcurrent_end = min(current_start + years_per_process - 1, end_year)# 确保最后一个进程处理剩余的所有年份if i == num_processes - 1:current_end = end_yearif current_start > end_year:breakargs_list.append((inpath, outpath, variable_name1, variable_name2, current_start, current_end))# 使用进程池并行处理with mul.Pool(processes=num_processes) as pool:pool.map(era5_hourly_wind_compose_array, args_list)