例子:消息驱动bank
为了阐述我们的观点,我们将开发和安装一个完整的样板应用程序:一个消息驱动的银行系统. 通过(幸亏有spring)改进的基于pojos的编程模型和保留相同的事务,我们可以不需要ejb或者一个应用服务器来实现这个系统。在下一个部分,我们将从消息驱动架构产生到另一个架构.就像基于web的架构一样.图1展示我们的样本应用程序的架构.

figure 1. architecture of the message-driven bank
在我们的例子中,我们将处理来自java消息服务队列的银行定单.一张定单的处理包括通过jdbc来更新当前帐户的数据库.为了避免信息的丢失和重复,我们将使用jta和jta/xa事务来配合更新:处理信息和更新数据库将发生在一个原子事务里.资源部分可得到jta/xa的更多信息.
编写应用程序代码
该应用程序将由两个java类组成: bank(一个dao)和messagedrivenbank.如图2.

figure 2. classes for the message-driven bank
bank是一个数据访问对象,这个对象封装数据库访问。messagedrivenbank是一个消息驱动façade并且是dao的委托.与典型的j2ee方法不同,这个应用程序不包括ejb类.
第一步:编写bank dao
如下, bank源代码是很直接和简单的jdbc操作.
package jdbc;
import javax.sql.*;
import java.sql.*;
public class bank
{
private datasource datasource;
public bank() {}
public void setdatasource ( datasource datasource )
{
this.datasource = datasource;
}
private datasource getdatasource()
{
return this.datasource;
}
private connection getconnection()
throws sqlexception
{
connection ret = null;
if ( getdatasource() != null ) {
ret = getdatasource().
getconnection();
}
return ret;
}
private void closeconnection ( connection c )
throws sqlexception
{
if ( c != null ) c.close();
}
public void checktables()
throws sqlexception
{
connection conn = null;
try {
conn = getconnection();
statement s = conn.createstatement();
try {
s.executequery (
"select * from accounts" );
}
catch ( sqlexception ex ) {
//table not there => create it
s.executeupdate (
"create table accounts ( " +
"account varchar ( 20 ), " +
"owner varchar(300), " +
"balance decimal (19,0) )" );
for ( int i = 0; i < 100 ; i++ ){
s.executeupdate (
"insert into accounts values ( " +
"'account"+i +"' , 'owner"+i +"', 10000 )"
);
}
}
s.close();
}
finally {
closeconnection ( conn );
}
//that concludes setup
}
//
//business methods are below
//
public long getbalance ( int account )
throws sqlexception
{
long res = -1;
connection conn = null;
try {
conn = getconnection();
statement s = conn.createstatement();
string query =
"select balance from accounts where account='"+
"account" + account +"'";
resultset rs = s.executequery ( query );
if ( rs == null || !rs.next() )
throw new sqlexception (
"account not found: " + account );
res = rs.getlong ( 1 );
s.close();
}
finally {
closeconnection ( conn );
}
return res;
}
public void withdraw ( int account , int amount )
throws exception
{
connection conn = null;
try {
conn = getconnection();
statement s = conn.createstatement();
string sql =
"update accounts set balance = balance - "+
amount + " where account ='account"+
account+"'";
s.executeupdate ( sql );
s.close();
}
finally {
closeconnection ( conn );
}
}
}
注意:代码并没有依赖ejb或任何专门的应用程序服务器.实际上,这是一个纯java代码,这个java代码是能在任何j2se环境下运行的.
你同时应注意:我们使用了来自jdbc的datasource接口.这意味着我们的类是独立于目前jdbc供应商提供的类. 你可能会疑惑,这怎么能与特定的数据管理系统(dbms)提供商的jdbc实现紧密结合呢? 这里就是spring框架帮你实现的. 这个技术被称为依赖注入:在我们的应用程序的启动期间,通过调用setdatasource方法,spring为我们提供了相应的datasource对象.在后面几部分我们会更多地提到spring.如果我们在以前使用应用程序服务器,我们将不得不借助于java命名绑定接口(jndi)查询.
除了直接使用jdbc,我们也可以使用hibernate或者一个jdo工具来实现我们的持久层.这同样不需要任何的ejb代码.
第二步:配置bankdao
我们会将便用spring框架来配置我们的应用程序.spring不是必需的,但是使用spring的好处是我们将可以简单的添加服务,如:我们java对象的事务和安全.这类似于应用服务器为ejb提供的东西,只是在我们的例子中spring将变得更容易.
spring也允许我们把我们的类从目前的jdbc驱动实现中分离出来:spring能够配置driver(基于我们的xml配置数据)并把它提供给bankdao对象(依赖注入原理).这样可以保持我们的java代码的清淅和集中.这步的spring配置文件如下:
<?xml version="1.0" encoding="utf-8"?>
<beans>
<bean id="datasource"
class="com.atomikos.jdbc.nonxa.nonxadatasourcebean">
<property name="user">
<value>sa</value>
</property>
<property name="url">
<value>jdbc:hsqldb:springnonxadb
</value>
</property>
<property name="driverclassname">
<value>org.hsqldb.jdbcdriver</value>
</property>
<property name="poolsize">
<value>1</value>
</property>
<property name="connectiontimeout">
<value>60</value>
</property>
</bean>
<bean id="bank" class="jdbc.bank">
<property name="datasource">
<ref bean="datasource"/>
</property>
</bean>
</beans>
这个xml文件包括两个对象的配置:访问数据库的datasource和使用这个datasource的bank对象.下面是由spring维护的一些基本任务.
? 创建应用程序(例: bank和datasource)需要的对象(“beans”).在xml文件中给出了这些对象的类名,并且在我们的例子中,这些对象需要有一个公共的无参数constructor (spring也允许参数,但是配置语法上有所不同).这些对象都被命名(xml中的id属性),所以我们后面能够引用这些对象. id也允许我们的应用程序找回它需要的已配置对象.
? 这些对象的初始化是通过在xml文件中的properties的值实现. 在xml文件中这些properties名 应与对应的类中的setxxx方法相对应.
? 将对象连接在一起 :一个property可能是另一个对象(例如:在我们例子中的数据源)的引用,引用可以通过id创建.
注意:在我们下一步中, 我们将选择配置一个jta-enabled的数据源(由atomikos transactions提供,可用于企业和j2se的jta产品,我们将应用于我们的应用程序). 简单起见,我们将使用hypersonicsqldb,这个dbms不需要专门的安装步骤―它是在.jar文件里,就像jta和spring.
但是,考虑到渐增的可靠性需求,强列推荐你使用xa-capable的dbms和jdbc驱动.没有xa的支持, 在crash或重启之后你的应用程序将不能恢复原有数据. 资源部分有链接到关于事务和xa的信息和一些例子.
作为一个练习,你可以试试从hypersonicsqldb转换到firstsql,一个易安装xa-compliant的dbms.换句话说,任何其他为企业准备的和xa-capable的dbms也会做得很好.
第三步:测试bankdao
让我们来测试我们的代码,(使用极限编程的程序员会首先写测试,但因开始不是很清淅,所以我们直到现在才开始写测试.)下面是一个简单的单元测试.这个测试可在你的的应用程序里运行:它通过spring获得一个bank对象来进行测试(这在setup方法中实现).注意:这个测试使用清楚的事务划分:每一个测试开始之前开始一个事务,每个测试结束时强制进行事务回滚.这是通过手工的方式来减少测试对数据库数据的影响.
package jdbc;
import com.atomikos.icatch.jta.usertransactionimp;
import junit.framework.testcase;
import java.io.fileinputstream;
import java.io.inputstream;
import org.springframework.beans.factory.xml.xmlbeanfactory;
public class banktest extends testcase
{
private usertransactionimp utx;
private bank bank;
public banktest ( string name )
{
super ( name );
utx = new usertransactionimp();
}
protected void setup()
throws exception
{
//start a new transaction
//so we can rollback the
//effects of each test
//in teardown!
utx.begin();
//open bean xml file
inputstream is =
new fileinputstream("config.xml");
//the factory is spring's entry point
//for retrieving the configured
//objects from the xml file
xmlbeanfactory factory =
new xmlbeanfactory(is);
bank = ( bank ) factory.getbean ( "bank" );
bank.checktables();
}
protected void teardown()
throws exception
{
//rollback all dbms effects
//of testing
utx.rollback();
}
public void testbank()
throws exception
{
int accno = 10;
long initialbalance = bank.getbalance ( accno );
bank.withdraw ( accno , 100 );
long newbalance = bank.getbalance ( accno );
if ( ( initialbalance - newbalance ) != 100 )
fail ( "wrong balance after withdraw: " +
newbalance );
}
}
我们将需要jta事务来确保jms和jdbc都是原子操作.一般来说,当经常都是两个或多个连接的时候,你应考虑一下jta/xa。例如,在我们例子中的jms和jdbc. spring本身不提供jta事务;它需要一个jta实现或者委派一个应用服务器来处理这个事务.在这里,我们使用了一个jta实现,这个实现可以在任何j2se平台上工作.
最终架构如下面图3.白色方框代表我们的应用程序代码.

