RabbitMQ集群
一、原理
Rabbitmq的集群是依赖于erlang的集群来工作的,所以必须先构建起erlang的集群环境。而Erlang的集群中各节点是通过一个magic cookie来实现的,这个cookie存放在 /var/lib/rabbitmq/.erlang.cookie 中,文件是400的权限。所以必须保证各节点cookie保持一致,否则节点之间就无法通信。
二、准备工作
部署RabbitMQ集群,通过使用haproxy作为反向代理访问rabbitMQ,从而实现负载均衡及高可用,具体配置如下:
准备三台主机:
172.20.102.80 rabbitmq-server1
172.20.102.138 rabbitmq-server2
172.20.102.161 rabbitmq-server3
各rabbitMQ服务器更改hosts文件:
[[email protected] ~]#vim /etc/hosts
172.20.102.80 rabbitmq-server1
172.20.102.138 rabbitmq-server2
172.20.102.161 rabbitmq-server3
各rabbitMQ服务器安装并启动rabbitMQ服务:
[[email protected] ~]# yum install rabbitmq-server
[[email protected] ~]# systemctl start rabbitmq-server
[[email protected] ~]# systemctl enable rabbitmq-server
[[email protected] ~]# rabbitmq-plugins enable rabbitmq_management #开启web管理界面插件
[[email protected] ~]# systemctl restart rabbitmq-server
#扩展--添加用户,并赋予权限
rabbitmqctl add_user admin admin
rabbitmqctl set_user_tags admin administrator
rabbitmq-plugins enable rabbitmq_management
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
访问web界面:默认用户名为guest 密码为guest
三、操作步骤
1、拷贝server1的cookie到server2和server3上
[[email protected] rabbitmq]#scp /var/lib/rabbitmq/.erlang.cookie 172.20.102.138:/var/lib/rabbitmq/.erlang.cookie
[[email protected] rabbitmq]#scp /var/lib/rabbitmq/.erlang.cookie 172.20.102.161:/var/lib/rabbitmq/.erlang.cookie
2、各节点重启rabbitMQ服务并验证端口
各节点启动服务:
[[email protected] ~]# systemctl restart rabbitmq-server
[[email protected] ~]# systemctl restart rabbitmq-server
[[email protected] ~]# systemctl restart rabbitmq-server
验证端口:#ss -tnl
5672:消费者访问的端口
15672:web管理端口
25672:集群状态通信端口
3、查看各rabbitMQ服务的集群状态
[[email protected] rabbitmq]#rabbitmqctl cluster_status
Cluster status of node '[email protected]' ...
[{nodes,[{disc,['[email protected]']}]},
{running_nodes,['[email protected]']},
{cluster_name,<<"[email protected]">>},
{partitions,[]}]
...done.
[[email protected] rabbitmq]#rabbitmqctl cluster_status
Cluster status of node '[email protected]' ...
[{nodes,[{disc,['[email protected]']}]},
{running_nodes,['[email protected]']},
{cluster_name,<<"[email protected]">>},
{partitions,[]}]
...done.
[[email protected] rabbitmq]#rabbitmqctl cluster_status
Cluster status of node '[email protected]' ...
[{nodes,[{disc,['[email protected]']}]},
{running_nodes,['[email protected]']},
{cluster_name,<<"[email protected]">>},
{partitions,[]}]
...done.
4、将rabbitMQ的节点添加到集群
在rabbitmq-server1作为内存节点连接起来,并作为内存节点,在rabbitmq-server1执行以下命令:
[[email protected] rabbitmq]#rabbitmqctl cluster_status
Cluster status of node '[email protected]' ...
[{nodes,[{disc,['[email protected]']}]},
{running_nodes,['[email protected]']},
{cluster_name,<<"[email protected]">>}, #未添加到集群之前只有自己一台节点
{partitions,[]}]
...done.
[[email protected] ~]# rabbitmqctl stop_app #停止应程序
[[email protected] ~]# rabbitmqctl reset #清空元数据
[[email protected] rabbitmq]#rabbitmqctl join_cluster [email protected] --ram #将rabbitmq-server1添加到集群当中,并成为内存节点,不加--ram默认是磁盘节点
Clustering node '[email protected]' with '[email protected]' ...
...done.
[[email protected] rabbitmq]#rabbitmqctl cluster_status
Cluster status of node '[email protected]' ...
[{nodes,[{disc,['[email protected]']}, #默认是磁盘节点
{ram,['[email protected]']}]}] #内存节点
...done.
[[email protected] rabbitmq]#rabbitmqctl cluster_status #添加之后的集群状态
Cluster status of node '[email protected]' ...
[{nodes,[{disc,['[email protected]']},
{ram,['[email protected]']}]},
{running_nodes,['[email protected]','[email protected]']},
{cluster_name,<<"[email protected]">>},
{partitions,[]}]
...done.
5、将rabbitmq-server3添加到集群
[[email protected] rabbitmq]# rabbitmqctl cluster_status #未添加到集群之后只有自己一个节点
Cluster status of node '[email protected]' ...
[{nodes,[{disc,['[email protected]']}]},
{running_nodes,['[email protected]']},
{cluster_name,<<"[email protected]">>},
{partitions,[]}]
...done.
[[email protected] rabbitmq]# rabbitmqctl stop_app #停止应用程序
Stopping node '[email protected]' ...
...done.
[[email protected] rabbitmq]# rabbitmqctl reset #重设元数据
Resetting node '[email protected]' ...
...done.
[[email protected] rabbitmq]# rabbitmqctl join_cluster [email protected] --ram #添加到集群当中并设置为内存节点
Clustering node '[email protected]' with '[email protected]' ...
...done.
[[email protected] rabbitmq]# rabbitmqctl start_app #启动应用程序
Starting node '[email protected]' ...
...done.
[[email protected] rabbitmq]# rabbitmqctl cluster_status #查看集群状态
Cluster status of node '[email protected]' ...
[{nodes,[{disc,['[email protected]']}, #一个磁盘节点,集群当中最少有一个磁盘节点用于消息的持久化
{ram,['[email protected]','[email protected]']}]}, #两个内存节点
{running_nodes,['[email protected]','[email protected]', #当前正在运行的节点
'[email protected]']},
{cluster_name,<<"[email protected]">>}, #集群名称
{partitions,[]}]
...done.
5、更改为镜像模式
只要在其中一台节点执行以下命令即可
[[email protected] ~]#rabbitmqctl set_policy ha-all "#" '{"ha-mode":"all"}' # "#"为任意0个或多个即为所有,也可以使用"^test"匹配开头,还可以使用其他正则匹配
Setting policy "ha-all" for pattern "#" to "{\"ha-mode\":\"all\"}" with priority "0" ...
...done.
6、在任意一台节点的管理界面创建一个queue,验证是可以同步到其他节点
7、在其他rabbitMQ节点进行验证queue是否存在
8、使用命令也可以验证
[[email protected] ~]#rabbitmqctl list_queues
Listing queues ...
test 0
test1 0
test2 0
...done.
9、通过haproxy 实现反向代理
#安装haproxy
[[email protected] ~]#yum -y install haproxy
#主要配置如下
vim /etc/haproxy/haproxy.cfg
global
maxconn 100000
uid 99
gid 99
daemon
nbproc 1
log 127.0.0.1 local0 info
defaults
option http-keep-alive
#option forwardfor
maxconn 100000
mode tcp
timeout connect 300000ms
timeout client 300000ms
timeout server 300000ms
listen rabbitmq 0.0.0.0:5671
mode tcp
log global
balance roundrobin
server rabbitmq1 172.20.102.80:5672 check inter 2000 rise 3 fall 3
server rabbitmq2 172.20.102.138:5672 check inter 2000 rise 3 fall 3
server rabbitmq3 172.20.102.161:5672 check inter 2000 rise 3 fall 3
listen rabbitmq_cluster 0.0.0.0:15671
mode tcp
log global
balance roundrobin
server rqslave1 172.20.102.80:15672 check inter 2000 rise 3 fall 3
server rqslave2 172.20.102.138:15672 check inter 2000 rise 3 fall 3
server rqslave3 172.20.102.161:15672 check inter 2000 rise 3 fall 3
#启动Haproxy
[[email protected] ~]#systemctl start haproxy
[[email protected] ~]#ss -tnl
State Recv-Q Send-Q Local Address:Port Peer Address:Port
LISTEN 0 512 *:5671 *:*
LISTEN 0 128 *:22 *:*
LISTEN 0 512 *:15671 *:*
10、访问15671端口,验证可以访问到后端真实的RabbitMQ服务器
11、使用python通过haproxy向rabbitMQ写入消息
①写入消息的的生产者代码
#!/bin/env python
#coding:utf-8
#Author: ZhangJie
import pika
#用户名密码
cert = pika.PlainCredentials("guest","guest")
#连接到rabbitMQ服务器
conn = pika.BlockingConnection(pika.ConnectionParameters("172.20.102.152",5671,'/',cert))
#创建频道
chanel = conn.channel()
#声明消息队列,如果队列不存在就创建,存在就将消息在此队队列中创建,如果将消息发送到不存在的队列,则rabbitMQ会自动清除这些消息
chanel.queue_declare(queue="test7")
#exchange告诉消息去往的队列,routing_key是队列名,body是要传递的消息内容
chanel.basic_publish(exchange="",
routing_key="test7",
body="hello world!")
print("消息写入成功")
#消息写入完成,关闭连接
conn.close()
②执行结果
12、通过haproxy反向代理向rabbitMQ服务器获取消息
①获取消息的消费者脚本内容
#!/bin/env python
#coding:utf-8
#Author: ZhangJie
import pika
#用户名
cert = pika.PlainCredentials("guest","guest")
#连接到服务器
conn = pika.BlockingConnection(pika.ConnectionParameters("172.20.102.152",5671,"/",cert))
#创建频道
channel = conn.channel()
#声明消息队列,如果不存在就创建
channel.queue_declare(queue="hello")
# 定义一个回调函数来处理,这边的回调函数就是将信息打印出来。
def callback(ch,method,properties,body):
print("[x] Received %r" % body)
channel.basic_consume('test5',callback,
auto_ack=False,
exclusive=False,
consumer_tag=None,
arguments=None)
print(' [*] Waiting for messages. To exit press CTRL+C')
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理。按ctrl+c退出。
channel.start_consuming()
②执行结果
13、guest 账号密码忘了,无法登陆:解决办法
[[email protected] ~]#rabbitmqctl change_password guest guest
[[email protected] ~]#rabbitmqctl set_user_tags guest administrator
[[email protected] ~]#systemctl restart rabbitmq-server
14、集群状态操作命令
查看服务状态:rabbitmqctl status
关闭服务:rabbitmqctl stop
查看mq用户:rabbitmqctl list_users
查看用户权限:rabbitmqctl list_user_permissions guest
新增用户: rabbitmqctl add_user admin 123456
四、操作集群
1、添加一个用户 tom
2、添加virtual hosts , 注意,virtual hosts都以斜杠开头
3、设置权限
4、添加完成
5、退出,重新登录