Example: Loading JSON Data from Kafka Using the Greenplum Stream Server
In this example, you load JSON format data from a Kafka topic named topic_json_gpkafka into a Greenplum Database table named json_from_kafka. You will perform the load as the Greenplum role gpadmin. The table json_from_kafka resides in the public schema in a Greenplum database named testdb.
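Each Kafka message is a JSON object that includes a customer identifier (integer), a month (integer), and an expense amount (decimal). For example, a message for the customer with identifier 123 who spent 456.78 in September:
{ "cust_id": 123, "month": 9, "expenses": 456.78 }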
You will run a Kafka console producer to emit JSON-format customer expense messages, start a Greenplum Stream Server instance, and use the GPSS gpsscli subcommands to load the data into the json_from_kafka table.
Prerequisites
Before you start this procedure, ensure that you:
- Have administrative access to running Kafka and Greenplum Database clusters, and that these clusters have connectivity as described in both the Greenplum Stream Server Prerequisites section and the Prerequisites section in the Greenplum-Kafka Integration documentation.
- Identify and note the ZooKeeper hostname and port.
- Identify and note the hostname and port of the Kafka broker(s).
- Identify and note the hostname and port of the Greenplum Database master node.
- Register the GPSS extension.
This procedure assumes that you have installed the Apache Kafka distribution. If you are using a different Kafka distribution, you may need to adjust certain commands in the procedure.
Procedure
- Log in to a host in your Kafka cluster. For example:
$ ssh kafkauser@kafkahost
kafkahost$
- Create a Kafka topic named topic_json_gpkafka. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 --replication-factor 1 --partitions 1 \
    --topic topic_json_gpkafka
- Open a file named sample_data.json in the editor of your choice.
For example:
kafkahost$ vi sample_data.json
- Copy/paste the following text to add JSON-format data into the file, and then
save and exit:
{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }
{ "cust_id": 3535353, "month": 11, "expenses": 761.35 }
{ "cust_id": 7979797, "month": 10, "expenses": 4489.00 }
{ "cust_id": 7979797, "month": 11, "expenses": 18.72 }
{ "cust_id": 3535353, "month": 10, "expenses": 6001.94 }
{ "cust_id": 7979797, "month": 12, "expenses": 173.18 }
{ "cust_id": 1313131, "month": 10, "expenses": 492.83 }
{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }
{ "cust_id": 1313131, "month": 11, "expenses": 368.27 }
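The Kafka console producer sends each line of its input as a separate message, so sample_data.json must hold exactly one JSON object per line. If you prefer to generate the file instead of pasting it, the following Python sketch writes the same nine records in that layout. The names `SAMPLE_ROWS` and `write_sample_data` are hypothetical helpers for illustration, not part of Kafka or GPSS.

```python
import json
from pathlib import Path

# The nine sample records from this example: (cust_id, month, expenses).
# Amounts are strings so values such as 4489.00 keep both decimal places.
SAMPLE_ROWS = [
    (1313131, 12, "1313.13"),
    (3535353, 11, "761.35"),
    (7979797, 10, "4489.00"),
    (7979797, 11, "18.72"),
    (3535353, 10, "6001.94"),
    (7979797, 12, "173.18"),
    (1313131, 10, "492.83"),
    (3535353, 12, "81.12"),
    (1313131, 11, "368.27"),
]


def write_sample_data(path="sample_data.json"):
    """Write one JSON object per line and return the Path that was written."""
    lines = [
        f'{{ "cust_id": {cust}, "month": {month}, "expenses": {amount} }}'
        for cust, month, amount in SAMPLE_ROWS
    ]
    # Each line must parse on its own, because each line becomes one message.
    for line in lines:
        json.loads(line)
    out = Path(path)
    out.write_text("\n".join(lines) + "\n", encoding="utf-8")
    return out
```

Calling `write_sample_data()` on kafkahost produces a file equivalent to the pasted text.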
- Stream the contents of the sample_data.json file to a Kafka
console producer. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-producer.sh \
    --broker-list localhost:9092 \
    --topic topic_json_gpkafka < sample_data.json
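Because every input line becomes one message, a malformed line (for example, several objects pasted onto a single line) reaches the topic as one message that cannot be parsed as JSON. The Python sketch below is a local pre-flight check you could run on the file before streaming it. It does not contact Kafka. The `check_messages` function and `REQUIRED_KEYS` set are hypothetical, and the key list is taken from the fields that the load configuration in this example reads.

```python
import json

# Keys that the jsonload_cfg.yaml MAPPING in this example reads from each message.
REQUIRED_KEYS = {"cust_id", "month", "expenses"}


def check_messages(text):
    """Split producer input into one message per line and validate each one.

    Returns (messages, errors): the parsed objects that passed, and
    (line_number, reason) tuples for the lines that did not.
    """
    messages, errors = [], []
    for lineno, line in enumerate(text.splitlines(), start=1):
        try:
            obj = json.loads(line)
        except json.JSONDecodeError as exc:
            # Also catches empty lines and several objects pasted onto one line.
            errors.append((lineno, f"invalid JSON: {exc.msg}"))
            continue
        if not isinstance(obj, dict):
            errors.append((lineno, "not a JSON object"))
            continue
        missing = REQUIRED_KEYS - set(obj)
        if missing:
            errors.append((lineno, "missing keys: " + ", ".join(sorted(missing))))
            continue
        messages.append(obj)
    return messages, errors
```

For example, `check_messages(open("sample_data.json").read())` should return nine messages and an empty error list for the sample file.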
- Verify that the Kafka console producer published the messages to the topic by
running a Kafka console consumer. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 --topic topic_json_gpkafka \
    --from-beginning
- Open a new terminal window, log in to the Greenplum Database master host as
the gpadmin administrative user, and set up the Greenplum
environment. For example:
$ ssh gpadmin@gpmaster
gpmaster$ . /usr/local/greenplum-db/greenplum_path.sh
- Construct the Greenplum Stream Server configuration file. For example, open a
file named gpsscfg_ex.json in the editor of your choice:
gpmaster$ vi gpsscfg_ex.json
- Designate a GPSS listen port number of 50007 and a gpfdist port
number of 8319 in the configuration file. For example, copy/paste the following
into the gpsscfg_ex.json file, and then save and exit the editor:
{
    "ListenAddress": {
        "Host": "",
        "Port": 50007,
        "SSL": false
    },
    "Gpfdist": {
        "Host": "",
        "Port": 8319
    }
}
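If you script this setup, the configuration file can also be generated rather than edited by hand. The Python sketch below builds the same structure and writes it as indented JSON. The helper names `gpss_config` and `write_gpss_config` are hypothetical. The checks that both ports are in range and that they differ are added assumptions: in this configuration both listeners run on the same host, so they cannot share a port.

```python
import json


def gpss_config(listen_port=50007, gpfdist_port=8319, host=""):
    """Return the GPSS server configuration from this example as a dict."""
    for port in (listen_port, gpfdist_port):
        if not 1 <= port <= 65535:
            raise ValueError(f"port out of range: {port}")
    # Both listeners run on the same host here, so they need distinct ports.
    if listen_port == gpfdist_port:
        raise ValueError("GPSS listen port and gpfdist port must differ")
    return {
        "ListenAddress": {"Host": host, "Port": listen_port, "SSL": False},
        "Gpfdist": {"Host": host, "Port": gpfdist_port},
    }


def write_gpss_config(path="gpsscfg_ex.json", **kwargs):
    """Write the configuration as indented JSON and return the dict written."""
    config = gpss_config(**kwargs)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(config, f, indent=4)
        f.write("\n")
    return config
```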
- Start the Greenplum Stream Server instance in the background, specifying the log directory ./gpsslogs. For example:
gpmaster$ gpss gpsscfg_ex.json --log-dir ./gpsslogs &
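Because gpss starts in the background, a gpsscli command issued immediately afterward can fail if the server is not yet accepting connections on port 50007. The following Python sketch polls a TCP port until it accepts a connection or a timeout expires. The `wait_for_port` function is a hypothetical helper. A successful connection only shows that something is listening on the port, not that GPSS is otherwise ready.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Poll until a TCP connection to host:port succeeds.

    Returns True as soon as a connection is accepted, or False if `timeout`
    seconds pass first. This only proves that something is listening.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

On gpmaster, `wait_for_port("localhost", 50007)` returning True is a reasonable signal to continue with the gpsscli steps.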
- Construct the gpkafka load configuration file. Open a file
named jsonload_cfg.yaml in the editor of your choice.
For example:
gpmaster$ vi jsonload_cfg.yaml
- Fill in the load configuration parameter values based on your environment.
For example, if:
- Your Greenplum Database master hostname is gpmaster.
- The Greenplum Database server is running on the default port.
- Your Kafka broker host and port is localhost:9092.
- You want to write the Kafka data to a Greenplum Database table named json_from_kafka located in the public schema of a database named testdb.
- You want to write the customer identifier, month, and expenses data to Greenplum.
The jsonload_cfg.yaml file would include the following contents:
DATABASE: testdb
USER: gpadmin
HOST: gpmaster
PORT: 5432
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: localhost:9092
         TOPIC: topic_json_gpkafka
      COLUMNS:
         - NAME: jdata
           TYPE: json
      FORMAT: json
      ERROR_LIMIT: 10
   OUTPUT:
      TABLE: json_from_kafka
      MAPPING:
         - NAME: customer_id
           EXPRESSION: (jdata->>'cust_id')::int
         - NAME: month
           EXPRESSION: (jdata->>'month')::int
         - NAME: amount_paid
           EXPRESSION: (jdata->>'expenses')::decimal
   COMMIT:
      MAX_ROW: 1000
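Each MAPPING expression extracts a field from the jdata column as text with the ->> operator and then casts it. To preview the rows a message should produce, you could approximate those three expressions locally, as in the Python sketch below. This is only an approximation: the real expressions are SQL evaluated in Greenplum Database, and `preview_row` is a hypothetical helper, not a GPSS API.

```python
import json
from decimal import Decimal


def preview_row(message):
    """Approximate the example's MAPPING for one Kafka message, locally."""
    # parse_float=Decimal keeps 4489.00 exact instead of converting it to a float.
    jdata = json.loads(message, parse_float=Decimal)

    def field_text(key):
        # Like jdata->>'key': the value as text, or None (SQL NULL) when the
        # key is missing or the JSON value is null.
        value = jdata.get(key)
        return None if value is None else str(value)

    def cast(text, to_type):
        # Like ::int / ::decimal; a NULL input stays NULL.
        return None if text is None else to_type(text)

    return {
        "customer_id": cast(field_text("cust_id"), int),
        "month": cast(field_text("month"), int),
        "amount_paid": cast(field_text("expenses"), Decimal),
    }
```

A message that lacks the expenses key yields None for amount_paid, which corresponds to a NULL amount_paid in the target table.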
- Create the target Greenplum Database table named
json_from_kafka. For example:
gpmaster$ psql -d testdb
testdb=# CREATE TABLE json_from_kafka( customer_id int8, month int4, amount_paid decimal(9,2) );
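The amount_paid column is decimal(9,2): values are rounded to two fractional digits and may then have at most seven integer digits. Also, the load configuration casts cust_id and month with ::int (a 4-byte integer) even though customer_id is int8. The Python sketch below checks sample values against both limits before loading. The function names are hypothetical, and the rounding mode (ties away from zero) is chosen to match PostgreSQL-style numeric rounding.

```python
from decimal import ROUND_HALF_UP, Decimal

# int4 bounds: the MAPPING casts cust_id and month with ::int, which is int4.
INT4_MIN, INT4_MAX = -2**31, 2**31 - 1


def fits_int4(value):
    """True if value survives the ::int cast used in the load configuration."""
    return INT4_MIN <= int(value) <= INT4_MAX


def fits_numeric(value, precision=9, scale=2):
    """True if value can be stored in NUMERIC(precision, scale) without overflow.

    The value is first rounded to `scale` fractional digits (half away from
    zero), and then may have at most precision - scale integer digits.
    """
    step = Decimal(1).scaleb(-scale)  # 0.01 for scale 2
    rounded = Decimal(value).quantize(step, rounding=ROUND_HALF_UP)
    return abs(rounded) < Decimal(10) ** (precision - scale)
```

All nine sample amounts and customer identifiers pass both checks.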
- Exit the psql subsystem:
testdb=# \q
- Submit the Kafka data load job to the GPSS instance running on port number 50007. (You may consider opening a new terminal window to run the command.) For example, to submit a job named kafkajson2gp:
gpmaster$ gpsscli submit --name kafkajson2gp --gpss-port 50007 ./jsonload_cfg.yaml
20181214:22:37:49.168 gpsscli:gpadmin:gpmaster:075435-[INFO]:-JobID: kafkajson2gp
- List all GPSS jobs. For example:
gpmaster$ gpsscli list --all --gpss-port 50007
JobID         GPHost     GPPort  DataBase  Schema  Table            Topic               Status
kafkajson2gp  localhost  5432    testdb    public  json_from_kafka  topic_json_gpkafka  Stopped
The list subcommand displays all jobs. Notice the entry for the kafkajson2gp job that you just submitted, and that the job is in the Stopped state.
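When you script these steps, you may want to read a job's state from the list output instead of checking it by eye. The Python sketch below splits the whitespace-aligned table into one dictionary per job. It assumes the header-plus-rows layout shown in the example, with no spaces inside values. That layout may differ between GPSS versions, and `parse_job_list` and `job_status` are hypothetical helpers.

```python
def parse_job_list(output):
    """Split whitespace-aligned `gpsscli list` output into one dict per job.

    Assumes a header row followed by data rows, with no spaces inside values.
    Rows whose field count does not match the header are skipped.
    """
    rows = [line.split() for line in output.splitlines() if line.strip()]
    if not rows:
        return []
    header, data = rows[0], rows[1:]
    return [dict(zip(header, row)) for row in data if len(row) == len(header)]


def job_status(output, job_id):
    """Return the Status column for job_id, or None if the job is not listed."""
    for job in parse_job_list(output):
        if job.get("JobID") == job_id:
            return job.get("Status")
    return None
```

Pass the captured text of `gpsscli list --all --gpss-port 50007` (without the shell prompt line). For the output shown above, `job_status(output, "kafkajson2gp")` returns "Stopped".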
- Start the job named kafkajson2gp. For example:
gpmaster$ gpsscli start kafkajson2gp --gpss-port 50007
20181214:22:43:32.590 gpsscli:gpadmin:gpmaster:075490-[INFO]:-JobID: kafkajson2gp is started
- Stop the job named kafkajson2gp. For example:
gpmaster$ gpsscli stop kafkajson2gp --gpss-port 50007
20181214:22:51:21.960 gpsscli:gpadmin:e11517afb6f6:075781-[INFO]:-Stop a job: kafkajson2gp, status Stopped
- Examine the gpss command output and log file, looking for
messages that identify the number of
rows inserted/rejected. For example:
... -[INFO]:- ... Inserted 9 rows
... -[INFO]:- ... Rejected 0 rows
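To pull these counts out of a longer log programmatically, a regular expression over the "Inserted N rows" and "Rejected N rows" fragments is enough. The Python sketch below totals them. The `load_counts` helper is hypothetical. It also assumes that each matching message reports a separate batch rather than a running total, so compare its result against your own log before relying on it.

```python
import re

# Matches fragments such as "Inserted 9 rows" or "Rejected 0 rows".
COUNT_RE = re.compile(r"\b(Inserted|Rejected) (\d+) rows?\b")


def load_counts(log_text):
    """Total the inserted/rejected row counts reported in GPSS output.

    Assumes each matching message reports a separate batch, not a running total.
    """
    totals = {"Inserted": 0, "Rejected": 0}
    for kind, count in COUNT_RE.findall(log_text):
        totals[kind] += int(count)
    return totals
```

For the two messages shown above, `load_counts` reports 9 inserted rows and 0 rejected rows, which matches the nine messages in sample_data.json.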
- View the contents of the Greenplum Database target table json_from_kafka:
gpmaster$ psql -d testdb
testdb=# SELECT * FROM json_from_kafka WHERE customer_id='1313131' ORDER BY amount_paid;
 customer_id | month | amount_paid
-------------+-------+-------------
     1313131 |    11 |      368.27
     1313131 |    10 |      492.83
     1313131 |    12 |     1313.13
(3 rows)
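To cross-check the query result, you can compute the same answer directly from the sample data: keep the messages for one customer and sort them by amount. The Python sketch below does this with a hypothetical `expected_rows` helper and an inline copy of sample_data.json. Its output for customer 1313131 should match the three rows that psql returns.

```python
import json
from decimal import Decimal

# Inline copy of sample_data.json from this example.
SAMPLE_DATA = """\
{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }
{ "cust_id": 3535353, "month": 11, "expenses": 761.35 }
{ "cust_id": 7979797, "month": 10, "expenses": 4489.00 }
{ "cust_id": 7979797, "month": 11, "expenses": 18.72 }
{ "cust_id": 3535353, "month": 10, "expenses": 6001.94 }
{ "cust_id": 7979797, "month": 12, "expenses": 173.18 }
{ "cust_id": 1313131, "month": 10, "expenses": 492.83 }
{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }
{ "cust_id": 1313131, "month": 11, "expenses": 368.27 }
"""


def expected_rows(ndjson_text, customer_id):
    """Return (customer_id, month, amount_paid) tuples sorted by amount_paid."""
    rows = []
    for line in ndjson_text.splitlines():
        if not line.strip():
            continue
        msg = json.loads(line, parse_float=Decimal)
        if msg["cust_id"] == customer_id:
            rows.append((msg["cust_id"], msg["month"], msg["expenses"]))
    # Same ordering as ORDER BY amount_paid in the SELECT statement.
    return sorted(rows, key=lambda row: row[2])
```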