GPSS Streaming Job API Service Definition

The Greenplum Streaming Server (GPSS) is a gRPC server. GPSS uses protocol buffers (protobuf) to define the GPSS client interfaces and their message interchange formats. With protocol buffers, the structure of the data (messages) and the supported operations (services) are defined in a .proto file, an ordinary text file. Refer to the Protocol Buffers Language Guide for detailed information about this data serialization framework.

The GPSS Streaming Job API job.proto file defines the methods that clients can invoke to manage jobs that stream data to Greenplum Database. For example, a GPSS client that you develop can submit a request to load data from Kafka, or to insert data from a CSV-format file directly into a specific Greenplum table.
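
For example, after you generate Go client stubs from job.proto with protoc, a simple client can connect to a GPSS instance and list its jobs. The following is a minimal sketch, not a definitive implementation; the generated-stub import path and the GPSS listen address (localhost:5000 here) are assumptions that you must adjust for your environment:

// Minimal GPSS client sketch in Go. Assumptions: stubs were generated from
// job.proto into the package imported below, and GPSS listens on localhost:5000.
package main

import (
	"context"
	"fmt"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	gpss "example.com/yourapp/gpss" // hypothetical import path for the generated stubs
)

func main() {
	conn, err := grpc.Dial("localhost:5000",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	defer conn.Close()

	client := gpss.NewJobServiceClient(conn)

	// Assumption: an empty ListJobRequest returns information for all jobs.
	resp, err := client.ListJobs(context.Background(), &gpss.ListJobRequest{})
	if err != nil {
		log.Fatalf("ListJobs: %v", err)
	}
	for _, info := range resp.JobInfos {
		fmt.Printf("job %s (%s): %v\n", info.GetId(), info.GetName(), info.GetStatus().GetCode())
	}
}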

Service Definition

The GPSS Streaming Job API service definition follows. Copy and paste the contents into a file named job.proto, and note its file system location.

syntax = "proto3";

package gpss;
import "google/protobuf/timestamp.proto";

/*
 * Submit is the most important rpc.
 * Submitting a job requires one valid source and one valid target; each is called a unit.
 * The currently supported source units are kafka and file; the only target unit is GPDBTarget.
 * More sources and targets may be added in the future.
 * SubmitJob also accepts options that control the job while it is running.
 */

// SubmitJob service Request message
message SubmitJobRequest {
  Target target = 1;         // where the data flows to
  Source source = 2;         // where the data comes from
  Channel channel = 3;       // how the data is transported
  JobOption job_option = 4;  // options that control the job
}

// SubmitJob and SubmitRawJob service Response message
message SubmitJobResponse {
  string job_id = 1;
  string job_name = 2;
  string message = 3;
}

// Channel is a way of data flow
message Channel {
  oneof unit {
    GPDBExternalTable gpdb_external_table = 1;
  }
}

// GPDBExternalTable is an implementation of Channel
message GPDBExternalTable {
  Mode mode = 1; // data processing mode, for example insert, update, or merge; the default mode is insert

  // While data flows, GPSS creates work tables and places them in
  // work_schema. If work_schema is empty, GPSS uses the schema
  // specified in GPDBTarget by default.
  string work_schema = 2;
  string error_limit = 3; // maximum number of errors permitted when parsing source data, per Greenplum segment
  BatchWindow batch_window = 4;
  map<string, string> mapping = 5; // <column name, expression>: the expression applied to the input data produces the output column value; the expression evaluation order is not guaranteed
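  // A hypothetical mapping entry (the column name and expression below are
  // illustrative only, not defined by this API):
  //   mapping["total_price"] = "price * quantity"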
}

// Batch
message Batch {
  // a batch is complete when this many rows have been read;
  // for a kafka source, setting max_count >= 200 is recommended
  int64 max_count = 1;
  int64 interval_ms = 2; // time interval, in milliseconds
}

// Batch Window
message BatchWindow {
  Batch batch = 1;
  int64 window_size = 2;       // the number of batches in a window
  string window_statement = 3; // SQL statement to execute for each window
}

message Mode {
  oneof unit {
    InsertMode insert = 1;
    UpdateMode update = 2;
    MergeMode merge = 3;
  }
}

message InsertMode {
  string filter_expression = 1; // filter expression used to filter the source data
}

message UpdateMode {
  string filter_expression = 1; // filter expression used to filter the source data
  repeated string match_columns = 2; // primary key columns for update/upsert/delete
  repeated string update_columns = 3; // columns to be updated
  repeated string order_columns = 4; // columns used to sort rows; the row with the largest value takes effect
  string update_condition = 5; // SQL WHERE condition
}

message MergeMode {
  string filter_expression = 1; // filter expression used to filter the source data
  repeated string match_columns = 2; // primary key columns for update/upsert/delete
  repeated string update_columns = 3; // columns to be updated
  repeated string order_columns = 4; // columns used to sort rows; the row with the largest value takes effect
  string update_condition = 5; // SQL WHERE condition
  string delete_condition = 6; // SQL WHERE condition
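  // Hypothetical example values (illustrative only):
  //   match_columns = ["id"], update_columns = ["amount"],
  //   update_condition = "amount > 0"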
}

// Target identifies the destination of the data
message Target {
  oneof unit {
    GPDBTarget gpdb = 1;
  }
}

// Source identifies the origin of the data
message Source {
  oneof unit {
    KafkaSource kafka = 1;
    FileSource file = 2;
  }
}

// GPDBTarget is a Greenplum Database target.
message GPDBTarget {
  string host = 1;
  int32 port = 2;
  string user = 3;
  string password = 4;
  string database = 5;
  string schema = 6;
  string table = 7;
}

message KafkaSource {
  string topic = 1; // kafka topic
  string brokers = 2; // comma-separated list of brokers: broker1,broker2,...
  SourceDataFormat key_content = 3; // format of the message key
  SourceDataFormat value_content = 4; // format of the message value
  SourceDataFormat meta = 5; // format of the message metadata
  map<string, string> rdkafka_prop = 6; // librdkafka properties; see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
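  // for example, rdkafka_prop["api.version.request"] = "true"; any property
  // from the librdkafka CONFIGURATION.md list may be supplied here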
}

message FileSource {
  repeated string uri = 1; // file URIs
  SourceDataFormat content = 2; // format of the source data
  SourceDataFormat meta = 3; // format of the source metadata
}

// JobOption contains options that control the job while it is running,
// for example:
// - name assigns an optional job name
// - schedule controls retry behavior, such as the number of retries, when a job fails
message JobOption {
  string name = 1;
  Schedule schedule = 2;
}

// identify a job by id or name
message JobIdentifier {
  oneof unit {
    string job_id = 1; // unique hash id
    string job_name = 2; // unique job name
  }
}

// StartJob service Request message
message StartJobRequest {
  JobIdentifier job_identifier = 1;
  ExtraOption extra_option = 2;
}

message ExtraOption {
  oneof unit {
    KafkaSourceExtraOption kafka = 1;
  }
}

message KafkaSourceExtraOption {
  bool quit_at_eof = 1; // specific to rdkafka; when no more data is available, EOF is returned and the job ends
  KafkaReset kafka_reset = 2; // reset option
}

// reset the kafka offset to earliest, latest, or a specific timestamp
message KafkaReset {
  oneof unit {
    bool earliest = 1;
    bool latest = 2;
    string timestamp = 3;
  }
}

// StartJob service Response message
message StartJobResponse {}

message Schedule {
  int64 max_retries = 1;
  string retry_interval = 2; // valid units are d/h/m/s/ms, for example "5m"
}

message RemoveJobRequest {
  JobIdentifier job_identifier = 1;
}

message RemoveJobResponse {}

message GetJobConfigRequest { string job_id = 1; }

message GetJobConfigResponse { Source config = 1; }

// ListJobs service Request message
message ListJobRequest {
	repeated JobIdentifier job_identifiers = 1; // id xxx not found, valid values returned
}

// ListJobs service Response message
message ListJobResponse {
  repeated JobInfo job_infos = 1;
}

message JobInfo {
  string id = 1;
  string name = 2;
  JobStatus status = 3;
  Target target = 4;
  Source source = 5;
}

message JobStatus {
  JobStatusCode code = 1;
  string msg = 2;
  google.protobuf.Timestamp time = 3;
}

enum JobStatusCode {
  JOB_UNSPECIFIED = 0;
  JOB_STOPPED = 1;
  JOB_RUNNING = 2;
  JOB_ERROR = 3;
}

// StopJob service Request message
message StopJobRequest {
  JobIdentifier job_identifier = 1;
}

// StopJob service Response message
message StopJobResponse {}

// SubmitRawJob service Request message
message SubmitRawJobRequest {
  // yaml_content is the content of a gpsscli YAML configuration file.
  // It provides compatibility for cli users invoking the rpc interface.
  // yaml_content may be a legacy gpsscli Config V1 or V2, or the new rpc-style config.
  string yaml_content = 1;
  string job_name = 2; // optional job name
}

// ***********************end of open proto message******************************

message JobProgressRecord {
  JobHistoryRecord record = 1;
  int64 msg_size = 2;
}

message JobHistoryRecord {
  google.protobuf.Timestamp start_time = 1;
  google.protobuf.Timestamp end_time = 2;
  int64 begin_offset = 3;
  int64 end_offset = 4;
  int32 partition_id = 5;
}

enum HistoryRequestMode {
  REQUEST_UNSPECIFIED = 0;
  REQUEST_LATEST = 1;
  REQUEST_ALL = 2;
  REQUEST_DEFAULT = 3;
}

message WaitJobRequest { 
  JobIdentifier job_identifier = 1;
}
message WaitJobResponse { JobStatus status = 1; }

message GetJobStatusRequest { 
  JobIdentifier job_identifier = 1;
}

message GetJobStatusResponse { JobStatus status = 1; }

message GetJobPartitionStatusRequest {
  string job_id = 1;
  int32 partition_id = 2;
}

message GetJobPartitionStatusResponse {
  int64 offset = 1;
  bool eof = 2;
}

message GetCurrentProgressRequest {
  JobIdentifier job_identifier = 1;
}

message GetCurrentProgressResponse {
  repeated JobProgressRecord partition_records = 1;
  int64 inserted_record_num = 2;
  int64 rejected_record_num = 3;
}

service JobService {
  /*
  * begin of published api
  */
  rpc SubmitJob(SubmitJobRequest) returns (SubmitJobResponse) {}
  rpc StartJob(StartJobRequest) returns (StartJobResponse) {}
  rpc StopJob(StopJobRequest) returns (StopJobResponse) {}
  rpc ListJobs(ListJobRequest) returns (ListJobResponse) {}
  // SubmitRawJob is a compatibility rpc that provides quick job submission from a gpsscli YAML file;
  // using the SubmitJob rpc instead is strongly recommended.
  rpc SubmitRawJob(SubmitRawJobRequest) returns (SubmitJobResponse) {}
  /*
  * end of published api
  */

  // unpublished api
  rpc GetJobStatus(GetJobStatusRequest) returns (GetJobStatusResponse) {}
  rpc RemoveJob(RemoveJobRequest) returns (RemoveJobResponse) {}
  rpc GetJobConfig(GetJobConfigRequest) returns (GetJobConfigResponse) {}
  rpc WaitJob(WaitJobRequest) returns (WaitJobResponse) {}
  rpc GetJobPartitionStatus(GetJobPartitionStatusRequest)
      returns (GetJobPartitionStatusResponse) {}
  rpc GetCurrentProgress(GetCurrentProgressRequest)
      returns (stream GetCurrentProgressResponse) {}
}

// The format of the source data.
// If a Format contains an intermediate column,
// the source data is transformed into that intermediate column.
// If a Format contains no source_column_name,
// the column name defaults to the Target table column name,
// and the source column data type must match the Target column type.
message SourceDataFormat {
  oneof unit {
    FormatAvro avro = 1;
    FormatBinary binary = 2;
    FormatCSV csv = 3;
    FormatDelimited delimited = 4;
    FormatJSON json = 5;
  }
}

message FormatAvro {
  string source_column_name = 1; // the source column name; may be used in an Expression/Mapping node
  string schema_url = 2; // avro schema URL; if present, the avro schema is requested from this URL
  bool   bytes_to_base64 = 3; // when true, bytes fields in the avro message are converted to base64-encoded strings; applies only when schema_url is set
}

message FormatBinary {
  string source_column_name = 1; // the source column name; may be used in an Expression/Mapping node
}

message FormatCSV {
  repeated IntermediateColumn columns = 1; // the source columns, for example c1:bin, c2:json ...
}

message FormatDelimited {
  repeated IntermediateColumn columns = 1; // the source column names; may be used in an Expression/Mapping node
  string delimiter = 2;
}

message FormatJSON {
  IntermediateColumn column = 1; // the source column name; may be used in an Expression/Mapping node
}

// IntermediateColumn is an intermediate result produced by parsing SourceDataFormat.
// An IntermediateColumn resembles a virtual table column and
// is used directly in a Channel to filter, convert types, or perform other operations.
// It presents the source data as table-column-style data.
// A source column has a name and a type; the type must be valid.
// Example: the string "123" is converted to the integer 123.
// Caution: FormatJSON is not a decomposed format; json is treated as a single integral value.
message IntermediateColumn {
  string name = 1;
  string type = 2; // currently, the basic Greenplum Database data types are supported
}
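
To illustrate how these messages fit together, the following Go fragment constructs a SubmitJobRequest for a Kafka-to-Greenplum load job. This is a hedged sketch rather than a definitive implementation: every host, credential, topic, and table name is a placeholder, and the generated-stub import path is an assumption. Because job.proto declares no go_package option, generating the stubs requires a --go_opt=M mapping (or adding the option to the file):

// Sketch: building a SubmitJobRequest from the job.proto messages; all values
// are placeholders. One possible stub-generation invocation:
//   protoc --go_out=. --go_opt=Mjob.proto=example.com/yourapp/gpss \
//          --go-grpc_out=. --go-grpc_opt=Mjob.proto=example.com/yourapp/gpss job.proto
package main

import (
	"context"
	"log"

	gpss "example.com/yourapp/gpss" // hypothetical import path for the generated stubs
)

func submitKafkaJob(ctx context.Context, client gpss.JobServiceClient) (string, error) {
	req := &gpss.SubmitJobRequest{
		// Target: the Greenplum table that receives the data.
		Target: &gpss.Target{Unit: &gpss.Target_Gpdb{Gpdb: &gpss.GPDBTarget{
			Host: "mdw", Port: 5432, // placeholder coordinator host and port
			User: "gpadmin", Password: "changeme",
			Database: "testdb", Schema: "public", Table: "orders",
		}}},
		// Source: the Kafka topic to read from.
		Source: &gpss.Source{Unit: &gpss.Source_Kafka{Kafka: &gpss.KafkaSource{
			Topic:   "orders_topic",
			Brokers: "kbroker1:9092,kbroker2:9092", // comma-separated broker list
			ValueContent: &gpss.SourceDataFormat{
				// CSV-format message values; a real job would also define the
				// IntermediateColumns of the CSV data.
				Unit: &gpss.SourceDataFormat_Csv{Csv: &gpss.FormatCSV{}},
			},
		}}},
		// Channel: how the data is transported; insert is the default mode.
		Channel: &gpss.Channel{Unit: &gpss.Channel_GpdbExternalTable{
			GpdbExternalTable: &gpss.GPDBExternalTable{
				Mode: &gpss.Mode{Unit: &gpss.Mode_Insert{Insert: &gpss.InsertMode{}}},
			},
		}},
		JobOption: &gpss.JobOption{Name: "orders-load"},
	}
	resp, err := client.SubmitJob(ctx, req)
	if err != nil {
		return "", err
	}
	log.Printf("submitted job %s: %s", resp.GetJobId(), resp.GetMessage())
	return resp.GetJobId(), nil
}

After SubmitJob returns a job identifier, a StartJob request that wraps the identifier in a JobIdentifier begins the load.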