For Data Engineers

Setting up Kafka to Test with FeatureBase

Disclaimer: There may be a newer Kafka version available than the version demonstrated in this walkthrough.

Environment Dependencies:

Host environment must have:

  • Java 8+ installed
  • jq installed

STEP 1: GET KAFKA

Download the latest Kafka release and extract it:


$ tar -xzf kafka_2.13-3.1.0.tgz
$ cd kafka_2.13-3.1.0

Note: You can download an RPM package as well.

STEP 2: START THE KAFKA ENVIRONMENT

Run the following commands in order to start all services in the correct order:


# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties

The server.properties config found in the config directory of kafka controls several aspects of kafka, namely limits on messages. For nodes that do not have public IPs the following needs to be verified prior to launch:

listeners=PLAINTEXT://0.0.0.0:9092

Uncomment: listener.security.protocol.map

Open another terminal session and run:


# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties

Once all services have successfully launched, you will have a basic Kafka environment running and ready to use.

STEP 3: CREATE A TOPIC TO STORE YOUR EVENTS

Kafka is a distributed event streaming platform that lets you read, write, store, and process events (also called records or messages in the documentation) across many machines.

Before you can write your first events, you must create a topic. Open another terminal session and run:


$ bin/kafka-topics.sh --create --topic [Enter topic name] --bootstrap-server localhost:9092

This command will use the single broker created to host a single topic.

STEP 4: WRITE SOME EVENTS INTO THE TOPIC

Now to populate the created topic you can use several methods, the SE team prefers to create a static JSON file that constitutes several thousand messages. For smaller scale testing it is sufficient to use jq to stream the file into a kafka topic using the following CLI command:


jq --slurp .[] / | jq -r tojson | bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic [Enter topic name]

Note: In the event incorrect data is populated into the topic, it’s easy to delete the topic, recreate and repopulate. To delete the topic use: bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic <Enter topic name>

STEP 5: READ THE EVENTS

In order to test the populated topic, it is best practice to use the standard Kafka consumer (as opposed to FeatureBase consumer) to check the records output on a CLI. Open another terminal session and run the console consumer client to read the events you just created:


$ bin/kafka-console-consumer.sh --topic [Enter topic name] --from-beginning --bootstrap-server localhost:9092

You can stop the consumer client with Ctrl-C at any time.

If the records are correct, then move to using a FeatureBase Consumer, in this case the static consumer with the basis of the command as follows:


./molecula-consumer-kafka-static --topics [Enter topic name] \ 
   --index  [Index Name]\ 
   --header  header.config\         
   --kafka-hosts localhost:9092 \        
   --featurebase-hosts "localhost:10101"\
   --future.rename \
   --allow-missing-fields \
   --track-progress \
   --concurrency 1 \
   --batch-size 100000 \
   --track-progress \

For more information from the consumer you can add -v for verbose

You must provide either a primary key or use auto generate to assign external keys:

--primary-key-fields

OR

--auto-generate \

--external-generate \

Additionally, It’s recommended to adjust --batch-size 1 for the first test to pull one message, as the consumer will fail to start if there are issues in the initial batch grab from Kafka e.g. an incorrect character in a field, missing keys, etc

Here is a small snippet of data along with the correct header.config to use in a test.

JSON Data:


{"user_id" : "1a", "name" : "joe","age" : 91}
{"user_id" : "2a", "name" : "sam","age" : 20}
{"user_id" : "3a", "name" : "lisa","age" : 52}
{"user_id" : "3a", :name" : "larry", "age" : 31}

Header.config:


[
    {
        "name": "user_id",
        "path": ["user_id"],
        "type": "string"
    },
    {
        "name": "name",
        "path": ["name"],
        "type": "string"
    },
    {
        "name": "age",
        "path": ["age"],
        "type": "id"
    }
]

SCHEDULE A DEMO