Pipeline spec reference

Reference documentation on how to configure inputs and outputs of pipelines


Introduction

Pipeline configs are JSON files, named with a .pipeline.json suffix, with one file per pipeline.

The high-level format is as follows:

{
  "id" : "pipeline-lnuYHU", // A unique id. Assigned automatically if one isn't already present
  "description" : "My first pipeline", // A human-readable description
  "name" : "Pipeline One", // The name of the pipeline
  "input" : {
    "type" : "Generic", // The type of the source
    ... // other config here, as defined by the source type
  },
  "output" : {
    "type" : "Generic", // The type of the output
    ... // other config here, as defined by the output type
  }
}

Most of the details of a pipeline are defined by the config for the selected input and output.
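
For example, a complete pipeline that reads events from a Kafka topic and writes them to a database could be sketched as follows, combining the kafka input and jdbc output documented below (connection and type names are illustrative):

{
  "id" : "pipeline-example",
  "name" : "Customer events to database",
  "description" : "Copies customer events from Kafka into a database",
  "input" : {
    "type" : "kafka",
    "direction" : "INPUT",
    "connectionName" : "my-kafka-connection",
    "topic" : "customerNotifications",
    "targetTypeName" : "com.demo.CustomerEvent"
  },
  "output" : {
    "type" : "jdbc",
    "direction" : "OUTPUT",
    "connection" : "my-connection",
    "targetTypeName" : "com.demo.CustomerEvent"
  }
}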

Inputs

AWS S3

| Pipeline Type Key | Direction | Maturity |
|---|---|---|
| awsS3 | INPUT | Beta |

A source that consumes a single file/object from an S3 bucket.

Parameters

The following configuration parameters are available:

| Parameter | Description | Required | Default Value |
|---|---|---|---|
| connectionName | The name of the connection, as registered in Vyne's connection manager | true | |
| bucket | The bucket name | true | |
| objectKey | The name of the object in the S3 bucket - generally a file name | true | |
| targetTypeName | The name of the type that content from the S3 bucket should be consumed as | true | |

Example

{
  "input" : {
    "type" : "awsS3",
    "direction" : "INPUT",
    "connectionName" : "my-aws-connection",
    "bucket" : "my-bucket",
    "objectKey" : "customers.csv",
    "targetTypeName" : "com.demo.customers.Customer",
    "targetType" : "com.demo.customers.Customer"
  }
}

AWS S3 via SQS

| Pipeline Type Key | Direction | Maturity |
|---|---|---|
| awsS3 | INPUT | Beta |

A source that consumes a stream of S3 events via a preconfigured SQS queue.

Parameters

The following configuration parameters are available:

| Parameter | Description | Required | Default Value |
|---|---|---|---|
| connection | The name of the connection, as registered in Vyne's connection manager | true | |
| targetTypeName | The name of the type that content from the S3 bucket should be consumed as | true | |
| queueName | The name of the SQS queue | true | |
| pollSchedule | A cron expression that defines how frequently to check for new messages | false | Every second |
| preventConcurrentExecution | When set to true, the next execution is not scheduled until the previous execution has finished, preventing overlapping runs | false | |

Example

{
  "input" : {
    "type" : "awsS3",
    "direction" : "INPUT",
    "connection" : "my-aws-connection",
    "queueName" : "my-s3-events-queue",
    "pollSchedule" : "* * * * * *",
    "targetTypeName" : "com.demo.customers.Customer"
  }
}

Polling operation input

| Pipeline Type Key | Direction | Maturity |
|---|---|---|
| taxiOperation | INPUT | Beta |

Invokes an operation (as defined in, or published to, Vyne) on a periodic basis.

Accepts inputs defined in the configuration, which will be passed to the service on invocation. The result of this operation is published downstream on the pipeline to be transformed to another type, and published to an output.

Parameters

The following configuration parameters are available:

| Parameter | Description | Required | Default Value |
|---|---|---|---|
| operationName | The name of the operation, as defined in the schema. Should be a fully qualified operation name; see the example below | true | |
| pollSchedule | A Spring-flavored cron expression, defining how frequently the operation should be invoked | true | |
| parameterMap | An optional map of parameters to pass to the operation | false | |

Example

{
  "input" : {
    "type" : "taxiOperation",
    "direction" : "INPUT",
    "operationName" : "com.demo.customers.CustomerService@@listCustomers",
    "pollSchedule" : "* * * * * *",
    "parameterMap" : {
      "customerStatus" : "ACTIVE"
    }
  }
}
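
The pollSchedule field uses Spring's six-field cron format (second, minute, hour, day of month, month, day of week), so the * * * * * * in the example above fires every second. A sketch of the same input invoked every five minutes instead (the schedule is illustrative):

{
  "input" : {
    "type" : "taxiOperation",
    "direction" : "INPUT",
    "operationName" : "com.demo.customers.CustomerService@@listCustomers",
    "pollSchedule" : "0 */5 * * * *"
  }
}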

Polling query input

| Pipeline Type Key | Direction | Maturity |
|---|---|---|
| query | INPUT | Beta |

Executes a TaxiQL query against Vyne on a periodic basis.

The result of this query is published downstream on the pipeline to be transformed to another type, and published to an output.

Parameters

The following configuration parameters are available:

| Parameter | Description | Required | Default Value |
|---|---|---|---|
| query | The query to be executed. See the example below | true | |
| pollSchedule | A Spring-flavored cron expression, defining how frequently the query should be invoked | true | |
| preventConcurrentExecution | When set to true, the next execution is not scheduled until the previous execution has finished, preventing overlapping runs | false | |

Example

{
  "input" : {
    "type" : "query",
    "direction" : "INPUT",
    "query" : "find { Person( FirstName == 'Jim' ) }",
    "pollSchedule" : "* * * * * *",
    "preventConcurrentExecution" : false
  }
}
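
For longer-running queries it may be preferable to delay the next run until the previous one has completed, using preventConcurrentExecution. A sketch that runs an illustrative query once a minute and skips overlapping executions:

{
  "input" : {
    "type" : "query",
    "direction" : "INPUT",
    "query" : "find { Person[] }",
    "pollSchedule" : "0 * * * * *",
    "preventConcurrentExecution" : true
  }
}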

Kafka topic

| Pipeline Type Key | Direction | Maturity |
|---|---|---|
| kafka | INPUT | Beta |

Defines an input that reads from a Kafka topic.

The Kafka broker is configured using Vyne's connection manager; the topic is defined on this pipeline input.

Controlling deserialization (Protobuf, Avro, etc.)

Deserialization is controlled using annotations declared on the configured type (targetTypeName).

If not specified, Vyne attempts to read the content as JSON, using a StringDecoder.

Parameters

The following configuration parameters are available:

| Parameter | Description | Required | Default Value |
|---|---|---|---|
| connectionName | The name of the connection, as registered in Vyne's connection manager | true | |
| topic | The name of the topic to consume from | true | |
| targetTypeName | The fully qualified name of the type that content should be read as | true | |

Example

{
  "input" : {
    "type" : "kafka",
    "direction" : "INPUT",
    "connectionName" : "my-kafka-connection",
    "topic" : "customerNotifications",
    "targetTypeName" : "com.demo.CustomerEvent",
    "targetType" : "com.demo.CustomerEvent"
  }
}

Outputs

Cask

| Pipeline Type Key | Direction | Maturity |
|---|---|---|
| cask | OUTPUT | Beta |

An output that writes directly to a Vyne Cask.

Casks provide a way to store content in a database, and expose an auto-generated RESTful service over the top, with all the correct Vyne operation schemas generated.

You may wish to consider using the JDBC database output instead.

Parameters

The following configuration parameters are available:

| Parameter | Description | Required | Default Value |
|---|---|---|---|
| targetType | The type that defines the cask containing this data | true | |

Example

{
  "output" : {
    "type" : "cask",
    "direction" : "OUTPUT",
    "targetType" : "com.demo.Customer"
  }
}

Operation output

| Pipeline Type Key | Direction | Maturity |
|---|---|---|
| taxiOperation | OUTPUT | Beta |

Invokes an operation (as defined in, or published to, Vyne), using the data provided upstream in the pipeline.

If the provided data does not satisfy the contract of the operation, Vyne will use the provided input as the basis of a discovery search, to find additional data.

Parameters

The following configuration parameters are available:

| Parameter | Description | Required | Default Value |
|---|---|---|---|
| operationName | The name of the operation, as defined in the schema. Should be a fully qualified operation name; see the example below | true | |

Example

{
  "output" : {
    "type" : "taxiOperation",
    "direction" : "OUTPUT",
    "operationName" : "com.demo.customers.CustomerService@@DisableCustomerAccounts"
  }
}

Kafka topic

| Pipeline Type Key | Direction | Maturity |
|---|---|---|
| kafka | OUTPUT | Beta |

Defines an output that writes to a Kafka topic.

The Kafka broker is configured using Vyne's connection manager; the topic is defined on this pipeline output.

Controlling serialization (Protobuf, Avro, etc.)

Serialization is controlled using annotations declared on the configured type (targetTypeName).

If not specified, Vyne attempts to write the content as JSON, using a StringDecoder.

Parameters

The following configuration parameters are available:

| Parameter | Description | Required | Default Value |
|---|---|---|---|
| connectionName | The name of the connection, as registered in Vyne's connection manager | true | |
| topic | The name of the topic to write to | true | |
| targetTypeName | The fully qualified name of the type that content should be written as | true | |

Example

{
  "output" : {
    "type" : "kafka",
    "direction" : "OUTPUT",
    "connectionName" : "my-kafka-connection",
    "topic" : "CustomerEvents",
    "targetTypeName" : "com.demo.customers.CustomerEvent",
    "targetType" : "com.demo.customers.CustomerEvent"
  }
}

Database Output

| Pipeline Type Key | Direction | Maturity |
|---|---|---|
| jdbc | OUTPUT | Beta |

A pipeline output that writes to a database.

The pipeline uses a connection that has been defined using Vyne's connection manager. Most database types are supported, providing they expose a JDBC driver.

Table definition

A table is created, if it doesn't already exist, using the config defined on the target type.

If the type carries an io.vyne.jdbc.Table annotation, this is used to define the table name. Otherwise, a table name is derived from the name of the type.

Similarly, columns are created for all attributes annotated with an io.vyne.jdbc.Column annotation.

Table creation happens once, when the pipeline is first started, and uses a CREATE TABLE IF NOT EXISTS statement. As a result, if the type has changed since the table was first created, those changes are not propagated to the database.

Batching inserts

In order to reduce load on the database, inserts are batched in windows of 500ms.
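
In the serialized config, this window appears as windowDurationMs, in milliseconds (see the example at the end of this section). Assuming the field can be set explicitly, a sketch that widens the window to two seconds might look like this:

{
  "output" : {
    "type" : "jdbc",
    "direction" : "OUTPUT",
    "connection" : "my-connection",
    "targetTypeName" : "com.demo.Customer",
    "windowDurationMs" : 2000
  }
}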

Write disposition

Different pipelines have different needs for what should happen on subsequent runs. In some cases it is preferable to append the new data to the same table, for example when each new batch is an increment to the existing dataset. In other cases it makes sense to replace the existing data with the new batch, which is preferable when each batch contains the full dataset. There are two supported write disposition modes to cater for these needs:

  • APPEND: The data is appended to the existing table on each run of the pipeline. This is the default.
  • RECREATE: The data is written to a new table, and the view is switched to point to that new table. The source must support this by providing a unique identifier for each run, so in general it only works for batch sources, as streaming sources never complete; currently only the AWS S3 via SQS and polling query sources support it (see the sketch below). It is the user's responsibility to ensure that two runs are never executing concurrently, as this introduces a race condition for the switch-over and, if enabled, for stale table deletion.
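
A minimal sketch of a pipeline using RECREATE, pairing a polling query input with a jdbc output (the query, schedule, connection name, and type names are illustrative):

{
  "input" : {
    "type" : "query",
    "direction" : "INPUT",
    "query" : "find { Customer[] }",
    "pollSchedule" : "0 0 * * * *"
  },
  "output" : {
    "type" : "jdbc",
    "direction" : "OUTPUT",
    "connection" : "my-connection",
    "targetTypeName" : "com.demo.Customer",
    "writeDisposition" : "RECREATE"
  }
}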

Parameters

The following configuration parameters are available:

| Parameter | Description | Required | Default Value |
|---|---|---|---|
| connection | The name of a connection, configured in Vyne's connection manager | true | |
| targetTypeName | The fully qualified name of the type that content being pushed to the database should be read as | true | |
| writeDisposition | Whether to append new data to the existing table (APPEND), or to create a new table with a unique name and switch the view over to point to it (RECREATE) | false | APPEND |

Example

{
  "output" : {
    "type" : "jdbc",
    "direction" : "OUTPUT",
    "connection" : "my-connection",
    "targetTypeName" : "com.demo.Customer",
    "writeDisposition" : "APPEND",
    "targetType" : "com.demo.Customer",
    "windowDurationMs" : 500
  }
}

AWS S3

| Pipeline Type Key | Direction | Maturity |
|---|---|---|
| awsS3 | OUTPUT | Beta |

An output that writes a single file/object to an S3 bucket.

Parameters

The following configuration parameters are available:

| Parameter | Description | Required | Default Value |
|---|---|---|---|
| connectionName | The name of the connection, as registered in Vyne's connection manager | true | |
| bucket | The bucket name | true | |
| objectKey | The name of the object in the S3 bucket - generally a file name. To obtain unique file names, you can include "{env.now}" in the object key to add a timestamp in ISO format (YYYY-MM-DDTHH:mm:ss.sssZ). If the object already exists, it is overwritten | true | |
| targetTypeName | The name of the type based on which to produce the file | true | |

Example

{
  "output" : {
    "type" : "awsS3",
    "direction" : "OUTPUT",
    "connectionName" : "my-aws-connection",
    "bucket" : "my-bucket",
    "objectKey" : "customers.csv",
    "targetTypeName" : "com.demo.customers.Customer",
    "targetType" : "com.demo.customers.Customer"
  }
}
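
To avoid overwriting the same object on every run, the "{env.now}" token described above can be embedded in the objectKey. For example:

{
  "output" : {
    "type" : "awsS3",
    "direction" : "OUTPUT",
    "connectionName" : "my-aws-connection",
    "bucket" : "my-bucket",
    "objectKey" : "customers-{env.now}.csv",
    "targetTypeName" : "com.demo.customers.Customer"
  }
}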