figure 3. architecture for the test
如你所看到的,当我们执行我们的测试,将会发生下面的情况:
1. banktest开始一个新事务.
2. 然后,这个test在spring运行期间获得一个bank对象.这步触发sping的创建和初始化过程.
3. 这个test调用bank的方法.
4. bank调用datasource对象,通过它自己的setdatasource 方法从spring 获取这个对像.
5. 这个数据源是jta-enabled,并且与jta实现交互来注册当前事务.
6. jdbc statements和帐户数据库交互.
7. 当方法返回时, test调用事务回滚.
8. jta记得住datasource对象,会命令它进行回滚.
第四步:添加声明式事务管理
spring允许添加声明式事务管理来管理java对象.假设我们想确认bank总是和一个有效的事务上下文一起被调用.我们通过在实际对象的上部配置一个proxy对象. proxy和实际对象有相同接口,所以客户通过完全相同的方式使用它. 配置proxy wrap每个bankdao方法到事务中.结果配置文件如下. 不要被xml的庞大吓倒―大多数内容能通过复制和粘贴到你自己的工程中再使用.
<?xml version="1.0" encoding="utf-8"?>
<beans>
<!--
use a jta-aware datasource
to access the db transactionally
-->
<bean id="datasource"
class="com.atomikos.jdbc.nonxa.nonxadatasourcebean">
<property name="user">
<value>sa</value>
</property>
<property name="url">
<value>jdbc:hsqldb:springnonxadb</value>
</property>
<property name="driverclassname">
<value>org.hsqldb.jdbcdriver</value>
</property>
<property name="poolsize">
<value>1</value>
</property>
<property name="connectiontimeout">
<value>60</value>
</property>
</bean>
<!--
construct a transactionmanager,
needed to configure spring
-->
<bean id="jtatransactionmanager"
class="com.atomikos.icatch.jta.usertransactionmanager"/>
<!--
also configure a usertransaction,
needed to configure spring
-->
<bean id="jtausertransaction"
class="com.atomikos.icatch.jta.usertransactionimp"/>
<!--
configure the spring framework to use
jta transactions from the jta provider
-->
<bean id="springtransactionmanager"
class="org.springframework.transaction.jta.jtatransactionmanager">
<property name="transactionmanager">
<ref bean="jtatransactionmanager"/>
</property>
<property name="usertransaction">
<ref bean="jtausertransaction"/>
</property>
</bean>
<!-- configure the bank to use our datasource -->
<bean id="banktarget" class="jdbc.bank">
<property name="datasource">
<ref bean="datasource"/>
</property>
</bean>
<!--
configure spring to insert
jta transaction logic for all methods
-->
<bean id="bank"
class="org.springframework.transaction.interceptor.transactionproxyfactorybean">
<property name="transactionmanager">
<ref bean="springtransactionmanager"/>
</property>
<property name="target">
<ref bean="banktarget"/>
</property>
<property name="transactionattributes">
<props>
<prop key="*">
propagation_required, -exception
</prop>
</props>
</property>
</bean>
</beans>
这个xml文件告诉spring去配置下面的对象:
1. 需要通过jdbc连接的datasource.
2. 添加jtatransactionmanager和jtausertransaction用于为jta事务的spring配置作准备.
3. springtransactionmanager用于告诉spring需要使用jta.
4. bankdao被重命名为banktarget (因如下解释的原因).
5. bank对象被添加用于包装事务和banktarget的所有方法.我们通过配置bank对象来使用
springtransactionmanager,这意味着所有事务将是jta事务. 每个事务都被设置为propagation_required,这将在任何异常下出现强制回滚.
对这些对象包含的内容,你都可以很容易的复制和粘贴jtatransactionmanager, jtausertransaction, springtransactionmanager到其他工程.其他的是应用程序相关的对象:datasource, banktarget, bank. bank对象很有趣:事实上对于banktarget它是一个proxy;他们拥有相同的接口. trick如下:当我们的应用程序请求spring去配置和返回bank对象,spring实际上将返回proxy(看起来和我们的应用程序完全相同),随后这个proxy将为我们开始/结束事务.这样,应用程序和bank类本身都不需要知道jta!图4阐述了在这步我们所得到的.

