基于redis+mysql+php的简单队列实现
消息队列在是分布式系统中必不可少的中间件,目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,Kafka,RocketMQ等。然而对于一个要求不高的小型系统来说,单独使用维护这些队列系统代价有点大。而redis可以在做缓存的同时也满足队列的需求。redis的list是有序的列表,加上其出队入队函数,利用其特性很简单的就能实现一个消息队列。
一、业务层邮件入队列
入队列使用redis的lpush指令
LPUSH key value1 [value2]
将一个或多个值插入到列表头部
先将邮件详情写入mysql存储,再将该邮件id push到redis队列头部
function sendSysMail($userId, $type, $content)
{
$sysSmsLogModel = M('SysEmailLog');
$data = array(
'user_id' => $userId,
'content' => $content,
'add_time' => time(),
'date_time' => date('Y-m-d H:i:s', time()),
'add_ip' => get_client_ip(),
);
if($sysSmsLogModel->create($data))
{
$emailId = $sysSmsLogModel->add(); //邮件ID
//添加邮件队列
$redis = new Redis();
$redis->connect(C('REDIS_HOST'), C('REDIS_PORT'));
$redis->lPush('mailQueue', $emailId); //lPush将邮件ID入到队列头部
return TRUE;
}
else
{
return FALSE;
}
}
二、后台队列监听
后台执行一个PHP进程,监听队列,对队列循环进行出队列操作。一旦队列有数据,就从mysql中取对应的邮件详细情况进行发送。
出队操作使用BRPOP命令
BRPOP key1 [key2 ] timeout
移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
使用过程中发生了件奇怪的事情,后台进程开启后开始执行的好好的,并没有什么异常,但跑了一晚上,第二天再来看时便不正常了。
看了下redis的数据,都正常的出入队列了,但是邮件并未发送,日志中也没有发送失败的记录。
一番折腾,发现是mysql连接这块存在问题,mysql默认的连接超时时间是28800秒,即8小时,超过8小时,mysql连接就断开了,过了一晚上,mysql自然就go away了。
于是在没有队列没有数据的时候执行select 1来延长mysql连接的超时时间点,select 1执行异常时重新建立一个mysql的连接替换之前的连接,以此来保持mysql的连接始终可用。
<?php
//mailQueue.php
$config = include('config.php');
$redis = new Redis();
$redis->connect($config['REDIS_HOST'], $config['REDIS_PORT']);
$pdo= new PDO($config['DSN'], $config['DB_USER'], $config['DB_PWD'], array(PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION));
while (true)
{
$task = $redis->brPop('mailQueue', 10); //brPop从队列尾部出队
if ($task)
{
$emailId = $task[1];
$sql = "SELECT a.*, b.email FROM sys_email_log AS a, user_main AS b WHERE a.user_id = b.user_id AND a.email_id = {$emailId}";
$res = $pdo->query($sql);
foreach ($res as $row)
{
$to = $row['email'];
$title = '邮件提醒';
$content = $row['content'];
$result = send($to, $title, $content); //send邮件发送函数,调用外部系统邮件接口
if ($result)
{
echo date("Y-m-d H:i:s", time()) . ": " . $emailId . "发送成功\n";
}
else
{
echo date("Y-m-d H:i:s", time()) . ": " . $emailId . "发送失败\n";
}
}
}
else
{
try
{
$pdo->query("SELECT 1");
echo date("Y-m-d H:i:s", time()) . ": 保持连接\n";
}
catch (PDOException $e)
{
$pdo = new PDO($config['DSN'], $config['DB_USER'], $config['DB_PWD'], array(
PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION));
echo date("Y-m-d H:i:s", time()) . ": mysql重连\n";
}
}
}
?>
三、队列监听程序的守护进程
用shell写的守护进程,方便队列监听程序的开启关闭以及状态查看
#!/bin/bash
#开启
function start()
{
#先检测程序是否已经开启
pid=`ps -ef | grep "php -f mailQueue.php" | grep -v grep | awk '{print $2}'`
if [ "$pid" == "" ]
then
php -f mailQueue.php >> mailLog &
echo "程序启动成功"
else
echo "程序已经开启过"
fi
}
#关闭
function stop()
{
pid=`ps -ef | grep "php -f mailQueue.php" | grep -v grep | awk '{print $2}'`
if [ "$pid" == "" ]
then
echo "程序未开启"
else
kill -9 $pid
echo "程序关闭成功"
fi
}
#查看开启状态
function status()
{
pid=`ps -ef | grep "php -f mailQueue.php" | grep -v grep | awk '{print $2}'`
if [ "$pid" == "" ]
then
echo "程序未开启"
else
echo "程序运行中,pid: $pid"
fi
}
#主程序
case "$1" in
"start")
start
;;
"stop" )
stop
;;
* )
echo "参数错误! Usage: mailQueue [start|stop|status]"
;;
esac
四、运行演示
1.使用守护进程脚本启动队列后台监听进程
2.数据库中添加一条测试数据
3.redis手动入队数据,模拟业务入队
4.查看邮件发送日志
五、总结
这是一个很简单的消息队列程序,实现了消息队列最基本的功能。但要想稳定运行还有很多需要优化处理的地方。
1.程序只考虑了最简单的状况,对于邮件发送失败的情况只是打了日志,并未做任何处理。可以在数据库邮件详情表增加记录邮件发送失败次数的字段。邮件发送失败后,累加记录失败次数,设置一个最大失败次数,小于最大失败次数则重新入队列,再次发送。直到达到最大失败次数之后不再发送,以此来提高邮件的发送成功率。
2.重新启动后数据的恢复,redis本身有完善数据备份机制,这点可以通过redis本身机制来实现。