如何使用MQ标头动态确定数据处理程序的行为?

日期: 2010-07-11 作者:Philip NortonAlex WoodFenglian Xu 来源:TechTarget中国 英文

  本文的目标读者是集成开发人员,其角色涉及使用服务组件体系结构(Service Component Architecture,SCA)集成MQ消息传递引擎。本文将描述如何使用数据处理程序基于MQ标头更改转换逻辑,同时确保处理程序保持协议独立性。涵盖的主题包括:

  •   数据处理程序的解释
  •   从数据处理程序访问MQ标头
  •   基于标头改变数据处理程序的行为
  •   确保数据处理程序保持协议独立性

  在阅读本文之后,您将能够创作基于MQ标头动态地转换数据的可重用数据处理程序。

  数据处理程序

  数据处理程序负责将操作的输入和输出类型转换为有线使用的格式,反之亦然。以前,MQ绑定使用了数据绑定执行此功能。数据绑定具有访问有线格式(如MQ消息)的权限,并可以直接对消息进行读取或写入。在JMS绑定上无法使用这种连接到特定绑定类型(例如MQ数据绑定)的每个数据绑定。

  数据处理程序提供了使用规范格式转换数据的通用方法。transform方法用于读取数据,并传递以下类型之一来读取输入:

  •   InputStream-用于读取字节
  •   Reader-用于读取文本
  •   Object-用于读取Java对象

  transformInto方法用于写入数据,并传入以下类型之一来写入输出:

  •   OutputStream-用于写入字节
  •   Writer-用于写入文本
  •   Object-用于写入Java对象

  这样可使消息的有线格式对数据处理程序透明,允许任何受支持的绑定重用它们。

  数据处理程序和消息标头

  当创作数据绑定时,会将消息标头传入读或写方法,并允许协议标头中的字段确定转换逻辑。不过,对于方法的输入参数不包括协议标头的数据处理程序,我们应如何访问它们?

  在WebSphere Enterprise Service Bus (WESB) V6.2中,协议标头存储在名为ContextService的存储库中。ContextService通过使用普通SPI进行访问,并且可以从任何Java代码引用,其中包括POJO、函数选择器或数据处理程序。清单 1 显示了从ContextService获取MQHeader的示例代码。

  清单 1. 使用上下文服务访问协议标头
    

以下是引用片段:
HeadersType headers = ContextService.INSTANCE.getHeaders();
MQHeaderType mqHeader = headers.getMQHeader(); 
 
  使用数据处理程序中的MQHeader

  现在,我们已了解了如何访问数据处理程序中的MQ Header,让我们基于这些字段之一制定转换决策。MQMD的格式字段通常用于确定消息正文格式。消息正文的格式、编码和编码字符集存储在MQControl结构中,并包含在MQ Header中。清单 2 中所示的代码说明了如何根据MQMD中格式字段的值修改转换逻辑。

  清单 2. 基于MQMD格式字段更改行为
    

