ballerina/rabbitmq module

Module overview

'ballerina/rabbitmq' provides the capability to connect to a RabbitMQ server and perform the following:

  • Point to point communication (Queues)
  • Pub/Sub (Topics)

Samples

RabbitMQ Producer

The following program publishes a message to a RabbitMQ server.

import ballerina/rabbitmq;
import ballerina/log;

public function main() {
    // Open a channel to the RabbitMQ server running on localhost.
    rabbitmq:Channel chann = new({ host: "localhost", port: 5672 });
    // Publish through the default exchange (""), using the queue name as the routing key.
    var sendResult = chann->basicPublish("Hello from ballerina", "testingDemo", exchange = "");
    if (sendResult is error) {
        log:printError("An error occurred while sending the message");
    } else {
        log:printInfo("The message was sent successfully");
    }
}

RabbitMQ Subscriber

The following program consumes messages from a RabbitMQ server.

import ballerina/rabbitmq;
import ballerina/log;

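listener rabbitmq:ChannelListener chann = new({ host: "localhost", port: 5672 });

// The queue is given through the queueConfig field of RabbitMQServiceConfig.
@rabbitmq:ServiceConfig {
    queueConfig: {
        queueName: "testingDemo"
    }
}
service testSimpleConsumer on chann {
    // Invoked for each message delivered from the queue.
    resource function onMessage(string message) {
        log:printInfo("The message received: " + message);
    }
}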

Module Detail

Records

Record Description
ConnectionConfiguration Holds the parameters used to create a RabbitMQ `Connection`.
ExchangeConfiguration Holds the parameters used to declare an exchange.
QueueConfiguration Holds the parameters used to declare a queue.
RabbitMQServiceConfig Represents the list of parameters required to create a subscription.

Objects

Object Description
ChannelListener

Public Ballerina API - Ballerina RabbitMQ message listener. Provides a listener to consume messages from RabbitMQ.

Connection

Public Ballerina API - Interface to an AMQP Connection.

Endpoints

Endpoint Description
Channel

Public Ballerina API - Ballerina interface to an AMQP Channel. Provides AMQP Channel-related functionality.

Type Definitions

Type Values Description
ExchangeType topic | headers | fanout | direct

Types of exchanges supported by the Ballerina RabbitMQ Connector.

Annotations

Name Attaches To Data Type Description
ServiceConfig service RabbitMQServiceConfig

Service descriptor data generated at compile time.

public type ConnectionConfiguration

Holds the parameters used to create a RabbitMQ `Connection`.

Field Name Data Type Default Value Description
host string

The host used for establishing the connection.

port int 5672

The port used for establishing the connection.

username string? ()

The username used for establishing the connection.

password string? ()

The password used for establishing the connection.

connectionTimeout int? ()

Connection TCP establishment timeout in milliseconds; zero for infinite.

handshakeTimeout int? ()

The AMQP 0-9-1 protocol handshake timeout, in milliseconds.

shutdownTimeout int? ()

Shutdown timeout in milliseconds; zero for infinite; default 10000. If consumers exceed this timeout then any remaining queued deliveries (and other Consumer callbacks) will be lost.

heartbeat int? ()

The initially requested heartbeat timeout, in seconds; zero for none.

public type ExchangeConfiguration

Holds the parameters used to declare an exchange.

Field Name Data Type Default Value Description
exchangeName string

The name of the exchange.

exchangeType direct|fanout|topic|headers DIRECT_EXCHANGE

The type of the exchange.

durable boolean false

True if declaring a durable exchange (the exchange will survive a server restart).

public type QueueConfiguration

Holds the parameters used to declare a queue.

Field Name Data Type Default Value Description
queueName string

The name of the queue. If not specified, a name is autogenerated.

durable boolean false

True if declaring a durable queue (the queue will survive a server restart).

exclusive boolean false

True if declaring an exclusive queue (restricted to this connection).

autoDelete boolean true

True if declaring an autodelete queue (the server will delete it when it is no longer in use).

public type RabbitMQServiceConfig

Represents the list of parameters required to create a subscription.

Field Name Data Type Default Value Description
queueConfig rabbitmq:QueueConfiguration

Specifies configuration details about the queue to be subscribed to.
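
As a sketch of how this record might be used, the service below passes a `QueueConfiguration` through the `queueConfig` field of the `ServiceConfig` annotation. The queue name and flag values are assumptions for illustration, and the service and listener names are hypothetical.

import ballerina/rabbitmq;
import ballerina/log;

listener rabbitmq:ChannelListener chann = new({ host: "localhost", port: 5672 });

@rabbitmq:ServiceConfig {
    queueConfig: {
        queueName: "testingDemo",   // assumed queue name
        durable: true,              // survive a server restart
        exclusive: false,
        autoDelete: false           // keep the queue when it is no longer in use
    }
}
service durableQueueConsumer on chann {
    resource function onMessage(string message) {
        log:printInfo("The message received: " + message);
    }
}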

public type ChannelListener object

Public Ballerina API - Ballerina RabbitMQ message listener. Provides a listener to consume messages from RabbitMQ.

  • <ChannelListener> __init(rabbitmq:ConnectionConfiguration|rabbitmq:Connection connectionOrConnectionConfig)

    Initializes a Ballerina ChannelListener object with the given Connection object or connection parameters. Creates a Connection object if only the connection configuration is given.

    Parameter Name Data Type Default Value Description
    connectionOrConnectionConfig rabbitmq:ConnectionConfiguration|rabbitmq:Connection

    Holds a Ballerina RabbitMQ Connection object or the connection parameters.

  • <ChannelListener> __start() returns (error?<>)

    Starts the endpoint. This function is ignored by the ChannelListener.

    Return Type Description
    error?<>

    Nil or error upon failure to start.

  • <ChannelListener> __stop() returns (error?<>)

    Stops consuming messages through the ChannelListener endpoint.

    Return Type Description
    error?<>

    Nil or error upon failure to close the ChannelListener.

  • <ChannelListener> __attach(service serviceType, string? name) returns (error?<>)

    Binds the ChannelListener to a service.

    Parameter Name Data Type Default Value Description
    serviceType service

    Type descriptor of the service to bind to.

    name string? ()

    Name of the service.

    Return Type Description
    error?<>

    () or error upon failure to register listener.
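
Because `__init` accepts either a `Connection` object or connection parameters, a listener can be created on top of an existing `Connection`. The sketch below assumes that a module-level `Connection` can be handed to the listener in this way; the variable and service names are hypothetical.

import ballerina/rabbitmq;
import ballerina/log;

// A Connection created once and passed to the listener.
rabbitmq:Connection sharedConnection = new({ host: "localhost", port: 5672 });

listener rabbitmq:ChannelListener channelListener = new(sharedConnection);

@rabbitmq:ServiceConfig {
    queueConfig: {
        queueName: "testingDemo"
    }
}
service sharedConnectionConsumer on channelListener {
    resource function onMessage(string message) {
        log:printInfo("The message received: " + message);
    }
}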

public type Connection object

Public Ballerina API - Interface to an AMQP Connection.

  • <Connection> __init(rabbitmq:ConnectionConfiguration connectionConfiguration)

    Initializes a Ballerina RabbitMQ Connection object.

    Parameter Name Data Type Default Value Description
    connectionConfiguration rabbitmq:ConnectionConfiguration

    Holds connection parameters required to initialize the Connection.

  • <Connection> close(int? timeout) returns (error?<>)

    Closes the RabbitMQ Connection and all its Channels. It waits up to the provided timeout for all the close operations to complete. When the timeout is reached, the socket is forced to close.

    Parameter Name Data Type Default Value Description
    timeout int? ()

    Timeout (in milliseconds) for completing all the close-related operations; use -1 for infinity.

    Return Type Description
    error?<>

    An error if an I/O problem is encountered.

  • <Connection> isClosed() returns (boolean)

    Checks whether close was already called.

    Return Type Description
    boolean

    The value true if the Connection is already closed and false otherwise.
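
A minimal sketch of closing a `Connection` with a timeout and then checking its state. The 5000 millisecond timeout is an assumption for illustration, and passing `timeout` by name assumes it is an optional parameter, as its default value of `()` suggests.

import ballerina/rabbitmq;
import ballerina/log;

public function main() {
    rabbitmq:Connection connection = new({ host: "localhost", port: 5672 });

    // Wait up to 5 seconds for the close operations to complete.
    var closeResult = connection.close(timeout = 5000);
    if (closeResult is error) {
        log:printError("An error occurred while closing the connection");
    } else if (connection.isClosed()) {
        log:printInfo("The connection was closed successfully");
    }
}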

Endpoint Channel

Public Ballerina API - Ballerina interface to an AMQP Channel. Provides AMQP Channel-related functionality.

  • <Channel> queueDeclare(rabbitmq:QueueConfiguration? queueConfig) returns (string|error?<>)

    Declares a server-named, exclusive, autodelete, non-durable queue, or a queue with the given configuration.

    Parameter Name Data Type Default Value Description
    queueConfig rabbitmq:QueueConfiguration? ()

    Holds the parameters required to declare a queue.

    Return Type Description
    string|error?<>

    Returns the name of the queue if autogenerated or nil if the queue was successfully generated with the given parameters. An error is returned if an I/O error is encountered.

  • <Channel> exchangeDeclare(rabbitmq:ExchangeConfiguration config) returns (error?<>)

    Declares a non-autodelete, non-durable exchange with no extra arguments. If the arguments are specified, the exchange is declared accordingly.

    Parameter Name Data Type Default Value Description
    config rabbitmq:ExchangeConfiguration

    Holds parameters required to declare an exchange.

    Return Type Description
    error?<>

    Returns an error if an I/O error is encountered or nil if successful.

  • <Channel> queueBind(string queueName, string exchangeName, string bindingKey) returns (error?<>)

    Binds a queue to an exchange with the given binding key.

    Parameter Name Data Type Default Value Description
    queueName string

    Name of the queue.

    exchangeName string

    Name of the exchange.

    bindingKey string

    Binding key used to bind the queue to the exchange.

    Return Type Description
    error?<>

    Returns an error if an I/O error is encountered or nil if successful.

  • <Channel> basicPublish(string message, string routingKey, string exchange) returns (error?<>)

    Publishes a message. Publishing to a non-existent exchange will result in a channel-level protocol exception, which closes the channel.

    Parameter Name Data Type Default Value Description
    message string

    The message body.

    routingKey string

    The routing key.

    exchange string

    The name of the exchange the message is published to.

    Return Type Description
    error?<>

    Returns an error if an I/O error is encountered or nil if successful.

  • <Channel> queueDelete(string queueName) returns (error?<>)

    Deletes a queue, without regard for whether it is in use or has messages on it. If the parameters ifUnused or ifEmpty are given, the queue is checked before deleting.

    Parameter Name Data Type Default Value Description
    queueName string

    Name of the queue to be deleted.

    Return Type Description
    error?<>

    Returns an error if an I/O error is encountered or nil if successful.

  • <Channel> exchangeDelete(string exchange) returns (error?<>)

    Deletes an exchange.

    Parameter Name Data Type Default Value Description
    exchange string

    The name of the exchange.

    Return Type Description
    error?<>

    An error if an I/O error is encountered or nil otherwise.

  • <Channel> queuePurge(string queueName) returns (error?<>)

    Purges the contents of the given queue.

    Parameter Name Data Type Default Value Description
    queueName string

    The name of the queue.

    Return Type Description
    error?<>

    An error if an I/O error is encountered or nil if successful.
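
The Channel functions above can be chained into a simple declare, bind, publish, and clean-up flow. This is a sketch under stated assumptions: the names "demoExchange", "demoQueue", and "demoKey" are hypothetical, and passing `queueConfig` and `exchange` by name follows the calling style of the producer sample rather than a confirmed requirement.

import ballerina/rabbitmq;
import ballerina/log;

public function main() {
    rabbitmq:Channel chann = new({ host: "localhost", port: 5672 });

    // Declare an exchange; exchangeType is left at its documented default.
    var exchangeResult = chann->exchangeDeclare({ exchangeName: "demoExchange" });
    if (exchangeResult is error) {
        log:printError("Failed to declare the exchange");
        return;
    }

    // Declare a named queue with the default QueueConfiguration flags.
    var queueResult = chann->queueDeclare(queueConfig = { queueName: "demoQueue" });
    if (queueResult is error) {
        log:printError("Failed to declare the queue");
        return;
    }

    // Bind the queue to the exchange under the binding key "demoKey".
    var bindResult = chann->queueBind("demoQueue", "demoExchange", "demoKey");
    if (bindResult is error) {
        log:printError("Failed to bind the queue");
        return;
    }

    // Publish with a routing key that matches the binding key.
    var sendResult = chann->basicPublish("Hello from ballerina", "demoKey",
                                         exchange = "demoExchange");
    if (sendResult is error) {
        log:printError("Failed to publish the message");
        return;
    }

    // Clean up: empty the queue, then delete the queue and the exchange.
    var purgeResult = chann->queuePurge("demoQueue");
    var queueDeleteResult = chann->queueDelete("demoQueue");
    var exchangeDeleteResult = chann->exchangeDelete("demoExchange");
    if (purgeResult is error || queueDeleteResult is error
            || exchangeDeleteResult is error) {
        log:printError("An error occurred during clean-up");
    } else {
        log:printInfo("Declared, published, and cleaned up successfully");
    }
}

Publishing before the exchange exists would, per the description of basicPublish, cause a channel-level protocol exception, which is why the sketch declares the exchange first.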