指引网

当前位置: 主页 > 编程开发 > Java >

使用BMT消息驱动BEAN和SPRING进行高性能的消息处理

来源:网络 作者:佚名 点击: 时间:2017-11-14 05:06
[摘要] 简介 自从被 引入EJB 2.0规范后,消息驱动bean (MDB)已经成为几乎所有企业Java项目的基
Spring

  简介

  自从被引入EJB 2.0规范后,消息驱动bean (MDB)已经成为几乎所有企业Java项目的基础。MDB拥有一个简单而整洁的API,用户不需要创建或生成home和local/remote接口,所有这些使得Java开发社区迅速且广泛地接受了MDB。如您所知,几乎任何大型J2EE项目都有一个重要部分是与其它系统的集成有关的。一种集成不同系统的可取方法是使用面向消息中间件(MOM)提供的企业队列(queue)和主题(topic)基础架构,这要求所有的J2EE服务器都拥有自己的兼容JMS的MOM实现。

  看起来大部分企业J2EE项目取决于JMS消息的接收和/或发送。因此,J2EE架构设计者和开发者就一定要熟悉不同的消息消费和生产方法,因为这可能最终决定项目的成功或失败。最近,开始出现一些MDB模型的替代方案,例如Spring和ActiveMQ的消息驱动POJO。当前的MDB模型除了本身是一项有趣有用的技术外,其地位也得以确定,它足够简单,并且在J2EE社区中得到了广泛的采用。

  本文主要关注基于MDB的传统消费模型。本文将展示如何利用一种更高效的非事务型消息检索来重构这一场景,而同时仍然保持业务用例的once-and-only-once服务质量(QoS);换句话说,业务代码应该只处理消息一次,并且不应该丢失任何消息。这是最严格的服务质量,也是实现中最有趣的部分。

  传统的消息消费模型

  可以说,在大多数的J2EE项目中,只要出现消费JMS消息的用例,都会使用MDB。除了最普通的情况外,如果从业务角度来看丢失传入的消息和/或处理消息副本可以接受,这些MDB都使用容器管理事务(CMT)划分模型和事务型属性RequiresNew来部署。为了使这些设置可以生效,应当用事务型的QueueConnectionFactory(或TopicConnectionFactory)来配置MDB。在这样的设置中,消息消费过程可以用下面的步骤来描述:

  • J2EE容器启动一个JTA事务。
  • MDB所监听的队列/主题的XAResource被添加到事务中。
  • 从队列/主题消费一条消息,并传递给MDB的onMessage()方法。
  • MDB处理消息。任何在MDB处理消息时会使用的事务型资源(比如:数据库、其它队列或主题以及JCA适配器)会被添加到步骤1中启动的同一个JTA事务中(当然,只有在资源支持XA时才可以)。
  • 如果处理成功(onMessage()调用无异常地返回),则J2EE容器会对JTA事务中的所有资源执行一次两阶段提交。因为JTA事务所拥有的资源之一是消费JMS消息的目的地,一旦JTA事务提交成功,消息就会从队列/主题中移除。
  • 如果处理失败(onMessage()调用抛出RuntimeException 异常),则J2EE容器会对表示步骤3中消费JMS消息的目的地的XAResource执行回滚,因此消息会停留在队列/主题中,稍后可能被重新发送。此外,在这种情况下,MDB实例会被销毁并从处理池中移除。

  如您所见,事务型消息消费的处理非常健壮。这样的处理模型保证了once-and-only-once的服务质量;换句话说,消息要么被成功地处理一次,要么根本不被处理(可能在超出预定义的重试次数或生存时间或者转移到停用(dead)消息队列之后,被从队列中移除)。

  传统模型的缺点

  事务型消息消费模型尽管健壮且成功,但是它还是有一些严重的缺点。首先,分布式事务严重地影响了处理性能(当从本地事务切换到XA时,性能降低50%是不足为奇的)。

  第二个缺点是由CMT造成的,实际的事务提交发生在应用程序代码之外,在onMessage()调用返回之后。这也许没什么大不了的(毕竟CMT的整体思想是要将应用程序从事务处理中解脱出来),但是存在一些令人不愉快的问题——一些错误情况直到事务提交后才进行侦测。例如,在BEA WebLogic Server中,默认情况下,所有由处理CMP bean(创建,更新等等)引起的DML操作都被延迟到事务提交阶段。这意味着应用程序可以认为它成功地更新了CMP bean的一个实例,而实际上实际的SQL更新可能会因为违反数据库的某种约束而失败。最糟糕的是应用程序代码不能够对此做出反应或者只是恰当地进行记录,因为它看不到异常。

  优化提议

  尽管对延迟的DML操作有一个应急方案(例如,在BEA WebLogic Server中,可以在部署描述符中禁用它),但是这伴随着性能损失。J2EE服务器将不能再聚集和/或批处理SQL更新(以便执行更高效),或者将它们全部忽略(如果事务稍后被标记为回滚的话)。

  本文认为,采用一种bean管理事务(bean-managed transaction,BMT)方法可以提供同样的服务质量,并对事务生命周期有更多的控制。应用程序代码将有机会恢复和/或更清楚地报告错误,同时避免上述的CMT模型的所有缺点。此外,我们预料从事务作用域中移除消息检索能带来重大的性能提升。

  在讨论BMT方法之前,我们需要分析在这种情况下从队列中消费消息会发生什么。如果我们使用BMT 划分部署MDB,J2EE服务器不会再把MDB监听的JSM目的地(队列或者主题)添加到事务中(事务将会在从队列中取走消息之后开始)。在这种情况下,BMT MDB应当用部署描述符中的非XA连接工厂配置;否则J2EE服务器会部署失败。

  根据JMS规范(JMS1.1第一节4.5.2),如果使用AUTO_ACKNOWLEDGE 或者 DUPS_OK_ACKNOWLEDGE模式非事务型地部署消息监听器,并且onMessage()方法抛出RuntimeException或它的任何子类,则消息会被重新发送。换句话说,重新设计用例来使用BMT是有可能的,如果在处理消息时出错的话,应用程序代码可以抛出RuntimeException,消息就会被重新发送(重试)。这种方法很有效,因为使用RuntimeException来表示不可恢复的错误是很自然的(例如Spring Framework的异常层次结构基本上全是基于RuntimeException的子类)。消息会重新发送,直到达到一定的次数(可在MOM软件层配置),之后它通常被丢弃或转移到停用消息队列,或者应用程序代码会计算消息被重新发送的次数,并决定什么时候应当停止尝试处理以及不处理就消费(如果适当的话,会产生错误消息)还是转移到另外的队列。

  如您所见,上述行为保证我们可以控制消息的重新发送,并且应用程序可以在处理反复失败的情况下重试。下面我们将展示如何在BMT情况下保证once-and-only-once行为。

  非事务型消息消费模型

  让我们看看非事务型消息消费模型中的事件顺序:

  • 从队列/主题中消费一条消息,并传递给MDB的onMessage()方法
  • 应用程序启动一个BMT事务
  • MDB处理消息。所有在MDB处理消息时会用到的事务型资源(例如:数据库、其他队列或主题,以及JCA适配器)都会被添加到步骤2中启动的BMT事务当中。
  • 应用程序全面控制BMT事务。它可以根据任何需要的业务逻辑提交或者回滚事务。最简单的情况是,如果从业务逻辑消息处理代码抛出任何异常,就回滚事务,否则就提交。
  • 如果调用onMessage()成功返回(没有抛出RuntimeException),则消息会被确认,并从队列中移除。

  您或许注意到了,从理论上来看,该序列有点简单了些,尽管其实现比普通的CMT情况要稍复杂。得到的回报是消息处理的巨大灵活性。例如,由应用程序代码来决定是提交还是回滚事务,并独立于消息确认。注意,直到onMessage()方法调用成功返回,消息才被确认。

  Bean管理和容器管理的事务以及所要求的服务质量

  到现在为止,一切都还不错,但是让我们看看能否保证与使用事务型消息消费模型时相同的服务质量(once and only once)。我们需要分析两个问题:

  • 消息是否会因为消息处理而丢失
  • 消息是否会被多次处理

  第一条很简单。从我们的讨论来看,很明显,由于消息直到onMessage()成功返回才被确认,只需要保证应用程序代码没有在不恰当的地方屏蔽掉异常,并且在任何抛出异常的地方回滚了事务。这是一个非常简单自然的方法。

  第二条有些棘手。如我们所看到的,如果在业务处理过程中抛出异常,消息会被重新发送。如果我们遵循上面的范例就不成问题:如果有任何异常,则回滚事务,并进一步重新抛出异常给J2EE服务器。当然,如果错误真的不可恢复,就应当采取措施防止无限的消息发送。这可以在MOM层完成(例如,停用消息队列、重新发送限制等等),或者,如我们将演示的,可以在应用层处理。

  如果先前的处理尝试以事务回滚结束,则消息重新发送就不是问题。从应用程序的角度来说,下一次的发送尝试应当作为一条新的消息对待(同一条消息先前的处理尝试结果是不可见的,因为事务被回滚了),所以在这种情况下不会有什么问题,没有任何特别的处理规则。

  还要分析在不同组件发生灾难性故障的情况下会发生什么——例如,如果J2EE服务器崩溃——从我们要求的服务质量来看,系统在重启后其状态还一致么?发生在消息确认(出现在最后一步)之前的任何故障都会导致消息重新发送。因此我们只需要关心一种故障场景,也就是J2EE服务器或MOM(或者它们之间的连接)在代码成功完成处理并提交BMT事务之后、J2EE服务器确认消息之前出现故障。这会使系统处于不一致状态:从应用程序的角度看,消息被成功处理;但是从MOM的角度看,消息还没有成功发送(由于故障而没有被确认)。这会导致重复的消息发送和处理,从而违反了我们的服务质量条约。可以证明,由于事务提交和确认之间的时间间隔应该非常短,这种故障场景不会经常发生。尽管如此,如果系统处于高负载下,同时处理的消息没有几百个可能也有几十个,那么在建立恢复机制的时候就应当考虑这种情况。

  有必要再次强调,我们所讨论的是最严格的服务质量水平(once and only once),因而从如此少见的情况下的副本中恢复是必需的。如果您的用例可以使用更宽松的服务质量水平(例如,once or more)开发,就可以跳过下面描述的恢复机制。

  总而言之,当使用非事务型BMT消息检索时,为了保证once-and-only-once 服务质量,我们需要控制消息的重新发送行为(根据EJB规范,这通过从onMessage()方法抛出RuntimeException完成),还需要一些机制来防止在消息处理的特定阶段(在从onMessage()调用返回与消息确认之间)发生故障时处理重复消息。下面我们将讨论防止处理重复消息。

  重复发送侦测和预防

  在前面的部分中,我们看到,当非事务型地消费消息,并且要求once-and-only-once的服务质量时,应用程序需要引入特殊的机制来防止重复处理,这种重复处理在一个或多个组件在事务提交和消息通知两步之间发生故障这一罕见的场景下会发生。这种机制必须足够灵活以应付不同的部署场景;例如,如果应用程序部署在集群上,则重新发送消息的节点可能与最初发送此消息的节点不同。为了发现重复以防止消息被再次处理,应用程序应该能够通过某种方法识别消息已经被处理过了。

  在数据库实例中标识已经处理过的消息似乎比较合理,数据库实例(通常)被集群中所有节点共享。我们可以用JMSMessageID作为消息的独有标识符,根据JMS文档,JMSMessageID是一个字符串,它在历史库中是惟一标识消息的键。虽然惟一性的确切范围是由提供者定义的,但是它至少可以覆盖一个特定提供者安装中的所有消息,这里的安装指的是某种消息路由器的互连集合。为了安全起见,我们还将使用JMSTimestamp。JMSTimestamp字段包含了将消息转交给提供者以供发送的时间。这两个字段的值结合起来,就可以保证惟一地标识每条消息,即使它们来自不同的提供者。

  显然,尽可能地将重复侦测机制设计得快速且高效是非常有益的,所以我们会完全避免使用SQL选择操作,而是创建一个带有JMSMessageID和JMSTimestamp字段中定义的组合主键的表。然后当随后试图插入具有相同主键的值的时候,由于违反数据的完整性,就可以侦测到重复记录,这样就可以侦测并进行适当的处理。下面是一个Oracle数据库的DDL脚本:

