SpringCloudStream高级特性使用详解_第1页
SpringCloudStream高级特性使用详解_第2页
SpringCloudStream高级特性使用详解_第3页
SpringCloudStream高级特性使用详解_第4页
SpringCloudStream高级特性使用详解_第5页
已阅读5页,还剩5页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

第SpringCloudStream高级特性使用详解目录重试消息发送失败的处理消费错误处理自定义MessageHandler类型Endpoint端点Metrics指标ServerlessPartition统一PollingConsumer支持多个Binder同时使用建立事件机制

重试

Consumer端可以配置重试次数,当消息消费失败的时候会进行重试。

底层使用SpringRetry去重试,重试次数可自定义配置。

#默认重试次数为3,配置大于1时才会生效

spring.cloud.stream.bindings.channelName.consumer.maxAttempte=3

消息发送失败的处理

Producer发送消息出错的情况下,可以配置错误处理,将错误信息发送给对应ID的MessageChannel

消息发送失败的场景下,会将消息发送到一个MessageChannel。这个MessageChannel会取ApplicationContext中name为topic.errors(topic就是配置的destination)的Bean。如果找不到就会自动构建一个PublishSubscribeChannel。然后使用BridgeHandler订阅这个MessageChannel,同时再设置ApplicationContext中name为errorChannel的PublishSubscribeChannel消息通道为BridgeHandler的outputChannel。

publicstaticfinalStringERROR_CHANNEL_BEAN_NAME="errorChannel"

