Example: Loading Avro Data from Kafka into Greenplum

Example: Loading Avro Data from Kafka into Greenplum

In this example, you load Avro-format key and value data as JSON from a Kafka topic named topic_avrokv into a Greenplum Database table named avrokv_from_kafka. You will perform the load as the Greenplum role gpadmin. The table avrokv_from_kafka resides in the public schema in a Greenplum database named testdb.

A producer of the Kafka topic_avrokv topic emits customer expense messages in JSON format that include the customer identifier (integer), the year (integer), and one or more expense amounts (decimal). For example, a message with key 1 for a customer with identifier 123 who spent $456.78 and $67.89 in the year 1997 follows:
1	{ "cust_id": 123, "year": 1997, "expenses":[456.78, 67.89] }

You will use the Confluent Schema Registry and run a Kafka Avro console producer to emit keys and Avro JSON-format customer expense messages, and use the Greenplum-Kafka Integration gpkafka load command to load the data into the avrokv_from_kafka table.

Prerequisites

Before you start this procedure, ensure that you:

  • Have administrative access to running Confluent Kafka and Greenplum Database clusters. Also ensure that these clusters have connectivity as described in the Prerequisites.
  • Identify and note the ZooKeeper hostname and port.
  • Identify and note the address of the Confluent Schema Registry server(s).
  • Identify and note the hostname and port of the Kafka broker(s).
  • Identify and note the hostname and port of the Greenplum Database master node.

This procedure assumes that you have installed the Confluent Kafka distribution.

Procedure

  1. Login to a host in your Kafka cluster. For example:
    $ ssh kafkauser@kafkahost
    kafkahost$ 
  2. Create a Kafka topic named topic_json_gpkafka. For example:
    kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics --create \
        --zookeeper localhost:2181 --replication-factor 1 --partitions 1 \
        --topic topic_avrokv
  3. Start a Kafka Avro console producer. You will manually input message data to this producer. For example:
    kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-avro-console-producer \
        --broker-list localhost:9092 \
        --topic topic_avrokv \
        --property parse.key=true --property key.schema='{"type" : "int", "name" : "id"}' \
        --property value.schema='{ "type" : "record", "name" : "example_schema", "namespace" : "com.example", "fields" : [ { "name" : "cust_id", "type" : "int", "doc" : "Id of the customer account" }, { "name" : "year", "type" : "int", "doc" : "year of expense" }, { "name" : "expenses", "type" : {"type": "array", "items": "float"}, "doc" : "Expenses for the year" } ], "doc:" : "A basic schema for storing messages" }'

    The producer waits for messages.

  4. Input the following messages to the Avro console producer.
    Note: You must enter a tab between the key and value. Replace TAB with a tab.
    1 TAB {"cust_id":1313131, "year":2012, "expenses":[1313.13, 2424.24]}
    2 TAB {"cust_id":3535353, "year":2011, "expenses":[761.35, 92.18, 14.41]}
    3 TAB {"cust_id":7979797, "year":2011, "expenses":[4489.00]}
  5. Verify that the Kafka Avro console producer published the messages to the topic by running a Kafka Avro console consumer. Specify the print.key property to have the consumer display the Kafka key. For example:
    kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-avro-console-consumer \
        --bootstrap-server localhost:9092 --topic topic_avrokv \
        --from-beginning --property print.key=true
  6. Open a new terminal window, log in to the Greenplum Database master host as the gpadmin administrative user, and set up the Greenplum environment. For example:
    $ ssh gpadmin@gpmaster
    gpmaster$ . /usr/local/greenplum-db/greenplum_path.sh
  7. Construct the gpkafka load configuration file. Open a file named avrokvload_cfg.yaml in the editor of your choice. For example:
    gpmaster$ vi avrokvload_cfg.yaml
  8. Fill in the load configuration parameter values based on your environment. For example, if:
    • Your Greenplum Database master hostname is gpmaster.
    • The Greenplum Database server is running on the default port.
    • Your Kafka broker host and port is localhost:9092.
    • Your Confluent Schema Registry address is http://localhost:8081.
    The avrokvload_cfg.yaml file might include the following contents:
    DATABASE: testdb
    USER: gpadmin
    HOST: gpmaster
    PORT: 5432
    VERSION: 2
    KAFKA:
       INPUT:
         SOURCE:
            BROKERS: localhost:9092
            TOPIC: topic_avrokv
         VALUE:
            COLUMNS:
              - NAME: c1
                TYPE: json
            FORMAT: avro
            AVRO_OPTION:
              SCHEMA_REGISTRY_ADDR: http://localhost:8081
         KEY:
            COLUMNS:
              - NAME: id
                TYPE: json
            FORMAT: avro
            AVRO_OPTION:
              SCHEMA_REGISTRY_ADDR: http://localhost:8081
         ERROR_LIMIT: 0
       OUTPUT:
         TABLE: avrokv_from_kafka
         MAPPING:
            - NAME: id
              EXPRESSION: id
            - NAME: customer_id
              EXPRESSION: (c1->>'cust_id')::int
            - NAME: year
              EXPRESSION: (c1->>'year')::int
            - NAME: expenses
              EXPRESSION: array(select json_array_elements(c1->'expenses')::text::float)
       COMMIT:
         MAX_ROW: 100
         MINIMAL_INTERVAL: 500

    The mapping in this configuration assigns each message value field to a separate column and ignores the message key.

  9. Create the target Greenplum Database table named avrokv_from_kafka. For example:
    gpmaster$ psql -d testdb
    
    testdb=# CREATE TABLE avrokv_from_kafka( id json, customer_id int, year int, expenses decimal(9,2)[] );
  10. Exit the psql subsystem:
    testdb=# \q
  11. Run the gpkafka load command to batch load the JSON data published to the topic_json_gpkafka topic into the Greenplum table. For example:
    gpmaster$ gpkafka load --quit-at-eof ./avrokvload_cfg.yaml

    The command exits after it reads all data published to the topic.

  12. Examine the command output, looking for messages that identify the number of rows inserted/rejected. For example:
    ... -[INFO]:- ... Inserted 3 rows
    ... -[INFO]:- ... Rejected 0 rows
  13. View the contents of the Greenplum Database target table avrokv_from_kafka:
    gpmaster$ psql -d testdb
    
    testdb=# SELECT * FROM avrokv_from_kafka ORDER BY customer_id;
     id | customer_id | year |       expenses       
    ----+-------------+------+----------------------
     1  |     1313131 | 2012 | {1313.13,2424.24}
     2  |     3535353 | 2011 | {761.35,92.18,14.41}
     3  |     7979797 | 2011 | {4489.00}
    (3 rows)