CREATE TABLE message_log
    (message_id VARCHAR2(255) NOT NULL,
    message_ts INTEGER NOT NULL,
  CONSTRAINT pk_message_log
    PRIMARY KEY (message_id, message_ts)
);

  实现非事务型消息消费者模型

  让我们尝试构建一套工具来简化非事务型消息的处理过程。由于涉及到许多横切功能,利用AOP方法会比较好。下面,我们将演示如何利用Spring Framework的AOP特性来实现具有once-and-only-once服务质量的非事务型消息处理所需的所有逻辑。

  Spring AOP Framework的强大功能在其他文章中已经有所论述(参见参考资料)。它使我们可以声明式地定义一组方面,每个方面为执行流提供特定的功能。产生的代码看起来非常结构化,并且每个部分可以独立测试,从而覆盖到特定于每个组件的所有用例。

  关于Spring AOP Framework的基本介绍请参考Using the Spring AOP Frameworkwith EJB Components (中文版,dev2dev,2006年2月)。在Spring中使用AOP的首选方式是利用可以由Spring代理的组件接口。虽然JMS接口MessageListener和Message也可以用于此目的,但是定义一些更通用且与JMS接口解耦合的接口实际上会很有意义。例如,可以如下声明MessageProcessor接口:

public interface MessageProcessor {
   public Object process(MessageData messageData);
}

  注意,process()方法带有一个MessageData参数,这个参数封装了来自原始消息的所有数据。可以实现Spring的MessageConverter,将Message实例转换成MessageData,并扩展AbstractJmsMessageDrivenBean来实现通用处理逻辑,即:

  • JMS消息到MessageData的转换
  • 从应用程序上下文获得处理器实例
  • 消息处理器的执行
  • 异常处理

  可以从下载部分下载完整的源代码。以下是MessageDataDrivenBean的可能实现:

