当前位置: 首页 > news >正文

nats jetstream server code 分析

对象和缩写

jetstream导入两个对象:stream and consumer,在stream 之上构造jetstreamapi。在nats代码中,以下是一些常见的缩写
1.mset is stream
2.jsX is something of jetstream
3.o is consumer

代码分析

对于producer ,发送的消息将通过以下堆栈(包括kv 模式)进行处理

在这里插入图片描述

核心函数是:

func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, lseq uint64, ts int64) error {

消息保存是通过以下代码:

// Store actual msg.
    if lseq == 0 && ts == 0 {
        seq, ts, err = store.StoreMsg(subject, hdr, msg)
    } else {
        // Make sure to take into account any message assignments that we had to skip (clfs).
        seq = lseq + 1 - clfs
        // Check for preAcks and the need to skip vs store.
        if mset.hasAllPreAcks(seq, subject) {
            mset.clearAllPreAcks(seq)
            store.SkipMsg()
        } else {
            err = store.StoreRawMsg(subject, hdr, msg, seq, ts)
        }
    }

 

每个consumer 都有一个相对的影子主题,如果你有兴趣找到所有主题,你可以在func(sSublist)Insert(subsubscription)中添加一些跟踪代码。

集群模式下, leader node是设置在consumer level。

伴随consumer 和 stream的shadow 主题也很重要:在nats中,一切对象都保存为subject,就像

  • $JS.API
  • $JSC.CI - consumer
  • $SYS.REQ.USER.INFO
  • $SRV.>
  • _INBOX

在这里插入图片描述

对于customer和内部的订阅,subscribe,是通过以下addServiceImportSub 的代码实现

// Internal account clients are for service imports and need the '\r\n'.
        start := time.Now()
        if client.kind == ACCOUNT {
            sub.icb(sub, c, acc, string(subject), string(reply), msg)
        } else {
            sub.icb(sub, c, acc, string(subject), string(reply), msg[:msgSize])
        }
        if dur := time.Since(start); dur >= readLoopReportThreshold {
            srv.Warnf("Internal subscription on %q took too long: %v", subject, dur)
        }

而func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, subject, reply, mh, msg []byte, gwrply bool) bool {
会call addServiceImportSub
在这里插入图片描述
注意nats有不少类似下面的func set模式来实现调用

// processServiceImport is an internal callback when a subscription matches an imported service
// from another account. This includes response mappings as well.
cb := func(sub *subscription, c *client, acc *Account, subject, reply string, msg []byte) {
        c.processServiceImport(si, acc, msg)
    }



服务器重新启动时,将重新分配已经存在 consumer给不同server:

 
func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname string, ca *consumerAssignment, isRecovering bool, action ConsumerAction) (*consumer, error) {

在这里插入图片描述

在setLeader()函数中启动相关例程loopAndGatherMsgs、processInboundAcks、processInboundNextMsgReqs,这些例程通过go channel进行协作。

   go func() {
            setGoRoutineLabels(labels)
            o.loopAndGatherMsgs(qch)
        }()

        // Now start up Go routine to process acks.
        go func() {
            setGoRoutineLabels(labels)
            o.processInboundAcks(qch)
        }()

        if pullMode {
            // Now start up Go routine to process inbound next message requests.
            go func() {
                setGoRoutineLabels(labels)
                o.processInboundNextMsgReqs(qch)
            }()
        }

在loopAndGatherMsgs函数中,以下代码是用于推送和拉取消费者的核心代码

// If we are in push mode and not active or under flowcontrol let's stop sending.
        if o.isPushMode() {
            if !o.active || (o.maxpb > 0 && o.pbytes > o.maxpb) {
                goto waitForMsgs
            }
        } else if o.waiting.isEmpty() {
            // If we are in pull mode and no one is waiting already break and wait.
            goto waitForMsgs
        }

        // Grab our next msg.
        pmsg, dc, err = o.getNextMsg()

        // We can release the lock now under getNextMsg so need to check this condition again here.
        if o.closed || o.mset == nil {
            o.mu.Unlock()
            return
        }

在这里插入图片描述

pull操作符的核心代码:

// Grab our next msg.
        pmsg, dc, err = o.getNextMsg()

        // We can release the lock now under getNextMsg so need to check this condition again here.
        if o.closed || o.mset == nil {
            o.mu.Unlock()
            return
        }
        ...
// Do actual delivery.
        o.deliverMsg(dsubj, ackReply, pmsg, dc, rp)

在这里插入图片描述

当客户端是推送消费者时,将通过updateDeliveryInterest设置兴趣和队列

在这里插入图片描述

func (o *consumer) updateDeliveryInterest(localInterest bool) bool {
    interest := o.hasDeliveryInterest(localInterest)

    o.mu.Lock()
    defer o.mu.Unlock()

    mset := o.mset
    if mset == nil || o.isPullMode() {
        return false
    }

    if interest && !o.active {
        o.signalNewMessages()
    }
    // Update active status, if not active clear any queue group we captured.
    if o.active = interest; !o.active {
        o.qgroup = _EMPTY_
    } else {
        o.checkQueueInterest()
    }

推送模式也通过func(o*consumer)loopAndGatherMsgs(qch-chan-struct{})向客户端推送消息,当流接收到新消息时,通过通道触发跟踪功能

// This will update and signal all consumers that match.
func (mset *stream) signalConsumers(subj string, seq uint64) {
    mset.clsMu.RLock()
    if mset.csl == nil {
        mset.clsMu.RUnlock()
        return
    }
    r := mset.csl.Match(subj)
    mset.clsMu.RUnlock()

    if len(r.psubs) == 0 {
        return
    }
    // Encode the sequence here.
    var eseq [8]byte
    var le = binary.LittleEndian
    le.PutUint64(eseq[:], seq)
    msg := eseq[:]
    for _, sub := range r.psubs {
        sub.icb(sub, nil, nil, subj, _EMPTY_, msg)
    }
}

sub.icb 是一个 processStreamSignal function ,通过以下代码设置 :

// Creates a sublist for consumer.
// All subjects share the same callback.
func (o *consumer) signalSubs() []*subscription {
    o.mu.Lock()
    defer o.mu.Unlock()

    if o.sigSubs != nil {
        return o.sigSubs
    }

    subs := []*subscription{}
    if o.subjf == nil {
        var sub = subscription{subject: []byte(fwcs), icb: o.processStreamSignal}
        subs = append(subs, &sub)
        o.sigSubs = subs
        o.srv.Noticef("signalSubs subject:%s, o. %s, DeliverSubject %s", string(sub.subject), o.name, o.cfg.DeliverSubject)
        return subs
    }

    for _, filter := range o.subjf {
        var sub = subscription{subject: []byte(filter.subject), icb: o.processStreamSignal}
        subs = append(subs, &sub)
        o.srv.Noticef("signalSubs subject:%s, o. %s , DeliverSubject %s", string(sub.subject), o.name)
    }
    o.sigSubs = subs
    return subs
}

另一个重要功能:signalNewMessages(如下所示)将在新创建的消费者中调用,等待新消息到达的消息

// Will signal us that new messages are available. Will break out of waiting.
func (o *consumer) signalNewMessages() {
    // Kick our new message channel
    select {
    case o.mch <- struct{}{}:
    default:
    }
}

如果想得到更明细的内容,可以进一步阅读

function func (o *consumer) may trigger signalNewMessages()


func (o *consumer) processStreamSignal(_ *subscription, _ *client, _ *Account, subject, _ string, seqb []byte) also may trigger signalNewMessages()

其它:

Option selectLimits 和Tier storage 不能共存

  • terms NRG: Drop append entries when upper layer is overloaded,

  • 存储关键信息

const (
// JetStreamStoreDir is the prefix we use.
JetStreamStoreDir = “jetstream”
// JetStreamMaxStoreDefault is the default disk storage limit. 1TB
JetStreamMaxStoreDefault = 1024 * 1024 * 1024 * 1024
// JetStreamMaxMemDefault is only used when we can’t determine system memory. 256MB
JetStreamMaxMemDefault = 1024 * 1024 * 256
// snapshot staging for restores.
snapStagingDir = “.snap-staging”
)

相关文章:

  • 【2025年26期免费获取股票数据API接口】实例演示五种主流语言获取股票行情api接口之沪深A股涨停股池数据获取实例演示及接口API说明文档
  • Prompt engineering设计原则
  • 【芯片验证】verificationguide上的74道SystemVerilog面试题
  • Phi-4-multimodal:图、文、音频统一的多模态大模型架构、训练方法、数据细节
  • 向死而生:在心灵废墟上重建生命圣殿——论自我蜕变的五重维度
  • Linux网络之数据链路层协议
  • 蓝桥杯备考:图论初解
  • 如何避免依赖关键人员导致“单点故障”
  • 基于深度文档理解的开源 RAG 引擎RAGFlow的介绍和安装
  • git在cmd的操作
  • 【计算机网络】UDP
  • DMA在STM32中的应用
  • 文件上传靶场(10--20)
  • OPENGLPG第九版学习 -颜色、像素和片元 PART1
  • Educational Codeforces Round 27 G.Shortest Path Problem? 线性基、dfs
  • 通俗易懂的介绍LLM大模型技术常用专业名词(通用版)
  • 【redis】慢查询分析与优化
  • 三星首款三折叠手机被曝外屏6.49英寸:折叠屏领域的新突破
  • 只音 1.2.0 |纯净无广告,畅听全网音乐,支持无损下载和批量下载
  • 如何借助 ArcGIS Pro 高效统计基站 10km 范围内的村庄数量?
  • 建设自己的网站首页/怎么把广告发到各大平台
  • 企业网站php模板下载/网络营销渠道有哪些
  • 站内优化包括哪些/今天的新闻 联播最新消息
  • 企业动态网站模板/soe搜索优化
  • 做汽配网站需要多少钱/最新战争新闻事件今天
  • 网站定位案例/seo短视频保密路线