Example: Custom Formatter for Kafka

Example: Custom Formatter for Kafka

In this example, you create a custom formatter that prepends a text string (provided via an option) to the Kafka data that it receives. All of the formatter code is provided for you. You register the custom formatter with Greenplum Database and use it to process incoming Kafka data.

(Refer to Understanding Custom Formatters for information on developing and using a custom formatter with GPSS.)

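The custom formatter example implementation requires that the data start with a four-byte header that specifies the length, in bytes, of the text that follows. The formatter reads the header in the host's native byte order without conversion (little-endian in this example). For example: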

 4 bytes header        content 
 0x03 0x00 0x00 0x00   ABC     

To run this example, you must have access to running Kafka and Greenplum Database clusters, and you must have administrative access to Greenplum.

Procedure

Perform the following procedure to register and use a custom formatter in GPSS.

  1. Log in to a Greenplum Database host as the gpadmin user and set up your Greenplum environment.
  2. Create a work directory:
    gpadmin@gpmaster$ mkdir customfmt_work
    gpadmin@gpmaster$ cd customfmt_work
  3. Open a file named customfmt.c in an editor and copy/paste the following custom formatter code into the file:
    #include "postgres.h"
    
    #include <math.h>
    
    #include "access/formatter.h"
    #include "catalog/pg_proc.h"
    #include "fmgr.h"
    #include "funcapi.h"
    #include "utils/builtins.h"
    #include "utils/memutils.h"
    #include "utils/syscache.h"
    #include "utils/typcache.h"
    
    /*
     * Do the module magic dance: PG_MODULE_MAGIC identifies this shared library
     * as a loadable module, and PG_FUNCTION_INFO_V1 registers customfmt_import
     * as a version-1 calling convention function.
     */
    PG_MODULE_MAGIC;
    PG_FUNCTION_INFO_V1(customfmt_import);
    
    /* Entry point of the formatter; defined later in this file. */
    Datum customfmt_import(PG_FUNCTION_ARGS);
    
    /*
     * Formatter state that is created on the first call of customfmt_import and
     * reused on later calls, together with the options parsed from the formatter
     * arguments.
     */
    typedef struct
    {
    	int    ncols;  /* number of columns in the output tuple */
    	Datum *values; /* per-column values of the tuple being built */
    	bool  *nulls;  /* per-column NULL flags of the tuple being built */
    	int    buflen; /* not referenced by the code shown here */
    	bytea *buffer; /* not referenced by the code shown here */
    
    	/* ---- formatter options ---- */
    
    	/* Text that is prepended to the data of a text column, for example
    	 * prefix='abc_'. Stays NULL when the option is not given. */
    	char *prefix;
    
    	/* Set by internal_error='1': the query is stopped immediately. */
    	bool internal_error;
    
    	/* Set by data_exception='1': the query is not stopped unless it reaches
    	 * the error limit. */
    	bool data_exception;
    
    	/* Set by data_exception_once='1': the formatter throws the data exception
    	 * one time only. Unless that reaches the error limit, the query should
    	 * continue. */
    	bool data_exception_once;
    } format_t;
    
    /*
     * Our format converts all NULLs to real values, for floats that value is NaN.
     *
     * Note that NaN never compares equal to any value, including itself, so this
     * sentinel cannot be detected with == or !=: "x != NULL_FLOAT8_VALUE" is true
     * for every x.
     */
    #define NULL_FLOAT8_VALUE get_float8_nan()
    
    /*
     * parse_params
     *
     * Reads the formatter arguments attached to fcinfo and stores the recognized
     * options in *myData:
     *   prefix=<text>            copied with pstrdup into myData->prefix
     *   internal_error=1         sets myData->internal_error
     *   data_exception=1         sets myData->data_exception
     *   data_exception_once=1    sets myData->data_exception_once
     * Unrecognized keys, and flag options whose value is not exactly "1", are
     * ignored.
     */
    static void
    parse_params(FunctionCallInfo fcinfo, format_t* myData)
    {
    	const int total = FORMATTER_GET_NUM_ARGS(fcinfo);
    	int       argno;
    
    	/* The FORMATTER_GET_NTH_ARG_* macros number the arguments from 1. */
    	for (argno = 1; argno <= total; argno++)
    	{
    		const char* name  = FORMATTER_GET_NTH_ARG_KEY(fcinfo, argno);
    		const char* value = FORMATTER_GET_NTH_ARG_VAL(fcinfo, argno);
    
    		if (strcmp(name, "prefix") == 0)
    		{
    			myData->prefix = pstrdup(value);
    		}
    		else if (strcmp(name, "internal_error") == 0)
    		{
    			if (strcmp(value, "1") == 0)
    				myData->internal_error = true;
    		}
    		else if (strcmp(name, "data_exception") == 0)
    		{
    			if (strcmp(value, "1") == 0)
    				myData->data_exception = true;
    		}
    		else if (strcmp(name, "data_exception_once") == 0)
    		{
    			if (strcmp(value, "1") == 0)
    				myData->data_exception_once = true;
    		}
    	}
    }
    
    /*
     * customfmt_import
     *
     * Formatter import function: converts one binary input record into a tuple.
     *
     * The record holds one field per output column, in column order:
     *   - FLOAT8 columns: sizeof(float8) raw bytes. A NaN value is read as NULL.
     *   - TEXT/VARCHAR/BPCHAR columns: an int32 byte length followed by that many
     *     bytes of data. A zero length leaves the column NULL; otherwise the
     *     "prefix" formatter option, when set, is prepended to the data.
     * All values are read in host byte order.
     *
     * The formatter state (format_t) is created and the column types are
     * validated on the first call; the state is stored with
     * FORMATTER_SET_USER_CTX and reused on later calls.
     *
     * Errors are raised with ereport(ERROR):
     *   - ERRCODE_DATA_EXCEPTION for truncated input or a negative length;
     *   - ERRCODE_INTERNAL_ERROR for unsupported column types, dropped columns,
     *     or a column count that differs from the first call.
     * The internal_error, data_exception and data_exception_once options force
     * the corresponding error so that error handling can be demonstrated.
     *
     * Returns the formed tuple through FORMATTER_RETURN_TUPLE.
     */
    Datum
    customfmt_import(PG_FUNCTION_ARGS)
    {
    	HeapTuple     tuple;
    	TupleDesc     tupdesc;
    	MemoryContext m, oldcontext;
    	format_t*     myData;
    	char*         data_buf;
    	int           ncolumns = 0;
    	int           data_cur;
    	int           data_len;
    	int           i;
    
    	/* Must be called via the external table format manager */
    	if (!CALLED_AS_FORMATTER(fcinfo))
    		elog(ERROR, "customfmt_import: not called by format manager");
    
    	tupdesc = FORMATTER_GET_TUPDESC(fcinfo);
    
    	/* Get our internal description of the formatter */
    	ncolumns = tupdesc->natts;
    	myData   = (format_t*)FORMATTER_GET_USER_CTX(fcinfo);
    
    	if (myData == NULL)
    	{
    		/* First call: build the state and validate the output columns. */
    		myData         = palloc0(sizeof(format_t));
    		myData->ncols  = ncolumns;
    		myData->values = palloc(sizeof(Datum) * ncolumns);
    		myData->nulls  = palloc(sizeof(bool) * ncolumns);
    
    		/* parse parameters */
    		parse_params(fcinfo, myData);
    
    		/* misc verification */
    		for (i = 0; i < ncolumns; i++)
    		{
    			Oid type = tupdesc->attrs[i]->atttypid;
    
    			/* Don't know how to format dropped columns, error for now */
    			if (tupdesc->attrs[i]->attisdropped)
    				ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
    				                errmsg("customfmt_import: dropped columns")));
    
    			switch (type)
    			{
    				case FLOAT8OID:
    				case VARCHAROID:
    				case BPCHAROID:
    				case TEXTOID:
    					break;
    
    				default: {
    					ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
    					                errmsg("customfmt_import error: "
    					                       "unsupported data type")));
    					break;
    				}
    			}
    		}
    
    		FORMATTER_SET_USER_CTX(fcinfo, myData);
    	}
    	if (myData->ncols != ncolumns)
    		ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
    		                errmsg("customfmt_import: unexpected change of output "
    		                       "record type")));
    
    	/* get our input data buf and number of valid bytes in it */
    	data_buf = FORMATTER_GET_DATABUF(fcinfo);
    	data_len = FORMATTER_GET_DATALEN(fcinfo);
    	data_cur = FORMATTER_GET_DATACURSOR(fcinfo);
    
    	/* start clean: every column is NULL until a value is decoded for it */
    	MemSet(myData->values, 0, ncolumns * sizeof(Datum));
    	MemSet(myData->nulls, true, ncolumns * sizeof(bool));
    
    	/* =======================================================================
    	 *                            MAIN FORMATTING CODE
    	 *
    	 * Currently this code assumes:
    	 *  - Homogeneous hardware => No need to convert data to network byte order
    	 *  - Support for TEXT/VARCHAR/BPCHAR/FLOAT8 only
    	 *  - Length Prefixed strings
    	 *  - No end of record tags, checksums, or optimizations for alignment.
    	 *  - NULL values are cast to some sensible default value (NaN, "")
    	 *
    	 * =======================================================================
    	 */
    	m          = FORMATTER_GET_PER_ROW_MEM_CTX(fcinfo);
    	oldcontext = MemoryContextSwitchTo(m);
    
    	if (myData->internal_error)
    	{
    		/* Reporting an internal error will stop query immediately. NOTHING will
    		 * be saved into the error log.*/
    		MemoryContextSwitchTo(oldcontext);
    		ereport(ERROR,
    		        (errcode(ERRCODE_INTERNAL_ERROR),
    		         errmsg("reports error in example. data_len: %d, data_cur: %d",
    		                data_len, data_cur)));
    	}
    	if (myData->data_exception)
    	{
    		MemoryContextSwitchTo(oldcontext);
    		ereport(ERROR,
    		        (errcode(ERRCODE_DATA_EXCEPTION),
    		         errmsg("data exception in example. data_len: %d, data_cur: %d",
    		                data_len, data_cur)));
    	}
    	if (myData->data_exception_once)
    	{
    		/* Clear the flag first so that only this one call fails. */
    		myData->data_exception_once = false;
    		MemoryContextSwitchTo(oldcontext);
    		ereport(ERROR,
    		        (errcode(ERRCODE_DATA_EXCEPTION),
    		         errmsg("data exception in example. data_len: %d, data_cur: %d",
    		                data_len, data_cur)));
    	}
    
    	for (i = 0; i < ncolumns; i++)
    	{
    		Oid type      = tupdesc->attrs[i]->atttypid;
    		int remaining = data_len - data_cur;
    		int attr_len  = 0;
    
    		switch (type)
    		{
    			case FLOAT8OID: {
    				float8 value;
    
    				attr_len = sizeof(value);
    
    				if (remaining < attr_len)
    				{
    					MemoryContextSwitchTo(oldcontext);
    					ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
    					                errmsg("incomplete data")));
    				}
    
    				memcpy(&value, data_buf + data_cur, attr_len);
    
    				/* NaN is how this format represents NULL. NaN never compares
    				 * equal to anything, so it has to be detected with isnan()
    				 * rather than with == or !=. */
    				if (!isnan(value))
    				{
    					myData->nulls[i]  = false;
    					myData->values[i] = Float8GetDatum(value);
    				}
    
    				break;
    			}
    
    			case TEXTOID:
    			case VARCHAROID:
    			case BPCHAROID: {
    				text* value;
    				int32 len = 0;
    				/* Compare as signed ints so that a negative "remaining" is
    				 * rejected instead of wrapping around to a huge unsigned
    				 * value. */
    				bool  nextlen = remaining >= (int)sizeof(len);
    
    				if (nextlen)
    				{
    					memcpy(&len, data_buf + data_cur, sizeof(len));
    
    					if (len < 0)
    					{
    						MemoryContextSwitchTo(oldcontext);
    						ereport(ERROR,
    						        (errcode(ERRCODE_DATA_EXCEPTION),
    						         errmsg("invalid length of varlen datatype: %d",
    						                len)));
    					}
    				}
    
    				/* if len or data bytes don't exist in this buffer, error out */
    				if (!nextlen || (remaining - (int)sizeof(len) < len))
    				{
    					MemoryContextSwitchTo(oldcontext);
    					/* gpss extension handled the data integrity already. This
    					 * should not happen.*/
    					ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
    					                errmsg("incomplete data")));
    				}
    
    				if (len > 0)
    				{
    					int prefixlen = 0;
    					/* Add the prefix if it has been set in the formatter
    					 * options. */
    					if (myData->prefix)
    					{
    						prefixlen = strlen(myData->prefix);
    					}
    					value = (text*)palloc(len + prefixlen + VARHDRSZ);
    					SET_VARSIZE(value, len + prefixlen + VARHDRSZ);
    
    					/* Skip the copy when there is no prefix: passing a NULL
    					 * source pointer to memcpy is undefined even for a zero
    					 * length. */
    					if (prefixlen > 0)
    					{
    						memcpy(VARDATA(value), myData->prefix, prefixlen);
    					}
    					memcpy(VARDATA(value) + prefixlen,
    					       data_buf + data_cur + sizeof(len), len);
    
    					myData->nulls[i]  = false;
    					myData->values[i] = PointerGetDatum(value);
    				}
    
    				attr_len = len + sizeof(len);
    
    				break;
    			}
    
    			default:
    				MemoryContextSwitchTo(oldcontext);
    				ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
    				                errmsg("customfmt_import: unsupported "
    				                       "datatype, id %u:",
    				                       type)));
    				break;
    		}
    
    		/* add byte length of last attribute to the temporary cursor */
    		data_cur += attr_len;
    	}
    	/* =======================================================================
    	 */
    
    	MemoryContextSwitchTo(oldcontext);
    
    	/* NOTE(review): data_cur is only advanced in this local variable and is
    	 * never written back to the caller. Confirm that the GPSS caller does not
    	 * rely on the formatter to advance the data cursor. */
    
    	tuple = heap_form_tuple(tupdesc, myData->values, myData->nulls);
    
    	/* hack... pass tuple here. don't free prev tuple - the executor does it  */
    	((FormatterData*)fcinfo->context)->fmt_tuple = tuple;
    
    	FORMATTER_RETURN_TUPLE(tuple);
    }
                
  4. Save the file and exit the editor.
  5. Open a file named Makefile in an editor and copy/paste the following directives into the file:
    # Build customfmt.o into a shared library named after MODULE_big.
    MODULE_big = customfmt_example
    OBJS = customfmt.o
    # PG_CONFIG is referenced here before it is assigned below. This works
    # because variables assigned with '=' are expanded when they are used, not
    # when they are defined.
    PG_CPPFLAGS = -I$(shell $(PG_CONFIG) --includedir)
    SHLIB_LINK = -L$(shell $(PG_CONFIG) --libdir)
    
    # Locate the PGXS build infrastructure through pg_config and include it.
    PG_CONFIG = pg_config
    PGXS := $(shell $(PG_CONFIG) --pgxs)
    include $(PGXS)
  6. Save the file and exit the editor.
  7. Generate the custom formatter function definition. Open a file named customfmt_example.sql in an editor and copy/paste the following CREATE FUNCTION call into the file:
    -- Register the SQL function customfmt_in(), which is implemented by the C
    -- symbol customfmt_import in the customfmt_example shared library.
    CREATE OR REPLACE FUNCTION customfmt_in() RETURNS record
    AS '$libdir/customfmt_example.so', 'customfmt_import'
    LANGUAGE C STABLE;
  8. Save the file and exit the editor.
  9. Copy the file to your Greenplum Database installation; you must have administrative privileges to copy the file:
    gpadmin@gpmaster$ cp customfmt_example.sql /usr/local/greenplum-db/share/postgresql/
  10. Build the custom formatter shared library:
    gpadmin@gpmaster$ make

    The make command generates a shared library named customfmt_example.so in the current directory.

  11. Copy the shared library to your Greenplum Database installation; you must have administrative privileges to copy the file:
    gpadmin@gpmaster$ cp customfmt_example.so /usr/local/greenplum-db/lib/postgresql/
  12. Create a test database:
    gpadmin@gpmaster$ createdb testdb
  13. Register the custom formatter function in this database:
    gpadmin@gpmaster$ psql -d testdb -U gpadmin -f $GPHOME/share/postgresql/customfmt_example.sql
  14. Create a Greenplum table in the database:
    gpadmin@gpmaster$ psql -d testdb -U gpadmin -c 'CREATE TABLE test_table( str_column text );'
  15. Create a Kafka topic named customtest.
  16. Start a GPSS server:
    gpadmin@gpmaster$ gpss & 
  17. Create a version 2 Kafka load configuration file; copy/paste the following into a file named kafka_custom_formatter.yml:
    # GPSS version 2 load configuration: read messages from the Kafka topic
    # customtest, parse each one with the customfmt_in custom formatter, and
    # insert the result into testdb.test_table.
    DATABASE: testdb
    USER: gpadmin
    HOST: localhost
    PORT: 15432
    VERSION: 2
    KAFKA:
       INPUT:
          SOURCE:
            BROKERS: localhost:9092
            # Must match the name of the Kafka topic that the data is written to.
            TOPIC: customtest
          VALUE:
            COLUMNS:
            - NAME: value
              TYPE: text
            FORMAT: custom
            CUSTOM_OPTION:
              # Name of the formatter function registered in the database.
              NAME: customfmt_in
              # Options passed to the formatter; prefix is prepended to the text.
              PARAMSTR: prefix="kafka_msg_"
          ERROR_LIMIT: 2
       OUTPUT:
          TABLE: test_table
          MODE: INSERT
          MAPPING:
            - NAME: str_column
              EXPRESSION: value
                
  18. Submit the job:
    gpadmin@gpmaster$ gpsscli submit kafka_custom_formatter.yml
  19. Start the job:
    gpadmin@gpmaster$ gpsscli start kafka_custom_formatter
  20. Generate a binary test data record and save to a file named data_example.bin.
    gpadmin@gpmaster$ echo "03 00 00 00 41 42 43" > input.txt
    gpadmin@gpmaster$ xxd -r -p input.txt data_example.bin
  21. Load the test data into Kafka:
    gpadmin@gpmaster$ cat data_example.bin | kafka-console-producer --broker-list localhost:9092 --topic customtest
  22. Examine the Greenplum test_table table:
    gpadmin@gpmaster$ psql -d testdb -U gpadmin -c 'SELECT * FROM test_table;'