以下是引用片段:
package com.ibm.custom.datahandlers;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import com.ibm.websphere.bo.BOFactory;
import com.ibm.websphere.bo.BOXMLDocument;
import com.ibm.websphere.bo.BOXMLSerializer;
import com.ibm.websphere.sca.ServiceManager;
import com.ibm.websphere.sca.mq.structures.MQControl;
import com.ibm.websphere.sibx.smobo.HeadersType;
import com.ibm.websphere.sibx.smobo.MQHeaderType;
import com.ibm.wsspi.session.ContextService;
import commonj.connector.runtime.DataHandler;
import commonj.connector.runtime.DataHandlerException;
import commonj.sdo.DataObject;
@SuppressWarnings(“serial”)
public class MQHeaderDataHandler implements DataHandler {
 Map bindingContext;
 String delimiter = “%”;
 /**
  * Reads a DataObject from an InputStream (Bytes support only)
  */
 public Object transform(Object source, Class target, Object options) 
           throws DataHandlerException {
  System.out.println(“MyDataHandler: Calling 
   transform(” + source + “,” + target + “,” + options + “)”);
  DataObject output = null;
  InputStream is = (InputStream) source;
  
  //Handle InputStreams and DataObjects
  if (source instanceof InputStream && 
   target.getName().equals(“commonj.sdo.DataObject”)) {
                if ((“MQXML”).equals(getMQMessageFormat())) {
    //If the format field of the incoming message 
                            is XML use BOXMLSerializer
    ServiceManager serviceMgr = new ServiceManager();
    BOXMLSerializer xmlSerializer = 
                        (BOXMLSerializer) serviceMgr
      .locateService
                        (“com/ibm/websphere/bo/BOXMLSerializer”);
    try {
     BOXMLDocument xmlDoc = xmlSerializer.
                        readXMLDocumentWithOptions
                            (is , options);
     output = xmlDoc.getDataObject();
     ((InputStream)source).close();
    } catch (IOException e) {
     throw new 
      DataHandlerException
                        (“Exception reading DataObject ” 
                        + e.getLocalizedMessage());
    }
   } else {
    //Read delimited data
    byte[] ch = new byte[2];
    StringBuffer buffer = new StringBuffer();
     
    try {
     int len = is.read(ch);
     while (len != -1) {
      buffer.append(new String(ch), 0, len);
      len = is.read(ch);
     }
    } catch (IOException e) {
     throw new DataHandlerException(e);
    }
    //Split the values using the delimiter
    String[] values = buffer.toString().split(delimiter);
    System.out.println(“Found name: “+values[0]+” and id: ”
                        +values[1]);
    //Build the CustomerType
    ServiceManager serviceManager = new ServiceManager();
                        BOFactory bofactory = 
      (BOFactory) serviceManager.locateService
                                (“com/ibm/websphere/bo/BOFactory”);
    output =
     bofactory.create(“http://
                        MQHeaderDataHandlerModule”, “CustomerType”);
    output.setString(“name”, values[0]);
    output.setInt(“id”, Integer.parseInt(values[1]));
   }
  } else {
   throw new 
    DataHandlerException(“Source type ” + 
                    source.getClass().getName() + ” or Target type ”
     + target.getName() + ” unsupported by 
                                            MyDataHandler”);
  }
  return output;
 }
 /**
  * Writes the DataObject out to a OutputStream (Bytes support only)
  */
 public void transformInto(Object source, Object target, Object options)
      throws DataHandlerException {
  System.out.println(“Calling 
   transformInto(” + source + “,” + target + “,” + options + “)”);
  if (source instanceof DataObject && target instanceof OutputStream) {
   OutputStream os = (OutputStream) target;
   DataObject bo = (DataObject) source;
   
   if ((“MQXML”).equals(getMQMessageFormat())) {
    //If the format field of the outgoing message is XML use 
                            BOXMLSerializer
    ServiceManager serviceMgr = new ServiceManager();
    BOXMLSerializer xmlSerializer = (BOXMLSerializer) 
                      serviceMgr.locateService(“com/ibm/websphere/
                                               bo/BOXMLSerializer”);
        
    try {
     xmlSerializer.writeDataObject(bo, bo.getType().
                             getURI(), bo.getType().getName(), os);
    } catch (IOException e) {
     throw new 
      DataHandlerException(“Exception writing 
                           DataObject ” + e.getLocalizedMessage());
    }
   } else {
    //Write delimited data    
    String name = bo.getString(“name”);
    int id = bo.getInt(“id”);    
    String message = name+delimiter+id;
    
    try {
     os.write(message.getBytes());
    } catch (IOException e) {
     throw new 
      DataHandlerException(“Exception writing 
                        DataObject ” + e.getLocalizedMessage());
    }
   }   
  } else {
  
   throw new 
    DataHandlerException(“Source type ” + source.getClass()
                    .getName() + ” or Target type ”
     + target.getClass().getName() + 
                    ” unsupported by MyDataHandler”);
  }
 }
  
