Example: Loading JSON Data from Kafka (Simple)

In this example, you load JSON-format data from a Kafka topic named topic_json into a single-column Greenplum Database table named single_json_column. You perform the load as the Greenplum role gpadmin. The table single_json_column resides in the public schema of a Greenplum database named testdb.

A producer of the Kafka topic_json topic emits customer expense messages in JSON format. Each message includes the customer identifier (integer), the month (integer), and an expense amount (decimal). For example, a message for a customer with identifier 123 who spent $456.78 in the month of September looks like this:
{ "cust_id": 123, "month": 9, "expenses": 456.78 }
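The message layout described above can be checked with a short Python sketch. It assumes each message is a single JSON object with integer cust_id and month fields and a decimal expenses field (the field name that the sample data in this procedure uses). The helper name parse_expense_message is hypothetical and is not part of gpkafka.

```python
import json
from decimal import Decimal


def parse_expense_message(msg: str) -> tuple[int, int, Decimal]:
    """Parse one customer expense message (hypothetical helper, not part of gpkafka).

    Raises KeyError if a field is missing and ValueError if a field has the wrong type.
    """
    # parse_float=Decimal keeps amounts such as 456.78 exact instead of binary floats.
    record = json.loads(msg, parse_float=Decimal)
    cust_id, month, expenses = record["cust_id"], record["month"], record["expenses"]
    # bool is a subclass of int in Python, so reject it explicitly.
    if not isinstance(cust_id, int) or isinstance(cust_id, bool):
        raise ValueError("cust_id must be an integer")
    if not isinstance(month, int) or isinstance(month, bool) or not 1 <= month <= 12:
        raise ValueError("month must be an integer from 1 to 12")
    # A whole amount written without a decimal point (for example 25) parses as int.
    if isinstance(expenses, int) and not isinstance(expenses, bool):
        expenses = Decimal(expenses)
    if not isinstance(expenses, Decimal):
        raise ValueError("expenses must be a decimal number")
    return cust_id, month, expenses


print(parse_expense_message('{ "cust_id": 123, "month": 9, "expenses": 456.78 }'))
# prints (123, 9, Decimal('456.78'))
```

Parsing amounts as Decimal rather than float mirrors the decimal type used for the expense amount, so values such as 456.78 are not rounded.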

You will run a Kafka console producer to emit JSON-format customer expense messages, and use the Greenplum-Kafka Integration gpkafka load command to load each Kafka message into a row in the single_json_column table.

Prerequisites

Before you start this procedure, ensure that you:

  • Have administrative access to running Kafka and Greenplum Database clusters.
  • Have configured connectivity as described in the loading Prerequisites.
  • Have identified and noted the ZooKeeper hostname and port.
  • Have identified and noted the hostname and port of each Kafka broker.
  • Have identified and noted the hostname and port of the Greenplum Database master node.
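After you note the hostnames and ports listed above, you can confirm that each endpoint accepts TCP connections from the host where you will run gpkafka. The following Python sketch only tests TCP reachability; it does not verify Kafka, ZooKeeper, or Greenplum authentication. The function names is_reachable and check_endpoints are hypothetical.

```python
import socket


def is_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts (socket.timeout is an OSError subclass),
        # and hostname lookup failures (socket.gaierror).
        return False


def check_endpoints(endpoints: dict[str, tuple[str, int]], timeout: float = 3.0) -> dict[str, bool]:
    """Map each named endpoint to whether it accepted a TCP connection."""
    return {name: is_reachable(host, port, timeout) for name, (host, port) in endpoints.items()}
```

For example, check_endpoints({"ZooKeeper": ("kafkahost", 2181), "Kafka broker": ("kafkahost", 9092), "Greenplum master": ("gpmaster", 5432)}) returns a dictionary with False for any endpoint that is not reachable. The hostnames and ports in this call are placeholders; substitute the values you noted.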

This procedure assumes that you have installed the Apache Kafka distribution. If you are using a different Kafka distribution, you may need to adjust certain commands in the procedure.

Procedure

  1. Log in to a host in your Kafka cluster. For example:
    $ ssh kafkauser@kafkahost
    kafkahost$ 
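  2. Create a Kafka topic named topic_json. For example (Kafka 3.0 and later removed the --zookeeper option; use --bootstrap-server localhost:9092 instead):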
    kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics.sh --create \
        --zookeeper localhost:2181 --replication-factor 1 --partitions 1 \
        --topic topic_json
  3. Open a file named sample_data.json in the editor of your choice. For example:
    kafkahost$ vi sample_data.json
  4. Copy and paste the following JSON-format data into the file, and then save the file and exit the editor:
    { "cust_id": 1313131, "month": 12, "expenses": 1313.13 }
    { "cust_id": 3535353, "month": 11, "expenses": 761.35 }
    { "cust_id": 7979797, "month": 10, "expenses": 4489.00 }
    { "cust_id": 7979797, "month": 11, "expenses": 18.72 }
    { "cust_id": 3535353, "month": 10, "expenses": 6001.94 }
    { "cust_id": 7979797, "month": 12, "expenses": 173.18 }
    { "cust_id": 1313131, "month": 10, "expenses": 492.83 }
    { "cust_id": 3535353, "month": 12, "expenses": 81.12 }
    { "cust_id": 1313131, "month": 11, "expenses": 368.27 }
  5. Stream the contents of the sample_data.json file to a Kafka console producer. For example:
    kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-producer.sh \
        --broker-list localhost:9092 \
        --topic topic_json < sample_data.json
  6. Verify that the Kafka console producer published the messages to the topic by running a Kafka console consumer. The consumer keeps running after it displays the messages; press Control-C to stop it. For example:
    kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-consumer.sh \
        --bootstrap-server localhost:9092 --topic topic_json \
        --from-beginning
  7. Open a new terminal window, log in to the Greenplum Database master host as the gpadmin administrative user, and set up the Greenplum environment. For example:
    $ ssh gpadmin@gpmaster
    gpmaster$ . /usr/local/greenplum-db/greenplum_path.sh
  8. Construct the gpkafka load configuration file. Open a file named simple_jsonload_cfg.yaml in the editor of your choice. For example:
    gpmaster$ vi simple_jsonload_cfg.yaml
  9. Fill in the load configuration parameter values based on your environment. This example assumes:
    • Your Greenplum Database master hostname is gpmaster.
    • The Greenplum Database server is running on the default port.
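    • Your Kafka broker host and port are localhost:9092. If the broker does not run on the Greenplum master host (in this procedure it runs on kafkahost), specify the broker hostname instead of localhost.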
    • You want to write the Kafka data to a Greenplum Database table named single_json_column located in the public schema of a database named testdb.
    • You want to write the data to Greenplum as a single json type column.
    The simple_jsonload_cfg.yaml file would include the following contents:
    DATABASE: testdb
    USER: gpadmin
    HOST: gpmaster
    PORT: 5432
    KAFKA:
      INPUT:
        SOURCE:
          BROKERS: localhost:9092
          TOPIC: topic_json
        FORMAT: json
        ERROR_LIMIT: 10
      OUTPUT:
        TABLE: single_json_column
      COMMIT:
        MINIMAL_INTERVAL: 1000
    
  10. Create the target Greenplum Database table named single_json_column. For example:
    gpmaster$ psql -d testdb
    
    testdb=# CREATE TABLE single_json_column( value json );
  11. Exit psql:
    testdb=# \q
  12. Run the gpkafka load command to batch load the JSON data published to the topic_json topic into the Greenplum table. For example:
    gpmaster$ gpkafka load --quit-at-eof ./simple_jsonload_cfg.yaml

    The command exits after it reads all data published to the topic.

  13. Examine the command output, looking for messages that report the number of rows inserted and rejected. For example:
    ... -[INFO]:- ... Inserted 9 rows
    ... -[INFO]:- ... Rejected 0 rows
  14. View the contents of the Greenplum Database target table single_json_column:
    gpmaster$ psql -d testdb
    
    testdb=# SELECT * FROM single_json_column; 
                              value                           
    ----------------------------------------------------------
     { "cust_id": 7979797, "month": 10, "expenses": 4489.00 }
     { "cust_id": 7979797, "month": 11, "expenses": 18.72 }
     { "cust_id": 3535353, "month": 12, "expenses": 81.12 }
     { "cust_id": 3535353, "month": 11, "expenses": 761.35 }
     { "cust_id": 1313131, "month": 12, "expenses": 1313.13 }
     { "cust_id": 3535353, "month": 10, "expenses": 6001.94 }
     { "cust_id": 7979797, "month": 12, "expenses": 173.18 }
     { "cust_id": 1313131, "month": 10, "expenses": 492.83 }
     { "cust_id": 1313131, "month": 11, "expenses": 368.27 }
    (9 rows)
    
  15. Use the JSON ->> operator to view the expenses associated with a specific customer:
     testdb=# SELECT (value->>'expenses')::decimal AS expenses FROM single_json_column
                WHERE (value->>'cust_id')::int = 1313131;
     expenses 
    ----------
      1313.13
       492.83
       368.27
    (3 rows)
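If you script this load, you might want to extract the row counts reported in step 13 rather than read them by eye. The Python sketch below assumes the "Inserted N rows" and "Rejected N rows" wording shown in the sample output; the exact gpkafka log format can differ between versions, so treat the pattern as an assumption. It also assumes that counts may be reported more than once (for example, per batch) and sums them. The function name summarize_load_output is hypothetical.

```python
import re

# Pattern assumed from the sample output in step 13; adjust it to your gpkafka version.
_COUNT_RE = re.compile(r"\b(Inserted|Rejected) (\d+) rows\b")


def summarize_load_output(log_text: str) -> dict[str, int]:
    """Total every 'Inserted N rows' and 'Rejected N rows' count found in the log text."""
    totals = {"inserted": 0, "rejected": 0}
    for verb, count in _COUNT_RE.findall(log_text):
        totals[verb.lower()] += int(count)
    return totals


sample = "... -[INFO]:- ... Inserted 9 rows\n... -[INFO]:- ... Rejected 0 rows\n"
print(summarize_load_output(sample))
# prints {'inserted': 9, 'rejected': 0}
```

A nonzero rejected total is a signal to inspect the messages that gpkafka could not load; with ERROR_LIMIT: 10 in the configuration, the load tolerates only a limited number of bad rows.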
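To see what the query in step 15 computes, the following Python sketch applies the same filter to the nine sample messages from step 4 without a database. It treats each non-blank line as one message, which matches how the Kafka console producer sends line-delimited input, and mimics (value->>'expenses')::decimal and (value->>'cust_id')::int = 1313131. The names SAMPLE_MESSAGES, parse_messages, and expenses_for_customer are hypothetical.

```python
import json
from decimal import Decimal

# The nine messages written to sample_data.json in step 4.
SAMPLE_MESSAGES = """\
{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }
{ "cust_id": 3535353, "month": 11, "expenses": 761.35 }
{ "cust_id": 7979797, "month": 10, "expenses": 4489.00 }
{ "cust_id": 7979797, "month": 11, "expenses": 18.72 }
{ "cust_id": 3535353, "month": 10, "expenses": 6001.94 }
{ "cust_id": 7979797, "month": 12, "expenses": 173.18 }
{ "cust_id": 1313131, "month": 10, "expenses": 492.83 }
{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }
{ "cust_id": 1313131, "month": 11, "expenses": 368.27 }
"""


def parse_messages(text: str) -> list[dict]:
    """Parse one JSON object per non-blank line, keeping amounts as Decimal."""
    return [json.loads(line, parse_float=Decimal) for line in text.splitlines() if line.strip()]


def expenses_for_customer(records: list[dict], cust_id: int) -> list[Decimal]:
    """Return the expenses of every record whose cust_id equals cust_id, in input order."""
    return [rec["expenses"] for rec in records if int(rec["cust_id"]) == cust_id]


records = parse_messages(SAMPLE_MESSAGES)
print(expenses_for_customer(records, 1313131))
# prints [Decimal('1313.13'), Decimal('492.83'), Decimal('368.27')]
```

The sketch returns rows in file order. Greenplum Database distributes table rows across segments, so a SELECT without ORDER BY can return the same rows in a different order, as the output in step 14 shows.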