Example: Loading CSV Data from Kafka into Greenplum

In this example, you load data from a Kafka topic named topic_for_gpkafka into a Greenplum Database table named data_from_kafka. You will perform the load as the Greenplum role gpadmin. The table data_from_kafka resides in the public schema in a Greenplum database named testdb.

A producer of the Kafka topic_for_gpkafka topic emits customer expense messages in CSV format that include the customer identifier (integer), the month (integer), and an expense amount (decimal). For example, a message for a customer with identifier 123 who spent $456.78 in the month of September follows:
"123","09","456.78"

You will run a Kafka console producer to emit customer expense messages, and use the Greenplum-Kafka Integration gpkafka load and gpkafka check commands to transform and load the data into the data_from_kafka table and verify the load operation.

Prerequisites

Before you start this procedure, ensure that you:

  • Have administrative access to running Kafka and Greenplum Database clusters, and that these clusters have connectivity as described in the Prerequisites.
  • Identify and note the ZooKeeper hostname and port.
  • Identify and note the hostname and port of the Kafka broker(s).
  • Identify and note the hostname and port of the Greenplum Database master node.

This procedure assumes that you have installed the Apache Kafka distribution. If you are using a different Kafka distribution, or a newer Apache Kafka release (version 2.2 and later accept --bootstrap-server in place of the --zookeeper option to kafka-topics.sh, and version 3.0 removes --zookeeper entirely), you may need to adjust certain commands in the procedure.

Procedure

  1. Log in to a host in your Kafka cluster. For example:
    $ ssh kafkauser@kafkahost
    kafkahost$ 
  2. Create a Kafka topic named topic_for_gpkafka. For example:
    kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics.sh --create \
        --zookeeper localhost:2181 --replication-factor 1 --partitions 1 \
        --topic topic_for_gpkafka
  3. Open a file named sample_data.csv in the editor of your choice. For example:
    kafkahost$ vi sample_data.csv
  4. Copy/paste the following text to add CSV-format data into the file, and then save and exit:
    "1313131","12","1313.13"
    "3535353","11","761.35"
    "7979797","10","4489.00"
    "7979797","11","18.72"
    "3535353","10","6001.94"
    "7979797","12","173.18"
    "1313131","10","492.83"
    "3535353","12","81.12"
    "1313131","11","368.27"
  5. Stream the contents of the sample_data.csv file to a Kafka console producer. For example:
    kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-producer.sh \
        --broker-list localhost:9092 \
        --topic topic_for_gpkafka < sample_data.csv
  6. Verify that the Kafka console producer published the messages to the topic by running a Kafka console consumer. For example:
    kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-consumer.sh \
        --bootstrap-server localhost:9092 --topic topic_for_gpkafka \
        --from-beginning
  7. Open a new terminal window, log in to the Greenplum Database master host as the gpadmin administrative user, and set up the Greenplum environment. For example:
    $ ssh gpadmin@gpmaster
    gpmaster$ . /usr/local/greenplum-db/greenplum_path.sh
  8. Construct the gpkafka load configuration file. Open a file named firstload_cfg.yaml in the editor of your choice. For example:
    gpmaster$ vi firstload_cfg.yaml
  9. Fill in the load configuration parameter values based on your environment. For example, if:
    • Your Greenplum Database master hostname is gpmaster.
    • The Greenplum Database server is running on the default port.
    • Your Kafka broker host and port is localhost:9092.
    • You want to write the Kafka data to a Greenplum Database table named data_from_kafka located in the public schema of a database named testdb.
    • You want to write the customer identifier and expenses data to Greenplum. You also want to calculate and write the tax due (7.25%) on the expense data.
    The firstload_cfg.yaml file would include the following contents:
    DATABASE: testdb
    USER: gpadmin
    HOST: gpmaster
    PORT: 5432
    KAFKA:
       INPUT:
         SOURCE:
            BROKERS: localhost:9092
            TOPIC: topic_for_gpkafka
         COLUMNS:
            - NAME: cust_id
              TYPE: int
            - NAME: __IGNORED__
              TYPE: int
            - NAME: expenses
              TYPE: decimal(9,2)
         FORMAT: csv
         ERROR_LIMIT: 125
       OUTPUT:
         TABLE: data_from_kafka
         MAPPING:
            - NAME: customer_id
              EXPRESSION: cust_id
            - NAME: expenses
              EXPRESSION: expenses
            - NAME: tax_due
              EXPRESSION: expenses * .0725
       COMMIT:
         MAX_ROW: 100
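
    The tax_due MAPPING above applies the expression expenses * .0725 to each message. gpkafka evaluates this expression in the database, but you can preview the values it will produce with awk. This is a rough sketch that assumes the sample_data.csv file from step 3 is in the current directory:

```shell
# Preview the output columns of the MAPPING: cust_id, expenses, and
# tax_due (expenses * .0725); the month field is ignored by the load
awk -F',' '{ gsub(/"/, ""); printf "%s  %.2f  %.2f\n", $1, $3, $3 * 0.0725 }' sample_data.csv
```

    The first line of output, 1313131  1313.13  95.20, matches the tax_due value that appears for that row in the target table in step 19.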
    
  10. Create the target Greenplum Database table named data_from_kafka. For example:
    gpmaster$ psql -d testdb
    
    testdb=# CREATE TABLE data_from_kafka( customer_id int8, expenses decimal(9,2),
               tax_due decimal(7,2) );
  11. Exit the psql subsystem:
    testdb=# \q
  12. Run the gpkafka load command to batch load the CSV data published to the topic_for_gpkafka topic into the Greenplum table. For example:
    gpmaster$ gpkafka load --quit-at-eof ./firstload_cfg.yaml

    The command exits after it reads all data published to the topic.

  13. Examine the command output, looking for messages identifying the number of rows inserted/rejected. For example:
    ... -[INFO]:- ... Inserted 9 rows
    ... -[INFO]:- ... Rejected 0 rows
  14. Run the gpkafka load command again, this time in streaming mode. For example:
    gpmaster$ gpkafka load ./firstload_cfg.yaml

    The command waits for a producer to publish new messages to the topic.

  15. Navigate back to your Kafka host terminal window. Stream the contents of the sample_data.csv file to the Kafka console producer once more:
    kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-producer.sh \
        --broker-list localhost:9092 \
        --topic topic_for_gpkafka < sample_data.csv
  16. Notice the activity in your Greenplum Database master terminal window. gpkafka load consumes the new round of messages and waits.
  17. Interrupt and exit the waiting gpkafka load command by entering Control-c in the Greenplum Database master host terminal window.
  18. Run the gpkafka check command to examine the complete commit history of the load operation. The history lists the most recent commit first:
    gpmaster$ gpkafka check --show-commit-history all firstload_cfg.yaml
    PartitionID    StartTime               EndTime                 BeginOffset    EndOffset
    0              2018-07-13T16:19:36Z    2018-07-13T16:19:36Z    9              18
    0              2018-07-13T16:19:11Z    2018-07-13T16:19:11Z    0              9

    Each commit covers nine messages, one commit for each publish of the sample_data.csv file.
  19. Finally, view the contents of the Greenplum Database target table data_from_kafka:
    gpmaster$ psql -d testdb
    
    testdb=# SELECT * FROM data_from_kafka WHERE customer_id='1313131' 
               ORDER BY expenses;
     customer_id | expenses | tax_due 
    -------------+----------+---------
         1313131 |   368.27 |   26.70
         1313131 |   368.27 |   26.70
         1313131 |   492.83 |   35.73
         1313131 |   492.83 |   35.73
         1313131 |  1313.13 |   95.20
         1313131 |  1313.13 |   95.20
    (6 rows)
    The table contains two entries for each expense because the producer published the sample_data.csv file twice.
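
As a final cross-check, you can total a customer's expenses directly from the source file and compare the result against the table. This sketch assumes the sample_data.csv file from step 3 is in the current directory:

```shell
# Sum the expenses for customer 1313131 in the source file; the table
# holds twice this amount because the file was published twice
awk -F',' '{ gsub(/"/, ""); if ($1 == "1313131") sum += $3 } END { printf "%.2f\n", sum }' sample_data.csv
```

The command prints 2174.23, so a query such as SELECT sum(expenses) FROM data_from_kafka WHERE customer_id = 1313131; should return twice that amount, 4348.46.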