Example: Custom Formatter for Kafka
In this example, you create a custom formatter that prepends a text string (provided via an option) to the Kafka data that it receives. All of the formatter code is provided for you. You register the custom formatter with Greenplum Database and use it to process incoming Kafka data.
(Refer to Understanding Custom Formatters for information on developing and using a custom formatter with GPSS.)
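The custom formatter example implementation requires that each text value start with a four-byte integer header that gives the length of the text in bytes. The formatter reads this header in the host's native byte order (little-endian on x86 hosts). For example, the three-character text `ABC` is encoded as the header bytes `0x03 0x00 0x00 0x00` followed by the content bytes `0x41 0x42 0x43` (`ABC`).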
To run this example, you must have access to running Kafka and Greenplum Database clusters, and you must have administrative access to Greenplum.
Procedure
Perform the following procedure to register and use a custom formatter in GPSS.
- Log in to a Greenplum Database host as the gpadmin user and set up your Greenplum environment.
- Create a work directory:
gpadmin@gpmaster$ mkdir customfmt_work
gpadmin@gpmaster$ cd customfmt_work
- Open a file named customfmt.c in an editor and
copy/paste the following custom formatter code into the file:
/*
 * customfmt.c - example GPSS custom formatter.
 *
 * Decodes each input record into FLOAT8 and TEXT/VARCHAR/BPCHAR columns.
 * Text values are length-prefixed; the string given by the optional
 * "prefix" formatter option is prepended to every non-empty text value.
 */
#include "postgres.h"

#include <math.h>

#include "access/formatter.h"
#include "catalog/pg_proc.h"
#include "fmgr.h"
#include "funcapi.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/typcache.h"

/* Do the module magic dance */
PG_MODULE_MAGIC;

PG_FUNCTION_INFO_V1(customfmt_import);

Datum customfmt_import(PG_FUNCTION_ARGS);

/*
 * Formatter state. It is created on the first call and stored with
 * FORMATTER_SET_USER_CTX, so later calls reuse it.
 */
typedef struct
{
    int         ncols;      /* number of columns in the output tuple */
    Datum      *values;     /* per-row column values */
    bool       *nulls;      /* per-row null flags */

    /* formatter options */

    /* The prefix string to be added to the data in text columns, like
     * prefix='abc_' */
    char       *prefix;
    /* When internal_error='1', the query will be stopped immediately. */
    bool        internal_error;
    /* When data_exception='1', the query won't be stopped unless it reaches
     * the error limit. */
    bool        data_exception;
    /* When data_exception_once='1', the formatter throws the data exception
     * once only. Unless it reaches the error limit, the query should
     * continue. */
    bool        data_exception_once;
} format_t;

/*
 * Copy the formatter's key/value options into myData.
 *
 * "prefix" is copied verbatim. The three error-injection flags are enabled
 * only by the exact value "1". Unrecognized keys are ignored.
 */
static void
parse_params(FunctionCallInfo fcinfo, format_t *myData)
{
    int         nargs = FORMATTER_GET_NUM_ARGS(fcinfo);

    for (int i = 0; i < nargs; i++)
    {
        /* FORMATTER_GET_NTH_ARG_KEY expects index starts from 1 */
        const char *key = FORMATTER_GET_NTH_ARG_KEY(fcinfo, i + 1);
        const char *val = FORMATTER_GET_NTH_ARG_VAL(fcinfo, i + 1);

        if (strcmp(key, "prefix") == 0)
            myData->prefix = pstrdup(val);

        if (strcmp(key, "internal_error") == 0 && strcmp(val, "1") == 0)
            myData->internal_error = true;

        if (strcmp(key, "data_exception") == 0 && strcmp(val, "1") == 0)
            myData->data_exception = true;

        if (strcmp(key, "data_exception_once") == 0 && strcmp(val, "1") == 0)
            myData->data_exception_once = true;
    }
}

/*
 * Formatter entry point: decode one record, starting at the data cursor,
 * into a heap tuple that matches the output tuple descriptor.
 *
 * Raises ERROR when not called by the format manager, or for dropped or
 * unsupported column types. It also raises ERROR when the record is
 * truncated or carries a negative text length, and when one of the
 * error-injection options is set.
 */
Datum
customfmt_import(PG_FUNCTION_ARGS)
{
    HeapTuple   tuple;
    TupleDesc   tupdesc;
    MemoryContext m,
                oldcontext;
    format_t   *myData;
    char       *data_buf;
    int         ncolumns = 0;
    int         data_cur;
    int         data_len;
    int         i;

    /* Must be called via the external table format manager */
    if (!CALLED_AS_FORMATTER(fcinfo))
        elog(ERROR, "customfmt_import: not called by format manager");

    tupdesc = FORMATTER_GET_TUPDESC(fcinfo);

    /* Get our internal description of the formatter */
    ncolumns = tupdesc->natts;
    myData = (format_t *) FORMATTER_GET_USER_CTX(fcinfo);

    if (myData == NULL)
    {
        myData = palloc0(sizeof(format_t));
        myData->ncols = ncolumns;
        myData->values = palloc(sizeof(Datum) * ncolumns);
        myData->nulls = palloc(sizeof(bool) * ncolumns);

        /* parse parameters */
        parse_params(fcinfo, myData);

        /* Accept only the column types the decoding loop below handles. */
        for (i = 0; i < ncolumns; i++)
        {
            Oid         type = tupdesc->attrs[i]->atttypid;

            /* Don't know how to format dropped columns, error for now */
            if (tupdesc->attrs[i]->attisdropped)
                ereport(ERROR,
                        (errcode(ERRCODE_INTERNAL_ERROR),
                         errmsg("customfmt_import: dropped columns")));

            switch (type)
            {
                case FLOAT8OID:
                case VARCHAROID:
                case BPCHAROID:
                case TEXTOID:
                    break;
                default:
                    ereport(ERROR,
                            (errcode(ERRCODE_INTERNAL_ERROR),
                             errmsg("customfmt_import error: "
                                    "unsupported data type")));
                    break;
            }
        }

        FORMATTER_SET_USER_CTX(fcinfo, myData);
    }

    if (myData->ncols != ncolumns)
        ereport(ERROR,
                (errcode(ERRCODE_INTERNAL_ERROR),
                 errmsg("customfmt_import: unexpected change of output "
                        "record type")));

    /* get our input data buf and number of valid bytes in it */
    data_buf = FORMATTER_GET_DATABUF(fcinfo);
    data_len = FORMATTER_GET_DATALEN(fcinfo);
    data_cur = FORMATTER_GET_DATACURSOR(fcinfo);

    /* start clean: every column stays NULL unless a value is decoded */
    MemSet(myData->values, 0, ncolumns * sizeof(Datum));
    MemSet(myData->nulls, true, ncolumns * sizeof(bool));

    /* =======================================================================
     *                            MAIN FORMATTING CODE
     *
     * Currently this code assumes:
     *  - Homogeneous hardware => No need to convert data to network byte order
     *  - Support for TEXT/VARCHAR/BPCHAR/FLOAT8 only
     *  - Length prefixed strings (4-byte int32 length, then the bytes)
     *  - No end of record tags, checksums, or optimizations for alignment.
     *  - NULL values are encoded as NaN (FLOAT8) or as a zero-length string
     *    (text types), and are decoded back to NULL here.
     * ======================================================================= */
    m = FORMATTER_GET_PER_ROW_MEM_CTX(fcinfo);
    oldcontext = MemoryContextSwitchTo(m);

    if (myData->internal_error)
    {
        /* Reporting an internal error will stop query immediately. NOTHING
         * will be saved into the error log. */
        MemoryContextSwitchTo(oldcontext);
        ereport(ERROR,
                (errcode(ERRCODE_INTERNAL_ERROR),
                 errmsg("reports error in example. data_len: %d, data_cur: %d",
                        data_len, data_cur)));
    }

    if (myData->data_exception)
    {
        MemoryContextSwitchTo(oldcontext);
        ereport(ERROR,
                (errcode(ERRCODE_DATA_EXCEPTION),
                 errmsg("data exception in example. data_len: %d, data_cur: %d",
                        data_len, data_cur)));
    }

    if (myData->data_exception_once)
    {
        /* Clear the flag before raising, so that only this row fails. */
        myData->data_exception_once = false;
        MemoryContextSwitchTo(oldcontext);
        ereport(ERROR,
                (errcode(ERRCODE_DATA_EXCEPTION),
                 errmsg("data exception in example. data_len: %d, data_cur: %d",
                        data_len, data_cur)));
    }

    for (i = 0; i < ncolumns; i++)
    {
        Oid         type = tupdesc->attrs[i]->atttypid;
        int         remaining = data_len - data_cur;
        int         attr_len = 0;

        switch (type)
        {
            case FLOAT8OID:
                {
                    float8      value;

                    attr_len = sizeof(value);
                    if (remaining < attr_len)
                    {
                        MemoryContextSwitchTo(oldcontext);
                        ereport(ERROR,
                                (errcode(ERRCODE_DATA_EXCEPTION),
                                 errmsg("incomplete data")));
                    }

                    memcpy(&value, data_buf + data_cur, attr_len);

                    /*
                     * NaN encodes NULL. This must be tested with isnan():
                     * NaN compares unequal to every value, itself included,
                     * so an equality test against a NaN constant never
                     * matches.
                     */
                    if (!isnan(value))
                    {
                        myData->nulls[i] = false;
                        myData->values[i] = Float8GetDatum(value);
                    }
                    break;
                }
            case TEXTOID:
            case VARCHAROID:
            case BPCHAROID:
                {
                    text       *value;
                    int32       len;

                    /*
                     * The length header must fit in the buffer. Comparing
                     * as int keeps a negative 'remaining' from passing the
                     * check through unsigned conversion.
                     */
                    if (remaining < (int) sizeof(len))
                    {
                        MemoryContextSwitchTo(oldcontext);
                        /* gpss extension handled the data integrity
                         * already. This should not happen. */
                        ereport(ERROR,
                                (errcode(ERRCODE_DATA_EXCEPTION),
                                 errmsg("incomplete data")));
                    }

                    memcpy(&len, data_buf + data_cur, sizeof(len));
                    if (len < 0)
                    {
                        MemoryContextSwitchTo(oldcontext);
                        ereport(ERROR,
                                (errcode(ERRCODE_DATA_EXCEPTION),
                                 errmsg("invalid length of varlen datatype: %d",
                                        len)));
                    }

                    /* The text bytes themselves must fit in the buffer. */
                    if (remaining - (int) sizeof(len) < len)
                    {
                        MemoryContextSwitchTo(oldcontext);
                        /* gpss extension handled the data integrity
                         * already. This should not happen. */
                        ereport(ERROR,
                                (errcode(ERRCODE_DATA_EXCEPTION),
                                 errmsg("incomplete data")));
                    }

                    /* A zero-length value is left NULL. */
                    if (len > 0)
                    {
                        /* Add the prefix if it has been set in the
                         * formatter options. */
                        int         prefixlen = myData->prefix ?
                            (int) strlen(myData->prefix) : 0;

                        value = (text *) palloc(len + prefixlen + VARHDRSZ);
                        SET_VARSIZE(value, len + prefixlen + VARHDRSZ);

                        /* memcpy from a NULL pointer is undefined even for
                         * zero bytes, so copy only an actual prefix. */
                        if (prefixlen > 0)
                            memcpy(VARDATA(value), myData->prefix, prefixlen);
                        memcpy(VARDATA(value) + prefixlen,
                               data_buf + data_cur + sizeof(len), len);

                        myData->nulls[i] = false;
                        myData->values[i] = PointerGetDatum(value);
                    }

                    attr_len = len + sizeof(len);
                    break;
                }
            default:
                MemoryContextSwitchTo(oldcontext);
                ereport(ERROR,
                        (errcode(ERRCODE_INTERNAL_ERROR),
                         errmsg("customfmt_import: unsupported "
                                "datatype, id %u:", type)));
                break;
        }

        /* add byte length of last attribute to the temporary cursor */
        data_cur += attr_len;
    }

    /* ======================================================================= */

    MemoryContextSwitchTo(oldcontext);
    tuple = heap_form_tuple(tupdesc, myData->values, myData->nulls);

    /* hack... pass tuple here. don't free prev tuple - the executor does it */
    ((FormatterData *) fcinfo->context)->fmt_tuple = tuple;

    FORMATTER_RETURN_TUPLE(tuple);
}
- Save the file and exit the editor.
- Open a file named Makefile in an editor
and copy/paste the following directives into the file:
# Build customfmt_example.so from customfmt.c with the PGXS build
# infrastructure. PGXS is located through the pg_config found on PATH.
PG_CONFIG = pg_config

MODULE_big = customfmt_example
OBJS = customfmt.o

PG_CPPFLAGS = -I$(shell $(PG_CONFIG) --includedir)
SHLIB_LINK = -L$(shell $(PG_CONFIG) --libdir)

PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)
- Save the file and exit the editor.
- Generate the custom formatter function definition. Open a file
named customfmt_example.sql in an
editor and copy/paste the following CREATE FUNCTION
call into the file:
-- SQL-callable name for the C symbol customfmt_import in
-- customfmt_example.so. The GPSS load configuration refers to the
-- formatter by this name (CUSTOM_OPTION: NAME: customfmt_in).
CREATE OR REPLACE FUNCTION customfmt_in()
    RETURNS record
    AS '$libdir/customfmt_example.so', 'customfmt_import'
    LANGUAGE C
    STABLE;
- Save the file and exit the editor.
- Copy the file to your Greenplum Database installation; you must have administrative privileges to copy the file:
gpadmin@gpmaster$ cp customfmt_example.sql $GPHOME/share/postgresql/
- Build the custom formatter shared library:
gpadmin@gpmaster$ make
The make command generates a shared library named customfmt_example.so in the current directory.
- Copy the shared library to your Greenplum Database installation;
you must have administrative privileges to copy the file. In a multi-host cluster, copy the library to the same directory on every Greenplum host:
gpadmin@gpmaster$ cp customfmt_example.so /usr/local/greenplum-db/lib/postgresql/
- Create a test database:
gpadmin@gpmaster$ createdb testdb
- Register the custom formatter function in this database:
gpadmin@gpmaster$ psql -d testdb -U gpadmin -f $GPHOME/share/postgresql/customfmt_example.sql
- Create a Greenplum table in the database:
gpadmin@gpmaster$ psql -d testdb -U gpadmin -c 'CREATE TABLE test_table( str_column text );'
- Create a Kafka topic named customtest.
- Start a GPSS server:
gpadmin@gpmaster$ gpss &
- Create a version 2 Kafka load configuration file; copy/paste the following into a file named kafka_custom_formatter.yml:
# GPSS version 2 Kafka load configuration. Reads messages from the
# customtest topic, decodes each one with the customfmt_in formatter (which
# prepends "kafka_msg_"), and inserts the result into test_table.
DATABASE: testdb
USER: gpadmin
HOST: localhost
PORT: 15432
VERSION: 2
KAFKA:
  INPUT:
    SOURCE:
      BROKERS: localhost:9092
      # Must match the topic created and loaded in the other steps.
      TOPIC: customtest
    VALUE:
      COLUMNS:
        - NAME: value
          TYPE: text
      FORMAT: custom
      CUSTOM_OPTION:
        NAME: customfmt_in
        PARAMSTR: prefix="kafka_msg_"
    ERROR_LIMIT: 2
  OUTPUT:
    TABLE: test_table
    MODE: INSERT
    MAPPING:
      - NAME: str_column
        EXPRESSION: value
- Submit the job:
gpadmin@gpmaster$ gpsscli submit kafka_custom_formatter.yml
- Start the job:
gpadmin@gpmaster$ gpsscli start kafka_custom_formatter
- Generate a binary test data record and save to a file named
data_example.bin.
gpadmin@gpmaster$ echo "03 00 00 00 41 42 43" > input.txt
gpadmin@gpmaster$ xxd -r -p input.txt data_example.bin
- Load the test data into Kafka:
gpadmin@gpmaster$ cat data_example.bin | kafka-console-producer --broker-list localhost:9092 --topic customtest
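- Examine the Greenplum test_table table:
gpadmin@gpmaster$ psql -d testdb -U gpadmin -c 'SELECT * FROM test_table;'
The str_column value of the loaded row is kafka_msg_ABC, which is the prefix option value followed by the message text.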