To configure the Kafka inbound endpoint, copy the following client libraries from the <KAFKA_HOME>/lib directory to the <ESB_HOME>/repository/components/lib directory (tested with ESB 4.9.0):
- kafka_2.12-0.11.0.0.jar
- scala-parser-combinators_2.12-1.0.4.jar
- metrics-core-2.2.0.jar
- kafka-clients-0.11.0.0.jar
- zkclient-0.10.jar
- metrics-core-3.2.2.jar
- zookeeper-3.4.10.jar
- scala-library-2.12.2.jar
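The copy step above can be scripted. The following is a minimal sketch, not an official tool: copy_kafka_libs is a hypothetical helper name, and it assumes you pass your own Kafka and ESB installation directories as arguments. It reports any jar from the list that is not found instead of silently skipping it.

```shell
#!/bin/sh
# Sketch: copy the Kafka client libraries listed in this post into the ESB lib directory.
# copy_kafka_libs is a hypothetical helper name (not part of Kafka or WSO2 ESB).
# Usage: copy_kafka_libs <KAFKA_HOME> <ESB_HOME>
copy_kafka_libs() {
    kafka_home="$1"
    esb_home="$2"
    dest="$esb_home/repository/components/lib"
    status=0

    # Refuse to run against a directory that does not look like an ESB install.
    if [ ! -d "$dest" ]; then
        echo "not found: $dest" >&2
        return 1
    fi

    for jar in \
        kafka_2.12-0.11.0.0.jar \
        scala-parser-combinators_2.12-1.0.4.jar \
        metrics-core-2.2.0.jar \
        kafka-clients-0.11.0.0.jar \
        zkclient-0.10.jar \
        metrics-core-3.2.2.jar \
        zookeeper-3.4.10.jar \
        scala-library-2.12.2.jar
    do
        if [ -f "$kafka_home/lib/$jar" ]; then
            cp "$kafka_home/lib/$jar" "$dest/" || status=1
        else
            # Report the missing jar and keep going so all gaps are listed.
            echo "missing: $kafka_home/lib/$jar" >&2
            status=1
        fi
    done
    return "$status"
}
```

For example, copy_kafka_libs "$KAFKA_HOME" "$ESB_HOME" returns a non-zero status if any of the listed jars could not be copied. Restart the ESB afterwards so that the new libraries are picked up.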
Add a jaas.conf file to <ESB_HOME>/repository/conf/security. Refer to this doc to obtain the jaas.conf file.
Sample Inbound configuration
The following sample Kafka configuration consumes messages from the specified topic or topics:
<?xml version="1.0" encoding="UTF-8"?>
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
                 name="kafka"
                 sequence="request"
                 onError="fault"
                 class="org.wso2.carbon.inbound.kafka.KafkaMessageConsumer"
                 suspend="false">
    <parameters>
        <parameter name="coordination">true</parameter>
        <parameter name="interval">10</parameter>
        <parameter name="topic.names">test</parameter>
        <parameter name="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
        <parameter name="bootstrap.servers">localhost:9092</parameter>
        <parameter name="poll.timeout">100</parameter>
        <parameter name="contentType">application/json</parameter>
        <parameter name="group.id">hello</parameter>
        <parameter name="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
        <parameter name="inbound.behavior">polling</parameter>
        <parameter name="sequential">true</parameter>
    </parameters>
</inboundEndpoint>
You can also add the above inbound configuration via the Management Console.
Run the following command from <KAFKA_HOME> to start the ZooKeeper server:
bin/zookeeper-server-start.sh config/zookeeper.properties
Run the following command to start the Kafka server:
bin/kafka-server-start.sh config/server.properties
Start the Kafka console producer using:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
After executing the above command, the console waits for input. Send the following message using the console producer:
{"test":"wso2"}
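The console producer reads messages from standard input, so the same message can also be sent without typing it interactively. The sketch below assumes the broker address and topic used in this post; send_test_message is a hypothetical helper name, and its optional argument is the path to the producer script (by default the path relative to <KAFKA_HOME>).

```shell
#!/bin/sh
# Sketch: pipe the sample JSON message into the Kafka console producer.
# send_test_message is a hypothetical helper name (not part of Kafka).
# Optional argument: path to kafka-console-producer.sh
# (defaults to the path relative to <KAFKA_HOME>, as used in this post).
send_test_message() {
    producer="${1:-bin/kafka-console-producer.sh}"
    # One line on stdin becomes one Kafka message.
    printf '%s\n' '{"test":"wso2"}' |
        "$producer" --broker-list localhost:9092 --topic test
}
```

Run send_test_message from <KAFKA_HOME>, or pass the full path to kafka-console-producer.sh as the first argument.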
Configuring the sample scenario
Create a sample sequence as follows:
<?xml version="1.0" encoding="UTF-8"?>
<sequence name="request" onError="fault" xmlns="http://ws.apache.org/ns/synapse">
    <log level="full"/>
    <log level="custom">
        <property name="STATUS" value="Fetching inbound endpoint name"/>
        <property expression="$ctx:inbound.endpoint.name" name="inboundEndpointName" xmlns:ns="http://org.apache.synapse/xsd"/>
    </log>
</sequence>
The ESB console output will look like the following:
[2018-03-15 15:08:38,436] INFO - LogMediator To: , MessageID: 379c9c2a-7db1-4210-ba0f-7c550c93b9f3, Direction: request, Payload: {"test":"wso2"}
[2018-03-15 15:08:38,443] INFO - LogMediator STATUS = Fetching inbound endpoint name, inboundEndpointName = kafka
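To check for this output without watching the console, you can search the ESB log file for the custom log entry. This is a rough sketch: inbound_message_logged is a hypothetical helper name, and the log location used in the usage note (<ESB_HOME>/repository/logs/wso2carbon.log) is an assumption based on a default installation.

```shell
#!/bin/sh
# Sketch: check whether the custom log entry from the "request" sequence
# appears in a given ESB log file.
# inbound_message_logged is a hypothetical helper name (not part of WSO2 ESB).
# Usage: inbound_message_logged <path-to-log-file>
inbound_message_logged() {
    log_file="$1"
    # Succeeds (exit status 0) if at least one matching LogMediator line exists.
    grep -q 'LogMediator.*inboundEndpointName = kafka' "$log_file"
}
```

For example: inbound_message_logged "$ESB_HOME/repository/logs/wso2carbon.log" && echo "message consumed".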