当前位置: 首页 > news >正文

bigquery查询

注意我这里是多个项目,所以配置有两个项目id,通过业务去判断去查询哪个项目,这里google需要的权限如下:

需要查询bigquery,涉及项目:m-eu-firebase-prod、o-eeu-firebase-prod,每个项目
需要创建账号,需要权限如下:BigQuery Data Viewer和BigQueryJob User(sqli试运行使用),认
证文件目录/data/hanlP/reco/*****90.json,根据认证文件放置的位置修改nacos配置

pom依赖

        <dependency><groupId>com.google.cloud</groupId><artifactId>google-cloud-bigquery</artifactId><version>2.24.4</version></dependency><dependency><groupId>com.google.auth</groupId><artifactId>google-auth-library-oauth2-http</artifactId><version>1.16.0</version></dependency><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>32.1.2-jre</version></dependency>

yaml

#google 认证
gcp:projects:primary:project-id: o-eu-firebase-betacredentials-file: /data/hanlP/reco/o-eu-firebase-beta-36a1150bb82d.jsonsecondary:project-id: m-eu-firebase-betacredentials-file: /data/hanlP/reco/m-eu-firebase-beta-b322a1b54b90.json

配置加载

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;import lombok.Data;import java.util.Map;@Component
@ConfigurationProperties(prefix = "gcp")
@Data
public class GcpProjectsConfig {private Map<String, GcpProject> projects;@Datapublic static class GcpProject {private String projectId;private String credentialsFile;}
}
import java.io.FileInputStream;
import java.io.IOException;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;@Configuration
public class GcpMultiProjectConfig {@Autowiredprivate GcpProjectsConfig gcpProjectsConfig;@Bean("primaryStorage")BigQuery primaryStorage() throws IOException {GcpProjectsConfig.GcpProject primary = gcpProjectsConfig.getProjects().get("primary");GoogleCredentials credentials = GoogleCredentials.fromStream(new FileInputStream(primary.getCredentialsFile()));return BigQueryOptions.newBuilder().setCredentials(credentials).setProjectId(primary.getProjectId()).build().getService();}@Bean("secondaryStorage")BigQuery secondaryStorage() throws IOException {GcpProjectsConfig.GcpProject secondary = gcpProjectsConfig.getProjects().get("secondary");GoogleCredentials credentials = GoogleCredentials.fromStream(new FileInputStream(secondary.getCredentialsFile()));return BigQueryOptions.newBuilder().setCredentials(credentials).setProjectId(secondary.getProjectId()).build().getService();}
}

查询方法

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;import org.springframework.stereotype.Service;import com.lnxw.yr.service.BigQueryService;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldValue;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobStatistics;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableResult;import lombok.extern.slf4j.Slf4j;/*** BigQuery服务实现*/
@Service
@Slf4j
public class BigQueryServiceImpl implements BigQueryService {@Overridepublic boolean dryRunSql(BigQuery bigQuery, String sql) {try {// 创建查询配置QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(sql).setDryRun(true) // 设置为试运行模式.build();// 创建 JobInfoJobInfo jobInfo = JobInfo.newBuilder(queryConfig).build();// 创建 Job 并获取统计信息Job job = bigQuery.create(jobInfo);JobStatistics.QueryStatistics stats = job.getStatistics();log.info("bigquery试运行结果:" + stats);return true;} catch (Exception e) {log.error("bigquery试运行异常, sql="+sql+"==error:"+e);return false;}}@Overridepublic List<Map<String, Object>> executeQuery(BigQuery bigQuery, String sql) {try {// 创建查询配置QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(sql).setUseLegacySql(false) // 使用标准SQL.build();// 执行查询TableResult result = bigQuery.query(queryConfig);// 处理结果return processTableResult(result);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RuntimeException("查询执行被中断", e);} catch (Exception e) {throw new RuntimeException("执行查询失败: " + e.getMessage(), e);}}@Overridepublic void executeQueryAsStream(BigQuery bigQuery, String sql, int pageSize, Consumer<List<Map<String, Object>>> resultConsumer) {try {// 创建查询配置QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(sql).setUseLegacySql(false) // 使用标准SQL.build();// 创建查询作业JobId jobId = JobId.of(UUID.randomUUID().toString());Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());// 等待作业完成queryJob = queryJob.waitFor();// 检查作业是否成功if (queryJob == null) {throw new RuntimeException("查询作业未完成");} else if (queryJob.getStatus().getError() != null) {throw new RuntimeException("查询作业失败: " + queryJob.getStatus().getError().toString());}// 分页获取结果TableResult result = queryJob.getQueryResults();Schema schema = result.getSchema();int rowsProcessed = 0;// 处理结果List<Map<String, Object>> page = new ArrayList<>();for (FieldValueList row : result.getValues()) {page.add(convertRowToMap(row, schema));rowsProcessed++;// 当达到页面大小时,发送数据并重置页面if (rowsProcessed >= pageSize) {resultConsumer.accept(page);page = new ArrayList<>();rowsProcessed = 0;}}// 处理剩余的行if (!page.isEmpty()) {resultConsumer.accept(page);}} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RuntimeException("查询执行被中断", e);} catch (Exception e) {throw new RuntimeException("执行查询失败: " + e.getMessage(), e);}}/*** 处理表格结果* @param result 表格结果* @return 处理后的结果列表*/private List<Map<String, Object>> processTableResult(TableResult result) {List<Map<String, Object>> resultList = new ArrayList<>();Schema schema = result.getSchema();for (FieldValueList row : result.iterateAll()) {Map<String, Object> rowMap = convertRowToMap(row, schema);resultList.add(rowMap);}return resultList;}/*** 将行转换为Map* @param row 行数据* @param schema 结果模式* @return 转换后的Map*/private Map<String, Object> convertRowToMap(FieldValueList row, Schema schema) {Map<String, Object> rowMap = new HashMap<>();for (Field field : schema.getFields()) {String fieldName = field.getName();FieldValue value = row.get(fieldName);// 根据字段类型转换值if (value.isNull()) {rowMap.put(fieldName, null);} else {switch (field.getType().getStandardType()) {case BOOL:rowMap.put(fieldName, value.getBooleanValue());break;case INT64:rowMap.put(fieldName, value.getLongValue());break;case FLOAT64:rowMap.put(fieldName, value.getDoubleValue());break;case STRING:rowMap.put(fieldName, value.getStringValue());break;case TIMESTAMP:rowMap.put(fieldName, value.getTimestampValue());break;case DATE:rowMap.put(fieldName, value.getStringValue()); // 日期转为字符串break;default:rowMap.put(fieldName, value.getStringValue()); // 默认转为字符串}}}return rowMap;}
} 

http://www.dtcms.com/a/428838.html

相关文章:

