当前位置: 首页 > news >正文

nats jetstream 测试和客户端分析

基础分析

当创建stream的时候,最重要的 option 是 retentionPolicy

StreamConfiguration config = StreamConfiguration.builder()
        .name(streamName)
        .subjects("TestStream.>")
        .retentionPolicy(RetentionPolicy.Interest)
        .build();

when choose interest policy,must start at least one consumer before create stream
选择interest 策略时,必须在创建流之前至少启动一个消费者,LimitsPolicy 可以和其它policy叠加:

  • LimitsPolicy (default) - Retention based on the various limits that are set including: MaxMsgs, MaxBytes, MaxAge, and MaxMsgsPerSubject. If any of these limits are set, whichever limit is hit first will cause the automatic deletion of the respective message(s). See a full code example.
  • WorkQueuePolicy - Retention with the typical behavior of a FIFO queue. Each message can be consumed only once. This is enforced by only allowing one consumer to be created per subject for a work-queue stream (i.e. the consumers’ subject filter(s) must not overlap). Once a given message is ack’ed, it will be deleted from the stream. See a full code example.
  • InterestPolicy - Retention based on the consumer interest in the stream and messages. The base case is that there are zero consumers defined for a stream. If messages are published to the stream, they will be immediately deleted so there is no interest. This implies that consumers need to be bound to the stream ahead of messages being published to the stream. Once a given message is ack’ed by all consumers filtering on the subject, the message is deleted (same behavior as WorkQueuePolicy). See a full code example.

当一个客户端发送了stream message,processJetStreamMsg 负责处理消息

