当前位置: 首页 > wzjs >正文

网站防篡改 测试 怎么做怎么做网站教程

网站防篡改 测试 怎么做,怎么做网站教程,网站服务内容填网站建设可以,如何建网络营销网站Kafka和Spark-Streaming 一、Kafka 1、Kafka和Flume的整合 ① 需求1:利用flume监控某目录中新生成的文件,将监控到的变更数据发送给kafka,kafka将收到的数据打印到控制台: 在flume/conf下添加.conf文件, vi flume…

Kafka和Spark-Streaming

一、Kafka

1、Kafka和Flume的整合

① 需求1:利用flume监控某目录中新生成的文件,将监控到的变更数据发送给kafka,kafka将收到的数据打印到控制台:

在flume/conf下添加.conf文件,

vi flume-kafka.conf 

# 定义 Agent 组件

a1.sources=r1

a1.sinks=k1

a1.channels=c1

# 配置 Source(监控目录)

a1.sources.r1.type=spooldir

a1.sources.r1.spoolDir=/root/flume-kafka/

a1.sources.r1.inputCharset=utf-8

# 配置 Sink(写入 Kafka)

a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink

#指定写入数据到哪一个topic

a1.sinks.k1.kafka.topic=testTopic

#指定写入数据到哪一个集群

a1.sinks.k1.kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092

#指定写入批次

a1.sinks.k1.kafka.flumeBatchSize=20

#指定acks机制

a1.sinks.k1.kafka.producer.acks=1

# 配置 Channel(内存缓冲)

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

# 最大存储 1000 个 Event

a1.channels.c1.transactionCapacity=100

# 每次事务处理 100 个 Event

 

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

在指定目录之下创建文件夹: 

kafka中创建topic:

kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --topic testTopic --partitions 3 --replication-factor 3

启动flume:

flume-ng agent -c /opt/software/flume/conf/ -f /opt/software/flume/conf/flume-kafka.conf -n a1 -Dflume.root.logger=INFO,console

启动kafka消费者,验证数据写入成功

kafka-console-consumer.sh --topic testTopic --bootstrap-server node01:9092,node02:9029,node03:9092 --from-beginning

 

新增测试数据:

echo "hello flume,hello kafka" >> /root/flume-kafka/1.txt

flume:

Kafka消费者: 

② 需求2:Kafka生产者生成的数据利用Flume进行采集,将采集到的数据打印到Flume的控制台上。

vi kafka-flume.conf

# 定义 Agent 组件

a1.sources=r1

a1.sinks=k1

a1.channels=c1

# 将 Flume Source 设置为 Kafka 消费者,从指定 Kafka 主题拉取数据。

a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource

#指定zookeeper集群地址

a1.sources.r1.zookeepers=node01:2181,node02:2181,node03:2181

#指定kafka集群地址

a1.sources.r1.kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092

#指定生成消息的topic

a1.sources.r1.kafka.topics=testTopic

# 将 Flume 传输的数据内容直接打印到日志中,

a1.sinks.k1.type=logger

# 配置 Channel(内存缓冲)

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transcationCapacity=100

 

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

 

启动Kafka生产者

kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic testTopic

启动Flume

flume-ng agent -c /opt/software/flume/conf/ -f /opt/software/flume/conf/kafka-flume.conf -n a1 -Dflume.root.logger=INFO,console

在生产者中写入数据 

 

Flume中采集到数据

 

2、Kafka和SparkStreaming的整合

① 导包。

<dependency>

            <groupId>org.apache.spark</groupId>

            <artifactId>spark-streaming-kafka_2.11</artifactId>

            <version>1.6.2</version>

        </dependency>

② 代码实现。

def main(args: Array[String]): Unit = {

  val conf = new SparkConf()

  .setNode01("local[*]")

  .setAppName(this.getClass.getSimpleName)

  val ssc= new StreamingContext(conf,Seconds(2))

 

  // kafka的参数配置

  val kafkaParams = Map[String, Object](

    "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",

    "key.deserializer" -> classOf[StringDeserializer],

    "value.deserializer" -> classOf[StringDeserializer],

    "group.id" -> "hello_topic_group",

    "auto.offset.reset" -> "earliest",

    "enable.auto.commit" -> (false: java.lang.Boolean)

  )

 

  val topics = Array("helloTopic3")

//指定泛型的约定[String, String] key value

  val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](

    ssc,

    PreferConsistent,

    Subscribe[String, String](topics, kafkaParams)

  )

 

  stream.foreachRDD(rdd=>{

    rdd.foreach(println)

  })

  ssc.start()

  ssc.awaitTermination()

}

③ 利用Redis维护偏移量。使用Spark消费Kafka中的数据。

val config = ConfigFactory.load()

val conf = new SparkConf()

  .setNode01("local[*]")

  .setAppName(this.getClass.getSimpleName)

val ssc = new StreamingContext(conf, Seconds(3))

 

val groupId = "hello_topic_group"

val topic = "helloTopic7"

val topicArr = Array(topic)

 

val kafkaParams = Map[String, Object](

  "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",

  "key.deserializer" -> classOf[StringDeserializer],

  "value.deserializer" -> classOf[StringDeserializer],

  "group.id" -> groupId,

  "auto.offset.reset" -> "earliest",

  // 是否可以自动提交偏移量 自定义

  "enable.auto.commit" -> (false: java.lang.Boolean)

)

 