public abstract class MessageDataDrivenBean extends
                       AbstractJmsMessageDrivenBean {
  private MessageConverter messageConverter =
                              new MessageDataConverter();
  private MessageProcessor messageProcessor;

  protected void onEjbCreate() {
    messageProcessor = ((MessageProcessor) 
                   getBeanFactory().getBean(getName()));
  }

  public void onMessage(Message message) {
    try {
      MessageData messageData = (MessageData) 
            getMessageConverter().fromMessage(message);
      this.messageProcessor.process(messageData);
    
    } catch( MessageConversionException ex) {
      String msg = "Message conversion error; "+
                                     ex.getMessage();
      this.logger.error(msg, ex);
      throw new RuntimeException(msg, ex);
      
    } catch( JMSException ex) {
      String msg = "JMS error; "+ex.getMessage();
      this.logger.error(msg, ex);
      throw new RuntimeException(msg, ex);

    }
  }
  
  protected MessageConverter getMessageConverter() {
    return this.messageConverter;
  }
  
  protected abstract String getName();
  
}

  具体的子类只需要提供一个返回getName()方法的实现,当在Spring上下文中定义MessageProcessor接口时,该方法返回接口特定实现的bean名字。子类也可以提供自己的MessageConverter,使用不同的策略填充MessageData。以下是MessageProcessor的一个简单实现:

