RabbitMQ学习笔记

1-简介

RabbitMQ是最受欢迎的开源消息中间件之一,在全球范围内被广泛应用。RabbitMQ是轻量级且易于部署的,能支持多种消息协议。RabbitMQ可以部署在分布式系统中,以满足大规模、高可用的要求。

1.1-相关概念

我们先来了解下RabbitMQ中的相关概念,这里以5种消息模式中的路由模式为例。

1.2-RabbitMQ的安装和配置

我们在Docker中安装RabbitMQ

  • 下载rabbitmq 3.7.15的Docker镜像;
docker pull rabbitmq:3.7.15
  • 使用Docker命令启动服务;
docker run -p 5672:5672 -p 15672:15672 --name rabbitmq \
-d rabbitmq:3.7.15
  • 进入容器并开启管理功能;
docker exec -it rabbitmq /bin/bash
rabbitmq-plugins enable rabbitmq_management
  • 开启防火墙便于外网访问。
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --reload
  • 访问RabbitMQ管理页面地址,查看是否安装成功(Linux下使用服务器IP访问即可):http://localhost:15672/

  • 输入账号密码并登录,这里使用默认账号密码登录:guest guest

  • 创建帐号并设置其角色为管理员:mall mall

  • 创建一个新的虚拟host为:/mall

  • 点击mall用户进入用户配置页面;

  • 给mall用户配置该虚拟host的权限;

  • 至此,RabbitMQ的配置完成。

1.3-AMQP协议

核心概念

  1. server:又称broker,接受客户端连接,实现AMQP实体服务。
  2. connection:连接和具体broker网络连接。
  3. channel:网络信道,几乎所有操作都在channel中进行,channel是消息读写的通道。客户端可以建立多个channel,每个channel表示一个会话任务。
  4. message:消息,服务器和应用程序之间传递的数据,由properties和body组成。properties可以对消息进行修饰,比如消息的优先级,延迟等高级特性;body是消息实体内容。
  5. Virtual host:虚拟主机,用于逻辑隔离,最上层消息的路由。一个Virtual host可以若干个Exchange和Queue,同一个Virtual host不能有同名的Exchange或Queue。
  6. Exchange:交换机,接受消息,根据路由键转发消息到绑定的队列上。
  7. banding:Exchange和Queue之间的虚拟连接,binding中可以包括routing key
  8. routing key:一个路由规则,虚拟机根据他来确定如何路由 一条消息。
  9. Queue:消息队列,用来存放消息的队列。

1.4-Exchange

交换机的类型,direct、topic、fanout、headers,durability(是否需要持久化true需要)auto delete当最后一个绑定Exchange上的队列被删除Exchange也删除。

  1. Direct Exchange,所有发送到Direct Exchange的消息被转发到RouteKey 中指定的Queue,Direct Exchange可以使用默认的默认的Exchange (default Exchange),默认的Exchange会绑定所有的队列,所以Direct可以直接使用Queue名(作为routing key )绑定。或者消费者和生产者的routing key完全匹配。
  2. Toptic Exchange,是指发送到Topic Exchange的消息被转发到所有关心的Routing key中指定topic的Queue上。Exchange 将routing key和某Topic进行模糊匹配,此时队列需要绑定一个topic。所谓模糊匹配就是可以使用通配符,“#”可以匹配一个或多个词,“”只匹配一个词比如“log.#”可以匹配“log.info.test” “log. “就只能匹配log.error。
  3. Fanout Exchange:不处理路由键,只需简单的将队列绑定到交换机上。发送到改交换机上的消息都会被发送到与该交换机绑定的队列上。Fanout转发是最快的。

1.5-消息队列解决了什么问题?

  • 异步处理
  • 应用解耦
  • 流量削锋
  • 日志处理
  • ……

2-5种消息模式

2.1-简单模式

简单模式是最简单的消息模式,它包含一个生产者、一个消费者和一个队列。生产者向队列里发送消息,消费者从队列中获取消息并消费。

1. 模式示意图

  • P:消息生产者
  • 红色:队列
  • C:消息消费者

包含三个对象:生产者、队列、消费者

2. Java实现

  • 获取mq连接
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class ConnectionUtil {
    /**
    * 获取MQ的连接
    * @return
    */
    public static Connection getConnection() throws IOException, TimeoutException {
    //定义一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();

    factory.setHost("127.0.0.1");
    //AMQP的端口
    factory.setPort(5672);
    //vhost
    factory.setVirtualHost("/vhost_mmr");
    factory.setUsername("rabbit");
    factory.setPassword("123456");

    Connection connection = factory.newConnection();
    return connection;
    }
    }
  • 生产消息
    import com.meituan.mq.simple.utils.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Send {
    private static final String QUEUE_NAME = "test_simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = ConnectionUtil.getConnection();

    //从连接中获取一个通道
    Channel channel = connection.createChannel();

    //创建队列声明
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    String msg = "hello world!";

    channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

    System.out.println("---send msg :" + msg);
    channel.close();
    connection.close();
    }
    }
  • 消费消息
    import com.meituan.mq.simple.utils.ConnectionUtil;
    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Receive {
    private static final String QUEUE_NAME = "test_simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = ConnectionUtil.getConnection();

    //创建channel
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    DefaultConsumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String msg = new String(body, "utf8");
    System.out.println("msg receive : " + msg);
    }
    };

    channel.basicConsume(QUEUE_NAME, consumer);
    }
    }

3. Spring AMQP实现

port: 5672
virtual-host: /mall
username: mall
password: mall
publisher-confirms: true #消息发送到交换器确认
publisher-returns: true #消息发送到队列确认
  • 添加简单模式相关Java配置,创建一个名为simple.hello的队列、一个生产者和一个消费者;
/**
* Created by macro on 2020/5/19.
*/
@Configuration
public class SimpleRabbitConfig {

@Bean
public Queue hello() {
return new Queue("simple.hello");
}

@Bean
public SimpleSender simpleSender(){
return new SimpleSender();
}

@Bean
public SimpleReceiver simpleReceiver(){
return new SimpleReceiver();
}

}
  • 生产者通过send方法向队列simple.hello中发送消息;
/**
* Created by macro on 2020/5/19.
*/
public class SimpleSender {

private static final Logger LOGGER = LoggerFactory.getLogger(SimpleSender.class);

@Autowired
private RabbitTemplate template;

private static final String queueName="simple.hello";

public void send() {
String message = "Hello World!";
this.template.convertAndSend(queueName, message);
LOGGER.info(" [x] Sent '{}'", message);
}

}
  • 消费者从队列simple.hello中获取消息;
/**
* Created by macro on 2020/5/19.
*/
@RabbitListener(queues = "simple.hello")
public class SimpleReceiver {

private static final Logger LOGGER = LoggerFactory.getLogger(SimpleReceiver.class);

@RabbitHandler
public void receive(String in) {
LOGGER.info(" [x] Received '{}'", in);
}

}
  • 在controller中添加测试接口,调用该接口开始发送消息;
/**
* Created by macro on 2020/5/19.
*/
@Api(tags = "RabbitController", description = "RabbitMQ功能测试")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {

@Autowired
private SimpleSender simpleSender;

@ApiOperation("简单模式")
@RequestMapping(value = "/simple", method = RequestMethod.GET)
@ResponseBody
public CommonResult simpleTest() {
for(int i=0;i<10;i++){
simpleSender.send();
ThreadUtil.sleep(1000);
}
return CommonResult.success(null);
}
}

