JMS解析(二)——JMS简介

JMS解析(一)——JMS简介_踩踩踩从踩的博客-CSDN博客

前言

本篇文章会紧接着上篇文章未介绍完毕的JMS规范,继续消息接收、事务管理、持久化以及应用的介绍。

消息接收

消息接收分为队列接收和topic消息接收

对于队列的接收基本的consumer创建,一个接收者拿到就可以。

Topic消息接收

这其中包含着各种不同订阅方式,其中包括非共享非持久化订阅、非共享持久化订阅 共享非持久化订阅 共享持久化订阅
对于持久化订阅 ,对于生产者,发送一个消息,没有消费者,topic只会记录数字,不会存的。 topic在默认的情况下都不会存的。
持久订阅 
持久订阅时,客户端向JMS 注册一个识别自己身份的ID(clientId必须有),当这个客户端处于离线时,JMS Provider 会为这个ID 保存所有发送到主题的消息,当客户再次连接到JMS Provider时,会根据自己的ID 得到所有当自己处于离线时发送到主题的消息。 tips: ActiveMQ.Advisory开头的消息是activemq提供的一个管理消息推送。
  1. // 2、创建连接对象
  2. conn = connectionFactory.createConnection();
  3. // 通过conn对象,持久订阅需指定ClientId
  4. conn.setClientID('client-1');
  5. conn.start(); // 一定要启动

并且在创建消费者的目标,并且需要指定名字

  1. // 4、创建消息消费目标(Topic or Queue)
  2. Topic destination = session.createTopic(destinationUrl);
  3. // 5、创建消息消费者 http://activemq./destination-options.html
  4. TopicSubscriber consumer = session.createDurableSubscriber(destination, 'I love Durable');

对于持久订阅在页面会显示出来的。 

共享的目的是为了达到:负载均衡 容错 高可用

非共享非持久化订阅

非共享非持久订阅是使用主题消息的最简单方式。

将创建非共享的非持久订阅,并在该订阅上创建使用者对象订阅,使用以下方法之一:

  • 在经典API中,会话上的几个createConsumer方法之一。这些返回一个MessageConsumer对象。
  • 在简化的API中,JMSContext上的几个createConsumer方法之一。这些回归JMSConsumer对象。
  • 每个非共享非持久订阅都有一个使用者。如果应用程序需要在同一订阅上创建多个使用者,然后创建一个共享的非持久订阅应该改用。

noLocal参数可用于指定由主题自己发布到主题的消息不能将连接添加到订阅中。

非共享持久化订阅

需要接收所有已发布消息的应用程序使用持久订阅在某个主题上,包括在没有与之关联的消费者时发布的主题。JMS提供程序保留此持久订阅的记录,并确保来自topic的发布者将被保留,直到他们被交付给消费者并得到消费者的认可
持久订阅或直到过期。
非共享持久订阅在同一时间只能有一个活动(即未关闭)使用者同时。使用创建非共享持久订阅,并在该订阅上创建使用者以下方法之一:

  • 在经典API中,会话上的几个createDurableConsumer方法之一。这些返回MessageConsumer对象。
  • 在简化的API中,JMSContext上的几个createDurableConsumer方法之一。它们返回一个JMSConsumer对象。
必须指定订阅名

非共享持久订阅由客户端和客户端指定的名称标识必须设置的标识符。随后希望在此基础上创建使用者的客户端非共享持久订阅必须使用相同的客户端标识符。

取消持久订阅的方式:

非共享持久订阅将被持久化,并将继续累积消息,直到它被持久化使用会话、JMSContext或TopicSession上的unsubscribe方法删除。它是客户端在具有活动使用者或处于活动状态时删除持久订阅是错误的从中接收的消息是当前事务的一部分,或者尚未在中确认一场

共享非持久化订阅

