NSQ是什么

NSQ is a realtime distributed messaging platform.

NSQ是一个实时的分布式消息平台。NSQ有许多优秀的特性,比如支持没有单点故障的分布式拓扑结构,可以水平拓展,消息传递及时等。
同时,NSQ为消费者找到相应的生产者提供了一个发现服务,即nsqlooupd
也为集群的管理提供了一个健壮的web界面,即nsqadmin

另外NSQ做为消息队列的一种,有以下几种特性:

  1. 消息默认不是持久化的。
    NSQ中消息是存放在内存中的,当内存中的消息数量超过设定的值(--mem-queue-size)后,就会将消息持久化到磁盘。我们可以将--mem-queue-size设为0来保证所有消息都是持久化到磁盘的。
  2. 消息至少被交付一次。
    nsqd没有故障时消息会至少被交付一次,当然在许多情况下,比如客户端连接超时等,消息可能会被传递多次。
  3. 消息接收是无序的。
  4. 消费者最终会找到所有它订阅的Topic的生产者。
    nsqlookupd就是用来干这个的。

参考文献: http://nsq.io/overview/features_and_guarantees.html

NSQ中的一些概念

我们先来了解一下NSQ中的一些概念:
NSQ中消息的流转图

  1. nsqd
    Nsqd是一个守护进程,用来接受和转发消息。后面会具体提到。
  2. Topic
    一个Topic可以理解为一个种类的消息,clients发布消息就是发布到某一个Topic
  3. Channel
    Channel可以理解为消费者,一个Topic下可能有很多的Channel,发布到Topic的消息会被发送到每一个Channel
  4. Consumer
    一个Consumer在订阅Topic时会连接到一个指定的Channel,可能会有许多的Consumer连接到同一个Channel。消息只会被随机地发送给某一个Consumer

在这里,讲一下为啥说NSQ没有单点故障。
NSQ拓扑机构
NSQ的所有clients都是通过TCP与nsqd连接的。而且可以看出,所有consumer,都会与所有它订阅的Topicnsqd连接,所以在NSQ中,没有中间人,没有代理,所以也就没有单点故障。也因为所有consumer都连接到所有它订阅的Topicnsqd,NSQ的水平拓展也非常容易。而找到所有nsqd则是lookupd的功劳了。

NSQD

nsqd is the daemon that receives, queues, and delivers messages to clients.

nsqd是一个守护进程,主要是处理与clients的连接,接收和转发消息等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type NSQD struct {
...
topicMap map[string]*Topic
lookupPeers atomic.Value
tcpListener net.Listener
httpListener net.Listener
httpsListener net.Listener
tlsConfig *tls.Config
poolSize int
notifyChan chan interface{}
optsNotificationChan chan struct{}
exitChan chan int
waitGroup util.WaitGroupWrapper
ci *clusterinfo.ClusterInfo
}

NSQD中主要维护了它的topicMaptcpListener(用于处理来自clients的tcp连接),httpListener(处理http请求),waitGroup(协程间同步)等。

NSQD启动时主要做了什么

  1. 启动一个TCPServer接收tcp请求

    1
    2
    3
    4
    5
    n.tcpListener = tcpListener
    tcpServer := &tcpServer{ctx: ctx}
    n.waitGroup.Wrap(func() {
    protocol.TCPServer(n.tcpListener, tcpServer, n.getOpts().Logger)
    })
  2. 启动httpServer和httpsServer(如果需要)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    n.httpsListener = httpsListener
    httpsServer := newHTTPServer(ctx, true, true)
    n.waitGroup.Wrap(func() {
    http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.getOpts().Logger)
    })
    n.httpListener = httpListener
    httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
    n.waitGroup.Wrap(func() {
    http_api.Serve(n.httpListener, httpServer, "HTTP", n.getOpts().Logger)
    })
  3. 定时处理inFlight消息(后面会提到,用来处理超时或者延迟发送的消息)

    1
    n.waitGroup.Wrap(func() { n.queueScanLoop() })
  4. 一些其他服务,比如lookup,stats等

    1
    2
    3
    4
    n.waitGroup.Wrap(func() { n.lookupLoop() })
    if n.getOpts().StatsdAddress != "" {
    n.waitGroup.Wrap(func() { n.statsdLoop() })
    }