4. 简单模式的不足

  • 耦合性高,生产者一一对应消费者,如果需要多个消费者消费队列中的消息,此时简单队列就无能为力了;
  • 队列名变更,源码需要同时变更。

2.2-工作模式

工作模式是指向多个互相竞争的消费者发送消息的模式,它包含一个生产者、两个消费者和一个队列。两个消费者同时绑定到一个队列上去,当消费者获取消息处理耗时任务时,空闲的消费者从队列中获取并消费消息。

1. 模式示意图

一个生产者将消息放入队列中,可以有多个消费者进行消费

为什么会出现工作队列?

Simple队列:是一一对应的,实际开发中,生产者改善消息是毫不费力的,而消费者一般需要跟业务相结合,消费者接收到消息之后就需要处理,可能需要花费时间,此时队列就会积压很多消息。

2. Java实现

(1) 轮询分发
  • 生产消息

    import com.meituan.mq.simple.utils.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Send {
    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = ConnectionUtil.getConnection();

    //从连接中获取一个通道
    Channel channel = connection.createChannel();

    //创建队列声明
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    for (int i = 0; i < 50; i++) {
    String msg = "hello " + i;
    channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

    System.out.println("---send msg :" + msg);

    try {
    Thread.sleep(100);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }


    channel.close();
    connection.close();
    }
    }
  • 消费者1

    import com.meituan.mq.simple.utils.ConnectionUtil;
    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Recv1 {
    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = ConnectionUtil.getConnection();

    //创建channel
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    DefaultConsumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String msg = new String(body, "utf8");
    System.out.println("[1] msg recv1 : " + msg);
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    };

    boolean ack = true;
    channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
    }
  • 消费者2

    import com.meituan.mq.simple.utils.ConnectionUtil;
    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Recv2 {
    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = ConnectionUtil.getConnection();

    //创建channel
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    DefaultConsumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String msg = new String(body, "utf8");
    System.out.println("[2] msg recv1 : " + msg);
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    };

    boolean ack = true;
    channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
    }
  • 现象

    消费者1和消费者2处理的消息是一样多的,这种分发方式称为轮询分发(round-robin),不管谁忙或者谁闲,都不会多给或者少给。任务均分。

(2)公平分发 fair dispatch

保证一次发送给消费者的消息不超过一条

/**
* 每个消费者发送确认消息之前,消息队列不发送下一个消息给消费者,消费者一次只处理一个消息
*
* 限制发送给同一个消费者不得超过一条消息
*/
int preFetchCount = 1;
channel.basicQos(preFetchCount);

使用公平分发,必须关闭自动应答ack,改为手动

channel.basicAck(envelope.getDeliveryTag(), false);
boolean ack = false;//自动应答改为false
channel.basicConsume(QUEUE_NAME, ack, consumer);
  • 生产消息

    import com.meituan.mq.simple.utils.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Send {
    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = ConnectionUtil.getConnection();

    //从连接中获取一个通道
    Channel channel = connection.createChannel();

    //创建队列声明
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    /**
    * 每个消费者发送确认消息之前,消息队列不发送下一个消息给消费者,消费者一次只处理一个消息
    *
    * 限制发送给同一个消费者不得超过一条消息
    */
    int preFetchCount = 1;
    channel.basicQos(preFetchCount);

    for (int i = 0; i < 50; i++) {
    String msg = "hello " + i;
    channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

    System.out.println("---send msg :" + msg);

    try {
    Thread.sleep(100);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }


    channel.close();
    connection.close();
    }
    }
  • 消费消息

    import com.meituan.mq.simple.utils.ConnectionUtil;
    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Recv2 {
    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = ConnectionUtil.getConnection();

    //创建channel
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    channel.basicQos(1);

    DefaultConsumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String msg = new String(body, "utf8");
    System.out.println("[2] msg recv1 : " + msg);
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    channel.basicAck(envelope.getDeliveryTag(), false);
    }
    }
    };

    boolean ack = false;//自动应答改为false
    channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
    }

3. 消息应答与消息持久化

  • 消息应答
    boolean ack = false;//自动应答改为false
    channel.basicConsume(QUEUE_NAME, ack, consumer);

    ack = true时为自动确认模式,一旦rabbitMQ将消息分发给消费者,该消息就会在内存中删除;这种情况下,如果杀死正在处理消息的消费者,会丢失正在处理的消息;

    ack = false时为手动回执(消息应答)模式,如果有一个消费者挂掉,就会将会给其他消费者,rabbitMQ支持消息应答,消费者发送一个消息应答,告诉rabbitMQ这个消息已经被处理,然后rabbitMQ就删除内存中的消息;

    消息应答默认打开,即为false;

    由于消息在内存中存储,如果rabbitMQ挂掉,消息仍然会丢失。

  • 消息持久化
    boolean durable = false;
    channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

    durable控制的属性就是消息的持久化。

    已经声明好的队列,如果durable已经为false了,就无法修改为true,rabbitMQ不允许重新定义(不同参数)一个已存在的队列

4. Spring AMQP实现

  • 添加工作模式相关Java配置,创建一个名为work.hello的队列、一个生产者和两个消费者;
/**
* Created by macro on 2020/5/19.
*/
@Configuration
public class WorkRabbitConfig {

@Bean
public Queue workQueue() {
return new Queue("work.hello");
}

@Bean
public WorkReceiver workReceiver1() {
return new WorkReceiver(1);
}

@Bean
public WorkReceiver workReceiver2() {
return new WorkReceiver(2);
}

@Bean
public WorkSender workSender() {
return new WorkSender();
}

}
  • 生产者通过send方法向队列work.hello中发送消息,消息中包含一定数量的.号;
/**
* Created by macro on 2020/5/19.
*/
public class WorkSender {

private static final Logger LOGGER = LoggerFactory.getLogger(WorkSender.class);

@Autowired
private RabbitTemplate template;

private static final String queueName = "work.hello";

public void send(int index) {
StringBuilder builder = new StringBuilder("Hello");
int limitIndex = index % 3+1;
for (int i = 0; i < limitIndex; i++) {
builder.append('.');
}
builder.append(index+1);
String message = builder.toString();
template.convertAndSend(queueName, message);
LOGGER.info(" [x] Sent '{}'", message);
}

}
  • 两个消费者从队列work.hello中获取消息,名称分别为instance 1instance 2,消息中包含.号越多,耗时越长;
/**
* Created by macro on 2020/5/19.
*/
@RabbitListener(queues = "work.hello")
public class WorkReceiver {

private static final Logger LOGGER = LoggerFactory.getLogger(WorkReceiver.class);

private final int instance;

public WorkReceiver(int i) {
this.instance = i;
}

@RabbitHandler
public void receive(String in) {
StopWatch watch = new StopWatch();
watch.start();
LOGGER.info("instance {} [x] Received '{}'", this.instance, in);
doWork(in);
watch.stop();
LOGGER.info("instance {} [x] Done in {}s", this.instance, watch.getTotalTimeSeconds());
}

private void doWork(String in) {
for (char ch : in.toCharArray()) {
if (ch == '.') {
ThreadUtil.sleep(1000);
}
}
}

}
  • 在controller中添加测试接口,调用该接口开始发送消息;
