# Pipeline spec reference

Reference documentation on how to configure the inputs and outputs of pipelines.

## Introduction

Pipeline configs are JSON files, with a name ending in `.pipeline.json`, and a single file per pipeline.

The high-level format is as follows:
```json
{
  "id" : "pipeline-lnuYHU", // A unique id. Will be assigned if one isn't already present
  "description" : "My first pipeline", // A human-readable description
  "name" : "Pipeline One", // The name of the pipeline
  "input" : {
    "type" : "Generic", // The type of the source
    ... // other config here, as defined by the source type
  },
  "output" : {
    "type" : "Generic", // The type of the output
    ... // other config here, as defined by the output type
  }
}
```
Most of the details of the pipeline are defined by the config for the selected input and output.
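For example, a complete pipeline file that polls a TaxiQL query once a minute and writes the results to a database might look like the following sketch. The connection name, query and type names are illustrative, and would need to exist in your own schema and connection manager:

```json
{
  "id" : "pipeline-customer-sync", // assigned if omitted
  "name" : "Customer sync",
  "description" : "Copies customers to the reporting database every minute",
  "input" : {
    "type" : "query", // the polling query input, documented below
    "query" : "find { Customer[] }", // hypothetical query against your schema
    "pollSchedule" : "0 * * * * *" // Spring cron: once a minute
  },
  "output" : {
    "type" : "jdbc", // the database output, documented below
    "connection" : "my-reporting-db", // hypothetical connection, registered in the connection manager
    "targetTypeName" : "com.demo.Customer" // hypothetical type
  }
}
```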
## Inputs

### AWS S3

| Pipeline Type Key | Direction | Maturity |
|---|---|---|
| `awsS3` | INPUT | Beta |

A source that consumes a single file/object from an S3 bucket.
#### Parameters

The following configuration parameters are available:

| Parameter | Description | Required | Default Value |
|---|---|---|---|
| `connectionName` | The name of the connection, as registered in Vyne's connection manager | true | |
| `bucket` | The bucket name | true | |
| `objectKey` | The name of the object in the S3 bucket - generally a file name | true | |
| `targetTypeName` | The name of the type that content from the S3 bucket should be consumed as | true | |
#### Example

```json
{
  "input" : {
    "type" : "awsS3",
    "direction" : "INPUT",
    "connectionName" : "my-aws-connection",
    "bucket" : "my-bucket",
    "objectKey" : "customers.csv",
    "targetTypeName" : "com.demo.customers.Customer",
    "targetType" : "com.demo.customers.Customer"
  }
}
```
### AWS S3 via SQS

| Pipeline Type Key | Direction | Maturity |
|---|---|---|
| `awsS3` | INPUT | Beta |

A source that consumes a stream of S3 events via a preconfigured SQS queue.
#### Parameters

The following configuration parameters are available:

| Parameter | Description | Required | Default Value |
|---|---|---|---|
| `connection` | The name of the connection, as registered in Vyne's connection manager | true | |
| `targetTypeName` | The name of the type that content from the S3 bucket should be consumed as | true | |
| `queueName` | The name of the SQS queue | true | |
| `pollSchedule` | A cron expression that defines how frequently to check for new messages | false | `* * * * * *` (every second) |
| `preventConcurrentExecution` | When set to true, the next execution is not scheduled until the previous one has finished, preventing concurrent runs | false | |
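The `pollSchedule` parameters on this page use Spring's six-field cron format - second, minute, hour, day of month, month, day of week. A few illustrative values:

```json
// Field order: second minute hour day-of-month month day-of-week
{ "pollSchedule" : "* * * * * *" }       // every second (the default here)
{ "pollSchedule" : "0 */5 * * * *" }     // every five minutes
{ "pollSchedule" : "0 0 6 * * MON-FRI" } // at 06:00 on weekdays
```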
#### Example

```json
{
  "input" : {
    "type" : "awsS3",
    "direction" : "INPUT",
    "connection" : "my-aws-connection",
    "queueName" : "my-s3-events-queue",
    "targetTypeName" : "com.demo.customers.Customer",
    "pollSchedule" : "* * * * * *"
  }
}
```
### Polling operation input

| Pipeline Type Key | Direction | Maturity |
|---|---|---|
| `taxiOperation` | INPUT | Beta |

Invokes an operation (as defined in, or published to, Vyne) on a periodic basis.

Accepts inputs defined in the configuration, which are passed to the service on invocation. The result of the operation is published downstream on the pipeline, to be transformed to another type and published to an output.
#### Parameters

The following configuration parameters are available:

| Parameter | Description | Required | Default Value |
|---|---|---|---|
| `operationName` | The name of the operation, as defined in the schema. Should be a fully qualified operation name - see the example below | true | |
| `pollSchedule` | A Spring-flavored cron expression, defining how frequently this operation should be invoked | true | |
| `parameterMap` | An optional map of parameters to pass to the operation | false | |
#### Example

```json
{
  "input" : {
    "type" : "taxiOperation",
    "direction" : "INPUT",
    "operationName" : "com.demo.customers.CustomerService@@listCustomers",
    "pollSchedule" : "* * * * * *",
    "parameterMap" : {
      "customerStatus" : "ACTIVE"
    }
  }
}
```
### Polling query input

| Pipeline Type Key | Direction | Maturity |
|---|---|---|
| `query` | INPUT | Beta |

Invokes a TaxiQL query on Vyne, on a periodic basis.

The result of this query is published downstream on the pipeline, to be transformed to another type and published to an output.
#### Parameters

The following configuration parameters are available:

| Parameter | Description | Required | Default Value |
|---|---|---|---|
| `query` | The query to be executed - see the example below | true | |
| `pollSchedule` | A Spring-flavored cron expression, defining how frequently this query should be invoked | true | |
| `preventConcurrentExecution` | When set to true, the next execution is not scheduled until the previous one has finished, preventing concurrent runs | false | |
#### Example

```json
{
  "input" : {
    "type" : "query",
    "direction" : "INPUT",
    "query" : "find { Person( FirstName == 'Jim' ) }",
    "pollSchedule" : "* * * * * *",
    "preventConcurrentExecution" : false
  }
}
```
### Kafka topic

| Pipeline Type Key | Direction | Maturity |
|---|---|---|
| `kafka` | INPUT | Beta |

Defines an input that reads from a Kafka topic.

The Kafka broker is configured using Vyne's connection manager, along with a topic defined for this pipeline input.

#### Controlling deserialization (protobuf / avro etc)

Deserialization is controlled using annotations declared on the configured type (`targetTypeName`).

If not specified, Vyne attempts to read the content as JSON, using a `StringDecoder`.
#### Parameters

The following configuration parameters are available:

| Parameter | Description | Required | Default Value |
|---|---|---|---|
| `connectionName` | The name of the connection, as registered in Vyne's connection manager | true | |
| `topic` | The name of the topic to consume from | true | |
| `targetTypeName` | The fully qualified name of the type that content should be read as | true | |
#### Example

```json
{
  "input" : {
    "type" : "kafka",
    "direction" : "INPUT",
    "connectionName" : "my-kafka-connection",
    "topic" : "customerNotifications",
    "targetTypeName" : "com.demo.CustomerEvent",
    "targetType" : "com.demo.CustomerEvent"
  }
}
```
## Outputs

### Cask

| Pipeline Type Key | Direction | Maturity |
|---|---|---|
| `cask` | OUTPUT | Beta |

An output that writes directly to a Vyne Cask.

Casks provide a way to store content in a database, and expose an auto-generated RESTful service over the top, with all the correct Vyne operation schemas generated.

You may wish to consider using a JDBC database output instead.
#### Parameters

The following configuration parameters are available:

| Parameter | Description | Required | Default Value |
|---|---|---|---|
| `targetType` | The type that defines the cask containing this data | true | |
#### Example

```json
{
  "output" : {
    "type" : "cask",
    "direction" : "OUTPUT",
    "targetType" : "com.demo.Customer"
  }
}
```
### Operation output

| Pipeline Type Key | Direction | Maturity |
|---|---|---|
| `taxiOperation` | OUTPUT | Beta |

Invokes an operation (as defined in, or published to, Vyne), using the data provided upstream in the pipeline.

If the provided data does not satisfy the contract of the operation, Vyne uses the provided input as the basis of a discovery search, to find the additional data required.
#### Parameters

The following configuration parameters are available:

| Parameter | Description | Required | Default Value |
|---|---|---|---|
| `operationName` | The name of the operation, as defined in the schema. Should be a fully qualified operation name - see the example below | true | |
#### Example

```json
{
  "output" : {
    "type" : "taxiOperation",
    "direction" : "OUTPUT",
    "operationName" : "com.demo.customers.CustomerService@@DisableCustomerAccounts"
  }
}
```
### Kafka topic

| Pipeline Type Key | Direction | Maturity |
|---|---|---|
| `kafka` | OUTPUT | Beta |

Defines an output that writes to a Kafka topic.

The Kafka broker is configured using Vyne's connection manager, along with a topic defined for this pipeline output.
#### Controlling serialization (protobuf / avro etc)

Serialization is controlled using annotations declared on the configured type (`targetTypeName`).

If not specified, Vyne attempts to write the content as JSON, using a string serializer.
#### Parameters

The following configuration parameters are available:

| Parameter | Description | Required | Default Value |
|---|---|---|---|
| `connectionName` | The name of the connection, as registered in Vyne's connection manager | true | |
| `topic` | The name of the topic to write to | true | |
| `targetTypeName` | The fully qualified name of the type that content should be written as | true | |
#### Example

```json
{
  "output" : {
    "type" : "kafka",
    "direction" : "OUTPUT",
    "connectionName" : "my-kafka-connection",
    "topic" : "CustomerEvents",
    "targetTypeName" : "com.demo.customers.CustomerEvent",
    "targetType" : "com.demo.customers.CustomerEvent"
  }
}
```
### Database output

| Pipeline Type Key | Direction | Maturity |
|---|---|---|
| `jdbc` | OUTPUT | Beta |

A pipeline output that writes to a database.

The pipeline uses a connection that has been defined using Vyne's connection manager. Most database types are supported, providing they expose a JDBC driver.
#### Table definition

A table is created, if it doesn't already exist, using the config defined on the target type.

If the type has an `io.vyne.jdbc.Table` annotation, this is used to define the table name. Otherwise, a table name is derived from the name of the type.

Similarly, columns are created for all attributes annotated with an `io.vyne.jdbc.Column` annotation.

Table creation occurs once, when the pipeline is first initiated. It uses a `CREATE TABLE IF NOT EXISTS` statement, so if the type has changed since the table was first created, the changes will not be propagated to the database.
#### Batching inserts

In order to reduce load on the database, inserts are batched in windows of 500ms (the `windowDurationMs` shown in the example below).
#### Write disposition

Different pipelines have different needs for what should happen on subsequent runs. In some cases it is preferable to append the new data to the same table - for example, when the new data is an increment to the existing dataset. In other cases it makes sense to replace the data with the new batch - preferable when each batch contains the full dataset. Two write disposition modes are supported, as shown in the sketch after this list:

- `APPEND`: The data is appended to the existing table on each run of the pipeline. This is the default.
- `RECREATE`: The data is written to a new table, and the view is switched to point to the new table. It is the responsibility of the user to ensure that there aren't two concurrent runs, as this introduces a race condition for the switch-over, and especially for stale table deletion, if enabled. The source needs to support this by providing a unique identifier for each run. In general, this only works for batch sources, as streaming sources never complete. Currently, only the AWS S3 via SQS and polling query sources support this.
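As a sketch, a jdbc output configured to replace its data on each run would set `writeDisposition` as follows (the connection and type names are illustrative):

```json
{
  "output" : {
    "type" : "jdbc",
    "direction" : "OUTPUT",
    "connection" : "my-reporting-db", // hypothetical connection name
    "targetTypeName" : "com.demo.Customer", // hypothetical type
    "writeDisposition" : "RECREATE" // a new table per run; the view is switched over on completion
  }
}
```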
#### Parameters

The following configuration parameters are available:

| Parameter | Description | Required | Default Value |
|---|---|---|---|
| `connection` | The name of a connection, configured in Vyne's connection manager | true | |
| `targetTypeName` | The fully qualified name of the type which content being pushed to the database should be read as | true | |
| `writeDisposition` | Whether to append new data to the existing table (`APPEND`), or to create a new table with a unique name and switch the view over to point to it (`RECREATE`) | false | `APPEND` |
#### Example

```json
{
  "output" : {
    "type" : "jdbc",
    "direction" : "OUTPUT",
    "connection" : "my-connection",
    "targetTypeName" : "com.demo.Customer",
    "writeDisposition" : "APPEND",
    "targetType" : "com.demo.Customer",
    "windowDurationMs" : 500
  }
}
```
### AWS S3

| Pipeline Type Key | Direction | Maturity |
|---|---|---|
| `awsS3` | OUTPUT | Beta |

An output that writes a single file to an S3 bucket.
#### Parameters

The following configuration parameters are available:

| Parameter | Description | Required | Default Value |
|---|---|---|---|
| `connectionName` | The name of the connection, as registered in Vyne's connection manager | true | |
| `bucket` | The bucket name | true | |
| `objectKey` | The name of the object in the S3 bucket - generally a file name. To obtain unique file names, you can include `{env.now}` in the object key to add an ISO-format timestamp (YYYY-MM-DDTHH:mm:ss.sssZ). If the object already exists, it will be overwritten | true | |
| `targetTypeName` | The name of the type based on which the file is produced | true | |
#### Example

```json
{
  "output" : {
    "type" : "awsS3",
    "direction" : "OUTPUT",
    "connectionName" : "my-aws-connection",
    "bucket" : "my-bucket",
    "objectKey" : "customers.csv",
    "targetTypeName" : "com.demo.customers.Customer",
    "targetType" : "com.demo.customers.Customer"
  }
}
```
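To produce a uniquely named file on each run, embed the `{env.now}` placeholder described above in the object key. A minimal sketch, with illustrative names:

```json
{
  "output" : {
    "type" : "awsS3",
    "direction" : "OUTPUT",
    "connectionName" : "my-aws-connection",
    "bucket" : "my-bucket",
    "objectKey" : "exports/customers-{env.now}.csv", // expands to an ISO timestamp, e.g. customers-2024-05-01T09:00:00.000Z.csv
    "targetTypeName" : "com.demo.customers.Customer"
  }
}
```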