gpkafka-v2.yaml
gpkafka configuration file (version 2).
Synopsis
DATABASE: db_name
USER: user_name
PASSWORD: password
HOST: host
PORT: greenplum_port
VERSION: 2
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: kafka_broker_host:broker_port [, ... ]
         TOPIC: kafka_topic
         [PARTITIONS: (partition_numbers)]
         [FALLBACK_OFFSET: { earliest | latest }]
      [VALUE:
         COLUMNS:
            - NAME: { column_name | __IGNORED__ }
              TYPE: column_data_type
            [ ... ]
         FORMAT: value_data_format
         [[DELIMITED_OPTION:
            DELIMITER: delimiter_string] |
          [AVRO_OPTION:
            [SCHEMA_REGISTRY_ADDR: http://schemareg_host:schemareg_port [, ... ]]
            [SCHEMA_CA_ON_GPDB: sr_ca_file_path]
            [SCHEMA_CERT_ON_GPDB: sr_cert_file_path]
            [SCHEMA_KEY_ON_GPDB: sr_key_file_path]
            [SCHEMA_MIN_TLS_VERSION: minimum_version]
            [SCHEMA_PATH_ON_GPDB: path_to_file]
            [BYTES_TO_BASE64: boolean]] |
          [CUSTOM_OPTION:
            NAME: udf_name
            PARAMSTR: udf_parameter_string]]]
      [KEY:
         COLUMNS:
            - NAME: { column_name | __IGNORED__ }
              TYPE: column_data_type
            [ ... ]
         FORMAT: key_data_format
         [[DELIMITED_OPTION:
            DELIMITER: delimiter_string] |
          [AVRO_OPTION:
            [SCHEMA_REGISTRY_ADDR: http://schemareg_host:schemareg_port [, ... ]]
            [SCHEMA_CA_ON_GPDB: sr_ca_file_path]
            [SCHEMA_CERT_ON_GPDB: sr_cert_file_path]
            [SCHEMA_KEY_ON_GPDB: sr_key_file_path]
            [SCHEMA_MIN_TLS_VERSION: minimum_version]
            [SCHEMA_PATH_ON_GPDB: path_to_file]
            [BYTES_TO_BASE64: boolean]] |
          [CUSTOM_OPTION:
            NAME: udf_name
            PARAMSTR: udf_parameter_string]]]
      [META:
         COLUMNS:
            - NAME: meta_column_name
              TYPE: { json | jsonb }
         FORMAT: json]
      [FILTER: filter_string]
      [ERROR_LIMIT: { num_errors | percentage_errors }]
   OUTPUT:
      [SCHEMA: output_schema_name]
      TABLE: table_name
      [MODE: mode]
      [MATCH_COLUMNS:
         - match_column_name
         [ ... ]]
      [ORDER_COLUMNS:
         - order_column_name
         [ ... ]]
      [UPDATE_COLUMNS:
         - update_column_name
         [ ... ]]
      [UPDATE_CONDITION: update_condition]
      [DELETE_CONDITION: delete_condition]
      [MAPPING:
         - NAME: target_column_name
           EXPRESSION: { source_column_name | expression }
         [ ... ]
           |
         target_column_name: { source_column_name | expression }
         [ ... ] ]
   [METADATA:
      [SCHEMA: metadata_schema_name]]
   COMMIT:
      SAVE_FAILING_BATCH: boolean
      RECOVER_FAILING_BATCH: boolean (Beta)
      MAX_ROW: num_rows
      MINIMAL_INTERVAL: wait_time
      CONSISTENCY: { strong | at-least | at-most | none }
      IDLE_DURATION: idle_time
   [POLL:
      BATCHSIZE: num_records
      TIMEOUT: poll_time]
   [TASK:
      POST_BATCH_SQL: udf_or_sql_to_run
      BATCH_INTERVAL: num_batches]
   [PROPERTIES:
      kafka_property_name: kafka_property_value
      [ ... ]]
   [SCHEDULE:
      RETRY_INTERVAL: retry_time
      MAX_RETRIES: num_retries]
where you may specify any property value with a template variable of the form:

PROPERTY: {{template_var}}
Description
You specify load configuration parameters for the gpkafka utilities in a YAML-formatted configuration file. (This reference page uses the name gpkafka.yaml when referring to this file; you may choose your own name for the file.) Load parameters include Greenplum Database connection and target table information, Kafka broker and topic information, and error and commit thresholds.
The gpkafka utility processes the YAML configuration file in order, using indentation (spaces) to determine the document hierarchy and the relationships between the sections. The use of white space in the file is significant, and keywords are case-sensitive.
Keywords and Values
- DATABASE: db_name
- The name of the Greenplum database.
- USER: user_name
- The name of the Greenplum Database user/role. This user_name must have permissions as described in the Greenplum Streaming Server documentation.
- PASSWORD: password
- The password for the Greenplum Database user/role.
- HOST: host
- The host name or IP address of the Greenplum Database master host.
- PORT: greenplum_port
- The port number of the Greenplum Database server on the master host.
- VERSION: 2
- The version of the gpkafka configuration file. You must specify VERSION: 2 when you configure VALUE and/or KEY blocks in the file.
- SOURCE
- Kafka input configuration parameters.
- BROKERS: kafka_broker_host:broker_port
- The host and port identifying the Kafka broker.
- TOPIC: kafka_topic
- The name of the Kafka topic from which to load data. The topic must exist.
- PARTITIONS: (partition_numbers)
- A single partition number, a comma-separated list of partition numbers, and/or a range of partition numbers from which GPSS reads messages on the Kafka topic. A range that you specify with the M...N syntax includes both the range start and end values. By default, GPSS reads messages from all partitions of the Kafka topic.
Note: Ensure that you do not configure multiple jobs that specify overlapping partition numbers in the same topic; GPSS behavior is undefined.
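For example, the following setting (the partition numbers are illustrative) directs GPSS to read from partition 0, partitions 2 through 4, and partition 7 of the topic:

PARTITIONS: (0, 2...4, 7)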
- FALLBACK_OFFSET: { earliest | latest }
- Specifies the behavior of GPSS when it detects a Kafka message offset gap. When set to earliest, GPSS automatically resumes a load operation from the earliest available published message. When set to latest, GPSS loads only new messages published to the Kafka topic. If this property is not set, GPSS returns an error.
- VALUE:
- The Kafka message value field names, data types, and format. You must specify all Kafka data elements in the order in which they appear in the Kafka message. Optional when you specify a KEY block; gpkafka ignores the Kafka message value in this circumstance.
- KEY:
- The Kafka message key field names, data types, and format. You must specify all Kafka key elements in the order in which they appear in the Kafka message. Optional when you specify a VALUE block; gpkafka ignores the Kafka message key in this circumstance.
- COLUMNS:NAME: column_name
- The name of a key or value column. column_name must match the column name of the target Greenplum Database table. Specify __IGNORED__ to omit this Kafka message data element from the load operation.
- The default source-to-target data mapping behavior of gpkafka is to match a column name as defined in COLUMNS:NAME with a column name in the target Greenplum Database TABLE. You can override the default mapping by specifying a MAPPING block.
- COLUMNS:TYPE: data_type
- The data type of the column. You must specify an equivalent data type for each non-ignored Kafka message data element and the associated Greenplum Database table column.
- FORMAT: data_format
- The format of the Kafka message key or value data. You may specify a FORMAT of avro, binary, csv, custom, delimited, or json for the key and value, with some restrictions.
- avro
- When you specify the avro data format, you must define only a single json type column in COLUMNS. If the Kafka message key or value schema is registered in a Confluent Schema Registry, you must also provide the AVRO_OPTION.
- binary
- When you specify the binary data format, you must define only a single bytea type column in COLUMNS.
- csv
- When you specify the csv data format, the message content cannot contain line ending characters (CR and LF).
- You must not provide a VALUE block when you specify csv format for a KEY block. Similarly, you must not provide a KEY block when you specify csv format for a VALUE block.
- custom
- When you specify the custom data format, you must provide a CUSTOM_OPTION.
- delimited
- When you specify the delimited data format, you must provide a DELIMITED_OPTION.
- json
- When you specify the json data format, you must define only a single json type column in COLUMNS.
- AVRO_OPTION
- Optional. When you specify avro as the FORMAT, you may provide AVRO_OPTIONs that identify a schema registry location and optional SSL certificates and keys, and whether or not you want GPSS to convert Avro bytes fields into base64-encoded strings.
- SCHEMA_REGISTRY_ADDR: schemareg_host:schemareg_port
- When you specify FORMAT: avro and the Avro schema of the JSON data you want to load is registered in the Confluent Schema Registry, you must identify the host name and port number of each Confluent Schema Registry server in your Kafka cluster. You may specify more than one address, and at least one of the addresses must be valid.
- SCHEMA_CA_ON_GPDB: sr_ca_file_path
- The file system path to the CA certificate that GPSS uses to verify the peer. This file must reside in sr_ca_file_path on all Greenplum Database segment hosts.
- SCHEMA_CERT_ON_GPDB: sr_cert_file_path
- The file system path to the client certificate that GPSS uses to connect to the HTTPS schema registry. This file must reside in sr_cert_file_path on all Greenplum Database segment hosts.
- SCHEMA_KEY_ON_GPDB: sr_key_file_path
- The file system path to the private key file that GPSS uses to connect to the HTTPS schema registry. This file must reside in sr_key_file_path on all Greenplum Database segment hosts.
- SCHEMA_MIN_TLS_VERSION: minimum_version
- The minimum transport layer security (TLS) version that GPSS requests on the connection to the schema registry. Supported versions are 1.0, 1.1, 1.2, or 1.3. The default minimum TLS version is 1.0.
- SCHEMA_PATH_ON_GPDB: path_to_file
- When you specify the avro format and the Avro schema of the JSON key or value data that you want to load is specified in a separate .avsc file, you must identify the file system location in path_to_file, and the file must reside in this location on every Greenplum Database segment host.
Note: GPSS does not cache the schema. GPSS must reload the schema for every batch of Kafka data. Also, GPSS supports providing the schema for either the key or the value, but not both.
- BYTES_TO_BASE64: boolean
- When true, GPSS converts Avro bytes fields into base64-encoded strings. The default value is false; GPSS does not perform the conversion.
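For example, a VALUE block for Avro data whose schema is registered in a schema registry might look like the following; the column name, registry host, and port are placeholders for your own values:

VALUE:
   COLUMNS:
      - NAME: c1
        TYPE: json
   FORMAT: avro
   AVRO_OPTION:
      SCHEMA_REGISTRY_ADDR: http://schemareg_host:8081
      BYTES_TO_BASE64: true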
- CUSTOM_OPTION
- Optional. When you specify FORMAT: custom,
you are required to provide the CUSTOM_OPTION
properties. This block identifies the name and the arguments of a
custom formatter user-defined function.
- NAME: udf_name
- The name of the custom formatter user-defined function.
- PARAMSTR: udf_parameter_string
- A string specifying the comma-separated list of arguments to pass to the custom formatter user-defined function.
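A sketch of a custom format configuration follows; the formatter function name and its arguments are hypothetical and must match a custom formatter user-defined function that you have created:

FORMAT: custom
CUSTOM_OPTION:
   NAME: my_kafka_formatter
   PARAMSTR: 'arg1,arg2'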
- DELIMITED_OPTION:DELIMITER: delimiter_string
- Optional. When you specify delimited as the FORMAT, delimiter_string is required and must identify the Kafka message data element delimiter. delimiter_string may be a multi-byte value up to 32 bytes in length. It may not contain quote or escape characters.
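For example, the following VALUE block (the column names and types are illustrative) describes pipe-delimited message data with two columns:

VALUE:
   COLUMNS:
      - NAME: cust_id
        TYPE: int
      - NAME: expenses
        TYPE: decimal
   FORMAT: delimited
   DELIMITED_OPTION:
      DELIMITER: '|'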
- META:
- The field name, type, and format of the Kafka meta data. META must specify a single json or jsonb (Greenplum 6 only) type column and FORMAT: json. The available Kafka meta data properties include:
- topic - text
- partition - int
- offset - bigint
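For example, the following META block exposes the Kafka meta data properties in a single json column named meta:

META:
   COLUMNS:
      - NAME: meta
        TYPE: json
   FORMAT: json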
- FILTER: filter_string
- The filter to apply to the Kafka input messages before gpkafka loads the data into Greenplum Database. If the filter evaluates to true, gpkafka loads the message. If the filter evaluates to false, the message is dropped. filter_string must be a valid SQL conditional expression and may reference one or more KEY, VALUE, or META column names.
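For example, assuming an integer VALUE column named id and the META column named meta from the previous example (both hypothetical), the following filter loads only messages whose id is greater than 100 and that were read from partition 0:

FILTER: (id > 100) AND ((meta->>'partition')::int = 0)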
- ERROR_LIMIT: { num_errors | percentage_errors }
- The error threshold, specified as either an absolute number or a percentage. gpkafka load exits when this limit is reached. The default ERROR_LIMIT is zero; GPSS disables error logging and stops the load operation when it encounters the first error. Due to a limitation of the Greenplum Database external table framework, GPSS does not accept ERROR_LIMIT: 1.
- SCHEMA: output_schema_name
- The name of the Greenplum Database schema in which table_name resides. Optional; the default schema is the public schema.
- TABLE: table_name
- The name of the Greenplum Database table into which gpkafka loads the Kafka data.
- MODE: mode
- The table load mode. Valid mode values are INSERT, MERGE, or UPDATE. The default value is INSERT.
- UPDATE - Updates the target table columns that are listed in UPDATE_COLUMNS when the input columns identified in MATCH_COLUMNS match the named target table columns and the optional UPDATE_CONDITION is true.
- UPDATE is not supported if the target table column name is a reserved keyword, has capital letters, or includes any character that requires quotes (" ") to identify the column.
- MERGE - Inserts new rows and updates existing rows when:
- columns are listed in UPDATE_COLUMNS,
- the MATCH_COLUMNS target table column values are equal to the input data, and
- an optional UPDATE_CONDITION is specified and met.
MERGE deletes rows when:
- the MATCH_COLUMNS target table column values are equal to the input data, and
- an optional DELETE_CONDITION is specified and met.
- MERGE is not supported if the target table column name is a reserved keyword, has capital letters, or includes any character that requires quotes (" ") to identify the column.
- MATCH_COLUMNS:
- Required if MODE is MERGE or UPDATE.
- match_column_name
- Specifies the column(s) to use as the join condition for the update. The attribute value in the specified target column(s) must be equal to that of the corresponding source data column(s) in order for the row to be updated in the target table.
- ORDER_COLUMNS:
- Optional. May be specified in MERGE MODE to sort the input data rows.
- order_column_name
- Specify the column(s) by which GPSS sorts the rows. When multiple matching rows exist in a batch, ORDER_COLUMNS is used with MATCH_COLUMNS to determine the input row with the largest value; GPSS uses that row to write/update the target.
- UPDATE_COLUMNS:
- Required if MODE is MERGE or UPDATE.
- update_column_name
- Specifies the column(s) to update for the rows that meet the MATCH_COLUMNS criteria and the optional UPDATE_CONDITION.
- UPDATE_CONDITION: update_condition
- Optional. Specifies a boolean condition, similar to that which you would declare in a WHERE clause, that must be met in order for a row in the target table to be updated (or inserted, in the case of a MERGE).
- DELETE_CONDITION: delete_condition
- Optional. In MERGE MODE, specifies a boolean condition, similar to that which you would declare in a WHERE clause, that must be met for GPSS to delete rows in the target table that meet the MATCH_COLUMNS criteria.
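The following OUTPUT block sketches a MERGE configuration; the table name, column names, and conditions are hypothetical:

OUTPUT:
   TABLE: expenses
   MODE: MERGE
   MATCH_COLUMNS:
      - customer_id
   ORDER_COLUMNS:
      - txn_time
   UPDATE_COLUMNS:
      - expenses
   UPDATE_CONDITION: expenses > 0
   DELETE_CONDITION: expenses = 0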
- MAPPING:
- Optional. Overrides the default source-to-target column mapping. gpkafka supports two mapping syntaxes.
- Note: When you specify a MAPPING, ensure that you provide a mapping for all Kafka message key and value elements of interest. gpkafka does not automatically match column names when you provide a MAPPING.
- NAME: target_column_name
- Specifies the target Greenplum Database table column name.
- EXPRESSION: { source_column_name | expression }
- Specifies a Kafka COLUMNS:NAME (source_column_name) or an expression. When you specify an expression, you may provide a value expression that you would specify in the SELECT list of a query, such as a constant value, a column reference, an operator invocation, a built-in or user-defined function call, and so on.
- target_column_name: { source_column_name | expression }
- When you use this MAPPING syntax, specify the target_column_name and {source_column_name | expression} as described above.
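For example, the following two MAPPING blocks are equivalent; each maps the cust_id field of a (hypothetical) json source column named c1 to the target column customer_id:

MAPPING:
   - NAME: customer_id
     EXPRESSION: (c1->>'cust_id')::int

MAPPING:
   customer_id: (c1->>'cust_id')::int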
- SCHEMA: metadata_schema_name
- The name of the Greenplum Database schema in which gpkafka creates external and history tables. The default metadata_schema_name is KAFKA:OUTPUT:SCHEMA.
- COMMIT:
- Controls how gpkafka load commits a batch of data to Greenplum Database. You must specify one of MAX_ROW or MINIMAL_INTERVAL. You may specify both configuration parameters as long as both values are not zero (0). Try setting and tuning MINIMAL_INTERVAL to your environment; introduce a MAX_ROW setting only if you encounter high memory usage associated with message buffering.
- SAVE_FAILING_BATCH: boolean
- Determines whether or not GPSS saves data into a backup table before it writes the data to Greenplum Database. Saving the data in this manner aids recovery when GPSS encounters errors during the evaluation of expressions. The default is false; GPSS does not use a backup table, and returns immediately when it encounters an expression error. When you set this property to true, GPSS writes both the good and the bad data in the batch to a backup table named gpssbackup_jobhash, and continues to process incoming Kafka messages. You must then manually load the good data from the backup table into Greenplum or set RECOVER_FAILING_BATCH (Beta) to true to have GPSS automatically reload the good data.
Note: Using a backup table to hedge against mapping errors may impact performance, especially when the data that you are loading has not been cleaned.
- RECOVER_FAILING_BATCH: boolean (Beta)
- When set to true and SAVE_FAILING_BATCH is also true, GPSS automatically reloads the good data in the batch and retains only the error data in the backup table. The default value is false; GPSS does not process the backup table.
Note: Enabling this property requires that GPSS has the Greenplum Database privileges to create a function.
- MAX_ROW: number_of_rows
- The number of rows to batch before triggering an INSERT operation on the Greenplum Database table. The default value of MAX_ROW is 0, which instructs gpkafka to ignore this commit trigger condition.
- MINIMAL_INTERVAL: wait_time
- The minimum amount of time to wait (milliseconds) between each INSERT operation on the table. The default value is 0 (wait forever).
- CONSISTENCY: consistency_value
- Specify how GPSS should manage message offsets when it acts as a high-level consumer. Valid values are strong, at-least, at-most, and none. The default value is strong. Refer to Understanding Kafka Message Offset Management for more detailed information.
- IDLE_DURATION: idle_time
- The maximum amount of time to wait (milliseconds) for the first batch of Kafka data. When you use this property to enable lazy load, GPSS waits until Kafka data is available before locking the target Greenplum table. You can specify:
- 0 (lazy load is disabled)
- -1 (lazy load is enabled, the job never stops), or
- a positive value (lazy load is enabled, the job stops after idle_time duration of no data in the Kafka topic)
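For example, the following COMMIT block commits a batch based on elapsed time only, mirroring the example at the end of this page:

COMMIT:
   MINIMAL_INTERVAL: 2000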
- POLL:
- Controls the polling time period and batch size when reading Kafka data.
Note: These properties are deprecated and will be removed in a future release.
- BATCHSIZE: num_records
- The number of Kafka records in a batch. BATCHSIZE should be smaller than COMMIT:MAX_ROW. The default batch size is 200.
- TIMEOUT: poll_time
- The maximum time, in milliseconds, to wait in a polling cycle if Kafka data is not available. You must specify a TIMEOUT greater than 100 milliseconds and less than COMMIT:MINIMAL_INTERVAL. The default poll timeout is 1000 milliseconds.
- TASK:
- Controls the execution and scheduling of a periodic (maintenance) task.
- POST_BATCH_SQL: udf_or_sql_to_run
- The user-defined function or SQL command(s) that you want to run after the specified number of batches are read from Kafka. The default is null.
- BATCH_INTERVAL: num_batches
- The number of batches to read before executing udf_or_sql_to_run. The default batch interval is 0.
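For example, the following TASK block (the SQL command and target table are illustrative) runs a maintenance command after every 10 batches:

TASK:
   POST_BATCH_SQL: 'VACUUM ANALYZE payables.expenses2;'
   BATCH_INTERVAL: 10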
- PROPERTIES:
- Kafka consumer configuration property names and values.
- kafka_property_name
- The name of a Kafka property.
- kafka_property_value
- The Kafka property value.
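For example, the following PROPERTIES block sets a Kafka consumer session timeout; the value shown is illustrative:

PROPERTIES:
   session.timeout.ms: 6000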
- SCHEDULE:
- Controls the frequency and interval of restarting failed jobs.
- RETRY_INTERVAL: retry_time
- The period of time that gpkafka waits before retrying the job. You can specify the time interval in day (d), hour (h), minute (m), second (s), or millisecond (ms) integer units; do not mix units. The default retry interval is 5m (5 minutes).
- MAX_RETRIES: num_retries
- The maximum number of times gpkafka attempts to retry the job. The default is 0 (do not retry). If you specify a negative value, gpkafka retries the job indefinitely.
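For example, the following SCHEDULE block (the values are illustrative) retries a failed job up to three times, waiting 30 seconds between attempts:

SCHEDULE:
   RETRY_INTERVAL: 30s
   MAX_RETRIES: 3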
Template Variables
GPSS supports using template variables to specify property values in the load configuration file.
You specify a template variable value in the format:

PROPERTY: {{template_var}}

For example:

MAX_RETRIES: {{numretries}}
GPSS substitutes the template variable with a value that you specify via the -p | --property template_var=value option to the gpsscli submit, gpsscli load, or gpkafka load command.
For example, if you specify --property numretries=10 on the command line, GPSS substitutes occurrences of {{numretries}} in the load configuration file with the value 10 before submitting the job, and uses that value during job execution.
Notes
If you created a database object name using a double-quoted identifier (delimited identifier), you must specify the delimited name within single quotes in the gpkafka.yaml configuration file. For example, if you create a table as follows:
CREATE TABLE "MyTable" ("MyColumn" text);
Your gpkafka.yaml YAML configuration file would refer to the above table and column names as:
COLUMNS:
   - name: '"MyColumn"'
     type: text
OUTPUT:
   TABLE: '"MyTable"'
You specify Kafka consumer properties that GPSS passes through to Kafka in a PROPERTIES block. For example, the following settings (standard librdkafka consumer properties, shown here for illustration) are typically required when the Kafka brokers are older than version 0.10:

PROPERTIES:
   api.version.request: false
   broker.version.fallback: 0.8.2.1
Examples
Load data from Kafka as defined in the Version 2 configuration file named kafka2greenplumv2.yaml:
gpkafka load kafka2greenplumv2.yaml
Example kafka2greenplumv2.yaml configuration file:
DATABASE: ops
USER: gpadmin
HOST: mdw-1
PORT: 5432
VERSION: 2
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: kbrokerhost1:9092
         TOPIC: customer_expenses2
         PARTITIONS: (2, 5...7, 13)
      VALUE:
         COLUMNS:
            - NAME: c1
              TYPE: json
         FORMAT: avro
         AVRO_OPTION:
            SCHEMA_REGISTRY_ADDR: http://localhost:8081
      KEY:
         COLUMNS:
            - NAME: key
              TYPE: json
         FORMAT: avro
         AVRO_OPTION:
            SCHEMA_REGISTRY_ADDR: http://localhost:8081
      META:
         COLUMNS:
            - NAME: meta
              TYPE: json
         FORMAT: json
      ERROR_LIMIT: 25
   OUTPUT:
      SCHEMA: payables
      TABLE: expenses2
      MAPPING:
         - NAME: customer_id
           EXPRESSION: (c1->>'cust_id')::int
         - NAME: newcust
           EXPRESSION: ((c1->>'cust_id')::int > 5000000)::boolean
         - NAME: expenses
           EXPRESSION: (c1->>'expenses')::decimal
         - NAME: tax_due
           EXPRESSION: ((c1->>'expenses')::decimal * .075)::decimal
         - NAME: t
           EXPRESSION: (meta->>'topic')::text
   METADATA:
      SCHEMA: gpkafka_internal
   COMMIT:
      MINIMAL_INTERVAL: 2000