Loading Kafka Data into Greenplum
You will perform the following tasks when you use the Greenplum-Kafka Connector to load Kafka data into a Greenplum Database table:
- Ensure that you meet the Prerequisites.
- Construct the load configuration file.
- Create the target Greenplum Database table.
- Assign Greenplum Database role permissions, if required, as described in Configuring Greenplum Database Role Privileges.
- Run the gpkafka load command to load the Kafka data into Greenplum Database.
- Verify the load operation as described in Checking the Progress of a Load Operation.
Constructing the gpkafka.yaml Configuration File
You configure a data load operation from Kafka to Greenplum Database via a YAML-formatted configuration file. This configuration file includes parameters that identify the source Kafka data and information about the Greenplum Database connection and target table, as well as error and commit thresholds for the operation.
Contents of a sample gpkafka YAML configuration file named loadcfg.yaml:
DATABASE: ops
USER: gpadmin
HOST: mdw-1
PORT: 5432
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: kbrokerhost1:9092
         TOPIC: customer_expenses
      COLUMNS:
         - NAME: cust_id
           TYPE: int
         - NAME: month
           TYPE: int
         - NAME: expenses
           TYPE: decimal(9,2)
      FORMAT: csv
      ERROR_LIMIT: 25
   OUTPUT:
      SCHEMA: payables
      TABLE: expenses
   COMMIT:
      MAX_ROW: 1000
      MINIMAL_INTERVAL: 30000
You identify the Greenplum Database connection options via the DATABASE, USER, HOST, and PORT parameters.
Specify the Kafka brokers and topic of interest using the KAFKA:INPUT:SOURCE block. You must create the Kafka topic prior to loading data.
The COLUMNS block includes the name and type of each data element in the Kafka message.
The ERROR_LIMIT parameter specifies the error threshold, either a number of errors or a percentage of error rows, after which gpkafka exits the load operation.
You identify the target Greenplum Database schema and table names via the KAFKA:OUTPUT:SCHEMA and KAFKA:OUTPUT:TABLE parameters. You must create the Greenplum Database table before you attempt to load Kafka data.
gpkafka commits Kafka data to the Greenplum Database table at the row and/or time intervals that you specify in the KAFKA:COMMIT:MAX_ROW and/or KAFKA:COMMIT:MINIMAL_INTERVAL parameters. You must specify at least one of these parameters.
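The interplay between the two thresholds can be sketched as follows. This is an illustrative Python model of the commit decision only, not gpkafka's actual implementation; the class and method names are hypothetical.

```python
import time

class CommitPolicy:
    """Illustrative model: commit when either the buffered row count
    or the elapsed time since the last commit crosses its threshold."""

    def __init__(self, max_row=None, minimal_interval_ms=None):
        # gpkafka requires at least one of the two thresholds.
        if max_row is None and minimal_interval_ms is None:
            raise ValueError("specify MAX_ROW and/or MINIMAL_INTERVAL")
        self.max_row = max_row
        self.minimal_interval_ms = minimal_interval_ms
        self.pending_rows = 0
        self.last_commit = time.monotonic()

    def observe(self, n_rows=1):
        # Account for rows read from Kafka but not yet committed.
        self.pending_rows += n_rows

    def should_commit(self):
        if self.max_row is not None and self.pending_rows >= self.max_row:
            return True
        if self.minimal_interval_ms is not None:
            elapsed_ms = (time.monotonic() - self.last_commit) * 1000
            if elapsed_ms >= self.minimal_interval_ms:
                return True
        return False

    def committed(self):
        # Reset both thresholds after a commit.
        self.pending_rows = 0
        self.last_commit = time.monotonic()
```

With MAX_ROW: 1000 and MINIMAL_INTERVAL: 30000 as in the sample file, a commit fires after 1000 buffered rows even if 30 seconds have not yet elapsed, and vice versa.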
Refer to the gpkafka.yaml reference page for detailed information about the gpkafka configuration file format and the configuration parameters that the utility supports.
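As a sanity check before running a load, you can parse the configuration file (for example with PyYAML's safe_load) and verify the keys discussed above. A minimal sketch; the required-key list covers only the parameters described in this section and is an assumption, not gpkafka's full validation logic.

```python
def check_load_config(cfg):
    """Return a list of problems found in a parsed gpkafka config.
    cfg is a dict, e.g. the result of yaml.safe_load() on the file."""
    problems = [k for k in ("DATABASE", "USER", "HOST", "PORT", "KAFKA")
                if k not in cfg]
    kafka = cfg.get("KAFKA", {})
    source = kafka.get("INPUT", {}).get("SOURCE", {})
    if not source.get("BROKERS") or not source.get("TOPIC"):
        problems.append("KAFKA:INPUT:SOURCE needs BROKERS and TOPIC")
    if not kafka.get("OUTPUT", {}).get("TABLE"):
        problems.append("KAFKA:OUTPUT needs TABLE")
    commit = kafka.get("COMMIT", {})
    if "MAX_ROW" not in commit and "MINIMAL_INTERVAL" not in commit:
        problems.append("KAFKA:COMMIT needs MAX_ROW and/or MINIMAL_INTERVAL")
    return problems
```

An empty return value means the checks passed; each string in a non-empty list names a missing parameter.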
Creating the Greenplum Table
You must create the Greenplum table before you load Kafka data into Greenplum Database. You identify the schema and table names with the KAFKA:OUTPUT:SCHEMA and KAFKA:OUTPUT:TABLE load configuration file parameters.
The columns of the target Greenplum table must match the KAFKA:INPUT:COLUMNS that you specify in the load configuration file in both order and number.
The data types that you specify for the columns of the target Greenplum Database table must reflect the type of the related Kafka message element.
The following CREATE TABLE command creates the target Greenplum Database table for the Kafka topic data defined in the loadcfg.yaml file presented in Constructing the gpkafka.yaml Configuration File:
testdb=# CREATE TABLE payables.expenses( id int8, month int2, expenses decimal(9,2) );
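Because the table's columns must line up with the KAFKA:INPUT:COLUMNS block in order and number, it can be convenient to derive the DDL from the same column list. A hedged Python sketch; generate_ddl is a hypothetical helper, not part of gpkafka. Note that, as in the example above, only order and number must match: the sample table uses different column names and types than the COLUMNS block.

```python
def generate_ddl(schema, table, columns):
    """Build a CREATE TABLE statement from a list of
    {"NAME": ..., "TYPE": ...} dicts mirroring the COLUMNS block."""
    cols = ",\n    ".join(f'{c["NAME"]} {c["TYPE"]}' for c in columns)
    return f"CREATE TABLE {schema}.{table} (\n    {cols}\n);"

# The COLUMNS block from the sample loadcfg.yaml file.
columns = [
    {"NAME": "cust_id",  "TYPE": "int"},
    {"NAME": "month",    "TYPE": "int"},
    {"NAME": "expenses", "TYPE": "decimal(9,2)"},
]
print(generate_ddl("payables", "expenses", columns))
```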
Configuring Greenplum Database Role Privileges
If you transfer data from Kafka to Greenplum Database using a non-admin Greenplum user/role name, the Greenplum administrator must assign the role certain privileges:
- The role must have USAGE and CREATE privileges on each non-public database schema in which the user will write to tables:

  dbname=# GRANT USAGE, CREATE ON SCHEMA schema_name TO role_name;
- If the role writing to Greenplum Database is not a database or table owner, the role must have SELECT and INSERT privileges on each Greenplum Database table to which the user will write Kafka data:

  dbname=# GRANT SELECT, INSERT ON schema_name.table_name TO role_name;
- The role must have permission to create readable external tables using the Greenplum Database gpfdist protocol:

  dbname=# ALTER ROLE role_name CREATEEXTTABLE(type = 'readable', protocol = 'gpfdist');
Refer to the Greenplum Database Managing Roles and Privileges documentation for further information on assigning privileges to Greenplum Database users.
Running the gpkafka load Command
You run the gpkafka load command to load Kafka data to Greenplum. When you run the command, you provide the name of the configuration file that defines the parameters of the load operation. For example:
$ gpkafka load loadcfg.yaml
The default mode of operation for gpkafka load is to read all pending messages and then to wait for, and then consume, new Kafka messages. When running in this mode, gpkafka load waits indefinitely; you can interrupt and exit the command with Control-c.
To run the command in batch mode, you provide the --quit-at-eof option. In this mode, gpkafka load exits when there are no new messages in the Kafka stream.
When you run a subsequent load operation that specifies the same Kafka topic and target Greenplum Database table names, gpkafka load resumes the load from the last recorded offset.
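Conceptually, this resume behavior works like a per-(topic, table) bookmark. The following Python sketch illustrates the bookkeeping only; gpkafka persists this state itself, and the structures here are hypothetical.

```python
class OffsetTracker:
    """Toy model of resume-from-last-offset bookkeeping."""

    def __init__(self):
        # Maps (topic, schema.table) -> next Kafka offset to read.
        self.bookmarks = {}

    def start_offset(self, topic, table):
        # A brand-new load starts at offset 0; a repeat load for the
        # same topic/table pair resumes where the last commit ended.
        return self.bookmarks.get((topic, table), 0)

    def record_commit(self, topic, table, end_offset):
        # After a commit through end_offset, the next read starts
        # at the following offset.
        self.bookmarks[(topic, table)] = end_offset + 1
```

A different topic or a different target table gets its own bookmark, so loads into separate tables do not interfere with each other's resume points.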
Refer to the gpkafka load reference page for additional information about this command.
Checking the Progress of a Load Operation
You can check the commit history of a load operation with the gpkafka check command. When you run gpkafka check, you provide the name of the configuration file that defined the load operation of interest. For example:
$ gpkafka check loadcfg.yaml
Sample command output:
PartitionID    StartTime               EndTime                 BeginOffset    EndOffset
0              2018-07-13T16:19:11Z    2018-07-13T16:19:11Z    0              9
When you run gpkafka check without any options, it displays the latest commit. To view the complete commit history of a load operation, run the command with the --show-commit-history all argument.
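If you script around gpkafka check, the commit history can be split into records. A minimal sketch that assumes the whitespace-separated header-plus-rows format shown in the sample output above.

```python
def parse_check_output(text):
    """Parse gpkafka check output into a list of dicts, one per
    commit row, keyed by the header column names.
    Assumes whitespace-separated columns with no embedded spaces."""
    lines = [ln for ln in text.strip().splitlines() if ln.strip()]
    header = lines[0].split()
    return [dict(zip(header, ln.split())) for ln in lines[1:]]
```

Each record then exposes fields such as PartitionID, BeginOffset, and EndOffset by name, which is handy for monitoring scripts.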
Refer to the gpkafka check reference page for additional information about this command.