Showing posts with label WSO2 EI. Show all posts
Showing posts with label WSO2 EI. Show all posts

Thursday, March 15, 2018

Working with WSO2 kafka inbound endpoint

To use the Kafka inbound endpoint, download Apache Kafka.
To configure the Kafka inbound endpoint, copy the following client libraries from the <KAFKA_HOME>/lib directory to the <ESB_HOME>/repository/components/lib directory (Tested with ESB 4.9.0).
  • kafka_2.12-0.11.0.0.jar, 
  • scala-parser-combinators_2.12-1.0.4.jar, 
  • metrics-core-2.2.0.jar
  • kafka-clients-0.11.0.0.jar, 
  • zkclient-0.10.jar, 
  • metrics-core-3.2.2.jar, 
  • zookeeper-3.4.10.jar, 
  • scala-library-2.12.2.jar
Add jaas.conf file to <ESB_HOME>/repository/conf/security. Please refer this doc to get jaas.conf file.

Sample Inbound configuration

Following is a sample kafka configuration that can consume messages using the specified topic or topics :
<?xml version="1.0" encoding="UTF-8"?>
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
                 name="kafka"
                 sequence="request"
                 onError="fault"
                 class="org.wso2.carbon.inbound.kafka.KafkaMessageConsumer"
                 suspend="false">
   <parameters>
      <parameter name="coordination">true</parameter>
      <parameter name="interval">10</parameter>
      <parameter name="topic.names">test</parameter>
      <parameter name="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
      <parameter name="bootstrap.servers">localhost:9092</parameter>
      <parameter name="poll.timeout">100</parameter>
      <parameter name="contentType">application/json</parameter>
      <parameter name="group.id">hello</parameter>
      <parameter name="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
      <parameter name="inbound.behavior">polling</parameter>
      <parameter name="sequential">true</parameter>
   </parameters>
</inboundEndpoint>

It is also possible to add the above inbound configuration via the Management Console:














Run the following command to start the ZooKeeper server :

bin/zookeeper-server-start.sh config/zookeeper.properties

Run the following command to start the Kafka server :
bin/kafka-server-start.sh config/server.properties

Start the kafka producer using :
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

After the executing the above command, a console will be prompt then send the following message using console producer :
{"test":"wso2"}

Configuring the sample scenario 

Create a sample sequence as below :
<?xml version="1.0" encoding="UTF-8"?>
<sequence name="request" onError="fault" xmlns="http://ws.apache.org/ns/synapse">
    <log level="full"/>
    <log level="custom">
        <property name="STATUS" value="Fetching inbound endpoint name"/>
        <property expression="$ctx:inbound.endpoint.name"
            name="inboundEndpointName" xmlns:ns="http://org.apache.synapse/xsd"/>
    </log>
</sequence>

The ESB console output will be like below :

[2018-03-15 15:08:38,436]  INFO - LogMediator To: , MessageID: 379c9c2a-7db1-4210-ba0f-7c550c93b9f3, Direction: request, Payload: {"test":"wso2"}
[2018-03-15 15:08:38,443]  INFO - LogMediator STATUS = Fetching inbound endpoint name, inboundEndpointName = kafka

Friday, September 15, 2017

WSO2 ESB Filter Mediator

Filter mediator is used to match or filter the message of a given xpath.

If we give only the xpath then it will return true or false. If we give regular expression, the string returned from evaluating the XPath will be matched against the regular expression.

There are two ways of operation

  • Specify the XPath (boolean expression), return true or false
  • XPath will be matched against the regular expression, return true or false

Examples :

<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="test"
       transports="http https"
       startOnLoad="true">
   <description/>
   <target>
      <inSequence>
         <property name="statusCode" value="200"/>
         <filter xpath="get-property('statusCode')!='204'">
            <then>
               <log level="custom">
                  <property name="----------Status Code--------------------------"
                            value="status code is not equals to 204"/>
               </log>
            </then>
         </filter>
      </inSequence>
      <outSequence>
         <send/>
      </outSequence>
   </target>
</proxy>

<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="test"
       transports="http https"
       startOnLoad="true">
   <description/>
   <target>
      <inSequence>
         <property name="statusCode" value="200"/>
         <filter source="get-property('statusCode')" regex="200">
            <then>
               <log level="custom">
                  <property name="----------Status Code--------------------------"
                            value="status code is equals to 200"/>
               </log>
            </then>
         </filter>
      </inSequence>
      <outSequence>
         <send/>
      </outSequence>
   </target>
</proxy>