filesource-v3.yaml (Beta)

GPSS load configuration file for a File data source (version 3).

Synopsis

target:
  host: host
  port: greenplum_port
  user: user_name
  password: password
  database: db_name
  schema: schema_name
  table: table_name
source:
  file:
    uri:
      - file_path
      ...
    content:
      data_format:
        column_spec
        other_props
    meta:
      json:
        column:
          name: meta
          type: json
channel:
  gpdb_channel:
    mode:
      # specify a single mode property block (described below)
      insert:
        mode_specific_property: value
        ...
      update:
        mode_specific_property: value
        ...
      merge:
        mode_specific_property: value
        ...
    work_schema: work_schema_name
    error_limit: num_errors | percentage_errors
    window:
      batch:
        max_count: number_of_rows
        interval_ms: wait_time
        idle_duration_ms: idle_time
      window_size: num_batches
      window_statement: udf_or_sql_to_run
    mapping:
      target_column_name: source_column_name | expression
      ...
Where data_format, column_spec, and other_props are one of the following blocks:
avro:
  source_column_name: column_name
  schema_url: http://schemareg_host:schemareg_port, ...
  bytes_to_base64: boolean
binary:
  source_column_name: column_name
csv:
  columns:
    - name: column_name
      type: column_data_type
    ...
custom:
  columns:
    - name: column_name
      type: column_data_type
    ...
  name: formatter_name
  options:
    - optname=optvalue
    ...
delimited:
  columns:
    - name: column_name
      type: column_data_type
    ...
  delimiter: delimiter_string
json:
  column:
    name: column_name
    type: json | jsonb

Where the mode_specific_property settings that you can specify for each mode follow:

insert:
  filter_expression: filter_string
update:
  filter_expression: filter_string
  match_columns: [match_column_names]
  order_columns: [order_column_names]
  update_columns: [update_column_names]
  update_condition: update_condition
merge:
  filter_expression: filter_string
  match_columns: [match_column_names]
  update_columns: [update_column_names]
  order_columns: [order_column_names]
  update_condition: update_condition
  delete_condition: delete_condition

Description

Note: Version 3 of the GPSS load configuration file differs in both content and format from previous versions of the file. Certain symbols used in the GPSS version 1 and 2 configuration file reference page syntax have different meanings in version 3 syntax:
  • Brackets [] are literal and are used to specify a list in version 3. They are no longer used to signify the optionality of a property.
  • Curly braces {} are literal and are used to specify YAML mappings in version 3 syntax. They are no longer used with the pipe symbol (|) to identify a list of choices.

You specify the configuration properties for a Greenplum Streaming Server (GPSS) file load job in a YAML-formatted configuration file that you provide to the gpsscli submit or gpsscli load command. There are three types of configuration properties in this file: those that identify the Greenplum Database connection and target table, those specific to the file data source that you will load into Greenplum, and job-related properties.

This reference page uses the name filesource-v3.yaml to refer to this file; you may choose your own name for the file.

The gpsscli utility processes the YAML configuration file keywords in order, using indentation (spaces) to determine the document hierarchy and the relationships between the sections. The use of white space in the file is significant. Keywords are not case-sensitive.

Keywords and Values

Greenplum Database target Options
host: host
The host name or IP address of the Greenplum Database master host.
port: greenplum_port
The port number of the Greenplum Database server on the master host.
user: user_name
The name of the Greenplum Database user/role. This user_name must have the permissions described in Configuring Greenplum Database Role Privileges.
password: password
The password for the Greenplum Database user/role.
database: db_name
The name of the Greenplum database.
schema: schema_name
The name of the Greenplum Database schema in which table_name resides. Optional; the default schema is the public schema.
table: table_name
The name of the Greenplum Database table into which GPSS loads the data.
source:file: Options
uri:
A list of one or more paths identifying the file(s) to load.
file_path
A URL identifying a file or files to be loaded. You can specify wildcards in any element of the path. To load all files in a directory, specify dirname/*.
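For example, a uri block that loads every CSV file in a hypothetical /data/orders directory (the path is illustrative):

source:
  file:
    uri:
      - file:///data/orders/*.csv
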
content:
The file type, field names, and type-specific properties of the file data. You must specify all data elements in the order in which they appear in the file.
column_spec
The source to Greenplum column mapping. The supported column specification differs for different data formats as described below.
The default source-to-target data mapping behavior of GPSS is to match a column name as defined in source_column_name, column:name, or columns:name with a column name in the target Greenplum Database table. You can override the default mapping by specifying a mapping: block.
data_format
The format of the source file data. You may specify a data_format of avro, binary, csv, custom, delimited, or json.
avro
When you specify the avro data format, GPSS reads the data into a single json-type column. You may specify a schema registry location and optional SSL certificates and keys, and whether or not you want GPSS to convert bytes fields into base64-encoded strings.
source_column_name: column_name
The name of the single json-type column into which GPSS reads the source data.
schema_url: schemareg_host:schemareg_port
When you specify the avro format and the Avro schema of the JSON data that you want to load is registered in the Confluent Schema Registry, you must identify the host name and port number of each Confluent Schema Registry server. You may specify more than one address; at least one of the addresses must be valid.
schema_ca_on_gpdb: sr_ca_file_path
The file system path to the CA certificate that GPSS uses to verify the peer. This file must reside in sr_ca_file_path on all Greenplum Database segment hosts.
schema_cert_on_gpdb: sr_cert_file_path
The file system path to the client certificate that GPSS uses to connect to the HTTPS schema registry. This file must reside in sr_cert_file_path on all Greenplum Database segment hosts.
schema_key_on_gpdb: sr_key_file_path
The file system path to the private key file that GPSS uses to connect to the HTTPS schema registry. This file must reside in sr_key_file_path on all Greenplum Database segment hosts.
schema_min_tls_version: minimum_version
The minimum transport layer security (TLS) version that GPSS requests on the connection to the schema registry. Supported versions are 1.0, 1.1, 1.2, or 1.3. The default minimum TLS version is 1.0.
schema_path_on_gpdb: path_to_file
When you specify the avro format and the Avro schema of the data that you want to load is specified in a separate .avsc file, you must identify the file system location in path_to_file, and the file must reside in this location on every Greenplum Database segment host.
Note: GPSS does not cache the schema; it must reload the schema for every batch of data.
bytes_to_base64: boolean
When true, GPSS converts Avro bytes fields into base64-encoded strings. The default value is false; GPSS does not perform the conversion.
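A sketch of an avro block, assuming a hypothetical schema registry host and an illustrative column name jdata:

avro:
  source_column_name: jdata
  schema_url: http://schemareg.example.com:8081
  bytes_to_base64: true
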
binary
When you specify the binary data format, GPSS reads the data into a single bytea-type column.
source_column_name: column_name
The name of the single bytea-type column into which GPSS reads the source data.
csv
When you specify the csv data format, GPSS reads the data into the list of columns that you specify. The data content cannot contain line-ending characters (CR and LF).
columns:
A set of column name/type mappings. The value [] specifies all columns.
name: column_name
The name of a source column. column_name must match the column name of the target Greenplum Database table.
type: column_data_type
The data type of the column. You must specify an equivalent data type for each source data element and the associated Greenplum Database table column.
custom
When you specify the custom data format, GPSS uses the custom formatter that you specify to process the input data before writing it to Greenplum Database.
columns:
A set of column name/type mappings. The value [] specifies all columns.
name: column_name
The name of a source column. column_name must match the column name of the target Greenplum Database table.
type: column_data_type
The data type of the column. You must specify an equivalent data type for each source data element and the associated Greenplum Database table column.
name: formatter_name
When you specify the custom data format, formatter_name is required and must identify the name of the formatter user-defined function that GPSS should use when loading the data.
options:
A set of function argument name=value pairs.
optname=optvalue
The name and value of the set of arguments to pass into the formatter_name UDF.
delimited
When you specify the delimited data format, GPSS reads the data into the list of columns that you specify. You must specify the data delimiter.
columns:
A set of column name/type mappings. The value [] specifies all columns.
name: column_name
The name of a source column. column_name must match the column name of the target Greenplum Database table.
type: column_data_type
The data type of the column. You must specify an equivalent data type for each source data element and the associated Greenplum Database table column.
delimiter: delimiter_string
When you specify the delimited data format, delimiter_string is required and must identify the data element delimiter. delimiter_string may be a multi-byte value up to 32 bytes in length. It may not contain quote or escape characters.
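For example, a delimited block for pipe-separated data with three illustrative columns:

delimited:
  columns:
    - name: id
      type: int
    - name: name
      type: text
    - name: amount
      type: numeric
  delimiter: '|'
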
json
When you specify the json data format, GPSS reads the data into a single json- or jsonb-type column whose name you choose.
column:
A single column name/type mapping.
name: column_name
The name of the column. column_name must match the column name of the target Greenplum Database table.
type: json | jsonb | gp_jsonb (Beta)
The data type of the column.
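A minimal json block that reads each record into a single jsonb-type column with the illustrative name jdata:

json:
  column:
    name: jdata
    type: jsonb
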
meta:
The data type and field name of the source meta data. meta: must specify the json or jsonb (Greenplum 6 only) data format, and a single json-type column. The available meta data properties include:
  • topic (text) - the Kafka topic name
  • partition (int) - the partition number
  • offset (bigint) - the record location within the partition
You can load any of these properties into the target table with a mapping, or use a property in the update or merge criteria for a load operation.
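For example, assuming the meta column is named meta as in the Synopsis, a mapping: entry can extract a property with an ordinary SQL value expression; the target column name here is illustrative:

mapping:
  source_offset: (meta->>'offset')::bigint
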
channel:gpdb_channel: Options
mode:
The table load mode; insert, merge, or update. The default mode is insert.
Note: update and merge are not supported if the target table column name is a reserved keyword, has capital letters, or includes any character that requires quotes (" ") to identify the column.
insert:
Inserts source data into Greenplum.
update:
Updates the target table columns that are listed in update_columns when the input columns identified in match_columns match the named target table columns and the optional update_condition is true.
merge:
Inserts new rows and updates existing rows when:
  • columns are listed in update_columns,
  • the match_columns target table column values are equal to the input data, and
  • an optional update_condition is specified and met.
Deletes rows when:
  • the match_columns target table column values are equal to the input data, and
  • an optional delete_condition is specified and met.
New rows are identified when the match_columns value in the source data does not have a corresponding value in the existing data of the target table. In those cases, the entire row from the source file is inserted, not only the match_columns and update_columns. If there are multiple new match_columns values in the input data that are the same, GPSS inserts or updates the target table using a random matching input row. When you specify order_columns, GPSS sorts the input data on the specified column(s) and inserts or updates from the input row with the largest value.
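As an illustration, assume match_columns: [pk] and order_columns: [seq], and that a batch contains two input rows with the same pk value:

pk | seq | data
 1 |   1 | old
 1 |   2 | new

GPSS sorts the matching input rows on seq and inserts or updates the target table from the row with the largest value (seq=2), so the target row carries data=new.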
mode_property_name: value
The name to value mapping for a mode property. Each mode supports one or more of the following properties as specified in the Synopsis.
filter_expression: filter_string
The filter to apply to the input data before GPSS loads the data into Greenplum Database. If the filter evaluates to true, GPSS loads the row. If the filter evaluates to false, GPSS drops the row. filter_string must be a valid SQL conditional expression and may reference one or more source or meta column names.
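For example, the following filter loads only those rows whose (illustrative) flag column is positive:

filter_expression: flag > 0
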
match_columns: [match_column_names]
A comma-separated list that specifies the column(s) to use as the join condition for the update. The attribute value in the specified target column(s) must be equal to that of the corresponding source data column(s) in order for the row to be updated in the target table.
Required when mode is merge or update.
order_columns: [order_column_names]
A comma-separated list that specifies the column(s) by which GPSS sorts the rows. When multiple matching rows exist in a batch, order_columns is used with match_columns to determine the input row with the largest value; GPSS uses that row to write/update the target.
Optional. May be specified in merge mode to sort the input data rows.
update_columns: [update_column_names]
A comma-separated list that specifies the column(s) to update for the rows that meet the match_columns criteria and the optional update_condition.
Required when mode is merge or update.
update_condition: update_condition
Specifies a boolean condition, similar to that which you would declare in a WHERE clause, that must be met in order for a row in the target table to be updated (or inserted, in the case of a merge). Optional.
delete_condition: delete_condition
In merge mode, specifies a boolean condition, similar to that which you would declare in a WHERE clause, that must be met for GPSS to delete rows in the target table that meet the match_columns criteria. Optional.
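A sketch of an update mode block, using the illustrative column names from the Examples section:

mode:
  update:
    match_columns: [pk]
    update_columns: [data]
    update_condition: flag = 1
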
work_schema: work_schema_name
The name of the Greenplum Database schema in which GPSS creates internal tables. The default work_schema_name is public.
error_limit: num_errors | percentage_errors
The error threshold, specified as either an absolute number or a percentage. GPSS stops running the job when this limit is reached.
window:
The batch size and commit window.
batch:
Controls how GPSS commits data to Greenplum Database. You may specify both configuration properties as long as both values are not zero (0). Try setting and tuning interval_ms to your environment; introduce a max_count setting only if you encounter high memory usage associated with message buffering.
max_count: number_of_rows
The number of rows to batch before triggering an INSERT operation on the Greenplum Database table. The default value of max_count is 0, which instructs GPSS to ignore this commit trigger condition.
interval_ms: wait_time
The minimum amount of time to wait (milliseconds) between each INSERT operation on the table. The default value is 5000.
idle_duration_ms: idle_time
The maximum amount of time to wait (milliseconds) for the first batch of data. When you use this property to enable lazy load, GPSS waits until data is available before locking the target Greenplum table. You can specify:
  • 0 (lazy load is disabled)
  • -1 (lazy load is enabled, the job never stops), or
  • a positive value (lazy load is enabled, the job stops after idle_time duration with no new source data)
The default value is 0.
window_size: num_batches
The number of batches to read before running window_statement. The default value of window_size is 0.
window_statement: udf_or_sql_to_run
A user-defined function or SQL command(s) that you want to run after GPSS reads window_size number of batches. The default is null; GPSS runs no command.
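A sketch of a window block that commits a batch every 10000 rows or every 2 seconds, whichever comes first, and runs a hypothetical user-defined function after every 10 batches:

window:
  batch:
    max_count: 10000
    interval_ms: 2000
  window_size: 10
  window_statement: SELECT post_load_cleanup()
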
mapping:
Optional. Overrides the default source-to-target column mapping.
Note: When you specify a mapping, ensure that you provide a mapping for all source data elements of interest. GPSS does not automatically match column names when you provide a mapping block.
target_column_name: source_column_name | expression
target_column_name specifies the target Greenplum Database table column name. GPSS maps this column name to the source column name specified in source_column_name, or to an expression. When you specify an expression, you may provide a value expression that you would specify in the SELECT list of a query, such as a constant value, a column reference, an operator invocation, a built-in or user-defined function call, and so on.
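For example, a mapping block in which all column names are illustrative; label is computed from two source columns and loaded_at from a built-in function:

mapping:
  id: pk
  label: data || '-' || seq::text
  loaded_at: now()
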
job_option: Options
name: job_name
Identifies the name of the job.
save_failing_batch: boolean
Determines whether or not GPSS saves data into a backup table before writing the data to Greenplum Database. Saving the data in this manner aids recovery when GPSS encounters errors during the evaluation of expressions. The default is false; GPSS does not use a backup table, and returns immediately when it encounters an expression error. When you set this property to true, GPSS writes both the good and the bad data in the batch to a backup table named gpssbackup_jobhash, and continues to process incoming data. You must then either manually load the good data from the backup table into Greenplum, or set recover_failing_batch (Beta) to true to have GPSS automatically reload the good data.
Note: Using a backup table to hedge against mapping errors may impact performance, especially when the data that you are loading has not been cleaned.
recover_failing_batch: boolean (Beta)
When set to true and save_failing_batch is also true, GPSS automatically reloads the good data in the batch and retains only the error data in the backup table. The default value is false; GPSS does not process the backup table.
Note: Enabling this property requires that GPSS has the Greenplum Database privileges to create a function.
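For example, to stage each batch in a backup table and have GPSS automatically reload the good rows (a sketch; both properties appear alongside the other job options described in this section):

save_failing_batch: true
recover_failing_batch: true
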
consistency: consistency_value
Specify the consistency guarantee with which GPSS loads data into Greenplum Database. Valid values are strong, at-least, at-most, and none. The default value is strong. Refer to Understanding Kafka Message Offset Management for more detailed information.
schedule:
Controls the frequency and interval of restarting failed jobs.
retry_interval: retry_time
The period of time that GPSS waits before retrying the job. You can specify the time interval in day (d), hour (h), minute (m), second (s), or millisecond (ms) integer units; do not mix units. The default retry interval is 5m (5 minutes).
max_retries: num_retries
The maximum number of times that GPSS attempts to retry the job. The default is 0; GPSS does not retry. If you specify a negative value, GPSS retries the job indefinitely.
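A sketch of a schedule block that retries a failed job every 30 seconds, up to 10 times:

schedule:
  retry_interval: 30s
  max_retries: 10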

Notes

If you created a database object name using a double-quoted identifier (delimited identifier), you must specify the delimited name within single quotes in the load configuration file. For example, if you create a table as follows:

CREATE TABLE "MyTable" (c1 text);

Your YAML configuration file would refer to the table name as:

target:
   table: '"MyTable"'

Examples

Submit a job to load data from CSV files as defined in the version 3 load configuration file named loadfromfile_v3.yaml:

$ gpsscli submit loadfromfile_v3.yaml

Example loadfromfile_v3.yaml configuration file:

target:
  host: mdw-1
  port: 15432
  user: gpadmin
  database: testdb
  schema: public
  table: orders
source:
  file:
    uri:
    - file:///tmp/orderdata/*
    content:
      csv:
        columns:
        - name: pk
          type: int
        - name: seq
          type: int
        - name: data
          type: text
        - name: flag
          type: int
channel:
  gpdb_channel:
    mode:
      merge:
        match_columns: [pk]
        order_columns: [seq]
        delete_condition: flag = 0
    work_schema: public
    error_limit: "25"
    mapping:
      data: data
      pk: pk
      seq: seq