Loading Kafka Data into Greenplum
You will perform the following tasks when you use the Greenplum-Kafka Integration to load Kafka data into a Greenplum Database table:
- Ensure that you meet the Prerequisites.
- Register the Pivotal Greenplum Stream Server (GPSS) extension as described in the Pivotal Greenplum Stream Server documentation.
- Identify the format of the Kafka data.
- (Optional) Register custom data formatters.
- Construct the load configuration file.
- Create the target Greenplum Database table.
- Assign Greenplum Database role permissions to the table, if required, as described in the Pivotal Greenplum Stream Server documentation.
- Run the gpkafka load command to load the Kafka data into Greenplum Database.
- Check the progress of the load operation.
- Check for load errors as described in the Pivotal Greenplum Stream Server documentation. (Note that the naming format for gpkafka log files is gpkafka_date.log.)
Prerequisites
The Greenplum-Kafka Integration is installed when you install Greenplum Database. Before using the gpkafka utilities to load Kafka data to Greenplum Database, ensure that you:
- Meet the Prerequisites documented for the Pivotal Greenplum Stream Server. gpkafka automatically launches a GPSS server instance for you on the local host.
- Have access to a running Kafka cluster with ZooKeeper, and that you can identify the hostname(s) and port number(s) of the Kafka broker(s) serving the data.
- Can identify the Kafka topic of interest.
- Can run the command on a host that has connectivity to:
- Each Kafka broker host in the Kafka cluster.
- The Greenplum Database master and all segment hosts.
About Supported Kafka Message Data Formats
The Greenplum-Kafka Integration supports Kafka message key and value data in the following formats:
| Format | Description |
|---|---|
| avro | Avro-format data. gpkafka supports both single-object encoded Avro data and Avro data that references a schema in the Confluent Schema Registry. In both cases, gpkafka reads Avro data from Kafka only as a single JSON-type column. gpkafka supports libz-, lzma-, and snappy-compressed Avro data from Kafka. |
| binary | Binary-format data. gpkafka reads binary data from Kafka only as a single bytea-type column. |
| csv | Comma-delimited text-format data. |
| custom | Data of a custom format, parsed by a custom formatter. |
| delimited | Text data separated by a configurable delimiter. |
| json | JSON-format data. gpkafka reads JSON data from Kafka only as a single column. |
To write Kafka data into a Greenplum Database table, you must identify the data format in the load configuration file.
Avro
Specify the avro format when your Kafka message data is a single-object encoded Avro file or you are using the Confluent Schema Registry to load Avro message key and/or value data. gpkafka reads Avro data from Kafka and loads it into a single JSON-type column. You must define a mapping if you want gpkafka to write the data into specific columns in the target Greenplum Database table.
Binary
Use the binary format when your Kafka message data is a stream of bytes. gpkafka reads binary data from Kafka and loads it into a single bytea-type column.
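For example, because gpkafka loads each binary message into a single bytea-type column, a minimal target table needs only one such column. This sketch uses illustrative table and column names:

```sql
-- Illustrative target table for binary Kafka message data;
-- gpkafka loads each message into the single bytea column.
CREATE TABLE kafka_raw_msgs( msg bytea );
```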
CSV
Use the csv format when your Kafka message data is comma-delimited text and conforms to RFC 4180. The message content may not contain line ending characters (CR and LF).
Data in csv format may appear in Kafka messages as follows:
```
"1313131","12","backorder","1313.13"
"3535353","11","shipped","761.35"
"7979797","11","partial","18.72"
```
Custom
The Greenplum-Kafka Integration provides a custom data formatter plug-in framework for Kafka messages using user-defined functions. The type of Kafka message data supported by a custom formatter is formatter-specific. For example, a custom formatter may support compressed or complex data.
Delimited Text
The Greenplum-Kafka Integration supports loading Kafka data delimited by one or more characters that you specify. Use the delimited format for such data. The delimiter may be a multi-byte value and up to 32 bytes in length. You cannot specify a quote or an escape character in the delimiter.
Sample data using a pipe ('|') delimiter character follows:
```
1313131|12|backorder|1313.13
3535353|11|shipped|761.35
7979797|11|partial|18.72
```
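A sketch of a Version 2 VALUE block that could read such pipe-delimited data follows. The column names are illustrative, and the DELIMITED_OPTION:DELIMITER parameter name is taken from the gpkafka-v2.yaml reference; confirm both against the reference page for your gpkafka version:

```yaml
VALUE:
  COLUMNS:
    - NAME: cust_id      # illustrative column names; must match the
      TYPE: int          # target table column names and types
    - NAME: month
      TYPE: int
    - NAME: status
      TYPE: text
    - NAME: amount
      TYPE: decimal
  FORMAT: delimited
  DELIMITED_OPTION:
    DELIMITER: "|"
```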
JSON
Specify the json format when your Kafka message data is in JSON format. gpkafka reads JSON data from Kafka only as a single column. You must define a mapping if you want gpkafka to write the data into specific columns in the target Greenplum Database table.
Sample JSON message data:
```
{ "cust_id": 1313131, "month": 12, "amount_paid":1313.13 }
{ "cust_id": 3535353, "month": 11, "amount_paid":761.35 }
{ "cust_id": 7979797, "month": 11, "amount_paid":18.82 }
```
Registering a Custom Formatter
A custom data formatter for Kafka messages is a user-defined function. If you are using a custom formatter, you must create and register the formatter function in each database in which you will use it to write Kafka data to Greenplum tables.
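The registration takes the shape of a CREATE FUNCTION statement run in each target database. In this sketch the shared library path and function names are placeholders for your own formatter implementation, not objects shipped with gpkafka:

```sql
-- Hypothetical registration of a C-language formatter function;
-- '$libdir/my_formatter_lib' and 'my_formatter_func' are placeholders.
CREATE FUNCTION my_kafka_formatter()
  RETURNS record
  AS '$libdir/my_formatter_lib', 'my_formatter_func'
LANGUAGE C STABLE;
```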
Constructing the gpkafka.yaml Configuration File
You configure a data load operation from Kafka to Greenplum Database via a YAML-formatted configuration file. This configuration file includes parameters that identify the source Kafka data and information about the Greenplum Database connection and target table, as well as error and commit thresholds for the operation.
The Greenplum-Kafka Integration supports two versions of the YAML configuration file: VERSION: 1 and VERSION: 2. Version 2 of the configuration file format supports all features of Version 1 of the configuration file, and introduces support for loading both the Kafka message key and value to Greenplum.
Refer to the gpkafka.yaml reference page for Version 1 configuration file contents and syntax. Refer to the gpkafka-v2.yaml reference page for Version 2 configuration file format and the configuration parameters that this version supports.
Contents of a sample gpkafka Version 2 YAML configuration file named loadcfg2.yaml follows:
```yaml
DATABASE: ops
USER: gpadmin
PASSWORD: changeme
HOST: mdw-1
PORT: 5432
VERSION: 2
KAFKA:
   INPUT:
      SOURCE:
        BROKERS: kbrokerhost1:9092
        TOPIC: customer_expenses2
      VALUE:
        COLUMNS:
          - NAME: c1
            TYPE: json
        FORMAT: avro
        AVRO_OPTION:
          SCHEMA_REGISTRY_ADDR: http://localhost:8081
      KEY:
        COLUMNS:
          - NAME: key
            TYPE: json
        FORMAT: avro
        AVRO_OPTION:
          SCHEMA_REGISTRY_ADDR: http://localhost:8081
      ERROR_LIMIT: 25
   OUTPUT:
      SCHEMA: payables
      TABLE: expenses2
      MAPPING:
        - NAME: customer_id
          EXPRESSION: (c1->>'cust_id')::int
        - NAME: newcust
          EXPRESSION: ((c1->>'cust_id')::int > 5000000)::boolean
        - NAME: expenses
          EXPRESSION: (c1->>'expenses')::decimal
        - NAME: tax_due
          EXPRESSION: ((c1->>'expenses')::decimal * .075)::decimal
   METADATA:
      SCHEMA: gpkafka_internal
   COMMIT:
      MAX_ROW: 1000
      MINIMAL_INTERVAL: 30000
   POLL:
      BATCHSIZE: 100
      TIMEOUT: 3000
```
Greenplum Database Options
You identify the Greenplum Database connection options via the DATABASE, USER, PASSWORD, HOST, and PORT parameters.
The VERSION parameter identifies the version of the gpkafka YAML configuration file. The default version is Version 1.
KAFKA:INPUT Options
Specify the Kafka brokers and topic of interest using the SOURCE block. You must create the Kafka topic prior to loading data.
When you provide a VALUE block, you must specify the COLUMNS and FORMAT parameters. The VALUE:COLUMNS block includes the name and type of each data element in the Kafka message. The default source-to-target data mapping behavior of gpkafka is to match a column name as defined in COLUMNS:NAME with a column name in the target Greenplum Database OUTPUT:TABLE:
- You must identify the Kafka data elements in the order in which they appear in the Kafka message.
- You may specify NAME: __IGNORED__ to omit a Kafka message value data element from the load operation.
- You must provide the same name for each non-ignored Kafka data element and its associated Greenplum Database table column.
- You must specify an equivalent data type for each non-ignored Kafka data element and its associated Greenplum Database table column.
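As an illustration of these rules (the column names are hypothetical), the following VALUE:COLUMNS block loads the first and third message fields into identically named target table columns and skips the second field:

```yaml
VALUE:
  COLUMNS:
    - NAME: cust_id        # matches target column cust_id (int)
      TYPE: int
    - NAME: __IGNORED__    # second message field is not loaded
      TYPE: text
    - NAME: amount_paid    # matches target column amount_paid (decimal)
      TYPE: decimal
  FORMAT: csv
```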
The VALUE:FORMAT keyword identifies the format of the Kafka message value. gpkafka supports comma-delimited text format (csv) and data that is separated by a configurable delimiter (delimited). gpkafka also supports binary (binary), JSON (json), custom (custom), and Avro (avro) format value data.
When you provide a KEY block, you must specify the COLUMNS and FORMAT parameters. The KEY:COLUMNS block includes the name and type of each element of the Kafka message key, and is subject to the same restrictions as identified for VALUE:COLUMNS above. The KEY:FORMAT keyword identifies the format of the Kafka message key. gpkafka supports avro, binary, csv, custom, delimited, and json format key data.
The ERROR_LIMIT parameter identifies the number of errors or the error percentage threshold after which gpkafka should exit the load operation.
KAFKA:OUTPUT Options
You identify the target Greenplum Database schema name and table name via the KAFKA:OUTPUT: SCHEMA and TABLE parameters. You must pre-create the Greenplum Database table before you attempt to load Kafka data.
Other Options
The KAFKA:METADATA:SCHEMA parameter specifies the name of the Greenplum Database schema in which gpkafka creates external and history tables.
gpkafka commits Kafka data to the Greenplum Database table at the row and/or time intervals that you specify in the KAFKA:COMMIT: MAX_ROW and/or MINIMAL_INTERVAL parameters. You must specify at least one of these parameters.
gpkafka reads data from Kafka with the batch size and at the time interval that you specify in the KAFKA:POLL: BATCHSIZE and TIMEOUT parameters.
You can configure gpkafka to execute a task (user-defined function or SQL commands) after GPSS reads a configurable number of batches from Kafka. Use the KAFKA:TASK: POST_BATCH_SQL and BATCH_INTERVAL configuration parameters to specify the task and the batch interval.
Specify a KAFKA:PROPERTIES block to set Kafka consumer configuration properties. gpkafka sends the property names and values to Kafka when it instantiates a consumer for the load operation.
About KEYs, VALUEs, and FORMATs
You can specify any data format in the Version 2 configuration file KEY:FORMAT and VALUE:FORMAT parameters, with some restrictions. The Greenplum-Kafka Integration supports the following KEY:FORMAT and VALUE:FORMAT combinations:
| KEY:FORMAT | VALUE:FORMAT | Description |
|---|---|---|
| any | none (VALUE block omitted) | gpkafka loads only the Kafka message key data, subject to any MAPPING that you specify, to Greenplum Database. |
| none (KEY block omitted) | any | Equivalent to gpkafka configuration file Version 1. gpkafka ignores the Kafka message key and loads only the Kafka message value data, subject to any MAPPING that you specify, to Greenplum Database. |
| csv | any | Not permitted. |
| any | csv | Not permitted. |
| avro, binary, delimited, or json | avro, binary, delimited, or json | Any combination is permitted. gpkafka loads both the Kafka message key and value data, subject to any MAPPING that you specify, to Greenplum Database. |
About Transforming and Mapping Kafka Input Data
You can define a MAPPING between the Kafka input data (VALUE:COLUMNS and KEY:COLUMNS) and the columns in the target Greenplum Database table. Defining a mapping may be useful when you have a multi-field input column (such as a JSON-type column), and you want to assign individual components of the input field to specific columns in the target table.
You might also use a MAPPING to assign a value expression to a target table column. The expression must be one that you could specify in the SELECT list of a query, and can include a constant value, a column reference, an operator invocation, a built-in or user-defined function call, and so forth.
If you choose to map more than one input column in an expression, you can create a user-defined function to parse and transform the input column and return the columns of interest.
For example, suppose a Kafka producer emits the following JSON messages to a topic:
```
{ "customer_id": 1313131, "some_intfield": 12 }
{ "customer_id": 77, "some_intfield": 7 }
{ "customer_id": 1234, "some_intfield": 56 }
```
You could define a user-defined function, udf_parse_json(), to parse the data as follows:
```sql
=> CREATE OR REPLACE FUNCTION udf_parse_json(value json)
     RETURNS TABLE (x int, y text)
   LANGUAGE plpgsql
   AS $$
   BEGIN
     RETURN query
       SELECT ((value->>'customer_id')::int), ((value->>'some_intfield')::text);
   END $$;
```
This function returns the two fields in each JSON record, casting the fields to integer and text, respectively.
An example MAPPING for the topic data in a JSON-type KAFKA:INPUT:COLUMNS named jdata follows:
```yaml
MAPPING:
  - NAME: cust_id
    EXPRESSION: (jdata->>'customer_id')
  - NAME: field2
    EXPRESSION: ((jdata->>'some_intfield') * .075)::decimal
  - NAME: j1, j2
    EXPRESSION: (udf_parse_json(jdata)).*
```
The Greenplum Database table definition for this example scenario is:
```sql
=> CREATE TABLE t1map(
     cust_id int,
     field2 decimal(7,2),
     j1 int,
     j2 text
   );
```
Creating the Greenplum Table
You must pre-create the Greenplum table before you load Kafka data into Greenplum Database. You use the KAFKA:OUTPUT: SCHEMA and TABLE load configuration file parameters to identify the schema and table names.
The target Greenplum table definition must include each column that gpkafka will load into the table. The table definition may include additional columns; gpkafka ignores these columns, and loads no data into them.
The name and data type that you specify for a column of the target Greenplum Database table must match the name and data type of the related, non-ignored Kafka message element. If you have defined a column mapping, the name of the Greenplum Database column must match the target column name that you specified for the mapping, and the type must match the target column type or expression that you define.
The CREATE TABLE command for the target Greenplum Database table receiving the Kafka topic data defined in the loadcfg2.yaml file presented in the Constructing the gpkafka.yaml Configuration File section follows:
```sql
testdb=# CREATE TABLE payables.expenses2(
           customer_id int8,
           newcust bool,
           expenses decimal(9,2),
           tax_due decimal(7,2)
         );
```
Running the gpkafka load Command
Note: Pivotal recommends that you migrate to using the GPSS utilities directly.
You run the gpkafka load command to load Kafka data to Greenplum. When you run the command, you provide the name of the configuration file that defines the parameters of the load operation. For example:
```
$ gpkafka load loadcfg2.yaml
```
The default mode of operation for gpkafka load is to read all pending messages and then to wait for, and then consume, new Kafka messages. When running in this mode, gpkafka load waits indefinitely; you can interrupt and exit the command with Control-c.
To run the command in batch mode, you provide the --quit-at-eof option. In this mode, gpkafka load exits when there are no new messages in the Kafka stream.
A subsequent gpkafka load operation that specifies the same Kafka topic and target Greenplum Database table names resumes from the last recorded offset.
Refer to the gpkafka load reference page for additional information about this command.
About Kafka Offsets, Message Retention, and Loading
Kafka maintains a partitioned log for each topic, assigning each record/message within a partition a unique sequential id number. This id is referred to as an offset. Kafka retains, for each gpkafka load invocation specifying the same Kafka topic and Greenplum Database table names, the last offset within the log consumed by the load operation. The Greenplum-Kafka Integration also records this offset value.
Kafka persists a message for a configurable retention time period and/or log size, after which it purges messages from the log. Kafka topics or messages can also be purged on demand. This may result in an offset mismatch between Kafka and the Greenplum-Kafka Integration.
gpkafka load returns an error if its recorded offset for the Kafka topic and Greenplum Database table combination is behind that of the current earliest Kafka message offset for the topic. In this situation, gpkafka load returns a message of the following format:
```
<date:time> gpkafkaload:<user_name>:<host>:<pid>-[CRITICAL]:-Offset gap detected, last load offset is <num>, but earliest available is [<topic_name>[<partition_num>]@<num>]
```
For example:
```
20181009:23:15:00.187 gpkafkaload:gpadmin:gpmaster:055450-[CRITICAL]:-Offset gap detected, last load offset is 7, but earliest available is [delimkeynval2[0]@9]
```
When you receive this message, you can choose to resume the load operation from the earliest available message published to the topic by specifying the --force-reset-earliest option to gpkafka load:
```
$ gpkafka load --force-reset-earliest loadcfg2.yaml
```
If you want to load only new messages published to the Kafka topic, use the --force-reset-latest option with the command:
```
$ gpkafka load --force-reset-latest loadcfg2.yaml
```
Checking the Progress of a Load Operation
You can check the commit history of a load operation with the gpkafka check command. When you run gpkafka check, you provide the name of the configuration file that defined the load operation of interest. For example:
```
$ gpkafka check loadcfg2.yaml
```
Sample command output:
```
PartitionID    StartTime               EndTime                 BeginOffset    EndOffset
0              2018-07-13T16:19:11Z    2018-07-13T16:19:11Z    0              9
```
When you run gpkafka check without any options, it displays the latest commit. To view the complete commit history of a load operation, run the command with the --show-commit-history all argument.
Refer to the gpkafka check reference page for additional information about this command.