// processJetStreamMsg is where we try to actually process the stream msg.
func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, lseq uint64, ts int64) error {

当一个消息保存到store后,一个全局的seq 被返回并送给客户端,实现了message的可最终和最少一次发送。

// Store actual msg.
    if lseq == 0 && ts == 0 {
        seq, ts, err = store.StoreMsg(subject, hdr, msg)
    } else {
        // Make sure to take into account any message assignments that we had to skip (clfs).
        seq = lseq + 1 - clfs
        // Check for preAcks and the need to skip vs store.
        if mset.hasAllPreAcks(seq, subject) {
            mset.clearAllPreAcks(seq)
            store.SkipMsg()
        } else {
            err = store.StoreRawMsg(subject, hdr, msg, seq, ts)
        }
    }

msg先写入缓存,然后再写入磁盘,所以如果不是在集群模式下,可能会丢失

// Ask msg block to store in write through cache.
    err = mb.writeMsgRecord(rl, seq, subj, hdr, msg, ts, fs.fip)

最后,写入磁盘的函数是flushPendingMsgsLocked

func (mb *msgBlock) flushPendingMsgsLocked() (*LostStreamData, error) {

send response to client's inbox 
    // Send response here.
    if canRespond {
        response = append(pubAck, strconv.FormatUint(seq, 10)...)
        response = append(response, '}')
        mset.outq.sendMsg(reply, response)
    }

最后返回消息给客户端是如下函数

func (mset *stream) internalLoop() 
func (c *client) processInboundClientMsg(msg []byte) (bool, bool) {

call stacks如下
在这里插入图片描述

func (mset *stream) internalLoop() 的关键代码如下:

case <-msgs.ch:
            // This can possibly change now so needs to be checked here.
            isClustered := mset.IsClustered()
            ims := msgs.pop()
            for _, im := range ims {
                // If we are clustered we need to propose this message to the underlying raft group.
                if isClustered {
                    mset.processClusteredInboundMsg(im.subj, im.rply, im.hdr, im.msg)
                } else {
                    mset.processJetStreamMsg(im.subj, im.rply, im.hdr, im.msg, 0, 0)
                }
                im.returnToPool()
            }
            msgs.recycle(&ims)

internalLoop 在 func (mset *stream) setupSendCapabilities() 中被启动:

func (mset *stream) setupSendCapabilities() {
    mset.mu.Lock()
    defer mset.mu.Unlock()
    if mset.outq != nil {
        return
    }
    qname := fmt.Sprintf("[ACC:%s] stream '%s' sendQ", mset.acc.Name, mset.cfg.Name)
    mset.outq = &jsOutQ{newIPQueue[*jsPubMsg](mset.srv, qname)}
    go mset.internalLoop()
}

setupSendCapabilities 则是ibei stream startup function调用

 func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileStoreConfig, sa *streamAssignment) (*stream, error) {

在这里插入图片描述


关键objects

// SubjectTree is an adaptive radix trie (ART) for storing subject information on literal subjects.
// Will use dynamic nodes, path compression and lazy expansion.
// The reason this exists is to not only save some memory in our filestore but to greatly optimize matching
// a wildcard subject to certain members, e.g. consumer NumPending calculations.
type SubjectTree[T any] struct {
    root node
    size int
}
type fileStore struct {
    srv         *Server
    mu          sync.RWMutex
    state       StreamState
    tombs       []uint64
    ld          *LostStreamData
    scb         StorageUpdateHandler
    ageChk      *time.Timer
    syncTmr     *time.Timer
    cfg         FileStreamInfo
    fcfg        FileStoreConfig
    prf         keyGen
    oldprf      keyGen
    aek         cipher.AEAD
    lmb         *msgBlock
    blks        []*msgBlock
    bim         map[uint32]*msgBlock
    psim        *stree.SubjectTree[psi]
    tsl         int
    adml        int
    hh          hash.Hash64
    qch         chan struct{}
    fsld        chan struct{}
    cmu         sync.RWMutex
    cfs         []ConsumerStore
    sips        int
    dirty       int
    closing     bool
    closed      bool
    fip         bool
    receivedAny bool
    firstMoved  bool
}

consumer

创建函数

ConsumerConfiguration cc = ConsumerConfiguration.builder()
        .durable(durationName)
        .deliverPolicy(DeliverPolicy.All)
       .deliverSubject("TestStream")
        // if both set,otherwise where receive ErrorListenerLoggerImpl error but can work
       .filterSubjects("TestStream.mouse_move.>")
        //                   .filterSubjects("TestStream.mouse_move.>","TestStream.mouse_move")
       //  .filterSubject("TestStream.mouse_mve")
        .build();

重要options

  • Persistence - Durable / Ephemeral
    In addition to the choice of being push or pull, a consumer can also be ephemeral or durable. A consumer is considered durable when an explicit name is set on the Durable field when creating the consumer, or when InactiveThreshold is set.
    Durables and ephemeral have the same message delivery semantics but an ephemeral consumer will not have persisted state or fault tolerance (server memory only) and will be automatically cleaned up (deleted) after a period of inactivity, when no subscriptions are bound to the consumer.
    By default, consumers will have the same replication factor as the stream they consume, and will remain even when there are periods of inactivity (unless InactiveThreshold is set explicitly). Consumers can recover from server and client failure.

  • DeliverPolicy
    The policy choices include:

    • DeliverAll: Default policy. Start receiving from the earliest available message in the stream.
    • DeliverLast: Start with the last message added to the stream, or the last message matching the consumer’s filter subject if defined.
    • DeliverLastPerSubject: Start with the latest message for each filtered subject currently in the stream.
    • DeliverNew: Start receiving messages created after the consumer was created.
    • DeliverByStartSequence: Start at the first message with the specified sequence number. The consumer must specify OptStartSeq defining the sequence number.
    • DeliverByStartTime: Start with messages on or after the specified time. The consumer must specify OptStartTime defining the start time.
  • DeliverSubject (both worked in push and pull model )
    The subject to deliver messages to. Setting this field decides whether the consumer is push or pull-based. With a deliver subject, the server will push messages to clients subscribed to this subject.
    FilterSubjects
    A filter subject provides server-side filtering of messages before delivery to clients.
    For example, a stream factory-events with subject factory-events.. can have a consumer factory-A with a filter factory-events.A.* to deliver only events for factory A.
    A consumer can have a singular FilterSubject or plural FilterSubjects. Multiple filters can be applied, such as [factory-events.A., factory-events.B.] or specific event types [factory-events..item_produced, factory-events..item_packaged].

work queue type

When the stream is work queue type, all consumers must be in the same queue, otherwise the following errors may occur:

io.nats.client.JetStreamApiException: filtered consumer not unique on workqueue stream [10100]
at io.nats.client.api.ApiResponse.throwOnHasError(ApiResponse.java:88)
at io.nats.client.impl.NatsJetStreamImpl._createConsumer(NatsJetStreamImpl.java:132)
at io.nats.client.impl.NatsJetStreamManagement.addOrUpdateConsumer(NatsJetStreamManagement.java:131)
at io.nats.client.impl.NatsStreamContext.createOrUpdateConsumer(NatsStreamContext.java:91)
at io.nats.jetstreamtest.PullStreamConsumer.main(PullStreamConsumer.java:45)

When the stream is LimitsPolicy type not plus workqueue, push and pull consomer can both get message from stream.

  • deliverGroup Option only work under WorkQueuePolicy stream

timeout 相关

可以使用如下模式设置timeout

timeout = jsOptions == null || jsOptions.getRequestTimeout() == null ? conn.getOptions().getConnectionTimeout() : jsOptions.getRequestTimeout();

JetStreamOptions jtOptions = JetStreamOptions.builder().requestTimeout(Duration.ofMinutes(10)).build();

JetStream js = nc.jetStream(jtOptions);

相关文章:

  • vue3框架的响应式依赖追踪机制
  • C#变量与变量作用域详解
  • Android源码编译命令详解
  • DeepSeek与Manus:AI技术双星如何重构IT生产力格局
  • 作业-三层神经网络拟合任意函数、手写数字识别、室内定位
  • 【论文阅读】VAD: Vectorized Scene Representation for Efficient Autonomous Driving
  • STM32第一天建立工程
  • 云计算:虚拟化、容器化与云存储技术详解
  • 字节跳动C++客户端开发实习生内推-抖音基础技术
  • 数据库语句
  • leetcode day27 455+376
  • 中级网络工程师面试题参考示例(5)
  • Leetcode 刷题记录 06 —— 矩阵
  • Python 高级编程与实战:构建数据可视化应用
  • yolov5 onnx的部署文件(主要是onnx文件的使用)
  • AutoGen学习笔记系列(九)Advanced - Selector Group Chat
  • PawSQL for MSSQL:PawSQL 支持 SQL Server 的SQL优化、SQL审核、性能巡检
  • 【redis】type命令和定时器的两种实现方式(优先级队列、时间轮)
  • elasticsearch是哪家的
  • 每天五分钟深度学习pytorch:基于Pytorch搭建ResNet模型的残差块
  • 11次战斗起飞应对外军挑衅,逼退外军直升机细节曝光
  • 中国原创“地贫”基因编辑疗法新进展:复旦儿科医院治愈4名重型患儿
  • 绵阳一村民在外务工家中老宅被拆,镇政府回应:系施工方误拆
  • 科学与艺术的跨界对话可能吗?——评“以蚁为序的生命网络”
  • 小马智行一季度营收增12%:Robotaxi收入增长两倍,预计车队规模年底到千台
  • 住建部:2019年至2024年,全国累计开工改造老旧小区28万个