Pipe Connector#
Getting Started#
"Hello World!"#
The No-Op Example#
This example just passes the payload to the next segment, without doing anything with it.
Vert.x Verticle:#
class MyPipeVerticle : CoroutineVerticle() {
override suspend fun start() {
vertx.eventBus().consumer(ADDRESS, this::handlePipe)
awaitResult<PipeConnector> { createPipeConnector(vertx, it) }.pulishTo(ADDRESS)
}
private fun handlePipe(message: Message<PipeContext>): Unit = with(message.body()) {
// The this pointer is now the PipeContext object
}
companion object {
const val ADDRESS: String = "io.piveau.pipe.myservice.queue"
}
}
public class MyPipeVerticle : AbstractVerticle() {
public static final String ADDRESS = "io.piveau.pipe.myservice.queue";
@Override
public void start(Promise<Void> startPromise) {
vertx.eventBus().consumer(ADDRESS, this::handlePipe);
PipeConnector.create(vertx, ar -> {
if (ar.succeded()) {
ar.result().publishTo(ADDRESS);
} else {
starPromise.fail(ar.cause());
}
});
}
private void handlePipe(Message<PipeContext> message) {
}
}
Tip
You can also separate the connector creation from the pipe receiving verticle. E.g. the createPipeConnector()
method can be called in a main verticle that on success starts your pipe handling verticle and if that is succesful
installs the publishTo()
address. So you have better control of configuring your pipe handling verticle worker
pool and instances.
Endpoint#
Configuration#
Pipe Context#
Within your handler, the PipeContext
object is the man interface to the pipe related functionality.
Access the actual data:
Access the configuration for your segment:
Set and forward any processed data to the next segment: