Continuous Queries
Querying infinite streams of data
Vyne supports querying sources that expose an infinite stream of data.
At present, only HTTP sources that return a stream (text/event-stream
) may be used in a continuous query, however
support for message brokers is planned.
Exposing a stream of data
Services that expose an infinite stream of data should define their return type as Stream<T>
instead of T[]
(or Array<T>
).
eg:
service MyService {
getComments():Comment[] // returns a finite collection of comments
getCommentStream():Stream<Comment> // returns an infinite stream of comments
}
Query syntax
The stream
keyword is used in place of findAll
and array notation is not used on return types.
For example, to access our stream of Comment
types in the earlier example, a query would be:
stream { Comment }
This is the streaming equivalent of
find { Comment[] }
Streaming Query API
Streaming queries return an indefinite number of records therefore the results when returned over HTTP must be streamed using server side events and cannot be collected into a list of records
Request:
curl 'http://localhost:9022/api/vyneql?resultMode=RAW' \
-H 'Accept: text/event-stream' \
-H 'Content-Type: application/json' \
-H 'Origin: http://localhost:9022' \
--data-raw 'stream { Comment }'
Note: The header Accept: text/event-stream
must be specified in the request
Response:
Content-Type: text/event-stream;charset=UTF-8
data:{"a":"a","b":"b"}
data:{"a":"a","b":"b"}
data:{"a":"a","b":"b"}
...
Continuous queries with projection
Continuous queries may also be projected to other types in the same way as regular Vyne queries.
stream { Comment } as {
author : Author
text : CommentText
}
Example
Given a service at https://prices.com/price
that returns a stream of events as server side events as
data:{"instrument":"AAL","price":10.235533}
data:{"instrument":"AAL","price":10.245533}
data:{"instrument":"AAL","price":10.255533}
Taxi schema
type Instrument inherits String
type Price inherits Decimal
model InstrumentPrice
{
instrument : Instrument
price : Price
}
Define the service
service PriceService {
@HttpOperation(method = "GET" , url = "https://prices.com/price")
operation getPrices(): Stream<InstrumentPrice>
}
InstrumentPrice
may be streamed from Vyne using the following query
stream { InstrumentPrice }
curl 'http://localhost:9022/api/vyneql?resultMode=RAW' \
-H 'Accept: text/event-stream' \
-H 'Content-Type: application/json' \
-H 'Origin: http://localhost:9022' \
--data-raw 'stream { InstrumentPrice }'