




版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
第SpringCloudStream高级特性使用详解目录重试消息发送失败的处理消费错误处理自定义MessageHandler类型Endpoint端点Metrics指标ServerlessPartition统一PollingConsumer支持多个Binder同时使用建立事件机制
重试
Consumer端可以配置重试次数,当消息消费失败的时候会进行重试。
底层使用SpringRetry去重试,重试次数可自定义配置。
#默认重试次数为3,配置大于1时才会生效
spring.cloud.stream.bindings.channelName.consumer.maxAttempte=3
消息发送失败的处理
Producer发送消息出错的情况下,可以配置错误处理,将错误信息发送给对应ID的MessageChannel
消息发送失败的场景下,会将消息发送到一个MessageChannel。这个MessageChannel会取ApplicationContext中name为topic.errors(topic就是配置的destination)的Bean。如果找不到就会自动构建一个PublishSubscribeChannel。然后使用BridgeHandler订阅这个MessageChannel,同时再设置ApplicationContext中name为errorChannel的PublishSubscribeChannel消息通道为BridgeHandler的outputChannel。
publicstaticfinalStringERROR_CHANNEL_BEAN_NAME="errorChannel"
privateSubscribableChannelregisterErrorInfrastructure(
ProducerDestinationdestination){
//destination.getName()+".errors"
StringerrorChannelName=errorsBaseName(destination);
SubscribableChannelerrorChannel;
if(getApplicationContext().containsBean(errorChannelName)){
ObjecterrorChannelObject=getApplicationContext().getBean(errorChannelName);
if(!(errorChannelObjectinstanceofSubscribableChannel)){
thrownewIllegalStateException("Errorchannel'"+errorChannelName
+"'mustbeaSubscribableChannel");
errorChannel=(SubscribableChannel)errorChannelObject;
else{
errorChannel=newPublishSubscribeChannel();
((GenericApplicationContext)getApplicationContext()).registerBean(
errorChannelName,SubscribableChannel.class,()-errorChannel);
MessageChanneldefaultErrorChannel=null;
if(getApplicationContext()
.containsBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)){
defaultErrorChannel=getApplicationContext().getBean(
IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME,
MessageChannel.class);
if(defaultErrorChannel!=null){
BridgeHandlererrorBridge=newBridgeHandler();
errorBridge.setOutputChannel(defaultErrorChannel);
errorChannel.subscribe(errorBridge);
StringerrorBridgeHandlerName=getErrorBridgeName(destination);
((GenericApplicationContext)getApplicationContext()).registerBean(
errorBridgeHandlerName,BridgeHandler.class,()-errorBridge);
returnerrorChannel;
spring.cloud.stream.bindings.output.destination=test-output
#消息发送失败的处理逻辑默认是关闭的
ducer.errorChannelEnabled=true
@Bean("test-output.errors")
MessageChanneltestOutputErrorChannel(){
returnnewPublishSubscribeChannel();
@Service
classErrorProduceService{
@ServiceActivator(inputChannel="test-output.errors")
publicvoidreceiveProduceError(MessagereceiveMsg){
System.out.println("receiveerrormsg:"+receiveMsg);
消费错误处理
Consumer消费消息出错的情况下,可以配置错误处理,将错误信息发给对应ID的MessageChannel
消息错误处理与生产错误处理大致相同。错误的MessageChannel对应的name为topic.group.errors,还会加上多个MessageHandler订阅的一些判断,使用ErrorMessageStrategy创建错误消息等内容。
spring.cloud.stream.bindings.input.destination=test-input
spring.cloud.stream.bindings.input.group=test-input-group
@StreamListener(Sink.INPUT)
publicvoidreceive(StringreceiveMsg){
thrownewRuntimeException("Oops");
@ServiceActivator(inputChannel="test-input.test-input-group.errors")
publicvoidreceiveConsumeError(MessagereceiveMsg){
System.out.println("receiveerrormsg:"+receiveMsg);
建议直接使用topic.group.errors这个消息通道,并设置发送到单播模式的DirectChannel消息通道中(使用@ServiceActivator注解接收会直接构成DirectChannel),这样会确保只会被唯一的一个订阅了topic.group.errors的MessageHandler处理,否则可能会被多个MessageHandler处理,导致出现一些意想不到的结果。
自定义MessageHandler类型
默认情况下,OutputBinding对应的MessageChannel和InputBinding对应的SubscribeChannel会被构造成DirectChannel。
SCS提供了BindingTargetFactory接口进行扩展,比如可以扩展构造PublishSubscribeChannel这种广播类型的MessageChannel。
BindingTargetFactory接口只有两个实现类
SubscribableChannelBindingTargetFactory:针对InputBinding和OutputBinding都会构造成DirectWithAttributesChannel类型的MessageChannel(一种带有HashMap属性的DirectChannel)。MessageSourceBindingTargetFactory:不支持OutputBinding,InputBinding会构造成DefaultPollableMessageSource。DefaultPollableMessageSource内部维护着MessageSource属性,该属性用于拉取消息。
Endpoint端点
SCS提供了BindingsEndpoint,可以获取Binding信息或对Binding生命周期进行修改,比如start、stop、pause或resume。
BindingsEndpoint的ID是bindings,对外暴露了一下3个操作:
修改Binding状态,可以改成STARTED、STOPPED、PAUSED和RESUMED,对应Binding接口的4个操作。查询单个Binding的状态信息。查询所有Binding的状态信息。
@Endpoint(id="bindings")
publicclassBindingsEndpoint{
@WriteOperation
publicvoidchangeState(@SelectorStringname,Statestate){
Bindingbinding=BindingsEndpoint.this.locateBinding(name);
if(binding!=null){
switch(state){
caseSTARTED:
binding.start();
break;
caseSTOPPED:
binding.stop();
break;
casePAUSED:
binding.pause();
break;
caseRESUMED:
binding.resume();
break;
default:
break;
@ReadOperation
publicListqueryStates(){
ListBindingbindings=newArrayList(gatherInputBindings());
bindings.addAll(gatherOutputBindings());
returnthis.objectMapper.convertValue(bindings,List.class);
@ReadOperation
publicBindingqueryState(@SelectorStringname){
Assert.notNull(name,"'name'mustnotbenull");
returnthis.locateBinding(name);
Metrics指标
该功能自动与micrometer集成进行Metrics统计,可以通过前缀spring.cloud.stream.metrics进行相关配置,配置项spring.cloud.stream.bindings.applicationMetrics.destination会构造MetersPublisherBinding,将相关的metrics发送到MQ中。
Serverless
默认与SpringCloudFunction集成。
可以使用Function处理消息。配置文件需要加上function配置。
spring.cloud.stream.function.definition=uppercase|addprefix
@Bean
publicFunctionString,Stringuppercase(){
returnx-x.toUpperCase();
@Bean
publicFunctionString,Stringaddprefix(){
returnx-"prefix-"+x;
Partition统一
SCS统一Partition相关的设置,可以屏蔽不同MQPartition的设置。
ProducerBinding提供的ProducerProperties提供了一些Partition相关的配置:
partitionKeyExpression:partitionkey提取表达式。partitionKeyExtractorName:是一个实现PartitionKeyExtractorStrategy接口的Beanname。PartitionKeyExtractorStrategy是一个根据Message获取partitionkey的接口。如果两者都配置,优先级高于partitionKeyExtractorName。partitionSelectorName:是一个实现PartitionSelectorStrategy接口的Beanname。PartitionSelectorStrategy是一个根据partitionkey决定选择哪个partition的接口。partitionSelectorExpression:partition选择表达式,会根据表达式和partitionkey得到最终的partition。如果两者都配置,优先partitionSelectorExpression表达式解析partition。partitionCount:partition个数。该属性不一定会生效,KafkaBinder和RocketMQBinder会使用topic上的partition个数覆盖该属性。
publicfinalclassPartitioningInterceptorimplementsChannelInterceptor{
@Override
publicMessagepreSend(Messagemessage,MessageChannelchannel){
if(!message.getHeaders().containsKey(BinderHeaders.PARTITION_OVERRIDE)){
intpartition=this.partitionHandler.determinePartition(message);
returnMessageConverterConfigurer.this.messageBuilderFactory
.fromMessage(message)
.setHeader(BinderHeaders.PARTITION_HEADER,partition).build();
else{
returnMessageConverterConfigurer.this.messageBuilderFactory
.fromMessage(message)
.setHeader(BinderHeaders.PARTITION_HEADER,
message.getHeaders()
.get(BinderHeaders.PARTITION_OVERRIDE))
.removeHeader(BinderHeaders.PARTITION_OVERRIDE).build();
publicclassPartitionHandler{
publicintdeterminePartition(Messagemessage){
Objectkey=extractKey(message);
intpartition;
if(ducerProperties.getPartitionSelectorExpression()!=null){
partition=ducerProperties.getPartitionSelectorExpression()
.getValue(this.evaluationContext,key,Integer.class);
else{
partition=this.partitionSelectorStrategy.selectPartition(key,
this.partitionCount);
//protectionincaseauserselectorreturnsanegative.
returnMath.abs(partition%this.partitionCount);
privateObjectextractKey(Messagemessage){
Objectkey=invokeKeyExtractor(message);
if(key==nullducerProperties.getPartitionKeyExpression()!=null){
key=ducerProperties.getPartitionKeyExpression()
.getValue(this.evaluationContext,message);
Assert.notNull(key,"Partitionkeycannotbenull");
returnkey;
PollingConsumer
实现MessageSource进行polling操作的Consumer。
普通的Pub/Sub模式需要定义Subs
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 专升本生理学练习题(附答案)
- 高级企业人力资源管理师三级测试题及参考答案
- 2025药店供货合同协议书范本
- 2025玉米买卖合同范本
- 公司职工保密协议
- 营销合作协议及补充条款
- 商业房产租赁与买卖协议
- 英语乙卷试题及答案讲解
- 纺织检测规范化运用试题及答案
- 风格融合与设计创新2024年国际商业美术设计师考试试题及答案
- (四调)武汉市2025届高中毕业生四月调研考试 物理试卷(含答案)
- 2025年济南市中区九年级中考数学一模考试试题(含答案)
- 大模型原理与技术-课件 chap6 大模型微调
- 数学建模与系统仿真智慧树知到期末考试答案2024年
- 天津科技大学工程硕士学位论文答辩评议书及表决票
- 寝室文化节优秀寝室宿舍展示PPT模板
- 跌倒的预防及护理预防跌倒的步骤通用课程PPT课件
- 冷却塔使用说明书
- 丽声北极星分级绘本第三级上 The New Teacher 教学设计
- 配电柜安装规则GGD
- 混凝土含气量试验记录表(气压法)
评论
0/150
提交评论