Installing Kafka and Operating It from Java


Table of Contents

1. Install Kafka

1.1 Confirm the JDK is installed

1.2 Download and install Kafka

1.3 Verify Kafka

2. Connect to Kafka

3. Operate Kafka from Java


1. Install Kafka

1.1 Confirm the JDK is installed

java -version
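If the JDK is installed, this prints a version banner similar to the following (the exact vendor and build will vary; Kafka 2.x needs Java 8 or newer):

openjdk version "1.8.0_292"
OpenJDK Runtime Environment (build 1.8.0_292-b10)
OpenJDK 64-Bit Server VM (build 25.292-b10, mixed mode)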

1.2 Download and install Kafka

Kafka 2.x still depends on ZooKeeper, so first download and install ZooKeeper 3.4.14:

wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz

mkdir zk-3.4.14
tar -xvzf zookeeper-3.4.14.tar.gz -C /home/lighthouse/zk-3.4.14

Configuration

Enter the conf folder under the extracted directory.

zoo_sample.cfg is a sample configuration file; copy it and rename the copy to zoo.cfg:

cp zoo_sample.cfg zoo.cfg

Edit the configuration file:

vi zoo.cfg
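The post does not show the resulting file; a minimal standalone configuration, assuming the default client port and a data directory under the install path (both values are assumptions, adjust as needed), looks like this:

# minimal standalone zoo.cfg (assumed values)
tickTime=2000
dataDir=/home/lighthouse/zk-3.4.14/data
clientPort=2181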

Configure the environment variables. Open /etc/profile with vim: vim /etc/profile

Append the following at the end of the file:

export ZOOKEEPER_HOME=/home/lighthouse/zk-3.4.14/zookeeper-3.4.14

export PATH=$PATH:$ZOOKEEPER_HOME/bin

After saving, reload the profile so the changes take effect: source /etc/profile

Start ZooKeeper. Because the environment variable is configured, the start command can be run from any directory; it actually invokes zkServer.sh in ZooKeeper's bin folder: zkServer.sh start

ZooKeeper starts successfully.
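To confirm it is actually up, the status subcommand should report standalone mode for a single-node setup:

zkServer.sh status
# expected to end with a line like: Mode: standalone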

Download Kafka 2.2.1:

wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz

Extract it:

tar -zxvf kafka_2.12-2.2.1.tgz

Start the broker (from inside the extracted kafka_2.12-2.2.1 directory):

nohup bin/kafka-server-start.sh config/server.properties > output.txt &
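A quick way to confirm the broker process is running is jps, which ships with the JDK; startup errors, if any, land in output.txt:

jps
# expected to list a process named Kafka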

The broker is configured through config/server.properties (the original file contents are not reproduced here).
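A minimal single-broker sketch of those settings (assumed values, not the original file; advertised.listeners is set to the public IP because section 3 connects from outside the host):

# assumed minimal server.properties, not the original
broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://43.138.0.199:9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181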

1.3 Verify Kafka

Run:

bin/kafka-topics.sh --version

The original post typed the flag with an en dash (–version) and no version number was printed; with a proper double-hyphen --version, Kafka 2.2.1 should print its version.
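Another way to confirm the broker responds (an extra check, not in the original post) is to ask it for its supported API versions:

bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092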

2. Connect to Kafka and run commands

2.1 Create a topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

2.2 List topics:

bin/kafka-topics.sh --list --zookeeper localhost:2181 

bin/kafka-topics.sh --list --zookeeper 43.138.0.199:2181
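The same tool can also describe a topic's partitions and replicas (not shown in the original, but useful here):

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test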

2.3 Send messages with kafka-console-producer.sh:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

bin/kafka-console-producer.sh --broker-list 43.138.0.199:9092 --topic test

2.4 Consume messages with kafka-console-consumer.sh:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

bin/kafka-console-consumer.sh --bootstrap-server 43.138.0.199:9092 --topic test --from-beginning
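The console consumer joins a consumer group; as an extra check (not in the original) you can list the groups the broker knows about:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list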

3. Operate Kafka from Java

Add the following dependencies to pom.xml. (The client version here, 1.0.1, is older than the 2.2.1 broker; Kafka brokers are backward compatible with older clients, so this combination works.)

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>1.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.0.1</version>
    </dependency>
</dependencies>

Producer.java is as follows:

package com.hmblogs.backend.util;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.Logger;

import java.util.Properties;

/**
 * @ClassName : Producer.java
 * @author    : milo ^ ^
 * @date      : 2018 03 14 11:34
 * @version   : v1.0.x
 */
public class Producer {

    static Logger log = Logger.getLogger(Producer.class);

    private static final String TOPIC = "test";
    private static final String BROKER_LIST = "43.138.0.199:9092";
    private static KafkaProducer<String, String> producer = null;

    /* Initialize the producer */
    static {
        Properties configs = initConfig();
        producer = new KafkaProducer<String, String>(configs);
    }

    /* Initialize the configuration */
    private static Properties initConfig() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return properties;
    }

    public static void main(String[] args) throws InterruptedException {
        // The message record to send
        ProducerRecord<String, String> record = null;
        for (int i = 0; i < 3; i++) {
            record = new ProducerRecord<String, String>(TOPIC, "value" + (int) (10 * Math.random()));
            // Send asynchronously; the callback reports success or failure
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (null != e) {
                        System.out.println("send error" + e.getMessage());
                    } else {
                        System.out.println(String.format("offset:%s,partition:%s",
                                recordMetadata.offset(), recordMetadata.partition()));
                    }
                }
            });
        }
        producer.close();
    }
}

Running it fails with the following errors:

send errorExpiring 3 record(s) for test-0: 30034 ms has passed since batch creation plus linger time
send errorExpiring 3 record(s) for test-0: 30034 ms has passed since batch creation plus linger time
send errorExpiring 3 record(s) for test-0: 30034 ms has passed since batch creation plus linger time
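A debugging aid worth knowing (not from the original post): send() returns a Future<RecordMetadata>, so calling get() on it makes the send synchronous and throws the failure as an ExecutionException with a full stack trace in the calling thread:

// inside the loop, instead of the callback form
// (main must then also declare throws ExecutionException)
RecordMetadata metadata = producer.send(record).get();
System.out.println("offset:" + metadata.offset() + ",partition:" + metadata.partition());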

Searching around suggests this timeout means the producer could never actually reach the partition leader: a common cause is that the broker's advertised.listeners does not advertise an address reachable from the client's machine.

Since the producer could not get through from my local machine, I moved the test onto the Linux host itself.

I changed the code into a ProducerController.

The code is as follows:

package com.hmblogs.backend.controller;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Properties;

@RestController
@Slf4j
public class ProducerController {

    private static final String TOPIC = "test";
    private static final String BROKER_LIST = "43.138.0.199:9092";
    private static KafkaProducer<String, String> producer = null;

    /** sendMessage endpoint */
    @GetMapping(value = "/sendMessage")
    public void redisTestLock() {
        log.info("sendMessage");
        Properties configs = initConfig();
        producer = new KafkaProducer<String, String>(configs);
        send();
    }

    /* Initialize the configuration */
    private Properties initConfig() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return properties;
    }

    private void send() {
        // The message record to send
        ProducerRecord<String, String> record = null;
        for (int i = 0; i < 3; i++) {
            record = new ProducerRecord<String, String>(TOPIC, "value:" + (int) (10 * Math.random()));
            // Send asynchronously; the callback logs success or failure
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (null != e) {
                        log.error("send error:" + e.getMessage());
                    } else {
                        log.info(String.format("offset:%s,partition:%s",
                                recordMetadata.offset(), recordMetadata.partition()));
                    }
                }
            });
        }
        producer.close();
    }
}

Run mvn clean install to package the jar, upload it to the cloud host, and start it:

java -jar hmblogs.jar

Call the GET endpoint:

http://43.138.0.199:8081/sendMessage
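For example, from a shell (equivalent to opening the URL in a browser):

curl http://43.138.0.199:8081/sendMessage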

Check the hmblogs service log; the send callbacks are logged there.

Then check the topic's messages again with the console consumer.

The consumption check is likewise implemented as a REST endpoint.

ConsumerController is as follows:

package com.hmblogs.backend.controller;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Collections;
import java.util.Properties;

@RestController
@Slf4j
public class ConsumerController {

    private static final String TOPIC = "test";
    private static final String BROKER_LIST = "43.138.0.199:9092";
    private static KafkaConsumer<String, String> consumer = null;

    /** consumeMessage endpoint */
    @GetMapping(value = "/consumeMessage")
    public void consumeKafkaMessage() {
        log.info("consumeMessage");
        Properties configs = initConfig();
        consumer = new KafkaConsumer<String, String>(configs);
        consumer.subscribe(Collections.singletonList(TOPIC));
        poll();
    }

    /* Initialize the configuration */
    private static Properties initConfig() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", BROKER_LIST);
        properties.put("group.id", "0");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("enable.auto.commit", "true");
        properties.setProperty("auto.offset.reset", "earliest");
        return properties;
    }

    /* Poll forever and log every record received (renamed from the original's misleading send()) */
    private void poll() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10);
            for (ConsumerRecord<String, String> record : records) {
                log.info("Received message: key={}, value={}, partition={}, offset={}\n",
                        record.key(), record.value(), record.partition(), record.offset());
            }
        }
    }
}
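One caveat with this controller: the while (true) loop never returns, so the HTTP request blocks forever and the consumer is never closed. A bounded variant (a sketch, not from the original; the pollFor name and the 500 ms poll interval are illustrative) would replace poll() with:

/* Poll until the time budget is spent, then close the consumer and return. */
private void pollFor(long millis) {
    long deadline = System.currentTimeMillis() + millis;
    while (System.currentTimeMillis() < deadline) {
        ConsumerRecords<String, String> records = consumer.poll(500);
        for (ConsumerRecord<String, String> record : records) {
            log.info("Received message: key={}, value={}, partition={}, offset={}",
                    record.key(), record.value(), record.partition(), record.offset());
        }
    }
    consumer.close();
}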

Then run clean install again, deploy the jar to the cloud host, and call the following GET endpoint:

http://43.138.0.199:8081/consumeMessage

hmblogs.log then shows the consumed messages being logged.
