本文共 13744 字,大约阅读时间需要 45 分钟。
本系列的向您介绍了整个系列中使用的交易系统场景。在中,您构建并测试了该交易系统应用程序。在第 3 部分中,您将了解如何使用该交易系统场景作为示例,将 WebSphere Message Broker(以下称为 Message Broker)集成到业务事件处理解决方案中。
将 Message Broker 与 WebSphere Business Events(以下称为 Business Events)集成的优点是什么呢?Message Broker 提供了用于业务应用程序集成和消息转换的智能 SOA 方法。但是,业务经常还需要业务事件处理。例如,尽管集成的应用程序可以通过 Message Broker 自由地彼此发送消息,但是您的业务还需要检测复杂的消息模式(例如,您的其中一个应用程序在一段时间内似乎不是活动的)。WebSphere Business Events(以下称为 Business Events)可以检测这样的模式,自动生成消息,并将消息发送到 Message Broker 以便能够纠正该情况。
|
本文中使用的场景基于一个交易系统,其中有一个 Business Events 应用程序处理买入和卖出交易事件。如果同一个客户在一小时内买入并卖出同一股票,则 Business Events 系统将生成一个 SellAfterBuy 操作。对于特定客户在一天内执行的每三个 SellAfterBuy 操作,无论是对哪些股票执行的,都会生成一个 SpeculativeCustomer 操作。有关该场景的更多信息,请参阅。
在本文中,我们将使用您在中创建的交易系统应用程序,对其进行增强以包括与 Message Broker 的集成,并使用 Message Broker 接触点发送事件和使用操作。
图 1 显示了本文中描述的 Message Broker 和 Business Events 集成场景的大致体系结构:
在图 1 中,该集成体系结构具有五个主要组件,其中三个组件包含在 WebSphere Message Broker“超级组件”中:
我们的集成场景遵循以下原则:
这些原则确保 Message Broker 超级组件和客户端除了需要知道如何使用标准接口进行通信以外,均不需要有关 Business Event 组件的任何特定知识,以便对 Business Event 组件实现的任何可能的改动不会影响 WebSphere Message Broker 超级组件。
您可以使用本文描述的过程从头构建您自己的项目。该解决方案的 Message Broker 项目交换文件和 Business Events 项目文件包括在部分。您可以将这些文件分别导入 Message Broker Toolkit 和 Business Events。但是,您仍然需要完成 部分中描述的手动配置。
|
我们将使用 WebSphere MQ 作为 Message Broker 与 Business Events 之间的消息以及 Message Broker 与交易系统客户端之间的消息的 JMS 提供者。在下面的部分中,您将配置 WebSphere MQ JMS 并创建必需的队列管理器和队列。
JMS 队列用于在 Message Broker 与 Business Events 之间传递消息。要配置 WebSphere MQ JMS,请完成以下操作步骤:
INITIAL_CONTEXT_FACTORY=com.sun.jndi.fscontext.RefFSContextFactorPROVIDER_URL=file:/C:/JNDI-Directory. |
# Define a QueueConnectionFactory# Only parameters being overridden from their default values are specified.# This sets up a MQ client binding.DEF QCF(qcf1) +TRANSPORT(CLIENT) +QMANAGER(WBRK61_DEFAULT_QUEUE_MANAGER) +HOSTNAME(127.0.0.1)+PORT(2414)#DEF Q(wbe_input_queue) +QUEUE(WBE_INPUT_QUEUE) +QMANAGER(WBRK61_DEFAULT_QUEUE_MANAGER)#DEF Q(wbe_output_queue) +QUEUE(WBE_OUTPUT_QUEUE) +QMANAGER(WBRK61_DEFAULT_QUEUE_MANAGER)#DEF Q(wbe_backout_queue) +QUEUE(WBE_BACKOUT_QUEUE) +QMANAGER(WBRK61_DEFAULT_QUEUE_MANAGER)END |
此文件声明该队列连接工厂名称为 qcf1,并指定了三个 JMS 队列:wbe_input_queue、wbe_output_queue 和 wbe_backout_queue。对应的 WebSphere MQ 队列分别为 WBE_INPUT_QUEUE、WBE_OUTPUT_QUEUE 和 WBE_BACKOUT_QUEUE。Message Broker 输出流(将在稍后描述)中的 JMSInput 节点需要 wbe_backout_queue,用于该流在无法成功处理消息时所撤销的消息。
jmsadmin < your_definition_file |
使用 WebSphere MQ Explorer 创建一个 WebSphere MQ 队列管理器。如果您使用任何现有的 MQ 队列管理器,例如可能在安装 Message Broker 时已经为您创建的 WBRK61_DEFAULT_QUEUE_MANAGER,则可以跳过此步骤。否则,请启动 WebSphere MQ Explorer 并右键单击 Queue Managers,然后单击 New => Queue Manager 创建新的队列管理器。
如果您使用 WBRK61_DEFAULT_QUEUE_MANAGER,则应该看到如图 2 所示的队列。
如果您是创建新的队列管理器,则需要自己创建以下队列:
例如,要创建 WBE_INPUT_QUEUE,请右键单击 WBRK61_DEFAULT_QUEUE_MANAGER 下面的 Queues,并选择 New => Local Queue。
|
Business Events 要求输入事件和出站操作对象符合一般 XML 格式,如下所述:
您开发的 Message Broker 流需要使用如下事件消息示例所示的消息格式与 Business Events 交互:
String_CustomerID StockA 2008-04-10T14:01:55Z 9.9 9.9 |
图 3 演示了该输入流,其中使用了 MQInput 节点来从 MQ 队列读取 MQ 消息。输入消息被传递到 TradeFilter 节点以检查输入消息是否属于某些类型,这些类型指示是否应该将输入消息作为事件传递给 Business Events。如果是,则将消息传递给 CreateEvent 节点,后者将消息转换为 Business Events 能够接受的格式。然后 MQJMSTransform. 将转换后的消息从 MQ 消息转换为可由 Business Events 读取的 JMS 消息。然后 JMSOutput 节点将 JMS 消息(现在是 Business Events 事件)发送到 Business Events。为简单起见,无关的消息将发送到 UnknownMessageTypeNode 节点,后者将消息存储在某个 MQ 队列中。
完成以下步骤创建如图 3 所示的消息流:
输入消息的正文预期应该具有根元素 ,此根元素包含五个子元素:、、、 和 。下面是这样的输入消息的示例:
BuyString_CustomerIDStockA9.99.9 |
元素的有效值为 Buy 和 Sell。我们将在稍后的阶段中创建 Buy 和 Sell 消息。
CREATE FUNCTION Main() RETURNS BOOLEANBEGIN DECLARE myref REFERENCE TO Body.Trade.Type; RETURN (myref='Buy') OR (myref='Sell');END; |
CREATE FUNCTION Main() RETURNS BOOLEANBEGIN CALL CopyMessageHeaders(); -- create SET OutputRoot.XML.connector.(XML.Attribute)name = 'Trade System'; SET OutputRoot.XML.connector.(XML.Attribute)version = '2.2'; -- create SET OutputRoot.XML.connector."connector-bundle".(XML.Attribute)name = CAST (InputRoot.XML.Trade.Type AS CHARACTER); SET OutputRoot.XML.connector."connector-bundle".(XML.Attribute)type = 'event'; -- create SET OutputRoot.XML.connector."connector-bundle"."connector-object".(XML.Attribute)name = 'TradeObject'; -- create "CustomerID" field SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[1] = InputRoot.XML.Trade.CustomerID; SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[1].(XML.Attribute)name = 'CustomerID'; SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[1].(XML.Attribute)type = 'String'; -- create "StockID" field SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[2] = InputRoot.XML.Trade.StockID; SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[2].(XML.Attribute)name = 'StockID'; SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[2].(XML.Attribute)type = 'String'; -- create "Date" field SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[3] = CAST (CURRENT_DATE AS CHARACTER FORMAT 'IU'); SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[3].(XML.Attribute)name = 'Date'; SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[3].(XML.Attribute)type = 'DateTime'; -- create "Quantity" field SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[4] = InputRoot.XML.Trade.Quantity; SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[4].(XML.Attribute)name = 'Quantity'; SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[4].(XML.Attribute)type = 'Real'; -- create "Price" field SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[5] = InputRoot.XML.Trade.Price; SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[5].(XML.Attribute)name = 'Price'; SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[5].(XML.Attribute)type = 'Real'; RETURN TRUE;END; |
图 5 显示了 Message Broker 输出流。JMSInput 节点读取 Business Events 生成的操作。JMSMQTransform. 节点将消息转换为 MQ 消息。然后 RouteReply 节点将转换后的消息路由到其中一个适当的 MQOutput 节点。
完成以下步骤创建如图 5 所示的输出消息流:
尽管我们是在使用中创建的 Business Events 应用程序,但是由于使用 Business Events 消息队列连接连接器来连接到 Message Broker,您需要修改该应用程序的事件和操作的连接器参数。为此,请完成以下操作步骤:
完成以下步骤为 Trade System 接触点中的 Buy、Ack、Sell After Buy 和 Speculative Customer 操作配置连接:
|
现在您需要测试已创建的 Message Broker 流。要测试流,请执行以下操作步骤:
mqsicreatebroker -i -a -q -n |
mqsicreateconfigmgr -i -a -q |
BuyString_CustomerIDStockA9.99.9 |
SellString_CustomerIDStockA9.99.9 |
|
为简单起见,本文描述的解决方案使用 Business Events 内置的消息队列连接连接器来连接到 Message Broker 和 Business Events。这意味着将 Message Broker JMS 消息放在作为 Message Broker 和 Business Events 中介的 JMS 队列中,并从中读取消息。就性能而言,这不是最佳的解决方案,因为中间的消息队列连接连接器引入了额外的消息开销。这是因为事件消息由消息队列连接连接器转发到 Business Events 引擎,而不是直接发送到该引擎。
一种简单的解决方案是配置 Message Broker 输入流中的 JMSOutput 节点将事件直接发送到 Message Broker jms/eventTopic 或 jms/durableEventTopic 主题,并配置 Message Broker 输出流中的 JMSInput 节点直接从 jms/actionTopic 或 jms/durableActionTopic 接收 JMS 消息。但是,这样的配置当前不支持 Business Events ResultEvent,因此将把所有操作发送到 JMSInput 节点。您还可以使用其他连接支持(例如 HTTP/SOAP/File)连接到其他 Business Events 内置连接器。
|
在本文中,您了解了如何快速配置 Message Broker 和 Business Events 以实现互操作性。我们使用了示例 Broker 消息来描述如何为 Business Events 构造事件消息并由 Broker 消息流将其发送到 Business Events,以及如何从 Business Events 接收操作消息并由 Broker 消息流对其进行处理。
来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/14789789/viewspace-610855/,如需转载,请注明出处,否则将追究法律责任。
转载于:http://blog.itpub.net/14789789/viewspace-610855/