bigquery查询
注意我这里是多个项目,所以配置有两个项目id,通过业务去判断去查询哪个项目,这里google需要的权限如下:
需要查询bigquery,涉及项目:m-eu-firebase-prod、o-eu-firebase-prod,每个项目
需要创建账号,需要权限如下:BigQuery Data Viewer和BigQuery Job User(SQL试运行使用),认
证文件目录/data/hanlP/reco/*****90.json,根据认证文件放置的位置修改nacos配置
pom依赖
<!-- Google Cloud BigQuery client -->
<dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigquery</artifactId>
    <version>2.24.4</version>
</dependency>
<!-- Service-account JSON credentials support -->
<dependency>
    <groupId>com.google.auth</groupId>
    <artifactId>google-auth-library-oauth2-http</artifactId>
    <version>1.16.0</version>
</dependency>
<!-- Required transitively by the BigQuery client -->
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>32.1.2-jre</version>
</dependency>
yaml
#google 认证
# Google Cloud credentials — one entry per project; keys "primary"/"secondary"
# are looked up by name in GcpMultiProjectConfig.
gcp:
  projects:
    primary:
      project-id: o-eu-firebase-beta
      credentials-file: /data/hanlP/reco/o-eu-firebase-beta-36a1150bb82d.json
    secondary:
      project-id: m-eu-firebase-beta
      credentials-file: /data/hanlP/reco/m-eu-firebase-beta-b322a1b54b90.json
配置加载
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import lombok.Data;
import java.util.Map;

/**
 * Binds the {@code gcp.projects.*} configuration tree: one entry per logical
 * project name (e.g. "primary", "secondary"), each carrying a GCP project id
 * and the path to its service-account credentials JSON file.
 */
@Component
@ConfigurationProperties(prefix = "gcp")
@Data
public class GcpProjectsConfig {

    // Keyed by logical project name as written in the YAML ("primary", "secondary").
    private Map<String, GcpProject> projects;

    /** A single GCP project entry: project id plus credentials file location. */
    @Data
    public static class GcpProject {
        private String projectId;
        private String credentialsFile;
    }
}
import java.io.FileInputStream;
import java.io.IOException;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;@Configuration
public class GcpMultiProjectConfig {@Autowiredprivate GcpProjectsConfig gcpProjectsConfig;@Bean("primaryStorage")BigQuery primaryStorage() throws IOException {GcpProjectsConfig.GcpProject primary = gcpProjectsConfig.getProjects().get("primary");GoogleCredentials credentials = GoogleCredentials.fromStream(new FileInputStream(primary.getCredentialsFile()));return BigQueryOptions.newBuilder().setCredentials(credentials).setProjectId(primary.getProjectId()).build().getService();}@Bean("secondaryStorage")BigQuery secondaryStorage() throws IOException {GcpProjectsConfig.GcpProject secondary = gcpProjectsConfig.getProjects().get("secondary");GoogleCredentials credentials = GoogleCredentials.fromStream(new FileInputStream(secondary.getCredentialsFile()));return BigQueryOptions.newBuilder().setCredentials(credentials).setProjectId(secondary.getProjectId()).build().getService();}
}
查询方法
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;import org.springframework.stereotype.Service;import com.lnxw.yr.service.BigQueryService;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldValue;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobStatistics;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableResult;import lombok.extern.slf4j.Slf4j;/*** BigQuery服务实现*/
@Service
@Slf4j
public class BigQueryServiceImpl implements BigQueryService {@Overridepublic boolean dryRunSql(BigQuery bigQuery, String sql) {try {// 创建查询配置QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(sql).setDryRun(true) // 设置为试运行模式.build();// 创建 JobInfoJobInfo jobInfo = JobInfo.newBuilder(queryConfig).build();// 创建 Job 并获取统计信息Job job = bigQuery.create(jobInfo);JobStatistics.QueryStatistics stats = job.getStatistics();log.info("bigquery试运行结果:" + stats);return true;} catch (Exception e) {log.error("bigquery试运行异常, sql="+sql+"==error:"+e);return false;}}@Overridepublic List<Map<String, Object>> executeQuery(BigQuery bigQuery, String sql) {try {// 创建查询配置QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(sql).setUseLegacySql(false) // 使用标准SQL.build();// 执行查询TableResult result = bigQuery.query(queryConfig);// 处理结果return processTableResult(result);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RuntimeException("查询执行被中断", e);} catch (Exception e) {throw new RuntimeException("执行查询失败: " + e.getMessage(), e);}}@Overridepublic void executeQueryAsStream(BigQuery bigQuery, String sql, int pageSize, Consumer<List<Map<String, Object>>> resultConsumer) {try {// 创建查询配置QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(sql).setUseLegacySql(false) // 使用标准SQL.build();// 创建查询作业JobId jobId = JobId.of(UUID.randomUUID().toString());Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());// 等待作业完成queryJob = queryJob.waitFor();// 检查作业是否成功if (queryJob == null) {throw new RuntimeException("查询作业未完成");} else if (queryJob.getStatus().getError() != null) {throw new RuntimeException("查询作业失败: " + queryJob.getStatus().getError().toString());}// 分页获取结果TableResult result = queryJob.getQueryResults();Schema schema = result.getSchema();int rowsProcessed = 0;// 处理结果List<Map<String, Object>> page = new ArrayList<>();for (FieldValueList row : result.getValues()) {page.add(convertRowToMap(row, 
schema));rowsProcessed++;// 当达到页面大小时,发送数据并重置页面if (rowsProcessed >= pageSize) {resultConsumer.accept(page);page = new ArrayList<>();rowsProcessed = 0;}}// 处理剩余的行if (!page.isEmpty()) {resultConsumer.accept(page);}} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RuntimeException("查询执行被中断", e);} catch (Exception e) {throw new RuntimeException("执行查询失败: " + e.getMessage(), e);}}/*** 处理表格结果* @param result 表格结果* @return 处理后的结果列表*/private List<Map<String, Object>> processTableResult(TableResult result) {List<Map<String, Object>> resultList = new ArrayList<>();Schema schema = result.getSchema();for (FieldValueList row : result.iterateAll()) {Map<String, Object> rowMap = convertRowToMap(row, schema);resultList.add(rowMap);}return resultList;}/*** 将行转换为Map* @param row 行数据* @param schema 结果模式* @return 转换后的Map*/private Map<String, Object> convertRowToMap(FieldValueList row, Schema schema) {Map<String, Object> rowMap = new HashMap<>();for (Field field : schema.getFields()) {String fieldName = field.getName();FieldValue value = row.get(fieldName);// 根据字段类型转换值if (value.isNull()) {rowMap.put(fieldName, null);} else {switch (field.getType().getStandardType()) {case BOOL:rowMap.put(fieldName, value.getBooleanValue());break;case INT64:rowMap.put(fieldName, value.getLongValue());break;case FLOAT64:rowMap.put(fieldName, value.getDoubleValue());break;case STRING:rowMap.put(fieldName, value.getStringValue());break;case TIMESTAMP:rowMap.put(fieldName, value.getTimestampValue());break;case DATE:rowMap.put(fieldName, value.getStringValue()); // 日期转为字符串break;default:rowMap.put(fieldName, value.getStringValue()); // 默认转为字符串}}}return rowMap;}
}