public class SimpleMessageProcessor implements MessageProcessor {

  private Log log = LogFactory.getLog(getClass());

  public Object process( MessageData messageData) {
    log.info(messageData);
    return null;
  }

}

  最后,具体的MDB看起来如下所示。注意,我们使用Xdoclet注释来声明部署描述符的元数据:

/**
 * SimpleMdb
 * 
 * .bean 
 *   name="org.javatx.mdb.SimpleMdb" 
 *   type="MDB"
 *   destination-type="javax.jms.Queue" 
 *   transaction-type="Bean"
 * 
 * .pool 
 *   initial-beans-in-free-pool=
 *                "" 
 *   max-beans-in-free-pool=
 *                ""
 * 
 * .message-driven 
 *   connection-factory-jndi-name=
 *                  "-e"
 *   destination-jndi-name=
 *                  "-e"
 *   jms-polling-interval-seconds=
 *                  ""
 *   
 * .env-entry
 *   name="BeanFactoryPath" 
 *   value="applicationContext.xml"
 */
public class SimpleMdb extends MessageDataDrivenBean {

  protected String getName() {
    return "simpleProcessor";
  }
  
}

  在上述代码中,BeanFactoryPath的env-entry被Spring的EJB类用来定位应用程序上下文。应用程序上下文中应该有simpleProcessor bean的声明,这个bean会处理所有的处理逻辑,以及非功能性的需求,比如:事务、防止消息的重复处理以及可选的跟踪和性能监控。

  显然,将所有非功能方面移到通知中,并利用包装了MessageProcessor实际实现的ProxyFactoryBean来定义拦截器链是很有意义的。定义可能如下所示:

