Example: Loading JSON Data from Kafka into Greenplum
In this example, you load JSON format data from a Kafka topic named topic_json_gpkafka into a Greenplum Database table named json_from_kafka. You will perform the load as the Greenplum role gpadmin. The table json_from_kafka resides in the public schema in a Greenplum database named testdb.
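A producer of the topic_json_gpkafka topic emits customer expense messages in JSON format that include the customer identifier (integer), the month (integer), and an expense amount (decimal). For example, a message for a customer with identifier 123 who spent $456.78 in the month of September looks like this:
{ "cust_id": 123, "month": 9, "expenses": 456.78 }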
You will run a Kafka console producer to emit JSON-format customer expense messages, and use the Greenplum-Kafka Integration gpkafka load command to transform and load the data into the json_from_kafka table.
Prerequisites
Before you start this procedure, ensure that you:
- Have administrative access to running Kafka and Greenplum Database clusters that have connectivity as described in the loading Prerequisites.
- Identify and note the ZooKeeper hostname and port.
- Identify and note the hostname and port of the Kafka broker(s).
- Identify and note the hostname and port of the Greenplum Database master node.
This procedure assumes that you have installed the Apache Kafka distribution. If you are using a different Kafka distribution, you may need to adjust certain commands in the procedure.
Procedure
- Log in to a host in your Kafka cluster. For example:
$ ssh kafkauser@kafkahost
kafkahost$
- Create a Kafka topic named topic_json_gpkafka. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 --replication-factor 1 --partitions 1 \
    --topic topic_json_gpkafka
- Open a file named sample_data.json in the editor of your choice.
For example:
kafkahost$ vi sample_data.json
- Copy/paste the following text to add JSON-format data into the file, and then
save and exit:
{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }
{ "cust_id": 3535353, "month": 11, "expenses": 761.35 }
{ "cust_id": 7979797, "month": 10, "expenses": 4489.00 }
{ "cust_id": 7979797, "month": 11, "expenses": 18.72 }
{ "cust_id": 3535353, "month": 10, "expenses": 6001.94 }
{ "cust_id": 7979797, "month": 12, "expenses": 173.18 }
{ "cust_id": 1313131, "month": 10, "expenses": 492.83 }
{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }
{ "cust_id": 1313131, "month": 11, "expenses": 368.27 }
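Because the Kafka console producer publishes each line of its input as a separate message, sample_data.json must contain exactly one JSON object per line. The following Python sketch is an illustration only, not part of the Greenplum-Kafka Integration: it writes the nine sample records in that newline-delimited form and checks that every line parses and carries the cust_id, month, and expenses fields that the load configuration reads. The helper names write_sample_data and validate_lines are hypothetical.

```python
import json
from pathlib import Path

# The sample messages from this example, one JSON object per line.
SAMPLE_LINES = [
    '{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }',
    '{ "cust_id": 3535353, "month": 11, "expenses": 761.35 }',
    '{ "cust_id": 7979797, "month": 10, "expenses": 4489.00 }',
    '{ "cust_id": 7979797, "month": 11, "expenses": 18.72 }',
    '{ "cust_id": 3535353, "month": 10, "expenses": 6001.94 }',
    '{ "cust_id": 7979797, "month": 12, "expenses": 173.18 }',
    '{ "cust_id": 1313131, "month": 10, "expenses": 492.83 }',
    '{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }',
    '{ "cust_id": 1313131, "month": 11, "expenses": 368.27 }',
]

EXPECTED_KEYS = {"cust_id", "month", "expenses"}


def write_sample_data(path):
    """Write one JSON object per line; the console producer sends each line as one message."""
    Path(path).write_text("\n".join(SAMPLE_LINES) + "\n")


def validate_lines(path):
    """Parse every line and return the records, raising ValueError on a malformed line."""
    records = []
    for lineno, line in enumerate(Path(path).read_text().splitlines(), start=1):
        record = json.loads(line)  # a blank or truncated line raises json.JSONDecodeError
        if set(record) != EXPECTED_KEYS:
            raise ValueError(f"line {lineno}: unexpected keys {sorted(record)}")
        if not isinstance(record["cust_id"], int) or not isinstance(record["month"], int):
            raise ValueError(f"line {lineno}: cust_id and month must be integers")
        if not isinstance(record["expenses"], (int, float)):
            raise ValueError(f"line {lineno}: expenses must be a number")
        records.append(record)
    return records


if __name__ == "__main__":
    write_sample_data("sample_data.json")
    print(len(validate_lines("sample_data.json")), "valid messages")
```

Running the script on the Kafka host produces the same sample_data.json as the manual editing steps, so you can use whichever approach you prefer.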
- Stream the contents of the sample_data.json file to a Kafka
console producer. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-producer.sh \
    --broker-list localhost:9092 \
    --topic topic_json_gpkafka < sample_data.json
- Verify that the Kafka console producer published the messages to the topic by
running a Kafka console consumer. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 --topic topic_json_gpkafka \
    --from-beginning
- Open a new terminal window, log in to the Greenplum Database master host as
the gpadmin administrative user, and set up the Greenplum
environment. For example:
$ ssh gpadmin@gpmaster
gpmaster$ . /usr/local/greenplum-db/greenplum_path.sh
- Construct the gpkafka load configuration file. Open a file
named jsonload_cfg.yaml in the editor of your choice.
For example:
gpmaster$ vi jsonload_cfg.yaml
- Fill in the load configuration parameter values based on your environment.
For example, if:
- Your Greenplum Database master hostname is gpmaster.
- The Greenplum Database server is running on the default port.
- Your Kafka broker host and port is localhost:9092.
- You want to write the Kafka data to a Greenplum Database table named json_from_kafka located in the public schema of a database named testdb.
- You want to write the customer identifier, month, and expenses data to Greenplum.
The jsonload_cfg.yaml file would include the following contents:
DATABASE: testdb
USER: gpadmin
HOST: gpmaster
PORT: 5432
KAFKA:
  INPUT:
    SOURCE:
      BROKERS: localhost:9092
      TOPIC: topic_json_gpkafka
    COLUMNS:
      - NAME: jdata
        TYPE: json
    FORMAT: json
    ERROR_LIMIT: 10
  OUTPUT:
    TABLE: json_from_kafka
    MAPPING:
      - NAME: customer_id
        EXPRESSION: (jdata->>'cust_id')::int
      - NAME: month
        EXPRESSION: (jdata->>'month')::int
      - NAME: amount_paid
        EXPRESSION: (jdata->>'expenses')::decimal
  COMMIT:
    MAX_ROW: 100
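To make the MAPPING entries more concrete, the following Python sketch approximates what they do to a single message: the ->> operator extracts a JSON field as text, and the ::int and ::decimal casts convert that text to the target type. This is only an illustration; Greenplum evaluates the real expressions in SQL. The function name map_message is hypothetical, and unlike SQL (where a missing key yields NULL) the sketch raises KeyError for a missing field.

```python
import json
from decimal import Decimal


def map_message(jdata_text):
    """Approximate the MAPPING expressions for one message.

    Returns (customer_id, month, amount_paid).
    """
    # parse_float=Decimal keeps the original digits, so 4489.00 stays "4489.00",
    # much like ->> on a json value returns the field's original text.
    jdata = json.loads(jdata_text, parse_float=Decimal)

    def text(key):
        # Approximates jdata->>'key': the field value rendered as text.
        return str(jdata[key])

    return (
        int(text("cust_id")),       # (jdata->>'cust_id')::int
        int(text("month")),         # (jdata->>'month')::int
        Decimal(text("expenses")),  # (jdata->>'expenses')::decimal
    )


print(map_message('{ "cust_id": 7979797, "month": 10, "expenses": 4489.00 }'))
```

Casting through text mirrors the configuration's design: the whole message lands in the single jdata json column, and each output column is derived from it by an expression, so adding a column only requires another MAPPING entry.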
- Create the target Greenplum Database table named
json_from_kafka. For example:
gpmaster$ psql -d testdb
testdb=# CREATE TABLE json_from_kafka( customer_id int8, month int4, amount_paid decimal(9,2) );
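The amount_paid column is declared decimal(9,2): at most 9 significant digits, 2 of them after the decimal point. A stored value therefore must be smaller than 10,000,000 in absolute value, and a value with more fractional digits is rounded to 2 places. The following Python sketch approximates that coercion with the decimal module, under the assumption that Greenplum follows PostgreSQL numeric behavior, where ties round away from zero (ROUND_HALF_UP in Python). The helper name to_decimal_9_2 is hypothetical.

```python
from decimal import Decimal, ROUND_HALF_UP

PRECISION = 9  # total significant digits in decimal(9,2)
SCALE = 2      # digits after the decimal point


def to_decimal_9_2(value):
    """Round value (pass a string for exactness) to 2 places; reject values that overflow."""
    rounded = Decimal(value).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
    # 9 digits with 2 after the point leaves 7 before it, so |x| must be < 10**7.
    if abs(rounded) >= Decimal(10) ** (PRECISION - SCALE):
        raise OverflowError(f"{value} does not fit decimal({PRECISION},{SCALE})")
    return rounded


print(to_decimal_9_2("18.725"))
```

All of the sample expenses already have two fractional digits and fit easily, so none of them is rounded or rejected when loaded.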
- Exit the psql subsystem:
testdb=# \q
- Run the gpkafka load command to batch load the JSON data
published to the topic_json_gpkafka topic into the Greenplum
table. For example:
gpmaster$ gpkafka load --quit-at-eof ./jsonload_cfg.yaml
The command exits after it reads all data published to the topic.
- Examine the command output, looking for messages that identify the number of
rows inserted/rejected. For example:
... -[INFO]:- ... Inserted 9 rows
... -[INFO]:- ... Rejected 0 rows
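If you script this load, you might want to check these counts programmatically. The following Python sketch scans captured gpkafka output for the "Inserted N rows" and "Rejected N rows" phrases shown in the example output above. It relies only on those phrases, because the rest of each log line is elided here and its exact format may vary between releases. It also sums every match, which assumes each matching line reports a separate batch; if your version prints running totals, keep the last match instead. The function name parse_load_counts is hypothetical.

```python
import re

# Matches phrases such as "Inserted 9 rows" and "Rejected 0 rows".
COUNT_PATTERN = re.compile(r"\b(Inserted|Rejected) (\d+) rows\b")


def parse_load_counts(output):
    """Return {'inserted': n, 'rejected': m}, summing all matches (see assumption above)."""
    counts = {"inserted": 0, "rejected": 0}
    for kind, number in COUNT_PATTERN.findall(output):
        counts[kind.lower()] += int(number)
    return counts


sample_output = "... -[INFO]:- ... Inserted 9 rows\n... -[INFO]:- ... Rejected 0 rows\n"
print(parse_load_counts(sample_output))
```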
- View the contents of the Greenplum Database target table json_from_kafka:
gpmaster$ psql -d testdb
testdb=# SELECT * FROM json_from_kafka WHERE customer_id='1313131' ORDER BY amount_paid;
 customer_id | month | amount_paid
-------------+-------+-------------
     1313131 |    11 |      368.27
     1313131 |    10 |      492.83
     1313131 |    12 |     1313.13
(3 rows)
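As a cross-check, the rows returned by the query follow directly from the sample data: customer 1313131 has three messages, and ordering them by amount ascending gives 368.27, 492.83, and 1313.13. The following Python sketch reproduces that filter and sort over the sample messages. It illustrates the query's logic locally and does not replace querying Greenplum. The function name expected_rows is hypothetical.

```python
import json
from decimal import Decimal

# The sample messages from this example, one JSON object per line.
SAMPLE_LINES = [
    '{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }',
    '{ "cust_id": 3535353, "month": 11, "expenses": 761.35 }',
    '{ "cust_id": 7979797, "month": 10, "expenses": 4489.00 }',
    '{ "cust_id": 7979797, "month": 11, "expenses": 18.72 }',
    '{ "cust_id": 3535353, "month": 10, "expenses": 6001.94 }',
    '{ "cust_id": 7979797, "month": 12, "expenses": 173.18 }',
    '{ "cust_id": 1313131, "month": 10, "expenses": 492.83 }',
    '{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }',
    '{ "cust_id": 1313131, "month": 11, "expenses": 368.27 }',
]


def expected_rows(lines, customer_id):
    """Mimic: SELECT * FROM json_from_kafka WHERE customer_id=... ORDER BY amount_paid."""
    rows = []
    for line in lines:
        msg = json.loads(line, parse_float=Decimal)  # Decimal avoids float rounding surprises
        if msg["cust_id"] == customer_id:
            rows.append((msg["cust_id"], msg["month"], msg["expenses"]))
    return sorted(rows, key=lambda row: row[2])  # ORDER BY amount_paid (ascending)


for row in expected_rows(SAMPLE_LINES, 1313131):
    print(*row, sep=" | ")
```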