  • php餐饮美食店网站源码 生成html太原百度关键词推广
  • 国内互联网建站公司排名中铁建设集团有限公司什么级别
  • 蚌埠企业网站建设套餐手机网址进不去怎么设置
  • Android Studio Logcat中 杀死应用
  • 一次由Flowable定时器引发的“401”悬案:深入解析异步线程中的Token传递
  • 龙华哪有做网站设计手加工外包加工网
  • C语言循环与函数详解
  • 昆明做网站那家好建设网站商城
  • seo的网站建设建站基础:wordpress安装教程图解 - 天缘博客
  • centos 7.2 做网站婚礼请柬电子版免费制作app
  • 宜兴网站建设公司qq推广网站
  • 网站建设 设备推广app软件
  • 【动态规划:子数组/子串系列】单词拆分 环绕字符串中唯⼀的子字符串
  • 做网站服务器要什么系统推广怎么推广
  • qq网站登录北京网站优化推广分析
  • CNN的可视化:特征图与卷积核可视化方法(代码实现)
  • 读写RPLMN等APDU log显示为FF FF FF……问题研究
  • CKAD-CN 考试知识点分享(8) 升级与回滚
  • 网站建设公司该如何选择服务称赞的项目管理平台
  • 哪里做网站比较快wordpress主题 视频
  • 网站建设实训总结范文品牌市场营销策略
  • 网站界面设计软件网站备案去哪注销
  • 网页设计感十足的网站移动开发软件
  • LangChain 中 “附加 OpenAI 函数” 和 “附加 OpenAI 工具”
  • 山东住房和城乡建设厅网站登陆平面设计必学软件
  • 凡客建站官网登录入口Wordpress仿制网站
  • 网站开发技术的发展开发者门户网站是什么意思
  • GIS 相关基础知识
  • 陕西有色建设有限公司官方网站花生壳动态域名做网站
  • 企业网站seo平台不孕不育网站建设总结