Topic与Channel

在讲与client通信之前,先梳理一下TopicChannelclient这三者之间的关系。

Topic

Topic主要维护了channelMap(所拥有的channel), backend(内存消息队列超出设置大小时的备用队列,可能是持久化到磁盘diskqueue,也可能是丢弃dummyBackendQueue),memoryMsgChan(内存中的消息队列)等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type Topic struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
messageCount uint64
sync.RWMutex
name string
channelMap map[string]*Channel
backend BackendQueue
memoryMsgChan chan *Message
exitChan chan int
channelUpdateChan chan int
waitGroup util.WaitGroupWrapper
exitFlag int32
idFactory *guidFactory
ephemeral bool
deleteCallback func(*Topic)
deleter sync.Once
paused int32
pauseChan chan bool
ctx *context
}

当NSQD接受到创建Topic的请求或这订阅的请求时,当Topic不存在,就会创建Topic。创建Topic时,除了初始化一些成员之外,还会启动一个名叫messagePump的协程,来处理该Topic的消息。
该协程主要是接收来自backendmemoryMsgChan的消息,然后转发给每一个channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
...
t.waitGroup.Wrap(func() { t.messagePump() })
...
}
func (t *Topic) messagePump() {
var msg *Message
for {
select {
case msg = <-memoryMsgChan:
case buf = <-backendChan:
msg, err = decodeMessage(buf)
...
}
for i, channel := range chans {
chanMsg := msg
...
err := channel.PutMessage(chanMsg)
...
}
}
}

另外,Topic收到消息时,会判断内存中的消息队列是否已满,如果是,则会将消息写入backend

1
2
3
4
5
6
7
8
9
10
func (c *Channel) put(m *Message) error {
select {
case c.memoryMsgChan <- m:
default:
b := bufferPoolGet() // 复用buffer,减少对象生成
err := writeMessageToBackend(b, m, c.backend)
bufferPoolPut(b)
}
return nil
}

Channel

Channel中维护了一些计数器,以及跟Topic一样的backendmemoryMsgChan,以及正在发送途中的消息,也就是已经发送给client但是没有得到client确认的消息inFlightMessages等。同时还维护了该channel所有的clients

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type Channel struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
requeueCount uint64
messageCount uint64
timeoutCount uint64
...
topicName string
name string
ctx *context
backend BackendQueue
memoryMsgChan chan *Message
// state tracking
clients map[int64]Consumer
...
// Stats tracking
...
// TODO: these can be DRYd up
deferredMessages map[MessageID]*pqueue.Item
deferredPQ pqueue.PriorityQueue
inFlightMessages map[MessageID]*Message
inFlightPQ inFlightPqueue
}

如何处理与client的tcp连接

在前面已经提到,NSQD在启动时会启动一个tcpServer来处理与client的连接。在tcpServer中,会有Handle函数来处理tcp连接。当初始化连接时,会判断client使用的协议类型,目前只支持V2。最后会进入一个名叫IOLoop的函数与client进行通信。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
type tcpServer struct {
ctx *context
}
func (p *tcpServer) Handle(clientConn net.Conn) {
...
buf := make([]byte, 4)
_, err := io.ReadFull(clientConn, buf)
...
protocolMagic := string(buf)
...
var prot protocol.Protocol
switch protocolMagic {
case " V2": //目前只支持V2
prot = &protocolV2{ctx: p.ctx}
default:
protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
clientConn.Close()
p.ctx.nsqd.logf("ERROR: client(%s) bad protocol magic '%s'",
clientConn.RemoteAddr(), protocolMagic)
return
}
err = prot.IOLoop(clientConn) // 处理与client的通信
if err != nil {
p.ctx.nsqd.logf("ERROR: client(%s) - %s", clientConn.RemoteAddr(), err)
return
}
}