privateSubscribableChannelregisterErrorInfrastructure(

ProducerDestinationdestination){

//destination.getName()+".errors"

StringerrorChannelName=errorsBaseName(destination);

SubscribableChannelerrorChannel;

if(getApplicationContext().containsBean(errorChannelName)){

ObjecterrorChannelObject=getApplicationContext().getBean(errorChannelName);

if(!(errorChannelObjectinstanceofSubscribableChannel)){

thrownewIllegalStateException("Errorchannel'"+errorChannelName

+"'mustbeaSubscribableChannel");

errorChannel=(SubscribableChannel)errorChannelObject;

else{

errorChannel=newPublishSubscribeChannel();

((GenericApplicationContext)getApplicationContext()).registerBean(

errorChannelName,SubscribableChannel.class,()-errorChannel);

MessageChanneldefaultErrorChannel=null;

if(getApplicationContext()

.containsBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)){

defaultErrorChannel=getApplicationContext().getBean(

IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME,

MessageChannel.class);

if(defaultErrorChannel!=null){

BridgeHandlererrorBridge=newBridgeHandler();

errorBridge.setOutputChannel(defaultErrorChannel);

errorChannel.subscribe(errorBridge);

StringerrorBridgeHandlerName=getErrorBridgeName(destination);

((GenericApplicationContext)getApplicationContext()).registerBean(

errorBridgeHandlerName,BridgeHandler.class,()-errorBridge);

returnerrorChannel;

spring.cloud.stream.bindings.output.destination=test-output

#消息发送失败的处理逻辑默认是关闭的

ducer.errorChannelEnabled=true

@Bean("test-output.errors")

MessageChanneltestOutputErrorChannel(){

returnnewPublishSubscribeChannel();

@Service

classErrorProduceService{

@ServiceActivator(inputChannel="test-output.errors")

publicvoidreceiveProduceError(MessagereceiveMsg){

System.out.println("receiveerrormsg:"+receiveMsg);

消费错误处理

Consumer消费消息出错的情况下,可以配置错误处理,将错误信息发给对应ID的MessageChannel

消息错误处理与生产错误处理大致相同。错误的MessageChannel对应的name为topic.group.errors,还会加上多个MessageHandler订阅的一些判断,使用ErrorMessageStrategy创建错误消息等内容。

spring.cloud.stream.bindings.input.destination=test-input

spring.cloud.stream.bindings.input.group=test-input-group

@StreamListener(Sink.INPUT)

publicvoidreceive(StringreceiveMsg){

thrownewRuntimeException("Oops");

@ServiceActivator(inputChannel="test-input.test-input-group.errors")

publicvoidreceiveConsumeError(MessagereceiveMsg){

System.out.println("receiveerrormsg:"+receiveMsg);

建议直接使用topic.group.errors这个消息通道,并设置发送到单播模式的DirectChannel消息通道中(使用@ServiceActivator注解接收会直接构成DirectChannel),这样会确保只会被唯一的一个订阅了topic.group.errors的MessageHandler处理,否则可能会被多个MessageHandler处理,导致出现一些意想不到的结果。

自定义MessageHandler类型

默认情况下,OutputBinding对应的MessageChannel和InputBinding对应的SubscribeChannel会被构造成DirectChannel。

SCS提供了BindingTargetFactory接口进行扩展,比如可以扩展构造PublishSubscribeChannel这种广播类型的MessageChannel。

BindingTargetFactory接口只有两个实现类

SubscribableChannelBindingTargetFactory:针对InputBinding和OutputBinding都会构造成DirectWithAttributesChannel类型的MessageChannel(一种带有HashMap属性的DirectChannel)。MessageSourceBindingTargetFactory:不支持OutputBinding,InputBinding会构造成DefaultPollableMessageSource。DefaultPollableMessageSource内部维护着MessageSource属性,该属性用于拉取消息。

Endpoint端点

SCS提供了BindingsEndpoint,可以获取Binding信息或对Binding生命周期进行修改,比如start、stop、pause或resume。

BindingsEndpoint的ID是bindings,对外暴露了一下3个操作:

修改Binding状态,可以改成STARTED、STOPPED、PAUSED和RESUMED,对应Binding接口的4个操作。查询单个Binding的状态信息。查询所有Binding的状态信息。

@Endpoint(id="bindings")

publicclassBindingsEndpoint{

@WriteOperation

publicvoidchangeState(@SelectorStringname,Statestate){

Bindingbinding=BindingsEndpoint.this.locateBinding(name);

if(binding!=null){

switch(state){

caseSTARTED:

binding.start();

break;

caseSTOPPED:

binding.stop();

break;

casePAUSED:

binding.pause();

break;

caseRESUMED:

binding.resume();

break;

default:

break;

@ReadOperation

publicListqueryStates(){

ListBindingbindings=newArrayList(gatherInputBindings());

bindings.addAll(gatherOutputBindings());

returnthis.objectMapper.convertValue(bindings,List.class);

@ReadOperation

publicBindingqueryState(@SelectorStringname){

Assert.notNull(name,"'name'mustnotbenull");

returnthis.locateBinding(name);

Metrics指标

该功能自动与micrometer集成进行Metrics统计,可以通过前缀spring.cloud.stream.metrics进行相关配置,配置项spring.cloud.stream.bindings.applicationMetrics.destination会构造MetersPublisherBinding,将相关的metrics发送到MQ中。

Serverless

默认与SpringCloudFunction集成。

可以使用Function处理消息。配置文件需要加上function配置。

spring.cloud.stream.function.definition=uppercase|addprefix

@Bean

publicFunctionString,Stringuppercase(){

returnx-x.toUpperCase();

@Bean

publicFunctionString,Stringaddprefix(){

returnx-"prefix-"+x;

Partition统一

SCS统一Partition相关的设置,可以屏蔽不同MQPartition的设置。

ProducerBinding提供的ProducerProperties提供了一些Partition相关的配置:

partitionKeyExpression:partitionkey提取表达式。partitionKeyExtractorName:是一个实现PartitionKeyExtractorStrategy接口的Beanname。PartitionKeyExtractorStrategy是一个根据Message获取partitionkey的接口。如果两者都配置,优先级高于partitionKeyExtractorName。partitionSelectorName:是一个实现PartitionSelectorStrategy接口的Beanname。PartitionSelectorStrategy是一个根据partitionkey决定选择哪个partition的接口。partitionSelectorExpression:partition选择表达式,会根据表达式和partitionkey得到最终的partition。如果两者都配置,优先partitionSelectorExpression表达式解析partition。partitionCount:partition个数。该属性不一定会生效,KafkaBinder和RocketMQBinder会使用topic上的partition个数覆盖该属性。

publicfinalclassPartitioningInterceptorimplementsChannelInterceptor{

@Override

publicMessagepreSend(Messagemessage,MessageChannelchannel){

if(!message.getHeaders().containsKey(BinderHeaders.PARTITION_OVERRIDE)){

intpartition=this.partitionHandler.determinePartition(message);

returnMessageConverterConfigurer.this.messageBuilderFactory

.fromMessage(message)

.setHeader(BinderHeaders.PARTITION_HEADER,partition).build();

else{

returnMessageConverterConfigurer.this.messageBuilderFactory

.fromMessage(message)

.setHeader(BinderHeaders.PARTITION_HEADER,

message.getHeaders()

.get(BinderHeaders.PARTITION_OVERRIDE))

.removeHeader(BinderHeaders.PARTITION_OVERRIDE).build();

publicclassPartitionHandler{

publicintdeterminePartition(Messagemessage){

Objectkey=extractKey(message);

intpartition;

if(ducerProperties.getPartitionSelectorExpression()!=null){

partition=ducerProperties.getPartitionSelectorExpression()

.getValue(this.evaluationContext,key,Integer.class);

else{

partition=this.partitionSelectorStrategy.selectPartition(key,

this.partitionCount);

//protectionincaseauserselectorreturnsanegative.

returnMath.abs(partition%this.partitionCount);

privateObjectextractKey(Messagemessage){

Objectkey=invokeKeyExtractor(message);

if(key==nullducerProperties.getPartitionKeyExpression()!=null){

key=ducerProperties.getPartitionKeyExpression()

.getValue(this.evaluationContext,message);

Assert.notNull(key,"Partitionkeycannotbenull");

returnkey;

PollingConsumer

实现MessageSource进行polling操作的Consumer。

普通的Pub/Sub模式需要定义Subs

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论