Loading Kafka Data into Greenplum

You will perform the following tasks when you use the Greenplum-Kafka Connector to load Kafka data into a Greenplum Database table:

  1. Ensure that you meet the Prerequisites.
  2. Construct the load configuration file.
  3. Create the target Greenplum Database table.
  4. Assign Greenplum Database role permissions, if required, as described in Configuring Greenplum Database Role Privileges.
  5. Run the gpkafka load command to load the Kafka data into Greenplum Database.
  6. Verify the load operation as described in Checking the Progress of a Load Operation.

Constructing the gpkafka.yaml Configuration File

You configure a data load operation from Kafka to Greenplum Database via a YAML-formatted configuration file. This configuration file includes parameters that identify the source Kafka data and information about the Greenplum Database connection and target table, as well as error and commit thresholds for the operation.

Contents of a sample gpkafka YAML configuration file named loadcfg.yaml:

DATABASE: ops
USER: gpadmin
HOST: mdw-1
PORT: 5432
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: kbrokerhost1:9092
         TOPIC: customer_expenses
      COLUMNS:
         - NAME: cust_id
           TYPE: int
         - NAME: month
           TYPE: int
         - NAME: expenses
           TYPE: decimal(9,2)
      FORMAT: csv
      ERROR_LIMIT: 25
   OUTPUT:
      SCHEMA: payables
      TABLE: expenses
   COMMIT:
      MAX_ROW: 1000
      MINIMAL_INTERVAL: 30000

You identify the Greenplum Database connection options via the DATABASE, USER, HOST, and PORT parameters.

Specify the Kafka brokers and topic of interest using the KAFKA:INPUT:SOURCE block. You must create the Kafka topic prior to loading data.
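
For example, the following is a minimal sketch of creating the customer_expenses topic with the kafka-topics.sh utility that ships with Kafka; the partition and replication settings here are illustrative only, and older Kafka releases use a --zookeeper connection option in place of --bootstrap-server:

$ kafka-topics.sh --create --bootstrap-server kbrokerhost1:9092 \
      --topic customer_expenses --partitions 1 --replication-factor 1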

The COLUMNS block includes the name and type of each data element in the Kafka message.

The FORMAT keyword identifies the format of the Kafka message.
Note: gpkafka supports only delimited text format data at this time.
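
For example, given the COLUMNS defined in loadcfg.yaml, each Kafka message would be a single comma-delimited line containing a customer id, a month, and an expense amount. One way to publish a test message, assuming the kafka-console-producer.sh utility that ships with Kafka (older releases use --broker-list in place of --bootstrap-server):

$ echo "123,1,456.78" | kafka-console-producer.sh \
      --broker-list kbrokerhost1:9092 --topic customer_expenses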

The ERROR_LIMIT parameter identifies the number of errors or the error percentage threshold after which gpkafka should exit the load operation.

You identify the target Greenplum Database schema name and table name via the KAFKA:OUTPUT:SCHEMA and KAFKA:OUTPUT:TABLE parameters. You must pre-create the Greenplum Database table before you attempt to load Kafka data.

gpkafka commits Kafka data to the Greenplum Database table at the row and/or time intervals that you specify in the KAFKA:COMMIT:MAX_ROW and KAFKA:COMMIT:MINIMAL_INTERVAL parameters; you must specify at least one of these parameters. In the sample file above, gpkafka batches at most 1000 rows per commit and waits at least 30 seconds (MINIMAL_INTERVAL is specified in milliseconds) between commits.

Refer to the gpkafka.yaml reference page for detailed information about the gpkafka configuration file format and the configuration parameters that the utility supports.

Creating the Greenplum Table

You must pre-create the Greenplum table before you load Kafka data into Greenplum Database. You use the KAFKA:OUTPUT:SCHEMA and KAFKA:OUTPUT:TABLE load configuration file parameters to identify the schema and table names.

The columns of the target Greenplum table must match the KAFKA:INPUT:COLUMNS that you specify in the load configuration file in both order and number.

The data types that you specify for the columns of the target Greenplum Database table must reflect the type of the related Kafka message element.

The following CREATE TABLE command creates the target Greenplum Database table that receives the Kafka topic data defined in the loadcfg.yaml file presented in the Constructing the gpkafka.yaml Configuration File section:

 ops=# CREATE TABLE payables.expenses( id int8, month int2, expenses decimal(9,2) );

Configuring Greenplum Database Role Privileges

If you transfer data from Kafka to Greenplum Database using a non-administrative Greenplum Database role, the Greenplum administrator must assign the role certain privileges (a combined example follows this list):

  • The role must have USAGE and CREATE privileges on each non-public database schema in which the role will write to tables:
    dbname=# GRANT USAGE, CREATE ON SCHEMA schema_name TO role_name;
  • If the role writing to Greenplum Database is not a database or table owner, the role must have SELECT and INSERT privileges on each Greenplum Database table to which it will write Kafka data:
    dbname=# GRANT SELECT, INSERT ON schema_name.table_name TO role_name;
  • The role must have permission to create readable external tables using the Greenplum Database gpfdist protocol:
    dbname=# ALTER ROLE role_name CREATEEXTTABLE(type = 'readable', protocol = 'gpfdist');
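
For example, assuming a hypothetical role named kafkaload and the payables.expenses table created earlier, the administrator might run:

ops=# GRANT USAGE, CREATE ON SCHEMA payables TO kafkaload;
ops=# GRANT SELECT, INSERT ON payables.expenses TO kafkaload;
ops=# ALTER ROLE kafkaload CREATEEXTTABLE(type = 'readable', protocol = 'gpfdist');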

Refer to the Greenplum Database Managing Roles and Privileges documentation for further information on assigning privileges to Greenplum Database users.

Running the gpkafka load Command

You run the gpkafka load command to load Kafka data to Greenplum. When you run the command, you provide the name of the configuration file that defines the parameters of the load operation. For example:

$ gpkafka load loadcfg.yaml

By default, gpkafka load reads all pending messages and then waits for, and consumes, new Kafka messages as they arrive. When running in this mode, gpkafka load waits indefinitely; you can interrupt and exit the command with Control-c.

To run the command in batch mode, you provide the --quit-at-eof option. In this mode, gpkafka load exits when there are no new messages in the Kafka stream.
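
For example:

$ gpkafka load --quit-at-eof loadcfg.yaml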

A subsequent gpkafka load operation that specifies the same Kafka topic and target Greenplum Database table names resumes the load from the last recorded offset.

Refer to the gpkafka load reference page for additional information about this command.

Checking the Progress of a Load Operation

You can check the commit history of a load operation with the gpkafka check command. When you run gpkafka check, you provide the name of the configuration file that defined the load operation of interest. For example:

$ gpkafka check loadcfg.yaml

Sample command output:

PartitionID    StartTime               EndTime                 BeginOffset    EndOffset
0              2018-07-13T16:19:11Z    2018-07-13T16:19:11Z    0              9

When you run gpkafka check without any options, it displays the latest commit. To view the complete commit history of a load operation, run the command with the --show-commit-history all argument.
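
For example:

$ gpkafka check --show-commit-history all loadcfg.yaml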

Refer to the gpkafka check reference page for additional information about this command.