非持久共享订阅由需要能够共享的工作的客户端使用从多个使用者之间的非持久主题订阅接收消息。因此,非持久共享订阅可能有多个使用者。每封来自订阅将仅传递给该订阅上的一个使用者。创建共享的非持久订阅,并在该订阅上创建使用者,使用以下方法之一:

  • 在经典API中,会话上的几个createSharedConsumer方法之一。这些返回MessageConsumer对象。
  • 在简化的API中,JMSContext上的几个createSharedConsumer方法之一。它们返回一个JMSConsumer对象。
必须指定订阅名(自定义)

 默认的方式,不使用共享方式,推到topic上,默认发送给所有的消费者

为什么需要共享,也是由于消费者大量的数据接收,处理不过来。通过加订阅者根本不会解决问题,因此引入共享的概念

在消息中间添加了共享件。在中间做一个存储,解决高并发的情况。 降低消费者的压力

共享持久化订阅

需要接收所有已发布消息的应用程序使用持久订阅在某个主题上,包括在没有与之关联的消费者时发布的主题。JMS提供程序保留此持久订阅的记录,并确保来自topic的发布者将被保留,直到他们被交付给消费者并得到消费者的认可持久订阅或直到过期。共享的非持久订阅由需要能够共享的工作的客户端使用从多个使用者之间的持久订阅接收消息。共用的耐用家具

因此,订阅可能有多个消费者。订阅中的每封邮件将仅传递给该订阅上的一个消费者。创建共享持久订阅,并使用共享持久订阅在该订阅上创建使用者以下方法之一:

  • 在经典API中,会话上的几个createSharedDurableConsumer方法之一。它们返回一个MessageConsumer对象。
  • 在简化API中,上的几个createSharedDurableConsumer方法之一JMSContext。它们返回一个JMSConsumer对象。
MessageConsumer createSharedDurableConsumer(Topic topic, String name) throws JMSException;

 ActiveMQ 中的共享订阅实现

这里对于topic是不需要共享操作的。

原理

JMS 中共享是 2.0 规范定义的。 ActiveMQ 只实现了 1.1 规范,它早就提出它的共享解决办法。

 ActiveMQ Virtual Topic

在其中做到的:

 使用队列去解决共享数据问题。也就是虚拟主题,解决  虚拟topic 

https://activemq./virtual-destinations
配置
activemq.xml broker 节点下增加如下配置
  1. <destinationInterceptors>
  2. <virtualDestinationInterceptor>
  3. <virtualDestinations>
  4. <!--name:主题名,可以是通配符 prefix:队列的前缀
  5. selectorAware:表示从Topic中将消息转发给Queue时,是否关注Consumer的 selector情况。如果为false,那么Topic中的消息全部转发给Queue,
  6. 否则只会转发匹配Queue Consumer的selector的消息 -->
  7. <virtualTopic name='VirtualTopic.>' prefix='VirtualTopicConsumers.*.'selectorAware='false'/>
  8. <virtualTopic name='aa' prefix='VirtualTopicConsumers.*.' selectorAware='false'/>
  9. </virtualDestinations>
  10. </virtualDestinationInterceptor>
  11. </destinationInterceptors>

表示从Topic中将消息转发给Queue时,是否关注Consumer的 selector情况。如果为false,那么Topic中的消息全部转发给Queue,
 否则只会转发匹配Queue Consumer的selector的消息

通配符说明

ActiveMQ queue topic 名称支持通配符,通过通配符可以增强 queue topic 的功能
  • . 用于分隔path名称
  • * 用于匹配任何path名称
  •  用于递归匹配以xx名称开头的任何目标
示例:
com.study.*.mq 匹配 com.study.a.mq, 不匹配 com.study.a.b.mq
com.> com.study.a.mq com.study.a.b.mq 都匹配
通配符可用于生产和消费者,在生产者中使用通配符时,消息将发送到所有匹配的目标上;在消费者中 使用通配符时,将接收所有匹配的目标的消息。
代码
生成者向 虚拟 Topic 发送消息(注意:目标是 Topic ) 
  1. new ProducerThread('tcp://mq.study.com:61616', 'VirtualTopic.Order').start();
  2. Destination destination = session.createTopic(destinationUrl);