figure 4. architecture with declarative jta transactions in spring
现在的工作如下:
1. 应用程序调用bank对象.这将触发spring的初始化处理和返回proxy对象. 对应用程序而言,这个proxy行为和我们的bank是一样的.
2. 当bank的一个方法被调用, 这个调用将会通过proxy进行.
3. proxy使用springtransactionmanager创建一个新事务.
4. springtransactionmanager被配置为使用jta,因些它委派到jta.
5. 调用被forward到bank的实际对象,banktarget.
6. banktarget使用从spring中得到的datasource.
7. datasource对事务进行注册.
8. 通过规则的jdbc访问数据库.
9. 在返回时, proxy终止事务:如果在先前的序列中没有发生异常,那么将会提交终止指令.否则,它将会被回滚.
10. transaction 管理器与数据库配合进行提交和回滚.
在这步中的测试怎样进行?我们可重用banktest 和它清晰的事务划分:因为propagation_required, proxy将和在banktest中创建的事务上下文一起执行.
第五步:编写propagation_required
在这步,我们将添加jms处理逻辑.为了做到这样,我们主要需要实现jms messagelistener接口.我们也会添加公共的setbank方法使spring的依赖注入起作用.源代码如下:
package jms;
import jdbc.bank;
import javax.jms.message;
import javax.jms.mapmessage;
import javax.jms.messagelistener;
public class messagedrivenbank
implements messagelistener
{
private bank bank;
public void setbank ( bank bank )
{
this.bank = bank;
}
//this method can be private
//since it is only needed within
//this class
private bank getbank()
{
return this.bank;
}
public void onmessage ( message msg )
{
try {
mapmessage m = ( mapmessage ) msg;
int account = m.getintproperty ( "account" );
int amount = m.getintproperty ( "amount" );
bank.withdraw ( account , amount );
system.out.println ( "withdraw of " +
amount + " from account " + account );
}
catch ( exception e ) {
e.printstacktrace();
//force rollback
throw new runtimeexception (
e.getmessage() );
}
}
}
第六步:配置messagedrivenbank
这里我们配置messagedrivenbank去监听事务的queuereceiversessionpool.这样给我们可以实现和ejb(没有丢失信息和冗余信息)类似的消息机制,但在这里我们是用简单的pojo对象实现.当向pool中插入一个messagelistener,这个会话池将确保用jta/xa事务接收到消息.结合jta/xa-capable 的jdbc数据源,我们可以实现可靠的消息机制. spring的配置如下:
<?xml version="1.0" encoding="utf-8"?>
<!--
note: no explicit transaction manager bean
is necessary
because the queuereceiversessionpool will
start transactions by itself.
-->
<beans>
<bean id="datasource"
class="com.atomikos.jdbc.nonxa.nonxadatasourcebean">
<property name="user">
<value>sa</value>
</property>
<property name="url">
<value>jdbc:hsqldb:springnonxadb</value>
</property>
<property name="driverclassname">
<value>org.hsqldb.jdbcdriver</value>
</property>
<property name="poolsize">
<value>1</value>
</property>
<property name="connectiontimeout">
<value>60</value>
</property>
</bean>
<bean id="xafactory"
class="org.activemq.activemqxaconnectionfactory">
<property name="brokerurl">
<value>tcp://localhost:61616</value>
</property>
</bean>
<bean id="queue"
class="org.activemq.message.activemqqueue">
<property name="physicalname">
<value>bank_queue</value>
</property>
</bean>
<bean id="bank" class="jdbc.bank">
<property name="datasource">
<ref bean="datasource"/>
</property>
</bean>
<bean id="messagedrivenbank"
class="jms.messagedrivenbank">
<property name="bank">
<ref bean="bank"/>
</property>
</bean>
<bean id="queueconnectionfactorybean"
class="com.atomikos.jms.queueconnectionfactorybean">
<property name="resourcename">
<value>queue_broker</value>
</property>
<property name="xaqueueconnectionfactory">
<ref bean="xafactory"/>
</property>
</bean>
<bean id="queuereceiversessionpool"
class="com.atomikos.jms.queuereceiversessionpool"
init-method="start">
<property name="queueconnectionfactorybean">
<ref bean="queueconnectionfactorybean"/>
</property>
<property name="transactiontimeout">
<value>120</value>
</property>
<!--
default license allows only limited
concurrency so keep pool small
-->
<property name="poolsize">
<value>1</value>
</property>
<property name="queue">
<ref bean="queue"/>
</property>
<property name="messagelistener">
<ref bean="messagedrivenbank"/>
</property>
</bean>
</beans>
因为这篇文章需要一个便于安装的jms服务,所以这里我们使用activemq.如果你正在使用另一个jms实现,那么你将仍然能使用这部分提出的技术.接下来除了datasource和bank对象,我们将增加下面的对象定义:
? xafactory: 为建立jms连接的connection工厂.
? queue: queue代表我们将使用的jms队列, 这个队列被配置成activemq要求的形式.
? queueconnectionfactorybean:一个jta-aware的jms连接器.
? a queuereceiversessionpool for jta-enabled message consumption:注意:我们同时指定了用来调用的初始化方法(例:start);这是spring的另一个特性. start方法在session pool类里定义,它是在spring配置文件中进行配置的.
? messagedrivenbank:负责处理消息.
你可以问问自己事务管理是在哪里进行的.事实上, 在先前部分被添加的对象已消失.为什么呢?因为我们现在使用queuereceiversessionpool来接收来自jms的消息,并且这个类也为每次接收启动一个jta事务.我们也可以保留jta配置,另外添加jms配置, 但是这样可能会使xml文件更长. 现在session pool类将担当事务管理角色.它和proxy方法的工作相似; 只是这个类需要jms messagelistener 为之添加事务. 通过这样配置,在每个消息收接之前程序将启动一个新事务 无论何时, 当我们的消息实例正常返回时, 这个事务将提交. 如果出现runtimeexception, 那么这个事务将回滚. 结构如下面图5(可以清淅地看到一些jms对象).

