Connect a kafka topic
Learn how to make a Kafka Topic available for Vyne to query
Vyne can read data from a Kafka topic to stream data as part of a query.
Add a topic through the UI
The UI allows you to connect a Kafka topic directly, without having to manually edit taxi files.
When importing a topic, it's expected that the model type already exists.
Either declare your model directly in your taxi schema, or consider importing a model type, for example - importing a Protobuf schema
Declaring a topic in Taxi
Kafka topics are declared in taxi as simple Operations which return a Stream of data.
// 1: Add the required imports
import io.vyne.kafka.KafkaService
import io.vyne.kafka.KafkaOperation
// 2: Annotate the service as a `KafkaService`.
@KafkaService( connectionName = "moviesConnection" )
service MyKafkaService {
// 3: Annotate the operation as a `@KafkaOperation`
@KafkaOperation( topic = "hello-worlds", offset = "earliest" )
operation streamGoodThings():Stream<HelloWorld>
}
- Add the required imports
- Annotate the service with
@KafkaService
- The
connectionName
parameter should match a connection listed in the connections config file.
- The
- Annotate the operation as a
@KafkaOperation
, specifying the topic and offset. - The return type should be a
Stream<>
of the defined message type.
Controlling deserialization
How the message is deserialized is determined by the model type being exposed.
By default, models are expected to be JSON.
However, this can be controlled by annotating the model with a format annotation.
Currently, formats are limited to protobuf only, but we are working on adding more.
See Working with Protobuf for more information.