配置好对应的虚拟topic即可

多个消费者从同一 队列 消费消息
  1. public static void main(String[] args) {
  2. new ConsumerThread('tcp://mq.study.com:61616',
  3. 'VirtualTopicConsumers.A.VirtualTopic.Order').start();
  4. new ConsumerThread('tcp://mq.study.com:61616',
  5. 'VirtualTopicConsumers.A.VirtualTopic.Order').start();
  6. new ConsumerThread('tcp://mq.study.com:61616',
  7. 'VirtualTopicConsumers.B.VirtualTopic.Order').start();
  8. new ConsumerThread('tcp://mq.study.com:61616',
  9. 'VirtualTopicConsumers.B.VirtualTopic.Order').start();
  10. }
  11. Destination destination = session.createQueue(destinationUrl);

同步方式接收消息

 异步方式接收消息

客户机可以注册实现JMS MessageListener接口的对象消费者当消息到达消费者时,提供者通过调用侦听器的onMessage方法。侦听器可以抛出RuntimeException;然而这被认为是客户端编程错误。行为端正的听众应该抓住这样的机会异常并尝试将导致这些异常的消息转移到特定于应用程序的某种形式“无法处理的邮件”目标。

  1. // 6、异步接收消息
  2. consumer.setMessageListener(new MessageListener() {
  3. @Override
  4. public void onMessage(Message message) {
  5. if (message instanceof TextMessage) {
  6. try {
  7. System.out.println( '收到文本消息:' + ((TextMessage) message).getText());
  8. }
  9. catch (JMSException e) {
  10. e.printStackTrace();
  11. }
  12. } else {
  13. System.out.println(message);
  14. }
  15. }
  16. });

异步接收时如果listener抛出RuntimeException ,此时的处理结果与 session’s acknowledgment mode有关:

  • AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - 消息将立马重发,重发几次provider 定。JMSRedelivered 头和 JMSXDeliveryCount 属性都会被设置。
  • CLIENT_ACKNOWLEDGE - 传递下一条消息。如果客户端向要provider重复上一条未确认的消息, 则需手动调用session.recover()来恢复session(recover的说明见下节消息确认)。
  • Transacted Session - 传递下一条消息。客户端可以选择commit or rollback 会话 (in other words, a RuntimeException does not automatically rollback the session)。
【注意】:一个 Session 是使用单线程来执行它所有的 message listeners

JMS提供程序应标记具有消息侦听器的客户端,这些侦听器将RuntimeException作为可能出故障了。

会话使用单个线程运行其所有消息侦听器。当线程忙于执行时在一个侦听器中,要异步传递到会话的所有其他消息都必须等待。

在spring中持久化订阅

使用 注解 jmslistener ,并且配置好持久化订阅方式,这里注意默认走的是队列的方式,点对点的方式

  1. @JmsListener(destination = 'topic://durableTopic', subscription = 'I love Durable')
  2. public void receive(String text) {
  3. System.out.println(Thread.currentThread().getName() + ' Received <' + text + '>');
  4. }
  1. @Bean
  2. public DefaultJmsListenerContainerFactory myFactory(ConnectionFactory connectionFactory,
  3. DefaultJmsListenerContainerFactoryConfigurer configurer) {
  4. DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
  5. // This provides all boot's default to this factory, including the
  6. // message converter
  7. configurer.configure(factory, connectionFactory);
  8. // You could still override some of Boot's default if necessary.
  9. // factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
  10. // factory.setSessionTransacted(true);
  11. // 持久订阅设置
  12. // 1、设置为topic方式目标
  13. factory.setPubSubDomain(true);
  14. // 2、设置客户端id (区分到客户端级别)
  15. factory.setClientId('client-2');
  16. // 3、设置为持久订阅
  17. factory.setSubscriptionDurable(true);
  18. return factory;
  19. }

