To use the Kafka inbound endpoint, download
Apache Kafka.
To configure the Kafka inbound endpoint, copy the following client libraries from the <KAFKA_HOME>/lib directory to the <ESB_HOME>/repository/components/lib directory (Tested with ESB 4.9.0).
- kafka_2.12-0.11.0.0.jar,
- scala-parser-combinators_2.12-1.0.4.jar,
- metrics-core-2.2.0.jar
- kafka-clients-0.11.0.0.jar,
- zkclient-0.10.jar,
- metrics-core-3.2.2.jar,
- zookeeper-3.4.10.jar,
- scala-library-2.12.2.jar
Add jaas.conf file to <ESB_HOME>/repository/conf/security. Please refer
this doc to get jaas.conf file.
Sample Inbound configuration
Following is a sample kafka configuration that can consume messages using the specified topic or topics :
<?xml version="1.0" encoding="UTF-8"?>
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
name="kafka"
sequence="request"
onError="fault"
class="org.wso2.carbon.inbound.kafka.KafkaMessageConsumer"
suspend="false">
<parameters>
<parameter name="coordination">true</parameter>
<parameter name="interval">10</parameter>
<parameter name="topic.names">test</parameter>
<parameter name="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
<parameter name="bootstrap.servers">localhost:9092</parameter>
<parameter name="poll.timeout">100</parameter>
<parameter name="contentType">application/json</parameter>
<parameter name="group.id">hello</parameter>
<parameter name="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
<parameter name="inbound.behavior">polling</parameter>
<parameter name="sequential">true</parameter>
</parameters>
</inboundEndpoint>
It is also possible to add the above inbound configuration via the Management Console:
Run the following command to start the ZooKeeper server :
bin/zookeeper-server-start.sh config/zookeeper.properties
Run the following command to start the Kafka server :
bin/kafka-server-start.sh config/server.properties
Start the kafka producer using :
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
After the executing the above command, a console will be prompt then send the following message using console producer :
Configuring the sample scenario
Create a sample sequence as below :
<?xml version="1.0" encoding="UTF-8"?>
<sequence name="request" onError="fault" xmlns="http://ws.apache.org/ns/synapse">
<log level="full"/>
<log level="custom">
<property name="STATUS" value="Fetching inbound endpoint name"/>
<property expression="$ctx:inbound.endpoint.name"
name="inboundEndpointName" xmlns:ns="http://org.apache.synapse/xsd"/>
</log>
</sequence>
The ESB console output will be like below :
[2018-03-15 15:08:38,436] INFO - LogMediator To: , MessageID: 379c9c2a-7db1-4210-ba0f-7c550c93b9f3, Direction: request, Payload: {"test":"wso2"}
[2018-03-15 15:08:38,443] INFO - LogMediator STATUS = Fetching inbound endpoint name, inboundEndpointName = kafka