/**
* Created by macro on 2020/5/19.
*/
@Api(tags = "RabbitController", description = "RabbitMQ功能测试")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {

@Autowired
private WorkSender workSender;

@ApiOperation("工作模式")
@RequestMapping(value = "/work", method = RequestMethod.GET)
@ResponseBody
public CommonResult workTest() {
for(int i=0;i<10;i++){
workSender.send(i);
ThreadUtil.sleep(1000);
}
return CommonResult.success(null);
}
}
  • 运行后结果可以发现生产者往队列中发送包含不同数量.号的消息,instance 1instance 2消费者互相竞争,分别消费了一部分消息。

2.3-发布/订阅模式

发布/订阅模式是指同时向多个消费者发送消息的模式(类似广播的形式),它包含一个生产者、两个消费者、两个队列和一个交换机。两个消费者同时绑定到不同的队列上去,两个队列绑定到交换机上去,生产者通过发送消息到交换机,所有消费者接收并消费消息。

1. 模式示意图

  • 一个生产者,多个消费者;
  • 每个消费者都有自己的队列;
  • 生产者没有直接把消息发送到队列,而是发送至交换机(eXchange)
  • 每个队列都要绑定到交换机上
  • 生产者发送的消息,经过交换机,到达队列,就能实现一个消息被多个消费者消费

2. Java实现

  • 生产消息

    import com.meituan.mq.simple.utils.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Send {
    private static final String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();

    //声明交换机
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

    String msg = "hello ps";

    channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
    System.out.println("Send " + msg);

    channel.close();
    connection.close();
    }
    }
  • 消费消息

    import com.meituan.mq.simple.utils.ConnectionUtil;
    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Recv1 {
    private static final String QUEUE_NAME = "test_ps_fanout_email";
    private static final String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    //绑定队列到交换机
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

    channel.basicQos(1);
    DefaultConsumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String msg = new String(body, "utf8");
    System.out.println("[1] msg recv1 : " + msg);
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    channel.basicAck(envelope.getDeliveryTag(), false);
    }
    }
    };

    boolean ack = false;//自动应答改为false
    channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
    }

    不同的队列做不同的事情。

3. Spring AMQP实现

  • 添加发布/订阅模式相关Java配置,创建一个名为exchange.fanout的交换机、一个生产者、两个消费者和两个匿名队列,将两个匿名队列都绑定到交换机;
/**
* Created by macro on 2020/5/19.
*/
@Configuration
public class FanoutRabbitConfig {

@Bean
public FanoutExchange fanout() {
return new FanoutExchange("exchange.fanout");
}

@Bean
public Queue fanoutQueue1() {
return new AnonymousQueue();
}

@Bean
public Queue fanoutQueue2() {
return new AnonymousQueue();
}

@Bean
public Binding fanoutBinding1(FanoutExchange fanout, Queue fanoutQueue1) {
return BindingBuilder.bind(fanoutQueue1).to(fanout);
}

@Bean
public Binding fanoutBinding2(FanoutExchange fanout, Queue fanoutQueue2) {
return BindingBuilder.bind(fanoutQueue2).to(fanout);
}

@Bean
public FanoutReceiver fanoutReceiver() {
return new FanoutReceiver();
}

@Bean
public FanoutSender fanoutSender() {
return new FanoutSender();
}

}
  • 生产者通过send方法向交换机exchange.fanout中发送消息,消息中包含一定数量的.号;
/**
* Created by macro on 2020/5/19.
*/
public class FanoutSender {
private static final Logger LOGGER = LoggerFactory.getLogger(FanoutSender.class);
@Autowired
private RabbitTemplate template;

private static final String exchangeName = "exchange.fanout";

public void send(int index) {
StringBuilder builder = new StringBuilder("Hello");
int limitIndex = index % 3 + 1;
for (int i = 0; i < limitIndex; i++) {
builder.append('.');
}
builder.append(index + 1);
String message = builder.toString();
template.convertAndSend(exchangeName, "", message);
LOGGER.info(" [x] Sent '{}'", message);
}

}
  • 消费者从绑定的匿名队列中获取消息,消息中包含.号越多,耗时越长,由于该消费者可以从两个队列中获取并消费消息,可以看做两个消费者,名称分别为instance 1instance 2
/**
* Created by macro on 2020/5/19.
*/
public class FanoutReceiver {

private static final Logger LOGGER = LoggerFactory.getLogger(FanoutReceiver.class);

@RabbitListener(queues = "#{fanoutQueue1.name}")
public void receive1(String in) {
receive(in, 1);
}

@RabbitListener(queues = "#{fanoutQueue2.name}")
public void receive2(String in) {
receive(in, 2);
}

private void receive(String in, int receiver) {
StopWatch watch = new StopWatch();
watch.start();
LOGGER.info("instance {} [x] Received '{}'", receiver, in);
doWork(in);
watch.stop();
LOGGER.info("instance {} [x] Done in {}s", receiver, watch.getTotalTimeSeconds());
}

private void doWork(String in) {
for (char ch : in.toCharArray()) {
if (ch == '.') {
ThreadUtil.sleep(1000);
}
}
}

}
  • 在controller中添加测试接口,调用该接口开始发送消息;
/**
* Created by macro on 2020/5/19.
*/
@Api(tags = "RabbitController", description = "RabbitMQ功能测试")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {

@Autowired
private FanoutSender fanoutSender;

@ApiOperation("发布/订阅模式")
@RequestMapping(value = "/fanout", method = RequestMethod.GET)
@ResponseBody
public CommonResult fanoutTest() {
for(int i=0;i<10;i++){
fanoutSender.send(i);
ThreadUtil.sleep(1000);
}
return CommonResult.success(null);
}
}
  • 运行后结果可以发现生产者往队列中发送包含不同数量.号的消息,instance 1instance 2同时获取并消费了消息。

2.4-路由模式

路由模式是可以根据路由键选择性给多个消费者发送消息的模式,它包含一个生产者、两个消费者、两个队列和一个交换机。两个消费者同时绑定到不同的队列上去,两个队列通过路由键绑定到交换机上去,生产者发送消息到交换机,交换机通过路由键转发到不同队列,队列绑定的消费者接收并消费消息。

1. 模式示意图

  • 声明exchange时指定为direct模式
  • 绑定队列时,指定路由键

2. Java实现

  • 生产者

    import com.meituan.mq.simple.utils.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Send {

    private static final String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();

    //声明exchange
    channel.exchangeDeclare(EXCHANGE_NAME, "direct");

    String msg = "hello direct";

    //指定路由键
    String routingKey = "warning";
    channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());

    System.out.println("send msg:" + msg);
    channel.close();
    connection.close();
    }
    }
  • 消费者1

    import com.meituan.mq.simple.utils.ConnectionUtil;
    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Recv1 {
    private static final String EXCHANGE_NAME = "test_exchange_direct";
    private static final String QUEUE_NAME = "test_queue_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    channel.basicQos(1);

    //绑定队列与交换机时,指定路由键
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");


    DefaultConsumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String msg = new String(body, "utf8");
    System.out.println("[1] msg recv1 : " + msg);
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    channel.basicAck(envelope.getDeliveryTag(), false);
    }
    }
    };

    boolean ack = false;//自动应答改为false
    channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
    }
  • 消费者2

    import com.meituan.mq.simple.utils.ConnectionUtil;
    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Recv2 {
    private static final String EXCHANGE_NAME = "test_exchange_direct";
    private static final String QUEUE_NAME = "test_queue_direct_2";

    public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    channel.basicQos(1);

    //绑定队列与交换机时,指定路由键
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");

    DefaultConsumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String msg = new String(body, "utf8");
    System.out.println("[2] msg recv2 : " + msg);
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    channel.basicAck(envelope.getDeliveryTag(), false);
    }
    }
    };

    boolean ack = false;//自动应答改为false
    channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
    }

