gpkafka-v2.yaml

gpkafka configuration file (version 2).

Synopsis

DATABASE: db_name
USER: user_name
HOST: host
PORT: greenplum_port
VERSION: 2
KAFKA:
   INPUT:
      SOURCE:
        BROKERS: kafka_broker_host:broker_port [, ... ]
        TOPIC: kafka_topic
      [VALUE:
        COLUMNS:
           - NAME: { column_name | __IGNORED__ }
             TYPE: column_data_type
           [ ... ]
         FORMAT: value_data_format
          [DELIMITED_OPTION:
             DELIMITER: delimiter_string] |
          [AVRO_OPTION:
             SCHEMA_REGISTRY_ADDR: http://schemareg_host:schemareg_port [, ... ]] |
          [CUSTOM_OPTION:
             NAME: udf_name
             PARAMSTR: udf_parameter_string]]
      [KEY:
        COLUMNS:
           - NAME: { column_name | __IGNORED__ }
             TYPE: column_data_type
           [ ... ]
         FORMAT: key_data_format
          [DELIMITED_OPTION:
            DELIMITER: delimiter_string] |
         [AVRO_OPTION:
            SCHEMA_REGISTRY_ADDR: http://schemareg_host:schemareg_port [, ... ]] |
         [CUSTOM_OPTION:
            NAME: udf_name
            PARAMSTR: udf_parameter_string]]
      ERROR_LIMIT: { num_errors | percentage_errors }
      [LOCAL_HOSTNAME: local_hostname]
      [LOCAL_PORT: local_port]
   OUTPUT:
      [SCHEMA: output_schema_name]
      TABLE: table_name
      [MAPPING: 
         - NAME: target_column_name
           EXPRESSION: { source_column_name | 'expression' }
         [ ... ]]
   [METADATA:
      [SCHEMA: metadata_schema_name]]
   COMMIT:
      MAX_ROW: num_rows
      MINIMAL_INTERVAL: wait_time

Description

You specify load configuration parameters for the gpkafka utilities in a YAML-formatted configuration file. (This reference page uses the name gpkafka.yaml when referring to this file; you may choose your own name for the file.) Load parameters include Greenplum Database connection and target table information, Kafka broker and topic information, and error and commit thresholds.

Note: Version 2 of the gpkafka.yaml configuration file syntax supports KEY and VALUE blocks. Version 1 does not.

The gpkafka utility processes the YAML configuration file in order, using indentation (spaces) to determine the document hierarchy and the relationships between the sections. The use of white space in the file is significant.

Note: The gpkafka.yaml keywords are case-sensitive.

Keywords and Values

Greenplum Database Options
DATABASE: db_name
The name of the Greenplum database.
USER: user_name
The name of the Greenplum Database user/role. This user_name must have permissions as described in Configuring Greenplum Database Role Privileges.
HOST: host
The host name or IP address of the Greenplum Database master host.
PORT: greenplum_port
The port number of the Greenplum Database server on the master host.
VERSION: 2
The version of the gpkafka configuration file. You must specify VERSION: 2 when you configure VALUE and/or KEY blocks in the file.
KAFKA:INPUT: Options
SOURCE
Kafka input configuration parameters.
BROKERS: kafka_broker_host:broker_port
The host and port identifying the Kafka broker.
TOPIC: kafka_topic
The name of the Kafka topic from which to load data. The topic must exist.
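For example, a SOURCE block that lists multiple brokers for redundancy (the broker host names and topic name below are hypothetical) might look like:

```yaml
SOURCE:
  BROKERS: kbroker1.example.com:9092,kbroker2.example.com:9092  # hypothetical broker hosts
  TOPIC: orders                                                 # the topic must already exist
```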
VALUE:
The Kafka message value field names, data types, and format. You must specify all Kafka data elements in the order in which they appear in the Kafka message. The VALUE block is optional when you specify a KEY block; when you omit VALUE, gpkafka ignores the Kafka message value.
KEY:
The Kafka message key field names, data types, and format. You must specify all Kafka key elements in the order in which they appear in the Kafka message. The KEY block is optional when you specify a VALUE block; when you omit KEY, gpkafka ignores the Kafka message key.
COLUMNS:NAME: column_name
The name of a key or value column. column_name must match the column name of the target Greenplum Database table. Specify __IGNORED__ to omit this Kafka message data element from the load operation.
The default source-to-target data mapping behavior of gpkafka is to match a column name as defined in COLUMNS:NAME with a column name in the target Greenplum Database TABLE. You can override the default mapping by specifying a MAPPING block.
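For example, to load the first and third elements of a three-element Kafka message value and skip the second (the column names are hypothetical and must match the target table columns unless you provide a MAPPING):

```yaml
VALUE:
  COLUMNS:
    - NAME: cust_id       # loads into the target table column named cust_id
      TYPE: int
    - NAME: __IGNORED__   # the second message element is not loaded
      TYPE: text
    - NAME: expenses      # loads into the target table column named expenses
      TYPE: decimal
  FORMAT: delimited
  DELIMITED_OPTION:
    DELIMITER: '|'
```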
COLUMNS:TYPE: data_type
The data type of the column. You must specify an equivalent data type for each non-ignored Kafka message data element and the associated Greenplum Database table column.
FORMAT: data_format
The format of the Kafka message key or value data. You may specify a FORMAT of avro, binary, csv, custom, delimited, or json for the key and value, with some restrictions.
avro
When you specify the avro data format, you must define only a single json type column in COLUMNS. If the Kafka message key or value schema is registered in a Confluent Schema Registry, you must also provide the AVRO_OPTION.
binary
When you specify the binary data format, you must define only a single bytea type column in COLUMNS.
csv
When you specify the csv data format, the message content cannot contain line ending characters (CR and LF).
You must not provide a VALUE block when you specify csv format for a KEY block. Similarly, you must not provide a KEY block when you specify csv format for a VALUE block.
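A minimal sketch of a csv-format VALUE block; because csv excludes a KEY block, the INPUT defines only the message value (the column names are hypothetical):

```yaml
VALUE:
  COLUMNS:
    - NAME: id
      TYPE: int
    - NAME: amount
      TYPE: decimal
  FORMAT: csv   # the message content must not contain CR or LF characters
```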
custom
When you specify the custom data format, you must provide a CUSTOM_OPTION.
delimited
When you specify the delimited data format, you must provide a DELIMITED_OPTION.
json
When you specify the json data format, you must define only a single json type column in COLUMNS.
AVRO_OPTION:SCHEMA_REGISTRY_ADDR: schemareg_host:schemareg_port
Optional. When you specify avro as the FORMAT and the Avro schema of the JSON data you want to load is registered in the Confluent Schema Registry, you must identify the host name and port number of each Confluent Schema Registry server in your Kafka cluster. You may specify more than one address; at least one of the addresses must be valid.
CUSTOM_OPTION
Optional. When you specify custom as the FORMAT, CUSTOM_OPTION is required. This block identifies the name and the arguments of a custom formatter user-defined function.
NAME: udf_name
The name of the custom formatter user-defined function.
PARAMSTR: udf_parameter_string
A string specifying the comma-separated list of arguments to pass to the custom formatter user-defined function.
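A minimal sketch of a CUSTOM_OPTION block; the formatter function name and the parameter string below are hypothetical:

```yaml
FORMAT: custom
CUSTOM_OPTION:
  NAME: my_kafka_formatter               # hypothetical formatter user-defined function
  PARAMSTR: 'encoding=utf8,strict=true'  # comma-separated arguments passed to the UDF
```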
DELIMITED_OPTION:DELIMITER: delimiter_string
Optional. When you specify delimited as the FORMAT, delimiter_string is required and must identify the Kafka message data element delimiter. delimiter_string may be a multi-byte delimiter of up to 32 bytes in length, and may not contain quote or escape characters.
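For example, a delimited configuration that uses a two-byte delimiter (the delimiter choice is illustrative):

```yaml
FORMAT: delimited
DELIMITED_OPTION:
  DELIMITER: '##'   # may be multi-byte, up to 32 bytes; no quote or escape characters
```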
ERROR_LIMIT: { num_errors | percentage_errors }
The error threshold, specified as either an absolute number or a percentage. gpkafka load exits when this limit is reached.
LOCAL_HOSTNAME: local_hostname
The name of the local host on which you are running gpkafka. This host name should be DNS-resolvable from each Greenplum Database segment host. Optional, the default value is the output of the hostname command, a short host name.
Note: You must explicitly set LOCAL_HOSTNAME if you need the FQDN of the host.
LOCAL_PORT: local_port
The gpfdist port number on the local host. Optional, the default value is 8080.
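For example, to have gpkafka advertise a fully-qualified host name and a non-default gpfdist port (the host name is hypothetical):

```yaml
LOCAL_HOSTNAME: loader1.example.com   # hypothetical FQDN, DNS-resolvable from segment hosts
LOCAL_PORT: 8081                      # overrides the default gpfdist port of 8080
```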
KAFKA:OUTPUT: Options
SCHEMA: output_schema_name
The name of the Greenplum Database schema in which table_name resides. Optional, the default schema is the public schema.
TABLE: table_name
The name of the Greenplum Database table into which gpkafka loads the Kafka data.
MAPPING:
Optional. Overrides the default source-to-target column mapping.
Note: When you specify a MAPPING, ensure that you provide a mapping for all Kafka message key and value elements of interest. gpkafka does not automatically match column names when you provide a MAPPING.
NAME: target_column_name
Specifies the target Greenplum Database table column name.
EXPRESSION: source_column_name | 'expression'
Specifies a Kafka COLUMNS:NAME (source_column_name) or an 'expression'. When you specify an 'expression', you may provide a value expression that you would specify in the SELECT list of a query, such as a constant value, a column reference, an operator invocation, a built-in or user-defined function call, and so on.
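For example, a MAPPING that pairs a direct source column reference with a computed expression (the column names are hypothetical):

```yaml
MAPPING:
  - NAME: customer_id           # target Greenplum Database table column
    EXPRESSION: cust_id         # a COLUMNS:NAME source column
  - NAME: total_due
    EXPRESSION: (amount * 1.075)::decimal   # any expression valid in a SELECT list
```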
KAFKA:METADATA: Options
SCHEMA: metadata_schema_name
The name of the Greenplum Database schema in which gpkafka creates external and history tables. The default metadata_schema_name is the schema that you specify in KAFKA:OUTPUT:SCHEMA.
Greenplum Database COMMIT: Options
COMMIT:
Controls how gpkafka load commits data to Greenplum Database. You must specify one of MAX_ROW or MINIMAL_INTERVAL. You may specify both configuration parameters, provided that the two values are not both zero (0).
MAX_ROW: number_of_rows
The number of rows to batch before triggering an INSERT operation on the Greenplum Database table. The default value of MAX_ROW is 0, which instructs gpkafka to ignore this commit trigger condition.
MINIMAL_INTERVAL: wait_time
The minimum amount of time (milliseconds) to wait between each INSERT operation on the table. The default value is 0, which instructs gpkafka to ignore this commit trigger condition.
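For example, a COMMIT block that triggers an INSERT after 500 rows accumulate, while enforcing at least a ten-second interval between INSERT operations (the values are illustrative):

```yaml
COMMIT:
  MAX_ROW: 500              # batch 500 rows before triggering an INSERT
  MINIMAL_INTERVAL: 10000   # wait at least 10000 ms between INSERT operations
```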

Notes

If you created a database object name using a double-quoted identifier (delimited identifier), you must specify the delimited name within single quotes in the gpkafka.yaml configuration file. For example, if you create a table as follows:

CREATE TABLE "MyTable" ("MyColumn" text);

Your gpkafka.yaml configuration file would refer to the table and column names as follows:

  COLUMNS:
     - NAME: '"MyColumn"'
       TYPE: text
OUTPUT:
   TABLE: '"MyTable"'

Examples

Load data from Kafka as defined in the Version 2 configuration file named kafka2greenplumv2.yaml:

gpkafka load kafka2greenplumv2.yaml

Example kafka2greenplumv2.yaml configuration file:

DATABASE: ops
USER: gpadmin
HOST: mdw-1
PORT: 5432
VERSION: 2
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: kbrokerhost1:9092
         TOPIC: customer_expenses2
      VALUE:
         COLUMNS:
           - NAME: c1
             TYPE: json
         FORMAT: avro
         AVRO_OPTION:
           SCHEMA_REGISTRY_ADDR: http://localhost:8081
      KEY:
         COLUMNS:
           - NAME: key
             TYPE: json
         FORMAT: avro
         AVRO_OPTION:
           SCHEMA_REGISTRY_ADDR: http://localhost:8081
      ERROR_LIMIT: 25
   OUTPUT:
      SCHEMA: payables
      TABLE: expenses2
      MAPPING:
        - NAME: customer_id
          EXPRESSION: (c1->>'cust_id')::int
        - NAME: newcust
          EXPRESSION: ((c1->>'cust_id')::int > 5000000)::boolean
        - NAME: expenses
          EXPRESSION: (c1->>'expenses')::decimal
        - NAME: tax_due
          EXPRESSION: ((c1->>'expenses')::decimal * .075)::decimal
   METADATA:
      SCHEMA: gpkafka_internal
   COMMIT:
      MAX_ROW: 1000
      MINIMAL_INTERVAL: 30000