当前位置: 首页 > wzjs >正文

网站建设公司价格差别我要开网店

网站建设公司价格差别,我要开网店,网版制作过程,手机网站模板 怎样做目录 1、安装kafka 1.1确认jdk是否安装OK 1.2下载&&安装kafka 1.3验证kafka 2、连接kafka 3、在java中操作kafka 1、安装kafka 1.1确认jdk是否安装Ok java -version 1.2下载&&安装kafka wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4…

目录

1、安装kafka

1.1确认jdk是否安装OK

1.2下载&&安装kafka

1.3验证kafka

2、连接kafka

3、在java中操作kafka


1、安装kafka

1.1确认jdk是否安装Ok

java -version

1.2下载&&安装kafka

wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz

mkdir zk-3.4.14
tar -xvzf zookeeper-3.4.14.tar.gz -C /home/lighthouse/zk-3.4.14

配置

进入该目录下的conf文件夹中。

zoo_sample.cfg是一个配置文件的样本,需要将这个文件复制并重命名为zoo.cfg:   

cp zoo_sample.cfg zoo.cfg

修改配置文件:

vi zoo.cfg

配置环境变量, 使用vim打开etc目录下的profile文件:vim /etc/profile

在末尾配置环境变量,这里需要写入的是:

export ZOOKEEPER_HOME=/home/lighthouse/zk-3.4.14/zookeeper-3.4.14

export PATH=$PATH:$ZOOKEEPER_HOME/bin

写入信息并保存后,需要使配置文件生效,所用的命令为:source /etc/profile

启动zookeeper, 由于配置了环境变量,可以在系统中的任意目录执行启动zookeeper的命令,其执行的实际上是zookeeper的bin文件夹中的zkServer.sh的命令:zkServer.sh start

Zookeeper启动成功:

下载kafka2.2.1:

wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz

解压:

tar -zxvf kafka_2.12-2.2.1.tgz

启动:

nohup bin/kafka-server-start.sh config/server.properties > output.txt &

其中server.properties文件内容如下:

1.3验证kafka

执行命令:bin/kafka-topics.sh –version

看不到版本号

2、连接kafka,并执行命令

2.1创建topic:执行命令:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

2.2查看topic:执行命令:

bin/kafka-topics.sh --list --zookeeper localhost:2181 

bin/kafka-topics.sh --list --zookeeper 43.138.0.199:2181

2.3使用kafka-console-producer.sh 发送消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

 bin/kafka-console-producer.sh --broker-list 43.138.0.199:9092 --topic test

2.4使用kafka-console-consumer.sh消费消息:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

bin/kafka-console-consumer.sh --bootstrap-server 43.138.0.199:9092 --topic test --from-beginning

3、在java中操作kafka

pom.xml增加如下依赖:

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.11</artifactId><version>1.0.1</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.0.1</version></dependency></dependencies>

Producer.java代码如下:

package com.hmblogs.backend.util;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.Logger;import java.util.Properties;/************************************************************ @ClassName   : Producer.java****** @author      : milo ^ ^****** @date        : 2018 03 14 11:34****** @version     : v1.0.x*******************************************************/
public class Producer {static Logger log = Logger.getLogger(Producer.class);private static final String TOPIC = "test";private static final String BROKER_LIST = "43.138.0.199:9092";private static KafkaProducer<String,String> producer = null;/*初始化生产者*/static {Properties configs = initConfig();producer = new KafkaProducer<String, String>(configs);}/*初始化配置*/private static Properties initConfig(){Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());return properties;}public static void main(String[] args) throws InterruptedException {//消息实体ProducerRecord<String , String> record = null;for (int i = 0; i < 3; i++) {record = new ProducerRecord<String, String>(TOPIC, "value"+(int)(10*(Math.random())));//发送消息producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (null != e){System.out.println("send error" + e.getMessage());}else {System.out.println(String.format("offset:%s,partition:%s",recordMetadata.offset(),recordMetadata.partition()));}}});}producer.close();}
}

执行报错,如下:

send errorExpiring 3 record(s) for test-0: 30034 ms has passed since batch creation plus linger time
send errorExpiring 3 record(s) for test-0: 30034 ms has passed since batch creation plus linger time
send errorExpiring 3 record(s) for test-0: 30034 ms has passed since batch creation plus linger time

搜索资料,尝试解决

既然本地调不通,那我就发到linux机器里面去调试

改成ProducerController

代码如下:

package com.hmblogs.backend.controller;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Properties;@RestController
@Slf4j
public class ProducerController {private static final String TOPIC = "test";private static final String BROKER_LIST = "43.138.0.199:9092";private static KafkaProducer<String,String> producer = null;/*** sendMessage* @return*/@GetMapping(value = "/sendMessage")public void redisTestLock(){log.info("sendMessage");Properties configs = initConfig();producer = new KafkaProducer<String, String>(configs);send();}/*初始化配置*/private Properties initConfig(){Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());return properties;}private void send() {//消息实体ProducerRecord<String , String> record = null;for (int i = 0; i < 3; i++) {record = new ProducerRecord<String, String>(TOPIC, "value:"+(int)(10*(Math.random())));//发送消息producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (null != e){log.error("send error:" + e.getMessage());}else {log.info(String.format("offset:%s,partition:%s",recordMetadata.offset(),recordMetadata.partition()));}}});}producer.close();}
}

 执行clean install命令,打包成jar文件,上传到云主机里,然后启动

