当前位置: 首页 > news >正文

ftp发布asp.net网站/品牌推广策划方案案例

ftp发布asp.net网站,品牌推广策划方案案例,新能源汽车车型,揭阳cms建站模板文章目录 模式介绍建立连接单独确认代码实现逻辑运行结果 批量确认代码实现逻辑运行结果 异步确认实现逻辑介绍代码实现逻辑运行结果 三种策略对比以及完整代码 模式介绍 作为消息中间件,都会面临消息丢失的问题,消息丢失大概分为三种情况: …

文章目录

    • 模式介绍
    • 建立连接
    • 单独确认
      • 代码实现逻辑
      • 运行结果
    • 批量确认
      • 代码实现逻辑
      • 运行结果
    • 异步确认
      • 实现逻辑介绍
      • 代码实现逻辑
      • 运行结果
    • 三种策略对比以及完整代码

模式介绍

作为消息中间件,都会面临消息丢失的问题,消息丢失大概分为三种情况:

  1. 生产者问题:因为应用程序故障,网络抖动等各种原因,生产者没有成功向 broker 发送消息
  2. 消息中间件自身问题:生产者成功发送给了 Broker,但是 Broker 没有把消息保存好,导致消息丢失
  3. 消费者问题:Broker 发送消息到消费者,消费者在消费消息时,因为没有处理好,导致 broker 将消费失败的消息从列表中删除了

image.png

  • RabbitMQ 也对上述问题给出了相应的解决方案。
    • 问题二可以通过持久化机制
    • 问题三可以采用消息应答机制
    • 问题一可以采用发布确认机制

发布确认属于 RabbitMQ 的七大工作模式之一

生产者将信道设置成 confirm(确认)模式

  • 一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(从 1 开始)
    • 同一个 channel 下,序号不可能重复
  • 一旦消息被投递到所有匹配的对类之后,brocker 就会发送一个确认(ACK)给生产者(包含消息的唯一 ID
    • 这就使得生产者知道消息已经正确到达目的的队列了
  • 如果消息和对类是可持久化的,那么确认消息会在将消息写入磁盘之后发出
    • broker 回传给生产者的确认消息中 deliveryTag 包含了确认消息的序号
    • 此外 broker 也可以设置 channel.basicAck 方法中的 multiple 参数,表示到这个序号之前的所有消息都已经得到了处理
      image.png

发送确认机制最大的好处在于它是异步的,生产者可以同时发布消息和等待信道返回确认消息

  1. 当消息最终得到确认之后,生产者可以通过回调方法来处理该确认消息
  2. 如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack(Basic.Nack) 命令,生产者同样可以在回调方法中处理该 nack 命令

使用发布确认机制,必须要信道设置成 confirm (确认) 模式

  • 发布确认是 AMQP 0.9.1 协议的扩展,默认情况下它不会被启用
  • 生产者通过 channel.confirmSelect() 将信道设置为 confirm 模式
Channel channel = connection.createChannel();
channel.confirmSelect();

发布确认有三种策略,接下来我们来介绍这三种策略

ProducerBrockerConsumer 都有可能丢失消息

  • 发布确认是来解决生产者 Producer 消息丢失的问题
  • 生产者可以在发送消息的同时,等待返回确认消息

建立连接

因为每一个策略都需要重复建立链接这一步骤,所以我们将其提出来,单独作为一个方法,需要的时候直接调用即可

  • 之后就不用重复写这一部分代码了
  • 在类里面,main 方法外面,使用一个静态方法
public class PublisherConfirms {  static Connection createConnection() throws Exception {  ConnectionFactory connectionFactory = new ConnectionFactory();  connectionFactory.setHost(Constants.HOST);  connectionFactory.setPort(Constants.PORT);  connectionFactory.setUsername(Constants.USER_NAME);  connectionFactory.setPassword(Constants.PASSWORD);  connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);  return connectionFactory.newConnection();  }  public static void main(String[] args) {  }  }

