ballerina/nats module

Module overview

'ballerina/nats' provides the capability to connect to a NATS server and perform the following:

  • Point-to-point communication (Queues)
  • Pub/Sub (Topics)
  • Request/Reply

Samples

NATS Producer

The following program produces a message to a NATS server.

import ballerina/nats;
import ballerina/io;

public function main() {
    nats:Producer producer = new({ host: "localhost", port: 4222, clientId: "p0" });
    var result = producer->send("demo", "Hello Ballerina !!");
    if (result is error) {
        io:println("Error occurred while producing the message.");
    } else {
        io:println("GUID " + result + " received for the produced message.");
    }
}

NATS Subscriber

The following program consumes messages from a NATS server.

import ballerina/nats;
import ballerina/io;

listener nats:Listener subscription = new({ host: "localhost", port: 4222, clientId: "s0" });

@nats:ConsumerConfig { subject: "demo" }
service demo on subscription {
    resource function onMessage(nats:Message msg) {
        io:println("Received message : " + msg.getData());
    }
}

Module Detail

Records

Record Description
ConnectionConfig Represents the list of parameters required to establish a connection.
ConsumerConfigData Represents the list of parameters required to create a subscription.
IOError Represents an I/O error.

Objects

Object Description
Connection

Represents a NATS connection.

Listener

Represents a connection which will be used for subscription.

Message

Represents a message which will be pushed from the NATS server to the consumer.

Endpoints

Endpoint Description
Producer

The NATS producer acts as a streaming client that sends messages to a NATS streaming server. The producer creates a new NATS connection if a connection is not provided during initialization.

Annotations

Name Attaches To Data Type Description
ConsumerConfig service ConsumerConfigData

Service descriptor data generated at compile time. This is for internal use.

public type ConnectionConfig

Represents the list of parameters required to establish a connection.

Field Name Data Type Default Value Description
host string

Remote server host name/IP.

port int

Remote server port.

clusterId string test-cluster

Name of the NATS server cluster.

clientId string ballerina_client

Unique identifier for the client.

connectionTimeout int 30

Number of seconds to hold an idle connection.

maxPubAcksInFlight int 100

Number of messages which could be dispatched without receiving acks.

ackTimeout int 30

Time (in seconds) to wait for an acknowledgment before retrying.
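
As a minimal sketch of how these fields fit together, the following program fills in every field of the record explicitly and uses it to create a producer. The optional fields are set to the default values listed above, and the client id "p1" and subject "demo" are illustrative. It assumes a NATS streaming server reachable on localhost:4222.

```ballerina
import ballerina/nats;
import ballerina/io;

public function main() {
    // Every field of ConnectionConfig, with the optional fields set to
    // their documented defaults. "p1" is a made-up client id.
    nats:ConnectionConfig config = {
        host: "localhost",
        port: 4222,
        clusterId: "test-cluster",
        clientId: "p1",
        connectionTimeout: 30,
        maxPubAcksInFlight: 100,
        ackTimeout: 30
    };
    nats:Producer producer = new(config);
    var result = producer->send("demo", "Hello Ballerina !!");
    if (result is error) {
        io:println("Error occurred while producing the message.");
    } else {
        io:println("GUID " + result + " received for the produced message.");
    }
}
```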

public type ConsumerConfigData

Represents the list of parameters required to create a subscription.

Field Name Data Type Default Value Description
subject string

Name of the subject.

manualAck boolean false

True if the acknowledgments will be done manually.

ackWait int

Amount of time (in seconds) the client should wait before retrying.

startSeq int

Sequence id of the message which should be consumed.

durableName string

Unique name to identify the durable subscription.

public type IOError

Represents an I/O error.

Field Name Data Type Default Value Description
message string

Error message.

id int

Error id.

public type Connection object

Represents a NATS connection.

  • <Connection> __init(nats:ConnectionConfig config)

    Initializes a connection from the given server config.

    Parameter Name Data Type Default Value Description
    config nats:ConnectionConfig

    Information necessary for the NATS client to establish a connection with the server.

  • <Connection> close() returns (error?<>)

    Closes the connection.

    Return Type Description
    error?<>

    () on success, or an error if the close operation cannot be completed.
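
The sketch below shows one way an explicitly created connection might be used and then closed. It assumes that the Producer accepts an existing nats:Connection at initialization, which the Producer description suggests but this reference does not show as a signature. The client id "c0" and subject "demo" are illustrative.

```ballerina
import ballerina/nats;
import ballerina/io;

public function main() {
    // Create the connection explicitly from a ConnectionConfig record.
    nats:Connection conn = new({ host: "localhost", port: 4222, clientId: "c0" });

    // Assumption: the producer can be initialized with an existing connection.
    nats:Producer producer = new(conn);
    var result = producer->send("demo", "Hello Ballerina !!");
    if (result is error) {
        io:println("Error occurred while producing the message.");
    } else {
        io:println("GUID " + result + " received for the produced message.");
    }

    // close() returns () on success or an error.
    var closeResult = conn.close();
    if (closeResult is error) {
        io:println("Error occurred while closing the connection.");
    }
}
```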

public type Listener object

Represents a connection which will be used for subscription.

  • <Listener> __init(nats:ConnectionConfig|nats:Connection c)

    Creates a new consumer. A new connection will be created if a reference to a connection is not provided.

    Parameter Name Data Type Default Value Description
    c nats:ConnectionConfig|nats:Connection

    An already established connection or configuration to create a new connection.

  • <Listener> __attach(service serviceType, string? name) returns (error?<>)

    Binds the NATS consumer to a service.

    Parameter Name Data Type Default Value Description
    serviceType service

    Type descriptor of the service.

    name string? ()

    Name of the service.

    Return Type Description
    error?<>

    Nil or error upon failure to register listener.

  • <Listener> __start() returns (error?<>)

    Starts the listener as part of its lifecycle.

    Return Type Description
    error?<>

    Error or ().

  • <Listener> __stop() returns (error?<>)

    Stops the listener as part of its lifecycle.

    Return Type Description
    error?<>

    Error or ().

public type Message object

Represents a message which will be pushed from the NATS server to the consumer.

  • <Message> getData() returns (string)

    Gets the message content.

    Return Type Description
    string

    Message content as a 'string' literal.

  • <Message> ack() returns (error?<>)

    Acknowledges the message.

    Return Type Description
    error?<>

    An error if the acknowledgment fails.
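
A minimal sketch of manual acknowledgment, combining the manualAck field of ConsumerConfigData with ack(). The client id "s1", subject "demo", and service name are illustrative. It assumes the optional ConsumerConfigData fields can be omitted, as in the subscriber sample.

```ballerina
import ballerina/nats;
import ballerina/io;

listener nats:Listener subscription = new({ host: "localhost", port: 4222, clientId: "s1" });

// manualAck: true means the service acknowledges each message itself.
@nats:ConsumerConfig { subject: "demo", manualAck: true }
service manualAckDemo on subscription {
    resource function onMessage(nats:Message msg) {
        io:println("Received message : " + msg.getData());
        // Acknowledge only after the message has been processed.
        var result = msg.ack();
        if (result is error) {
            io:println("Error occurred while acknowledging the message.");
        }
    }
}
```

Acknowledging after processing, rather than before, lets a message that was not handled successfully be delivered again.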

Endpoint Producer

The NATS producer acts as a streaming client that sends messages to a NATS streaming server. The producer creates a new NATS connection if a connection is not provided during initialization.

  • <Producer> send(string subject, string|json message, string charset) returns (string|error<>)

    Produces a message to a NATS streaming server for the given subject.

    Parameter Name Data Type Default Value Description
    subject string

    Name of the subject, also referred to as the 'topic/queue' name.

    message string|json

    Message to send, either a string or a json value.

    charset string UTF-8

    Encoding of the message (UTF-8 by default).

    Return Type Description
    string|error<>

    GUID of acknowledgment or the specific error.

  • <Producer> requestReply(string subject, string|json message, string charset) returns (Message|error<>)

    Produces a message and waits for a response.

    Parameter Name Data Type Default Value Description
    subject string

    Name of the subject, also referred to as the 'topic/queue' name.

    message string|json

    Message to send, either a string or a json value.

    charset string UTF-8

    Encoding of the message (UTF-8 by default).

    Return Type Description
    Message|error<>

    Response message or an error.
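
A minimal sketch of a request/reply call, following the same pattern as the producer sample. The client id "r0", subject "demo", and payload "ping" are illustrative. It assumes that some other client is subscribed to the subject and sends a reply, and it omits charset so that the default (UTF-8) applies.

```ballerina
import ballerina/nats;
import ballerina/io;

public function main() {
    nats:Producer producer = new({ host: "localhost", port: 4222, clientId: "r0" });
    // Sends the request and waits for the response message.
    var response = producer->requestReply("demo", "ping");
    if (response is error) {
        io:println("Error occurred while waiting for the reply.");
    } else {
        // In this branch the response is a nats:Message.
        io:println("Reply : " + response.getData());
    }
}
```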