Example: Loading JSON Data from Kafka (Simple)
In this example, you load JSON-format data from a Kafka topic named topic_json into a single-column Greenplum Database table named single_json_column. You perform the load as the Greenplum role gpadmin. The table single_json_column resides in the public schema in a Greenplum database named testdb.
Each Kafka message contains a customer expense record similar to the following:
{ "cust_id": 123, "month": 9, "amount_paid": 456.78 }
You will run a Kafka console producer to emit JSON-format customer expense messages, and use the Greenplum-Kafka Integration gpkafka load command to load each Kafka message into a row in the single_json_column table.
Prerequisites
Before you start this procedure, ensure that you:
- Have administrative access to running Kafka and Greenplum Database clusters.
- Have configured connectivity as described in the loading Prerequisites.
- Identify and note the ZooKeeper hostname and port.
- Identify and note the hostname and port of the Kafka broker(s).
- Identify and note the hostname and port of the Greenplum Database master node.
This procedure assumes that you have installed the Apache Kafka distribution. If you are using a different Kafka distribution, you may need to adjust certain commands in the procedure.
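If you are not sure which Kafka distribution or version you are running, recent Apache Kafka releases (2.0 and later) accept a --version flag on the command line tools; older releases may not. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics.sh --version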
Procedure
- Log in to a host in your Kafka cluster. For example:
$ ssh kafkauser@kafkahost
kafkahost$
- Create a Kafka topic named topic_json. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 --replication-factor 1 --partitions 1 \
    --topic topic_json
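You can optionally confirm that Kafka created the topic by describing it against the same ZooKeeper endpoint:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics.sh --describe \
    --zookeeper localhost:2181 --topic topic_json
The output should show topic_json with one partition and a replication factor of 1.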
- Open a file named sample_data.json in the editor of your choice.
For example:
kafkahost$ vi sample_data.json
- Copy/paste the following text to add JSON-format data into the file, and then
save and exit:
{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 } { "cust_id": 3535353, "month": 11, "expenses": 761.35 } { "cust_id": 7979797, "month": 10, "expenses": 4489.00 } { "cust_id": 7979797, "month": 11, "expenses": 18.72 } { "cust_id": 3535353, "month": 10, "expenses": 6001.94 } { "cust_id": 7979797, "month": 12, "expenses": 173.18 } { "cust_id": 1313131, "month": 10, "expenses": 492.83 } { "cust_id": 3535353, "month": 12, "expenses": 81.12 } { "cust_id": 1313131, "month": 11, "expenses": 368.27 }
- Stream the contents of the sample_data.json file to a Kafka
console producer. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-producer.sh \
    --broker-list localhost:9092 \
    --topic topic_json < sample_data.json
- Verify that the Kafka console producer published the messages to the topic by
running a Kafka console consumer. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 --topic topic_json \
    --from-beginning
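The consumer should echo the nine JSON records that the producer published. Note that the console consumer does not exit on its own; press Ctrl-C to stop it after you have verified the messages.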
- Open a new terminal window, log in to the Greenplum Database master host as
the gpadmin administrative user, and set up the Greenplum
environment. For example:
$ ssh gpadmin@gpmaster
gpmaster$ . /usr/local/greenplum-db/greenplum_path.sh
- Construct the gpkafka load configuration file. Open a file
named simple_jsonload_cfg.yaml in the editor of your choice.
For example:
gpmaster$ vi simple_jsonload_cfg.yaml
- Fill in the load configuration parameter values based on your environment.
This example assumes:
- Your Greenplum Database master hostname is gpmaster.
- The Greenplum Database server is running on the default port.
- Your Kafka broker host and port is localhost:9092.
- You want to write the Kafka data to a Greenplum Database table named single_json_column located in the public schema of a database named testdb.
- You want to write the data to Greenplum as a single json type column.
The simple_jsonload_cfg.yaml file would include the following contents:
DATABASE: testdb
USER: gpadmin
HOST: gpmaster
PORT: 5432
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: localhost:9092
         TOPIC: topic_json
      FORMAT: json
      ERROR_LIMIT: 10
   OUTPUT:
      TABLE: single_json_column
   COMMIT:
      MINIMAL_INTERVAL: 1000
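Before you run the load, you can verify the DATABASE, USER, HOST, and PORT values by connecting with them directly. A minimal check, assuming the example values above:
gpmaster$ psql -h gpmaster -p 5432 -U gpadmin -d testdb -c 'SELECT 1;'
If this connection fails, correct the configuration values before running gpkafka load.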
- Create the target Greenplum Database table named
single_json_column. For example:
gpmaster$ psql -d testdb
testdb=# CREATE TABLE single_json_column( value json );
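Optionally confirm the table definition before exiting; the psql \d meta-command displays the column name and type:
testdb=# \d single_json_column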
- Exit the psql subsystem:
testdb=# \q
- Run the gpkafka load command to batch load the JSON data
published to the topic_json topic into the Greenplum
table. For example:
gpmaster$ gpkafka load --quit-at-eof ./simple_jsonload_cfg.yaml
The command exits after it reads all data published to the topic.
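Note that --quit-at-eof directs gpkafka to exit after it consumes all of the data currently published to the topic. If you omit this option, gpkafka load runs continuously, loading new messages as they arrive, until you interrupt it. For example:
gpmaster$ gpkafka load ./simple_jsonload_cfg.yaml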
- Examine the command output, looking for messages that identify the number of
rows inserted/rejected. For example:
... -[INFO]:- ... Inserted 9 rows
... -[INFO]:- ... Rejected 0 rows
- View the contents of the Greenplum Database target table single_json_column:
gpmaster$ psql -d testdb
testdb=# SELECT * FROM single_json_column;
                          value
----------------------------------------------------------
 { "cust_id": 7979797, "month": 10, "expenses": 4489.00 }
 { "cust_id": 7979797, "month": 11, "expenses": 18.72 }
 { "cust_id": 3535353, "month": 12, "expenses": 81.12 }
 { "cust_id": 3535353, "month": 11, "expenses": 761.35 }
 { "cust_id": 1313131, "month": 12, "expenses": 1313.13 }
 { "cust_id": 3535353, "month": 10, "expenses": 6001.94 }
 { "cust_id": 7979797, "month": 12, "expenses": 173.18 }
 { "cust_id": 1313131, "month": 10, "expenses": 492.83 }
 { "cust_id": 1313131, "month": 11, "expenses": 368.27 }
(9 rows)
- Use json operators to view the expenses associated with
a specific customer:
testdb=# SELECT (value->>'expenses')::decimal AS expenses
         FROM single_json_column
         WHERE (value->>'cust_id')::int = 1313131;
 expenses
----------
  1313.13
   492.83
   368.27
(3 rows)
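Because each record is stored whole in a single json column, other reports use the same json operators. A sketch of a per-customer roll-up; the totals shown follow arithmetically from the nine sample records loaded above:
testdb=# SELECT (value->>'cust_id')::int AS cust_id,
                sum((value->>'expenses')::decimal) AS total_expenses
         FROM single_json_column GROUP BY 1 ORDER BY 1;
 cust_id | total_expenses
---------+----------------
 1313131 |        2174.23
 3535353 |        6844.41
 7979797 |        4680.90
(3 rows)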