Thursday, March 27, 2014

ActiveMQ


What is ActiveMQ?

Apache ActiveMQ is an open source message broker written in Java together with a full Java Message Service (JMS) client. 

To read more about Apache ActiveMQ, click on this link.

How to configure ActiveMQ with WSO2 ESB?

One of the most popular releases of Apache ActiveMQ is version 5.5.1. In this post we will show you the steps for configuring WSO2 ESB with ActiveMQ 5.5.1.

Before starting your ESB, you need to configure ActiveMQ.

1. Download ActiveMQ from the above link.
2. Navigate to ActiveMQ_HOME/lib and copy the following libraries to the ESB_HOME/repository/components/lib directory.
     * activemq-core-5.5.1.jar
     * geronimo-j2ee-management_1.1_spec-1.0.1.jar
     * geronimo-jms_1.1_spec-1.1.1.jar
3. Configure transport listeners and senders in the ESB.

Uncomment the following listener configuration related to ActiveMQ in the ESB_HOME/repository/conf/axis2/axis2.xml file.

<transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
    <parameter name="myTopicConnectionFactory" locked="false">
        <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
        <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>
    </parameter>

    <parameter name="myQueueConnectionFactory" locked="false">
        <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
        <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
    </parameter>

    <parameter name="default" locked="false">
        <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
        <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
    </parameter>
</transportReceiver>

4. Set up the JMS sender.
Uncomment the following configuration in the ESB_HOME/repository/conf/axis2/axis2.xml file.

<transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"/>
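
It is easy to leave one of the two blocks commented out by mistake. The sketch below is one way to check this before starting the ESB. It parses a copy of axis2.xml with the XML parser that ships with the JDK and looks for an active element whose name attribute is "jms". The class name Axis2JmsCheck and its method are made up for this post, they are not part of WSO2 ESB or PATS.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

// Hypothetical helper for illustration only, not part of WSO2 ESB or PATS.
class Axis2JmsCheck {

    // Returns true if the XML has an active <tagName name="jms" .../> element.
    // A commented-out element is parsed as a comment node, not as an element,
    // so it is not returned by getElementsByTagName and does not count.
    static boolean hasJmsElement(String axis2Xml, String tagName) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(axis2Xml.getBytes(StandardCharsets.UTF_8)));
            NodeList nodes = doc.getElementsByTagName(tagName);
            for (int i = 0; i < nodes.getLength(); i++) {
                Element element = (Element) nodes.item(i);
                if ("jms".equals(element.getAttribute("name"))) {
                    return true;
                }
            }
            return false;
        } catch (ParserConfigurationException | SAXException | IOException e) {
            throw new IllegalArgumentException("Could not parse the given XML", e);
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.out.println("usage: java Axis2JmsCheck <path to axis2.xml>");
            return;
        }
        String xml = new String(Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
        System.out.println("JMS listener enabled: " + hasJmsElement(xml, "transportReceiver"));
        System.out.println("JMS sender enabled:   " + hasJmsElement(xml, "transportSender"));
    }
}
```

Run it with the path to ESB_HOME/repository/conf/axis2/axis2.xml as the only argument. Both lines should print true once steps 3 and 4 are done.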

Can we execute an ESB test class in WSO2 PATS (Platform Automated Test Suite) with ActiveMQ?

Yes, we can. We will now show you how to do this in PATS 1.7.0.

This test scenario focuses on sending messages to a JMS queue and consuming the sent messages.

Given below are the two classes, along with the Synapse configuration they load, that can be used for this task.

JMSMessageConsumerTestCase.java

package org.wso2.carbon.automation.platform.scenarios;

import org.apache.axiom.om.OMAbstractFactory;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMFactory;
import org.apache.axiom.om.OMNamespace;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.wso2.carbon.automation.engine.frameworkutils.FrameworkPathUtil;
import org.wso2.carbon.automation.extensions.servers.jmsserver.client.JMSQueueMessageConsumer;
import org.wso2.carbon.automation.extensions.servers.jmsserver.controller.config.JMSBrokerConfigurationProvider;
import org.wso2.carbon.automation.platform.scenarios.esb.ESBBaseTest;
import org.wso2.carbon.automation.test.utils.axis2client.AxisServiceClient;

public class JMSMessageConsumerTestCase extends ESBBaseTest {

    private String activemqIP;
    private String activemqPort;

    @BeforeClass(alwaysRun = true)
    protected void init() throws Exception {
        super.init();
        activemqIP = automationContext.getDefaultInstance().getProperty("ip");
        activemqPort = automationContext.getDefaultInstance().getProperty("port");
        OMElement synapse = loadClasspathResource(FrameworkPathUtil.getSystemResourceLocation() +
                "artifacts/ESB/jms/transport/jms_message_store_and_processor_service.xml");
        updateESBConfiguration(setConfigurations(synapse, activemqIP, activemqPort));
    }

    @Test(groups = {"wso2.esb"}, description = "Test proxy service with jms transport")
    public void testJMSMessageStoreAndProcessor() throws Exception {
        JMSQueueMessageConsumer consumer = new JMSQueueMessageConsumer(
                JMSBrokerConfigurationProvider.getBrokerConfiguration(activemqIP, activemqPort));
        AxisServiceClient client = new AxisServiceClient();

        // Send 5 requests to the proxy, which stores each one in the JMS message store
        for (int i = 0; i < 5; i++) {
            client.sendRobust(getStockQuoteRequest("JMS"), automationContext.getContextUrls()
                    .getServiceUrl() + "/JMSMessageStoreTestCaseProxy", "getQuote");
        }

        // Give the ESB time to put the messages on the queue
        Thread.sleep(10000);

        try {
            consumer.connect("JMSTestMessageStore");
            // All 5 messages should be in the queue
            for (int i = 0; i < 5; i++) {
                Assert.assertNotNull(consumer.popRawMessage(), "Could not consume the " +
                        "message from the queue");
            }
        } finally {
            consumer.disconnect();
        }
    }

    public static OMElement getStockQuoteRequest(String symbol) {
        OMFactory fac = OMAbstractFactory.getOMFactory();
        OMNamespace omNs = fac.createOMNamespace("http://services.samples", "ns");
        OMElement method = fac.createOMElement("getQuote", omNs);
        OMElement value1 = fac.createOMElement("request", omNs);
        OMElement value2 = fac.createOMElement("symbol", omNs);

        // Build <ns:getQuote><ns:request><ns:symbol>symbol</ns:symbol></ns:request></ns:getQuote>
        value2.addChild(fac.createOMText(value1, symbol));
        value1.addChild(value2);
        method.addChild(value1);

        return method;
    }
}
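
The test above waits a fixed 10 seconds with Thread.sleep(10000) before it starts consuming. A common alternative, shown here only as a sketch and not as something PATS provides, is to poll a condition until it becomes true or a timeout expires. The class name Await and its method are hypothetical, and the condition itself is left abstract because how you check the queue depends on the client you use.

```java
import java.util.function.BooleanSupplier;

// Hypothetical polling helper for illustration only, not part of PATS.
class Await {

    // Checks the condition every intervalMillis until it is true or until
    // timeoutMillis has passed. Returns true if the condition was met.
    static boolean until(BooleanSupplier condition, long timeoutMillis, long intervalMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Stand-in condition: becomes true roughly 300 ms after start
        boolean met = until(() -> System.currentTimeMillis() - start >= 300, 10000, 50);
        System.out.println("condition met: " + met);
    }
}
```

With a helper like this the test only waits as long as it has to, and a slow environment gets the full timeout instead of a failure right after a fixed sleep.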

ESBBaseTest.java

package org.wso2.carbon.automation.platform.scenarios.esb;

import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.impl.builder.StAXOMBuilder;
import org.apache.axiom.om.util.AXIOMUtil;
import org.wso2.carbon.automation.engine.context.AutomationContext;
import org.wso2.carbon.automation.engine.context.TestUserMode;
import org.wso2.carbon.automation.test.utils.esb.ESBTestCaseUtils;

import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import javax.xml.xpath.XPathExpressionException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Iterator;

public class ESBBaseTest {

    protected AutomationContext automationContext = null;

    private OMElement synapseConfiguration = null;
    private ESBTestCaseUtils esbTestCaseUtils = new ESBTestCaseUtils();

    protected void init() throws Exception {

        automationContext = new AutomationContext("ESB", TestUserMode.SUPER_TENANT_ADMIN);
    }

    protected String getBackendURL() throws XPathExpressionException {

        return automationContext.getContextUrls().getBackEndUrl();
    }

    protected OMElement loadClasspathResource(String path) throws FileNotFoundException,
            XMLStreamException {
        OMElement documentElement = null;
        FileInputStream inputStream = null;
        XMLStreamReader parser = null;
        StAXOMBuilder builder = null;
        File file = new File(path);
        if (file.exists()) {
            try {
                inputStream = new FileInputStream(file);
                parser = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
                //create the builder
                builder = new StAXOMBuilder(parser);
                //get the root element (in this case the envelope)
                documentElement = builder.getDocumentElement().cloneOMElement();
            } finally {
                if (builder != null) {
                    builder.close();
                }
                if (parser != null) {
                    try {
                        parser.close();
                    } catch (XMLStreamException e) {
                        //ignore
                    }
                }
                if (inputStream != null) {
                    try {
                        inputStream.close();
                    } catch (IOException e) {
                        //ignore
                    }
                }

            }

        } else {
            throw new FileNotFoundException("File does not exist at " + path);
        }
        return documentElement;
    }

    protected void updateESBConfiguration(OMElement synapseConfig) throws Exception {
        if (synapseConfiguration == null) {
            synapseConfiguration = synapseConfig;
        } else {
            Iterator<OMElement> itr = synapseConfig.cloneOMElement().getChildElements();
            while (itr.hasNext()) {
                synapseConfiguration.addChild(itr.next());
            }
        }
        esbTestCaseUtils.updateESBConfiguration(synapseConfig, automationContext.getContextUrls().getBackEndUrl(), automationContext.login());
    }

    public static OMElement setConfigurations(OMElement synapseConfig, String ip, String port) throws XMLStreamException {
        // Point the configuration at the actual broker instead of the default local one
        String config = synapseConfig.toString();
        config = config.replace("tcp://localhost:61616", "tcp://" + ip + ":" + port);
        return AXIOMUtil.stringToOM(config);
    }
}
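
The setConfigurations method above boils down to a plain string replacement of the default broker URL. The stand-alone sketch below shows just that step without any Axiom dependency, so you can see what ends up in the configuration. The class name BrokerUrlRewriter is made up for this post.

```java
// Hypothetical stand-alone version of the URL replacement, for illustration only.
class BrokerUrlRewriter {

    // The broker URL that the sample configuration ships with
    static final String DEFAULT_BROKER_URL = "tcp://localhost:61616";

    // Replaces every occurrence of the default broker URL with tcp://ip:port
    static String rewrite(String config, String ip, String port) {
        return config.replace(DEFAULT_BROKER_URL, "tcp://" + ip + ":" + port);
    }

    public static void main(String[] args) {
        String param = "<parameter name=\"java.naming.provider.url\">tcp://localhost:61616</parameter>";
        // prints <parameter name="java.naming.provider.url">tcp://12.12.12.100:61616</parameter>
        System.out.println(rewrite(param, "12.12.12.100", "61616"));
    }
}
```

Since this is a literal match, it only works while the configuration file still contains tcp://localhost:61616 exactly as written. A configuration that already points somewhere else is left untouched.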

jms_message_store_and_processor_service.xml

<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://ws.apache.org/ns/synapse">
    <registry provider="org.wso2.carbon.mediation.registry.WSO2Registry">
        <parameter name="cachableDuration">15000</parameter>
    </registry>
    <proxy name="JMSMessageStoreTestCaseProxy"
           transports="https http"
           startOnLoad="true"
           trace="disable">
        <target>
            <inSequence>
                <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
                <property name="OUT_ONLY" value="true"/>
                <log level="full"/>
                <store messageStore="JMSTestMessageStore"/>
            </inSequence>
        </target>
    </proxy>
    <sequence name="fault">
        <log level="full">
            <property name="MESSAGE" value="Executing default 'fault' sequence"/>
            <property name="ERROR_CODE" expression="get-property('ERROR_CODE')"/>
            <property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/>
        </log>
        <drop/>
    </sequence>
    <sequence name="main">
        <in>
            <log level="full"/>
            <filter source="get-property('To')" regex="http://localhost:9000.*">
                <send/>
            </filter>
        </in>
        <out>
            <send/>
        </out>
        <description>The main sequence for the message mediation</description>
    </sequence>
    <messageStore class="org.apache.synapse.message.store.impl.jms.JmsStore"
                  name="JMSTestMessageStore">
        <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter>
        <parameter name="store.jms.JMSSpecVersion">1.1</parameter>
        <parameter name="store.jms.destination">JMSTestMessageStore</parameter>
    </messageStore>
</definitions>
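
In this configuration the store.jms.destination parameter and the queue name passed to consumer.connect(...) in the test are both JMSTestMessageStore. If you rename one, you will most likely need to rename the other. The sketch below reads the parameters of a message store from a Synapse configuration with the JDK XML parser so that such values can be checked. The class name MessageStoreInspector is hypothetical and not part of Synapse or PATS.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

// Hypothetical helper for illustration only, not part of Synapse or PATS.
class MessageStoreInspector {

    // Returns the <parameter> name/value pairs of the message store with the
    // given name, or an empty map if there is no such store.
    static Map<String, String> storeParameters(String synapseXml, String storeName) {
        Map<String, String> result = new LinkedHashMap<>();
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(synapseXml.getBytes(StandardCharsets.UTF_8)));
            NodeList stores = doc.getElementsByTagName("messageStore");
            for (int i = 0; i < stores.getLength(); i++) {
                Element store = (Element) stores.item(i);
                if (!storeName.equals(store.getAttribute("name"))) {
                    continue;
                }
                NodeList params = store.getElementsByTagName("parameter");
                for (int j = 0; j < params.getLength(); j++) {
                    Element param = (Element) params.item(j);
                    result.put(param.getAttribute("name"), param.getTextContent().trim());
                }
            }
        } catch (ParserConfigurationException | SAXException | IOException e) {
            throw new IllegalArgumentException("Could not parse the given XML", e);
        }
        return result;
    }

    public static void main(String[] args) {
        // Shortened copy of the messageStore element from the configuration above
        String xml = "<definitions xmlns=\"http://ws.apache.org/ns/synapse\">"
                + "<messageStore class=\"org.apache.synapse.message.store.impl.jms.JmsStore\""
                + " name=\"JMSTestMessageStore\">"
                + "<parameter name=\"java.naming.provider.url\">tcp://localhost:61616</parameter>"
                + "<parameter name=\"store.jms.destination\">JMSTestMessageStore</parameter>"
                + "</messageStore></definitions>";
        Map<String, String> params = storeParameters(xml, "JMSTestMessageStore");
        System.out.println("broker URL:  " + params.get("java.naming.provider.url"));
        System.out.println("destination: " + params.get("store.jms.destination"));
    }
}
```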


Now let's look at the finer details of the setup. The test class JMSMessageConsumerTestCase.java needs both your ESB and the ActiveMQ instance to be running before the test case is run. Remember that in PATS you need to bring up the setup manually. For this example, assume that your ESB server runs on the IP 12.12.12.100 with the default HTTPS port 9443, and that the ActiveMQ instance runs on 12.12.12.100 with the default port 61616, i.e. both the ESB and the ActiveMQ instances run on a remote machine, other than the machine which hosts PATS.

Next step is to access the remotely hosted ESB console.

You should be able to access the ESB management console by entering the URL below in your browser.

https://12.12.12.100:9443/carbon/admin/login.jsp

To access the ActiveMQ console through your browser, you first need to start the ActiveMQ instance. To do so, navigate to the apache-activemq-5.5.1/bin folder and type the following at the command prompt.

.../apache-activemq-5.5.1/bin$ java -jar run.jar start

That's it. Your ActiveMQ instance is now up and running!
Now we need to access its console from the browser. To do so, enter the URL below in the browser's address bar.

http://12.12.12.100:8161/admin/

Note the two different port values for ActiveMQ: 61616 is the broker port that JMS clients such as the ESB connect to, while 8161 is the port of the web console.
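
If the console does not load or the test cannot connect, a quick way to narrow it down is to check whether the two ports accept TCP connections from the machine that hosts PATS. Below is a minimal sketch that uses only the JDK. The class name PortCheck is made up for this post, and an open port only tells you that something is listening there, not that it is ActiveMQ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration only.
class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within the timeout
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the host of the ActiveMQ machine as the first argument, e.g. 12.12.12.100
        String host = args.length > 0 ? args[0] : "localhost";
        System.out.println("broker port 61616 open: " + isOpen(host, 61616, 2000));
        System.out.println("console port 8161 open: " + isOpen(host, 8161, 2000));
    }
}
```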

If everything is fine, you should now be able to see incoming messages being queued in the JMSTestMessageStore queue, and when you consume these messages they are shown as dequeued.

An example status image of the ActiveMQ console is given below.

Name                | Number Of Pending Messages | Number Of Consumers | Messages Enqueued | Messages Dequeued | Views                    | Operations
JMSTestMessageStore | 2                          | 1                   | 5                 | 3                 | Browse, Active Consumers | Send To, Purge, Delete





