当前位置: 首页 > news >正文

Spark和Spring整合处理离线数据

如果你比较熟悉JavaWeb应用开发,那么对Spring框架一定不陌生,并且JavaWeb通常是基于SSM搭起的架构,主要用Java语言开发。但是开发Spark程序,Scala语言往往必不可少。

众所周知,Scala如同Java一样,都是运行在JVM上的,所以它具有很多Java语言的特性,同时作为函数式编程语言,又具有自己独特的特性,实际应用中除了要结合业务场景,还要对Scala语言的特性有深入了解。

如果想像使用Java语言一样,使用Scala来利用Spring框架特性、并结合Spark来处理离线数据,应该怎么做呢?

本篇文章,通过详细的示例代码,介绍上述场景的具体实现,大家如果有类似需求,可以根据实际情况做调整。

1.定义一个程序启动入口

object Bootstrap {private val log = LoggerFactory.getLogger(Bootstrap.getClass)//指定配置文件如log4j的路径val ConfFileName = "conf"val ConfigurePath = new File("").getAbsolutePath.substring(0, if (new File("").getAbsolutePath.lastIndexOf("lib") == -1) 0else new File("").getAbsolutePath.lastIndexOf("lib")) + this.ConfFileName + File.separator//存放实现了StatsTask的离线程序处理的类private val TASK_MAP = Map("WordCount" -> classOf[WordCount])def main(args: Array[String]): Unit = {//传入一些参数,比如要运行的离线处理程序类名、处理哪些时间的数据if (args.length < 1) {log.warn("args 参数异常!!!" + args.toBuffer)System.exit(1)}init(args)}def init(args: Array[String]) {try {SpringUtils.init(Array[String]("applicationContext.xml"))initLog4j()val className = args(0)// 实例化离线处理类val task = SpringUtils.getBean(TASK_MAP(className))args.length match {case 3 =>// 处理一段时间的每天离线数据val dtStart = DateTimeFormat.forPattern("yyyy-MM-dd").parseDateTime(args(1))val dtEnd = DateTimeFormat.forPattern("yyyy-MM-dd").parseDateTime(args(2))val days = Days.daysBetween(dtStart, dtEnd).getDays + 1for (i <- 0 until days) {val etime = dtStart.plusDays(i).toString("yyyy-MM-dd")task.runTask(etime)log.info(s"JOB --> $className 已成功处理: $etime 的数据")}case 2 =>// 处理指定的某天离线数据val etime = DateTimeFormat.forPattern("yyyy-MM-dd").parseDateTime(args(1)).toString("yyyy-MM-dd")task.runTask(etime)log.info(s"JOB --> $className 已成功处理: $etime 的数据")case 1 =>// 处理前一天离线数据val etime = DateTime.now().minusDays(1).toString("yyyy-MM-dd")task.runTask(etime)log.info(s"JOB --> $className 已成功处理: $etime 的数据")case _ => println("执行失败 args参数:" + args.toBuffer)}} catch {case e: Exception =>println("执行失败 args参数:" + args.toBuffer)e.printStackTrace()}// 初始化log4jdef initLog4j() {val fileName = ConfigurePath + "log4j.properties"if (new File(fileName).exists) {PropertyConfigurator.configure(fileName)log.info("日志log4j已经启动")}}}
}

2.加载Spring配置文件工具类

object SpringUtils {private var context: ClassPathXmlApplicationContext = _def getBean(name: String): Any = context.getBean(name)def getBean[T](name: String, classObj: Class[T]): T = context.getBean(name, classObj)def getBean[T](_class: Class[T]): T = context.getBean(_class)def init(springXml: Array[String]): Unit = {if (springXml == null || springXml.isEmpty) {trythrow new Exception("springXml 不可为空")catch {case e: Exception => e.printStackTrace()}}context = new ClassPathXmlApplicationContext(springXml(0))context.start()}}

3.Spring配置文件applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:context="http://www.springframework.org/schema/context"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsdhttp://www.springframework.org/schema/context  http://www.springframework.org/schema/context/spring-context-4.0.xsd"><!-- 配置包扫描 --><context:component-scan base-package="com.bigdata.stats"/></beans>

4.定义一个trait,作为离线程序的公共"父类"

trait StatsTask extends Serializable {//"子类"继承StatsTask重写该方法实现自己的业务处理逻辑 def runTask(etime: String)
}
5.继承StatsTask的离线处理类
//不要忘记添加 @Component ,否则无法利用Spring对WordCount进行实例化
@Component
class WordCount extends StatsTask {override def runTask(etime: String): Unit = {val sparkSession = SparkSession.builder().appName("test").master("local[*]").getOrCreate()import sparkSession.implicits._val words = sparkSession.read.textFile("/Users/BigData/Documents/data/wordcount.txt").flatMap(_.split(" ")).toDF("word")words.createOrReplaceTempView("wordcount")val df = sparkSession.sql("select word, count(*) count from wordcount group by word")df.show()}
}

更多干货抢先看: 世界格局的演变:一场“热闹非凡”的历史大戏

http://www.dtcms.com/a/361653.html

相关文章:

  • 在idea当中git的基础使用
  • Ansible变量与机密管理总结
  • 人工智能学习:什么是NLP自然语言处理
  • 【自记录】Ubuntu20.04下Python自编译
  • 全栈智算系列直播 | 智算中心对网络的需求与应对策略(上)
  • 基于FPGA的多协议视频传输IP方案
  • 【系统架构师设计(8)】需求分析之 SysML系统建模语言:从软件工程到系统工程的跨越
  • 硬件开发_基于Zigee组网的果园养殖监控系统
  • 简单高效的“色差斑块”匀色、水体修补、地物修复技巧
  • 51.【.NET8 实战--孢子记账--从单体到微服务--转向微服务】--新增功能--登录注册扩展
  • 开源项目_CN版金融分析工具TradingAgents
  • Linux权限详解:从基础到实践
  • Selenium 4 文件上传和下载操作指南
  • kubernetes应用的包管理Helm工具
  • MySql blob转string
  • 15693协议ICODE SLI 系列标签应用场景说明及读、写、密钥认证操作Qt c++源码,支持统信、麒麟等国产Linux系统
  • 【Pycharm】Pychram软件工具栏Git和VCS切换
  • 【数据可视化-102】苏州大学招生计划全解析:数据可视化的五大维度
  • 从零开始实现Shell | Linux进程调度实战
  • AI时代SEO关键词实战解析
  • Scala协变、逆变、上界/下界、隐式参数、隐式转换
  • daily notes[7]
  • Windows系统下如何配置和使用jfrog.exe
  • Ansible变量的定义与使用
  • docker 网络配置
  • MJ Prompt Tool-好用的Midjourney提示词工具
  • uniApp 混合开发全指南:原生与跨端的协同方案
  • 机器学习通关秘籍|Day 05:过拟合和欠拟合、正则化、岭回归、拉索回归、逻辑回归、Kmeans聚类
  • ChatGLM-6B全流程部署:环境搭建→模型加载→API调用(附避坑指南)
  • 【项目思维】这是一份嵌入式软件开发的大纲(简化版)