Writing Data to HDFS with PXF (Experimental)


Warning: Writing text and binary data to HDFS is an experimental PXF feature and is not intended for use in a production environment. Experimental features are subject to change without notice in future releases.

You can use the PXF HDFS connector to write text and SequenceFile format binary data to files stored on HDFS. When you create a writable external table with the PXF HDFS connector, you specify the name of a directory on HDFS. When you insert records into the external table, the block(s) of data you insert are written to one or more files in the directory you specified.

This section describes how to use PXF to write HDFS data, including how to create and insert data into a writable external table that references an HDFS directory.

Note: External tables that you create with a writable profile can only be used for INSERT operations. If you want to query the data you inserted, you must create a separate readable external table that references the HDFS directory.
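
For example, the overall pattern looks like the following minimal sketch (the table names and HDFS directory are hypothetical); the procedures later in this topic walk through complete, working examples:

CREATE WRITABLE EXTERNAL TABLE example_w (id int, msg text)
    LOCATION ('pxf://data/pxf_examples/example_dir?PROFILE=HdfsTextSimple')
    FORMAT 'TEXT' (delimiter=',');

INSERT INTO example_w VALUES (1, 'hello');

-- A writable external table cannot be queried directly; create a separate
-- readable external table over the same HDFS directory to read the data back.
CREATE EXTERNAL TABLE example_r (id int, msg text)
    LOCATION ('pxf://data/pxf_examples/example_dir?PROFILE=HdfsTextSimple')
    FORMAT 'CSV';

SELECT * FROM example_r;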

Prerequisites

Before writing HDFS data using PXF, ensure that:

  • You have installed and configured a Hadoop client on each Greenplum Database segment host. Refer to Installing and Configuring Hadoop Clients for PXF for instructions.
  • You have initialized and started PXF on your Greenplum Database segment hosts. See Configuring, Initializing, and Managing PXF for PXF initialization, configuration, and startup information.
  • You have granted the gpadmin user read and write permissions to the appropriate directories in your HDFS file system.

Writing to PXF External Tables

The PXF HDFS connector supports two writable profiles: HdfsTextSimple and SequenceWritable.

  • Specify the HdfsTextSimple profile when you want to write single line plain text data to HDFS.
  • Specify the SequenceWritable profile when you want to write data in SequenceFile binary format.

Use the following syntax to create a Greenplum Database writable external table that references an HDFS directory: 

CREATE WRITABLE EXTERNAL TABLE <table_name> 
    ( <column_name> <data_type> [, ...] | LIKE <other_table> )
LOCATION ('pxf://<path-to-hdfs-dir>
    ?PROFILE=HdfsTextSimple|SequenceWritable[&<custom-option>=<value>[...]]')
FORMAT '[TEXT|CSV|CUSTOM]' (<formatting-properties>)
[DISTRIBUTED BY (<column_name> [, ... ] ) | DISTRIBUTED RANDOMLY];

The specific keywords and values used by the pxf protocol in the CREATE EXTERNAL TABLE command are described in the table below.

Keyword Value
<path-to-hdfs-dir> The absolute path to the directory in the HDFS data store.
PROFILE The PROFILE keyword must specify one of the values HdfsTextSimple or SequenceWritable.
<custom-option> <custom-option> is profile-specific. Profile-specific options are discussed later in this topic.
FORMAT Use FORMAT 'TEXT' with the HdfsTextSimple profile to write plain, delimited text to <path-to-hdfs-dir>.
Use FORMAT 'CSV' with the HdfsTextSimple profile to write comma-separated value text to <path-to-hdfs-dir>.
Use FORMAT 'CUSTOM' with the SequenceWritable profile. The SequenceWritable 'CUSTOM' FORMAT supports only the built-in (formatter='pxfwritable_export') (write) and (formatter='pxfwritable_import') (read) <formatting-properties>.
<formatting-properties> <formatting-properties> are profile-specific. Profile-specific formatting options are identified later in this topic.
DISTRIBUTED BY If you plan to load the writable external table with data from an existing Greenplum Database table, consider specifying the same distribution policy or <column_name> on the writable external table as that defined for the source table. Doing so avoids extra motion of data between segments during the load operation; see the example following the note below.

Note: When creating PXF external tables, you cannot use the HEADER option in your FORMAT specification.
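
For example, suppose you plan to load the writable external table from an existing table that is distributed by its location column. The following minimal sketch (the table names, columns, and HDFS directory are hypothetical) mirrors that distribution policy on the writable external table:

CREATE TABLE sales_monthly (location text, month text, total_sales float8)
    DISTRIBUTED BY (location);

CREATE WRITABLE EXTERNAL TABLE sales_monthly_hdfs_w (location text, month text, total_sales float8)
    LOCATION ('pxf://data/pxf_examples/sales_monthly?PROFILE=HdfsTextSimple')
    FORMAT 'TEXT' (delimiter=',')
    DISTRIBUTED BY (location);

-- Both tables share the same distribution key, so this load does not
-- require extra motion of data between segments.
INSERT INTO sales_monthly_hdfs_w SELECT * FROM sales_monthly;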

Custom Options

The HdfsTextSimple and SequenceWritable writable profiles support the following custom options:

Option Value Description Profile
COMPRESSION_CODEC The compression codec Java class name. If this option is not provided, Greenplum Database performs no data compression. Supported compression codecs include org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.BZip2Codec, and org.apache.hadoop.io.compress.GzipCodec. HdfsTextSimple, SequenceWritable
COMPRESSION_TYPE The compression type to employ; supported values are RECORD (the default) or BLOCK. HdfsTextSimple, SequenceWritable
DATA-SCHEMA The name of the writer serialization/deserialization class. The jar file in which this class resides must be in the PXF classpath. This option is required for the SequenceWritable profile and has no default value. SequenceWritable
THREAD-SAFE Boolean value determining if a table query can run in multi-threaded mode. The default value is TRUE. Set this option to FALSE to handle all requests in a single thread for operations that are not thread-safe (for example, compression). HdfsTextSimple, SequenceWritable
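
For example, the following minimal sketch (the table name, columns, and HDFS directory are hypothetical) supplies custom options as additional query parameters in the LOCATION URI, writing Gzip-compressed text and forcing single-threaded request handling:

CREATE WRITABLE EXTERNAL TABLE pxf_hdfs_writetbl_gzip (id int, msg text)
    LOCATION ('pxf://data/pxf_examples/gzipped_dir?PROFILE=HdfsTextSimple&COMPRESSION_CODEC=org.apache.hadoop.io.compress.GzipCodec&THREAD-SAFE=FALSE')
    FORMAT 'TEXT' (delimiter=',');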

Writing Text Data

Use the HdfsTextSimple profile when you want to write single line delimited data to HDFS.

Writable external tables you create using the HdfsTextSimple profile can optionally use record or block compression. The PXF HdfsTextSimple profile supports the following compression codecs:

  • org.apache.hadoop.io.compress.DefaultCodec
  • org.apache.hadoop.io.compress.GzipCodec
  • org.apache.hadoop.io.compress.BZip2Codec

<formatting-properties> supported by the HdfsTextSimple profile when writing text data include:

Keyword Syntax, Example(s) Description
delimiter (delimiter=E'\t'), (delimiter ':') The delimiter character in the data. For FORMAT 'CSV', the default value is a comma ,. Preface the value with an E when the value is an escape sequence.
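
For example, a minimal sketch (the table name, columns, and HDFS directory are hypothetical) that writes tab-delimited text by specifying the delimiter as an escape sequence:

CREATE WRITABLE EXTERNAL TABLE pxf_hdfs_writetbl_tab (id int, msg text)
    LOCATION ('pxf://data/pxf_examples/tab_delimited_dir?PROFILE=HdfsTextSimple')
    FORMAT 'TEXT' (delimiter=E'\t');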

Example: Writing Text Data Using the HdfsTextSimple Profile

This example utilizes the data schema introduced in Example: Using the HdfsTextSimple Profile.

Field Name Data Type
location text
month text
number_of_orders int
total_sales float8

This example also optionally uses the Greenplum Database external table named pxf_hdfs_textsimple that you created in that exercise.

Procedure

Perform the following procedure to create Greenplum Database writable external tables utilizing the same data schema as described above, one of which will employ compression. You will use the PXF HdfsTextSimple profile to write data to the underlying HDFS directory. You will also create a separate, readable external table to read the data you wrote to the HDFS directory.

  1. Create a Greenplum Database writable external table utilizing the data schema described above. Write to the HDFS directory /data/pxf_examples/pxfwritable_hdfs_textsimple1. Create the table specifying a comma , as the delimiter:

    postgres=# CREATE WRITABLE EXTERNAL TABLE pxf_hdfs_writabletbl_1(location text, month text, num_orders int, total_sales float8)
                LOCATION ('pxf://data/pxf_examples/pxfwritable_hdfs_textsimple1?PROFILE=HdfsTextSimple')
              FORMAT 'TEXT' (delimiter=',');
    

    You specify the FORMAT subclause delimiter value as the single ASCII comma character ,.

  2. Write a few individual records to the pxfwritable_hdfs_textsimple1 HDFS directory by invoking the SQL INSERT command on pxf_hdfs_writabletbl_1:

    postgres=# INSERT INTO pxf_hdfs_writabletbl_1 VALUES ( 'Frankfurt', 'Mar', 777, 3956.98 );
    postgres=# INSERT INTO pxf_hdfs_writabletbl_1 VALUES ( 'Cleveland', 'Oct', 3812, 96645.37 );
    
  3. (Optional) Insert the data from the pxf_hdfs_textsimple table you created in Example: Using the HdfsTextSimple Profile into pxf_hdfs_writabletbl_1:

    postgres=# INSERT INTO pxf_hdfs_writabletbl_1 SELECT * FROM pxf_hdfs_textsimple;
    
  4. In another terminal window, display the data you just added to HDFS:

    $ hdfs dfs -cat /data/pxf_examples/pxfwritable_hdfs_textsimple1/*
    Frankfurt,Mar,777,3956.98
    Cleveland,Oct,3812,96645.37
    Prague,Jan,101,4875.33
    Rome,Mar,87,1557.39
    Bangalore,May,317,8936.99
    Beijing,Jul,411,11600.67
    

    Because you specified comma , as the delimiter when you created the writable external table, this character is the field separator used in each record of the HDFS data.

  5. Greenplum Database does not support directly querying a writable external table. To query the data you just added to HDFS, you must create a readable external Greenplum Database table that references the HDFS directory:

    postgres=# CREATE EXTERNAL TABLE pxf_hdfs_textsimple_r1(location text, month text, num_orders int, total_sales float8)
                LOCATION ('pxf://data/pxf_examples/pxfwritable_hdfs_textsimple1?PROFILE=HdfsTextSimple')
                FORMAT 'CSV';
    

    You specify the 'CSV' FORMAT when you create the readable external table because you created the writable table with a comma , as the delimiter character, the default delimiter for 'CSV' FORMAT.

  6. Query the readable external table:

    postgres=# SELECT * FROM pxf_hdfs_textsimple_r1 ORDER BY total_sales;
    
     location  | month | num_orders | total_sales 
    -----------+-------+------------+-------------
     Rome      | Mar   |         87 |     1557.39
     Frankfurt | Mar   |        777 |     3956.98
     Prague    | Jan   |        101 |     4875.33
     Bangalore | May   |        317 |     8936.99
     Beijing   | Jul   |        411 |    11600.67
     Cleveland | Oct   |       3812 |    96645.37
    (6 rows)
    

    The pxf_hdfs_textsimple_r1 table includes the records you individually inserted, as well as the full contents of the pxf_hdfs_textsimple table if you performed the optional step.

  7. Create a second Greenplum Database writable external table, this time using Gzip compression and employing a colon : as the delimiter:

    postgres=# CREATE WRITABLE EXTERNAL TABLE pxf_hdfs_writabletbl_2 (location text, month text, num_orders int, total_sales float8)
                LOCATION ('pxf://data/pxf_examples/pxfwritable_hdfs_textsimple2?PROFILE=HdfsTextSimple&COMPRESSION_CODEC=org.apache.hadoop.io.compress.GzipCodec')
              FORMAT 'TEXT' (delimiter=':');
    
  8. Write a few records to the pxfwritable_hdfs_textsimple2 HDFS directory by inserting directly into the pxf_hdfs_writabletbl_2 table:

    gpadmin=# INSERT INTO pxf_hdfs_writabletbl_2 VALUES ( 'Frankfurt', 'Mar', 777, 3956.98 );
    gpadmin=# INSERT INTO pxf_hdfs_writabletbl_2 VALUES ( 'Cleveland', 'Oct', 3812, 96645.37 );
    
  9. In another terminal window, display the contents of the data you added to HDFS; use the -text option to hdfs dfs to view the compressed data as text:

    $ hdfs dfs -text /data/pxf_examples/pxfwritable_hdfs_textsimple2/*
    Frankfurt:Mar:777:3956.98
    Cleveland:Oct:3812:96645.3
    

    Notice that the colon : is the field separator in this HDFS data.

    To query data from the newly-created HDFS directory named pxfwritable_hdfs_textsimple2, you can create a readable external Greenplum Database table as described above that references this HDFS directory and specifies FORMAT 'CSV' (delimiter=':').
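
    For example, a minimal sketch of such a readable external table (the table name pxf_hdfs_textsimple_r2 is hypothetical; the columns match the writable table you created above):

    postgres=# CREATE EXTERNAL TABLE pxf_hdfs_textsimple_r2(location text, month text, num_orders int, total_sales float8)
                LOCATION ('pxf://data/pxf_examples/pxfwritable_hdfs_textsimple2?PROFILE=HdfsTextSimple')
                FORMAT 'CSV' (delimiter=':');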

Writing Binary Data

Use the HDFS connector SequenceWritable profile when you want to write SequenceFile format data to HDFS. Files of this type consist of binary key/value pairs. SequenceFile format is a common data transfer format between MapReduce jobs.

SequenceFile format data can optionally employ record or block compression. The PXF SequenceWritable profile supports the following compression codecs:

  • org.apache.hadoop.io.compress.DefaultCodec
  • org.apache.hadoop.io.compress.BZip2Codec

When you use the SequenceWritable profile to write SequenceFile format data, you must provide the name of the Java class to use for serializing/deserializing the binary data. This class must provide read and write methods for each data type referenced in the data schema.

Example: Writing Binary Data Using the SequenceWritable Profile

In this example, you create a Java class named PxfExample_CustomWritable that will serialize/deserialize the fields in the sample schema used in previous examples. You will then use this class to access a writable external table that you create with the SequenceWritable profile.

Perform the following procedure to create the Java class and writable table.

  1. Prepare to create the sample Java class:

    $ mkdir -p pxfex/com/example/pxf/hdfs/writable/dataschema
    $ cd pxfex/com/example/pxf/hdfs/writable/dataschema
    $ vi PxfExample_CustomWritable.java
    
  2. Copy and paste the following text into the PxfExample_CustomWritable.java file:

    package com.example.pxf.hdfs.writable.dataschema;
    
    import org.apache.hadoop.io.*;
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.lang.reflect.Field;
    
    /**
    * PxfExample_CustomWritable class - used to serialize and deserialize data with
    * text, int, and float data types
    */
    public class PxfExample_CustomWritable implements Writable {
    
    public String st1, st2;
    public int int1;
    public float ft;
    
    public PxfExample_CustomWritable() {
        st1 = new String("");
        st2 = new String("");
        int1 = 0;
        ft = 0.f;
    }
    
    public PxfExample_CustomWritable(int i1, int i2, int i3) {
    
        st1 = new String("short_string___" + i1);
        st2 = new String("short_string___" + i1);
        int1 = i2;
        ft = i1 * 10.f * 2.3f;
    
    }
    
    String GetSt1() {
        return st1;
    }
    
    String GetSt2() {
        return st2;
    }
    
    int GetInt1() {
        return int1;
    }
    
    float GetFt() {
        return ft;
    }
    
    @Override
    public void write(DataOutput out) throws IOException {
    
        Text txt = new Text();
        txt.set(st1);
        txt.write(out);
        txt.set(st2);
        txt.write(out);
    
        IntWritable intw = new IntWritable();
        intw.set(int1);
        intw.write(out);
    
        FloatWritable fw = new FloatWritable();
        fw.set(ft);
        fw.write(out);
    }
    
    @Override
    public void readFields(DataInput in) throws IOException {
    
        Text txt = new Text();
        txt.readFields(in);
        st1 = txt.toString();
        txt.readFields(in);
        st2 = txt.toString();
    
        IntWritable intw = new IntWritable();
        intw.readFields(in);
        int1 = intw.get();
    
        FloatWritable fw = new FloatWritable();
        fw.readFields(in);
        ft = fw.get();
    }
    
    public void printFieldTypes() {
        Class myClass = this.getClass();
        Field[] fields = myClass.getDeclaredFields();
    
        for (int i = 0; i < fields.length; i++) {
            System.out.println(fields[i].getType().getName());
        }
    }
    }
    
  3. Compile and create a Java class JAR file for PxfExample_CustomWritable. Provide a classpath that includes the hadoop-common.jar file for your Hadoop distribution. For example, if you installed the Hortonworks Data Platform Hadoop client:

    $ javac -classpath /usr/hdp/current/hadoop-client/hadoop-common.jar  PxfExample_CustomWritable.java
    $ cd ../../../../../../
    $ jar cf pxfex-customwritable.jar com
    $ cp pxfex-customwritable.jar /tmp/
    

    (Your Hadoop library classpath may differ.)

  4. Copy the pxfex-customwritable.jar file to the Greenplum Database master node. For example:

    $ scp pxfex-customwritable.jar gpadmin@gpmaster:/home/gpadmin
    
  5. Log in to your Greenplum Database master node and set up the environment:

    $ ssh gpadmin@<gpmaster>
    gpadmin@gpmaster$ . /usr/local/greenplum-db/greenplum_path.sh
    
  6. Copy the pxfex-customwritable.jar JAR file to the same location on each Greenplum Database segment host, and note the location. For example:

    gpadmin@gpmaster$ gpscp -v -f seghostfile /home/gpadmin/pxfex-customwritable.jar =:/tmp/pxfex-customwritable.jar
    
  7. Edit the PXF Agent classpath and add the absolute path to the pxfex-customwritable.jar JAR file in $GPHOME/pxf/conf/pxf-public.classpath. For example:

    /tmp/pxfex-customwritable.jar
    
  8. Copy the updated pxf-public.classpath file to each segment host and then restart PXF on each host. For example:

    gpadmin@gpmaster$ gpscp -v -f seghostfile $GPHOME/pxf/conf/pxf-public.classpath =:/usr/local/greenplum-db/pxf/conf/pxf-public.classpath
    gpadmin@gpmaster$ gpssh -e -v -f seghostfile "/usr/local/greenplum-db/pxf/bin/pxf restart"
    
  9. Use the PXF SequenceWritable profile to create a Greenplum Database writable external table. Identify the serialization/deserialization Java class you created above in the DATA-SCHEMA <custom-option>. Use BLOCK mode compression with BZip2 when you create the writable table.

    postgres=# CREATE WRITABLE EXTERNAL TABLE pxf_tbl_seqwrit (location text, month text, number_of_orders integer, total_sales real)
                LOCATION ('pxf://data/pxf_examples/pxf_seqwrit_file?PROFILE=SequenceWritable&DATA-SCHEMA=com.example.pxf.hdfs.writable.dataschema.PxfExample_CustomWritable&COMPRESSION_TYPE=BLOCK&COMPRESSION_CODEC=org.apache.hadoop.io.compress.BZip2Codec')
              FORMAT 'CUSTOM' (formatter='pxfwritable_export');
    

    Notice that the 'CUSTOM' FORMAT <formatting-properties> specifies the built-in pxfwritable_export formatter.

  10. Write a few records to the pxf_seqwrit_file HDFS directory by inserting directly into the pxf_tbl_seqwrit table. For example:

    postgres=# INSERT INTO pxf_tbl_seqwrit VALUES ( 'Frankfurt', 'Mar', 777, 3956.98 );
    postgres=# INSERT INTO pxf_tbl_seqwrit VALUES ( 'Cleveland', 'Oct', 3812, 96645.37 );
    
    
  11. Recall that Greenplum Database does not support directly querying a writable external table. To query the data in pxf_seqwrit_file, create a readable external Greenplum Database table referencing this HDFS directory:

    postgres=# CREATE EXTERNAL TABLE read_pxf_tbl_seqwrit (location text, month text, number_of_orders integer, total_sales real)
                LOCATION ('pxf://data/pxf_examples/pxf_seqwrit_file?PROFILE=SequenceWritable&DATA-SCHEMA=com.example.pxf.hdfs.writable.dataschema.PxfExample_CustomWritable')
              FORMAT 'CUSTOM' (formatter='pxfwritable_import');
    

    You must specify the DATA-SCHEMA <custom-option> when you read HDFS data via the SequenceWritable profile. You need not provide compression-related options.

  12. Query the readable external table read_pxf_tbl_seqwrit:

    gpadmin=# SELECT * FROM read_pxf_tbl_seqwrit ORDER BY total_sales;
    
     location  | month | number_of_orders | total_sales 
    -----------+-------+------------------+-------------
     Frankfurt | Mar   |              777 |     3956.98
     Cleveland | Oct   |             3812 |     96645.4
    (2 rows)
    

Reading the Record Key

When a Greenplum Database external table references SequenceFile or another data format that stores rows in a key-value format, you can access the key values in Greenplum queries by using the recordkey keyword as a field name.

The field type of recordkey must correspond to the key type, much as the other fields must match the HDFS data. 

You can define recordkey to be any of the following Hadoop types:

  • BooleanWritable
  • ByteWritable
  • DoubleWritable
  • FloatWritable
  • IntWritable
  • LongWritable
  • Text

If no record key is defined for a row, Greenplum Database returns the ID of the segment that processed the row.

Example: Using Record Keys

Create an external readable table to access the record keys from the writable table pxf_tbl_seqwrit that you created in Example: Writing Binary Data Using the SequenceWritable Profile. Define the recordkey in this example to be of type int8.

postgres=# CREATE EXTERNAL TABLE read_pxf_tbl_seqwrit_recordkey(recordkey int8, location text, month text, number_of_orders integer, total_sales real)
                LOCATION ('pxf://data/pxf_examples/pxf_seqwrit_file?PROFILE=SequenceWritable&DATA-SCHEMA=com.example.pxf.hdfs.writable.dataschema.PxfExample_CustomWritable')
          FORMAT 'CUSTOM' (formatter='pxfwritable_import');
gpadmin=# SELECT * FROM read_pxf_tbl_seqwrit_recordkey;
 recordkey |  location   | month | number_of_orders | total_sales 
-----------+-------------+-------+------------------+-------------
         2 | Frankfurt   | Mar   |              777 |     3956.98
         1 | Cleveland   | Oct   |             3812 |     96645.4
(2 rows)

You did not define a record key when you inserted the rows into the writable table, so the recordkey identifies the segment on which the row data was processed.