RabbitMQ初涉以及Golang实践
在并发编程中,多线程并发协作时采用生产者消费者模式是一个良好的解决方案。生产者线程将生成的数据放入一个阻塞队列中,消费者则直接从该队列中获取数据,这样做的目的是为了降低生产者与消费者之间的耦合性,同时也平衡了两者的不对等的处理能力。
为了达到上述目的,还可以考虑采用消息中间件。由此也引出我们今天的主题:RabbitMQ
。RabbitMQ
实现了AMQP
(Advanced Message Queue Protocol)
协议,是目前广泛使用的消息中间件之一。
RabbitMQ
中有几个重要的概念:
Queue
: 消息队列,是RabbitMQ
的核心Binding
: 绑定,Exchange
通过与Queue
绑定并为每个Queue
设置Routing Key
从而达到路由功能Channel
: 消息通道,每一个Channel
代表一个客户端会话任务Exchange
: 交换器,制定消息传递的规则,选择路由Routing Key:
路由关键字,Exchange
根据此项选择投递消息到哪个Queue
Virtual Host
: 虚拟机,用于隔离用户权限
安装RabbitMQ
Archlinux
:
1sudo pacman -S rabbitmq
安装完成之后,启动RabbitMQ
:
1sudo systemctl start rabbitmq.service
此时,RabbitMQ
就开始运行了,默认只采用AMQP
协议。如果想使用网页来管理服务器,可以激活对应的插件:
1sudo rabbitmq-lugins enable rabbitmq_management
AMQP
协议的端口为5672
,网页管理台端口为15672
,默认用户名和密码均为guest
(记得改密码!)
再将RabbitMQ
重启一下:
1sudo systemctl restart rabbitmq.service
新建用户和虚拟机:
1//新建一个用户
2sudo rabbitmqctl add_user username password
3//新建一个虚拟机
4sudo rabbitmqctl add_vhost NewHost
5//设置用户角色
6sudo rabbitmqctl set_user_tags username monitoring
7//设置 /NewHost对于用户username可用
8sudo rabbitmqctl set_permissions -p NewHost username ".*" ".*" ".*"
Golang使用RabbitMQ
想要通过Golang
来使用RabbitMQ(AMQP协议版)
,需要下载AMQP
库:
1go get github.com/streadway/amqp
然后编写producer.go
与consumer.go
两个程序:
1/*
2 producer.go
3 Author: Peven
4*/
5package main
6
7import (
8 "log"
9
10 "github.com/streadway/amqp"
11)
12
13func main() {
14 //start a new amqp connection
15 conn, err := amqp.Dial("amqp://username:password@localhost:5672/NewHost")
16 if err != nil {
17 log.Fatal(err)
18 }
19 defer conn.Close()
20
21 //declare a channel
22 ch, err := conn.Channel()
23 if err != nil {
24 log.Fatal(err)
25 }
26 defer ch.Close()
27
28 //declare a queue
29 queue, err := ch.QueueDeclare(
30 "TestQueue", //queue name
31 true, //durable
32 false, //auto-delete when unused
33 false, //exclusive
34 false, //no-wait
35 nil, //args
36 )
37 if err != nil {
38 log.Fatal(err)
39 }
40
41 //declare a exchange
42 err = ch.ExchangeDeclare(
43 "TestExchange", //exchange name
44 "direct", //type
45 true, //durable
46 false, //auto-delete when unused
47 false, //internal
48 false, //no-wait
49 nil, //args
50 )
51 if err != nil {
52 log.Fatal(err)
53 }
54
55 //binding a queue
56 err = ch.QueueBind(
57 queue.Name, //queue name
58 "routing_key", //routing key
59 "TestExchange", //exhchange name
60 false, //no-wait
61 nil, //args
62 )
63 if err != nil {
64 log.Fatal("1:", err)
65 }
66
67 //publish a message
68 err = ch.Publish(
69 "TestExchange", //exchange name
70 "routing_key", //routing key
71 false, //mandatory
72 false, //immediate
73 amqp.Publishing{
74 ContentType: "text/plain",
75 Body: []byte("This is a message"),
76 },
77 )
78 if err != nil {
79 log.Fatal(err)
80 }
81}
1/*
2 consumer.go
3 Author: Peven
4*/
5package main
6
7import (
8 "fmt"
9 "log"
10 "sync"
11
12 "github.com/streadway/amqp"
13)
14
15func main() {
16 //start a new amqp connection
17 conn, err := amqp.Dial("amqp://username:password@localhost:5672/NewHost")
18 if err != nil {
19 log.Fatal(err)
20 }
21 defer conn.Close()
22
23 //declare a channel
24 ch, err := conn.Channel()
25 if err != nil {
26 log.Fatal(err)
27 }
28 defer ch.Close()
29
30 //declare a queue
31 queue, err := ch.QueueDeclare(
32 "TestQueue", //queue name
33 true, //durable
34 false, //auto-delete when unused
35 false, //exclusive
36 false, //no-wait
37 nil, //args
38 )
39 if err != nil {
40 log.Fatal(err)
41 }
42
43 //get the delivery results
44 msgs, err := ch.Consume(
45 queue.Name, // queue name
46 "peven", // consumer name
47 true, // auto-ack
48 false, // exclusive
49 false, // no-local
50 false, // no-wait
51 nil, // args
52 )
53
54 var wg sync.WaitGroup
55 wg.Add(1)
56 go func() {
57 for m := range msgs {
58 fmt.Println("Receive a message: ", string(m.Body))
59 }
60 }()
61 wg.Done()
62}
运行完producer
之后,打开终端输入以下命令查看Exchange
:
1sudo rabbitmqctl list_exchanges -p NewHost
输出如下:
1Listing exchanges for vhost NewHost ...
2name type
3amq.topic topic
4amq.direct direct
5TestExchange direct //新创建的交换器
6amq.fanout fanout
7amq.rabbitmq.trace topic
8amq.match headers
9amq.headers headers
10 direct //默认路由,""
输入sudo rabbitctl list_bindings -p NewHost
查看建立的绑定:
1$ sudo rabbitmqctl list_bindings -p NewHost --no-table-headers
2
3Listing bindings for vhost NewHost...
4 exchange TestQueue queue TestQueue []
5TestExchange exchange TestQueue queue routing_key []
同时,运行consumer
也打印出了producer
发送的消息。