 /**
  * Returns the format of the MQHeader or null if there is no MQHeader
  * @return
  */
 private String getMQMessageFormat() {
  String format = null;
  
  HeadersType headers = ContextService.INSTANCE.getHeaders();
  
  MQHeaderType mqHeader = null;   
     if(headers!= null){
      mqHeader = headers.getMQHeader();
      if (mqHeader != null) {
       MQControl mqControl = mqHeader.getControl();
       if (mqControl != null) {
        format = mqControl.getFormat();
        if (format != null) {
         format = format.trim();
        }
       }
      }
     }
  
  return format;
 }
 
 public void setBindingContext(Map context) {
  bindingContext = context;
 }
}
  此处理程序使用MQHeader格式字段,如果将其设置为MQXML,则会将消息正文视为XML,否则会将消息正文视为带分隔符的数据。如果将此数据处理程序与非MQ绑定一起使用,则不能在 ContextService 中定义 MQHeader。在这种情况下,getMQMessageFormat()方法将返回null,并导致将正文视为带分隔符的数据,因此,数据处理程序应保持协议独立性。 

  测试数据处理程序

  首先,使用MQ绑定创建一个简单的模块。假定已安装WebSphere MQ,并且可以在WebSphere ESB中访问它。

  在WID中创建名为MQHeaderDataHandlerModule的新模块

  在MQHeaderDataHandlerModule项目中,创建名为CustomerType的新数据类型

图 1. CustomerType Business对象

图 1. CustomerType Business对象

    添加名为name的String字段和名为id的int字段

  保存并关闭新数据类型

  创建名为CustomerInterface的新接口

  添加名为processCustomer的单向操作,它可以接受CustomerType作为输入参数

图 2. Customer接口

图 2. Customer接口

    将新的导出添加到组装图

  将CustomerInterface添加到导出

  在导出上配置MQ绑定

  设置请求队列管理器名称

  设置接收目标

  为WebSphere MQ队列管理器设置适当的连接详细信息

  要获取缺省的请求数据格式,请单击select

  选择Select your custom data format transformation from the workspace,并单击select

  选择MQHeaderDataHandler,并单击ok

  勾选Add custom class to binding registry,并单击next(此操作可以在现有数据格式列表中显示)

  输入名称MQHeaderDataHandler

  添加描述

  选择它适用的绑定类型,现在勾选MQ和JMS

图 3. 设置数据格式

图 3. 设置数据格式

    在导入上,按照上述同一数据格式配置MQ绑定

  为确保正确地创建DataObject,我们将使用Java组件将其输出,因此将一个数据对象添加到画布
 
  将Java组件连接到导出和导入,您的模块现在应该与图 4 所示类似

图 4. MQHeaderDataHandlerModule

图 4. MQHeaderDataHandlerModule

  双击Java组件创建一个实现

  将processCustomer的实现替换为清单 3 中的代码

  清单 3. 打印数据对象
    

以下是引用片段:
BOFactory factoryService = (BOFactory) new 
 ServiceManager().locateService(“com/ibm/websphere/bo/BOFactory”);
BOXMLSerializer xmlSerializerService =(BOXMLSerializer) new 
 ServiceManager().locateService(“com/ibm/websphere/bo/BOXMLSerializer”);
try {
 xmlSerializerService.writeDataObject(input1, input1.getType().getURI(), 
 input1.getType().getName(), System.out);
} catch (IOException e) {
 e.printStackTrace();
}
locateService_CustomerInterfacePartner().processCustomer(input1);
  
  保存Java组件
 
  保存模块

  运行模块

  为测试模块,我们将使用RFHUtil,它允许我们创建MQ消息,并将其放入导出接收目标。然后,我们可以使用WID中的服务器日志视图查看由Java组件输出的DataObject,并再次使用RFHUtil查看导入输出的消息。

  在MQHeaderDataHandlerModule项目中创建一个名为CustomerType.xml的文件

  将内容设置为清单 4 中显示的 XML

  清单 4. CustomerType XML数据
      