3. Spring AMQP实现

  • 添加路由模式相关Java配置,创建一个名为exchange.direct的交换机、一个生产者、两个消费者和两个匿名队列,队列通过路由键都绑定到交换机,队列1的路由键为orangeblack队列2的路由键为greenblack
/**
* Created by macro on 2020/5/19.
*/
@Configuration
public class DirectRabbitConfig {

@Bean
public DirectExchange direct() {
return new DirectExchange("exchange.direct");
}

@Bean
public Queue directQueue1() {
return new AnonymousQueue();
}

@Bean
public Queue directQueue2() {
return new AnonymousQueue();
}

@Bean
public Binding directBinding1a(DirectExchange direct, Queue directQueue1) {
return BindingBuilder.bind(directQueue1).to(direct).with("orange");
}

@Bean
public Binding directBinding1b(DirectExchange direct, Queue directQueue1) {
return BindingBuilder.bind(directQueue1).to(direct).with("black");
}

@Bean
public Binding directBinding2a(DirectExchange direct, Queue directQueue2) {
return BindingBuilder.bind(directQueue2).to(direct).with("green");
}

@Bean
public Binding directBinding2b(DirectExchange direct, Queue directQueue2) {
return BindingBuilder.bind(directQueue2).to(direct).with("black");
}

@Bean
public DirectReceiver receiver() {
return new DirectReceiver();
}


@Bean
public DirectSender directSender() {
return new DirectSender();
}

}
  • 生产者通过send方法向交换机exchange.direct中发送消息,发送时使用不同的路由键,根据路由键会被转发到不同的队列;
/**
* Created by macro on 2020/5/19.
*/
public class DirectSender {

@Autowired
private RabbitTemplate template;

private static final String exchangeName = "exchange.direct";

private final String[] keys = {"orange", "black", "green"};

private static final Logger LOGGER = LoggerFactory.getLogger(DirectSender.class);

public void send(int index) {
StringBuilder builder = new StringBuilder("Hello to ");
int limitIndex = index % 3;
String key = keys[limitIndex];
builder.append(key).append(' ');
builder.append(index+1);
String message = builder.toString();
template.convertAndSend(exchangeName, key, message);
LOGGER.info(" [x] Sent '{}'", message);
}

}
  • 消费者从自己绑定的匿名队列中获取消息,由于该消费者可以从两个队列中获取并消费消息,可以看做两个消费者,名称分别为instance 1instance 2
/**
* Created by macro on 2020/5/19.
*/
public class DirectReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(DirectReceiver.class);

@RabbitListener(queues = "#{directQueue1.name}")
public void receive1(String in){
receive(in, 1);
}

@RabbitListener(queues = "#{directQueue2.name}")
public void receive2(String in){
receive(in, 2);
}

private void receive(String in, int receiver){
StopWatch watch = new StopWatch();
watch.start();
LOGGER.info("instance {} [x] Received '{}'", receiver, in);
doWork(in);
watch.stop();
LOGGER.info("instance {} [x] Done in {}s", receiver, watch.getTotalTimeSeconds());
}

private void doWork(String in){
for (char ch : in.toCharArray()) {
if (ch == '.') {
ThreadUtil.sleep(1000);
}
}
}

}
  • 在controller中添加测试接口,调用该接口开始发送消息;
/**
* Created by macro on 2020/5/19.
*/
@Api(tags = "RabbitController", description = "RabbitMQ功能测试")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {

@Autowired
private DirectSender directSender;

@ApiOperation("路由模式")
@RequestMapping(value = "/direct", method = RequestMethod.GET)
@ResponseBody
public CommonResult directTest() {
for(int i=0;i<10;i++){
directSender.send(i);
ThreadUtil.sleep(1000);
}
return CommonResult.success(null);
}
}
  • 运行后结果中可以发现生产者往队列中发送包含不同路由键的消息,instance 1获取到了orangeblack消息,instance 2获取到了greenblack消息。

2.5-Topic模式

通配符模式是可以根据路由键匹配规则选择性给多个消费者发送消息的模式,它包含一个生产者、两个消费者、两个队列和一个交换机。两个消费者同时绑定到不同的队列上去,两个队列通过路由键匹配规则绑定到交换机上去,生产者发送消息到交换机,交换机通过路由键匹配规则转发到不同队列,队列绑定的消费者接收并消费消息。

1. 模式示意图

  • *:只能匹配一个单词;
  • #:可以匹配零个或多个单词。

2. Java实现

  • 生产者

    import com.meituan.mq.simple.utils.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Send {

    private static final String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();

    //声明exchange,指定模式为topic
    channel.exchangeDeclare(EXCHANGE_NAME, "topic");

    String msg = "商品....";

    String routingKey = "goods.delete";
    channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());

    System.out.println("send msg:" + msg);
    channel.close();
    connection.close();
    }
    }
  • 消费者1

    import com.meituan.mq.simple.utils.ConnectionUtil;
    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
    * Created by wangbin on 2018/6/26.
    */
    public class Recv1 {
    private static final String EXCHANGE_NAME = "test_exchange_topic";
    private static final String QUEUE_NAME = "test_queue_topic_1";

    public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    channel.basicQos(1);

    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");

    DefaultConsumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String msg = new String(body, "utf8");
    System.out.println("[1] msg recv1 : " + msg);
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    channel.basicAck(envelope.getDeliveryTag(), false);
    }
    }
    };

    boolean ack = false;//自动应答改为false
    channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
    }
  • 消费者2

    import com.meituan.mq.simple.utils.ConnectionUtil;
    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
    * Created by wangbin on 2018/6/26.
    */
    public class Recv2 {
    private static final String EXCHANGE_NAME = "test_exchange_topic";
    private static final String QUEUE_NAME = "test_queue_topic_2";

    public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    channel.basicQos(1);

    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");

    DefaultConsumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String msg = new String(body, "utf8");
    System.out.println("[2] msg recv2 : " + msg);
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    channel.basicAck(envelope.getDeliveryTag(), false);
    }
    }
    };

    boolean ack = false;//自动应答改为false
    channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
    }

    其中,消费者1绑定路由键为goods.#,消费者2绑定路由键为goods.add。当生产者发送的消息路由键为goods.add时,两个消费者都会收到消息并处理;当生产者发送的消息路由键为goods.update时,只有消费者1可以接收到消息。

3. Spring AMQP实现

  • 添加通配符模式相关Java配置,创建一个名为exchange.topic的交换机、一个生产者、两个消费者和两个匿名队列,匹配*.orange.**.*.rabbit发送到队列1,匹配lazy.#发送到队列2
/**
* Created by macro on 2020/5/19.
*/
@Configuration
public class TopicRabbitConfig {

@Bean
public TopicExchange topic() {
return new TopicExchange("exchange.topic");
}

@Bean
public Queue topicQueue1() {
return new AnonymousQueue();
}

@Bean
public Queue topicQueue2() {
return new AnonymousQueue();
}

@Bean
public Binding topicBinding1a(TopicExchange topic, Queue topicQueue1) {
return BindingBuilder.bind(topicQueue1).to(topic).with("*.orange.*");
}

@Bean
public Binding topicBinding1b(TopicExchange topic, Queue topicQueue1) {
return BindingBuilder.bind(topicQueue1).to(topic).with("*.*.rabbit");
}

@Bean
public Binding topicBinding2a(TopicExchange topic, Queue topicQueue2) {
return BindingBuilder.bind(topicQueue2).to(topic).with("lazy.#");
}

@Bean
public TopicReceiver topicReceiver() {
return new TopicReceiver();
}

@Bean
public TopicSender topicSender() {
return new TopicSender();
}

}
  • 生产者通过send方法向交换机exchange.topic中发送消息,消息中包含不同的路由键