java -jar hmblogs.jar

访问GET接口

http://43.138.0.199:8081/sendMessage

查看hmblogs服务的日志 

 然后,重新查看该topic的消息

消费的验证也改成通过调用接口来实现

ConsumerController代码如下:

package com.hmblogs.backend.controller;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Collections;
import java.util.Properties;@RestController
@Slf4j
public class ConsumerController {private static final String TOPIC = "test";private static final String BROKER_LIST = "43.138.0.199:9092";private static KafkaConsumer<String,String> consumer = null;/*** consumeMessage* @return*/@GetMapping(value = "/consumeMessage")public void consumeKafkaMessage(){log.info("consumeMessage");Properties configs = initConfig();consumer = new KafkaConsumer<String, String>(configs);consumer.subscribe(Collections.singletonList(TOPIC));send();}private static Properties initConfig(){Properties properties = new Properties();properties.put("bootstrap.servers",BROKER_LIST);properties.put("group.id","0");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("enable.auto.commit", "true");properties.setProperty("auto.offset.reset", "earliest");return properties;}private void send() {while (true) {ConsumerRecords<String, String> records = consumer.poll(10);for (ConsumerRecord<String, String> record : records) {log.info("Received message: key={}, value={}, partition={}, offset={}\n",record.key(), record.value(), record.partition(), record.offset());}}}
}

 然后clean install,发到云主机里,调用如下GET接口:

http://43.138.0.199:8081/consumeMessage

hmblogs.log日志显示内容如下截图:


文章转载自:

http://6ytEZXx7.pxLsh.cn
http://DcpXyHz8.pxLsh.cn
http://RWyD7Td6.pxLsh.cn
http://pxN03IZz.pxLsh.cn
http://ra4zhfkt.pxLsh.cn
http://Xhzsq4RJ.pxLsh.cn
http://x7hcXapJ.pxLsh.cn
http://pOYFdsqx.pxLsh.cn
http://Y7wzDm36.pxLsh.cn
http://7qgkTOFE.pxLsh.cn
http://hkwoCMUD.pxLsh.cn
http://Oukt1U7K.pxLsh.cn
http://7robT1Eu.pxLsh.cn
http://8QYxh96M.pxLsh.cn
http://Cc0fyTtW.pxLsh.cn
http://ztVB1lN8.pxLsh.cn
http://sLzjbX9o.pxLsh.cn
http://dMf5osNb.pxLsh.cn
http://uBp3QiIc.pxLsh.cn
http://UPN3PJrA.pxLsh.cn
http://vL6j2OS5.pxLsh.cn
http://fiCQxXql.pxLsh.cn
http://i7CSjwTF.pxLsh.cn
http://alEDk2tX.pxLsh.cn
http://MFYQVpKs.pxLsh.cn
http://3Vbmdwfq.pxLsh.cn
http://kV91WfOu.pxLsh.cn
http://qrU78FAa.pxLsh.cn
http://Jw3MexMX.pxLsh.cn
http://ywhzvyif.pxLsh.cn
http://www.dtcms.com/wzjs/747934.html

相关文章:

  • 学校门户网站建设管理办法设计说明英语翻译
  • 安卓程序开发广州seo教程
  • 一般网站的架构河南网站建设详细流程
  • 便宜网站建设模板网站网站建设维护费一年多少钱
  • wordpress站点wordpress 漏洞 2014
  • 网站中英文切换怎么做查看wordpress密码
  • hao123网站难做吗正规加盟项目
  • 成都网站建设 3e网络如何让域名指向网站
  • 定陶住房和城乡建设局网站商务网站建设与维护 课件
  • 建站公司哪家好哪里app开发公司好
  • 罗湖城网站建设毕节网站开发公司电话
  • 做公众号一般在哪个网站照片网站怎么做推广
  • 旅游网站开发需求分析目的碗网站
  • 云盘网站建设旅游在线网站开发
  • 做搜索引擎网站wordpress老版本下载
  • 搭建网站怎么赚钱糖果网站是李笑来做的吗
  • 西安 企业网站建设品牌设计网站有哪些
  • 手机网站你们it外包公司简介
  • 打造公司的网站海外贸易平台有哪些
  • Hdi做指数网站wordpress 谷歌登陆
  • 做网站为什么能挣钱北京有多少互联网公司
  • 如何做好网站推网站设计好了如何上传到自己搭建的网上去
  • 西部数码网站管理助手错误广告设计公司有哪些渠道通路
  • n怎样建立自己的网站深圳团购网站设计哪家好
  • 白糖贸易怎么做网站十大免费推广平台
  • 网站建设方式丨金手指排名26电商网站建设懂你所需
  • 安徽省博物馆网站建设企业 北京 响应式网站
  • 焦作电子商务网站建设实例php建设网站用什么软件
  • 广告网站怎么做的博客软件 WordPress等
  • 国外的旅游网站做的如何奇墙网站建设