Example: Loading JSON Data from Kafka Using gpsscli
In this example, you load JSON-format data from a Kafka topic named topic_json_gpkafka into a Greenplum Database table named json_from_kafka. You perform the load as the Greenplum role gpadmin. The table json_from_kafka resides in the public schema of a Greenplum database named testdb.
The producer emits customer expense messages in JSON format that include a customer identifier (integer), a month (integer), and an expense amount (decimal). For example:
{ "cust_id": 123, "month": 9, "expenses": 456.78 }
You will run a Kafka console producer to emit JSON-format customer expense messages, start a Greenplum Streaming Server instance, and use the GPSS gpsscli subcommands to load the data into the json_from_kafka table.
Prerequisites
Before you start this procedure, ensure that you:
- Have administrative access to running Kafka and Greenplum Database clusters.
- Have configured connectivity as described in both the Greenplum Streaming Server Prerequisites and the Kafka Prerequisites sections.
- Identify and note the ZooKeeper hostname and port.
- Identify and note the hostname and port of the Kafka broker(s).
- Identify and note the hostname and port of the Greenplum Database master node.
- Register the GPSS extension.
This procedure assumes that you have installed the Apache Kafka distribution. If you are using a different Kafka distribution, you may need to adjust certain commands in the procedure.
Procedure
- Log in to a host in your Kafka cluster. For example:
$ ssh kafkauser@kafkahost
kafkahost$
- Create a Kafka topic named topic_json_gpkafka. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 --replication-factor 1 --partitions 1 \
    --topic topic_json_gpkafka
- Open a file named sample_data.json in the editor of your choice.
For example:
kafkahost$ vi sample_data.json
- Copy/paste the following text to add JSON-format data into the file, and then save and exit:
{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }
{ "cust_id": 3535353, "month": 11, "expenses": 761.35 }
{ "cust_id": 7979797, "month": 10, "expenses": 4489.00 }
{ "cust_id": 7979797, "month": 11, "expenses": 18.72 }
{ "cust_id": 3535353, "month": 10, "expenses": 6001.94 }
{ "cust_id": 7979797, "month": 12, "expenses": 173.18 }
{ "cust_id": 1313131, "month": 10, "expenses": 492.83 }
{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }
{ "cust_id": 1313131, "month": 11, "expenses": 368.27 }
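GPSS expects each Kafka message to be a standalone JSON object, so the file holds one object per line. A quick sanity check in Python (a hypothetical helper, not part of the GPSS procedure) confirms the format before you publish the messages:

```python
import json

# Three of the sample records, inlined for a self-contained check;
# in practice you would iterate over the lines of sample_data.json.
sample_lines = [
    '{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }',
    '{ "cust_id": 3535353, "month": 11, "expenses": 761.35 }',
    '{ "cust_id": 7979797, "month": 10, "expenses": 4489.00 }',
]

expected_keys = {"cust_id", "month", "expenses"}
records = [json.loads(line) for line in sample_lines]

# Every line must parse on its own and carry exactly the three expected fields.
assert all(set(r) == expected_keys for r in records)
print(f"{len(records)} valid records")
```

A line that fails `json.loads()` here would be rejected by GPSS at load time and count against the job's ERROR_LIMIT.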
- Stream the contents of the sample_data.json file to a Kafka console producer. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-producer.sh \
    --broker-list localhost:9092 \
    --topic topic_json_gpkafka < sample_data.json
- Verify that the Kafka console producer published the messages to the topic by running a Kafka console consumer. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 --topic topic_json_gpkafka \
    --from-beginning
- Open a new terminal window, log in to the Greenplum Database master host as
the gpadmin administrative user, and set up the Greenplum
environment. For example:
$ ssh gpadmin@gpmaster
gpmaster$ . /usr/local/greenplum-db/greenplum_path.sh
- Construct the Greenplum Streaming Server configuration file. For example, open a
file named gpsscfg_ex.json in the editor of your choice:
gpmaster$ vi gpsscfg_ex.json
- Designate a GPSS listen port number of 5019 and a gpfdist port number of 8319 in the configuration file. For example, copy/paste the following into the gpsscfg_ex.json file, and then save and exit the editor:
{
    "ListenAddress": {
        "Host": "",
        "Port": 5019
    },
    "Gpfdist": {
        "Host": "",
        "Port": 8319
    }
}
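The ListenAddress port is where gpsscli clients connect, while the Gpfdist port is the one GPSS uses to serve data to Greenplum segments; both are arbitrary unused ports, and the one structural constraint is that they differ. A short, hypothetical Python sketch of that check:

```python
import json

# The configuration from gpsscfg_ex.json, inlined for a self-contained check.
cfg = json.loads("""
{
    "ListenAddress": { "Host": "", "Port": 5019 },
    "Gpfdist": { "Host": "", "Port": 8319 }
}
""")

# The gpsscli client port and the gpfdist data port must not collide.
assert cfg["ListenAddress"]["Port"] != cfg["Gpfdist"]["Port"]
print("ports:", cfg["ListenAddress"]["Port"], cfg["Gpfdist"]["Port"])
```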
- Start the Greenplum Streaming Server instance in the background, specifying the log directory ./gpsslogs. For example:
gpmaster$ gpss gpsscfg_ex.json --log-dir ./gpsslogs &
- Construct the gpkafka load configuration file. Open a file
named jsonload_cfg.yaml in the editor of your choice.
For example:
gpmaster$ vi jsonload_cfg.yaml
- Fill in the load configuration parameter values based on your environment.
This example assumes:
- Your Greenplum Database master hostname is gpmaster.
- The Greenplum Database server is running on the default port.
- Your Kafka broker host and port is localhost:9092.
- You want to write the Kafka data to a Greenplum Database table named json_from_kafka located in the public schema of a database named testdb.
- You want to write the customer identifier and expenses data to Greenplum.
The jsonload_cfg.yaml file would include the following contents:
DATABASE: testdb
USER: gpadmin
HOST: gpmaster
PORT: 5432
KAFKA:
   INPUT:
      SOURCE:
        BROKERS: localhost:9092
        TOPIC: topic_json_gpkafka
      COLUMNS:
        - NAME: jdata
          TYPE: json
      FORMAT: json
      ERROR_LIMIT: 10
   OUTPUT:
      TABLE: json_from_kafka
      MAPPING:
        - NAME: customer_id
          EXPRESSION: (jdata->>'cust_id')::int
        - NAME: month
          EXPRESSION: (jdata->>'month')::int
        - NAME: amount_paid
          EXPRESSION: (jdata->>'expenses')::decimal
   COMMIT:
      MINIMAL_INTERVAL: 2000
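The MAPPING entries are ordinary Greenplum expressions: each Kafka message lands in the single json column jdata, and the ->> operator extracts a field as text that is then cast to the target column type. The following Python sketch mimics that transformation (plain dict access stands in for ->>; it is an illustration, not GPSS code):

```python
import json
from decimal import Decimal

def map_message(raw: str) -> dict:
    """Mimic the MAPPING block: extract fields from jdata and cast them."""
    jdata = json.loads(raw)
    return {
        "customer_id": int(jdata["cust_id"]),            # (jdata->>'cust_id')::int
        "month": int(jdata["month"]),                    # (jdata->>'month')::int
        "amount_paid": Decimal(str(jdata["expenses"])),  # (jdata->>'expenses')::decimal
    }

row = map_message('{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }')
print(row["customer_id"], row["month"], row["amount_paid"])
```

A message whose fields fail these casts (for example, a non-numeric cust_id) counts against the job's ERROR_LIMIT of 10.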
- Create the target Greenplum Database table named json_from_kafka. For example:
gpmaster$ psql -d testdb
testdb=# CREATE TABLE json_from_kafka(
    customer_id int8,
    month int4,
    amount_paid decimal(9,2)
);
- Exit the psql subsystem:
testdb=# \q
- Submit the Kafka data load job to the GPSS instance running on port number 5019. (You may consider opening a new terminal window to run the command.) For example, to submit a job named kafkajson2gp:
gpmaster$ gpsscli submit --name kafkajson2gp --gpss-port 5019 ./jsonload_cfg.yaml
20200804 12:54:19.25262,116652,info,JobID: d577cf37890b5b6bf4e713a9586e86c9,JobName: kafkajson2gp
- List all GPSS jobs. For example:
gpmaster$ gpsscli list --all --gpss-port 5019
JobName       JobID                             GPHost     GPPort  DataBase  Schema  Table            Topic               Status
kafkajson2gp  d577cf37890b5b6bf4e713a9586e86c9  localhost  5432    testdb    public  json_from_kafka  topic_json_gpkafka  JOB_STOPPED
The list subcommand displays all jobs. Notice the entry for the kafkajson2gp job that you just submitted; a newly submitted job starts out in the JOB_STOPPED state.
- Start the job named kafkajson2gp. For example:
gpmaster$ gpsscli start kafkajson2gp --gpss-port 5019
20200804 12:57:57.35153,117918,info,Job kafkajson2gp is started
- Stop the job named kafkajson2gp. For example:
gpmaster$ gpsscli stop kafkajson2gp --gpss-port 5019
20200804 13:05:09.24280,117506,info,stop job: kafkajson2gp success
- Examine the gpss command output and log file, looking for messages that identify the number of rows inserted/rejected. For example:
... -[INFO]:- ... Inserted 9 rows
... -[INFO]:- ... Rejected 0 rows
- View the contents of the Greenplum Database target table json_from_kafka:
gpmaster$ psql -d testdb
testdb=# SELECT * FROM json_from_kafka WHERE customer_id='1313131'
           ORDER BY amount_paid;
 customer_id | month | amount_paid
-------------+-------+-------------
     1313131 |    11 |      368.27
     1313131 |    10 |      492.83
     1313131 |    12 |     1313.13
(3 rows)
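As a cross-check on the walkthrough, the three rows above can be recomputed from the nine sample records by replaying the WHERE and ORDER BY clauses in Python (a hypothetical verification step, not something GPSS requires):

```python
# The nine sample records as (customer_id, month, amount_paid) tuples.
rows = [
    (1313131, 12, 1313.13), (3535353, 11, 761.35), (7979797, 10, 4489.00),
    (7979797, 11, 18.72),   (3535353, 10, 6001.94), (7979797, 12, 173.18),
    (1313131, 10, 492.83),  (3535353, 12, 81.12),   (1313131, 11, 368.27),
]

# Replay: WHERE customer_id = 1313131 ORDER BY amount_paid
result = sorted((r for r in rows if r[0] == 1313131), key=lambda r: r[2])
for customer_id, month, amount_paid in result:
    print(customer_id, month, amount_paid)
# -> 1313131 11 368.27
# -> 1313131 10 492.83
# -> 1313131 12 1313.13
```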