/**
* Created by macro on 2020/5/19.
*/
public class TopicSender {

@Autowired
private RabbitTemplate template;

private static final String exchangeName = "exchange.topic";

private static final Logger LOGGER = LoggerFactory.getLogger(TopicSender.class);


private final String[] keys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
"lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};

public void send(int index) {
StringBuilder builder = new StringBuilder("Hello to ");
int limitIndex = index%keys.length;
String key = keys[limitIndex];
builder.append(key).append(' ');
builder.append(index+1);
String message = builder.toString();
template.convertAndSend(exchangeName, key, message);
LOGGER.info(" [x] Sent '{}'",message);
System.out.println(" [x] Sent '" + message + "'");
}

}
  • 消费者从自己绑定的匿名队列中获取消息,由于该消费者可以从两个队列中获取并消费消息,可以看做两个消费者,名称分别为instance 1instance 2
/**
* Created by macro on 2020/5/19.
*/
public class TopicReceiver {

private static final Logger LOGGER = LoggerFactory.getLogger(TopicReceiver.class);

@RabbitListener(queues = "#{topicQueue1.name}")
public void receive1(String in){
receive(in, 1);
}

@RabbitListener(queues = "#{topicQueue2.name}")
public void receive2(String in){
receive(in, 2);
}

public void receive(String in, int receiver){
StopWatch watch = new StopWatch();
watch.start();
LOGGER.info("instance {} [x] Received '{}'", receiver, in);
doWork(in);
watch.stop();
LOGGER.info("instance {} [x] Done in {}s", receiver, watch.getTotalTimeSeconds());
}

private void doWork(String in){
for (char ch : in.toCharArray()) {
if (ch == '.') {
ThreadUtil.sleep(1000);
}
}
}

}
  • 在controller中添加测试接口,调用该接口开始发送消息;
/**
* Created by macro on 2020/5/19.
*/
@Api(tags = "RabbitController", description = "RabbitMQ功能测试")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {

@Autowired
private TopicSender topicSender;

@ApiOperation("通配符模式")
@RequestMapping(value = "/topic", method = RequestMethod.GET)
@ResponseBody
public CommonResult topicTest() {
for(int i=0;i<10;i++){
topicSender.send(i);
ThreadUtil.sleep(1000);
}
return CommonResult.success(null);
}
}
  • 运行后结果中可以发现生产者往队列中发送包含不同路由键的消息,instance 1instance 2分别获取到了匹配的消息。

3-消息Ack确认机制

RabbitMQ的消息确认有两种。

  • 消息发送确认:这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。
  • 消费接收确认。这种是确认消费者是否成功消费了队列中的消息。

3.1-环境配置

1. 引入依赖

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>

2. 配置文件

spring.application.name=rabbitmq
server.port=8084

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456

3. 配置类

@Configuration
public class RabbitConfig {
public static final String CONFIRM_QUEUE_A = "confirm_queue_A";
public static final String CONFIRM_QUEUE_B = "confirm_queue_B";
public static final String CONFIRM_EXCHANGE = "confirm_topic_exchange";
private static final String CONFIRM_QUEUE_A_RoutingKey="topic.message";
private static final String CONFIRM_QUEUE_B_RoutingKey="topic.#";

//Json格式转换
private static final MessageConverter jsonMessageConverter=new Jackson2JsonMessageConverter();

@Autowired
private RabbitTemplate rabbitTemplate;

//测试队列A
@Bean
public Queue confirmQueryA() {
return new Queue(CONFIRM_QUEUE_A);
}

//测试队列B
@Bean
public Queue confirmQueryB() {
return new Queue(CONFIRM_QUEUE_B);
}

//测试交换机,类型为topic
@Bean
TopicExchange confirmTopicExchange() {
return new TopicExchange(CONFIRM_EXCHANGE);
}

//绑定测试交换机和测试队列A
@Bean
Binding bindingConfirmExchangeA(Queue confirmQueryA, TopicExchange confirmTopicExchange) {
return BindingBuilder.bind(confirmQueryA).to(confirmTopicExchange).with(CONFIRM_QUEUE_A_RoutingKey);
}

//绑定测试交换机和测试队列B
@Bean
Binding bindingConfirmExchangeB(Queue confirmQueryB, TopicExchange confirmTopicExchange) {
return BindingBuilder.bind(confirmQueryB).to(confirmTopicExchange).with(CONFIRM_QUEUE_B_RoutingKey);
}


}

3.2-消息发送确认

1. ConfirmCallback

通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调。

消息的确认,指生产者收到投递消息后,如果Broker收到消息就会给我们 的生产者一个应答,生产者接受应答来确认broker是否收到消息。

  • ConfirmCallBackHandler

    // 通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调
    public class ConfirmCallBackHandler implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    System.out.println("消息唯一标识:"+correlationData);
    System.out.println("确认结果:"+ack);
    System.out.println("失败原因:"+cause);
    }
    }
  • 在RabbitConfig中配置RabbitTempalte

    //初始化加载方法,对RabbitTemplate进行配置
    @PostConstruct
    void rabbitTemplate(){
    //消息发送确认,发送到交换器Exchange后触发回调
    rabbitTemplate.setConfirmCallback(new ConfirmCallBackHandler());
    }
  • 该功能需要开启确认,spring-boot中配置如下:

    #消息发送交换机确认
    spring.rabbitmq.publisher-confirms = true

2. ReturnCallback

通过实现ReturnCallback接口,如果消息从交换器发送到对应队列失败时触发(比如根据发送消息时指定的routingKey找不到队列时会触发)。

Return消息机制处理一些不可路由的消息,我们的生产者通过指定一个Exchange和Routinkey,把消息送达到某一个队列中去,然后我们消费者监听队列进行消费处理!

在某些情况下,如果我们在发送消息的时候当Exchange不存在或者指定的路由key路由找不到,这个时候如果我们需要监听这种不可到达的消息,就要使用Return Listener!

Mandatory 设置为true则会监听器会接受到路由不可达的消息,然后处理。如果设置为false,broker将会自动删除该消息。

  • ReturnCallBackHandler

    // 通过实现ReturnCallback接口
    // 如果消息从交换器发送到对应队列失败时触发(比如根据发送消息时指定的routingKey找不到队列时会触发)
    public class ReturnCallBackHandler implements RabbitTemplate.ReturnCallback {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    System.out.println("消息主体 message:"+message);
    System.out.println("应答码 replyCode: :"+replyCode);
    System.out.println("原因描述 replyText:"+replyText);
    System.out.println("交换机 exchange:"+exchange);
    System.out.println("消息使用的路由键 routingKey:"+routingKey);
    }
    }
  • 在RabbitConfig中配置RabbitTempalte

    //初始化加载方法,对RabbitTemplate进行配置
    @PostConstruct
    void rabbitTemplate(){
    //消息发送确认,发送到交换器Exchange后触发回调
    rabbitTemplate.setConfirmCallback(new ConfirmCallBackHandler());
    //消息发送确认,如果消息从交换器发送到对应队列失败时触发(比如根据发送消息时指定的routingKey找不到队列时会触发)
    rabbitTemplate.setReturnCallback(new ReturnCallBackHandler());
    //自定义格式转换
    //rabbitTemplate.setMessageConverter(jsonMessageConverter);
    }
  • 使用该功能需要开启确认,spring-boot中配置如下:

    #消息发送队列回调
    spring.rabbitmq.publisher-returns = true

