您好,欢迎来到化拓教育网。
搜索
您的当前位置:首页Stream实现异常重试

Stream实现异常重试

来源:化拓教育网

consumer抛出异常。

设置重试次数。

重试成功和失败的表现。

线程安全变量AtomicInteger,起名叫计数器。

消费者第一次消费消息也是算次数的。所以设置为2,在消费的时候出现异常的时候,会再消费一次。

什么叫单机版,意思就是消费者消费消息的时候,若出现异常,不会把消息发回mq,而是出现异常的那一台机器再去重试。

如果消费者出现异常,在重试次数范围内,如果这条消息被正常消费,是不会抛出异常的,相反,如果在重复次数中结束,这条消息还是没有被消费,那么它是会抛出异常的。

计数器会清零。

2代表重试一次。

re-queue

操作:将消费失败的消息重新存入队列。

单机:消费失败的消息不入队。

联机:消费失败的消息重新入队。让消费者的集群重新拉取消息。

前面配置过retry,开启re-queue会和retry冲突。

配置了retry后,requeue是不会生效的。

消息重新入队之后,可以在整个的consumer集群中,由任意一台服务器进行消费。

没提到rabbitmq有集群,都是生产者-消费者的集群用一个queue。这意味着,消息重新入队,consumer都会消费这条消息(广播消息)

把当前消费者线程挂起几秒后,再抛出异常。

如果不挂起几秒,consumer会一直往rabbitmq中插消息。

consumer收到消息后停顿3秒钟再抛出异常。

仅仅对当前consumer开启重新入队。

如果想在全局范围内开启重新入队呢?

用spring的rabbitmq配置,当前项目的所有consumer在throw处异常后,便会重新入队。

retry是指消费者接收到消息之后,若发生异常,消费逻辑的执行次数。也就是max-attempt。

re-queus是指消费逻辑出现异常后,消息重新入队。

这两项在配置文件中的配置会冲突。

若消费逻辑throw exception后,消费逻辑再次执行,会直接导致该消息无法重新入队,故,要把重试次数设置为1,也就是该消息只消费一次,若出现异常,则该消息重新入队。

消息重新入队后是为了让其它消费者消费消息么?当前消费者不能再次消费这条消息么?当然不能了,就算你再次消费这条消息,还是会抛出异常,从而导致消息重新入队,那当前消费者再次消费这条消息有什么意义呢?不管有没有,若当前消费者无法消费这条消息,折返后的消息被其它消费者消费,这个逻辑是要实现的。

多个节点,联机,多个节点共用一个group,那么消息只会被其中一个消费者消费。验证re-queue的消息是否会被其它消费者消费。

a节点:在22s的时候接收到消息,25s的时候抛出一个异常。在28s的时候,接收到消息31s之后抛出异常,那25到18之间的3s中在干啥呢?在这三s内,b节点接收到了重新入队的消息,在28s的的时候抛出异常并将消息requeue,故a节点在28s接收到消息。之前也学过,一个组内的消息是按照顺序轮着消费消息的。也验证了我的一个想法,被重新入队的消息,是可以被执行过该条消息的消费者再次消费的。

两个配置属性,一个是配置当前消费者消息requeue,另外一个配置所有消费者的消息requeue。

还有就是解决retry和requeue的冲突,把retry设置为1.

一条会在任意消费者那里都会抛出异常的消息,会不断重新入队,重新消费,再重新入队。那就用死信对列来解决吧。

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- huatuo9.cn 版权所有 赣ICP备2023008801号-1

违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务