nats jetstream server code 分析
对象和缩写
jetstream导入两个对象:stream and consumer,在stream 之上构造jetstreamapi。在nats代码中,以下是一些常见的缩写
1.mset is stream
2.jsX is something of jetstream
3.o is consumer
代码分析
对于producer ,发送的消息将通过以下堆栈(包括kv 模式)进行处理
核心函数是:
func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, lseq uint64, ts int64) error {
消息保存是通过以下代码:
// Store actual msg.
if lseq == 0 && ts == 0 {
seq, ts, err = store.StoreMsg(subject, hdr, msg)
} else {
// Make sure to take into account any message assignments that we had to skip (clfs).
seq = lseq + 1 - clfs
// Check for preAcks and the need to skip vs store.
if mset.hasAllPreAcks(seq, subject) {
mset.clearAllPreAcks(seq)
store.SkipMsg()
} else {
err = store.StoreRawMsg(subject, hdr, msg, seq, ts)
}
}
每个consumer 都有一个相对的影子主题,如果你有兴趣找到所有主题,你可以在func(sSublist)Insert(subsubscription)中添加一些跟踪代码。
集群模式下, leader node是设置在consumer level。
伴随consumer 和 stream的shadow 主题也很重要:在nats中,一切对象都保存为subject,就像
- $JS.API
- $JSC.CI - consumer
- $SYS.REQ.USER.INFO
- $SRV.>
- _INBOX
对于customer和内部的订阅,subscribe,是通过以下addServiceImportSub 的代码实现
// Internal account clients are for service imports and need the '\r\n'.
start := time.Now()
if client.kind == ACCOUNT {
sub.icb(sub, c, acc, string(subject), string(reply), msg)
} else {
sub.icb(sub, c, acc, string(subject), string(reply), msg[:msgSize])
}
if dur := time.Since(start); dur >= readLoopReportThreshold {
srv.Warnf("Internal subscription on %q took too long: %v", subject, dur)
}
而func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, subject, reply, mh, msg []byte, gwrply bool) bool {
会call addServiceImportSub
注意nats有不少类似下面的func set模式来实现调用
// processServiceImport is an internal callback when a subscription matches an imported service
// from another account. This includes response mappings as well.
cb := func(sub *subscription, c *client, acc *Account, subject, reply string, msg []byte) {
c.processServiceImport(si, acc, msg)
}
服务器重新启动时,将重新分配已经存在 consumer给不同server:
func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname string, ca *consumerAssignment, isRecovering bool, action ConsumerAction) (*consumer, error) {
在setLeader()函数中启动相关例程loopAndGatherMsgs、processInboundAcks、processInboundNextMsgReqs,这些例程通过go channel进行协作。
go func() {
setGoRoutineLabels(labels)
o.loopAndGatherMsgs(qch)
}()
// Now start up Go routine to process acks.
go func() {
setGoRoutineLabels(labels)
o.processInboundAcks(qch)
}()
if pullMode {
// Now start up Go routine to process inbound next message requests.
go func() {
setGoRoutineLabels(labels)
o.processInboundNextMsgReqs(qch)
}()
}
在loopAndGatherMsgs函数中,以下代码是用于推送和拉取消费者的核心代码
// If we are in push mode and not active or under flowcontrol let's stop sending.
if o.isPushMode() {
if !o.active || (o.maxpb > 0 && o.pbytes > o.maxpb) {
goto waitForMsgs
}
} else if o.waiting.isEmpty() {
// If we are in pull mode and no one is waiting already break and wait.
goto waitForMsgs
}
// Grab our next msg.
pmsg, dc, err = o.getNextMsg()
// We can release the lock now under getNextMsg so need to check this condition again here.
if o.closed || o.mset == nil {
o.mu.Unlock()
return
}
pull操作符的核心代码:
// Grab our next msg.
pmsg, dc, err = o.getNextMsg()
// We can release the lock now under getNextMsg so need to check this condition again here.
if o.closed || o.mset == nil {
o.mu.Unlock()
return
}
...
// Do actual delivery.
o.deliverMsg(dsubj, ackReply, pmsg, dc, rp)
当客户端是推送消费者时,将通过updateDeliveryInterest设置兴趣和队列
func (o *consumer) updateDeliveryInterest(localInterest bool) bool {
interest := o.hasDeliveryInterest(localInterest)
o.mu.Lock()
defer o.mu.Unlock()
mset := o.mset
if mset == nil || o.isPullMode() {
return false
}
if interest && !o.active {
o.signalNewMessages()
}
// Update active status, if not active clear any queue group we captured.
if o.active = interest; !o.active {
o.qgroup = _EMPTY_
} else {
o.checkQueueInterest()
}
推送模式也通过func(o*consumer)loopAndGatherMsgs(qch-chan-struct{})向客户端推送消息,当流接收到新消息时,通过通道触发跟踪功能
// This will update and signal all consumers that match.
func (mset *stream) signalConsumers(subj string, seq uint64) {
mset.clsMu.RLock()
if mset.csl == nil {
mset.clsMu.RUnlock()
return
}
r := mset.csl.Match(subj)
mset.clsMu.RUnlock()
if len(r.psubs) == 0 {
return
}
// Encode the sequence here.
var eseq [8]byte
var le = binary.LittleEndian
le.PutUint64(eseq[:], seq)
msg := eseq[:]
for _, sub := range r.psubs {
sub.icb(sub, nil, nil, subj, _EMPTY_, msg)
}
}
sub.icb 是一个 processStreamSignal function ,通过以下代码设置 :
// Creates a sublist for consumer.
// All subjects share the same callback.
func (o *consumer) signalSubs() []*subscription {
o.mu.Lock()
defer o.mu.Unlock()
if o.sigSubs != nil {
return o.sigSubs
}
subs := []*subscription{}
if o.subjf == nil {
var sub = subscription{subject: []byte(fwcs), icb: o.processStreamSignal}
subs = append(subs, &sub)
o.sigSubs = subs
o.srv.Noticef("signalSubs subject:%s, o. %s, DeliverSubject %s", string(sub.subject), o.name, o.cfg.DeliverSubject)
return subs
}
for _, filter := range o.subjf {
var sub = subscription{subject: []byte(filter.subject), icb: o.processStreamSignal}
subs = append(subs, &sub)
o.srv.Noticef("signalSubs subject:%s, o. %s , DeliverSubject %s", string(sub.subject), o.name)
}
o.sigSubs = subs
return subs
}
另一个重要功能:signalNewMessages(如下所示)将在新创建的消费者中调用,等待新消息到达的消息
// Will signal us that new messages are available. Will break out of waiting.
func (o *consumer) signalNewMessages() {
// Kick our new message channel
select {
case o.mch <- struct{}{}:
default:
}
}
如果想得到更明细的内容,可以进一步阅读
function func (o *consumer) may trigger signalNewMessages()
和
func (o *consumer) processStreamSignal(_ *subscription, _ *client, _ *Account, subject, _ string, seqb []byte) also may trigger signalNewMessages()
其它:
Option selectLimits 和Tier storage 不能共存
-
terms NRG: Drop append entries when upper layer is overloaded,
-
存储关键信息
const (
// JetStreamStoreDir is the prefix we use.
JetStreamStoreDir = “jetstream”
// JetStreamMaxStoreDefault is the default disk storage limit. 1TB
JetStreamMaxStoreDefault = 1024 * 1024 * 1024 * 1024
// JetStreamMaxMemDefault is only used when we can’t determine system memory. 256MB
JetStreamMaxMemDefault = 1024 * 1024 * 256
// snapshot staging for restores.
snapStagingDir = “.snap-staging”
)