单独确认

Publishing Messages Individually

代码实现逻辑

    /**  * 单独确认  */  private static void publishingMessagesIndividually() throws Exception {  // 1. 创建连接  // 我们将连接的建立写在 try 里面,这样就不用再去关闭了  try(Connection connection = createConnection()) {  // 2. 开启信道  Channel channel = connection.createChannel();  // 3. 设置信道为 confirms 模式  channel.confirmSelect();  // 4. 声明队列(交换机就使用内置的,就不再声明了)  // 队列对象、是否持久化、是否独占、是否自动删除、传递参数  channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false,null);  // 5. 发送消息,并等待确认  // 这里我们需要可以再创建一个 MESSAGE_COUNT 全局变量,来指定消息的数量  long start = System.currentTimeMillis();  for (int i = 0; i < MESSAGE_COUNT; i++) {  String msg = "hello publisher confirms" + i;  // 信道的发送  // 交换机的名称(我们使用的是内置交换机,也就是空的)、routingKey、  channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());  // 等待确认(等待确认消息,只要消息被确认,这个方法就会被返回)  // 有 waitForConfirms() 和 waitForConfirmsOrDie() 随便用哪个  // 如果超时过期,则抛出 TimeoutException。如果任何消息被 nack(丢失),waitForConfirmsOrDie 则抛出 Exception  channel.waitForConfirmsOrDie(5000);  }  long end = System.currentTimeMillis();  // 这里注意是 printf            System.out.printf("单独确认==>消息条数: %d, 耗时: %d ms \n", MESSAGE_COUNT, end-start);  }  }

运行结果

单独确认==>消息条数: 200, 耗时: 5793 ms 
  • 可以发现,耗时较长

观察上面代码,会发现这种策略是每发送一条消息后就调用 channel.waitForConfirmsOrDie() 方法, 之后等待服务器的确认

  • 这其实是一种串行同步等待的方式
  • 尤其对于持久化的消息来说,需要等待消息确认存储在硬盘之后才会返回 (调用 Linux 内核中的 fsync 方法)

但是消息确认机制是支持异步的,可以一边发送消息,一边等待消息确认。由此进行了改进,我们看另外两种策略

  • Publishing Messages in Batches(批量确认):每发送一批消息之后,调用 channel.waitForConfirms 方法,等待服务器的确认返回
  • Handling Publisher Confirms Asynchronously(异步确认):提供一个回调方法,服务端确认了一条或者多条消息后,客户端会对这个方法进行处理

批量确认

Publishing Messages in Batches

代码实现逻辑

/**  * 批量确认  */  
private static void publishingMessagesInBatches() throws Exception {  // 1. 建立连接  try(Connection connection = createConnection()){  // 2. 开启信道  Channel channel = connection.createChannel();  // 3. 设置信道为 confirm 模式  channel.confirmSelect();  // 4. 声明队列  channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);  // 5. 发送消息,并进行确认  // 设置批量处理的大小和计数器  long start = System.currentTimeMillis();  int batchSize = 100;  int outstandingMessageCount = 0;  for (int i = 0; i < MESSAGE_COUNT; i++) {  String msg = "hello publisher confirms" + i;  channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());  outstandingMessageCount++;  // 当计数器的大小达到了设置的批量处理大小,就进行确认  if(outstandingMessageCount == batchSize) {  channel.waitForConfirmsOrDie(5000);  // 消息确认后,计数器要清零  outstandingMessageCount = 0;  }  // 当计数器大小 < 100 的时候,由于没有达到批量发送的标准,所以单独再进行发送  if (outstandingMessageCount > 0) {  channel.waitForConfirmsOrDie(5000);  }  }  long end = System.currentTimeMillis();  System.out.printf("批量确认==>消息条数: %d, 耗时: %d ms", MESSAGE_COUNT, end-start);  }  
}

运行结果

批量确认==>消息条数: 200, 耗时: 128 ms 

异步确认

Handling Publisher Confirms Asynchronously

实现逻辑介绍

异步 confirm 方法的编程实现最为复杂

  • Channel 接口提供了一个方法 addConfirmListener()
  • 这个方法可以添加 ConfirmListener 回调接口

ConfirmListener 接口中包含两个方法:

  • handleAck(long deliveryTag, boolean multiple),处理 RabbitMQ 发送给生产者的 ack
    • deliveryTag 表示发送消息的序号
    • multiple 表示是否批量确认
  • handleNack(long deliveryTag, boolean multiple),处理 RabbitMQ 发送给生产者的 nack
    image.png|413

我们需要为每一个 Channel 维护一个已发送消息的序号集合

  • 当收到 RabbitMQconfirm 回调时,从集合中删除对应的消息
  • Channel 开启 confirm 模式后,channel 上发送消息都会附带一个从 1 开始递增的 deliveryTag 序号
  • 我们可以使用 SortedSet 的有序性来维护这个已发消息的集合
    1. 当收到 ack 时,从序列中删除该消息的序号。如果为批量确认消息,表示小于当前序号 deliveryTag 的消息都收到了,则清楚对应集合
    2. 当收到 nack 时,处理逻辑类似,不过需要结合具体业务情况,进行消息重发等操作

代码实现逻辑

/**  * 异步确认  */  
private static void handlingPublisherConfirmsAsynchronously() throws Exception {  // 1. 建立连接  try(Connection connection = createConnection()) {  // 2. 开启信道  Channel channel = connection.createChannel();  // 3. 设置信道为 confirm 模式  channel.confirmSelect();  // 4. 声明队列  channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);  // 5. 监听 confirm        long start = System.currentTimeMillis();  // 创建一个集合,用来存放未确认的消息(的id)  SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());  channel.addConfirmListener(new ConfirmListener() {  @Override  public void handleAck(long deliveryTag, boolean multiple) throws IOException {  // 如果是批量确认,就要将集合中 <= deliveryTag 的 id 都给清除掉  if(multiple) {  // headSet(n)方法返回当前集合中小于 n 的集合  // 先获取到这部分 id,然后一起 clear 清除掉即可  confirmSeqNo.headSet(deliveryTag + 1).clear();  }else {  // 单独确认,只需要移除当前这个 id 即可  confirmSeqNo.remove(deliveryTag);  }  }  @Override  public void handleNack(long deliveryTag, boolean multiple) throws IOException {  // 和 ack 处理模式基本是相似的,只是多了一步重发处理  if(multiple) {  confirmSeqNo.headSet(deliveryTag + 1).clear();  }else {  confirmSeqNo.remove(deliveryTag);  }  // 业务需要根据实际场景进行处理,比如重发,此处代码省略  }  });  // 6. 发送消息  for (int i = 0; i < MESSAGE_COUNT; i++) {  String msg = "hello publisher confirms" + i;  long seqNo = channel.getNextPublishSeqNo(); // 拿到消息的序号  channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());  confirmSeqNo.add(seqNo); // 将消息的序号加入集合中  }  // 确认消息已处理完  while (!confirmSeqNo.isEmpty()) {  // 没有处理完,就休眠一段时间后再确认一下,看是否处理完  Thread.sleep(10);  }  long end = System.currentTimeMillis();  System.out.printf("异步确认==>消息条数: %d, 耗时: %d ms", MESSAGE_COUNT, end-start);  }  
}

运行结果

单独确认==>消息条数: 200, 耗时: 93 ms 

三种策略对比以及完整代码

package rabbitmq.publisher.confirms;  import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.ConfirmListener;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import rabbitmq.constant.Constants;  import java.io.IOException;  
import java.util.Collections;  
import java.util.SortedSet;  
import java.util.TreeSet;  public class PublisherConfirms {  private static final Integer MESSAGE_COUNT = 200;  static Connection createConnection() throws Exception {  ConnectionFactory connectionFactory = new ConnectionFactory();  connectionFactory.setHost(Constants.HOST);  connectionFactory.setPort(Constants.PORT);  connectionFactory.setUsername(Constants.USER_NAME);  connectionFactory.setPassword(Constants.PASSWORD);  connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);  return connectionFactory.newConnection();  }  public static void main(String[] args) throws Exception {  // Strategy #1: Publishing Messages Individually  // 单独确认  publishingMessagesIndividually();  // Strategy #2: Publishing Messages in Batches  // 批量确认  publishingMessagesInBatches();  // Strategy #3: Handling Publisher Confirms Asynchronously  // 异步确认  handlingPublisherConfirmsAsynchronously();  }  /**  * 单独确认  */  private static void publishingMessagesIndividually() throws Exception {  // 1. 创建连接  // 我们将连接的建立写在 try 里面,这样就不用再去关闭了  try(Connection connection = createConnection()) {  // 2. 开启信道  Channel channel = connection.createChannel();  // 3. 设置信道为 confirms 模式  channel.confirmSelect();  // 4. 声明队列(交换机就使用内置的,就不再声明了)  // 队列对象、是否持久化、是否独占、是否自动删除、传递参数  channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false,null);  // 5. 发送消息,并等待确认  // 这里我们需要可以再创建一个 MESSAGE_COUNT 全局变量,来指定消息的数量  long start = System.currentTimeMillis();  for (int i = 0; i < MESSAGE_COUNT; i++) {  String msg = "hello publisher confirms" + i;  // 信道的发送  // 交换机的名称(我们使用的是内置交换机,也就是空的)、routingKey、  channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());  // 等待确认(等待确认消息,只要消息被确认,这个方法就会被返回)  // 有 waitForConfirms() 和 waitForConfirmsOrDie() 随便用哪个  // 如果超时过期,则抛出 TimeoutException。如果任何消息被 nack(丢失),waitForConfirmsOrDie 则抛出 Exception                channel.waitForConfirmsOrDie(5000);  }  long end = System.currentTimeMillis();  // 这里注意是 printf            System.out.printf("单独确认==>消息条数: %d, 耗时: %d ms \n", MESSAGE_COUNT, end-start);  }  }  /**  * 批量确认  */  private static void publishingMessagesInBatches() throws Exception {  // 1. 建立连接  try(Connection connection = createConnection()){  // 2. 开启信道  Channel channel = connection.createChannel();  // 3. 设置信道为 confirm 模式  channel.confirmSelect();  // 4. 声明队列  channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);  // 5. 发送消息,并进行确认  // 设置批量处理的大小和计数器  long start = System.currentTimeMillis();  int batchSize = 100;  int outstandingMessageCount = 0;  for (int i = 0; i < MESSAGE_COUNT; i++) {  String msg = "hello publisher confirms" + i;  channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());  outstandingMessageCount++;  // 当计数器的大小达到了设置的批量处理大小,就进行确认  if(outstandingMessageCount == batchSize) {  channel.waitForConfirmsOrDie(5000);  // 消息确认后,计数器要清零  outstandingMessageCount = 0;  }  // 当计数器大小 < 100 的时候,由于没有达到批量发送的标准,所以单独再进行发送  if (outstandingMessageCount > 0) {  channel.waitForConfirmsOrDie(5000);  }  }  long end = System.currentTimeMillis();  System.out.printf("批量确认==>消息条数: %d, 耗时: %d ms \n", MESSAGE_COUNT, end-start);  }  }  /**  * 异步确认  */  private static void handlingPublisherConfirmsAsynchronously() throws Exception {  // 1. 建立连接  try(Connection connection = createConnection()) {  // 2. 开启信道  Channel channel = connection.createChannel();  // 3. 设置信道为 confirm 模式  channel.confirmSelect();  // 4. 声明队列  channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);  // 5. 监听 confirm            long start = System.currentTimeMillis();  // 创建一个集合,用来存放未确认的消息(的id)  SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());  channel.addConfirmListener(new ConfirmListener() {  @Override  public void handleAck(long deliveryTag, boolean multiple) throws IOException {  // 如果是批量确认,就要将集合中 <= deliveryTag 的 id 都给清除掉  if(multiple) {  // headSet(n)方法返回当前集合中小于 n 的集合  // 先获取到这部分 id,然后一起 clear 清除掉即可  confirmSeqNo.headSet(deliveryTag + 1).clear();  }else {  // 单独确认,只需要移除当前这个 id 即可  confirmSeqNo.remove(deliveryTag);  }  }  @Override  public void handleNack(long deliveryTag, boolean multiple) throws IOException {  // 和 ack 处理模式基本是相似的,只是多了一步重发处理  if(multiple) {  confirmSeqNo.headSet(deliveryTag + 1).clear();  }else {  confirmSeqNo.remove(deliveryTag);  }  // 业务需要根据实际场景进行处理,比如重发,此处代码省略  }  });  // 6. 发送消息  for (int i = 0; i < MESSAGE_COUNT; i++) {  String msg = "hello publisher confirms" + i;  long seqNo = channel.getNextPublishSeqNo(); // 拿到消息的序号  channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());  confirmSeqNo.add(seqNo); // 将消息的序号加入集合中  }  // 确认消息已处理完  while (!confirmSeqNo.isEmpty()) {  // 没有处理完,就休眠一段时间后再确认一下,看是否处理完  Thread.sleep(10);  }  long end = System.currentTimeMillis();  System.out.printf("异步确认==>消息条数: %d, 耗时: %d ms", MESSAGE_COUNT, end-start);  }  }  }
  • 消息条数越多,异步确认的优势越明显
http://www.whsansanxincailiao.cn/news/30284832.html

相关文章:

  • 智联招聘网站怎么做微招聘信息吗/湖南网站定制
  • 网站制作计算机/站内推广的方法和工具
  • 哪个cms可以做交友网站/阿里指数查询
  • 苏州市相城区建设局网站/seo研究协会网
  • 网站活动模板/关键词查询工具
  • 移动网站建设生要女/关键词排名网站
  • 长沙 外贸网站建设公司价格/腾讯会议价格
  • 网站建设-设计/百度推广后台登录入口
  • 宁德市路桥建设有限公司网站/优化游戏卡顿的软件
  • 网站开发 报刊/嘉兴seo计费管理
  • 临沂网站建设哪家专业/香港服务器
  • 网页设计师认证/关键词优化软件
  • 学校做网站需要什么/2023知名品牌营销案例100例
  • 网站推广营销公司/巨量算数数据分析入口
  • 网站正常打开速度/营销型网站建设团队
  • 政务公开和网站建设工作的建议/查询网站信息
  • 优质聊城做网站公司/网络推广公司怎么找客户
  • Gzip 网站 能够压缩图片吗/奶茶的营销推广软文
  • 杭州富阳网站建设/网站友情链接是什么
  • 网站建设与规划试卷/百度搜索次数统计
  • 在网站添加邮箱/谷歌seo排名优化服务
  • java网站开发 csdn/百度指数查询官网入口登录
  • 昆山哪里有做网站的/今日重大新闻头条财经
  • 局域网怎么建立/谷歌推广seo
  • 重庆黄埔建设集团网站/百度seo排名优化是什么
  • 吾爱网站/夸克搜索引擎
  • 做外贸都用什么网站/百度查看订单
  • 提供电子商务网站建设外包服务的企业/网页搜索关键词
  • 如何做动态网站/湖南靠谱seo优化
  • 教育网络系统管理/博客程序seo