<bean id="simpleProcessor" 
      class="org.springframework.aop.framework.ProxyFactoryBean">
  <property name="target">
    <bean class="SimpleMessageProcessor"/>
  </property>
  <property name="proxyInterfaces" 
               value="org.javatx.mdb.MessageProcessor"/>
  <property name="interceptorNames">
    <list>
      <idref local="mdbTransactionAdvisor"/>
      <idref local="mdbDuplicateHandlingAdvisor"/>
      <!-- optional monitoring and tracing -->
      <idref local="messageProcessorPerformanceMonitorAdvisor"/>
      <idref local="messageProcessorTraceAdvisor"/>
    </list>
  </property>
</bean>

  图1中的顺序图说明了消息处理过程以及支持该服务质量模型所需的advisor堆栈:

图 1.处理传入消息的advisor堆栈(单击图像查看大图)


图 1.处理传入消息的advisor堆栈(单击图像查看大图)

  实现消息拦截器

  现在我们来仔细看一下mdbTransactionInterceptor和mdbDuplicateHandlingAdvisor,它们使用上述方法提供了保证服务质量所需的功能。

  mdbTransactionAdvisor是利用标准的Spring TransactionInterceptor以及process()方法的PROPAGATION_REQUIRES_NEW事务属性定义的。

<bean id="mdbTransactionAdvisor" 
      class="org.springframework.transaction.interceptor.TransactionInterceptor">
  <property name="transactionManager" ref="transactionManager"/>
  <property name="transactionAttributes">
    <props>
      <!-- PROPAGATION_NAME,ISOLATION_NAME,readOnly,
                        timeout_NNNN,+Exception1,-Exception2 -->
      <prop key="process">PROPAGATION_REQUIRES_NEW,timeout_300</prop>
    </props>
  </property>
</bean>

  在WebLogic Server中,可以将Spring包装器用作服务器JNDI中javax.transaction.UserTransaction所公开的平台事务管理器,并定义应用程序上下文如下:

