本文的目标读者是集成开发人员,其角色涉及使用服务组件体系结构(Service Component Architecture,SCA)集成MQ消息传递引擎。本文将描述如何使用数据处理程序基于MQ标头更改转换逻辑,同时确保处理程序保持协议独立性。涵盖的主题包括:
- 数据处理程序的解释
- 从数据处理程序访问MQ标头
- 基于标头改变数据处理程序的行为
- 确保数据处理程序保持协议独立性
在阅读本文之后,您将能够创作基于MQ标头动态地转换数据的可重用数据处理程序。
数据处理程序
数据处理程序负责将操作的输入和输出类型转换为有线使用的格式,反之亦然。以前,MQ绑定使用了数据绑定执行此功能。数据绑定具有访问有线格式(如MQ消息)的权限,并可以直接对消息进行读取或写入。在JMS绑定上无法使用这种连接到特定绑定类型(例如MQ数据绑定)的每个数据绑定。
数据处理程序提供了使用规范格式转换数据的通用方法。transform方法用于读取数据,并传递以下类型之一来读取输入:
- InputStream-用于读取字节
- Reader-用于读取文本
- Object-用于读取Java对象
transformInto方法用于写入数据,并传入以下类型之一来写入输出:
- OutputStream-用于写入字节
- Writer-用于写入文本
- Object-用于写入Java对象
这样可使消息的有线格式对数据处理程序透明,允许任何受支持的绑定重用它们。
数据处理程序和消息标头
当创作数据绑定时,会将消息标头传入读或写方法,并允许协议标头中的字段确定转换逻辑。不过,对于方法的输入参数不包括协议标头的数据处理程序,我们应如何访问它们?
在WebSphere Enterprise Service Bus (WESB) V6.2中,协议标头存储在名为ContextService的存储库中。ContextService通过使用普通SPI进行访问,并且可以从任何Java代码引用,其中包括POJO、函数选择器或数据处理程序。清单 1 显示了从ContextService获取MQHeader的示例代码。
清单 1. 使用上下文服务访问协议标头
以下是引用片段: HeadersType headers = ContextService.INSTANCE.getHeaders(); MQHeaderType mqHeader = headers.getMQHeader(); |
使用数据处理程序中的MQHeader
现在,我们已了解了如何访问数据处理程序中的MQ Header,让我们基于这些字段之一制定转换决策。MQMD的格式字段通常用于确定消息正文格式。消息正文的格式、编码和编码字符集存储在MQControl结构中,并包含在MQ Header中。清单 2 中所示的代码说明了如何根据MQMD中格式字段的值修改转换逻辑。
清单 2. 基于MQMD格式字段更改行为
以下是引用片段: package com.ibm.custom.datahandlers; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Map; import com.ibm.websphere.bo.BOFactory; import com.ibm.websphere.bo.BOXMLDocument; import com.ibm.websphere.bo.BOXMLSerializer; import com.ibm.websphere.sca.ServiceManager; import com.ibm.websphere.sca.mq.structures.MQControl; import com.ibm.websphere.sibx.smobo.HeadersType; import com.ibm.websphere.sibx.smobo.MQHeaderType; import com.ibm.wsspi.session.ContextService; import commonj.connector.runtime.DataHandler; import commonj.connector.runtime.DataHandlerException; import commonj.sdo.DataObject; @SuppressWarnings(“serial”) public class MQHeaderDataHandler implements DataHandler { Map bindingContext; String delimiter = “%”; /** * Reads a DataObject from an InputStream (Bytes support only) */ public Object transform(Object source, Class target, Object options) throws DataHandlerException { System.out.println(“MyDataHandler: Calling transform(” + source + “,” + target + “,” + options + “)”); DataObject output = null; InputStream is = (InputStream) source; //Handle InputStreams and DataObjects if (source instanceof InputStream && target.getName().equals(“commonj.sdo.DataObject”)) { if ((“MQXML”).equals(getMQMessageFormat())) { //If the format field of the incoming message is XML use BOXMLSerializer ServiceManager serviceMgr = new ServiceManager(); BOXMLSerializer xmlSerializer = (BOXMLSerializer) serviceMgr .locateService (“com/ibm/websphere/bo/BOXMLSerializer”); try { BOXMLDocument xmlDoc = xmlSerializer. readXMLDocumentWithOptions (is , options); output = xmlDoc.getDataObject(); ((InputStream)source).close(); } catch (IOException e) { throw new DataHandlerException (“Exception reading DataObject ” + e.getLocalizedMessage()); } } else { //Read delimited data byte[] ch = new byte[2]; StringBuffer buffer = new StringBuffer(); try { int len = is.read(ch); while (len != -1) { buffer.append(new String(ch), 0, len); len = is.read(ch); } } catch (IOException e) { throw new DataHandlerException(e); } //Split the values using the delimiter String[] values = buffer.toString().split(delimiter); System.out.println(“Found name: “+values[0]+” and id: ” +values[1]); //Build the CustomerType ServiceManager serviceManager = new ServiceManager(); BOFactory bofactory = (BOFactory) serviceManager.locateService (“com/ibm/websphere/bo/BOFactory”); output = bofactory.create(“http:// MQHeaderDataHandlerModule”, “CustomerType”); output.setString(“name”, values[0]); output.setInt(“id”, Integer.parseInt(values[1])); } } else { throw new DataHandlerException(“Source type ” + source.getClass().getName() + ” or Target type ” + target.getName() + ” unsupported by MyDataHandler”); } return output; } /** * Writes the DataObject out to a OutputStream (Bytes support only) */ public void transformInto(Object source, Object target, Object options) throws DataHandlerException { System.out.println(“Calling transformInto(” + source + “,” + target + “,” + options + “)”); if (source instanceof DataObject && target instanceof OutputStream) { OutputStream os = (OutputStream) target; DataObject bo = (DataObject) source; if ((“MQXML”).equals(getMQMessageFormat())) { //If the format field of the outgoing message is XML use BOXMLSerializer ServiceManager serviceMgr = new ServiceManager(); BOXMLSerializer xmlSerializer = (BOXMLSerializer) serviceMgr.locateService(“com/ibm/websphere/ bo/BOXMLSerializer”); try { xmlSerializer.writeDataObject(bo, bo.getType(). getURI(), bo.getType().getName(), os); } catch (IOException e) { throw new DataHandlerException(“Exception writing DataObject ” + e.getLocalizedMessage()); } } else { //Write delimited data String name = bo.getString(“name”); int id = bo.getInt(“id”); String message = name+delimiter+id; try { os.write(message.getBytes()); } catch (IOException e) { throw new DataHandlerException(“Exception writing DataObject ” + e.getLocalizedMessage()); } } } else { throw new DataHandlerException(“Source type ” + source.getClass() .getName() + ” or Target type ” + target.getClass().getName() + ” unsupported by MyDataHandler”); } } /** * Returns the format of the MQHeader or null if there is no MQHeader * @return */ private String getMQMessageFormat() { String format = null; HeadersType headers = ContextService.INSTANCE.getHeaders(); MQHeaderType mqHeader = null; if(headers!= null){ mqHeader = headers.getMQHeader(); if (mqHeader != null) { MQControl mqControl = mqHeader.getControl(); if (mqControl != null) { format = mqControl.getFormat(); if (format != null) { format = format.trim(); } } } } return format; } public void setBindingContext(Map context) { bindingContext = context; } } |
测试数据处理程序
首先,使用MQ绑定创建一个简单的模块。假定已安装WebSphere MQ,并且可以在WebSphere ESB中访问它。
在WID中创建名为MQHeaderDataHandlerModule的新模块
在MQHeaderDataHandlerModule项目中,创建名为CustomerType的新数据类型
图 1. CustomerType Business对象
添加名为name的String字段和名为id的int字段
保存并关闭新数据类型
创建名为CustomerInterface的新接口
添加名为processCustomer的单向操作,它可以接受CustomerType作为输入参数
图 2. Customer接口
将新的导出添加到组装图
将CustomerInterface添加到导出
在导出上配置MQ绑定
设置请求队列管理器名称
设置接收目标
为WebSphere MQ队列管理器设置适当的连接详细信息
要获取缺省的请求数据格式,请单击select
选择Select your custom data format transformation from the workspace,并单击select
选择MQHeaderDataHandler,并单击ok
勾选Add custom class to binding registry,并单击next(此操作可以在现有数据格式列表中显示)
输入名称MQHeaderDataHandler
添加描述
选择它适用的绑定类型,现在勾选MQ和JMS
图 3. 设置数据格式
在导入上,按照上述同一数据格式配置MQ绑定
为确保正确地创建DataObject,我们将使用Java组件将其输出,因此将一个数据对象添加到画布
将Java组件连接到导出和导入,您的模块现在应该与图 4 所示类似
图 4. MQHeaderDataHandlerModule
双击Java组件创建一个实现
将processCustomer的实现替换为清单 3 中的代码
清单 3. 打印数据对象
以下是引用片段: BOFactory factoryService = (BOFactory) new ServiceManager().locateService(“com/ibm/websphere/bo/BOFactory”); BOXMLSerializer xmlSerializerService =(BOXMLSerializer) new ServiceManager().locateService(“com/ibm/websphere/bo/BOXMLSerializer”); try { xmlSerializerService.writeDataObject(input1, input1.getType().getURI(), input1.getType().getName(), System.out); } catch (IOException e) { e.printStackTrace(); } locateService_CustomerInterfacePartner().processCustomer(input1); |
保存Java组件
保存模块
运行模块
为测试模块,我们将使用RFHUtil,它允许我们创建MQ消息,并将其放入导出接收目标。然后,我们可以使用WID中的服务器日志视图查看由Java组件输出的DataObject,并再次使用RFHUtil查看导入输出的消息。
在MQHeaderDataHandlerModule项目中创建一个名为CustomerType.xml的文件
将内容设置为清单 4 中显示的 XML
清单 4. CustomerType XML数据
以下是引用片段: <?xml version=”1.0″ encoding=”UTF-8″?> <p:CustomerType xsi:schemaLocation=”http://MQHeaderDataHandlerModule CustomerType.xsd “> <name>Tim</name> <id>114923</id> </p:CustomerType> |
将内容设置为清单 5 中显示的XML
清单 5. CustomerType带分隔符的数据
以下是引用片段: Tim%114923 |
将您的模块部署到服务器
启动RFHUtil,并设置导出使用的队列管理器和队列
使用Read File 读取 CustomerType.txt,您应该能够看到数据选项卡下的消息正文
按WriteQ按钮将消息发送到队列
选中WebSphere Integration Developer中的Server Logs视图,以便输出DataObject
回到RFHUtil中,将队列更改为导入使用的队列,并单击ReadQ
转到Data选项卡,看到的消息正文应该与图5所示类似
图 5. 输出带分隔符的消息
转回RFHUtil中的Main选项卡
使用Read File读取CustomerType.xml,您应该能够看到数据选项卡下的消息正文
转到MQMD选项卡,并将消息格式设置为MQXML
图 6. 设置MQXML消息格式
回到Main选项卡上,按WriteQ发送消息
将队列更改为导入使用的队列,并单击ReadQ
转到Data选项卡,看到的消息正文应该与图 7 所示类似
图 7. 输出XML消息
结束语
祝贺您,现在您已经成功地创建并测试了由MQ标头中的字段控制其行为的独立于协议的数据处理程序。我们可以看到,如果将格式字段设置为MQXML,则可以将消息视为XML。如果没有任何MQ标头,还可以将数据视为带分隔符的数据。请尝试将此数据处理程序与另一个绑定一起使用并检查其行为。
在本文中,我们主要讨论了如何使用MQ格式字段更改数据处理程序逻辑,我们还可以在任何其他标头(如JMS标头或HTTP标头)中使用相应字段(如果处理这些协议)。值得注意的是,如果启用Propagate protocol header,则标头仅对数据处理程序可用,这是缺省设置。要查看设置,请选中绑定属性面板中的Propagation选项卡。
我们一直都在努力坚持原创.......请不要一声不吭,就悄悄拿走。
我原创,你原创,我们的内容世界才会更加精彩!
【所有原创内容版权均属TechTarget,欢迎大家转发分享。但未经授权,严禁任何媒体(平面媒体、网络媒体、自媒体等)以及微信公众号复制、转载、摘编或以其他方式进行使用。】
微信公众号

TechTarget
官方微博

TechTarget中国
相关推荐
-
Hadoop 2:大数据演进中的一次大飞跃
新的Hadoop不仅能够进一步刺激为Hadoop编写应用程序,同时也将在Hadoop内创造全新的数据处理方法,这在此前的架构限制下是根本不可能实现的。
-
BEST:SOAP/XML和REST的替代方案
虽然拥有大量的机架服务器,以及大量软件开发人员的组织,基于web和集成服务的SOAP和REST很适合他们,但也会出现问题。
-
Spring 烂!差!
有些人可能对Spring的第一印象不太好,它真的很烂,很差吗,也许这只是你的一种偏见,它也有是自己的优点的。
-
基于SOA架构的业务安全性研究
SOA在提供价值链上企业之间信息共享和业务流程自动化的同时,也给业务信息安全带来了负面影响,且存在安全隐患,这些你知道吗?