Loading Kafka Data into Greenplum

You will perform the following tasks when you use the Greenplum-Kafka Integration to load Kafka data into a Greenplum Database table:

  1. Ensure that you meet the Prerequisites.
  2. Identify the format of the Kafka data.
  3. Register the data formatters.
  4. Construct the load configuration file.
  5. Create the target Greenplum Database table.
  6. Assign Greenplum Database role permissions, if required, as described in Configuring Greenplum Database Role Privileges.
  7. Run the gpkafka load command to load the Kafka data into Greenplum Database.
  8. Verify the load operation as described in Checking the Progress of a Load Operation.

About Supported Kafka Message Data Formats

The Greenplum-Kafka Integration supports Kafka message key and value data in the following formats:

  • avro - Avro-format data. gpkafka supports:
    • Loading Kafka message key or value data from a single-object encoded Avro file.
    • Using the Avro schema of a Kafka message key and/or value registered in a Confluent Schema Registry to load Avro-format key and/or value data.

    In both cases, gpkafka reads Avro data from Kafka only as a single JSON-type column.
  • binary - Binary format data. gpkafka reads binary data from Kafka only as a single bytea-type column.
  • csv - Comma-delimited text format data.
  • custom - Data of a custom format, parsed by a custom formatter.
  • delimited - Text data separated by a configurable delimiter.
  • json - JSON-format data. gpkafka reads JSON data from Kafka only as a single column.

To write Kafka data into a Greenplum Database table, you must identify the data format in the load configuration file.

Avro

Specify the avro format when your Kafka message data is a single-object encoded Avro file or you are using the Confluent Schema Registry to load Avro message key and/or value data. gpkafka reads Avro data from Kafka and loads it into a single JSON-type column. You must define a mapping if you want gpkafka to write the data into specific columns in the target Greenplum Database table.
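
For example, when you use the Confluent Schema Registry for Avro message value data, the VALUE block of a Version 2 load configuration file might resemble the following sketch. The column name c1 and the registry address are placeholders that mirror the loadcfg2.yaml sample shown later in this topic:

VALUE:
   COLUMNS:
     - NAME: c1                # Avro value data loads into this single json-type column
       TYPE: json
   FORMAT: avro
   AVRO_OPTION:
     SCHEMA_REGISTRY_ADDR: http://localhost:8081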

Binary

Use the binary format when your Kafka message data is a stream of bytes. gpkafka reads binary data from Kafka and loads it into a single bytea-type column.
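
A minimal sketch of a VALUE block for binary message value data follows; the column name value_raw is a placeholder:

VALUE:
   COLUMNS:
     - NAME: value_raw         # binary message data loads into this single bytea-type column
       TYPE: bytea
   FORMAT: binary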

CSV

Use the csv format when your Kafka message data is comma-delimited text and conforms to RFC 4180. The message content must not contain line-ending characters (CR and LF).

Data in csv format may appear in Kafka messages as follows:

"1313131","12","backorder","1313.13"
"3535353","11","shipped","761.35"
"7979797","11","partial","18.72"

Custom

The Greenplum-Kafka Integration provides a custom data formatter plug-in framework for Kafka messages using user-defined functions. The type of Kafka message data supported by a custom formatter is formatter-specific. For example, a custom formatter may support compressed or complex data.

Delimited Text

The Greenplum-Kafka Integration supports loading Kafka data delimited by one or more characters that you specify. Use the delimited format for such data. The delimiter may be a multi-byte value up to 32 bytes in length. You cannot specify a quote or an escape character in the delimiter.

Sample data using a pipe ('|') delimiter character follows:

1313131|12|backorder|1313.13
3535353|11|shipped|761.35
7979797|11|partial|18.72
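
A VALUE block for the pipe-delimited sample data above might resemble the following sketch. The column names are placeholders; refer to the gpkafka-v2.yaml reference page for the exact DELIMITED_OPTION syntax:

VALUE:
   COLUMNS:
     - NAME: cust_id
       TYPE: int
     - NAME: month
       TYPE: int
     - NAME: order_status
       TYPE: text
     - NAME: amount_paid
       TYPE: decimal(9,2)
   FORMAT: delimited
   DELIMITED_OPTION:
     DELIMITER: '|'            # the delimiter may be multi-byte, up to 32 bytes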

JSON

Specify the json format when your Kafka message data is in JSON format. gpkafka reads JSON data from Kafka only as a single column. You must define a mapping if you want gpkafka to write the data into specific columns in the target Greenplum Database table.

Sample JSON message data:

{ "cust_id": 1313131, "month": 12, "amount_paid":1313.13 }
{ "cust_id": 3535353, "month": 11, "amount_paid":761.35 }
{ "cust_id": 7979797, "month": 11, "amount_paid":18.82 }

Registering Data Formatters

Before you load Kafka message data into Greenplum Database, you must register Greenplum-Kafka Integration formatter functions in every database in which you will write Kafka data to Greenplum tables. To register these functions, you must have Greenplum Database SUPERUSER privileges on the database, or you must be the database owner.

Perform the following procedure to register the Greenplum-Kafka Integration formatter functions:

  1. Open a new terminal window, log in to the Greenplum Database master host as the gpadmin administrative user, and set up the Greenplum environment. For example:
    $ ssh gpadmin@gpmaster
    gpmaster$ . /usr/local/greenplum-db/greenplum_path.sh
  2. Start the psql subsystem, connecting to a database in which you want to register the formatter functions. For example:
    gpmaster$ psql -d testdb
  3. Enter the following command to register the functions:
    testdb=# CREATE EXTENSION gpss;
  4. Perform steps 2 and 3 for each database into which you will load Kafka data.

Registering a Custom Formatter

A custom data formatter for Kafka messages is a user-defined function. You must create/register a custom formatter function in each database in which you will use the custom formatter to write Kafka data to Greenplum tables.

Constructing the gpkafka.yaml Configuration File

You configure a data load operation from Kafka to Greenplum Database via a YAML-formatted configuration file. This configuration file includes parameters that identify the source Kafka data and information about the Greenplum Database connection and target table, as well as error and commit thresholds for the operation.

The Greenplum-Kafka Integration supports two versions of the YAML configuration file: VERSION: 1 and VERSION: 2. Version 2 supports all of the Version 1 features, and adds support for loading both the Kafka message key and value to Greenplum.

Refer to the gpkafka.yaml reference page for Version 1 configuration file contents and syntax. Refer to the gpkafka-v2.yaml reference page for Version 2 configuration file format and the configuration parameters that this version supports.

The contents of a sample gpkafka Version 2 YAML configuration file named loadcfg2.yaml follow:

DATABASE: ops
USER: gpadmin
HOST: mdw-1
PORT: 5432
VERSION: 2
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: kbrokerhost1:9092
         TOPIC: customer_expenses2
      VALUE:
         COLUMNS:
           - NAME: c1
             TYPE: json
         FORMAT: avro
         AVRO_OPTION:
           SCHEMA_REGISTRY_ADDR: http://localhost:8081
      KEY:
         COLUMNS:
           - NAME: key
             TYPE: json
         FORMAT: avro
         AVRO_OPTION:
           SCHEMA_REGISTRY_ADDR: http://localhost:8081
      ERROR_LIMIT: 25
   OUTPUT:
      SCHEMA: payables
      TABLE: expenses2
      MAPPING:
        - NAME: customer_id
          EXPRESSION: (c1->>'cust_id')::int
        - NAME: newcust
          EXPRESSION: ((c1->>'cust_id')::int > 5000000)::boolean
        - NAME: expenses
          EXPRESSION: (c1->>'expenses')::decimal
        - NAME: tax_due
          EXPRESSION: ((c1->>'expenses')::decimal * .075)::decimal
   METADATA:
      SCHEMA: gpkafka_internal
   COMMIT:
      MAX_ROW: 1000
      MINIMAL_INTERVAL: 30000

Greenplum Database Options

You identify the Greenplum Database connection options via the DATABASE, USER, HOST, and PORT parameters.

The VERSION parameter identifies the version of the gpkafka YAML configuration file. The default version is Version 1.

KAFKA:INPUT Options

Specify the Kafka brokers and topic of interest using the SOURCE block. You must create the Kafka topic prior to loading data.

When you provide a VALUE block, you must specify the COLUMNS and FORMAT parameters. The VALUE:COLUMNS block includes the name and type of each data element in the Kafka message. The default source-to-target data mapping behavior of gpkafka is to match a column name as defined in COLUMNS:NAME with a column name in the target Greenplum Database OUTPUT:TABLE:

  • You must identify the Kafka data elements in the order in which they appear in the Kafka message.
  • You may specify NAME: __IGNORED__ to omit a Kafka message value data element from the load operation.
  • You must provide the same name for each non-ignored Kafka data element and its associated Greenplum Database table column.
  • You must specify an equivalent data type for each non-ignored Kafka data element and its associated Greenplum Database table column. (A brief sketch illustrating these rules follows this list.)
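
For example, suppose the target Greenplum Database table is defined with the columns cust_id (int) and order_status (text). The following VALUE:COLUMNS sketch relies on the default name matching and ignores the second message element; the column names are placeholders:

VALUE:
   COLUMNS:
     - NAME: cust_id           # matches the cust_id column of the target table
       TYPE: int
     - NAME: __IGNORED__       # the second message element is not loaded
       TYPE: int
     - NAME: order_status      # matches the order_status column of the target table
       TYPE: text
   FORMAT: csv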

The VALUE:FORMAT keyword identifies the format of the Kafka message value. gpkafka supports comma-delimited text format (csv) and data that is separated by a configurable delimiter (delimited). gpkafka also supports binary (binary), JSON (json), custom (custom), and Avro (avro) format value data.

When you provide a KEY block, you must specify the COLUMNS and FORMAT parameters. The KEY:COLUMNS block includes the name and type of each element of the Kafka message key, and is subject to the same restrictions as identified for VALUE:COLUMNS above. The KEY:FORMAT keyword identifies the format of the Kafka message key. gpkafka supports avro, binary, csv, custom, delimited, and json format key data.

The ERROR_LIMIT parameter identifies the number of errors or the error percentage threshold after which gpkafka should exit the load operation.

KAFKA:OUTPUT Options

You identify the target Greenplum Database schema name and table name via the KAFKA:OUTPUT: SCHEMA and TABLE parameters. You must pre-create the Greenplum Database table before you attempt to load Kafka data.

You can override the default mapping of the INPUT VALUE:COLUMNS and KEY:COLUMNS by specifying a MAPPING block in which you identify the association between a specific column in the target Greenplum Database table and a Kafka message value or key data element. You can also map a Greenplum Database table column to a value expression.
Note: When you specify a MAPPING block, ensure that you provide entries for all Kafka data elements of interest - gpkafka does not automatically match column names when you provide a MAPPING.

Other Options

The KAFKA:METADATA:SCHEMA parameter specifies the name of the Greenplum Database schema in which gpkafka creates external and history tables.

gpkafka commits Kafka data to the Greenplum Database table at the row and/or time intervals that you specify in the KAFKA:COMMIT: MAX_ROW and/or MINIMAL_INTERVAL parameters. You must specify at least one of these parameters.
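
A minimal sketch of these blocks, which appear within the KAFKA block, follows. The values are placeholders, and the comment about MINIMAL_INTERVAL units is an assumption; confirm both against the gpkafka-v2.yaml reference page:

METADATA:
   SCHEMA: gpkafka_internal    # gpkafka creates its external and history tables in this schema
COMMIT:
   MAX_ROW: 1000               # commit after at most this many rows
   MINIMAL_INTERVAL: 30000     # and/or at this interval (assumed to be milliseconds)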

About KEYs, VALUEs, and FORMATs

You can specify any data format in the Version 2 configuration file KEY:FORMAT and VALUE:FORMAT parameters, with some restrictions. The Greenplum-Kafka Integration supports the following KEY:FORMAT and VALUE:FORMAT combinations:

  • KEY:FORMAT: any; VALUE:FORMAT: none (VALUE block omitted) - gpkafka loads only the Kafka message key data, subject to any MAPPING that you specify, to Greenplum Database.
  • KEY:FORMAT: none (KEY block omitted); VALUE:FORMAT: any - Equivalent to gpkafka configuration file Version 1. gpkafka ignores the Kafka message key and loads only the Kafka message value data, subject to any MAPPING that you specify, to Greenplum Database.
  • KEY:FORMAT: csv with any VALUE:FORMAT, or any KEY:FORMAT with VALUE:FORMAT: csv - Not permitted.
  • KEY:FORMAT and VALUE:FORMAT each one of avro, binary, delimited, or json - Any combination is permitted. gpkafka loads both the Kafka message key and value data, subject to any MAPPING that you specify, to Greenplum Database.
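
For example, a sketch of an INPUT block that loads a delimited message key together with a json message value might look like the following. The topic, column names, and delimiter are placeholders; refer to the gpkafka-v2.yaml reference page for the exact syntax:

INPUT:
   SOURCE:
      BROKERS: kbrokerhost1:9092
      TOPIC: orders_topic
   VALUE:
      COLUMNS:
        - NAME: jvalue          # the json message value loads into this single column
          TYPE: json
      FORMAT: json
   KEY:
      COLUMNS:
        - NAME: order_key
          TYPE: text
      FORMAT: delimited
      DELIMITED_OPTION:
        DELIMITER: '|'
   ERROR_LIMIT: 25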

About Mapping Kafka Input Data

You can define a MAPPING between the Kafka input data (VALUE:COLUMNS and KEY:COLUMNS) and the columns in the target Greenplum Database table. Defining a mapping may be useful when you have a multi-field input column (such as a JSON-type column), and you want to assign individual components of the input field to specific columns in the target table.

You might also use a MAPPING to assign a value expression to a target table column. The expression must be one that you could specify in the SELECT list of a query, and can include a constant value, a column reference, an operator invocation, a built-in or user-defined function call, and so forth.

If you want to map a single input column to more than one target table column, you can create a user-defined function to parse and transform the input column and return the columns of interest.

For example, suppose a Kafka producer emits the following JSON messages to a topic:

{ "customer_id": 1313131, "some_intfield": 12 }
{ "customer_id": 77, "some_intfield": 7 }
{ "customer_id": 1234, "some_intfield": 56 }

You could define a user-defined function, udf_parse_json(), to parse the data as follows:

=> CREATE OR REPLACE FUNCTION udf_parse_json(value json)
     RETURNS TABLE (x int, y text)
   LANGUAGE plpgsql AS $$
     BEGIN
        RETURN query
        SELECT ((value->>'customer_id')::int), ((value->>'some_intfield')::text);
     END $$;

This function returns the two fields in each JSON record, casting the fields to integer and text, respectively.

An example MAPPING for the topic data, read into a single JSON-type KAFKA:INPUT:COLUMNS entry named jdata, follows:

MAPPING: 
  - NAME: cust_id
    EXPRESSION: (jdata->>'customer_id')::int
  - NAME: field2
    EXPRESSION: ((jdata->>'some_intfield')::int * .075)::decimal
  - NAME: j1, j2
    EXPRESSION: (udf_parse_json(jdata)).*

The Greenplum Database table definition for this example scenario is:

=> CREATE TABLE t1map( cust_id int, field2 decimal(7,2), j1 int, j2 text );

Creating the Greenplum Table

You must pre-create the Greenplum table before you load Kafka data into Greenplum Database. You use the KAFKA:OUTPUT: SCHEMA and TABLE load configuration file parameters to identify the schema and table names.

The target Greenplum table definition must include each column that gpkafka will load into the table. The table definition may include additional columns; gpkafka ignores these columns, and loads no data into them.

The name and data type that you specify for a column of the target Greenplum Database table must match the name and data type of the related, non-ignored Kafka message element. If you have defined a column mapping, the name of the Greenplum Database column must match the target column name that you specified for the mapping, and the type must match the target column type or expression that you define.

The following CREATE TABLE command creates the target Greenplum Database table that receives the Kafka topic data defined in the loadcfg2.yaml file presented in the Constructing the gpkafka.yaml Configuration File section:

 testdb=# CREATE TABLE payables.expenses2( customer_id int8, newcust bool,
            expenses decimal(9,2), tax_due decimal(7,2) );

Configuring Greenplum Database Role Privileges

If you transfer data from Kafka to Greenplum Database using a non-admin Greenplum user/role name, the Greenplum administrator must assign the role certain privileges:

  • The role must have USAGE and CREATE privileges on any non-public database schema where the user:
    • writes Kafka data to a table in the schema (KAFKA:OUTPUT:SCHEMA), or
    • stores gpkafka external and history tables (KAFKA:METADATA:SCHEMA).

    For example:

    dbname=# GRANT USAGE, CREATE ON SCHEMA schema_name TO role_name;
  • If the role writing to Greenplum Database is not a database or table owner, the role must have SELECT and INSERT privileges on each Greenplum Database table to which the user will write Kafka data:
    dbname=# GRANT SELECT, INSERT ON schema_name.table_name TO role_name;
  • The role must have permission to create readable external tables using the Greenplum Database gpfdist protocol:
    dbname=# ALTER ROLE role_name CREATEEXTTABLE(type = 'readable', protocol = 'gpfdist');

Refer to the Greenplum Database Managing Roles and Privileges documentation for further information on assigning privileges to Greenplum Database users.

Running the gpkafka load Command

You run the gpkafka load command to load Kafka data to Greenplum. When you run the command, you provide the name of the configuration file that defines the parameters of the load operation. For example:

$ gpkafka load loadcfg2.yaml

By default, gpkafka load reads all pending messages and then waits for, and subsequently consumes, new Kafka messages. When running in this mode, gpkafka load waits indefinitely; you can interrupt and exit the command with Control-c.

To run the command in batch mode, you provide the --quit-at-eof option. In this mode, gpkafka load exits when there are no new messages in the Kafka stream.

A subsequent gpkafka load operation that specifies the same Kafka topic and target Greenplum Database table names resumes from the last recorded offset.

Refer to the gpkafka load reference page for additional information about this command.

About Kafka Offsets, Message Retention, and Loading

Kafka maintains a partitioned log for each topic, assigning each record (message) within a partition a unique, sequential id number referred to as an offset. For each gpkafka load invocation that specifies the same Kafka topic and Greenplum Database table names, Kafka retains the last offset in the log that the load operation consumed. The Greenplum-Kafka Integration also records this offset value.

Kafka persists a message for a configurable retention time period and/or log size, after which it purges messages from the log. Kafka topics or messages can also be purged on demand. This may result in an offset mismatch between Kafka and the Greenplum-Kafka Integration.

gpkafka load returns an error if its recorded offset for the Kafka topic and Greenplum Database table combination is behind the earliest available message offset for the topic. In this situation, gpkafka load returns a message of the following format:

<date:time> gpkafkaload:<user_name>:<host>:<pid>-[CRITICAL]:-Offset gap detected, last load offset is <num>, but earliest available is [<topic_name>[<partition_num>]@<num>]

For example:

20181009:23:15:00.187 gpkafkaload:gpadmin:gpmaster:055450-[CRITICAL]:-Offset gap detected, last load offset is 7, but earliest available is [delimkeynval2[0]@9]

When you receive this message, you can choose to resume the load operation from the earliest available message published to the topic by specifying the --force-reset-earliest option to gpkafka load:

$ gpkafka load --force-reset-earliest loadcfg2.yaml

If you want to load only new messages published to the Kafka topic, use the --force-reset-latest option with the command:

$ gpkafka load --force-reset-latest loadcfg2.yaml

Checking the Progress of a Load Operation

You can check the commit history of a load operation with the gpkafka check command. When you run gpkafka check, you provide the name of the configuration file that defined the load operation of interest. For example:

$ gpkafka check loadcfg2.yaml

Sample command output:

PartitionID    StartTime    EndTime    BeginOffset    EndOffset
0    2018-07-13T16:19:11Z    2018-07-13T16:19:11Z    0    9

When you run gpkafka check without any options, it displays the latest commit. To view the complete commit history of a load operation, run the command with the --show-commit-history all argument.

Refer to the gpkafka check reference page for additional information about this command.