2022金蝶Apusic分布式消息队列用户手册V2.0_第1页
2022金蝶Apusic分布式消息队列用户手册V2.0_第2页
2022金蝶Apusic分布式消息队列用户手册V2.0_第3页
2022金蝶Apusic分布式消息队列用户手册V2.0_第4页
2022金蝶Apusic分布式消息队列用户手册V2.0_第5页
已阅读5页,还剩64页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

Apusic分布式消息队列V2.0用户手册金蝶Apusic分布式消息队列V2.0用户手册HYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKHYPERLINKand2messageswith"key-4"producer.newMessage().key("key-1").value("message-1-1").send();producer.newMessage().key("key-1").value("message-1-2").send();producer.newMessage().key("key-1").value("message-1-3").send();producer.newMessage().key("key-2").value("message-2-1").send();producer.newMessage().key("key-2").value("message-2-2").send();producer.newMessage().key("key-2").value("message-2-3").send();producer.newMessage().key("key-3").value("message-3-1").send();producer.newMessage().key("key-3").value("message-3-2").send();producer.newMessage().key("key-4").value("message-4-1").send();32金蝶Apusic分布式消息队列V2.0用户手册producer.newMessage().key("key-4").value("message-4-2").send();Exclusive创建一个新的消费者,用Exclusive订阅模式订阅。Consumerconsumer=client.newConsumer()....topic("my-topic")subscriptionName("my-subscription")subscriptionType(SubscriptionType.Exclusive)subscribe()10条消息,消费的顺序与生产的顺序相同。到分区,并收到一个错误。Failover创建新的消费者,用Failover订阅模式订阅。Consumerconsumer1=client.newConsumer()....topic("my-topic")subscriptionName("my-subscription")subscriptionType(SubscriptionType.Failover)subscribe()Consumerconsumer2=client.newConsumer()....topic("my-topic")subscriptionName("my-subscription")subscriptionType(SubscriptionType.Failover)subscribe()就成为活跃的消费者。Shared创建新的消费者,用共享订阅模式订阅。Consumerconsumer1=client.newConsumer()...topic("my-topic")subscriptionName("my-subscription")subscriptionType(SubscriptionType.Shared)33金蝶Apusic分布式消息队列V2.0用户手册.subscribe()Consumerconsumer2=client.newConsumer()....topic("my-topic")subscriptionName("my-subscription")subscriptionType(SubscriptionType.Shared)subscribe()方式传递。Key_shared创建新的消费者并以Key_Shared订阅模式订阅。Consumerconsumer1=client.newConsumer()....topic("my-topic")subscriptionName("my-subscription")subscriptionType(SubscriptionType.Key_Shared)subscribe()Consumerconsumer2=client.newConsumer()....topic("my-topic")subscriptionName("my-subscription")subscriptionType(SubscriptionType.Key_Shared)subscribe()Key_Shared订阅和Shared订阅一样,所有消费者都可以附加到同一个订阅。但它与Key_Sharedkey会事先知道哪些keykey只会在同一时间被分配给一个消费者。consumer1收到以下信息("key-1","message-1-1")("key-1","message-1-2")("key-1","message-1-3")("key-3","message-3-1")("key-3","message-3-2")Consumer2收到以下信息34金蝶Apusic分布式消息队列V2.0用户手册("key-2","message-2-1")("key-2","message-2-2")("key-2","message-2-3")("key-4","message-4-1")("key-4","message-4-2")key的消息默认会被添加到一个批次中。broker将把批处理派发给消费者,所以默认的批处理机制可能会破坏Key_Shared订阅保证的消息分发语义。生产者需要使用KeyBasedBatcher。Producerproducer=client.newProducer().topic("my-topic")batcherBuilder(BatcherBuilder.KEY_BASED)create();..或者生产者可以禁用批处理。Producerproducer=client.newProducer().topic("my-topic").enableBatching(false).create();3.7Reader通过Reader接口,客户端可以在一个主题中"手动定位",从指定消息开始处理所有消息。Java的API使你能够通过指定一个主题和一个MessageId来创建Reader例子。byte[]msgIdBytes=//messagebytearrayMessageId=MessageId.fromByteArray(msgIdBytes);Readerreader=pulsarClient.newReader()...topic(topic)startMessageId(id)create();while(true){Messagemessage=reader.readNext();/Processmessage/}35金蝶Apusic分布式消息队列V2.0用户手册Reader被msgIdBytes识别后,Reader对主题中的每条消息进行迭代。上面的示例代码展示了将ReaderIDMessageId.earliest指向主题上最早的可用消息,MessageId.latest指向最近的可用消息。当你创建一个Reader时,你可以使用loadConf配置。在loadConf中可以使用以下参数。类型StringInt名称描述缺省值topicName主题名称receiverQueueSi一个消费者的接1000ze收队列的大小。调用Receive之累的消息数量。一个高于默认值的值会增加消费以更多的内存利用率为代价。ReaderListener<T>readerListener到消息时被调用。Reader名称StringStringreaderNamesubscriptionRolePrefix订阅角色前缀CryptoKeyReaderCryptoKeyRead抽象出对密钥存er储的访问的接口。ConsumerCryptoFailurcryptoFailureAct当消费者收到无ConsumerCryptoFailureAeActionion法解密的消息时,ction.FAIL应该采取行动。-FAIL:这是默认选项,使消息失败,直到加密成36金蝶Apusic分布式消息队列V2.0用户手册功。DISCARD-程序传递消息。-CONSUME:将加密的消息传递消息是应用程序的责任。消息解压失败。如果消息包含批端就不能在批处理中检索单个消息。交付的加密消息包含{@linkEncryptionContext},其中包含加密程序可以用它来解密消耗的消息有效载荷。booleanreadCompacted如果启用FalsereadCompacted,消费者会从一个压缩的主题中读个主题的全部消息积压。消费者只看到压37金蝶Apusic分布式消息队列V2.0用户手册缩后的主题中每到到达主题消息中压缩积压的点。正常一样发送消息。readCompacted只能在持久性话题的订阅上启用,这些订阅有一个订阅)。试图在非持久性主题的订阅或共享订阅上启用它,会导致订阅调用抛出一个PulsarClientException。booleanresetIncludeHea如果设置为falsed"true",将返回的第一条信息是由messageId指定的那条。如果设置为false,要返回的第一条信息是messageId指定38金蝶Apusic分布式消息队列V2.0用户手册的信息旁边的那条。3.7.1Sticky范围Reader在Stickykey范围Reader中,broker将只发送消息key的哈希值包含在指定的key哈希范围内的消息。一个Reader上可以指定多个key哈希值范围。下面是一个创建stickykey范围Reader的例子。pulsarClient.newReader()....topic(topic)startMessageId(MessageId.earliest)keyHashRange(Range.of(0,10000),Range.of(20001,30000))create();散列范围总大小为,所以范围的最大端应该小于或等于65535。3.8在中,所有的消息数据都是由字节数组组成的。消息Schema使你在构建和处果你构建了一个生产者,如果没有指定schema,那么这个生产者只能产生byte[]类型的消息。下面是一个例子。Producer<byte[]>producer=client.newProducer().topic(topic)create();.schemaADMQ哪种数据类型将在主题中传输。3.8.1Schema示例假设你有一个SensorReading类,你想通过一个主题来传输:publicclassSensorReading{publicfloattemperature;publicSensorReading(floattemperature){this.temperature=temperature;}39金蝶Apusic分布式消息队列V2.0用户手册//Ano-argconstructorisrequiredpublicSensorReading(){}publicfloatgetTemperature(){returntemperature;}publicvoidsetTemperature(floattemperature){this.temperature=temperature;}}然后你可以创建一个Producer<SensorReading>Consumer<SensorReading>):Producer<SensorReading>producerclient.newProducer(JSONSchema.of(SensorReading.class))=.topic("sensor-readings")create();.以下是目前可用于Java的schema:无schema或字节数组schemaProducer<byte[]>bytesProducer=client.newProducer(Schema.BYTES)topic("some-raw-bytes-topic")create();⚫..或者等效于:Producer<byte[]>bytesProducer=client.newProducer().topic("some-raw-bytes-topic")create();对于UTF-8编码的字符串数据,使用Schema.STRING:Producer<String>stringProducer=client.newProducer(Schema.STRING)topic("some-string-topic")create();使用Schema.JSON为POJOJSONschema:Producer<MyPojo>pojoProducer=client.newProducer(Schema.JSON(MyPojo.class))topic("some-pojo-topic").⚫..⚫.40金蝶Apusic分布式消息队列V2.0用户手册.create();⚫使用Schema.PROTOBUF生成Protobufschema。下面的例子显示了如何创建Protobufschema并使用它来实例化一个新的生产者:Producer<MyProtobuf>client.newProducer(Schema.PROTOBUF(MyProtobuf.class))topic("some-protobuf-topic")create();protobufProducer=..⚫用Schema.AVRO定义Avroschema。下面的代码片断演示了如何创建和使用Avroschema:Producer<MyAvro>avroProducer=client.newProducer(Schema.AVRO(MyAvro.class))topic("some-avro-topic")create();..4章Apusic在删除金蝶Apusic分布式消息队列之前,请停止以下过程:⚫⚫所有域名和其他相关流程命令提示使用安装目录或其子目录使用属于Java平台标准版(Java)的文件的任何应用程序删除金蝶Apusic分布式消息队列安装包解压安装目录。41金蝶Apusic分布式消息队列V2.0用户手册部分II管理控制台的使用管理控制台是一个基于Web的GUI订阅、broker、集群,并支持多个环境的动态配置。1章登录管理控制台在浏览器中键入RLhttps://ip:7750/ui/index.html在登录界面输入用户名和密码进行登录,默认都是admin。2章管理控制台可以管理多个集群。ADMQ集群中任意一台BROKER的WEBURL42金蝶Apusic分布式消息队列V2.0用户手册环境列表,列出当前的环境,可以进行编辑和删除。3章租户列表,列出当前环境的所有租户:创建租户,指定租户名称、所属的集群:43金蝶Apusic分布式消息队列V2.0用户手册租户包含的命名空间列表:租户的配置信息:4章管理命名空间从命名空间列表,进入某个命名空间的概要信息,还可以对bundle进行卸载和拆分:44金蝶Apusic分布式消息队列V2.0用户手册查看命名空间的主题列表:在命名空间新建主题:45金蝶Apusic分布式消息队列V2.0用户手册命名空间的策略配置:ReplicatedClusters(复制的集群):这是有多个集群且开启了跨集群复制时,表示数据会在哪几个集群间进行复制。SubscriptionAuthenticationMode(订阅身份验证模式):配置订阅身份验证,身份证时限制订阅命名约定的模式已启用。可用选项:“NONE(无”、“Perfix(前缀”验••NONE:每个客户端角色都可以使用任何允许的订阅名称连接。Perfix:只允许客户端使用以其角色名称为前缀的订阅名称去订阅ꢀ例如,如果客户机被验证为“example”,则它只能使用前缀为“example”的订阅名称阅。不允许使用其他订阅名称。进行订EnsembleSize(存储集合大小):该命名空间使用多少台bookie来存储数据。WriteQuorumSize(写入Quorum大小):该命名空间下每条消息使用多少台bookie存储数据,必须小于等于EnsembleSize设置的数量。来AckQuorumSizeAckQuorum大小):多少台bookie完成消息存储就认为这条消经存储成功,必须小于等于WriteQuorumSize息已Mark-DeleteRate(标记删除率):每秒允许多少被标记为删除的速率限制,0为无限制。EncryptionRequired(消息加密):对命名空间强制执行消息加密。Deduplication(重复数据消除):命名空间的重复数据消除。BacklogQuotaSizebacklog配额大小):命名空间允许的最大积压配额(以字节为单位)BacklogRetentionPolicybacklog保留策略):当达到backlog配额时要强制执行的保留策略。46金蝶Apusic分布式消息队列V2.0用户手册有效选项为:producer_request_hold:在资源可用(或等待超时)之前保留生产者发送的请求的策略.producer_exception:拒绝生产者发送的请求的策略.consumer_backlog_eviction:从最慢的消费者的backlog中逐出最旧消息的策略.AutoUpdateStrategy(schema自动更新策略兼容性检查策略定义更改检查Sche优先升级版本ALWAYS_COMPATIBLEschema兼检本求ALWAYS_INCOMPATIBLEschema演无无47金蝶Apusic分布式消息队列V2.0用户手册•ConsumBACKWARDschemaV3的consu••producer使用•schemaV3或V2•ConsumBACKWARD_TRANSITIVEschemaV3的本48金蝶Apusic分布式消息队列V2.0用户手册consuproducer使用••schemaV3、V2或V1编数••ProducFORWARDschemaV3或V2的••consu49金蝶Apusic分布式消息队列V2.0用户手册producer使用•schemaV3•ProducFORWARD_TRANSITIVE本schemaV3、V2或V1的consu••producer使•50金蝶Apusic分布式消息队列V2.0用户手册用schemaV3FULLSchemaV3•修求段和V2FULL_TRANSITIVESchemaV3、V2和V1之•修本求段51金蝶Apusic分布式消息队列V2.0用户手册兼SchemaValidationEnforcedschema强制验证):对生产者强制schema验证。如用,则允许没有schema的生产者向具有schema的主题生成消息。MessageTTL(seconds)(消息TTL(秒)):以秒为单位设置消息TTL。如果订阅的任何使用者未使用这些消息,则在为该订阅配置的TTL期间之后,这些消息将标记为“已使用”。果禁RetentionSize(megabytes)(保留大小()):保留大小,只应用于消息被所有订阅确认。RetentionPeriod(minutes)(保留期限(分钟)):保留期限,只应用于消息被所有订阅确认CompactionThreshold(bytes)(压缩阈值(字节)):当存储大小达到阈值时,将自动触发压缩。OffloadThreshold(bytes)(卸载阈值(字节)):当未卸载消息的数据大小达到阈值时,消息将自动卸载到分层存储。OffloadDeletion(milliseconds)(卸载删除延迟(毫秒)):在删除从bookie卸载的ledger前等待的毫秒数。负值表示删除已完全禁用。MaxProducersPerTopic(每个主题的最大生产者):每个主题允许的最大生产者数量。MaxConsumersPerTopic(每个主题最大消费者):每个主题允许的最大消费者数量。MaxConsumersPerSubscription(每个订阅的最大消费者数):每个订阅允许的最大消费者数。DispatchRatePerTopic(每个主题的调度率,根据每个主题限制Throughput(bytes/second):吞吐量(字节秒)Rate(messages/second):速率(消息秒)分派速率。):TimePeriod(seconds):时间段(秒)52金蝶Apusic分布式消息队列V2.0用户手册DispatchRatePerSubscription(每个订阅的调度率,根据每个订阅限制分派速率。):Throughput(bytes/second):吞吐量(字节秒)Rate(message/second):速率(消息秒)TimePeriod(seconds):时间段(秒)SubscribeRatePerConsumer(每个消费者的订阅率,限制消费者尝试订阅主题的速度。):Rate(subscribes/second):速率(消息秒)TimePeriod(seconds):时间段(秒)Anti-AffinityGroup:为此命名空间配置Anti-Affinity组。5章主题的概要信息浏览:53金蝶Apusic分布式消息队列V2.0用户手册6章在主题概览页面,可以查看到该主题的所有订阅,点击某个订阅,可以对其进行管理:7章集群列表从集群列表,进入某个集群的配置信息8章broker/bookie集群的broker列表,可以看到集群中broker的域名或者和端口号,拥有的命名空间,当前的消息收发速率和吞吐量。54金蝶Apusic分布式消息队列V2.0用户手册从集群列表,进入broker的概览信息页面:点击RuntimeConfig,查看broker的运行时配置:命名空间broker级别隔离策略列表:新建隔离策略:55金蝶Apusic分布式消息队列V2.0用户手册配置好集群名称,命名空间,主要和次要broker,自动故障转移策略等相关参数,该namespaces的数据只往primary的broker发,然果primarybroker达到了自动故障转移策略的设置,就将数据往secondarybrokers发。集群的bookie列表:56金蝶Apusic分布式消息队列V2.0用户手册9章的用户。查看当前用户列表:创建新用户并分配资源权限3二级是租户,三级是命名空间,勾选上一级,则下一级的全部权限会自动获取,my-namespace的命名空间,下面有my-topic的主题,勾选my-namespace的话,则后续如果在my-namespace下创建新的主题,也会自动拥有该权限,如果只勾选my-topic,则my-topic只能看到拥有权限的数据,使用进行消息生产消费时,也只能对有权主题进行操作。控台,限的57金蝶Apusic分布式消息队列V2.0用户手册查看客户端认证时需要的token信息:58金蝶Apusic分布式消息队列V2.0用户手册部分适配其他消息中间件接口ADMQ提供了适配其他消息中间件的SDK已有的基础架KOAAOAJOA其他消息应用程序切换到ADMQ下将其使用的客户端应用程序切换到ADMQ。下面以单机部署模式说明各个插件的使用方式。1章KafkaADMQKoA(Kafkaon将Kafka协议处理插件引入ADMQbroker,从而实现ADMQ对原生ApacheKafka协议的支持。将KoA协议处理插件添加到现有集群后,用户不用修改代码就可以将现有的Kafka应用程序和服务迁移到ADMQ,从而使用ADMQ的强大功能,例如:➢➢➢➢利用企业级多租户特性简化运营避免数据搬迁,简化操作利用ApacheBookKeeper和分层存储持久保留事件流利用PulsarFunctions进行无服务器化事件处理1.1修改配置文件/yourpath/admq-V1.0.0/share/pulsar/conf/standalone.conf设置协议messagingProtocols=kafkaprotocolHandlerDirectory=/yourpath/admq-V1.0.0/lib#allowAutoTopicCreationType默认是不分片的,但是KoA必须设置分片partitionedallowAutoTopicCreationType=partitioned设置监听器kafkaListeners=PLAINTEXT://:9092kafkaAdvertisedListeners=PLAINTEXT://:9092设置偏移管理59金蝶Apusic分布式消息队列V2.0用户手册brokerEntryMetadataInterceptors=ercept.AppendIndexMetadataInterceptor关闭topic自动删除ADMQ会删除已分区主题的非活动分区,而不会删除已分区主题的元数据。在这种情况下,无法创建丢失的分区,所以这个设置非常重要。brokerDeleteInactiveTopicsEnabled=false1.2下载kafka2.0以上版本运行生产者bin/kafka-console-producer.sh--broker-list:9092--topictest运行消费者bin/kafka-console-consumer.sh--bootstrap-server:9092--topictest--from-beginning之后就可以持续看到消息的发送和接收。2章ADMQ2.1修改配置文件/yourpath/admq-V1.0.0/share/pulsar/conf/standalone.confmessagingProtocols=mqttprotocolHandlerDirectory=./protocolsmqttListeners=mqtt://:1883advertisedAddress=2.2使用客户端进行测试。添加依赖:60金蝶Apusic分布式消息队列V2.0用户手册<<dependency><<<groupId>org.fusesource.mqtt-client</groupId>artifactId>mqtt-client</artifactId>version>1.16</version>/dependency>测试代码:/JavaCode/MQTTmqtt=newMQTT();mqtt.setHost("",1883);BlockingConnectionconnection=mqtt.blockingConnection();connection.connect();Topic[]QoS.AT_LEAST_ONCE)};connection.subscribe(topics);topics={newTopic("persistent://public/default/my-topic",//publishmessageconnection.publish("persistent://public/default/my-topic","HelloMOP!".getBytes(),QoS.AT_LEAST_ONCE,false);//receivemessageMessagereceived=connection.receive();System.out.println(newString(received.getPayload()));3章ADMQADMQ在java客户端实现了JMS2.0(Java消息服务)。可以使用对应的Java库(pulsar-jms)来使用它。通过JOA协议,可以实现JMS到的应用迁移,而无需修改代码,就可以使用了。3.1配置名称是否必须作用示例61金蝶Apusic分布式消息队列V2.0用户手册webServiceUrl是是PulsarHTTPhttp://localhost:8080endpointbrokerServiceUrl连接pulsarbrokerpulsar://localhost:6650或proxyenableTransactionjms.usePulsarAdmin否否启动事物false允许客户端使用falseadminapiauthPlugin否否授权插件authParamspulsar生成的生成的tokentokenproducerConfigconsumerConfig否否producer的配置Map<String,Object>consumer的配置Map<String,Object>其中authPlugin可以设置为org.apache.pulsar.client.impl.auth.AuthenticationToken3.2使用java程序测试,添加pom依赖:<dependency><<<groupId>com.datastax.oss</groupId>artifactId>pulsar-jms-all</artifactId>version>1.0.0</version></dependency>编写代码:/JavaCode/Stringtopic="persistent://public/default/example-topic";Map<String,Object>properties=HashMap<>();properties.put("webServiceUrl",":8080");properties.put("brokerServiceUrl","pulsar://:6650");Map<String,Object>producerConfig=HashMap<>();producerConfig.put("batchingEnabled",false);62金蝶Apusic分布式消息队列V2.0用户手册Map<String,Object>consumerConfig=HashMap<>();consumerConfig.put("receiverQueueSize",1);configuration.put("consumerConfig",consumerConfig);configuration.put("producerConfig",producerConfig);try(PulsarConnectionFactoryfactory=PulsarConnectionFactory(properties);){JMSContextcontext=factory.createContext();Queuequeue=context.createQueue(topic);//Listenmessages...context.createConsumer(queue).setMessageListener(newMessageListener(){Override@publicvoidonMessage(Messagemessage){{System.out.println("Received:"+message.getBody(String.class));}}catch(Exceptionerr){err.printStackTrace();}});for(inti=0;i<10;i++){message="Helloworld!"+i;System.out.println("Sending:"+message);context.createProducer().send(queue,message);}Thread.sleep(10000);}63金蝶Apusic分布式消息队列V2.0用户手册4章ADMQAoA(上的RabbitMQ代理上引入AMQP协议处带来了原生的AMQPAoA协议处理程序添加到您现有的AMQP到使用ADMQ目前,AoA协议处理程序支持AMQP0-9-1协议,仅支持持久交换和持久队列。一个VirtualHost对应的namespace只能设置一个bundle。并且需要提前为VirtualHost创建一个namespace。4.1修改配置文件/yourpath/admq-V1.0.0/share/pulsar/conf/standalone.confProtocolhandlerconfiguration#messagingProtocols=amqpprotocolHandlerDirectory=/yourpath/admq-V1.0.0/lib#SetAMQPservicelistenersamqpListeners=amqp://:5672advertisedAddress=#connection中,channel的最大数量,默认是64.设置成和rabbitMQ同样的channel2047#如果保持默认

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论