Understanding ActiveMQ Message Queue Protocols in Depth: STOMP, AMQP, MQTT

Preface

AWS MQ is a fully managed ActiveMQ service. I recently needed to use it, so I studied its documentation and tried out its features. Since ActiveMQ supports a rich set of protocols (OpenWire, AMQP, STOMP, MQTT), I also looked into each protocol's characteristics and its SDKs.

Installation

For local development the most convenient option is of course Docker. The rmohr/activemq image is well documented and has a tag for 5.15.6, the version supported by AWS.

Note that you first need to follow the few steps in the image's Docker Hub documentation to copy the default configuration files out of the image into your own conf directory on the host, /usr/local/activemq/conf. After that, an ActiveMQ server with the default configuration starts in no time:

# active mq
docker run -itd --name activemq \
-p 61616:61616 -p 8161:8161 -p 5672:5672 -p 61613:61613 -p 1883:1883 -p 61614:61614 \
-v /usr/local/activemq/conf:/opt/activemq/conf \
-v /usr/local/activemq/data:/opt/activemq/data \
rmohr/activemq:5.15.6

Features

Advisory

ActiveMQ can publish its own events to system topics, such as the creation of a queue/topic, or a queue/topic having no consumers. http://activemq.apache.org/advisory-message.html

This feature is very useful for monitoring the broker. It is disabled in the default configuration and has to be enabled in activemq.xml.
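For instance, advisory topics can be watched from a normal client. Below is a minimal go-stomp sketch that subscribes to consumer advisories for all queues; the advisory destination name comes from the ActiveMQ docs, while the broker address and the use of the consumerCount header are assumptions from my setup:

package main

import (
    "log"

    "github.com/go-stomp/stomp"
)

func main() {
    conn, err := stomp.Dial("tcp", "127.0.0.1:61613")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Disconnect()
    // ActiveMQ.Advisory.Consumer.Queue.<name> receives a message whenever a
    // consumer subscribes to or leaves <name>; '>' watches every queue.
    sub, err := conn.Subscribe("/topic/ActiveMQ.Advisory.Consumer.Queue.>", stomp.AckAuto)
    if err != nil {
        log.Fatal(err)
    }
    for m := range sub.C {
        if m.Err != nil {
            log.Fatal(m.Err)
        }
        // consumerCount is the header ActiveMQ sets on consumer advisories (assumed)
        log.Printf("advisory on %s: consumerCount=%s", m.Destination, m.Header.Get("consumerCount"))
    }
}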

Wildcards

. separates the words in a destination name
* matches any single word, not crossing the dot (.)
> matches anything that follows, including dots (.); it expresses a prefix, and nothing may follow the > character.

Wildcards can be used in the configuration file to express the scope of a policy, and also in the destination name when subscribing, which is a very handy feature.
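As a quick illustration, a go-stomp sketch subscribing with a wildcard (the broker address and topic names are assumptions):

package main

import (
    "log"

    "github.com/go-stomp/stomp"
)

func main() {
    conn, err := stomp.Dial("tcp", "127.0.0.1:61613")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Disconnect()
    // price.stock.* matches price.stock.nasdaq but not price.stock.nasdaq.ibm;
    // price.> would match both, since '>' crosses the dots.
    sub, err := conn.Subscribe("/topic/price.stock.*", stomp.AckAuto)
    if err != nil {
        log.Fatal(err)
    }
    for m := range sub.C {
        if m.Err != nil {
            log.Fatal(m.Err)
        }
        log.Printf("from %s: %s", m.Destination, m.Body)
    }
}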

Virtual Topic

A virtual topic turns a normal topic into multiple queues. If virtual topics are enabled for TopicA, consumers can consume from queues whose names match the pattern Consumer.xxx.TopicA. (http://activemq.apache.org/virtual-destinations.html)

The xxx part plays the same role as the Channel concept in NSQ.

The scope, prefix, and other options of the virtualDestinationInterceptor need to be configured in activemq.xml.

  • name=">" enables the virtual-topic feature for all topics.

  • prefix="Consumer.*." means the queues that can be subscribed to follow the pattern Consumer.*.<topic name> (the demo code in the STOMP section below consumes from such queues).

<destinationInterceptors>
  <virtualDestinationInterceptor>
    <virtualDestinations>
      <virtualTopic name=">" prefix="Consumer.*." selectorAware="false"/>
    </virtualDestinations>
  </virtualDestinationInterceptor>
</destinationInterceptors>

Delay & Schedule

ActiveMQ supports delayed and scheduled messages; just set the following message headers. AMQ_SCHEDULED_DELAY and AMQ_SCHEDULED_PERIOD are longs (max value Long.MAX_VALUE), so very long delays can be configured. Note that schedulerSupport="true" must be set on the <broker> element in activemq.xml for scheduling to work. A sketch follows the table below.

Property name         type    description
AMQ_SCHEDULED_DELAY   long    The time in milliseconds that a message will wait before being scheduled to be delivered by the broker
AMQ_SCHEDULED_PERIOD  long    The time in milliseconds to wait after the start time to wait before scheduling the message again
AMQ_SCHEDULED_REPEAT  int     The number of times to repeat scheduling a message for delivery
AMQ_SCHEDULED_CRON    String  Use a Cron entry to set the schedule
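For example, a hedged go-stomp sketch that sends a delayed, repeating message; /queue/delayed is a made-up destination, and it assumes the STOMP headers pass through to the broker's scheduler, with schedulerSupport enabled as noted above:

package main

import (
    "log"

    "github.com/go-stomp/stomp"
)

func main() {
    conn, err := stomp.Dial("tcp", "127.0.0.1:61613")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Disconnect()
    // first delivery after 60s, then 9 repeats at 10s intervals
    err = conn.Send("/queue/delayed", "text/plain", []byte("hello"),
        stomp.SendOpt.Header("AMQ_SCHEDULED_DELAY", "60000"),
        stomp.SendOpt.Header("AMQ_SCHEDULED_PERIOD", "10000"),
        stomp.SendOpt.Header("AMQ_SCHEDULED_REPEAT", "9"))
    if err != nil {
        log.Fatal(err)
    }
}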

Dead Letter Queue

If the broker delivers a message to a consumer and gets neither an ACK nor a NACK, redelivery is triggered; once a message has been redelivered more than the allowed number of times it goes to the dead letter queue. By default there is only one shared dead letter queue, ActiveMQ.DLQ. To give each destination its own dead letter queue, activemq.xml has to be modified:

<broker>
   
  <destinationPolicy>
    <policyMap>
      <policyEntries>
        <!-- Set the following policy on all queues using the '>' wildcard -->
        <policyEntry queue=">">
          <deadLetterStrategy>
            <!--
              Use the prefix 'DLQ.' for the destination name, and make
              the DLQ a queue rather than a topic
            -->
            <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
          </deadLetterStrategy>
        </policyEntry>
      </policyEntries>
    </policyMap>
  </destinationPolicy>
</broker>

By default, non-persistent topic messages do not go to the dead letter queue. If you want them to, modify activemq.xml and add:

<!--
  Tell the dead letter strategy to also place non-persisted messages
  onto the dead-letter queue if they can't be delivered.
-->
<deadLetterStrategy>
  <... processNonPersistent="true" />
</deadLetterStrategy>

Practice

STOMP

STOMP stands for Simple (or Streaming) Text Orientated Messaging Protocol. Its design borrows from HTTP: it is frame based and text based, with HTTP-like concepts such as content-type, headers, and body. The specification (https://stomp.github.io/stomp-specification-1.2.html) is remarkably concise and fits on a single page.

Protocol details and characteristics:

  1. For duplicate header keys, only the first occurrence counts.
  2. The server may limit the message size, the number of header fields, and the length of headers.
  3. When one client opens several subscriptions, each SUBSCRIBE must carry a subscription id.
  4. The NACK command means the message is requeued.
  5. STOMP has a notion of transactions: the frames a producer sends up to the broker's confirmation form one transaction, and delivery from the broker up to the consumer's ACK forms another; transactions are atomic (see the sketch after this list).
  6. SSL is supported.
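To make point 5 concrete, here is a sketch of a producer-side transaction, assuming go-stomp's Begin/Commit transaction API and a made-up destination:

package main

import (
    "log"

    "github.com/go-stomp/stomp"
)

func main() {
    conn, err := stomp.Dial("tcp", "127.0.0.1:61613")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Disconnect()
    // BEGIN ... SEND ... SEND ... COMMIT: both messages become visible
    // atomically; tx.Abort() would discard them instead.
    tx := conn.Begin()
    if err := tx.Send("/queue/tx.demo", "text/plain", []byte("first")); err != nil {
        log.Fatal(err)
    }
    if err := tx.Send("/queue/tx.demo", "text/plain", []byte("second")); err != nil {
        log.Fatal(err)
    }
    if err := tx.Commit(); err != nil {
        log.Fatal(err)
    }
}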

ActiveMQ as a STOMP server

  1. Supports version 1.1 of the STOMP protocol.

  2. Default limits: maxDataLength = 104857600, maxFrameSize = MAX_LONG.

  3. Whether a destination is a queue (the produce/consume model) or a topic (the publish/subscribe model) is determined by whether its name starts with /queue/ or /topic/. The actual name is what remains after stripping the prefix, including both / characters.

  4. Sending is not persistent by default; set the header persistent:true on SEND to make messages persistent.

    Subscriptions are not durable by default; set the header activemq.subscriptionName:<subscriber name> on SUBSCRIBE to create a durable subscription.

    Many features are driven by STOMP headers; the official ActiveMQ documentation has two sections on them. http://activemq.apache.org/stomp.html#Stomp-StompExtensionsforJMSMessageSemantics

Demo code

package main

import (
    "context"
    "os"
    "os/signal"
    "strconv"
    "sync"
    "syscall"
    "time"

    "github.com/go-stomp/stomp"
    "github.com/hanjm/log"
)

func main() {
    var wg sync.WaitGroup
    ctx, cancel := context.WithCancel(context.Background())
    wg.Add(1)
    go func() {
        defer wg.Done()
        publisher(ctx, "/topic/stomp")
    }()
    // two subscribers on virtual-topic queues, each with its own channel
    wg.Add(1)
    go func() {
        defer wg.Done()
        Subscriber(ctx, "channel1", "Consumer.channel1.stomp")
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        Subscriber(ctx, "channel2", "Consumer.channel2.stomp")
    }()
    // one subscriber directly on the topic
    wg.Add(1)
    go func() {
        defer wg.Done()
        Subscriber(ctx, "channel3", "/topic/stomp")
    }()
    defer func() {
        cancel()
        wg.Wait()
    }()
    SignalsListen()
}

func publisher(ctx context.Context, destination string) {
    conn, err := stomp.Dial("tcp", "127.0.0.1:61613")
    if err != nil {
        log.Fatal(err)
        return
    }
    defer conn.Disconnect()
    for i := 0; ; i++ {
        select {
        case <-ctx.Done():
            return
        case <-time.After(time.Second):
            // persistent:true asks the broker to persist the message
            err = conn.Send(
                destination,  // destination
                "text/plain", // content-type
                []byte("Test message #"+strconv.Itoa(i)), // body
                stomp.SendOpt.Header("persistent", "true"))
            if err != nil {
                log.Error(err)
                return
            }
        }
    }
}

func Subscriber(ctx context.Context, clientID string, destination string) {
    conn, err := stomp.Dial("tcp", "127.0.0.1:61613")
    if err != nil {
        log.Fatal(err)
        return
    }
    defer conn.Disconnect()
    sub, err := conn.Subscribe(destination, stomp.AckClientIndividual,
        stomp.SubscribeOpt.Id(clientID), stomp.SubscribeOpt.Header("persistent", "true"))
    if err != nil {
        log.Fatal(err)
        return
    }
    go func() {
        <-ctx.Done()
        if err := sub.Unsubscribe(); err != nil {
            log.Fatal(clientID, err)
        }
    }()
    for m := range sub.C {
        if m.Err != nil {
            log.Fatal(m.Err)
            return
        }
        log.Infof("%s msg body:%s", clientID, m.Body)
        //log.Infof("%s msg header:%s", clientID, *m.Header)
        //log.Infof("%s msg content-type:%s", clientID, m.ContentType)
        //log.Infof("%s msg destination:%s", clientID, m.Destination)
        m.Conn.Ack(m)
    }
    log.Info("close sub")
}

func SignalsListen() {
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGQUIT,
        syscall.SIGTERM,
        syscall.SIGINT,
        syscall.SIGUSR1,
        syscall.SIGUSR2)
    switch <-sigs {
    case syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT:
        log.Info("service close")
    }
}

MQTT

Protocol specification: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html

Chinese translation: https://mcxiaoke.gitbooks.io/mqtt-cn/content/mqtt/01-Introduction.html

Protocol details and characteristics:

  1. The transport can be TCP or WebSocket; the protocol is aimed at IoT scenarios.
  2. There is no produce/consume (point-to-point) model, only publish/subscribe.
  3. Delivery semantics are expressed via QOS: QOS=0 means at most once, QOS=1 at least once, and QOS=2 exactly once.

ActiveMQ as an MQTT server

  1. The wildcards differ: MQTT's /, + and # correspond to ActiveMQ's ., * and > respectively (the sketch after this list shows the mapping in use).
  2. QOS=0 maps to non-persistent topics; QOS=1 and QOS=2 map to persistent topics.
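A minimal pub/sub sketch with paho.mqtt.golang; the broker address matches the docker setup above, and demo/mqtt is a made-up topic, which ActiveMQ exposes internally as demo.mqtt:

package main

import (
    "log"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
    opt := mqtt.NewClientOptions().AddBroker("tcp://127.0.0.1:1883").SetClientID("demo")
    client := mqtt.NewClient(opt)
    if t := client.Connect(); t.Wait() && t.Error() != nil {
        log.Fatal(t.Error())
    }
    defer client.Disconnect(1000)
    // QOS=1 (at least once) maps to a persistent topic on ActiveMQ
    if t := client.Subscribe("demo/mqtt", 1, func(c mqtt.Client, m mqtt.Message) {
        log.Printf("msg: %s", m.Payload())
    }); t.Wait() && t.Error() != nil {
        log.Fatal(t.Error())
    }
    if t := client.Publish("demo/mqtt", 1, false, []byte("hello")); t.Wait() && t.Error() != nil {
        log.Fatal(t.Error())
    }
    time.Sleep(time.Second) // give the handler a moment to run before exiting
}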

AMQP

Protocol specification: http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-overview-v1.0-os.html

Compared with STOMP and MQTT, AMQP is much more complex; after all, the name says "Advanced" (Advanced Message Queuing Protocol).

Protocol details and characteristics:

  1. AMQP introduces many concepts of its own, such as Link, Container, and Node. Using an SDK directly without reading the model documentation can be hard going. ContainerID corresponds to the ActiveMQ client ID, and LinkName corresponds to the ActiveMQ subscription name.

ActiveMQ as an AMQP server

  1. ActiveMQ implements AMQP 1.0, so the well-known 2k-star SDK for AMQP 0.9.1 (https://github.com/streadway/amqp) cannot be used, and upstream sees no need to support the older protocol versions.
  2. Defaults: maxDataLength = 104857600 (100 MB), maxFrameSize = MAX_LONG, the consumer's prefetch (maximum number of unacknowledged messages it may hold) is 1000, and producerCredit is 10000. These can be set via the connection URI.
  3. SSL is supported.
  4. Whether a destination is a queue (the produce/consume model) or a topic (the publish/subscribe model) is determined by whether its name starts with queue:// or topic://. The actual name is what remains after stripping the prefix, including the slashes. (A small send/receive sketch follows this list.)
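A minimal send/receive sketch with pack.ag/amqp (the SDK used in the benchmarks below); the credentials are the rmohr/activemq defaults, and queue://demo.amqp is a made-up destination:

package main

import (
    "context"
    "log"
    "time"

    "pack.ag/amqp"
)

func main() {
    client, err := amqp.Dial("amqp://127.0.0.1", amqp.ConnSASLPlain("system", "manager"))
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()
    session, err := client.NewSession()
    if err != nil {
        log.Fatal(err)
    }
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    // the queue:// prefix selects the produce/consume model
    sender, err := session.NewSender(amqp.LinkTargetAddress("queue://demo.amqp"))
    if err != nil {
        log.Fatal(err)
    }
    if err := sender.Send(ctx, amqp.NewMessage([]byte("hello"))); err != nil {
        log.Fatal(err)
    }
    receiver, err := session.NewReceiver(amqp.LinkSourceAddress("queue://demo.amqp"))
    if err != nil {
        log.Fatal(err)
    }
    msg, err := receiver.Receive(ctx)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("msg: %s", msg.GetData())
    msg.Accept()
}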

Performance

Using the following SDKs:

github.com/vcabbage/amqp (76 stars, 13 issues, 5 contributors)
github.com/go-stomp/stomp (132 stars, 3 issues, 14 contributors)
github.com/eclipse/paho.mqtt.golang (650 stars, 20 issues, 34 contributors)

I benchmarked the common scenario of publishing and subscribing 1 KB messages with each.

On publish, amqp ≈ stomp > mqtt: amqp and stomp are comparable, and both are more than twice as fast as mqtt.
On subscribe, amqp is slightly faster than stomp, while mqtt is much slower.

Benchmark code

package all_bench

import (
    "bytes"
    "context"
    "sync/atomic"
    "testing"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
    "github.com/go-stomp/stomp"
    "github.com/hanjm/log"
    "pack.ag/amqp"
)

var msgData = bytes.Repeat([]byte("1"), 1024)

var (
    stompDestination = "bench-stomp"
    amqpDestination  = "bench-amqp"
    mqttDestination  = "bench-mqtt"
    pubMsgCount      = 20000
    subMsgCount      = 100
)

func TestMain(m *testing.M) {
    m.Run()
}

// go test -bench Publish -benchmem
// go test -bench Sub -benchmem

func BenchmarkStompPublish(b *testing.B) {
    conn, err := stomp.Dial("tcp", "127.0.0.1:61613")
    if err != nil {
        log.Fatal(err)
        return
    }
    defer conn.Disconnect()
    b.N = pubMsgCount // pin the iteration count so all SDKs send the same load
    b.ReportAllocs()
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        err = conn.Send(
            stompDestination, // destination
            "text/plain",     // content-type
            msgData)          // body
        if err != nil {
            log.Error(err)
            return
        }
    }
}

func BenchmarkAmqpPublish(b *testing.B) {
    // Create client
    client, err := amqp.Dial("amqp://127.0.0.1",
        amqp.ConnSASLPlain("system", "manager"),
    )
    if err != nil {
        log.Fatal("Dialing AMQP server:", err)
    }
    defer client.Close()
    // Open a session
    session, err := client.NewSession()
    if err != nil {
        log.Fatal("Creating AMQP session:", err)
    }
    defer func() {
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()
        if err := session.Close(ctx); err != nil {
            log.Errorf("failed to close session:%s", err)
        }
    }()
    // Create a sender
    sender, err := session.NewSender(
        amqp.LinkTargetAddress(amqpDestination),
        amqp.LinkSourceDurability(amqp.DurabilityUnsettledState),
        amqp.LinkSourceExpiryPolicy(amqp.ExpiryNever),
    )
    if err != nil {
        log.Fatal("Creating sender link:", err)
    }
    defer func() {
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()
        if err := sender.Close(ctx); err != nil {
            log.Errorf("failed to close sender:%s", err)
        }
    }()
    ctx := context.Background()
    msg := amqp.NewMessage(msgData)
    b.N = pubMsgCount
    b.ReportAllocs()
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        // Send message
        if err := sender.Send(ctx, msg); err != nil {
            log.Fatal("Sending message:", err)
        }
    }
}

func BenchmarkMqttPublish(b *testing.B) {
    opt := mqtt.NewClientOptions().SetClientID("pubClient").SetCleanSession(false)
    opt.AddBroker("tcp://127.0.0.1:1883")
    client := mqtt.NewClient(opt)
    t := client.Connect()
    if t.Wait() {
        if err := t.Error(); err != nil {
            log.Fatal(err)
            return
        }
    }
    defer client.Disconnect(10000)
    b.N = pubMsgCount
    b.ReportAllocs()
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        token := client.Publish(mqttDestination, 2, true, msgData)
        if err := token.Error(); err != nil {
            log.Fatal(err)
            return
        }
    }
}

func BenchmarkStompSubscriber(b *testing.B) {
    conn, err := stomp.Dial("tcp", "127.0.0.1:61613")
    if err != nil {
        log.Fatal(err)
        return
    }
    clientID := "1"
    //defer conn.Disconnect() // intentionally left open
    sub, err := conn.Subscribe(stompDestination, stomp.AckClientIndividual, stomp.SubscribeOpt.Id(clientID))
    if err != nil {
        log.Fatal(err)
        return
    }
    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
    defer cancel()
    b.N = subMsgCount
    b.ReportAllocs()
    b.ResetTimer()
    defer b.StopTimer()
    var i int64
    go func() {
        // cancel the context once enough messages have been consumed
        for range time.Tick(time.Second) {
            if atomic.LoadInt64(&i) >= int64(b.N) {
                cancel()
            }
        }
    }()
    for {
        select {
        case m := <-sub.C:
            if m.Err != nil {
                log.Fatal(m.Err)
                return
            }
            m.Conn.Ack(m)
            if atomic.AddInt64(&i, 1) > int64(b.N) {
                return
            }
        case <-ctx.Done():
            return
        }
    }
}

func BenchmarkAmqpSubscriber(b *testing.B) {
    // Create client
    client, err := amqp.Dial("amqp://127.0.0.1",
        amqp.ConnSASLPlain("system", "manager"),
    )
    if err != nil {
        log.Fatal("Dialing AMQP server:", err)
    }
    //defer client.Close() // intentionally left open
    // Open a session
    session, err := client.NewSession()
    if err != nil {
        log.Fatal("Creating AMQP session:", err)
    }
    clientID := "1"
    defer func() {
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()
        if err := session.Close(ctx); err != nil {
            log.Errorf("%s failed to close session:%s", clientID, err)
        }
    }()
    // Create a receiver
    receiver, err := session.NewReceiver(
        amqp.LinkSourceAddress(amqpDestination),
        amqp.LinkCredit(10),
    )
    if err != nil {
        log.Fatal("Creating receiver link:", err)
    }
    defer func() {
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()
        if err := receiver.Close(ctx); err != nil {
            log.Errorf("%s failed to close receiver:%s", clientID, err)
        }
    }()
    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
    defer cancel()
    b.N = subMsgCount
    b.ReportAllocs()
    b.ResetTimer()
    defer b.StopTimer()
    var i int64
    go func() {
        for range time.Tick(time.Second) {
            if atomic.LoadInt64(&i) >= int64(b.N) {
                cancel()
            }
        }
    }()
    for {
        // Receive next message
        msg, err := receiver.Receive(ctx)
        if err != nil {
            if err == context.Canceled {
                log.Infof("Reading message from AMQP:%s", err)
                break
            }
            log.Errorf("Reading message from AMQP:%s", err)
            break
        }
        // Accept message
        msg.Accept()
        if atomic.AddInt64(&i, 1) > int64(b.N) {
            return
        }
    }
}

func BenchmarkMqttSubscriber(b *testing.B) {
    opt := mqtt.NewClientOptions().SetClientID("subClient").SetCleanSession(false)
    opt.AddBroker("tcp://127.0.0.1:1883")
    client := mqtt.NewClient(opt)
    t := client.Connect()
    if t.Wait() {
        if err := t.Error(); err != nil {
            log.Fatal(err)
            return
        }
    }
    defer client.Disconnect(1000)
    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
    defer cancel()
    b.N = subMsgCount
    b.ReportAllocs()
    b.ResetTimer()
    defer b.StopTimer()
    var i int64
    go func() {
        for range time.Tick(time.Second) {
            if atomic.LoadInt64(&i) >= int64(b.N) {
                cancel()
            }
        }
    }()
    client.Subscribe(mqttDestination, 2, func(c mqtt.Client, m mqtt.Message) {
        m.Ack()
        atomic.AddInt64(&i, 1)
    })
    <-ctx.Done()
    log.Info("close sub")
}

Some behavioral details

The official FAQ documents some implementation details:

  1. If the producer is fast and the consumer slow, ActiveMQ's flow control blocks the producer. http://activemq.apache.org/what-happens-with-a-fast-producer-and-slow-consumer.html
  2. A consumer cannot requeue a message it has already received, i.e. there is no NSQ-style retry after a business-logic failure on the consumer side. http://activemq.apache.org/how-do-i-unack-the-message-with-stomp.html

Connecting to the host (172.17.0.1) from a Docker container on macOS

In a Linux Docker container, if you want to reach a service running on the host, the address 172.17.0.1 works.

Today I started a service in a Docker container on my Mac; it needed to connect to MySQL on the host, and 172.17.0.1 did not work: Connection refused.

root@d99939cc53fc:/tmp# curl 172.17.0.1:3306
curl: (7) Failed to connect to 172.17.0.1 port 3306: Connection refused

Yet the network layout looks the same as on Linux, also in the 172.17.0.0/16 range:

root@d99939cc53fc:/tmp# ip addr
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1
link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
inet 127.0.0.1/8 scope host lo
valid_lft forever preferred_lft forever
2: tunl0@NONE: <NOARP> mtu 1480 qdisc noop state DOWN group default qlen 1
link/ipip 0.0.0.0 brd 0.0.0.0
3: ip6tnl0@NONE: <NOARP> mtu 1452 qdisc noop state DOWN group default qlen 1
link/tunnel6 :: brd ::
6: eth0@if7: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default
link/ether 02:42:ac:11:00:02 brd ff:ff:ff:ff:ff:ff link-netnsid 0
inet 172.17.0.2/16 brd 172.17.255.255 scope global eth0
valid_lft forever preferred_lft forever

Puzzled, I googled and found a hidden detail: under https://stackoverflow.com/questions/38504890/docker-for-mac-1-12-0-how-to-connect-to-host-from-container, someone found this entry in the release notes of Docker Community Edition 17.06.0-ce-mac18 (2017-06-28):

Add an experimental DNS name for the host: docker.for.mac.localhost

Searching the page for docker.for.mac.localhost, I then found a related entry in the release notes of Docker Community Edition 17.12.0-ce-mac46 (2018-01-09).

So the conclusion is: inside a container, use docker.for.mac.host.internal to reach the host.

Checking the port with curl confirms it is indeed reachable:

root@d99939cc53fc:/tmp# curl docker.for.mac.host.internal:3306
5.7.21Bf