ActiveMQ 分析系列(三)

1.Clustering 最典型的JMS集群模型是,由一组JMS代理器构成,单个客户端连接他们中的一个,如果该broker宕机,则会自动连接另一个broker。我们通过在客户端连接中使用协议栈failover://实现。参考该Failover Transport Reference页面查看具体配置信息。
我们支持通过 static discovery 或 dynamic discovery自动检测broker的机制。这样客户端能自动的连接到一组broker同时broker也能发现和连接其他broker从而形成更大的网络。

1.1Networks of brokers 如果在集群中,只是使用client/server拓扑,那么就可能存在有些broker的消息一直在堆积,但是却没有客户端连接消费消息的情况。因此ActiveMQ提供了Networks of brokers的机制,它支持分布式的queues和topics。
主要有以下两种配置xml的方式:
1.使用硬编码的方式在networkConnector 元素中增加
【ActiveMQ 分析系列(三)】


2.使用Discovery方式去检测broker



更多的配置信息可以参见 Networks of Brokers



1.2Master SlaveNetworks of brokers可以解决集群扩展的问题,但对于消息在同一时刻任然存在于单个broker上,如果这个broker宕机了,只能等他重新启动才能发送消息(如果消息非持久,则会丢失)。Master Slave的目的就是将消息复制到slave broker上,当出现故障时可以迅速的切换到slave上。
目前支持3种slave方式

Master Slave Type
Requirements
Pros
Cons
Shared File System Master Slave
A shared file system such as a SAN
Run as many slaves as required. Automatic recovery of old masters
Requires shared file system
JDBC Master Slave
A Shared database
Run as many slaves as required. Automatic recovery of old masters
Requires a shared database. Also relatively slow as it cannot use the high performance journal
Replicated LevelDB Store
ZooKeeper Server
Run as many slaves as required. Automatic recovery of old masters. Very fast.
Requires a ZooKeeper server.


2.消息消费端特性 2.1Consumer Dispatch Async 分发异步机制broker将消息发送给客户端默认是异步的,异步就意味着多线程和上下文切换的消耗。但异步能将资源最大利用,因为不同的客户端访问的环境可能不同,速度有快有慢。如果认为客户端都够快,且不想将资源消耗在上下文切换上,可以设置为同步。如下:


//connect级别 ((ActiveMQConnectionFactory)connectionFactory).setDispatchAsync(false); //connectFactory级别 ((ActiveMQConnection)connection).setDispatchAsync(false); //消息级别 queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false"); consumer = session.createConsumer(queue);





2.2Consumer Priority消息的发送是可以有权重级别的。在消费者端必须通过创建的时候设置级别。


queue = new ActiveMQQueue("TEST.QUEUE?consumer.priority=10"); consumer = session.createConsumer(queue);

级别可以从0~127,高级别能接受低级别的消息。如果在生产者端设置的级别是4,那么3级别的消费者不能马上收到该消息,而在5、6之间,broker会把消息优先发送给6。只有在消息堆积超过broker队列或缓存极限时,才会把消息分发给低级别的消费者。

2.3 Exclusive Consumer 在集群情况下,同一个queue的消息可能被不同的客户端消费,但有些环境里需要消息链上的多条消息被连续处理,Exclusive Consumer特性提供了对这一需求的支持。设置了该特性的队列,会将消息消费的行为绑定到单一的客户端上,除非发生错误产生失败转移到另一个客户端,否则将一直绑定下去。 这在集群里特别的适用。若要设置,可以如下:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true"); consumer = session.createConsumer(queue);


2.4 Manage Durable Subscribers 某些主题消息因为客户端在订阅后却长时间离线,而一直进驻内存,影响系统内存量及稳定性。有两种方式可以设置主题消息的过期时间,在配置文件中: 1.在 destinationPolicy标签中设置参数
" expireMessagesPeriod="300000"/>

2.在broker 标签中增加属性


2.5 Message GroupsMessage Groups就类似于并行的多个Exclusive Consumer队列。这些队列共用一个JMSXGroupID,同一JMSXGroupID的消息将固定发送给单一客户端,除非发生故障转移,或groups被关闭。可以如此使用:
Mesasge message = session.createTextMessage("hey"); message.setStringProperty("JMSXGroupID", "IBM_NASDAQ_20/4/05"); ... producer.send(message);


2.6 Redelivery Policy 转发策略。
Property
Default Value
Description
collisionAvoidanceFactor
0.15
The percentage of range of collision avoidance if enabled
maximumRedeliveries
6
Sets the maximum number of times a message will be redelivered before it is considered apoisoned pill and returned to the broker so it can go to a Dead Letter Queue (use value -1 to define infinite number of redeliveries)
maximumRedeliveryDelay
-1
Sets the maximum delivery delay that will be applied if the useExponentialBackOff option is set. (use value -1 to define that no maximum be applied) (v5.5)
initialRedeliveryDelay
1000L
The initial redelivery delay in milliseconds
redeliveryDelay
1000L
The delivery delay if initialRedeliveryDelay is 0 (v5.4)
useCollisionAvoidance
false
Should the redelivery policy use collision avoidance
useExponentialBackOff
false
Should exponential back-off be used (i.e. to exponentially increase the timeout)
backOffMultiplier
5
The back-off multiplier

使用方式如下:
ActiveMQConnection connection ...// Create a connection RedeliveryPolicy queuePolicy = new RedeliveryPolicy(); queuePolicy.setInitialRedeliveryDelay(0); queuePolicy.setRedeliveryDelay(1000); queuePolicy.setUseExponentialBackOff(false); queuePolicy.setMaximumRedeliveries(2); RedeliveryPolicy topicPolicy = new RedeliveryPolicy(); topicPolicy.setInitialRedeliveryDelay(0); topicPolicy.setRedeliveryDelay(1000); topicPolicy.setUseExponentialBackOff(false); topicPolicy.setMaximumRedeliveries(3); // Receive a message with the JMS API RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap(); map.put(new ActiveMQTopic(">"), topicPolicy); map.put(new ActiveMQQueue(">"), queuePolicy);


2.7 Retroactive Consumer 该特性使得订阅了某topic的客户端,可以追溯之前已发送或错过的消息。
topic = new ActiveMQTopic("TEST.Topic?consumer.retroactive=true"); consumer = session.createConsumer(topic);


但该特性在集群环境下的activemq不能保证一贯性。

2.8 JMS Selectors 选择器可以支持对消息的筛选。 Selectors are a way of attaching a filter to a subscription to perform content based routing. Selectors are defined using SQL 92 syntax and typically apply to message headers; whether the standard properties available on a JMS message or custom headers you can add via the JMS code.
JMSType = 'car' AND color = 'blue' AND weight > 2500

    推荐阅读