Continuous Queries

Querying infinite streams of data


Vyne supports querying sources that expose an infinite stream of data.

At present, only HTTP sources that return a stream (text/event-stream) can be used in a continuous query. Support for message brokers is planned.

Exposing a stream of data

Services that expose an infinite stream of data should define their return type as Stream<T> instead of T[] (or Array<T>).

For example:

service MyService {
   operation getComments():Comment[] // returns a finite collection of comments
   operation getCommentStream():Stream<Comment> // returns an infinite stream of comments
}

Query syntax

The stream keyword is used in place of find, and array notation is not used on the return type.

For example, to access our stream of Comment types in the earlier example, a query would be:

stream { Comment }

This is the streaming equivalent of:

find { Comment[] }

Streaming Query API

Streaming queries return an indefinite number of records. When returned over HTTP, the results must therefore be streamed as server-sent events; they cannot be collected into a list of records.

Request:

curl 'http://localhost:9022/api/vyneql?resultMode=RAW' \
-H 'Accept: text/event-stream' \
-H 'Content-Type: application/json' \
-H 'Origin: http://localhost:9022' \
--data-raw 'stream { Comment }'

Note: The Accept: text/event-stream header must be specified in the request.
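
The same request can be built from code. The following is a minimal Python sketch using only the standard library; it mirrors the URL and headers of the curl command above. The names build_stream_request and iter_response_lines are hypothetical helpers introduced for illustration and are not part of Vyne.

```python
import urllib.request

# URL taken from the curl example; adjust host and port for your deployment.
VYNE_URL = "http://localhost:9022/api/vyneql?resultMode=RAW"


def build_stream_request(query: str, url: str = VYNE_URL) -> urllib.request.Request:
    """Build a POST request that asks for the results as an event stream."""
    return urllib.request.Request(
        url,
        data=query.encode("utf-8"),  # the query text is the request body
        headers={
            "Accept": "text/event-stream",  # required for streaming queries
            "Content-Type": "application/json",
            "Origin": "http://localhost:9022",  # mirrors the curl example
        },
        method="POST",
    )


def iter_response_lines(request: urllib.request.Request):
    """Yield decoded lines from the response as they arrive (needs a running server)."""
    with urllib.request.urlopen(request) as response:
        for raw in response:
            yield raw.decode("utf-8")
```

Calling build_stream_request("stream { Comment }") produces the equivalent of the curl request; iter_response_lines then yields the raw lines of the event stream until the connection is closed.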

Response:

Content-Type: text/event-stream;charset=UTF-8

data:{"a":"a","b":"b"}

data:{"a":"a","b":"b"}

data:{"a":"a","b":"b"}
...
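
A client has to split this response into individual records. The sketch below shows one way to do that in Python, assuming each event carries a JSON object in its data field, as in the response above. parse_sse_records is a hypothetical helper, and it deliberately handles only the data field of the event stream format.

```python
import json
from typing import Iterable, Iterator


def parse_sse_records(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one parsed JSON object per server-sent event."""
    data_parts = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event.
            if data_parts:
                yield json.loads("\n".join(data_parts))
                data_parts = []
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # The event stream format allows one optional space after the colon.
            data_parts.append(value[1:] if value.startswith(" ") else value)
        # Other fields (event:, id:, retry:) and ":" comments are ignored here.
    if data_parts:
        # Convenience for finite input: flush an event with no trailing blank line.
        yield json.loads("\n".join(data_parts))


sample = ['data:{"a":"a","b":"b"}', "", 'data:{"a":"a","b":"b"}', ""]
records = list(parse_sse_records(sample))
```

Because the function is a generator, it can be fed the lines of a live response and will emit each record as soon as its event is complete, without buffering the whole stream.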

Continuous queries with projection

Continuous queries may also be projected to other types in the same way as regular Vyne queries.

stream { Comment } as {
   author : Author
   text : CommentText
}

Example

Given a service at https://prices.com/price that returns a stream of server-sent events such as:

data:{"instrument":"AAL","price":10.235533}

data:{"instrument":"AAL","price":10.245533}

data:{"instrument":"AAL","price":10.255533}

Taxi schema

type Instrument inherits String
type Price inherits Decimal

model InstrumentPrice
{
    instrument : Instrument
    price : Price
}

Define the service

service PriceService {
   @HttpOperation(method = "GET" , url = "https://prices.com/price")
   operation getPrices(): Stream<InstrumentPrice>
}

InstrumentPrice may be streamed from Vyne using the following query:

stream { InstrumentPrice }

Request:

curl 'http://localhost:9022/api/vyneql?resultMode=RAW' \
-H 'Accept: text/event-stream' \
-H 'Content-Type: application/json' \
-H 'Origin: http://localhost:9022' \
--data-raw 'stream { InstrumentPrice }'
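
On the client side, the price records can be decoded without losing decimal precision. The following Python sketch assumes Vyne emits one JSON object per data line, shaped like the InstrumentPrice model; that shape is an assumption based on the service output shown earlier. The InstrumentPrice dataclass and read_prices function are hypothetical names used for illustration.

```python
import json
from dataclasses import dataclass
from decimal import Decimal
from typing import Iterable, Iterator


@dataclass(frozen=True)
class InstrumentPrice:
    instrument: str
    price: Decimal


def read_prices(lines: Iterable[str]) -> Iterator[InstrumentPrice]:
    """Turn the data lines of an event stream into InstrumentPrice records."""
    for raw in lines:
        line = raw.strip()
        if not line.startswith("data:"):
            continue  # skip blank separators and any non-data fields
        payload = line[len("data:"):].strip()
        # parse_float=Decimal keeps the price exact instead of using a binary float.
        record = json.loads(payload, parse_float=Decimal)
        yield InstrumentPrice(record["instrument"], Decimal(record["price"]))


sample = [
    'data:{"instrument":"AAL","price":10.235533}',
    "",
    'data:{"instrument":"AAL","price":10.245533}',
    "",
]
prices = list(read_prices(sample))
```

Parsing into Decimal matches the Price type in the schema, which inherits from Decimal; a plain float would work for display but can introduce rounding error in later arithmetic.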