以下是引用片段:
<?xml version=”1.0″ encoding=”UTF-8″?>
<p:CustomerType  
  
 xsi:schemaLocation=”http://MQHeaderDataHandlerModule CustomerType.xsd “>
<name>Tim</name>
<id>114923</id>
</p:CustomerType>
   
  在MQHeaderDataHandlerModule项目中创建一个名为CustomerType.txt的文件
将内容设置为清单 5 中显示的XML

  清单 5. CustomerType带分隔符的数据
      
  

以下是引用片段:
Tim%114923

  
  将您的模块部署到服务器

  启动RFHUtil,并设置导出使用的队列管理器和队列

  使用Read File 读取 CustomerType.txt,您应该能够看到数据选项卡下的消息正文

  按WriteQ按钮将消息发送到队列

  选中WebSphere Integration Developer中的Server Logs视图,以便输出DataObject

  回到RFHUtil中,将队列更改为导入使用的队列,并单击ReadQ

  转到Data选项卡,看到的消息正文应该与图5所示类似

图 5. 输出带分隔符的消息

图 5. 输出带分隔符的消息

  转回RFHUtil中的Main选项卡

  使用Read File读取CustomerType.xml,您应该能够看到数据选项卡下的消息正文

  转到MQMD选项卡,并将消息格式设置为MQXML

图 6. 设置MQXML消息格式

图 6. 设置MQXML消息格式

  回到Main选项卡上,按WriteQ发送消息
 
  将队列更改为导入使用的队列,并单击ReadQ

  转到Data选项卡,看到的消息正文应该与图 7 所示类似

图 7. 输出XML消息

图 7. 输出XML消息

  结束语

  祝贺您,现在您已经成功地创建并测试了由MQ标头中的字段控制其行为的独立于协议的数据处理程序。我们可以看到,如果将格式字段设置为MQXML,则可以将消息视为XML。如果没有任何MQ标头,还可以将数据视为带分隔符的数据。请尝试将此数据处理程序与另一个绑定一起使用并检查其行为。

  在本文中,我们主要讨论了如何使用MQ格式字段更改数据处理程序逻辑,我们还可以在任何其他标头(如JMS标头或HTTP标头)中使用相应字段(如果处理这些协议)。值得注意的是,如果启用Propagate protocol header,则标头仅对数据处理程序可用,这是缺省设置。要查看设置,请选中绑定属性面板中的Propagation选项卡。

我们一直都在努力坚持原创.......请不要一声不吭,就悄悄拿走。

我原创,你原创,我们的内容世界才会更加精彩!

【所有原创内容版权均属TechTarget,欢迎大家转发分享。但未经授权,严禁任何媒体(平面媒体、网络媒体、自媒体等)以及微信公众号复制、转载、摘编或以其他方式进行使用。】

微信公众号

TechTarget微信公众号二维码

TechTarget

官方微博

TechTarget中国官方微博二维码

TechTarget中国

相关推荐

  • Hadoop 2:大数据演进中的一次大飞跃

    新的Hadoop不仅能够进一步刺激为Hadoop编写应用程序,同时也将在Hadoop内创造全新的数据处理方法,这在此前的架构限制下是根本不可能实现的。

  • BEST:SOAP/XML和REST的替代方案

    虽然拥有大量的机架服务器,以及大量软件开发人员的组织,基于web和集成服务的SOAP和REST很适合他们,但也会出现问题。

  • Spring 烂!差!

    有些人可能对Spring的第一印象不太好,它真的很烂,很差吗,也许这只是你的一种偏见,它也有是自己的优点的。

  • 基于SOA架构的业务安全性研究

    SOA在提供价值链上企业之间信息共享和业务流程自动化的同时,也给业务信息安全带来了负面影响,且存在安全隐患,这些你知道吗?