Example: Loading CSV Data from Kafka
In this example, you load data from a Kafka topic named topic_for_gpkafka into a Greenplum Database table named data_from_kafka. You perform the load as the Greenplum role gpadmin. The table data_from_kafka resides in the public schema in a Greenplum database named testdb.
Each Kafka message in this example contains three comma-separated fields: a customer identifier, a month, and an expense amount. For example:
"123","09","456.78"
You will run a Kafka console producer to emit customer expense messages, use the Greenplum Streaming Server gpkafka load command to transform and load the data into the data_from_kafka table, and then verify the load operation.
Prerequisites
Before you start this procedure, ensure that you:
- Have administrative access to running Kafka and Greenplum Database clusters.
- Have configured connectivity as described in the loading Prerequisites.
- Know the ZooKeeper hostname and port.
- Know the hostname and port of the Kafka broker(s).
- Know the hostname and port of the Greenplum Database master host.
This procedure assumes that you have installed the Apache Kafka distribution. If you are using a different Kafka distribution, you may need to adjust certain commands in the procedure.
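If you want to confirm connectivity before you begin, a quick reachability check such as the following can help. This is an optional sketch; the hostnames zkhost, kafkahost, and gpmaster are placeholders for your own environment, and it assumes the nc (netcat) utility is installed:
$ nc -z zkhost 2181 && echo "ZooKeeper reachable"          # ZooKeeper client port
$ nc -z kafkahost 9092 && echo "Kafka broker reachable"    # default Kafka broker port
$ nc -z gpmaster 5432 && echo "Greenplum master reachable" # default Greenplum port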
Procedure
- Log in to a host in your Kafka cluster. For example:
$ ssh kafkauser@kafkahost
kafkahost$
- Create a Kafka topic named topic_for_gpkafka. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 --replication-factor 1 --partitions 1 \
    --topic topic_for_gpkafka
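You can optionally confirm that the topic was created by listing the topics registered in ZooKeeper; this assumes the same Kafka distribution and ZooKeeper address as the previous command:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics.sh --list \
    --zookeeper localhost:2181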
- Open a file named sample_data.csv in the editor of your choice.
For example:
kafkahost$ vi sample_data.csv
- Copy/paste the following text to add CSV-format data into the file, and then
save and exit:
"1313131","12","1313.13" "3535353","11","761.35" "7979797","10","4489.00" "7979797","11","18.72" "3535353","10","6001.94" "7979797","12","173.18" "1313131","10","492.83" "3535353","12","81.12" "1313131","11","368.27"
- Stream the contents of the sample_data.csv file to a Kafka
console producer. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-producer.sh \
    --broker-list localhost:9092 \
    --topic topic_for_gpkafka < sample_data.csv
- Verify that the Kafka console producer published the messages to the topic by
running a Kafka console consumer. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 --topic topic_for_gpkafka \
    --from-beginning
The consumer displays the nine messages and then waits for new ones; enter Control-c to exit the consumer.
- Open a new terminal window, log in to the Greenplum Database master host as
the gpadmin administrative user, and set up the Greenplum
environment. For example:
$ ssh gpadmin@gpmaster
gpmaster$ . /usr/local/greenplum-db/greenplum_path.sh
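Sourcing greenplum_path.sh adds the Greenplum utilities to your shell PATH. If you want to confirm that the gpkafka client is available before continuing, a simple check like the following works, assuming gpkafka was installed to a directory on your PATH:
gpmaster$ which gpkafka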
- Construct the gpkafka load configuration file. Open a file
named firstload_cfg.yaml in the editor of your choice.
For example:
gpmaster$ vi firstload_cfg.yaml
- Fill in the load configuration parameter values based on your environment.
This example assumes:
- Your Greenplum Database master hostname is gpmaster.
- The Greenplum Database server is running on the default port.
- Your Kafka broker host and port is localhost:9092.
- You want to write the Kafka data to a Greenplum Database table named data_from_kafka located in the public schema of a database named testdb.
- You want to write the customer identifier and expenses data to Greenplum. You also want to calculate and write the tax due (7.25%) on the expense data.
The firstload_cfg.yaml file would include the following contents:
DATABASE: testdb
USER: gpadmin
HOST: gpmaster
PORT: 5432
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: localhost:9092
         TOPIC: topic_for_gpkafka
      COLUMNS:
         - NAME: cust_id
           TYPE: int
         - NAME: __IGNORED__
           TYPE: int
         - NAME: expenses
           TYPE: decimal(9,2)
      FORMAT: csv
      ERROR_LIMIT: 125
   OUTPUT:
      TABLE: data_from_kafka
      MAPPING:
         - NAME: customer_id
           EXPRESSION: cust_id
         - NAME: expenses
           EXPRESSION: expenses
         - NAME: tax_due
           EXPRESSION: expenses * .0725
   COMMIT:
      MINIMAL_INTERVAL: 2000
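The tax_due entry in the MAPPING block computes the 7.25% tax inline as each row is loaded. As a quick illustration of that arithmetic only (it is not part of the load procedure), you can evaluate the expression in psql, here using 368.27, one of the sample expense amounts, cast to the tax_due column type:
gpmaster$ psql -d testdb -c "SELECT (368.27 * .0725)::decimal(7,2) AS tax_due;"
The result, 26.70, matches the tax_due value you will see for that row after the load.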
- Create the target Greenplum Database table named
data_from_kafka. For example:
gpmaster$ psql -d testdb
testdb=# CREATE TABLE data_from_kafka( customer_id int8,
            expenses decimal(9,2), tax_due decimal(7,2) );
- Exit the psql subsystem:
testdb=# \q
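If you want to verify the table definition from the shell before loading, an optional check such as the following describes the table:
gpmaster$ psql -d testdb -c "\d data_from_kafka"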
- Run the gpkafka load command to batch load the CSV data
published to the topic_for_gpkafka topic into the Greenplum
table. For example:
gpmaster$ gpkafka load --quit-at-eof ./firstload_cfg.yaml
The command exits after it reads all data published to the topic.
- Examine the command output, looking for messages identifying the number of
rows inserted/rejected. For example:
... -[INFO]:- ... Inserted 9 rows
... -[INFO]:- ... Rejected 0 rows
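You can also cross-check the reported insert count against the table itself; after this first batch load, the following query should report nine rows:
gpmaster$ psql -d testdb -c "SELECT COUNT(*) FROM data_from_kafka;"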
- Run the gpkafka load command again, this time in streaming
mode. For example:
gpmaster$ gpkafka load ./firstload_cfg.yaml
The command waits for a producer to publish new messages to the topic.
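If your version of the Greenplum Streaming Server provides the gpkafka check subcommand, you can monitor the progress of the streaming load from a separate terminal on the master host; this sketch assumes that subcommand is available and accepts the same configuration file:
gpmaster$ gpkafka check ./firstload_cfg.yaml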
- Navigate back to your Kafka host terminal window. Stream the contents of the
sample_data.csv file to the Kafka console producer once more:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-producer.sh \
    --broker-list localhost:9092 \
    --topic topic_for_gpkafka < sample_data.csv
- Notice the activity in your Greenplum Database master terminal window. gpkafka load consumes the new round of messages and waits.
- Interrupt and exit the waiting gpkafka load command by entering Control-c in the Greenplum Database master host terminal window.
- View the contents of the Greenplum Database target table data_from_kafka:
gpmaster$ psql -d testdb
testdb=# SELECT * FROM data_from_kafka WHERE customer_id='1313131'
            ORDER BY expenses;
 customer_id | expenses | tax_due
-------------+----------+---------
     1313131 |   368.27 |   26.70
     1313131 |   368.27 |   26.70
     1313131 |   492.83 |   35.73
     1313131 |   492.83 |   35.73
     1313131 |  1313.13 |   95.20
     1313131 |  1313.13 |   95.20
(6 rows)
The table contains two entries for each expense because the producer published the sample_data.csv file twice.
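As an optional follow-up, purely for exploration, you can summarize the loaded data per customer; note that the duplicate rows from the second publish are included in these totals:
gpmaster$ psql -d testdb -c "SELECT customer_id, SUM(expenses) AS total_expenses, SUM(tax_due) AS total_tax FROM data_from_kafka GROUP BY customer_id ORDER BY customer_id;"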