Thursday, March 15, 2018

Working with the WSO2 Kafka inbound endpoint

To use the Kafka inbound endpoint, download Apache Kafka.
To configure the Kafka inbound endpoint, copy the following client libraries from the <KAFKA_HOME>/libs directory to the <ESB_HOME>/repository/components/lib directory (tested with ESB 4.9.0):
  • kafka_2.12-0.11.0.0.jar
  • scala-parser-combinators_2.12-1.0.4.jar
  • metrics-core-2.2.0.jar
  • kafka-clients-0.11.0.0.jar
  • zkclient-0.10.jar
  • metrics-core-3.2.2.jar
  • zookeeper-3.4.10.jar
  • scala-library-2.12.2.jar
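
The copy step above can be scripted. The following is a minimal sketch, assuming a POSIX shell. The function name copy_kafka_libs is illustrative and not part of Kafka or the ESB, and both directories are passed as arguments because their exact locations depend on your installation.

```shell
#!/bin/sh
# Sketch: copy the Kafka client jars listed above into the ESB lib directory.
# Usage: copy_kafka_libs <kafka-lib-dir> <esb-lib-dir>
# Returns non-zero if any of the listed jars is missing from the source directory.
copy_kafka_libs() {
    src_dir="$1"
    dest_dir="$2"
    missing=0
    for jar in \
        kafka_2.12-0.11.0.0.jar \
        scala-parser-combinators_2.12-1.0.4.jar \
        metrics-core-2.2.0.jar \
        kafka-clients-0.11.0.0.jar \
        zkclient-0.10.jar \
        metrics-core-3.2.2.jar \
        zookeeper-3.4.10.jar \
        scala-library-2.12.2.jar
    do
        if [ -f "$src_dir/$jar" ]; then
            cp "$src_dir/$jar" "$dest_dir/"
        else
            # Report the missing jar but keep copying the rest.
            echo "missing: $src_dir/$jar" >&2
            missing=1
        fi
    done
    return "$missing"
}
```

Call it with the Kafka library directory and <ESB_HOME>/repository/components/lib, then restart the ESB so that the new jars are picked up.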
Add a jaas.conf file to the <ESB_HOME>/repository/conf/security directory. Please refer to this doc to get the jaas.conf file.

Sample Inbound configuration

The following sample Kafka inbound endpoint configuration consumes messages from the specified topic or topics:
<?xml version="1.0" encoding="UTF-8"?>
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
                 name="kafka"
                 sequence="request"
                 onError="fault"
                 class="org.wso2.carbon.inbound.kafka.KafkaMessageConsumer"
                 suspend="false">
   <parameters>
      <parameter name="coordination">true</parameter>
      <parameter name="interval">10</parameter>
      <parameter name="topic.names">test</parameter>
      <parameter name="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
      <parameter name="bootstrap.servers">localhost:9092</parameter>
      <parameter name="poll.timeout">100</parameter>
      <parameter name="contentType">application/json</parameter>
      <parameter name="group.id">hello</parameter>
      <parameter name="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
      <parameter name="inbound.behavior">polling</parameter>
      <parameter name="sequential">true</parameter>
   </parameters>
</inboundEndpoint>

The above inbound configuration can also be added via the Management Console.

Run the following command to start the ZooKeeper server:

bin/zookeeper-server-start.sh config/zookeeper.properties

Run the following command to start the Kafka server:
bin/kafka-server-start.sh config/server.properties

Start the Kafka console producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

After executing the above command, the console waits for input. Send the following message using the console producer:
{"test":"wso2"}
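
The same message can also be sent without typing it at the prompt, since the console producer reads messages from standard input. The following is a minimal sketch, assuming it is run from the Kafka home directory. The function name send_test_message and the KAFKA_PRODUCER variable are hypothetical helpers introduced here for illustration; the broker address and topic are the ones used above.

```shell
#!/bin/sh
# Sketch: pipe one message into the console producer for the "test" topic.
# KAFKA_PRODUCER is a hypothetical override for the producer script path;
# by default the script is expected at bin/kafka-console-producer.sh.
send_test_message() {
    producer="${KAFKA_PRODUCER:-bin/kafka-console-producer.sh}"
    # Each line on standard input is sent as one message.
    printf '%s\n' "$1" | "$producer" --broker-list localhost:9092 --topic test
}
```

For example, `send_test_message '{"test":"wso2"}'` sends the sample message and returns once the producer has read the end of its input.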

Configuring the sample scenario 

Create a sample sequence as follows:
<?xml version="1.0" encoding="UTF-8"?>
<sequence name="request" onError="fault" xmlns="http://ws.apache.org/ns/synapse">
    <log level="full"/>
    <log level="custom">
        <property name="STATUS" value="Fetching inbound endpoint name"/>
        <property expression="$ctx:inbound.endpoint.name"
            name="inboundEndpointName" xmlns:ns="http://org.apache.synapse/xsd"/>
    </log>
</sequence>

The ESB console output will look like the following:

[2018-03-15 15:08:38,436]  INFO - LogMediator To: , MessageID: 379c9c2a-7db1-4210-ba0f-7c550c93b9f3, Direction: request, Payload: {"test":"wso2"}
[2018-03-15 15:08:38,443]  INFO - LogMediator STATUS = Fetching inbound endpoint name, inboundEndpointName = kafka
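
To check for this output without watching the console, the log file can be searched for the custom log line. The following is a minimal sketch, assuming the ESB also writes these entries to a log file such as <ESB_HOME>/repository/logs/wso2carbon.log. The function name kafka_message_logged is illustrative, and the log file path is passed as an argument.

```shell
#!/bin/sh
# Sketch: succeed if the sample sequence has logged a message that arrived
# through the inbound endpoint named "kafka".
# Usage: kafka_message_logged <log-file>
kafka_message_logged() {
    grep -q 'inboundEndpointName = kafka' "$1"
}
```

The function returns zero once at least one matching line is present, so it can be used in a condition such as `if kafka_message_logged "$ESB_HOME/repository/logs/wso2carbon.log"; then echo "message received"; fi`.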