在配置文件中也可以配置  对应就行

消息确认

消息确认有四种方式:
  • 事务控制方式:如果session是开启事务的,消息的确认是由commit来自动处理的,而恢复则是 rollback自动处理。如果处理会话,则处理消息确认通过提交自动执行,通过回滚自动处理恢复
如果 session 是未开启事务的,则有下面三种可选确认方式,而恢复则需手动来控制。
  • DUPS_OK_ACKNOWLEDGE-懒惰(延缓)确认收到消息,适用于能容忍重复消息的情 况,它带来的好处是减轻session判断重复消息的工作。一般实现可以是异步、批量方式ActiveMQ
  • AUTO_ACKNOWLEDGE -Session自动确 ,在成功收到消息后或在成功从message listener的消息方法处理返回后,自动确认消息。 默认方式
  • CLIENT_ACKNOWLEDGE 客户端自己来回复确认(我们自己来控制回复确认)
  1. // 3、创建会话(可以创建一个或者多个session)
  2. // 自动回复消息确认
  3. //session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
  4. // 懒惰回复消息确认(在可以容忍重复消息的情况下使用,可提高效率)
  5. // session = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
  6. // 用户手动回复消息确认
  7. session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
  8. // 开启事务,由事务控制来回复消息确认
  9. // session = conn.createSession(false, Session.SESSION_TRANSACTED);
message.acknowledge();

 

开启了手动消费,如果不确认则会一直存在着。 

session’s recover method
stop session 并以第一个未确认消息重新开始。
在spring中

jms.acknowledage-mode ,也是全局默认的 消息确认设置

消息确认模型的指定

select 选择器