IOLoop

在IOLoop这个函数中,NSQD会与client进行通信,主要是接收client的消息,并根据client发送的消息类型来做相应的处理,比如订阅,确认消息等,并在client订阅某个channel之后给client发送消息.
具体:

  1. 在一个循环中接收客户端发送回来的信息,并根据消息的指令做相应的处理:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    for {
    line, err = client.Reader.ReadSlice('\n')
    params := bytes.Split(line, separatorBytes)
    response, err = p.Exec(client, params)
    }
    func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
    switch {
    case bytes.Equal(params[0], []byte("FIN")):
    return p.FIN(client, params)
    case bytes.Equal(params[0], []byte("PUB")):
    return p.PUB(client, params)
    case bytes.Equal(params[0], []byte("SUB")):
    return p.SUB(client, params)
    ...
    }
  2. 启动了一个messagePump的协程来处理消息队列中的消息

    1
    go p.messagePump(client, messagePumpStartedChan)

在这个协程中主要处理几件主要的事情:
给客户端发送心跳信息:(当发送心跳失败,则断开与该client的连接)

1
2
3
4
5
case <-heartbeatChan:
err = p.Send(client, frameTypeResponse, heartbeatBytes)
if err != nil {
goto exit
}

获取消息通道中的消息,并发送给客户端,而开始获取消息通道中的消息是在客户端订阅之后,之后会再提及。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
... // subChannel不为空时
memoryMsgChan = subChannel.memoryMsgChan
backendMsgChan = subChannel.backend.ReadChan()
...
...
case subChannel = <-subEventChan: // 订阅事件会将所订阅的channel发送到改go-chan,开始处理消息
// you can't SUB anymore
subEventChan = nil
case b := <-backendMsgChan:
msg, err := decodeMessage(b)
msg.Attempts++
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
err = p.SendMessage(client, msg, &buf)
case msg := <-memoryMsgChan:
...

client的重要的几种消息类型与处理方式

SUB

当收到client发送过来的SUB消息后,NSQD会获取需要订阅的topicchannel,然后将client添加到相应的channel中,然后在messagePump中就可以消费channel中的消息了:

1
2
3
4
5
6
7
8
9
func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
topic := p.ctx.nsqd.GetTopic(topicName)
channel := topic.GetChannel(channelName)
channel.AddClient(client.ID, client)
client.Channel = channel
// update message pump
client.SubEventChan <- channel //开始消费该channel中的消息
return okBytes, nil
}

RDY

RDY表示client已经准备好了接收来自NSQD的消息,client在发送RDY消息的同时,会带上一个count,表示一个可以接收的消息的窗口大小,有点像tcp的滑动窗口。因为当NSQD给client发送了消息之后,这条消息的状态就变成了inFlight,表示未经client确认,当inFlight的数目大于或等于count时,NSQD就不会给client发送更多的消息了。当client发送回来FIN消息时,inFlight数目就会减一。

1
2
3
4
5
6
7
8
func (p *protocolV2) RDY(client *clientV2, params [][]byte) ([]byte, error) {
...
count := int64(1)
...
client.SetReadyCount(count)
...
return nil, nil
}

FIN

当收到FIN消息时,表明client对收到某一条消息做确认。这时主要是将该消息从channel中的inFlightPQ去除,同时对client收到消息的条数做一个计数:

1
2
3
4
5
6
func (p *protocolV2) FIN(client *clientV2, params [][]byte) ([]byte, error) {
id, err := getMessageID(params[1])
err = client.Channel.FinishMessage(client.ID, *id)
client.FinishedMessage()
return nil, nil
}

PUB

PUB比较简单,就是收到消息后,将该消息转发给相应的Topic

1
2
3
4
5
6
7
8
9
10
func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
...
messageBody := make([]byte, bodyLen)
...
topic := p.ctx.nsqd.GetTopic(topicName)
msg := NewMessage(topic.GenerateID(), messageBody)
err = topic.PutMessage(msg)
...
return okBytes, nil
}