<bean id="transactionManager"  class=
 "org.springframework.transaction.jta.WebLogicJtaTransactionManager">
 <property name="userTransactionName" 
              value="javax.transaction.UserTransaction"/>
 <property name="autodetectTransactionManager" 
              value="false"/>
</bean>

  链中的下一个通知是mdbDuplicateHandlingAdvisor。因为它还要将一些独有键保存到数据库表中,所以需要一个数据源:

<bean id="mdbDuplicateHandlingAdvisor" 
        parent="messageProcessorAdvisor">
 <property name="advice">
  <bean class="org.javatx.mdb.MdbDuplicateMessagesHandlingAdvice">
    <!-- DataSource for duplicates table -->
    <constructor-arg index="0" ref="dataSource"/> 
  </bean>
 </property>
</bean>

  注意,虽然有可能为重复处理通知指定任意数据源,但是最好还是考虑一下当前的底层业务代码在做什么。例如,如果业务代码也使用了一个数据源,最好在通知中使用同一个——这样可能会减少事务中使用的不同XAResource数,而且,如果没有使用其他XAResource的话,J2EE服务器甚至可以使用单阶段提交优化,从而实际上使事务获得与本地相当的性能。MdbDuplicateMessagesHandlingAdvice的实现可能如下:

public final class MdbDuplicateMessagesHandlingAdvice 
                             implements MethodInterceptor {
 private final Log log = LogFactory.getLog(getClass());
 private final DuplicatesDao duplicatesDao;
 
 public MdbDuplicateMessagesHandlingAdvice(DataSource ds) {
   this.duplicatesDao = new DuplicatesDao(ds);
 }

 public Object invoke(MethodInvocation invocation) 
                                     throws Throwable {
   Object o = invocation.getArguments()[0];
   
   MessageData messageData = (MessageData) o;
   try {
     this.duplicatesDao.run(messageData);
     this.log.debug("Duplicate check ok");
   } catch (DataIntegrityViolationException e) {
     // Record already exists in database - duplicate message!
     // Log with appropriate severity or do whatever
     // other action is appropriate
     this.log.warn("Duplicate message, skipping processing: "+
         messageData+"; "+e.getMessage());
     // If we return from here, swallowing exception, message 
     // would be considered acknowledged    
     // and no redelivery would occur, that's exactly what we need.
     return null;
   }

   return invocation.proceed();
 }
}

  消息标识符持久化在DuplicatesDao中执行。这个DAO类扩展了Spring的SqlUpdate类,并将关键的消息属性插入到数据库表中,数据库表的主键由消息ID和时间戳字段定义,这样任何试图插入具有相同消息ID和时间戳的行都将由于违反DB的约束而导致失败(Spring会将其转换成一个运行时异常DataIntegrityViolationException)。

private static class DuplicatesDao extends SqlUpdate {
    private static final String SQL = 
      "INSERT INTO " +
        "MESSAGE_LOG(MESSAGE_ID, MESSAGE_TS) " +
        "VALUES(?, ?)";

    public DuplicatesDao(DataSource ds) {
        super(ds, SQL);
        declareParameter(new SqlParameter(Types.VARCHAR));
        declareParameter(new SqlParameter(Types.NUMERIC));
        setMaxRowsAffected(1);
    }

    public void run(MessageData data) {
        update(new Object[] {
            data.getMessageId(), 
            new BigDecimal(Long.toString(data.getTimestamp()))});
    }
}

  前面已经提到,链中的其他通知是可选的,但是如果需要的话,可以用于诊断和监控。Spring Framework提供了几种通用的实现,在应用程序上下文中可以声明如下:

<bean id="messageProcessorTraceAdvisor" 
         parent="messageProcessorAdvisor">
  <property name="advice">
    <bean class="org.springframework.aop.interceptor.SimpleTraceInterceptor">
      <property name="useDynamicLogger" value="true"/>
    </bean>
  </property>
