发布于 
pv: - | uv: -

利用 Redis 实现消息队列

什么是消息队列

消息队列(message queue) 可以分为两部分,即消息(message)与队列(queue),它是分布式系统中重要的组件,其通用的使用场景可以简单地描述为:

  1. 当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候。
  2. 消息队列主要解决了应用耦合、异步处理、流量削锋等问题。

当前使用较多的消息队列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,而部分数据库如Redis、Mysql以及phxsql也可实现消息队列的功能。

遇到的场景

在这里我遇到的场景是: 对于拾光同步发信(点击按钮时立即发信)可能会造成响应过慢或同一时间请求量过大导致的QPS限制,为此我尝试用PHP+Redis设计了一个消息队列服务.

开始实现

消息队列的本质和队列结构类似,均为先进先出(FIFO),这里利用到的是redis的List(列表).利用lpush进行入队,然后通过rpop出队.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Mq{
public static $key = 'mq'; //Redis键名称
function __construct(){
$this->redis = new Redis();
$rhis->redis->connect('127.0.0.1', 6379); //连接Redis
}

public function push($str){
$this->redis->lpush(self::$key,$str); //从队尾入队
}

public function proc($str){
$data = $this->redis->rpop(self::$key); //从队首出队并处理
print_r($data);
/* code */
}
}

至此我们让Mq类中的proc方法循环运行,便实现了一个最简单的消息队列服务,但是在实际过程中可能会遇到一些问题,比如循环运行proc方法导致的负载高,对于这个问题,我们可以用brpop来缓解。

1
2
3
4
5
6
public function proc($str){
while($data = $this->redis->brpop(self::$key),3600){ //从队首出队并处理
print_r($data);
/* code */
}
}

这里的3600代表超时时间,如果队列中一直没有数据,便会阻塞3600秒,当然我们也可以把这个值设置成0来无限阻塞,但是我们又遇到了一个问题,长时间的阻塞导致Redis自动断开了连接,这里可以试着在redis连接后使用$redis->setOption(3, -1);来将Redis的连接时间设置成永不超时.