// 需要设置偏移量的值

val offsets = mutable.HashMap[TopicPartition, Long]()

// 从redis中获取到值

val jedis1 = JedisPoolUtils.getJedis()

val allPO: util.Map[String, String] = jedis1.hgetAll(groupId + "-" + topic)

 

// 导入转换

import scala.collection.JavaConversions._

for(i<- allPO){

  // 主题 和分区 -> offset

  offsets += (new TopicPartition(topic,i._1.toInt) -> i._2.toLong)

}

 

 

val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](

  ssc,

  LocationStrategies.PreferConsistent,

  Subscribe[String, String](topicArr, kafkaParams, offsets)

)

 

stream.foreachRDD(rdd => {

  // rdd ConsumerRecord[String, String]

val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  val result = rdd.map(_.value()).map((_, 1)).reduceByKey(_ + _)

  result.foreachPartition(it => {

    val jedis = JedisPoolUtils.getJedis()

    it.foreach(tp => jedis.hincrBy("streamkfkwc", tp._1, tp._2))

    // 等迭代器中的数据,全部完成之后,再关

    jedis.close()

  })

 

  // 把偏移量的Array 写入到redis中

  val jedis = JedisPoolUtils.getJedis()

  ranges.foreach(t => {

    jedis.hset(groupId + "-" + t.topic, t.partition.toString, t.untilOffset + "")

  })

  jedis.close()

})

ssc.start()

ssc.awaitTermination()

 

二、Spark-Streaming核心编程(三)

DStream转换

DStream 上的操作与 RDD 的类似,分为 Transformations(转换)和 Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种 Window 相关的原语。

 

1、无状态转化操作

无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD。部分无状态转化操作列在了下表中。

注意,针对键值对的 DStream 转化操作(比如reduceByKey())要添加

import StreamingContext._才能在 Scala 中使用。 

需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个 DStream 在内部是由许多 RDD(批次)组成,且无状态转化操作是分别应用到每个 RDD 上的。

例如:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。 

 

1.1、Transform

Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数。即使这些函数并没有在 DStream的 API 中暴露出来,通过该函数可以方便的扩展 Spark API。该函数每一批次调度一次。其实也就是对 DStream 中的 RDD 应用转换。

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("transform")

val ssc = new StreamingContext(sparkConf,Seconds(3))

 

val lineDStream :ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999)

val wordAndCountDStream :DStream[(String,Int)] = lineDStream.transform(rdd => {

  val words :RDD[String] = rdd.flatMap(_.split(" "))

  val wordAndOne :RDD[(String,Int)] = words.map((_,1))

  val value :RDD[(String,Int)] = wordAndOne.reduceByKey(_+_)

  value

})

wordAndCountDStream.print()

 

ssc.start()

ssc.awaitTermination()

 

1.2、join

两个流之间的 join 需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的 RDD 进行 join,与两个 RDD 的 join 效果相同。

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("join")

val ssc = new StreamingContext(sparkConf,Seconds(3))

 

val lineDStream1 :ReceiverInputDStream[String] = ssc.

  socketTextStream("node01",9999)

val lineDStream2 :ReceiverInputDStream[String] = ssc.

  socketTextStream("node02",8888)

 

val wordToOneDStream :DStream[(String,Int)] = lineDStream1

  .flatMap(_.split(" ")).map((_,1))

 

val wordToADstream :DStream[(String,String)] = lineDStream2

  .flatMap(_.split(" ")).map((_,"a"))

 

val joinDStream :DStream[(String,(Int,String))]=wordToOneDStream

  .join(wordToADstream)

 

joinDStream.print()

 

ssc.start()

ssc.awaitTermination()

http://www.dtcms.com/wzjs/252845.html

相关文章:

  • 新开神途手游发布网站网络营销的四个特点
  • anaconda可以做网站吗网络营销策划书的结构是什么
  • 做网站二级域名随便用吗aso安卓优化公司
  • 哪些网站可以做简历seo网络推广经理招聘
  • 网站建设方案是什么意思怎么制作网站?
  • 做网站更赚钱吗建设网站推广
  • 梅州建站联系方式如何创建自己的小程序
  • 邵东网站建设成人大专
  • 淮北公司做网站互联网营销顾问
  • 陕西做网站公司有哪些网络宣传
  • cms代码做网站视频广告联盟平台
  • 网站建设思路方案建立网站的详细步骤
  • wordpress网站如何引流郑州做网络优化的公司
  • 电子商务网站建设的简要任务执行书小红书信息流广告
  • 郑州营销网站公司地址app推广有哪些渠道
  • 企业vi设计需求企业seo网络推广
  • 西安app开发公司排名seo优化推广工程师
  • 杨浦手机网站建设友情链接
  • 做效果图去哪个网站接活关键词挖掘网站
  • 深圳国内设计网站app投放推广
  • h5网站开发框架百度 营销怎么收费
  • 嘉兴网站建设托管网站seo分析工具
  • html+css网站模板免费网络推广软件有哪些
  • 网站开发如何处理兼容性问题在线网站流量查询
  • 网易免费企业邮箱登录入口杭州新站整站seo
  • 淘客网站是怎么做的互联网推广方案
  • 昆山高端网站建设机构江门百度seo公司
  • 东莞微信网站建设怎样免费的域名和网站
  • 建旅游网站多少钱网址查询ip地址
  • 如何自己制作公众号网站优化关键词公司