Spark 3 Streaming: Reading from Kafka and Writing to Elasticsearch

1. Code

package data_import

import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{ArrayType, DoubleType, LongType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.functions._
import com.google.gson.{JsonArray, JsonElement, JsonObject, JsonParser}
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.commons.lang3.exception.ExceptionUtils
import java.time.{Instant, LocalDateTime, ZoneId}
import alarm.Alarm
import org.elasticsearch.spark.sql._

/** User feature records. */
object KafkaImport {

  case class Parram(env: String, dst_table: String, kafka_address: String,
                    kafka_topic: String, kafka_group_id: String, trigger_time: String)

  def main(args: Array[String]): Unit = {
    val param: Parram = utils.parseParam[Parram](args)
    println("args:" + param.toString())
    try {
      processTable(param)
      Alarm.alarm(env = param.env, level = Alarm.LevelInfo, content = "UserFeature Success")
    } catch {
      case e: Exception =>
        val msg = s"UserFeature handle failed, error message: ${e.getClass()}:${e.getMessage()}" +
          s"===>${ExceptionUtils.getStackTrace(e)}==>argsMap:${param.toString()}"
        println(msg)
        Alarm.alarm(env = param.env, level = Alarm.LevelWarning, content = msg)
    }
  }

  def processTable(param: Parram): Unit = {
    val conf = new SparkConf().setAppName("appName").setMaster("yarn")
    // Micro-batch interval comes from the trigger_time argument (seconds).
    val ssc = new StreamingContext(conf, Seconds(param.trigger_time.toInt))
    val ss = SparkSession.builder
      .appName("KafkaImport")
      .config("spark.sql.mergeSmallFiles.enabled", "true")
      .config("spark.sql.mergeSmallFiles.threshold.avgSize", "true")
      .config("spark.sql.mergeSmallFiles.maxSizePerTask", "true")
      .config("es.net.http.auth.user", "elastic")
      .config("es.net.http.auth.pass", "YX2021@greendog")
      .getOrCreate()

    // Offsets are committed manually after each batch, so auto-commit is off.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> param.kafka_address,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> param.kafka_group_id,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array(param.kafka_topic)
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    val schema: StructType = StructType(List(
      StructField("key", StringType, true),
      StructField("value", StringType, true)
    ))

    stream.foreachRDD { rdd =>
      // Each Kafka record becomes a (key, value) row; the JSON payload sits in "value".
      val data1 = rdd.map { record => Row(record.key, record.value) }
      val userData = ss.createDataFrame(data1, schema)
        .withColumn("id", get_json_object(col("value"), "$.ctx.session_id").cast(StringType))
        .withColumn("app_id", get_json_object(col("value"), "$.ctx.app_id").cast(StringType))
        .withColumn("user_id", get_json_object(col("value"), "$.ctx.user_id").cast(StringType))
        .withColumn("session_id", get_json_object(col("value"), "$.ctx.session_id").cast(StringType))
        .withColumn("time", get_json_object(col("value"), "$.time").cast(LongType))
        .withColumn("datetime", getDayHourTime(col("time")).cast(TimestampType))
      userData.show()

      println("Trying to connect to ES...")
      val esDF = ss.read.format("org.elasticsearch.spark.sql")
        .option("es.nodes", "192.168.145.43")
        .option("es.port", "9200")
        .option("es.net.http.auth.user", "elastic")
        .option("es.net.http.auth.pass", "YX2021@greendog")
        .load("test_saylo_user_feature_30033")
      println(s"The index already contains ${esDF.count()} documents")

      userData
        .select(col("id"), col("session_id"), col("value"), col("app_id"), col("datetime"))
        .filter(col("id").isNotNull && col("id") =!= "")
        .write
        .format("org.elasticsearch.spark.sql")
        .mode("append")
        .option("es.nodes", "192.168.145.43")
        .option("es.nodes.wan.only", "true")
        .option("es.port", "9200")
        .option("es.mapping.id", "id") // replace with your actual document-ID field name
        .option("es.write.operation", "upsert")
        .option("es.net.http.auth.user", "elastic")
        .option("es.net.http.auth.pass", "YX2021@greendog")
        .save("test_saylo_user_feature_30033")

      // Commit the consumed offsets only after the batch has been written to ES.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    // Block until the streaming job is stopped.
    ssc.awaitTermination()
  }

  private val getDayHourTime = udf((timestamp: Long) => {
    utils.getDayTime(timestamp)
  })
}
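The listing references three project-internal helpers that the post does not include: utils.parseParam, utils.getDayTime, and alarm.Alarm. Purely as a hypothetical sketch of what the utils side could look like (the key=value argument format, the Gson-based binding, and the epoch-milliseconds timestamp format are all assumptions, not something the post specifies; the otherwise-unused java.time imports above do hint at a formatter along these lines):

object utils {
  import com.google.gson.Gson
  import scala.collection.JavaConverters._
  import scala.reflect.ClassTag
  import java.time.{Instant, LocalDateTime, ZoneId}
  import java.time.format.DateTimeFormatter

  /** Hypothetical: bind "key=value" program arguments onto a case class via Gson. */
  def parseParam[T](args: Array[String])(implicit ct: ClassTag[T]): T = {
    val kv = args.map(_.split("=", 2)).collect { case Array(k, v) => k -> v }.toMap
    val gson = new Gson()
    // Round-trip the arg map through JSON so Gson can populate the case class fields.
    gson.fromJson(gson.toJson(kv.asJava), ct.runtimeClass.asInstanceOf[Class[T]])
  }

  /** Hypothetical: epoch milliseconds -> "yyyy-MM-dd HH:mm:ss" in the local zone.
      The UDF returns a string, which Spark then casts to TimestampType. */
  def getDayTime(timestamp: Long): String =
    LocalDateTime
      .ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault())
      .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
}

Under that same key=value assumption, a launch could look like the following (the broker, topic, and group id are placeholders, and the exact jar name depends on the shade finalName setting in the pom below):

    spark-submit --master yarn --deploy-mode cluster \
      --class data_import.KafkaImport \
      target/saylo_rec_data_offline_v2-1.0.0-jar-with-dependencies.jar \
      env=test dst_table=test_saylo_user_feature_30033 \
      kafka_address=broker1:9092 kafka_topic=saylo_user_event \
      kafka_group_id=saylo_es_import trigger_time=30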

2. Dependencies

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.xverse.saylo.rec</groupId>
  <artifactId>saylo_rec_data_offline_v2</artifactId>
  <version>1.0.0</version>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
    <scala.version>2.12.10</scala.version>
    <spark.version>3.2.2</spark.version>
    <jackson.version>2.14.0</jackson.version>
    <shade.jar.name>${project.artifactId}-${project.version}-jar-with-dependencies.jar</shade.jar.name>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-spark-30_2.12</artifactId>
      <version>7.12.0</version>
    </dependency>
    <dependency>
      <groupId>commons-httpclient</groupId>
      <artifactId>commons-httpclient</artifactId>
      <version>3.1</version> <!-- or whatever version you need -->
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.10.1</version> <!-- use the latest version -->
    </dependency>
    <dependency>
      <groupId>com.qcloud</groupId>
      <artifactId>cos_api</artifactId>
      <version>5.6.227</version>
    </dependency>
    <dependency>
      <groupId>com.typesafe.play</groupId>
      <artifactId>play-json_2.12</artifactId>
      <version>2.9.2</version>
    </dependency>
    <!-- Jackson dependencies -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>${jackson.version}</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>${jackson.version}</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>${jackson.version}</version>
    </dependency>
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>3.6.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java-util</artifactId>
      <version>3.6.1</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-netty</artifactId>
      <version>1.64.0</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-protobuf</artifactId>
      <version>1.64.0</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-stub</artifactId>
      <version>1.64.0</version>
    </dependency>
    <dependency>
      <groupId>org.json4s</groupId>
      <artifactId>json4s-core_2.12</artifactId>
      <version>3.6.6</version>
    </dependency>
    <dependency>
      <groupId>cn.hutool</groupId>
      <artifactId>hutool-all</artifactId>
      <version>5.6.3</version>
    </dependency>
    <dependency>
      <groupId>com.tencentcloudapi</groupId>
      <artifactId>tencentcloud-sdk-java-cls</artifactId>
      <version>3.1.1174</version> <!-- use the latest version -->
    </dependency>
    <dependency>
      <groupId>com.tencentcloudapi.cls</groupId>
      <artifactId>tencentcloud-cls-sdk-java</artifactId>
      <version>1.0.15</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.33</version>
    </dependency>
  </dependencies>

  <build>
    <extensions>
      <extension>
        <groupId>kr.motd.maven</groupId>
        <artifactId>os-maven-plugin</artifactId>
        <version>1.6.0</version>
      </extension>
    </extensions>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
          <encoding>utf-8</encoding>
        </configuration>
      </plugin>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.0</version>
        <executions>
          <execution>
            <id>scala-compile-first</id>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.5.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <finalName>${shade.jar.name}</finalName>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <relocations>
                <!-- This relocation produces shaded class names like
                     com.acme.shaded.apachecommons.* (see note 2 below):
                <relocation>
                  <pattern>org.apache.commons</pattern>
                  <shadedPattern>com.acme.shaded.apachecommons</shadedPattern>
                </relocation> -->
                <relocation>
                  <pattern>com.google.protobuf</pattern>
                  <shadedPattern>my.project.shaded.protobuf</shadedPattern>
                </relocation>
              </relocations>
              <createDependencyReducedPom>false</createDependencyReducedPom>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>my.Application</mainClass>
                  <manifestEntries>
                    <Implementation-Version>${project.version}</Implementation-Version>
                    <Main-Class>my.Application</Main-Class>
                  </manifestEntries>
                </transformer>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
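With this pom, a plain `mvn clean package` is enough: the shade plugin is bound to the package phase, so the build emits a single fat jar (named via the shade.jar.name property) under target/, which is the jar handed to spark-submit. Note that the manifest's <mainClass>my.Application</mainClass> is template boilerplate; it is harmless because the real entry point is passed with --class at submit time, but you may want to point it at data_import.KafkaImport.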

3. Notes

  1. Elasticsearch 7.x no longer supports specifying an index type by default
    If your ES version is 7.x or later, the save call must be written as
    .save("test_saylo_user_feature_30033")
    rather than
    .save("test_saylo_user_feature_30033/docs")
    Otherwise the write fails with a mapping-type error such as
    [user_id] cannot be changed from type [keyword] to [text]
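    A quick sanity check before writing is to inspect the index's current mapping (assuming the cluster address and credentials from the code above; GET _mapping is a standard Elasticsearch API):

    curl -u elastic:YX2021@greendog 'http://192.168.145.43:9200/test_saylo_user_feature_30033/_mapping?pretty'

    If user_id already exists there as keyword, any write that would remap it to text is rejected with exactly the error shown above.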

  2. Dependency conflict
    A class cannot be found at runtime:

    Caused by: java.lang.ClassNotFoundException: com.acme.shaded.apachecommons.httpclient.HttpConnectionManager

    The com.acme.shaded.apachecommons prefix is produced by the shade-plugin relocation of org.apache.commons (visible, commented out, in the pom above): the relocation rewrites references to org.apache.commons.httpclient.*, but nothing on the classpath actually supplies those classes. The fix is to add the dependency to the pom manually (see the full pom above):

    <dependency>
      <groupId>commons-httpclient</groupId>
      <artifactId>commons-httpclient</artifactId>
      <version>3.1</version>
    </dependency>
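    When it is unclear which artifact should provide a missing class, Maven's dependency tree is the usual diagnostic (dependency:tree and its -Dincludes filter are stock maven-dependency-plugin features):

    mvn dependency:tree -Dincludes=commons-httpclient

    An empty result confirms the artifact is absent and must be declared explicitly, as above.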