RabbitMQ~消费者实时与消息服务器保持通话
RabbitMQ~消费者实时与消息服务器保持通话
来源:http://www.cnblogs.com/lori/p/6477931.html
这个文章主要介绍简单的消费者的实现,rabbitMQ实现的消费者可以对消息服务器进行实时监听,当有消息(生产者把消息推到服务器上之后),消费者可以自动去消费它,这通常是开启一个进程去维护这个对话,它与消息服务器保持一个TCP的长连接,整个这个过程于rabbitMQ为我们提供,程序开发人员只需要实现自己的回调方法即可。
简单的rabbitMQ消费者
/// <summary>
/// 消息消费者
/// </summary>
public class RabbitMqSubscriber : Lind.DDD.Commons.DisposableBase
{
private readonly string exchangeName;
private readonly string queueName;
private readonly IConnection connection;
private readonly IModel channel;
private bool disposed;
/// <summary>
/// 从消息服务器拉到消息后触发
/// </summary>
public event EventHandler<MessageReceivedEventArgs> MessageReceived;
/// <summary>
/// Initializes a new instance of <c>RabbitMqMessageSubscriber</c> class.
/// </summary>
/// <param name="uri"></param>
/// <param name="exchangeName"></param>
/// <param name="queueName"></param>
public RabbitMqSubscriber(string uri, string queueName, string userName = "", string password = "")
{
this.exchangeName = exchangeName;
this.queueName = queueName;
var factory = new ConnectionFactory() { Uri = uri };
if (!string.IsNullOrWhiteSpace(userName))
factory.UserName = userName;
if (!string.IsNullOrWhiteSpace(password))
factory.Password = password;
this.connection = factory.CreateConnection();
this.channel = connection.CreateModel();
}
public void Subscribe()
{
channel.QueueDeclare(
queue: this.queueName,
durable: false,//持久化
exclusive: false, //独占,只能被一个consumer使用
autoDelete: false,//自己删除,在最后一个consumer完成后删除它
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
var body = e.Body;
var json = Encoding.UTF8.GetString(body);
var message = JsonConvert.DeserializeObject(json, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All });
this.OnMessageReceived(new MessageReceivedEventArgs(message));
channel.BasicAck(e.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: queueName,
noAck: false,
consumer: consumer);
}
private void OnMessageReceived(MessageReceivedEventArgs e)
{
this.MessageReceived?.Invoke(this, e);
}
protected override void Finalize(bool disposing)
{
if (disposing)
{
if (!disposed)
{
this.channel.Dispose();
this.connection.Dispose();
disposed = true;
}
}
}
}
简单调用
class Program
{
static void Main(string[] args)
{
var subscriber = new Lind.DDD.RabbitMq.RabbitMqSubscriber("amqp://localhost:5672", "zzl");
subscriber.MessageReceived += Subscriber_MessageReceived;
subscriber.Subscribe();
Console.ReadKey();
}
private static void Subscriber_MessageReceived(object sender, RabbitMq.MessageReceivedEventArgs e)
{
Console.WriteLine("消费者2->消费了一个消息{0}", e.Message);
Lind.DDD.Logger.LoggerFactory.Instance.Logger_Debug("消费者2->消费了一个消息{0}" + e.Message);
Thread.Sleep(2000);
}
}
实时拉消息
RabbitMQ消息模型
通过上面图我们可以更容易和清晰的去理解rabbitmq的工作流程。
说说Exchange的几种模式
RabbitMQ里的Exchange提供了四种模式,或者叫它类型,它们是fanout,direct,topic和header,其中前三种模式我们用的比较多,所有我们主要介绍前3种!
Direct
任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue。
1.一般情况可以使用rabbitMQ自带的Exchange:”"(该Exchange的名字为空字符串,下文称其为default Exchange)。
2.这种模式下不需要将Exchange进行任何绑定(binding)操作
3.消息传递时需要一个“RouteKey”,可以简单的理解为要发送到的队列名字。
4.如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃。
Fanout
任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。
1.可以理解为路由表的模式
2.这种模式不需要RouteKey
3.这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。
4.如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。
Topic
任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上
1.这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列。
2.这种模式需要RouteKey,也许要提前绑定Exchange与Queue。
3.在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个RouteKey为”MQ.log.error”的消息会被转发到该队列)。
4.“#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。
5.同样,如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。
版权申明:内容来源网络,版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知,我们会立即删除并表示歉意。谢谢。
-END-