
【Source Code Analysis】5 - Producer: RecordAccumulator

After the main thread calls KafkaProducer.send(), the message is first buffered in the RecordAccumulator and the call returns. Once certain conditions are met, the Sender thread is woken up to send the messages held in the RecordAccumulator. Because both threads operate on the RecordAccumulator, it must be thread-safe.

RecordAccumulator holds a ConcurrentMap keyed by TopicPartition; each value is a Deque of RecordBatch objects. Every RecordBatch holds a reference to a MemoryRecords object, and MemoryRecords is where the messages are actually stored.
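
The nesting can be sketched as follows. This is not the Kafka source: MemoryRecordsSketch, RecordBatchSketch and the string key are simplified stand-ins; only the shape of the batches map (a concurrent map of non-thread-safe deques) mirrors the real code.

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified stand-ins for MemoryRecords and RecordBatch; only the nesting matters here.
class MemoryRecordsSketch {
    final ByteBuffer buffer;                       // where the serialized records actually live
    MemoryRecordsSketch(int capacity) { this.buffer = ByteBuffer.allocate(capacity); }
}

class RecordBatchSketch {
    final MemoryRecordsSketch records = new MemoryRecordsSketch(16 * 1024);
}

public class AccumulatorLayoutSketch {
    // The real key is a TopicPartition object; a "topic-partition" string stands in for it here.
    private final ConcurrentMap<String, Deque<RecordBatchSketch>> batches = new ConcurrentHashMap<>();

    Deque<RecordBatchSketch> getOrCreateDeque(String topicPartition) {
        // The map is concurrent, but each Deque still needs synchronized(deque) around its use.
        return batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());
    }

    public static void main(String[] args) {
        AccumulatorLayoutSketch acc = new AccumulatorLayoutSketch();
        Deque<RecordBatchSketch> dq = acc.getOrCreateDeque("demo-topic-0");
        synchronized (dq) {
            dq.addLast(new RecordBatchSketch());   // a new batch is appended at the tail
        }
        System.out.println("batches for demo-topic-0: " + dq.size());
    }
}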

MemoryRecords

MemoryRecords is a container for multiple records. It wraps an NIO ByteBuffer to store the records, plus a Compressor that compresses the records written into the ByteBuffer. Its key fields are:

// Compressor that compresses the records and writes the compressed data into the buffer
private final Compressor compressor;
// Maximum number of bytes that may be written
private final int writeLimit;
// Java NIO ByteBuffer that holds the records
private ByteBuffer buffer;
// Whether this instance is in read-write mode or read-only mode
private boolean writable;

Compressor has two important stream fields: bufferStream and appendStream. The former is ByteBufferOutputStream, a Kafka class that extends java.io.OutputStream and wraps a ByteBuffer; when the data written exceeds the ByteBuffer's capacity, ByteBufferOutputStream automatically expands it. The latter is a DataOutputStream that adds compression on top of bufferStream. The compression codec is determined by the producer configuration parameter compression.type.

The compressor supports three compression codecs: GZIP, SNAPPY, and LZ4. The following shows how the Compressor builds its compression streams.

public Compressor(ByteBuffer buffer, CompressionType type) {
    this.type = type; // compression type
    ....
    // create the stream
    bufferStream = new ByteBufferOutputStream(buffer);
    appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
}

public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
    try {
        switch (type) {
            case NONE:
                return new DataOutputStream(buffer);
            case GZIP:
                return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
            case SNAPPY:
                try {
                    OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
                    return new DataOutputStream(stream);
                } catch (Exception e) {
                    throw new KafkaException(e);
                }
            case LZ4:
                .... // same pattern as SNAPPY
            default:
                throw new IllegalArgumentException("Unknown compression type: " + type);
        }
    } catch (IOException e) {
        throw new KafkaException(e);
    }
}

One point worth noting: the GZIP stream is created with new, while the snappy stream is created via reflection. The reason is that GZIP ships with the JDK, whereas snappy requires an extra dependency; creating it reflectively means the snappy jar does not have to be on the classpath unless snappy compression is actually used.
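
As a small illustration of this lazy-loading trick (not the Kafka source: the helper below and its error handling are hypothetical, while org.xerial.snappy.SnappyOutputStream and its (OutputStream, int) constructor are the real dependency being loaded), reflection defers class resolution until the codec is actually selected, so the program still runs when the snappy jar is absent:

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.lang.reflect.Constructor;

public class LazySnappySketch {
    // Resolve SnappyOutputStream only when snappy compression is actually requested.
    static OutputStream wrapWithSnappy(OutputStream out, int bufferSize) {
        try {
            Class<?> clazz = Class.forName("org.xerial.snappy.SnappyOutputStream");
            Constructor<?> ctor = clazz.getConstructor(OutputStream.class, int.class);
            return (OutputStream) ctor.newInstance(out, bufferSize);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("snappy-java is not on the classpath", e);
        }
    }

    public static void main(String[] args) {
        try {
            wrapWithSnappy(new ByteArrayOutputStream(), 1024);
            System.out.println("snappy available");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // the class loads and runs even without the snappy jar
        }
    }
}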

Compressor exposes a series of put* methods. This is the decorator pattern: appendStream contributes the compression behavior, and bufferStream contributes the automatic expansion of the underlying ByteBuffer.
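
A minimal, self-contained sketch of that decorator chain, assuming GZIP and using only JDK classes; GrowableByteBufferStream is an illustrative stand-in for Kafka's ByteBufferOutputStream, not the real implementation:

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.zip.GZIPOutputStream;

// Decorator chain: an auto-growing ByteBuffer stream at the bottom, a compression stream
// in the middle, and a DataOutputStream on top that the put* methods write to.
public class CompressorChainSketch {

    // Simplified stand-in for Kafka's ByteBufferOutputStream: grows the buffer on demand.
    static class GrowableByteBufferStream extends OutputStream {
        private ByteBuffer buffer;
        GrowableByteBufferStream(int initialCapacity) { this.buffer = ByteBuffer.allocate(initialCapacity); }
        @Override public void write(int b) {
            if (!buffer.hasRemaining()) {                       // auto-expand, like ByteBufferOutputStream
                ByteBuffer bigger = ByteBuffer.allocate(Math.max(1, buffer.capacity() * 2));
                buffer.flip();
                bigger.put(buffer);
                buffer = bigger;
            }
            buffer.put((byte) b);
        }
        ByteBuffer buffer() { return buffer; }
    }

    public static void main(String[] args) throws IOException {
        GrowableByteBufferStream bufferStream = new GrowableByteBufferStream(16);
        // GZIP decorates the buffer stream; DataOutputStream decorates the GZIP stream.
        DataOutputStream appendStream = new DataOutputStream(new GZIPOutputStream(bufferStream));
        appendStream.writeLong(42L);        // what a put* method ultimately does
        appendStream.close();               // flushes the compressed data into the buffer
        System.out.println("compressed bytes written: " + bufferStream.buffer().position());
    }
}

Every put* call ends up as a write on the DataOutputStream, flows through the compression stream, and lands in the auto-growing buffer.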

estimatedBytesWritten() estimates how many bytes have already been used in the underlying buffer; hasRoomFor() relies on this estimate to decide whether the batch still has room or whether a new batch (with a new buffer) is needed.

  • Inputs
    • writtenUncompressed: the number of uncompressed bytes written so far (a long).
    • type.id: the identifier of the compression type (NONE, GZIP, SNAPPY, LZ4), used as the index into the TYPE_TO_RATE array to look up that codec's estimated compression rate.
    • TYPE_TO_RATE: an array holding an estimated compression rate per compression type (for example, a rate of 0.5 means the compressed output is expected to be about 50% of the uncompressed size).
    • COMPRESSION_RATE_ESTIMATION_FACTOR: a global adjustment factor applied to the estimate, compensating for overhead the simple rate does not capture (such as compression headers).
  • Output
    • A long value: the estimated number of bytes written to the underlying buffer.
public long estimatedBytesWritten() {
    if (type == CompressionType.NONE) {
        return bufferStream.buffer().position();
    } else {
        // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes
        return (long) (writtenUncompressed * TYPE_TO_RATE[type.id] * COMPRESSION_RATE_ESTIMATION_FACTOR);
    }
}
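
For example, assuming for illustration a TYPE_TO_RATE entry of 0.5 for GZIP and a factor of 1.05 (the real constants may differ), after writing 10,000 uncompressed bytes the estimate is 10,000 * 0.5 * 1.05 = 5,250 bytes. The factor inflates the estimate slightly, so hasRoomFor tends to declare the batch full a little too early rather than too late.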

Other methods:

  • append: checks that the MemoryRecords is still in write mode, then calls the Compressor's put* methods to write the record into the ByteBuffer.
  • hasRoomFor: uses estimatedBytesWritten to estimate whether there is room left for another record (see the sketch below).
  • close: switches the MemoryRecords to read-only mode and closes the compressor, flushing any remaining compressed data into the buffer.
  • sizeInBytes: returns the number of bytes currently held (the write position while writable, the buffer limit after closing).
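
A rough, self-contained sketch of the check hasRoomFor performs; the standalone method and constant below are illustrative (they assume the 0.10 message format: a 12-byte log overhead plus a 22-byte record header), not the Kafka source:

public class HasRoomForSketch {
    static final int LOG_OVERHEAD = 12; // offset (8 bytes) + size (4 bytes)

    // The batch is considered full once the estimated compressed size plus the next record
    // would exceed writeLimit.
    static boolean hasRoomFor(long estimatedBytesWritten, int writeLimit, byte[] key, byte[] value) {
        int keyLen = key == null ? 0 : key.length;
        int valueLen = value == null ? 0 : value.length;
        int recordSize = 4 /* crc */ + 1 /* magic */ + 1 /* attributes */ + 8 /* timestamp */
                + 4 /* key length */ + keyLen + 4 /* value length */ + valueLen;
        return writeLimit >= estimatedBytesWritten + LOG_OVERHEAD + recordSize;
    }

    public static void main(String[] args) {
        System.out.println(hasRoomFor(15_000, 16_384, new byte[16], new byte[512])); // true
        System.out.println(hasRoomFor(16_300, 16_384, new byte[16], new byte[512])); // false
    }
}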

RecordBatch

Besides the MemoryRecords object, a RecordBatch carries a number of bookkeeping fields:

public int recordCount = 0;                      // number of records in this batch
public int maxRecordSize = 0;                    // size of the largest record appended so far
public volatile int attempts = 0;                // number of send attempts (for retries)
public final long createdMs;                     // time the batch was created
public long drainedMs;                           // time the batch was drained by the Sender
public long lastAttemptMs;                       // time of the most recent send attempt
public final MemoryRecords records;              // the MemoryRecords that actually holds the data
public final TopicPartition topicPartition;      // the partition this batch will be sent to
public final ProduceRequestResult produceFuture; // batch-level future, completed when the whole batch is acknowledged
public long lastAppendTime;                      // time of the most recent append
private final List<Thunk> thunks;                // per-record Callback + FutureRecordMetadata pairs
private long offsetCounter = 0L;                 // relative offset assigned to the next record
private boolean retry;                           // whether this batch is being retried

When every message in a RecordBatch has been acknowledged, or the batch times out, or the producer is closed, ProduceRequestResult.done() is called. It marks produceFuture as completed, and the error field of ProduceRequestResult distinguishes normal completion from completion with an error.

The callback field of the Thunk class points to the Callback registered for the corresponding message; its other field is of type FutureRecordMetadata.

FutureRecordMetadata has two key fields: result, of type ProduceRequestResult, which points to the produceFuture of the RecordBatch that contains the message; and relativeOffset, which records the message's offset within that RecordBatch. FutureRecordMetadata implements the Future interface, but its methods mostly delegate to the corresponding methods of the ProduceRequestResult, because messages are sent and acknowledged per RecordBatch. Once the producer has received the response covering a message, FutureRecordMetadata.get() returns a RecordMetadata object containing the message's metadata.
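
A minimal model of that delegation, assuming (as a simplification) a CountDownLatch-based batch result; the class names echo the Kafka ones but the bodies are hypothetical:

import java.util.concurrent.CountDownLatch;

public class FutureDelegationSketch {

    // Batch-level result, completed once for the whole RecordBatch (stand-in for ProduceRequestResult).
    static class BatchResult {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile long baseOffset = -1L;
        private volatile RuntimeException error;

        void done(long baseOffset, RuntimeException error) {
            this.baseOffset = baseOffset;
            this.error = error;
            latch.countDown();
        }
        void await() throws InterruptedException { latch.await(); }
        long baseOffset() { return baseOffset; }
        RuntimeException error() { return error; }
    }

    // Record-level future (stand-in for FutureRecordMetadata): the shared batch result plus a relative offset.
    static class RecordFuture {
        private final BatchResult result;
        private final long relativeOffset;
        RecordFuture(BatchResult result, long relativeOffset) {
            this.result = result;
            this.relativeOffset = relativeOffset;
        }
        long get() throws InterruptedException {
            result.await();                               // blocks until the whole batch is acknowledged
            if (result.error() != null)
                throw result.error();
            return result.baseOffset() + relativeOffset;  // absolute offset of this record
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BatchResult batch = new BatchResult();
        RecordFuture third = new RecordFuture(batch, 2);
        new Thread(() -> batch.done(100L, null)).start();   // the broker "acks" the batch at base offset 100
        System.out.println("record offset: " + third.get()); // 102
    }
}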

public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
    // Estimate whether there is enough room left
    if (!this.records.hasRoomFor(key, value)) {
        return null;
    } else {
        // Append the record to the MemoryRecords object
        long checksum = this.records.append(offsetCounter++, timestamp, key, value);
        this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
        this.lastAppendTime = now;
        // Create the FutureRecordMetadata object
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                timestamp, checksum,
                key == null ? -1 : key.length,
                value == null ? -1 : value.length);
        // Save the user-supplied Callback together with the FutureRecordMetadata into the thunks list
        if (callback != null)
            thunks.add(new Thunk(callback, future));
        this.recordCount++;
        return future;
    }
}

When a RecordBatch receives a normal response, times out, or the producer is closed, RecordBatch.done() is invoked. It runs the Callback of every message in the batch and then calls the done() method of its produceFuture field.

public void done(long baseOffset, long timestamp, RuntimeException exception) {
    log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.",
              topicPartition, baseOffset, exception);
    // execute callbacks
    for (int i = 0; i < this.thunks.size(); i++) {
        try {
            Thunk thunk = this.thunks.get(i);
            if (exception == null) {
                // If the timestamp returned by server is NoTimestamp, that means CreateTime is used. Otherwise LogAppendTime is used.
                RecordMetadata metadata = new RecordMetadata(this.topicPartition, baseOffset, thunk.future.relativeOffset(),
                                                             timestamp == Record.NO_TIMESTAMP ? thunk.future.timestamp() : timestamp,
                                                             thunk.future.checksum(),
                                                             thunk.future.serializedKeySize(),
                                                             thunk.future.serializedValueSize());
                thunk.callback.onCompletion(metadata, null);
            } else {
                thunk.callback.onCompletion(null, exception);
            }
        } catch (Exception e) {
            log.error("Error executing user-provided callback on message for topic-partition {}:", topicPartition, e);
        }
    }
    this.produceFuture.done(topicPartition, baseOffset, exception);
}

BufferPool

Creating and releasing ByteBuffers is relatively expensive, so BufferPool implements ByteBuffer reuse. Its key fields are:

private final long totalMemory;           // total memory the pool manages (buffer.memory)
private final int poolableSize;           // only buffers of exactly this size are pooled (batch.size)
private final ReentrantLock lock;         // guards all pool state
private final Deque<ByteBuffer> free;     // cached, reusable ByteBuffers of poolableSize
private final Deque<Condition> waiters;   // conditions of threads blocked waiting for memory
private long availableMemory;             // unpooled memory still available for allocation
private final Metrics metrics;
private final Time time;
private final Sensor waitTime;            // records how long threads block waiting for memory

Each BufferPool only pools ByteBuffers of one specific size (poolableSize); buffers of any other size are not cached for reuse.

public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    if (size > this.totalMemory)
        throw new IllegalArgumentException("Attempt to allocate " + size
                                           + " bytes, but there is a hard limit of "
                                           + this.totalMemory
                                           + " on memory allocations.");
    this.lock.lock();
    try {
        // check if we have a free buffer of the right size pooled
        if (size == poolableSize && !this.free.isEmpty())
            return this.free.pollFirst();

        // now check if the request is immediately satisfiable with the
        // memory on hand or if we need to block
        int freeListSize = this.free.size() * this.poolableSize;
        if (this.availableMemory + freeListSize >= size) {
            // we have enough unallocated or pooled memory to immediately
            // satisfy the request
            freeUp(size);
            this.availableMemory -= size;
            lock.unlock();
            return ByteBuffer.allocate(size);
        } else {
            // we are out of memory and will have to block
            int accumulated = 0;
            ByteBuffer buffer = null;
            Condition moreMemory = this.lock.newCondition();
            long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
            this.waiters.addLast(moreMemory);
            // loop over and over until we have a buffer or have reserved
            // enough memory to allocate one
            while (accumulated < size) {
                long startWaitNs = time.nanoseconds();
                long timeNs;
                boolean waitingTimeElapsed;
                try {
                    waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                } catch (InterruptedException e) {
                    this.waiters.remove(moreMemory);
                    throw e;
                } finally {
                    long endWaitNs = time.nanoseconds();
                    timeNs = Math.max(0L, endWaitNs - startWaitNs);
                    this.waitTime.record(timeNs, time.milliseconds());
                }

                if (waitingTimeElapsed) {
                    this.waiters.remove(moreMemory);
                    throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                }

                remainingTimeToBlockNs -= timeNs;
                // check if we can satisfy this request from the free list,
                // otherwise allocate memory
                if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                    // just grab a buffer from the free list
                    buffer = this.free.pollFirst();
                    accumulated = size;
                } else {
                    // we'll need to allocate memory, but we may only get
                    // part of what we need on this iteration
                    freeUp(size - accumulated);
                    int got = (int) Math.min(size - accumulated, this.availableMemory);
                    this.availableMemory -= got;
                    accumulated += got;
                }
            }

            // remove the condition for this thread to let the next thread
            // in line start getting memory
            Condition removed = this.waiters.removeFirst();
            if (removed != moreMemory)
                throw new IllegalStateException("Wrong condition: this shouldn't happen.");

            // signal any additional waiters if there is more memory left
            // over for them
            if (this.availableMemory > 0 || !this.free.isEmpty()) {
                if (!this.waiters.isEmpty())
                    this.waiters.peekFirst().signal();
            }

            // unlock and return the buffer
            lock.unlock();
            if (buffer == null)
                return ByteBuffer.allocate(size);
            else
                return buffer;
        }
    } finally {
        if (lock.isHeldByCurrentThread())
            lock.unlock();
    }
}

Releasing memory:

public void deallocate(ByteBuffer buffer, int size) {
    lock.lock();
    try {
        if (size == this.poolableSize && size == buffer.capacity()) {
            buffer.clear();
            this.free.add(buffer);
        } else {
            this.availableMemory += size;
        }
        Condition moreMem = this.waiters.peekFirst();
        if (moreMem != null)
            moreMem.signal();
    } finally {
        lock.unlock();
    }
}
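
To make the allocate/deallocate contract concrete, here is a stripped-down, single-threaded model of the pooling policy above (no locking, blocking, metrics, or waiters; SimpleBufferPool is illustrative, not the Kafka class): only buffers of exactly poolableSize are cached on the free list, any other size merely returns its bytes to availableMemory.

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

public class SimpleBufferPool {
    private final int poolableSize;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    private long availableMemory;

    SimpleBufferPool(long totalMemory, int poolableSize) {
        this.availableMemory = totalMemory;
        this.poolableSize = poolableSize;
    }

    ByteBuffer allocate(int size) {
        if (size == poolableSize && !free.isEmpty())
            return free.pollFirst();                         // reuse a pooled buffer
        if (availableMemory + (long) free.size() * poolableSize < size)
            throw new IllegalStateException("pool exhausted (the real pool would block here)");
        while (availableMemory < size && !free.isEmpty())    // like freeUp(): cannibalize pooled buffers
            availableMemory += free.pollLast().capacity();
        availableMemory -= size;
        return ByteBuffer.allocate(size);
    }

    void deallocate(ByteBuffer buffer) {
        if (buffer.capacity() == poolableSize) {
            buffer.clear();
            free.add(buffer);                                // pooled for reuse
        } else {
            availableMemory += buffer.capacity();            // just reclaim the memory accounting
        }
    }

    public static void main(String[] args) {
        SimpleBufferPool pool = new SimpleBufferPool(64 * 1024, 16 * 1024);
        ByteBuffer batchBuffer = pool.allocate(16 * 1024);
        pool.deallocate(batchBuffer);                        // goes back onto the free list
        System.out.println(pool.allocate(16 * 1024) == batchBuffer); // true: the same buffer is reused
    }
}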

RecordAccumulator

The key fields of RecordAccumulator are:

private volatile boolean closed;                  // whether the accumulator has been closed
private final AtomicInteger flushesInProgress;    // number of threads waiting for a flush() to complete
private final AtomicInteger appendsInProgress;    // number of threads currently appending records
private final int batchSize;                      // buffer size of each RecordBatch (batch.size)
private final CompressionType compression;        // compression type (compression.type)
private final long lingerMs;                      // how long a batch may wait for more records (linger.ms)
private final long retryBackoffMs;                // backoff between retries (retry.backoff.ms)
private final BufferPool free;                    // ByteBuffer pool
private final Time time;
private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
private final IncompleteRecordBatches incomplete; // batches that have not yet been acknowledged
// The following variables are only accessed by the sender thread, so we don't need to protect them.
private final Set<TopicPartition> muted;          // partitions with an in-flight batch (skipped by ready() and drain())
private int drainIndex;                           // round-robin start index used by drain()

  • batches: maps each TopicPartition to a Deque of RecordBatch. The map itself is a ConcurrentMap, but a Deque is not thread-safe, so appending records or draining a RecordBatch requires synchronizing on the Deque.

KafkaProducer.send() eventually calls RecordAccumulator.append() to add the record to the accumulator. The main logic is:

  1. Look up the Deque for the TopicPartition in the batches collection; if none exists, create a new Deque and add it to batches.
  2. Lock the Deque (with a synchronized block).
  3. Call tryAppend() to try to append the record to the last RecordBatch in the Deque.
  4. The synchronized block ends, releasing the lock automatically.
  5. If the append succeeded, return a RecordAppendResult (which wraps a ProduceRequestResult).
  6. If the append failed, allocate a new ByteBuffer from the BufferPool.
  7. Lock the Deque again (with a synchronized block) and retry step 3.
  8. If the retry succeeds, return; otherwise create a new RecordBatch from the ByteBuffer obtained in step 6.
  9. Append the record to the new RecordBatch and add the RecordBatch to the tail of the corresponding Deque.
  10. Add the new RecordBatch to the incomplete collection.
  11. The synchronized block ends, releasing the lock automatically.
  12. Return a RecordAppendResult; its fields are used to decide whether to wake up the Sender thread.
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    // We keep track of the number of appending thread to make sure we do not miss batches in
    // abortIncompleteBatches().
    appendsInProgress.incrementAndGet();
    try {
        // 1. Look up (or create) the Deque for this TopicPartition
        Deque<RecordBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) { // 2. Lock dq
            // boundary checks (elided)
            // 3. Try to append the record to the last RecordBatch in the Deque
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null)
                return appendResult; // 5. Append succeeded, return directly
        }
        // 6. Append failed, so allocate space from the BufferPool
        // (size is at least batch.size, or larger if a single record does not fit)
        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
        ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            // ....
            // 7. After re-acquiring the lock on dq, call tryAppend again
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null) {
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                free.deallocate(buffer);
                return appendResult; // 8. Append succeeded, return
            }
            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
            RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
            // 9. Append the record to the newly created batch and add the batch to the batches collection
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
            dq.addLast(batch);
            // 10. Add the new RecordBatch to the incomplete set
            incomplete.add(batch);
            // 12. Return the RecordAppendResult
            return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
        }
    } finally {
        appendsInProgress.decrementAndGet();
    }
}

The last step of KafkaProducer.doSend() is to check whether, after this append to the RecordAccumulator, the conditions for sending are met; the ready() method returns the set of cluster nodes that satisfy the send conditions. A partition's data is considered sendable if any of the following holds:

  1. Its Deque contains more than one RecordBatch, or the first RecordBatch is full.
  2. The first RecordBatch has waited long enough (linger.ms has elapsed, or the retry backoff when retrying).
  3. Another thread is waiting for the BufferPool to free space (i.e., the BufferPool is exhausted).
  4. A thread is waiting for a flush() operation to complete.
  5. The Sender thread is about to shut down.

The ready() method below iterates over every partition in the batches collection and first looks up the node that hosts the partition's leader replica. If the five conditions above are satisfied, that node is added to the readyNodes set. After the iteration it returns a ReadyCheckResult, which records the set of nodes ready for sending, whether any partition had no known leader, and the delay before ready() should be checked again.

public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    Set<String> unknownLeaderTopics = new HashSet<>();

    boolean exhausted = this.free.queued() > 0;
    for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
        TopicPartition part = entry.getKey();
        Deque<RecordBatch> deque = entry.getValue();

        Node leader = cluster.leaderFor(part);
        synchronized (deque) {
            if (leader == null && !deque.isEmpty()) {
                // This is a partition for which leader is not known, but messages are available to send.
                // Note that entries are currently not removed from batches when deque is empty.
                unknownLeaderTopics.add(part.topic());
            } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                RecordBatch batch = deque.peekFirst();
                if (batch != null) {
                    boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                    long waitedTimeMs = nowMs - batch.lastAttemptMs;
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                    boolean full = deque.size() > 1 || batch.records.isFull();
                    boolean expired = waitedTimeMs >= timeToWaitMs;
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
                    if (sendable && !backingOff) {
                        readyNodes.add(leader);
                    } else {
                        // Note that this results in a conservative estimate since an un-sendable partition may have
                        // a leader that will later be found to have sendable data. However, this is good enough
                        // since we'll just wake up and then sleep again for the remaining time.
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }

    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}

Once the ready nodes have been obtained, drain() is called; it converts the partition-oriented batches map into the node-oriented form used to build produce requests (a map from node id to the list of RecordBatch to send):

public Map<Integer, List<RecordBatch>> drain(Cluster cluster,
                                             Set<Node> nodes,
                                             int maxSize,
                                             long now) {
    // The converted result: node id -> batches to send to that node
    Map<Integer, List<RecordBatch>> batches = new HashMap<>();
    for (Node node : nodes) { // iterate over the ready nodes
        int size = 0;
        // get the partitions whose leader lives on this node
        List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
        List<RecordBatch> ready = new ArrayList<>();
        /* to make starvation less likely this loop doesn't start at 0 */
        int start = drainIndex = drainIndex % parts.size();
        do {
            PartitionInfo part = parts.get(drainIndex);
            TopicPartition tp = new TopicPartition(part.topic(), part.partition());
            // Only proceed if the partition has no in-flight batches.
            if (!muted.contains(tp)) {
                Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
                if (deque != null) {
                    synchronized (deque) {
                        RecordBatch first = deque.peekFirst();
                        if (first != null) {
                            boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
                            // Only drain the batch if it is not during backoff period.
                            if (!backoff) {
                                if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
                                    // there is a rare case that a single batch size is larger than the request size due
                                    // to compression; in this case we will still eventually send this batch in a single
                                    // request
                                    break;
                                } else {
                                    RecordBatch batch = deque.pollFirst();
                                    batch.records.close();
                                    size += batch.records.sizeInBytes();
                                    ready.add(batch);
                                    batch.drainedMs = now;
                                }
                            }
                        }
                    }
                }
            }
            this.drainIndex = (this.drainIndex + 1) % parts.size();
        } while (start != drainIndex);
        batches.put(node.id(), ready);
    }
    return batches;
}