kafka使用-Producer

简介
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程(发送线程)。
主线程中由 KafkaProducer 创建消息,然后通过拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。
Sender 线程负责从 RecordAccumulator 中获取消息并将其发送到 Kafka 中。
程序员通过代码操作的过程为主线程中的步骤。消息累加器和sender线程的内容属于源码内容。以下主要是主线程中1、2、3步的内容。
Producer生产消息的基本代码
import org.apache.kafka.clients.producer.*;import java.util.HashMap;
import java.util.Map;public class KafkaProcuderTest {public static void main(String[] args) throws InterruptedException {// 创建配置对象Map<String, Object> configMap = new HashMap<String, Object>();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// 对生产的数据k,v进行序列化操作configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 创建生产者对象KafkaProducer<String,String> producer = new KafkaProducer<>(configMap);// 创建数据// 通过生产者对象将数据发送到kafkafor(int i = 0;i<10000;i++) {System.out.println("sending data");ProducerRecord<String, String> record = new ProducerRecord<>("first-topic", i%2, "key"+i, "value"+i);producer.send(record);