3. 定制化模板

/**
* 定制化amqp模版
   * connectionFactory:包含了yml文件配置参数
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 必须设置为 true,不然当 发送到交换器成功,但是没有匹配的队列,不会触发 ReturnCallback 回调
// 而且 ReturnCallback 比 ConfirmCallback 先回调,意思就是 ReturnCallback 执行完了才会执行 ConfirmCallback
rabbitTemplate.setMandatory(true);
// 设置 ConfirmCallback 回调 yml需要配置 publisher-confirms: true
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
// 如果发送到交换器都没有成功(比如说删除了交换器),ack 返回值为 false
// 如果发送到交换器成功,但是没有匹配的队列(比如说取消了绑定),ack 返回值为还是 true (这是一个坑,需要注意)
if (ack) {
String messageId = correlationData.getId();
System.out.println("confirm:"+messageId);
}
});
// 设置 ReturnCallback 回调 yml需要配置 publisher-returns: true
// 如果发送到交换器成功,但是没有匹配的队列,就会触发这个回调
rabbitTemplate.setReturnCallback((message, replyCode, replyText,
exchange, routingKey) -> {
String messageId = message.getMessageProperties().getMessageId();
System.out.println("return:"+messageId);
});
return rabbitTemplate;
}

}

3.3-消息接收确认

1. 确认模式

  • AcknowledgeMode.NONE:不确认
  • AcknowledgeMode.AUTO:自动确认
  • AcknowledgeMode.MANUAL:手动确认

spring-boot中配置方法:

spring.rabbitmq.listener.simple.acknowledge-mode = manual

2. 手动确认

  • 成功确认

    void basicAck(long deliveryTag, boolean multiple) throws IOException;
    • deliveryTag:该消息的index

    • multiple:是否批量. true:将一次性ack所有小于deliveryTag的消息。

    消费者成功处理后,调用channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)方法对消息进行确认。

    示例:

    @Component
    @RabbitListener(queues = "confirm_queue_B")
    public class Customer {
    @RabbitHandler
    public void process(Message message, Channel channel){
    System.out.println("ReceiverA:"+new String(message.getBody()));
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }
  • 失败确认

    1. 失败确认1

      void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
      • deliveryTag:该消息的index。
      • multiple:是否批量. true:将一次性拒绝所有小于deliveryTag的消息。
      • requeue:被拒绝的是否重新入队列。

      示例:

      @Component
      @RabbitListener(queues = "confirm_queue_B")
      public class Customer {
      @RabbitHandler
      public void processJsonMessage(@Payload String body, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Message message,Channel channel){
      System.out.println("ReceiverA:"+new String(message.getBody()));
      channel.basicNack(deliveryTag,true,true);
      }
    2. 失败确认2

      void basicReject(long deliveryTag, boolean requeue) throws IOException;
      • deliveryTag:该消息的index。
      • requeue:被拒绝的是否重新入队列。

      channel.basicNackchannel.basicReject 的区别在于basicNack可以批量拒绝多条消息,而basicReject一次只能拒绝一条消息。

      @Component
      @RabbitListener(queues = "confirm_queue_B")
      public class Customer {
      public void processJsonMessage(@Payload String body, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Message message,Channel channel){
      System.out.println("ReceiverA:"+new String(message.getBody()));
      channel.basicReject(deliveryTag,true);
      }

3. 思考

  • 手动确认模式,消息手动拒绝中如果requeue为true会重新放入队列,但是如果消费者在处理过程中一直抛出异常,会导致入队-》拒绝-》入队的循环,该怎么处理呢?

    1. 根据异常类型来选择是否重新放入队列;
    2. 先成功确认,然后通过**channel.basicPublish()**重新发布这个消息。重新发布的消息网上说会放到队列后面,进而不会影响已经进入队列的消息处理。
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
  • 消息确认的作用是什么?

    为了防止消息丢失。消息丢失分为发送丢失和消费者处理丢失,相应的也有两种确认机制。

  • 处理模板案例

    @Component
    @RabbitListener(queues = "confirm_queue_B")
    public class AckTempalte {
    enum Action{
    ACCEPT, // 处理成功
    RETRY, // 可以重试的错误
    REJECT, // 无需重试的错误
    }
    @RabbitHandler
    public void processJsonUser(Message message, Channel channel){
    Action action=Action.ACCEPT;
    long tag=message.getMessageProperties().getDeliveryTag();
    try{

    message.getMessageProperties().getConsumerTag();
    System.out.println( message.getMessageProperties().getConsumerTag());
    String message1 = new String(message.getBody(), "UTF-8");
    System.out.println("获取消息'" + message1 + "'");

    }catch (Exception e){
    // 根据异常种类决定是ACCEPT、RETRY还是 REJECT
    action = Action.RETRY;
    e.printStackTrace();

    }finally {
    try {
    // 通过finally块来保证Ack/Nack会且只会执行一次
    if (action == Action.ACCEPT) {
    channel.basicAck(tag, true);
    // 重试
    } else if (action == Action.RETRY) {
    channel.basicNack(tag, false, true);
    Thread.sleep(2000L);
    // 拒绝消息也相当于主动删除mq队列的消息
    } else {
    channel.basicNack(tag, false, false);
    }
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }
    }

4-RabbitMQ的消费端限流

4.1-为什么需要限流?

  • 假设一个场景,我们Rabbitmq服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现这种情况:巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据!此时很有可能导致服务器崩溃,严重的可能导致线上的故障。
  • 除了这种场景,还有一些其他的场景,比如说单个生产者一分钟生产出了几百条数据,但是单个消费者一分钟可能只能处理60条数据,这个时候生产端和消费端肯定是不平衡的。通常生产端是没办法做限制的。所以消费端肯定需要做一些限流措施,否则如果超出最大负载,可能导致消费端性能下降,服务器卡顿甚至崩溃等一系列严重后果。

4.2-消费端限流机制

RabbitMQ提供了一种qos (服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息 (通过基于consume或者channel设置Qos的值) 未被确认前,不进行消费新的消息。
需要注意:

  1. 不能设置自动签收功能(autoAck = false)
  2. 如果消息没被确认,就不会到达消费端,目的就是给消费端减压

4.3-限流相关API

1. 限流设置

void basicQOS(unit prefetchSize,ushort prefetchCount,Boolean global)
  • prefetchSize: 单条消息的大小限制,消费端通常设置为0,表示不做限制;
  • prefetchCount: 一次最多能处理多少条消息,通常设置为1;
  • global: 是否将上面设置应用于channel,false代表consumer级别;

2. 注意事项

prefetchCountautoAck=false 的情况下生效,即在自动应答的情况下这个值是不生效的。

3. 手工ACK - basicAck()

void basicAck(Integer deliveryTag,boolean multiple)

手工ACK,调用这个方法就会主动回送给Broker一个应答,表示这条消息我处理完了,你可以给我下一条了。参数multiple表示是否批量签收,由于我们是一次处理一条消息,所以设置为false

4. 限流显示

消费者:关闭autoACK,进行限流设置

public class Consumer {

public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.43.157");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 获取Connection
Connection connection = connectionFactory.newConnection();
//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();

String exchangeName = "test_qos_exchange";
String queueName = "test_qos_queue";
String routingKey = "qos.#";
//4 声明交换机和队列,然后进行绑定设置路由Key
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

//进行参数设置:单条消息的大小限制,一次最多能处理多少条消息,是否将上面设置应用于channel
channel.basicQos(0, 1, false);

//限流: autoAck设置为 false
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}

5-如何保证消息的幂等性?

消息的幂等性,即如何保证消费不被重复消费。

5.1-什么是幂等性?

可以参考数据库乐观锁机制,比如执行一条更新库存的 SQL 语句,在并发场景,为了性能和数据可靠性,会在更新时加上查询时的版本,并且更新这个版本信息。可能你要对一个事情进行操作,这个操作可能会执行成百上千次,但是操作结果都是相同的,这就是幂等性。

5.2-非幂等的情况

  1. 生产者已把消息发送到mq,在mq给生产者返回ack的时候网络中断,故生产者未收到确定信息,生产者认为消息未发送成功,但实际情况是,mq已成功接收到了消息,在网络重连后,生产者会重新发送刚才的消息,造成mq接收了重复的消息;

    解决办法:

    mq内部会为每条消息生成一个全局唯一、与业务无关的消息id,当mq接收到消息时,会先根据该id判断消息是否重复发送,mq再决定是否接收该消息。

  2. 消费者在消费mq中的消息时,mq已把消息发送给消费者,消费者在给mq返回ack时网络中断,故mq未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息;

    解决办法:

    也可利用mq的该id来判断,或者可按自己的规则生成一个全局唯一id,每次消费消息时用该id先判断该消息是否已消费过。

5.3-消费端的幂等性保障

在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性,这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。

业界主流的幂等性有两种操作:

  • 唯一 ID + 指纹码 机制:
    1. 指纹码(就是时间戳 + 业务的一些规则, 来保证id + 指纹码在同一时刻是唯一的,不会出现重复)
    2. 唯一ID + 指纹码机制,利用数据库主键去重;
    3. select count(1) from t_order where id = 唯一ID + 指纹码;
    4. 好处:实现简单;
    5. 坏处:高并发下有数据库写入的瓶颈;
    6. 解决方案:跟进ID进行分库分表进行算法路由;
  • 利用Redis的原子性实现

1. 唯一 ID + 指纹码 机制

大家肯定懂唯一 ID 的,就不多说了,为什么需要指纹码呢?这是为了应对用户在一瞬间的频繁操作,这个指纹码可能是我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个id是否存在数据库中。

好处就是实现简单,就一个拼接,然后查询判断是否重复。

坏处就是在高并发时,如果是单个数据库就会有写入性能瓶颈

解决方案 :根据 ID 进行分库分表,对 id 进行算法路由,落到一个具体的数据库,然后当这个 id 第二次来又会落到这个数据库,这时候就像我单库时的查重一样了。利用算法路由把单库的幂等变成多库的幂等,分摊数据流量压力,提高性能。

2. 利用 redis 的原子性去实现

使用 redis 的原子性去实现需要考虑两个点:

  • 一是 是否 要进行数据落库,如果落库的话,关键解决的问题是数据库和缓存如何做到原子性? 数据库与缓存进行同步肯定要进行写操作,到底先写 redis 还是先写数据库,这是个问题,涉及到缓存更新与淘汰的问题
  • 二是如果不落库,那么都存储到缓存中,如何设置定时同步的策略? 不入库的话,可以使用双重缓存等策略,保障一个消息副本,具体同步可以使用类似 databus 这种同步工具。

6-TTL队列/消息和死信队列

6.1-TTL消息和TTL队列

TTL是Time To Live的缩写,也就是生存时间

1. TTL消息

RabbitMQ支持消息的过期时间,在消息发送时可以进行指定。

2. TTL队列

RabbitMQ支持为每个队列设置消息的超时时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除。

3. 注意事项

  • 两者的区别是设置队列的过期时间是对该队列的所有消息生效的。
  • 为消息设置TTL有一个问题:RabbitMQ只对处于队头的消息判断是否过期(即不会扫描队列),所以,很可能队列中已存在死消息,但是队列并不知情。这会影响队列统计数据的正确性,妨碍队列及时释放资源。

6.2-死信队列

1. 死信队列介绍

死信队列:DLX,dead-letter-exchange

利用DLX,当消息在一个队列中变成死信 (dead message,就是没有任何消费者消费)) 之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。

2. 消息变成死信有以下几种情况

  • 消息被拒绝(basic.reject / basic.nack),并且requeue = false
  • 消息TTL过期
  • 队列达到最大长度

3. 死信处理过程

  • DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
  • 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
  • 可以监听这个队列中的消息做相应的处理。

4. 死信队列设置

  1. 首先需要设置死信队列的exchange和queue,然后进行绑定:
Exchange: dlx.exchange
Queue: dlx.queue
RoutingKey: #
#表示只要有消息到达了Exchange,那么都会路由到这个queue上
  1. 然后需要有一个监听,去监听这个队列进行处理
  2. 然后我们进行正常声明交换机、队列、绑定,只不过我们需要在队列加上一个参数即可:arguments.put(" x-dead-letter-exchange","dlx.exchange");,这样消息在过期、requeue、 队列在达到最大长度时,消息就可以直接路由到死信队列!

5. 死信队列案例

  • 生产端
    public class Producer {

    public static void main(String[] args) throws Exception {
    //1 创建ConnectionFactory
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.43.157");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    //2 获取Connection
    Connection connection = connectionFactory.newConnection();
    //3 通过Connection创建一个新的Channel
    Channel channel = connection.createChannel();

    String exchange = "test_dlx_exchange";
    String routingKey = "dlx.save";

    String msg = "Hello RabbitMQ DLX Message";

    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
    .deliveryMode(2)
    .contentEncoding("UTF-8")
    .expiration("10000")
    .build();
    //发送消息
    channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
    }
    }
  • 自定义消费者
    public class MyConsumer extends DefaultConsumer {

    public MyConsumer(Channel channel) {
    super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.err.println("-----------consume message----------");
    System.err.println("consumerTag: " + consumerTag);
    System.err.println("envelope: " + envelope);
    System.err.println("properties: " + properties);
    System.err.println("body: " + new String(body));
    }
    }
  • 消费端
    • 声明正常处理消息的交换机、队列及绑定规则
    • 在正常交换机上指定死信发送的Exchange
    • 声明死信交换机、队列及绑定规则
    • 监听死信队列,进行后续处理,这里省略
    public class Consumer {

    public static void main(String[] args) throws Exception {

    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.43.157");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");

    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();

    // 声明一个普通的交换机 和 队列 以及路由
    String exchangeName = "test_dlx_exchange";
    String routingKey = "dlx.#";
    String queueName = "test_dlx_queue";

    channel.exchangeDeclare(exchangeName, "topic", true, false, null);
    //指定死信发送的Exchange
    Map<String, Object> agruments = new HashMap<String, Object>();
    agruments.put("x-dead-letter-exchange", "dlx.exchange");
    //这个agruments属性,要设置到声明队列上
    channel.queueDeclare(queueName, true, false, false, agruments);
    channel.queueBind(queueName, exchangeName, routingKey);

    //要进行死信队列的声明
    channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
    channel.queueDeclare("dlx.queue", true, false, false, null);
    channel.queueBind("dlx.queue", "dlx.exchange", "#");

    channel.basicConsume(queueName, true, new MyConsumer(channel));
    }
    }
  • 运行说明

    启动消费端,此时查看管控台,新增了两个Exchange,两个Queue。在test_dlx_queue上我们设置了DLX,也就代表死信消息会发送到指定的Exchange上,最终其实会路由到dlx.queue上。

    此时关闭消费端,然后启动生产端,查看管控台队列的消息情况,test_dlx_queue的值为1,而dlx_queue的值为0。
    10s后的队列结果如图,由于生产端发送消息时指定了消息的过期时间为10s,而此时没有消费端进行消费,消息便被路由到死信队列中。

    实际环境我们还需要对死信队列进行一个监听和处理,当然具体的处理逻辑和业务相关,这里只是简单演示死信队列是否生效。

7-RabbitMQ的集群模式

7.1-主备模式

实现RabbitMQ的高可用集群,一般在并发和数据量不高的情况下,这种模式非常的好且简单。主备模式也称为Warren模式

主备模式:主节点提供读写,从节点不提供读写服务,只是负责提供备份服务,如果主节点宕机备份从节点会自动切换成主节点,提供服务。
主从模式:主节点提供读写,从节点只读。

  • 主备模式:所谓rabbitmq另外一种模式就是warren(兔子窝),就是一个主/备方案(主节点如果挂了,从节点提供服务而已,和activemq利用zookeeper做主/备一样)

  • HaProxy配置:
listen rabbitmq_cluster
bind 0.0.0.0:5672
mode tcp #配置TCP模式
balance roundrobin #简单的轮询
server bhz76 192.168.11.12:5672 check inter 5000 rise 2 fall 3 #主节点
server bhz77 192.168.11.13:5672 backup check inter 5000 rise 2 fall 3 #备用节点

备注:rabbitmq集群节点配置 inter 每隔5秒对mq集群做健康检查,2次正确证明服务器可用,3次失败证明服务器不可用,并且配置主备机制

7.2-远程模式(不常用)

远程模式:远距离通信和复制,所谓Shovel就是我们可以把消息进行不同数据中心的复制工作,我们可以跨地域的让两个mq集群互联。我们下面看一下Shovel架构模型:

在使用了shovel插件后,模型变成了近端同步确认远端异步确认方式,大大提高了订单确认速度,并且还能保证可靠性。

  • 远程模式:Shovel集群的拓补图:

  • Shovel集群的配置,首先启动rabbitmq插件,命令如下:

rabbitmq-plugins enable amqp_client
rabbitmq-plugins enable rabbitmq_shovel
  • 创建rabbitmq.conf文件:touch /etc/rabbitmq/rabbitmq.config
  • 添加配置见rabbitmq.config
  • 最后我们需要资源服务器和目的服务器都使用相同的配置文件(rabbitmq.config)

7.3-镜像模式(常用)

  • 镜像模式:集群模式非常经典的就是Mirror镜像模式,保证100%数据不丢失,在实际工作中用的最多的。并且实现集群非常的简单,一般互联网大厂都会构建这种镜像集群模式。
  • Mirror镜像队列,目的是为了保证rabbitmq数据的高可靠性解决方案,主要就是实现数据的同步,一般来讲是2-3个实现数据同步(对于100%数据可靠性解决方案一般是3个节点)集群架构如下:

7.4-多活模式

  • 多活模式:这种模式也是实现异地数据复制的主流模式,因为Shovel模式配置比较复杂,所以一般来说实现异地集群都是使用双活或者多活模式来实现的。这种模式需要依赖rabbitmq的federation插件,可以实现继续的可靠AMQP数据通信,多活模式在实际配置与应用非常的简单。

  • RabbitMQ部署架构采用双中心模式(多中心),那么在两套(或多套)数据中心中各部署一套RabbitMQ集群,各中心之间还需要实现部分队列消息共享。多活集群架构如下:

  • Federation插件是一个不需要构建Cluster,而在Brokers之间传输消息的高性能插件,Federation插件可以在Brokers或者Cluster之间传输消息,连接双方可以使用不同的users和vistual hosts,双方也可以使用版本不同的RabbitMQ和Erlang。Federation插件使用AMQP协议通信,可以接收不连续的传输。

  • Federation Exchanges,可以看成Downstream从Upstream主动拉取消息,但并不是拉取所有消息,必须是在Downstream上已经明确定义Bindings关系的Exchange,也就是有实际的物理Queue来接收消息,才会从Upstream拉取消息到Downstream。使用AMQP协议实施代理间通信,Downstream会将绑定关系组合在一起,绑定/解绑命令将会发送到Upstream交换机。因此,FederationExchange只接收具有订阅的消息。

7.5-RabbitMQ集群镜像模式从0到1

  • RabbitMQ集群环境节点说明

    !

  • 详细步骤:
    RabbitMQ镜像集群搭建步骤

    • HAProxy是一款提供高可用性、负载均衡以及基于TCP(第四层)和HTTP(第七层)应用的代理软件,支持虚拟主机,他是免费、快速并且可靠的一种解决方案。HAProxy特别适用于那些负载特大的web站点,这些站点通常又需要会话保持或七层处理。HAProxy运行在时下的硬件上,完全可以支撑数以万计的并发连接。并且它的运行模式使得它可以很简单安全的整合进您当前的架构中,同时可以保护你的web服务器不被暴露到网络上。
    • HAProxy借助于OS上几种常见的技术来实现性能的最大化:
    1. 单进程、时间驱动模型显著降低上下文切换的开销及内存占用
    2. 在任何可用的情况下,单缓冲(single buffering)机制能以不复制任何数据的方式完成读写操作,这会节约大量的CPU时钟周期及内存带宽
    3. 借助于Linux2.6上的splice()系统调用,HAProxy可以实现零复制转发(Zero-copy- forwarding),在linux3.5及以上的OS上还可以实现零复制启动(zero-starting)
  • KeepAlived软件主要是通过VRRP协议实现高可用功能的。VRRP是Virtual Router RedundancyProtocol(虚拟路由器冗余协议)的缩写,VRRP出现的目的就是为了解决静态路由单点故障问题的,它能保证党个别节点宕机时,整个网络可以不间断地运行,所以,KeepAlived一方面具有配置管理LVS的功能,同时还具备对LVS下面节点进行健康检查差的功能,另一方面可实现系统网络服务的高可用功能。

    KeepAlived服务的三个重要功能:

    1. 管理LVS负载均衡软件
    2. 实现LVS集群节点的健康检查
    3. 作为系统网络服务的高可用性(failover)
  • KeepAlived高可用原理
    KeepAlived高可用服务对之间的故障转移,是通过VRRP(Virtual Router Redundancy Protocol,虚拟路由器冗余协议)来实现的。在KeepAlived服务正常工作是,主Master节点会不断地向备节点发送(多播的方式)心跳消息,用以告诉备Backup节点自己还活着,当主master节点发生故障时,就无法发送心跳消息,备节点也就因此无法继续监测到来自主Master节点的心跳了,于是调用自身的接管程序,接管主Master节点 的IP资源及服务。当主Master节点恢复时,备Backup节点又会释放主节点故障时自身接管的IP资源和服务,恢复到原来的备用角色。

参考

——https://www.jianshu.com/p/588e1c959f03

——https://mp.weixin.qq.com/s/YDGWbdYgWy6ttUxjWYCIjw

——https://mp.weixin.qq.com/s/qGg3etLnI38i-G8aFbulWw