gpkafka.yaml
gpkafka configuration file (version 1).
Synopsis
DATABASE: db_name
USER: user_name
PASSWORD: password
HOST: host
PORT: greenplum_port
[VERSION: 1]
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: kafka_broker_host:broker_port [, ... ]
         TOPIC: kafka_topic
      ENCRYPTION: bool_value
      [COLUMNS:
         - NAME: { column_name | __IGNORED__ }
           TYPE: column_data_type
         [ ... ]]
      FORMAT: data_format
      [[DELIMITED_OPTION:
         DELIMITER: delimiter_string] |
       [AVRO_OPTION:
         SCHEMA_REGISTRY_ADDR: http://schemareg_host:schemareg_port [, ... ]] |
       [CUSTOM_OPTION:
         NAME: udf_name
         PARAMSTR: udf_parameter_string]]
      ERROR_LIMIT: { num_errors | percentage_errors }
      [LOCAL_HOSTNAME: local_hostname]
      [LOCAL_PORT: local_port]
   OUTPUT:
      [SCHEMA: output_schema_name]
      TABLE: table_name
      [MAPPING:
         - NAME: target_column_name
           EXPRESSION: { source_column_name | 'expression' }
         [ ... ]]
   [METADATA:
      [SCHEMA: metadata_schema_name]]
   COMMIT:
      MAX_ROW: num_rows
      MINIMAL_INTERVAL: wait_time
   [POLL:
      BATCHSIZE: num_records
      TIMEOUT: poll_time]
   [TASK:
      POST_BATCH_SQL: udf_or_sql_to_run
      BATCH_INTERVAL: num_batches]
Description
You specify load configuration parameters for the gpkafka utilities in a YAML-formatted configuration file. (This reference page uses the name gpkafka.yaml when referring to this file; you may choose your own name for the file.) Load parameters include Greenplum Database connection and target table information, Kafka broker and topic information, and error and commit thresholds.
The gpkafka utility processes the YAML configuration file in order, using indentation (spaces) to determine the document hierarchy and the relationships between the sections. The use of white space in the file is significant, and keywords are case-sensitive.
Keywords and Values
- DATABASE: db_name
- The name of the Greenplum database.
- USER: user_name
- The name of the Greenplum Database user/role. This user_name must have permissions as described in the Greenplum Stream Server documentation.
- PASSWORD: password
- The password for the Greenplum Database user/role.
- HOST: host
- The host name or IP address of the Greenplum Database master host.
- PORT: greenplum_port
- The port number of the Greenplum Database server on the master host.
- VERSION: 1
- Optional. The version of the gpkafka configuration file. The default version is Version 1.
- SOURCE
- Kafka input configuration parameters.
- BROKERS: kafka_broker_host:broker_port
- The host and port identifying the Kafka broker.
- TOPIC: kafka_topic
- The name of the Kafka topic from which to load data. The topic must exist.
- ENCRYPTION: bool_value
- Encrypt the connections between the client and the Greenplum Stream Server (GPSS), between GPSS and Greenplum Database (data, gpfdist), and between GPSS and Kafka. The default is false (no encryption).
  Note: If you want to use encryption, you must explicitly start a Greenplum Stream Server instance with the gpss command, providing a gpss.json configuration file that specifies the certificate files. You must also use the gpsscli subcommands, not gpkafka, to submit and manage the load job.
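For example, a minimal sketch of an INPUT block with encryption enabled, with ENCRYPTION positioned as shown in the synopsis above (the broker address and topic name are placeholders):

   KAFKA:
      INPUT:
         SOURCE:
            BROKERS: kbrokerhost1:9093
            TOPIC: secure_topic
         ENCRYPTION: true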
- COLUMNS:
- The column names and data types. You must specify all Kafka data elements in the order in which they appear in the Kafka message. Optional when the column names and types match the target Greenplum Database table definition.
- The default source-to-target data mapping behavior of gpkafka is to match a column name as defined in COLUMNS:NAME with a column name in the target Greenplum Database TABLE. You can override the default mapping by specifying a MAPPING block.
- NAME: column_name
- The name of a column. column_name must match the column name of the target Greenplum Database table. Specify __IGNORED__ to omit this Kafka message data element from the load operation.
- TYPE: data_type
- The data type of the column. You must specify an equivalent data type for each non-ignored Kafka message data element and the associated Greenplum Database table column.
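For example, the following COLUMNS sketch assumes a hypothetical Kafka message value containing three fields; it loads the first and third fields and ignores the second:

   COLUMNS:
      - NAME: cust_id
        TYPE: int
      - NAME: __IGNORED__
        TYPE: text
      - NAME: cust_name
        TYPE: text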
- FORMAT: data_format
- The format of the Kafka message value data. You may specify a FORMAT of avro, binary, csv, custom, delimited, or json.
- avro
- When you specify the avro data format, you must define only a single json type column in COLUMNS. If the Kafka message value schema is registered in a Confluent Schema Registry, you must also provide the AVRO_OPTION.
- binary
- When you specify the binary data format, you must define only a single bytea type column in COLUMNS.
- csv
- When you specify the csv data format, the message content cannot contain line ending characters (CR and LF).
- custom
- When you specify the custom data format, you must provide a CUSTOM_OPTION.
- delimited
- When you specify the delimited data format, you must provide a DELIMITED_OPTION.
- json
- When you specify the json data format, you must define only a single json type column in COLUMNS.
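For example, a sketch of the single-column definition that the json format requires (the column name payload is a placeholder; it must match a json-type column in the target table unless you also specify a MAPPING):

   COLUMNS:
      - NAME: payload
        TYPE: json
   FORMAT: json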
- AVRO_OPTION:SCHEMA_REGISTRY_ADDR: http://schemareg_host:schemareg_port
- Optional. When you specify avro as the FORMAT and the Avro schema of the data that you want to load is registered in the Confluent Schema Registry, you must identify the host name and port number of each Confluent Schema Registry server in your Kafka cluster. You may specify more than one address; at least one of the addresses must be valid.
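A sketch combining the avro format with schema registry addresses (the registry host names, port, and column name are placeholders):

   COLUMNS:
      - NAME: avro_payload
        TYPE: json
   FORMAT: avro
   AVRO_OPTION:
      SCHEMA_REGISTRY_ADDR: http://schemareg1.example.com:8081, http://schemareg2.example.com:8081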
- CUSTOM_OPTION
- Optional. When you specify custom as the
FORMAT, CUSTOM_OPTION is
required. This block identifies the name and the arguments of a
custom formatter user-defined function.
- NAME: udf_name
- The name of the custom formatter user-defined function.
- PARAMSTR: udf_parameter_string
- A string specifying the comma-separated list of arguments to pass to the custom formatter user-defined function.
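A sketch of a CUSTOM_OPTION block, assuming a hypothetical custom formatter user-defined function named my_kafka_formatter that accepts two keyword arguments:

   FORMAT: custom
   CUSTOM_OPTION:
      NAME: my_kafka_formatter
      PARAMSTR: 'encoding=utf8,delim=;'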
- DELIMITED_OPTION:DELIMITER: delimiter_string
- Optional. When you specify delimited as the FORMAT, delimiter_string is required and must identify the Kafka message data element delimiter. delimiter_string may be a multi-byte value up to 32 bytes in length. It may not contain quote or escape characters.
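For example, a delimited format sketch that uses a multi-byte delimiter (the delimiter string is arbitrary):

   FORMAT: delimited
   DELIMITED_OPTION:
      DELIMITER: '#|#'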
- ERROR_LIMIT: { num_errors | percentage_errors }
- The error threshold, specified as either an absolute number or a percentage. gpkafka load exits when this limit is reached.
- LOCAL_HOSTNAME: local_hostname
- The name of the local host on which you are running gpkafka. This host should be DNS resolvable from each Greenplum Database segment host. Optional; the default value is the output of hostname -f, which may be a short host name. Explicitly set LOCAL_HOSTNAME if you require the fully-qualified domain name of the host.
  Note: gpkafka launches a GPSS instance on your behalf on LOCAL_HOSTNAME.
- LOCAL_PORT: local_port
- The gpfdist port number on the local host. Optional; the default value is 8080.
- SCHEMA: output_schema_name
- The name of the Greenplum Database schema in which table_name resides. Optional; the default schema is the public schema.
- TABLE: table_name
- The name of the Greenplum Database table into which gpkafka loads the Kafka data.
- MAPPING:
- Optional. Overrides the default source-to-target column mapping.
  Note: When you specify a MAPPING, ensure that you provide a mapping for all Kafka data elements of interest. gpkafka does not automatically match column names when you provide a MAPPING.
- NAME: target_column_name
- Specifies the target Greenplum Database table column name.
- EXPRESSION: source_column_name | 'expression'
- Specifies a Kafka COLUMNS:NAME (source_column_name) or an 'expression'. When you specify an 'expression', you may provide any value expression that you could specify in the SELECT list of a query, such as a constant value, a column reference, an operator invocation, a built-in or user-defined function call, and so on.
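A MAPPING sketch illustrating a source column reference, an operator invocation, and a built-in function call (all column names are placeholders for COLUMNS:NAME entries and target table columns):

   MAPPING:
      - NAME: customer_id
        EXPRESSION: cust_id
      - NAME: total_with_tax
        EXPRESSION: expenses * 1.0725
      - NAME: load_time
        EXPRESSION: now()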
- SCHEMA: metadata_schema_name
- The name of the Greenplum Database schema in which gpkafka creates external and history tables. The default metadata_schema_name is the value of KAFKA:OUTPUT:SCHEMA.
- COMMIT:
- Controls how gpkafka load commits data to Greenplum Database. You must specify at least one of MAX_ROW or MINIMAL_INTERVAL. You may specify both configuration parameters as long as both values are not zero (0).
- MAX_ROW: number_of_rows
- The number of rows to batch before triggering an INSERT operation on the Greenplum Database table. The default value of MAX_ROW is 0, which instructs gpkafka to ignore this commit trigger condition.
- MINIMAL_INTERVAL: wait_time
- The minimum amount of time to wait (milliseconds) between each INSERT operation on the table. The default value is 0, which instructs gpkafka to wait indefinitely (that is, to ignore this commit trigger condition).
- POLL:
- Controls the polling time period and batch size when reading Kafka data.
- BATCHSIZE: num_records
- The number of Kafka records in a batch. BATCHSIZE must be smaller than COMMIT:MAX_ROW. The default batch size is 200.
- TIMEOUT: poll_time
- The maximum time, in milliseconds, to wait in a polling cycle if Kafka data is not available. You must specify a TIMEOUT greater than 100 milliseconds and less than COMMIT:MINIMAL_INTERVAL. The default poll timeout is 1000 milliseconds.
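For example, a COMMIT and POLL sketch in which BATCHSIZE is smaller than MAX_ROW and TIMEOUT is greater than 100 milliseconds and less than MINIMAL_INTERVAL (the specific values are illustrative only):

   COMMIT:
      MAX_ROW: 1000
      MINIMAL_INTERVAL: 30000
   POLL:
      BATCHSIZE: 500
      TIMEOUT: 3000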
- TASK:
- Controls the execution and scheduling of a periodic (maintenance) task.
- POST_BATCH_SQL: udf_or_sql_to_run
- The user-defined function or SQL command(s) that you want to run after the specified number of batches are read from Kafka. The default is null.
- BATCH_INTERVAL: num_batches
- The number of batches to read before executing udf_or_sql_to_run. The default batch interval is 0.
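A TASK sketch that runs a SQL command after every 10 batches (the ANALYZE target is a placeholder table name; gpkafka runs the text that you supply as-is):

   TASK:
      POST_BATCH_SQL: 'ANALYZE payables.expenses'
      BATCH_INTERVAL: 10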
Notes
If you created a database object name using a double-quoted identifier (delimited identifier), you must specify the delimited name within single quotes in the gpkafka.yaml configuration file. For example, if you create a table as follows:
CREATE TABLE "MyTable" ("MyColumn" text);
Your gpkafka.yaml YAML configuration file would refer to the above table and column names as:
COLUMNS:
   - NAME: '"MyColumn"'
     TYPE: text
OUTPUT:
   TABLE: '"MyTable"'
Examples
Load data from Kafka as defined in the Version 1 configuration file named kafka2greenplum.yaml:
gpkafka load kafka2greenplum.yaml
Example kafka2greenplum.yaml configuration file:
DATABASE: ops
USER: gpadmin
HOST: mdw-1
PORT: 5432
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: kbrokerhost1:9092
         TOPIC: customer_expenses
      COLUMNS:
         - NAME: cust_id
           TYPE: int
         - NAME: month
           TYPE: int
         - NAME: expenses
           TYPE: decimal(9,2)
      FORMAT: delimited
      DELIMITED_OPTION:
         DELIMITER: '|'
      ERROR_LIMIT: 25
   OUTPUT:
      SCHEMA: payables
      TABLE: expenses
      MAPPING:
         - NAME: customer_id
           EXPRESSION: cust_id
         - NAME: newcust
           EXPRESSION: cust_id > 5000000
         - NAME: expenses
           EXPRESSION: expenses
         - NAME: tax_due
           EXPRESSION: expenses * .0725
   METADATA:
      SCHEMA: gpkafka_internal
   COMMIT:
      MAX_ROW: 1000
      MINIMAL_INTERVAL: 30000
See Also
gpkafka-v2.yaml, gpkafka check, gpkafka load, gpss, gpss.json