</bean>

<bean id="messageProcessorPerformanceMonitorAdvisor" 
         parent="messageProcessorAdvisor">
  <property name="advice">
    <bean class="org.springframework.aop.interceptor.PerformanceMonitorInterceptor">
      <property name="useDynamicLogger" value="true"/>
      <property name="prefix" value="messageProcessor"/>
    </bean>
  </property>
</bean>

  基本上就是这样。上述通知将处理消息处理的所有非功能性方面(如:事务以及所需的服务质量),所以MessageProcessor的实现只需要考虑消息处理的功能性需求(业务逻辑)。

  重发送与重试策略

  上述几个方面在发生处理错误(包括应用程序错误和/或诸如JDBC、JMS或JCA之类的事务型资源故障)时会将事务完全回滚。通常消息服务器(例如WebLogic Server JMS、Mqseries)会重发送消息,允许组件重试,如果有一个“受污染”的消息,它由于某种原因不能被处理,那么就可能导致死循环。

  为了解决这个问题,必须对需要尝试处理消息的次数加以限制。所有的企业级消息传递服务器都允许配置每个队列的重试次数并定义超过重试次数时所执行的行为(例如,将消息移到专门的“死信”队列)。由于数据库服务器或者所需的其他资源所出现的故障,许多消息可能会被放在“死信”队列中。因此,当故障解决以后,这些消息可以重新放到处理队列中等待另一次重试。有许多企业监控应用程序支持这种特性。

  如果由于某种原因,不适合使用“死信”队列,那么可以在应用程序端实现一个简单的监护通知。可以在MessageProcessor执行流的开始处实现这个通知。

  实际的实现可以随着希望对失败消息执行的操作而有所不同,但是必须考虑到在故障过程中有些资源很可能不可用——例如,此时数据库可能不可用。以下的例子将以严格的格式将失败的消息转储到日志文件中。实现一个从这样的日志文件读取消息并重新提交的工具并不困难。

public final class MdbRetryCountAdvice 
                               implements MethodInterceptor {
  private Log log = LogFactory.getLog(getClass());
  private int retryCount;

  public void setMaxRetryCount(int retryCount) {
    this.retryCount = retryCount;
  }
  
  public Object invoke(MethodInvocation methodInvocation) 
                                               throws Throwable {
    Object o = methodInvocation.getArguments()[0];

    try {
      return methodInvocation.proceed();
    } catch (Throwable e) {
      MessageData data = (MessageData) o;
      if (data.getDeliveryCount() >= this.retryCount) {
        this.log.fatal("Retry count exceeded. Message discarded."
                                                      + data, e);
        return null;
      }
      // assume business logic logged exception already 
      // so no stack trace here.
      throw e;
    }
  }
}

  此外,因为有处理消息重复的机制,您可能会考虑将MDB以DUPS_OK_ACKNOWLEDGE模式部署,而不是使用默认的AUTO_ACKNOWLEDGE模式。从理论上讲,这样可以提升性能,因为MOM可以花费较少的资源来保证在发送过程中没有重复。而实际的性能影响会依不同的MOM提供商而不同,所以我们建议运行一些负载测试来看一下实际的数字。对于IBM WebSphere MQ,我们没有发现两种模式之间的任何性能差别,但是您的情况可能不一样。

  业务对象池化

  因为在处理消息的过程中,如果有错误发生,我们的模式会抛出一个RuntimeException 异常,所以需要理解的一点是,根据EJB规范,在这种情况下,J2EE服务器应该丢弃这个MDB实例。在大多数情况下这都不成问题,因为在我们的实现中具体的MDB是相当轻量级的——几乎没有什么代码。但是,对MDB正在使用的实际业务对象必须多加小心。有些时候,出于各种原因不能使用业务逻辑的单元素实现(一种最常见的情况是,在业务逻辑处理器中使用非线程安全的XML解析器)。在这种情况下,应该在Spring的上下文中将业务对象定义成原型而非单元素,因为这样定义的对象能被多个MDB实例同时调用。

  我们的MessageDataDrivenBean 类设计已经为处理非单元素bean而进行了优化;您可以看到,在每个MDB示例中只执行了一次bean查找,即在onEjbCreate()方法中。因为EJB规范保证了MDB的单线程执行,所以可以放心地使用非单元素查找,并在每次调用时降低bean的实例化开销。