figure 5. architecture for message-driven applications in spring
现在该架构工作如下:
1. 应用程序调用bank对象和初始化数据库表.
2. 应用程序queuereceiversessionpool, 因此触发一个start方法的调用去监听到达的消息.
3. queuereceiversessionpool在队列中侦察一个新消息.
4. queuereceiversessionpool开始一个新事务,并且注册这个事务.
5. queuereceiversessionpool调用已注册的messagelistener (messagedrivenbank).
6. 这将触发对bank 对象的调用.
7. bank 对象通过datasource访问数据库.
8. datasource注册事务.
9. 通过jdbc访问数据库.
10. 当处理完成时, queuereceiversessionpool会终止这个事务。然后进行commint(除非发生runtimeexception).
11. transaction manager开始消息队列的两阶段提交.
12. transaction manager开始数据库的两阶段提交.
第七步:编写应用程序
因为我们没有使用容器,我们仅仅提供一个java应用程序就可以启动整个银行系统.我们的java应用程序是非常简单: 它有能力找回配置的对象(spring通过xml文件将他们放到一起). 这个应用程序能在任何兼容的jdk(java development kit)上运行,并且不需要应用服务器.
package jms;
import java.io.fileinputstream;
import java.io.inputstream;
import org.springframework.beans.factory.xml.xmlbeanfactory;
import com.atomikos.jms.queuereceiversessionpool;
import jdbc.bank;
public class startbank
{
public static void main ( string[] args )
throws exception
{
//open bean xml file
inputstream is =
new fileinputstream(args[0]);
//the factory is spring's entry point
//for retrieving the configured
//objects from the xml file
xmlbeanfactory factory =
new xmlbeanfactory(is);
//retrieve the bank to initialize
//alternatively, this could be done
//in the xml configuration too
bank bank =
( bank ) factory.getbean ( "bank" );
//initialize the bank if needed
bank.checktables();
//retrieve the pool;
//this will also start the pool
//as specified in the beans xml file
//by the init-method attribute!
queuereceiversessionpool pool =
( queuereceiversessionpool )
factory.getbean (
"queuereceiversessionpool" );
//alternatively, start pool here
//(if not done in xml)
//pool.start();
system.out.println (
"bank is listening for messages..." );
}
}
这就是j2ee!是不是认为j2ee也很容易呢?
对通用性的考虑
这部分里我们看看更多的概念,这些概念在许多j2ee应用程序中是很重要的.我们同样将看到对这些概念来说,一个应用服务器并不是必须的.
集群和可扩展性
健壮的企业应用程序需要集群来分流负担. 在消息驱动应用程序的例子中,这很容易:我们自动地从jms应用程序继承得来处理能力.如果我们需要更强大的处理能力,那么我们只需增加更多连接相同jms服务器的进程.一个对服务性能有效的衡量标准是在队列中停留的消息的数量. 在其他情况下,如基于web的 架构(如下)我们能很容易地使用web环境下的集群能力.
方法级别的安全
一个典型的观点是认为ejb能增加方法级的安全性.虽然并没有在这篇文章中提到,但是在sping中配置方法级别的安全是可能的.这种配置类似于我们增加方法级的事务划分的方式.
对非消息驱动的应用程序的通用性
在不改变源代码的情况下(除了主应用程序类),我们使用的平台能很容易地被整合到任何j2ee web 应用服务器. 换句话说 , 通过jms进行后台处理;这使得web服务器在面对后台处理的延迟问题上更可靠和更独立.在任何情况下, 为了实现容器管理的事务或容器管理的安全性,我们都不再需要依靠ejb容器来实现.
关于容器管理持久化?
存在并被很多开发者检验过的技术例如:jdo或者hibernate 都不一定需要一个应用服务器. 另外,这些工具已经占据了持久化市场.
结论
今天,不需要应用服务器的j2ee已经成为可能,也很容易. 有人或许会说,没有应用程序服务器,有一些应用程序仍不能实现:例如,如果你需要一般的jca(java connectivity api) 功能性, 那么我们上面提供的平台是不够的. 但是,这可能会发生改变,因为不用使用一个应用程序服务器进行开发,测式和部署的好处实在是太大. 人们越来越相信: 将来得j2ee是一个模块化的”选择你所需要”架构. 这与我们之前的完全基于应用服务器的方法相反. 在这样的情况下,j2ee开发者将从应用服务器和ejb中解放出来.
闽公网安备 35060202000074号