Example: Loading Avro Data from Kafka
In this example, you load Avro-format key and value data as JSON from a Kafka topic named topic_avrokv into a Greenplum Database table named avrokv_from_kafka. You perform the load as the Greenplum role gpadmin. The table avrokv_from_kafka resides in the public schema in a Greenplum database named testdb.
1 { "cust_id": 123, "year": 1997, "expenses":[456.78, 67.89] }
You will use the Confluent Schema Registry and run a Kafka Avro console producer to emit keys and Avro JSON-format customer expense messages, and use the Greenplum-Kafka Integration gpkafka load command to load the data into the avrokv_from_kafka table.
Prerequisites
Before you start this procedure, ensure that you:
- Have administrative access to running Confluent Kafka and Greenplum Database clusters.
- Have configured connectivity as described in the loading Prerequisites.
- Identify and note the ZooKeeper hostname and port.
- Identify and note the address of the Confluent Schema Registry server(s).
- Identify and note the hostname and port of the Kafka broker(s).
- Identify and note the hostname and port of the Greenplum Database master node.
This procedure assumes that you have installed the Confluent Kafka distribution.
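As a quick sanity check of that connectivity, you can verify that the Greenplum master and the Kafka cluster are reachable before you begin. The hostnames and ports below are the example values used throughout this procedure; substitute your own:

$ psql -h gpmaster -p 5432 -U gpadmin -d testdb -c 'SELECT version();'
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics --list --zookeeper localhost:2181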
Procedure
- Log in to a host in your Kafka cluster. For example:

$ ssh kafkauser@kafkahost
kafkahost$
- Create a Kafka topic named topic_avrokv. For example:

kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics --create \
    --zookeeper localhost:2181 --replication-factor 1 --partitions 1 \
    --topic topic_avrokv
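If you want to confirm that the topic was created with the expected partition and replication settings, you can describe it, again using the example ZooKeeper address:

kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics --describe \
    --zookeeper localhost:2181 --topic topic_avrokv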
- Start a Kafka Avro console producer. You will manually input message data to
this producer. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-avro-console-producer \
    --broker-list localhost:9092 \
    --topic topic_avrokv \
    --property parse.key=true \
    --property key.schema='{"type" : "int", "name" : "id"}' \
    --property value.schema='{
      "type" : "record",
      "name" : "example_schema",
      "namespace" : "com.example",
      "fields" : [
        { "name" : "cust_id", "type" : "int", "doc" : "Id of the customer account" },
        { "name" : "year", "type" : "int", "doc" : "year of expense" },
        { "name" : "expenses", "type" : {"type": "array", "items": "float"}, "doc" : "Expenses for the year" }
      ],
      "doc" : "A basic schema for storing messages"
    }'
The producer waits for messages.
- Input the following messages to the Avro console producer. Note: You must separate the key and the value with a tab character; TAB below represents a literal tab.
1 TAB {"cust_id":1313131, "year":2012, "expenses":[1313.13, 2424.24]} 2 TAB {"cust_id":3535353, "year":2011, "expenses":[761.35, 92.18, 14.41]} 3 TAB {"cust_id":7979797, "year":2011, "expenses":[4489.00]}
- Verify that the Kafka Avro console producer published the messages to the
topic by running a Kafka Avro console consumer. Specify the print.key
property to have the consumer display the Kafka key. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 --topic topic_avrokv \
    --from-beginning --property print.key=true
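Optionally, you can also confirm that the console producer registered the key and value schemas with the Confluent Schema Registry via its REST API. With the registry's default subject naming strategy, the subjects are named after the topic; the registry address below is the example value:

kafkahost$ curl http://localhost:8081/subjects
["topic_avrokv-key","topic_avrokv-value"]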
- Open a new terminal window, log in to the Greenplum Database master host as
the gpadmin administrative user, and set up the Greenplum
environment. For example:
$ ssh gpadmin@gpmaster
gpmaster$ . /usr/local/greenplum-db/greenplum_path.sh
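After sourcing greenplum_path.sh, you can check that the gpkafka utility is on your path. The install location shown is a typical default and may differ in your environment:

gpmaster$ which gpkafka
/usr/local/greenplum-db/bin/gpkafka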
- Construct the gpkafka load configuration file. Open a file
named avrokvload_cfg.yaml in the editor of your choice.
For example:
gpmaster$ vi avrokvload_cfg.yaml
- Fill in the load configuration parameter values based on your environment.
This example assumes:
- Your Greenplum Database master hostname is gpmaster.
- The Greenplum Database server is running on the default port.
- Your Kafka broker host and port is localhost:9092.
- Your Confluent Schema Registry address is http://localhost:8081.
The avrokvload_cfg.yaml file might include the following contents:

DATABASE: testdb
USER: gpadmin
HOST: gpmaster
PORT: 5432
VERSION: 2
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: localhost:9092
         TOPIC: topic_avrokv
      VALUE:
         COLUMNS:
            - NAME: c1
              TYPE: json
         FORMAT: avro
         AVRO_OPTION:
            SCHEMA_REGISTRY_ADDR: http://localhost:8081
      KEY:
         COLUMNS:
            - NAME: id
              TYPE: json
         FORMAT: avro
         AVRO_OPTION:
            SCHEMA_REGISTRY_ADDR: http://localhost:8081
      ERROR_LIMIT: 0
   OUTPUT:
      TABLE: avrokv_from_kafka
      MAPPING:
         - NAME: id
           EXPRESSION: id
         - NAME: customer_id
           EXPRESSION: (c1->>'cust_id')::int
         - NAME: year
           EXPRESSION: (c1->>'year')::int
         - NAME: expenses
           EXPRESSION: array(select json_array_elements(c1->'expenses')::text::float)
   COMMIT:
      MINIMAL_INTERVAL: 2000
The mapping in this configuration assigns the Kafka message key to the id column and each message value field to a separate column, casting the JSON properties to the target column types.
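If you want to preview how these mapping expressions transform a message value before running the load, you can evaluate them in psql against a JSON literal. This is an optional sanity check, not part of the documented procedure:

testdb=# SELECT (c1->>'cust_id')::int AS customer_id,
            (c1->>'year')::int AS year,
            array(SELECT json_array_elements(c1->'expenses')::text::float) AS expenses
         FROM (SELECT '{"cust_id":1313131, "year":2012, "expenses":[1313.13, 2424.24]}'::json AS c1) m;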
- Create the target Greenplum Database table named
avrokv_from_kafka. For example:
gpmaster$ psql -d testdb
testdb=# CREATE TABLE avrokv_from_kafka(
    id json,
    customer_id int,
    year int,
    expenses decimal(9,2)[] );
- Exit the psql subsystem:
testdb=# \q
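Before starting the load, you can optionally check that the configuration file parses as valid YAML. This hypothetical check uses Python's yaml module, which may need to be installed separately:

gpmaster$ python -c "import yaml; yaml.safe_load(open('./avrokvload_cfg.yaml'))" && echo "YAML OK"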
- Run the gpkafka load command to batch load the Avro data published to the topic_avrokv topic into the Greenplum table. For example:
gpmaster$ gpkafka load --quit-at-eof ./avrokvload_cfg.yaml
The command exits after it reads all data published to the topic.
- Examine the command output, looking for messages that identify the number of
rows inserted/rejected. For example:
... -[INFO]:- ... Inserted 3 rows
... -[INFO]:- ... Rejected 0 rows
- View the contents of the Greenplum Database target table avrokv_from_kafka:
gpmaster$ psql -d testdb
testdb=# SELECT * FROM avrokv_from_kafka ORDER BY customer_id;
 id | customer_id | year |       expenses
----+-------------+------+----------------------
 1  |     1313131 | 2012 | {1313.13,2424.24}
 2  |     3535353 | 2011 | {761.35,92.18,14.41}
 3  |     7979797 | 2011 | {4489.00}
(3 rows)
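Once loaded, the expenses column behaves as an ordinary Greenplum numeric array. As one illustrative follow-up query (assuming your Greenplum version supports unnest()), you could total each customer's expenses:

testdb=# SELECT customer_id, year,
            (SELECT sum(e) FROM unnest(expenses) AS e) AS total_expenses
         FROM avrokv_from_kafka
         ORDER BY customer_id;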