<!-- Note that this defines a prototype -->  
<bean id="simpleProcessorImpl" singleton="false"
      class="org.javatx.mdb.SimpleMessageProcessor"/>

<bean id="simpleProcessor" 
      class="org.springframework.aop.framework.ProxyFactoryBean">
  <property name="targetSource" ref="simpleProcessorImpl"/>
  <property name="proxyInterfaces" 
            value="org.javatx.mdb.MessageProcessor"/>
  <property name="interceptorNames">
    <list>
      <idref local="mdbRetryCountAdvisor"/>
      <idref local="mdbTransactionAdvisor"/>
      <idref local="mdbDuplicateHandlingAdvisor"/>
      <idref local="messageProcessorPerformanceMonitorAdvisor"/>
      <idref local="messageProcessorTraceAdvisor"/>
    </list>
  </property>
</bean>

  如果想要进一步优化以避免创建过多的业务逻辑bean实例(例如,XML解析器以及其他创建开销大的对象),可以利用Spring对目标源池化的支持。Spring可以维护一个同一实例池,每个新方法调用都从池中取出空闲对象。这也不需要对代码做任何改动。只需在应用程序上下文中将SimpleProcessor定义为非单元素,定义一个池化的目标源(可以指定池的大小和其他参数),然后修改simpleProcessor代理定义以使用池化的目标源:

<!-- Note that this defines a prototype -->  
<bean id="simpleProcessorProtImpl" singleton="false"
      class="org.javatx.mdb.SimpleMessageProcessor"/>

<bean id="simpleProcessorPooled" 
   class="org.springframework.aop.target.CommonsPoolTargetSource">
 <property name="targetBeanName" value="simpleProcessorProtImpl"/>
 <property name="maxSize" value="5"/>
</bean>>  

<bean id="simpleProcessor" 
      class="org.springframework.aop.framework.ProxyFactoryBean">
  <property name="targetSource" ref="simpleProcessorImpl"/>
  ...
</bean>

  这一简单的改动可以帮助您获得更好的应用程序性能,尤其是如果预料到消息处理过程会经常失败的话,因为当从onMessage()方法及其依赖关系中抛出运行时异常时,MDB实例就会被丢弃。注意,Spring使用了Jakarta Commons Pool作为其池化实现,所以您的应用程序必须包含commons-pool jar。

  下载

  您可以下载与本文相关的源代码: custom-mdb-processing-source.zip

  结束语

  正如您所看到的,我们成功实现了以非事务型方式处理消息消费并保持once-and-only-once服务质量的目标。我们相信在转向BMT以后,应用程序开发人员将获得更多对应用程序行为和事务处理的控制权,并同时享受明显的性能提升。这一转换的另一个作用是,如果只打算消费消息,那么您不必关心所使用的MOM是否具有对XA事务的健壮支持。

  虽然乍一看好像代码变得更多更复杂了(如文中所示),但是该解决方案可以使用Spring的AOP框架以一种非常结构化的方法实现,这使您可以利用一些相互独立的通知将一些非功能需求的单个方面组合起来。每个通知负责处理功能的一个特定方面。但是,将其组装在一个Spring应用程序上下文中,就可以管理消息处理的所有非功能性需求,包括事物处理、重复处理、“被污染”消息的处理,以及常见的跟踪、日志记录和性能监控。

------分隔线----------------------------