目录

RabbitMQ初涉以及Golang实践

在并发编程中,多线程并发协作时采用生产者消费者模式是一个良好的解决方案。生产者线程将生成的数据放入一个阻塞队列中,消费者则直接从该队列中获取数据,这样做的目的是为了降低生产者与消费者之间的耦合性,同时也平衡了两者的不对等的处理能力。

为了达到上述目的,还可以考虑采用消息中间件。由此也引出我们今天的主题:RabbitMQRabbitMQ实现了AMQP (Advanced Message Queue Protocol)协议,是目前广泛使用的消息中间件之一。

RabbitMQ中有几个重要的概念:

  • Queue: 消息队列,是RabbitMQ的核心

  • Binding: 绑定,Exchange通过与Queue绑定并为每个Queue设置Routing Key从而达到路由功能

  • Channel: 消息通道,每一个Channel代表一个客户端会话任务

  • Exchange: 交换器,制定消息传递的规则,选择路由

  • Routing Key: 路由关键字,Exchange根据此项选择投递消息到哪个Queue

  • Virtual Host: 虚拟机,用于隔离用户权限

安装RabbitMQ

Archlinux:

1sudo pacman -S rabbitmq

安装完成之后,启动RabbitMQ

1sudo systemctl start rabbitmq.service

此时,RabbitMQ就开始运行了,默认只采用AMQP协议。如果想使用网页来管理服务器,可以激活对应的插件:

1sudo rabbitmq-lugins enable rabbitmq_management

AMQP协议的端口为5672,网页管理台端口为15672,默认用户名和密码均为guest(记得改密码!)

再将RabbitMQ重启一下:

1sudo systemctl restart rabbitmq.service

新建用户和虚拟机:

1//新建一个用户
2sudo rabbitmqctl add_user username password 
3//新建一个虚拟机
4sudo rabbitmqctl add_vhost NewHost
5//设置用户角色
6sudo rabbitmqctl set_user_tags username monitoring
7//设置 /NewHost对于用户username可用
8sudo rabbitmqctl set_permissions -p NewHost username ".*" ".*" ".*"

Golang使用RabbitMQ

想要通过Golang来使用RabbitMQ(AMQP协议版),需要下载AMQP库:

1go get github.com/streadway/amqp

然后编写producer.goconsumer.go两个程序:

 1/*
 2	producer.go
 3	Author: Peven
 4*/
 5package main
 6
 7import (
 8	"log"
 9
10	"github.com/streadway/amqp"
11)
12
13func main() {
14	//start a new amqp connection
15	conn, err := amqp.Dial("amqp://username:password@localhost:5672/NewHost")
16	if err != nil {
17		log.Fatal(err)
18	}
19	defer conn.Close()
20
21	//declare a channel
22	ch, err := conn.Channel()
23	if err != nil {
24		log.Fatal(err)
25	}
26	defer ch.Close()
27
28	//declare a queue
29	queue, err := ch.QueueDeclare(
30		"TestQueue", //queue name
31		true,        //durable
32		false,       //auto-delete when unused
33		false,       //exclusive
34		false,       //no-wait
35		nil,         //args
36	)
37	if err != nil {
38		log.Fatal(err)
39	}
40
41	//declare a exchange
42	err = ch.ExchangeDeclare(
43		"TestExchange", //exchange name
44		"direct",       //type
45		true,           //durable
46		false,          //auto-delete when unused
47		false,          //internal
48		false,          //no-wait
49		nil,            //args
50	)
51	if err != nil {
52		log.Fatal(err)
53	}
54
55	//binding a queue
56	err = ch.QueueBind(
57		queue.Name,     //queue name
58		"routing_key",  //routing key
59		"TestExchange", //exhchange name
60		false,          //no-wait
61		nil,            //args
62	)
63	if err != nil {
64		log.Fatal("1:", err)
65	}
66
67	//publish a message
68	err = ch.Publish(
69		"TestExchange", //exchange name
70		"routing_key",  //routing key
71		false,          //mandatory
72		false,          //immediate
73		amqp.Publishing{
74			ContentType: "text/plain",
75			Body:        []byte("This is a message"),
76		},
77	)
78	if err != nil {
79		log.Fatal(err)
80	}
81}
 1/*
 2	consumer.go
 3	Author: Peven
 4*/
 5package main
 6
 7import (
 8	"fmt"
 9	"log"
10	"sync"
11
12	"github.com/streadway/amqp"
13)
14
15func main() {
16    //start a new amqp connection
17	conn, err := amqp.Dial("amqp://username:password@localhost:5672/NewHost")
18	if err != nil {
19		log.Fatal(err)
20	}
21	defer conn.Close()
22	
23    //declare a channel
24	ch, err := conn.Channel()
25	if err != nil {
26		log.Fatal(err)
27	}
28	defer ch.Close()
29	
30    //declare a queue
31	queue, err := ch.QueueDeclare(
32		"TestQueue", //queue name
33		true,        //durable
34		false,       //auto-delete when unused
35		false,       //exclusive
36		false,       //no-wait
37		nil,         //args
38	)
39	if err != nil {
40		log.Fatal(err)
41	}
42	
43    //get the delivery results
44	msgs, err := ch.Consume(
45		queue.Name, // queue name
46		"peven",    // consumer name
47		true,       // auto-ack
48		false,      // exclusive
49		false,      // no-local
50		false,      // no-wait
51		nil,        // args
52	)
53	
54	var wg sync.WaitGroup
55	wg.Add(1)
56	go func() {
57		for m := range msgs {
58			fmt.Println("Receive a message: ", string(m.Body))
59		}
60	}()
61	wg.Done()
62}

运行完producer之后,打开终端输入以下命令查看Exchange

1sudo rabbitmqctl list_exchanges -p NewHost

输出如下:

 1Listing exchanges for vhost NewHost ...
 2name    		type
 3amq.topic       	topic
 4amq.direct      	direct
 5TestExchange    	direct	//新创建的交换器
 6amq.fanout      	fanout
 7amq.rabbitmq.trace  	topic
 8amq.match       	headers
 9amq.headers     	headers
10        		direct	//默认路由,""

输入sudo rabbitctl list_bindings -p NewHost 查看建立的绑定:

1$ sudo rabbitmqctl list_bindings -p NewHost --no-table-headers
2
3Listing bindings for vhost NewHost...
4        exchange        TestQueue       queue   TestQueue       []
5TestExchange    exchange        TestQueue       queue   routing_key     []

同时,运行consumer也打印出了producer发送的消息。