一些细节

消息发送的一些细节

NSQD在给client发送消息之后,会将该消息添加到该channel的一个叫inFlightPQ的优先级队列中。该优先级队列的个底层结构是数组,然后基于数组实现的小根堆。而权重则是发送消息时规定的timeOut的时长。
当NSQD在收到client发送过来的FIN确认消息之后,就会从inFlightPQ移除相应的消息。
同时,NSQD在启动时,就启动了一个协程,来定时处理每个channel中的inFlightPQdeferredPQ,也就是重新将消息发送给相应的Channel
这样,就能保证消息能至少被交付一次。

使用sync.WaitGroup 来做协程间的同步

在NSQ中,封装了一层sync.WaitGroup,来做协程之间的同步,因为go不像java,线程启动后,jvm会在用户线程都结束后才退出,而go的协程启动后,main并不会等协程结束才退出。所以就需要用WaitGroup来同步协程的执行情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type WaitGroupWrapper struct {
sync.WaitGroup
}
func (w *WaitGroupWrapper) Wrap(cb func()) {
w.Add(1)
go func() {
cb()
w.Done()
}()
}
n.waitGroup.Wrap(func() { n.queueScanLoop() })
...
n.waitGroup.Wait()

使用sync.Pool复用缓冲池

因为go这种自动垃圾回收的语言有时为了减少垃圾回收的成本,可以减少对象的产生,也就是可以重用对象。sync.Pool就是官方提供的对象重用的工具。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var bp sync.Pool
func init() {
bp.New = func() interface{} {
return &bytes.Buffer{}
}
}
func bufferPoolGet() *bytes.Buffer {
return bp.Get().(*bytes.Buffer)
}
func bufferPoolPut(b *bytes.Buffer) {
bp.Put(b)
}

NSQ使用这个工具主要是在收到消息,往消息队列中写入的时候:

1
2
3
b := bufferPoolGet()
err := writeMessageToBackend(b, m, t.backend)
bufferPoolPut(b)

使用bufio.Reader和bufio.Writer包装net.Conn,减少内存的重复分配

1
2
3
4
5
6
7
8
9
10
c := &clientV2{
...
Reader: bufio.NewReaderSize(conn, defaultBufferSize),
Writer: bufio.NewWriterSize(conn, defaultBufferSize),
...
}
// ReadSlice does not allocate new space for the data each request
// ie. the returned slice is only valid until the next call to it
line, err = client.Reader.ReadSlice('\n')

NSQ使用bufio.Reader来包装net.Conn,来减少内存的重复分配,因为bufio.ReaderReadSlice方法可以重复利用缓冲区,这样就减少了读socket时的内存分配。

减少[]byte到string的转换

因为[]bytestring的转换需要重新分配内存,所以在NSQ中很少能看到string类型的数据,都是[]byte类型。
比如MessageID就是一个[16]byte。因为从socket中读出来便是[]byte类型,而且为了减少slice[16]byte的复制,使用了unsafe包,来将slice强制转换成[16]byte

1
2
3
4
5
6
7
8
9
type MessageID [MsgIDLength]byte
// validate and cast the bytes on the wire to a message ID
func getMessageID(p []byte) (*MessageID, error) {
if len(p) != MsgIDLength {
return nil, errors.New("Invalid Message ID")
}
return (*MessageID)(unsafe.Pointer(&p[0])), nil
}

另外,Go 标准库仅仅在 string 上提供了数值转换方法。为了避免 string 的分配,NSQ自己实现了[]byte转换成数字的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func ByteToBase10(b []byte) (n uint64, err error) {
base := uint64(10)
n = 0
for i := 0; i < len(b); i++ {
var v byte
d := b[i]
switch {
case '0' <= d && d <= '9':
v = d - '0'
default:
n = 0
err = errBase10
return
}
n *= base
n += uint64(v)
}
return n, err
}

参考: A Journey Into NSQ