Message selection  根据选择器去选择那个位置
JSM 规范文档: 3.8. Message selection
Selection 是消费端选择性消费消息的功能,通过使用消息头、消息属性中的字段来编写一个 SQL 条件 表达式来说明要消息提供者如何选择性递送消息给消费者。
Session 的创建 Consumer 方法:
MessageConsumer createConsumer(Destination destination, String messageSelector)
JMS 示例:
消费者 :
// 5、创建消息消费者 MessageConsumer consumer = session.createConsumer(destination, 'subtype='aaa'');
生产者:
  1. String text = 'message! From: ' + Thread.currentThread().getName() + ' : ' + System.currentTimeMillis();
  2. TextMessage message = session.createTextMessage(text);
  3. // 设置消息属性
  4. message.setStringProperty('subtype', “aaa');
  5. // 7、通过producer 发送消息
  6. producer.send(message);
Spring 示例
消费者 :
  1. @JmsListener(destination = 'queue1', selector = 'subtype='bbb'')
  2. public void receive(String text, @Header('subtype') String subtype) {
  3. System.out.println(Thread.currentThread().getName() + ' Received <' + text + '> subtype=' + subtype);
  4. }
  1. @Transactional // 在开启事务的情况下需要加事务注解
  2. public void sendMessage(String subtype) {
  3. // 发送延时消息
  4. jmsTemplate.convertAndSend('queue1', 'message with property', message -> {
  5. // 设置消息属性
  6. message.setStringProperty('subtype', subtype); return message;
  7. });
  8. System.out.println('Sending an message with subtype=' + subtype); }

事务处理

事务管理(概念、操作方法),对于分布式事务的处理
  • 事务将会话的输入消息流和输出消息流组织成一系列原子单位。当事务提交时,它的输入原子单元被确认,并且它的发送相关的输出原子单位。如果事务回滚完成,则其生成的消息将被销毁并自动恢复其已使用的邮件。
  • 事务使用其会话的commit()或rollback()方法完成。这个会话当前事务的完成将自动开始下一个会话。结果是事务处理会话始终有一个当前事务,在该事务中完成其工作。
  1. // 3、创建会话(可以创建一个或者多个session)
  2. // 开启事务
  3. session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
  1. // 6、创建文本消息
  2. for (int i = 0; i < 10; i++) {
  3. String text = i + 'message! From: ' + Thread.currentThread().getName() ;
  4. TextMessage message = session.createTextMessage(text);
  5. // 7、通过producer 发送消息
  6. producer.send(message);
  7. System.out.println('Sent message: ' + text);
  8. Thread.sleep(1000L);
  9. }
  10. // 提交事务
  11. session.commit();
  12. // 回滚事务
  13. // session.rollback();

Spring JTA

spring 中使用时,一般都是数据库 +MQ 的场景,需要用 JTA 来进行分布式事务管理
  • 引入分布式事务管理实现
  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-jta-atomikos</artifactId>
  4. </dependency>
  • 配置数据源
  1. spring:
  2. jta:log-dir: d:/tmp # 可不配,使用默认目录
  3. activemq:
  4. broker-url: tcp://mq.study.com:61616
  5. pool:
  6. enabled: true
  7. max-connections: 50
  8. #user: admin #password: secret
  9. #数据源参数配置
  10. spring.datasource:
  11. url: jdbc:mysql://127.0.0.1:3306/test? useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
  12. username: root
  13. password: root
像平常一样使用 @Transactional 注解
  1. // @Transactional 异步方式下,加 @Transactional 无效,而是走异步异常根据消息确认机制来处理 ,
  2. // 默认的消息确认模式 为
  3. // AUTO_ACKNOWLEDGE,抛出异常,则会立马重发,重发provider指定的次数(这里可看出默认配置的重发次数为6),
  4. // 重发指定次数后还是不成功,则消息将被转移到死信队列 ActiveMQ.DLQ
  5. @JmsListener(destination = 'transaction-test')
  6. public void reciveMessageAndSaveDb2(String data) {
  7. System.out.println('收到消息:' + data);
  8. this.jdbcTemplate.update('insert t_log(id,log) values(?,?)', System.currentTimeMillis(), data);
  9. }

Spring-JMS API 详解

JmsTemplate

是简化 JMS 同步操作的帮助类。学习要点:
  • 掌握各类简便方法
  • 掌握execute(..)方法来自己操作、设置jms api
  • 掌握可配置属性及参数方式配置属性 spring.jms.template.*

spring boot会帮我们做自动注入 

注意点:
  • 默认的目标类型为Point-to-Point (Queues) ,如是Topic,通过pubSubDomain 属性设置(true)
  • JMS Session 默认设置为'not transacted' and 'auto-acknowledge',如果 JMS Session是在事务中 创建的,则这两个参数将被忽略,是通过事务来控制。而如果是使用原生JMS,可通过 'sessionTransacted' and 'sessionAcknowledgeMode' bean properties来设置。
  •  JmsTemplate 使用 DynamicDestinationResolver SimpleMessageConverter 作为目标名解析 和消息转换的默认策略。可通过 'destinationResolver' and 'messageConverter' bean properties 来覆盖默认值。
  • JmsTemplate使用的ConnectionFactory需返回池化的Connections(or a single shared Connection)

正确的用法:

1. 至少要分两个 JmsTempate queue topic
2. 如果有多种业务队列 /topic 他们的方式各不相同,则需要按业务队列 /topic 配置 topic
也是由于  JmsTemplate本身就是单例的,要保证多个实例下的jmstemplate线程安全
  1. @Configuration
  2. public class JmsConfiguration {
  3. @Primary
  4. @Bean
  5. public JmsTemplate queueJmsTemplate(ConnectionFactory connectionFactory) {
  6. PropertyMapper map = PropertyMapper.get();
  7. JmsTemplate template = new JmsTemplate(connectionFactory);
  8. // template.setDestinationResolver(destinationResolver);
  9. template.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
  10. return template;
  11. }
  12. @Bean
  13. public JmsTemplate topicJmsTemplate(ConnectionFactory connectionFactory) {
  14. PropertyMapper map = PropertyMapper.get();
  15. JmsTemplate template = new JmsTemplate(connectionFactory);
  16. // template.setDestinationResolver(destinationResolver);
  17. return template;
  18. }
  19. }

JmsMessagingTemplate

JmsTemplate 上增加了一些方法
covertAndsent

MessageConverter

掌握它的用途,已有实现

默认的MessageConverter只能转换基本类型(如String、Map、Serializable),而我们的电子邮件不是故意序列化的。我们想用Jackson和将内容序列化为文本格式的json(即作为文本消息)。Spring Boot将检测到存在MessageConverter,并将其与默认JmsTemplate和任何由DefaultJmsListenerContainerFactoryConfigurer创建的JmsListenerContainerFactory。

  1. @Bean // Serialize message content to json using TextMessage
  2. public MessageConverter jacksonJmsMessageConverter() {
  3. MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
  4. converter.setTargetType(MessageType.TEXT);
  5. converter.setTypeIdPropertyName('_type'); return converter;
  6. }

@JmsListener

带注释的JMS侦听器方法允许具有类似于MessageMapping提供:

获取对jms会话的访问权javax.jms.Message或其子类之一,以访问原始jms消息

org.springframework.messaging.Message使用Spring的消息传递抽象对应方

  • @有效负载-带注释的方法参数,包括对验证的支持
  • @Header-带注释的方法参数,用于提取特定的头值,包括标准头值

由org.springframework.JMS.support.JmsHeaders@headers定义的JMS头-带注释的方法参数,该参数还必须可分配给获取对所有标头的访问权org.springframework.messaging.MessageHeaders用于获取对所有标题org.springframework.messaging.support.MessageHeaderAccessororg.springframework.jms.support.JmsMessageHeaderAccessor,方便访问所有方法参数带注释的方法可能具有非void返回类型。当他们这样做时,方法的结果调用作为JMS应答发送到由传入消息。如果未设置此标头,则可以通过添加@发送到方法声明。

DefaultJmsListenerContainerFactory 的配置

注释,该注释将方法标记为指定对象上JMS消息侦听器的目标目的地containerFactory标识用于构建jms的org.springframework.jms.config.JmsListenerContainerFactory侦听器容器。如果未设置,则假定默认容器工厂可用于beanjmsListenerContainerFactory的名称,除非通过提供了显式默认值

配置考虑设置习惯org.springframework.jms.config.defaultjmsListenerContainerFactorybean。用于生产为此,您通常会微调超时和恢复设置。最重要的是,违约“自动确认”模式不提供可靠性保证,因此请确保使用在可靠性需要的情况下处理会话。

  1. @Configuration
  2. static class JmsConfiguration {
  3. @Bean
  4. public DefaultJmsListenerContainerFactory myFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) {
  5. DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); //This provides all boot's default to this factory, including the
  6. // message converter
  7. configurer.configure(factory, connectionFactory);
  8. // You could still override some of Boot's default if necessary.
  9. // factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
  10. // factory.setSessionTransacted(true);
  11. return factory;
  12. }
  13. }
  1. @Component
  2. public class Receiver {
  3. @JmsListener(destination = 'mailbox', containerFactory = 'myFactory')
  4. public void receiveMessage(Email email) {
  5. System.out.println('Received <' + email + '>');
  6. }
  7. }

此注释可用作元注释,以创建具有属性覆盖。

CachingConnectionFactory

CachingConnectionFactory extends SingleConnectionFactory

 

 缓存的连接工厂,创建的工厂。添加的配置

JmsPoolConnectionFactory

  1. <dependency>
  2. <groupId>org.messaginghub</groupId>
  3. <artifactId>pooled-jms</artifactId>
  4. </dependency>

  1. spring.activemq.pool.enabled=true
  2. spring.activemq.pool.max-connections=50