ballerina/streams module

Module Detail

Records

Record Description
SnapshottableStreamEvent This record represents a stream event which can be persisted.

Objects

Object Description
Aggregator

Abstract object, which should be implemented in order to create a new aggregator.

Average

Aggregator to calculate average in streams.

Count

Aggregator to count events in streams.

DelayWindow

This window will delay the incoming events for a given amount of time. E.g. from inputStream window delay(4000) select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } } The delay window should only have one parameter ( delayTime)

DistinctCount

Aggregator to get the distinct counts of values in streams.

ExternalTimeBatchWindow

This is a batch (tumbling) time window based on external time, that holds events arrived during window time periods, and gets updated for every window time. E.g. from inputStream window externalTimeBatch(inputStream.timestamp, 1000, 500, 1200, true) select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } } The externalTimeBatch window should only have two to five parameters (timestamp field, windowTime, startTime, timeout, replaceTimestampWithBatchEndTime)

ExternalTimeWindow

This is a sliding time window based on external time, that holds events for that arrived during last window time period from the external timestamp, and gets updated on every monotonically increasing timestamp. E.g. from inputStream window externalTime(inputStream.timestamp, 4000) select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } } The externalTime window should only have two parameters (timestamp field, windowTime)

Filter

The Filter object represents the where clause in a streaming query. This object takes two parameter for initialization. nextProcessorPointer is the function pointer of the next processor to be invoked once the filtering is complete. conditionFunc is a function pointer which return true if the given where clause evaluates to true.

HoppingWindow

The hopping window releases the events in batches defined by a time period every given time interval. The batch is also determined by the time period given in the window. When the time interval the events being released and the time period it hold the events are equal, the hopping window acts as a TimeBatch window. E.g. from inputStream window hopping(5000, 4000) select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } } Hopping window should only have two parameters ( windowTime, hoppingTime)

IntSort

This class implements a merge sort algorithm to sort timestamp values for state persistence.

LengthBatchWindow

This is a batch (tumbling) length window, that holds up to the given length of events, and gets updated on every given number of events arrival. E.g. from inputStream window lengthBatch(5) select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } } The lengthBatch window should only have one parameter ( windowBatchLength)

LengthWindow

The LengthWindow is a sliding length window, that holds last windowLength events, and gets updated on every event arrival and expiry. E.g. from inputStream window length(5) select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } } The length window should only have one parameter ( windowLength)

LinkedList

The LinkedList object which represents the linked list data structure.

Max

Aggregator to find the maximum value in a stream.

MaxForever

The aggregator to keep the maximum value received so far. It is similar to Max aggregator, but it keeps the maximum value it received so far, forever.

MergeSort

This object implements the merge sort algorithm to sort the provided value arrays. fieldFuncs are function pointers which returns the field values of each stream event's data map's values. sortTypes are an array of ( streams:ASCENDING or streams:DESCENDING).

Min

Aggregator to find the minimum value in a stream.

MinForever

The aggregator to keep the minimum value received so far. It is similar to Min aggregator, but it keeps the minimum value it received so far, forever.

Node

The Node object represents a node in the linkedlist data structure.

OrderBy

The OrderBy object represents the desugared code of order by clause of a streaming query. This object takes 3 parameters to initialize itself. nextProcessPointer is the process method of the next processor. fieldFuncs is an array of function pointers which returns the field values to be sorted. sortTypes is an array of string specifying whether the sort order (ascending or descending). Internally this processor uses a MergeSort object to sort.

OutputProcess

The OutputProcess object is responsible for sending the output (only the events of type streams:CURRENT to the destination stream. It takes a function pointer outputFunc which actually has the logic to process the output.

Scheduler

The Scheduler object is responsible for generating streams:TIMER events at the given timestamp. Once the event is generated, the timer event is passed to the provided processFunc function pointer. The function pointer is the process function of the target processor, to which the timer event should be sent.

Select

The Select object represents the select clause. Anything that comes under select clause like aggregator function invocations are also handled in this processor. Further, grouping of the events (provided by the groupby clause) is also performed in this processor. aggregatorArr is an array of aggregators which are used in the select clause. groupbyFuncArray is an array of function pointers which returns the values being grouped. selectFunc is a function pointer to a lambda function which creates the data field of the output stream event. scopeName is used as a breadcrumb to identify the select clause if there are multiple forever blocks.

Snapshotable

Abstract Snapshotable to be referenced by all snapshotable objects.

SortWindow

The sort window hold a given number of events and emit the expired events in the ordered by the given fields. E.g. from inputStream window sort(10, inputStream.age, "ascending", inputStream.name, "descending") select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } } The sort window should have three or more odd no of parameters ( windowLength, stream field, order1, stream field, order2, ...)

StdDev

The aggregator object to calculate standard deviation.

StreamEvent

The StreamEvent object is a wrapper around the actual data being received to the input stream. If a record is receive to a input stream, that record is converted to a map of anydata values and set that map to a field called data in a new StreamEvent object. StreamEvent is only used internally to transmit event data from one processor to another processor. At the time the record is converted to a map, the timestamp is set. If the record is first received by the input stream, the eventType is set to streams:CURRENT. Other than stream events of type streams:CURRENT, there are 3 types of StreamEvents. They are streams:EXPIRED, streams:RESET, streams:TIMER. An expired event is used to remove the state of its respective current event. A reset event is used to completely wipe the state and a timer event is used to trigger the process method of a particular processor in timely manner.

StreamJoinProcessor

The StreamJoinProcessor object is responsible for performing SQLish joins between two or more streams. The onConditionFunc is the lambda function which represents the where clause in the join clause. The joining happens only if the condition is statified. nextProcessor is the process function of the next processor, which can be a Select processor, Aggregator processor, Having processor.. etc. The lhsStream is the left hand side stream of the join and its attached window is 'lhsWindow. The rhsStream is the right hand side stream of the join and its attached window is 'rhsWindow. The unidirectionalStream stream defines the stream by which the joining is triggered when the events are received. Usually it is lhsStream, in rare cases it can be rhsStream. The joinType is the type of the join and it can be any value defined by streams:JoinType.

Sum

Aggregator to perform summation of values in a stream.

TableJoinProcessor

The TableJoinProcessor object handles joining streams with in-memory tables in ballerina. nextProcessor is the process function of the next processor, which can be a Select processor, Aggregator processor, Having processor.. etc. The streamName is the stream of the join and its attached window is 'windowInstance. The tableName is the name of the table with which the stream is joined. The joinType is the type of the join and it can be any value defined by streams:JoinType.

TimeAccumulatingWindow

The TimeAccumulatingWindow holds the events but if the events are not received for a specific time period, the collected events are released, at the point the time exceeds the given time period from the time when the last event is received. E.g. from inputStream window timeAccum(4000) select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } } Time accumulating window should only have one parameter ( timePeriod)

TimeBatchWindow

This is a batch (tumbling) time window, that holds events arrived between window time periods, and gets updated for every window time. E.g. from inputStream window timeBatch(5000) select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } } The timeBatch window should only have one parameter ( windowBatchTime)

TimeLengthWindow

This is a sliding time window that, at a given time holds the last windowLength events that arrived during last windowTime period, and gets updated for every event arrival and expiry. E.g. from inputStream window timeLength(4000, 10) select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } } The timeLength window should only have two parameters ( windowTime, windowLength)

TimeOrderWindow

The TimeOrderWindow sorts the events to be expired by the given timestamp field by comparing that timestamp value to engine system time. E.g. from inputStream window timeOrder(inputStream.timestamp, 4000, true) select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } } timeOrder window should only have three parameters (timestamp field, windowTime, dropOlderEvents)

TimeWindow

The TimeWindow is a sliding time window, that holds events for that arrived during last windowTime period, and gets updated on every event arrival and expiry. E.g. from inputStream window time(5000) select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } } The time window should only have one parameter ( windowTime)

UniqueLengthWindow

This is a length window which only keeps the unique events. E.g. from inputStream window uniqueLength(inputStream.timestamp, 4000) select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } } The uniqueLength window should only have two parameters (stream field, windowLength)

Window

The Window abstract objects is the base object for implementing windows in Ballerina streams. The process function contains the logic of processing events when events are received. getCandidateEvents function is used inside the Select object to return the events in the window to perform joining. The window names in the window objects cannot be used in the queries. Always a function which returns the specific window has to be used in streaing query. E.g. If LengthWindow has to be used in a streaming query, the function streams:length has to be used for streaming query without the module identifier streams. An example is shown below.

from inputStream window length(5) select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } }

Functions

Function Description
avg

Returns a Average aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

buildStreamEvent

Creates streams:StreamEvent object array for a record t received by the stream denoted by the name streamNme.

count

Returns a Count aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

createFilter

Creates a Filter object and return it.

createOrderBy

Creates an OrderBy object and return it.

createOutputProcess

Creates and return a OutputProcess object.

createResetStreamEvent

Creates a RESET event from a given event.

createSelect

Creates and returns a select clause.

createStreamJoinProcessor

Creates a StreamJoinProcessor and returns it.

createTableJoinProcessor

Creates a TableJoinProcessor and return it.

delay

The delay function creates a DelayWindow object and returns it.

distinctCount

Returns a DistinctCount aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

externalTime

The externalTime function creates a ExternalTimeWindow object and returns it.

externalTimeBatch

The externalTimeBatch function creates a ExternalTimeBatchWindow object and returns it.

getStreamEvent

Get the stream event from any? field. This function can only be used only if we are sure that the anyEvent is a streams:StreamEvent.

hopping

The hopping function creates a HoppingWindow object and returns it.

initPersistence

Function to initialize and start snapshotting.

length

The length function creates a LengthWindow object and returns it.

lengthBatch

The lengthBatch function creates a LengthBatchWindow object and returns it.

max

Returns a Max aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

maxForever

Returns a MaxForever aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

min

Returns a Min aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

minForever

Returns a MinForever aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

registerSnapshotable

Function to register Snapshotables.

removeState

Function to clear an existing state.

restoreState

Function to restore state of a given object.

sort

The sort function creates a SortWindow object and returns it.

stdDev

Returns a StdDev aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

sum

Returns a Sum aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

time

The time function creates a TimeWindow object and returns it.

timeAccum

The timeAccum function creates a TimeAccumulatingWindow object and returns it.

timeBatch

The timeBatch function creates a TimeBatchWindow object and returns it.

timeLength

The timeLength function creates a TimeLengthWindow object and returns it.

timeOrder

The timeOrder function creates a TimeOrderWindow object and returns it.

toSnapshottableEvent

Convert a single streams:StreamEvent object to streams:SnapshottableStreamEvent object.

toSnapshottableEvents

Converts a given array of streams:StreamEvent objects to an array of streams:SnapshottableStreamEvent.

toStreamEvent

Convert a single streams:SnapshottableStreamEvent object to streams:StreamEvent object.

toStreamEvents

Converts a given array of snapshotable events to an array of streams:StreamEvent objects.

uniqueLength

The uniqueLength function creates a UniqueLengthWindow object and returns it.

Type Definitions

Type Values Description
EventType TIMER | RESET | EXPIRED | CURRENT | ALL

The type of stream events.

JoinType RIGHTOUTERJOIN | LEFTOUTERJOIN | JOIN | FULLOUTERJOIN

The types of joins between streams and tables.

public type SnapshottableStreamEvent

This record represents a stream event which can be persisted.

Field Name Data Type Default Value Description
eventType CURRENT|EXPIRED|ALL|RESET|TIMER CURRENT

description

timestamp int 0

description

data map<anydata> {}

description

public function avg() returns (Aggregator)

Returns a Average aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

Return Type Description
Aggregator

A Aggregator object which performs averaging.

public function buildStreamEvent(any t, string streamName) returns (StreamEvent[])

Creates streams:StreamEvent object array for a record t received by the stream denoted by the name streamNme.

Parameter Name Data Type Default Value Description
t any

A record received by the stream streamName.

streamName string

Name of the stream to which the record t is received.

Return Type Description
StreamEvent[]

Returns an array of streams:StreamEvents|()

public function count() returns (Aggregator)

Returns a Count aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

Return Type Description
Aggregator

A Aggregator object which performs counting.

public function createFilter(function (streams:StreamEvent?[]) returns () nextProcPointer, function (map<anydata>) returns (boolean) conditionFunc) returns (Filter)

Creates a Filter object and return it.

Parameter Name Data Type Default Value Description
nextProcPointer function (streams:StreamEvent?[]) returns ()

The function pointer to the process function of the next processor.

conditionFunc function (map) returns (boolean)

The function pointer to the condition evaluator. This is a function which returns true or false based on the boolean expression given in the where clause.

Return Type Description
Filter

Returns a Filter object.

public function createOrderBy(function (streams:StreamEvent?[]) returns () nextProcessorPointer, function (map<anydata>) returns (anydata)?[] fields, string[] sortFieldMetadata) returns (OrderBy)

Creates an OrderBy object and return it.

Parameter Name Data Type Default Value Description
nextProcessorPointer function (streams:StreamEvent?[]) returns ()

A function pointer to the process function of the next processor.

fields function (map) returns (anydata)?[]

An array of function pointers which each returns a field by which the events are sorted. Events are sorted by the first field, if there are elements of same value, the second field is used and so on.

sortFieldMetadata string[]

sortTypes of the fields (streams:ASCENDING or streams:DESCENDING). First element is the sort type of the first element of fields and so on.

Return Type Description
OrderBy

Returns a OrderBy object.

public function createOutputProcess(function (map<anydata>[]) returns () outputFunc) returns (OutputProcess)

Creates and return a OutputProcess object.

Parameter Name Data Type Default Value Description
outputFunc function (map[]) returns ()

The function pointer to a lambda function created out of the statements in the streaming action

Return Type Description
OutputProcess

Returns a OutputProcess object.

public function createResetStreamEvent(streams:StreamEvent event) returns (StreamEvent)

Creates a RESET event from a given event.

Parameter Name Data Type Default Value Description
event streams:StreamEvent

The event from which the reset event is created.

Return Type Description
StreamEvent

A stream event of type streams:RESET.

public function createSelect(function (streams:StreamEvent?[]) returns () nextProcPointer, streams:Aggregator[] aggregatorArr, function (streams:StreamEvent) returns (anydata)?[]? groupbyFuncArray, function (streams:StreamEvent,streams:Aggregator[]) returns (map<anydata>) selectFunc, string scopeName) returns (Select)

Creates and returns a select clause.

Parameter Name Data Type Default Value Description
nextProcPointer function (streams:StreamEvent?[]) returns ()

The function pointer to the process function of the next processor.

aggregatorArr streams:Aggregator[]

The array of aggregators used in the select clause. If the same aggregator is used twice, the aggregatorArr will contains that specific aggregator twice in the order they appear in the select clause.

groupbyFuncArray function (streams:StreamEvent) returns (anydata)?[]?

The array of function pointer which contains the lambda function which returns the expressions in the group by clause.

selectFunc function (streams:StreamEvent,streams:Aggregator[]) returns (map)

The function pointer to a lambda function that creates the data field of the output stream event.

scopeName string $scope$name

A unique id to identify the forever block if there are multiple forever blocks.

Return Type Description
Select

Returns a Select object.

public function createStreamJoinProcessor(function (streams:StreamEvent?[]) returns () nextProcessor, JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN joinType, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc) returns (StreamJoinProcessor)

Creates a StreamJoinProcessor and returns it.

Parameter Name Data Type Default Value Description
nextProcessor function (streams:StreamEvent?[]) returns ()

The process function of the next processor, which can be a Select processor, Aggregator processor, Having processor.. etc.

joinType JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN

Type of the join being performed ("JOIN"|"LEFTOUTERJOIN"|"RIGHTOUTERJOIN"|"FULLOUTERJOIN")

conditionFunc function (map,map) returns (boolean)? ()

A lambda function which contains the joining condition and return true if the condition satifies the condition.

Return Type Description
StreamJoinProcessor

Returns a StreamJoinProcessor object.

public function createTableJoinProcessor(function (streams:StreamEvent?[]) returns () nextProcessor, JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN joinType, function (streams:StreamEvent) returns (map<anydata>[]) tableQuery) returns (TableJoinProcessor)

Creates a TableJoinProcessor and return it.

Parameter Name Data Type Default Value Description
nextProcessor function (streams:StreamEvent?[]) returns ()

The function pointer to process function of the next processor, which can be a Select processor, Aggregator processor, Having processor.. etc

joinType JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN

The type of the join and it can be any value defined by streams:JoinType.

tableQuery function (streams:StreamEvent) returns (map[])

The function pointer to a function which retrieves the records from the table and joins w ith each stream event.

Return Type Description
TableJoinProcessor

Returns a TableJoinProcessor object.

public function delay(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)

The delay function creates a DelayWindow object and returns it.

Parameter Name Data Type Default Value Description
windowParameters any[]

Arguments which should be passed with the window function in the streams query in the order they appear in the argument list.

nextProcessPointer function (streams:StreamEvent?[]) returns ()? ()

The function pointer to the process function of the next processor.

Return Type Description
Window

Returns the created window.

public function distinctCount() returns (Aggregator)

Returns a DistinctCount aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

Return Type Description
Aggregator

A Aggregator object which represents DistinctCount.

public function externalTime(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)

The externalTime function creates a ExternalTimeWindow object and returns it.

Parameter Name Data Type Default Value Description
windowParameters any[]

Arguments which should be passed with the window function in the streams query in the order they appear in the argument list.

nextProcessPointer function (streams:StreamEvent?[]) returns ()? ()

The function pointer to the process function of the next processor.

Return Type Description
Window

Returns the created window.

public function externalTimeBatch(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)

The externalTimeBatch function creates a ExternalTimeBatchWindow object and returns it.

Parameter Name Data Type Default Value Description
windowParameters any[]

Arguments which should be passed with the window function in the streams query in the order they appear in the argument list.

nextProcessPointer function (streams:StreamEvent?[]) returns ()? ()

The function pointer to the process function of the next processor.

Return Type Description
Window

Returns the created window.

public function getStreamEvent(any anyEvent) returns (StreamEvent)

Get the stream event from any? field. This function can only be used only if we are sure that the anyEvent is a streams:StreamEvent.

Parameter Name Data Type Default Value Description
anyEvent any

The object from which, the stream event is extracted.

Return Type Description
StreamEvent

Returns the extracted streams:StreamEvent object.

public function hopping(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)

The hopping function creates a HoppingWindow object and returns it.

Parameter Name Data Type Default Value Description
windowParameters any[]

Arguments which should be passed with the window function in the streams query in the order they appear in the argument list.

nextProcessPointer function (streams:StreamEvent?[]) returns ()? ()

The function pointer to the process function of the next processor.

Return Type Description
Window

Returns the created window.

public function initPersistence()

Function to initialize and start snapshotting.

public function length(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)

The length function creates a LengthWindow object and returns it.

Parameter Name Data Type Default Value Description
windowParameters any[]

Arguments which should be passed with the window function in the streams query in the order they appear in the argument list.

nextProcessPointer function (streams:StreamEvent?[]) returns ()? ()

The function pointer to the process function of the next processor.

Return Type Description
Window

Returns the created window.

public function lengthBatch(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)

The lengthBatch function creates a LengthBatchWindow object and returns it.

Parameter Name Data Type Default Value Description
windowParameters any[]

Arguments which should be passed with the window function in the streams query in the order they appear in the argument list.

nextProcessPointer function (streams:StreamEvent?[]) returns ()? ()

The function pointer to the process function of the next processor.

Return Type Description
Window

Returns the created window.

public function max() returns (Aggregator)

Returns a Max aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

Return Type Description
Aggregator

A Aggregator which represents Max.

public function maxForever() returns (Aggregator)

Returns a MaxForever aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

Return Type Description
Aggregator

A Aggregator which represents MaxForever.

public function min() returns (Aggregator)

Returns a Min aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

Return Type Description
Aggregator

A Aggregator which represents Min.

public function minForever() returns (Aggregator)

Returns a MinForever aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

Return Type Description
Aggregator

A Aggregator which represents MainForever.

public function registerSnapshotable(string key, any reference)

Function to register Snapshotables.

Parameter Name Data Type Default Value Description
key string

An unique string identifier for the snapshotable reference.

reference any

The snapshotable reference to be registered.

public function removeState(string key) returns (boolean)

Function to clear an existing state.

Parameter Name Data Type Default Value Description
key string

An unique string identifier for the snapshotable reference.

Return Type Description
boolean

A boolean indicating whether the state for the given key removed successfully.

public function restoreState(string key, any reference)

Function to restore state of a given object.

Parameter Name Data Type Default Value Description
key string

An unique string identifier for the snapshotable reference.

reference any

The snapshotable reference to be restored.

public function sort(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)

The sort function creates a SortWindow object and returns it.

Parameter Name Data Type Default Value Description
windowParameters any[]

Arguments which should be passed with the window function in the streams query in the order they appear in the argument list.

nextProcessPointer function (streams:StreamEvent?[]) returns ()? ()

The function pointer to the process function of the next processor.

Return Type Description
Window

Returns the created window.

public function stdDev() returns (Aggregator)

Returns a StdDev aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

Return Type Description
Aggregator

A Aggregator which represents StdDev.

public function sum() returns (Aggregator)

Returns a Sum aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

Return Type Description
Aggregator

A Aggregator which perform addition/summation.

public function time(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)

The time function creates a TimeWindow object and returns it.

Parameter Name Data Type Default Value Description
windowParameters any[]

Arguments which should be passed with the window function in the streams query in the order they appear in the argument list.

nextProcessPointer function (streams:StreamEvent?[]) returns ()? ()

The function pointer to the process function of the next processor.

Return Type Description
Window

Returns the created window.

public function timeAccum(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)

The timeAccum function creates a TimeAccumulatingWindow object and returns it.

Parameter Name Data Type Default Value Description
windowParameters any[]

Arguments which should be passed with the window function in the streams query in the order they appear in the argument list.

nextProcessPointer function (streams:StreamEvent?[]) returns ()? ()

The function pointer to the process function of the next processor.

Return Type Description
Window

Returns the created window.

public function timeBatch(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)

The timeBatch function creates a TimeBatchWindow object and returns it.

Parameter Name Data Type Default Value Description
windowParameters any[]

Arguments which should be passed with the window function in the streams query in the order they appear in the argument list.

nextProcessPointer function (streams:StreamEvent?[]) returns ()? ()

The function pointer to the process function of the next processor.

Return Type Description
Window

Returns the created window.

public function timeLength(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)

The timeLength function creates a TimeLengthWindow object and returns it.

Parameter Name Data Type Default Value Description
windowParameters any[]

Arguments which should be passed with the window function in the streams query in the order they appear in the argument list.

nextProcessPointer function (streams:StreamEvent?[]) returns ()? ()

The function pointer to the process function of the next processor.

Return Type Description
Window

Returns the created window.

public function timeOrder(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)

The timeOrder function creates a TimeOrderWindow object and returns it.

Parameter Name Data Type Default Value Description
windowParameters any[]

Arguments which should be passed with the window function in the streams query in the order they appear in the argument list.

nextProcessPointer function (streams:StreamEvent?[]) returns ()? ()

The function pointer to the process function of the next processor.

Return Type Description
Window

Returns the created window.

public function toSnapshottableEvent(streams:StreamEvent event) returns (SnapshottableStreamEvent)

Convert a single streams:StreamEvent object to streams:SnapshottableStreamEvent object.

Parameter Name Data Type Default Value Description
event streams:StreamEvent

The streams:StreamEvent object to be converted to snapshotable event.

Return Type Description
SnapshottableStreamEvent

The converted streams:SnapshottableStreamEvent object.

public function toSnapshottableEvents(streams:StreamEvent[]|any[]? events) returns (SnapshottableStreamEvent[])

Converts a given array of streams:StreamEvent objects to an array of streams:SnapshottableStreamEvent.

Parameter Name Data Type Default Value Description
events streams:StreamEvent[]|any[]?

The events to be coverted to snapshotable events.

Return Type Description
SnapshottableStreamEvent[]

Returns the converted snapshotable events.

public function toStreamEvent(streams:SnapshottableStreamEvent event) returns (StreamEvent)

Convert a single streams:SnapshottableStreamEvent object to streams:StreamEvent object.

Parameter Name Data Type Default Value Description
event streams:SnapshottableStreamEvent

The streams:SnapshottableStreamEvent object to be converted to a stream event.

Return Type Description
StreamEvent

The converted streams:StreamEvent object.

public function toStreamEvents(streams:SnapshottableStreamEvent[]|any[]? events) returns (StreamEvent[])

Converts a given array of snapshotable events to an array of streams:StreamEvent objects.

Parameter Name Data Type Default Value Description
events streams:SnapshottableStreamEvent[]|any[]?

Snapshotable events to be converted to streams:StreamEvents.

Return Type Description
StreamEvent[]

Returns the converted streams:StreamEvents objects array.

public function uniqueLength(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)

The uniqueLength function creates a UniqueLengthWindow object and returns it.

Parameter Name Data Type Default Value Description
windowParameters any[]

Arguments which should be passed with the window function in the streams query in the order they appear in the argument list.

nextProcessPointer function (streams:StreamEvent?[]) returns ()? ()

The function pointer to the process function of the next processor.

Return Type Description
Window

Returns the created window.

public type Aggregator object

Abstract object, which should be implemented in order to create a new aggregator.

  • <Aggregator> copy() returns (Aggregator)

    Returns a copy of self, but it does not contain the current state.

    Return Type Description
    Aggregator

    A Aggregator object.

  • <Aggregator> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Updates the aggregated value and returns the final aggregated value.

    Parameter Name Data Type Default Value Description
    value anydata

    value being aggregated.

    eventType CURRENT|EXPIRED|ALL|RESET|TIMER

    Type of the incoming event streams:CURRENT, streams:EXPIRED or streams:RESET. Based on the type of the event value will be added to the aggregation or removed from the aggregation.

    Return Type Description
    anydata

    Updated aggregated value after value being aggregated.

public type Average object

Aggregator to calculate average in streams.

Field Name Data Type Default Value Description
count int 0

description

sum float 0.0

description

  • <Average> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Returns the calculated average after value being aggregated into current average. If the eventType is streams:CURRENT,value is added to the current sum and count is increase by 1. If the eventType is streams:EXPIRED, value is subtracted from the current sum and count is descreased by 1. If the eventType is streams:RESET, Current summation and count is reset, regardless the value. Then by dividing the sum by count, the average is calculated.

    Parameter Name Data Type Default Value Description
    value anydata

    The numeric value being aggregated in order to calculate the new average.

    eventType CURRENT|EXPIRED|ALL|RESET|TIMER

    Type of the incoming event streams:CURRENT, streams:EXPIRED or streams:RESET.

    Return Type Description
    anydata

    Updated average value after value being aggregated.

  • <Average> copy() returns (Aggregator)

    Returns a copy of the Average aggregator without its current state.

    Return Type Description
    Aggregator

    Returns Average aggregator.

  • <Average> saveState() returns (map<any>)

    Return current state to be saved as a map of any typed values.

    Return Type Description
    map

    A map of any typed values.

  • <Average> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of typed any values. This map contains the values to be restored from the persisted data.

public type Count object

Aggregator to count events in streams.

Field Name Data Type Default Value Description
count int 0

description

  • <Count> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Updates the current count when a new event arrives and return the updated count. If the eventType is streams:CURRENT, count is increase by 1. If the eventType is streams:EXPIRED, count is descreased by 1. If the eventTypeis streams:RESET, count is reset, regardless the value.

    Parameter Name Data Type Default Value Description
    value anydata

    In count aggregator the value is not used.

    eventType CURRENT|EXPIRED|ALL|RESET|TIMER

    Type of the incoming event streams:CURRENT, streams:EXPIRED or streams:RESET.

    Return Type Description
    anydata

    Updated count.

  • <Count> copy() returns (Aggregator)

    Returns a copy of the Count aggregator without its current state.

    Return Type Description
    Aggregator

    Returns Count aggregator.

  • <Count> saveState() returns (map<any>)

    Return current state to be saved as a map of any typed values.

    Return Type Description
    map

    A map of any typed values.

  • <Count> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of typed any values. This map contains the values to be restored from the persisted data.

public type DelayWindow object

This window will delay the incoming events for a given amount of time. E.g. from inputStream window delay(4000) select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } } The delay window should only have one parameter ( delayTime)

Field Name Data Type Default Value Description
delayInMilliSeconds int

description

windowParameters any[]

description

delayedEventQueue streams:LinkedList

description

lastTimestamp int 0

description

nextProcessPointer function (streams:StreamEvent?[]) returns ()?

description

scheduler streams:Scheduler

description

  • <DelayWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent?[]) returns ()?
    windowParameters any[]
  • <DelayWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <DelayWindow> process(streams:StreamEvent?[] streamEvents)

    The process function process the incoming events to the events and update the current state of the window.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The array of stream events to be processed.

  • <DelayWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Returns the events(State) which match with the where condition in the join clause for a given event.

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent

    The event against which the state or the events being held by the window is matched.

    conditionFunc function (map,map) returns (boolean)?

    The function pointer to the lambda function which contain the condition logic in where clause.

    isLHSTrigger boolean true

    Specify if the join is triggered when the lhs stream received the events, if so it should be true. Most of the time it is true. In rare cases, where the join is triggered when the rhs stream receives events this should be false.

    Return Type Description
    (StreamEvent,StreamEvent)[]

    Returns an array of 2 element tuples of events. A tuple contains the matching events one from lhs stream and one from rhs stream.

  • <DelayWindow> saveState() returns (map<any>)

    Return current state to be saved as a map of any typed values.

    Return Type Description
    map

    A map of any typed values.

  • <DelayWindow> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of typed any values. This map contains the values to be restored from the persisted data.

public type DistinctCount object

Aggregator to get the distinct counts of values in streams.

Field Name Data Type Default Value Description
distinctValues map<int> {}

description

  • <DistinctCount> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Updates the current distinct count when a new event arrives and return the updated count. If the eventType is streams:CURRENT, count is increased by 1. If the eventType is streams:EXPIRED, count is descreased by 1. If the eventTypeis streams:RESET, count is reset, regardless of the value.

    Parameter Name Data Type Default Value Description
    value anydata

    Value being counted uniquely.

    eventType CURRENT|EXPIRED|ALL|RESET|TIMER

    Type of the incoming event streams:CURRENT, streams:EXPIRED or streams:RESET.

    Return Type Description
    anydata

    Updated distinct count.

  • <DistinctCount> copy() returns (Aggregator)

    Returns a copy of the DistinctCount aggregator without its current state.

    Return Type Description
    Aggregator

    Returns DistinctCount aggregator.

  • <DistinctCount> saveState() returns (map<any>)

    Return current state to be saved as a map of any typed values.

    Return Type Description
    map

    A map of any typed values.

  • <DistinctCount> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of typed any values. This map contains the values to be restored from the persisted data.

public type ExternalTimeBatchWindow object

This is a batch (tumbling) time window based on external time, that holds events arrived during window time periods, and gets updated for every window time. E.g. from inputStream window externalTimeBatch(inputStream.timestamp, 1000, 500, 1200, true) select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } } The externalTimeBatch window should only have two to five parameters (timestamp field, windowTime, startTime, timeout, replaceTimestampWithBatchEndTime)

Field Name Data Type Default Value Description
timeToKeep int

description

currentEventChunk streams:LinkedList

description

expiredEventChunk streams:LinkedList

description

resetEvent streams:StreamEvent? ()

description

startTime int -1

description

isStartTimeEnabled boolean false

description

replaceTimestampWithBatchEndTime boolean false

description

flushed boolean false

description

endTime int -1

description

schedulerTimeout int 0

description

lastScheduledTime int

description

lastCurrentEventTime int 0

description

nextProcessPointer function (streams:StreamEvent?[]) returns ()?

description

timeStamp string

description

storeExpiredEvents boolean false

description

outputExpectsExpiredEvents boolean false

description

windowParameters any[]

description

scheduler streams:Scheduler

description

  • <ExternalTimeBatchWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent?[]) returns ()?
    windowParameters any[]
  • <ExternalTimeBatchWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <ExternalTimeBatchWindow> process(streams:StreamEvent?[] streamEvents)

    The process function process the incoming events to the events and update the current state of the window.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The array of stream events to be processed.

  • <ExternalTimeBatchWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Returns the events(State) which match with the where condition in the join clause for a given event.

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent

    The event against which the state or the events being held by the window is matched.

    conditionFunc function (map,map) returns (boolean)?

    The function pointer to the lambda function which contain the condition logic in where clause.

    isLHSTrigger boolean true

    Specify if the join is triggered when the lhs stream received the events, if so it should be true. Most of the time it is true. In rare cases, where the join is triggered when the rhs stream receives events this should be false.

    Return Type Description
    (StreamEvent,StreamEvent)[]

    Returns an array of 2 element tuples of events. A tuple contains the matching events one from lhs stream and one from rhs stream.

  • <ExternalTimeBatchWindow> saveState() returns (map<any>)

    Return current state to be saved as a map of any typed values.

    Return Type Description
    map

    A map of any typed values.

  • <ExternalTimeBatchWindow> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of typed any values. This map contains the values to be restored from the persisted data.

  • <ExternalTimeBatchWindow> cloneAppend(streams:StreamEvent currStreamEvent)

    Parameter Name Data Type Default Value Description
    currStreamEvent streams:StreamEvent
  • <ExternalTimeBatchWindow> flushToOutputChunk(streams:LinkedList complexEventChunks, int currentTime, boolean preserveCurrentEvents)

    Parameter Name Data Type Default Value Description
    complexEventChunks streams:LinkedList
    currentTime int
    preserveCurrentEvents boolean
  • <ExternalTimeBatchWindow> appendToOutputChunk(streams:LinkedList complexEventChunks, int currentTime, boolean preserveCurrentEvents)

    Parameter Name Data Type Default Value Description
    complexEventChunks streams:LinkedList
    currentTime int
    preserveCurrentEvents boolean
  • <ExternalTimeBatchWindow> findEndTime(int currentTime, int startTime_, int timeToKeep_) returns (int)

    Parameter Name Data Type Default Value Description
    currentTime int
    startTime_ int
    timeToKeep_ int
    Return Type Description
    int
  • <ExternalTimeBatchWindow> initTiming(streams:StreamEvent firstStreamEvent)

    Parameter Name Data Type Default Value Description
    firstStreamEvent streams:StreamEvent
  • <ExternalTimeBatchWindow> getTimestamp(any val) returns (int)

    Parameter Name Data Type Default Value Description
    val any
    Return Type Description
    int

public type ExternalTimeWindow object

This is a sliding time window based on external time, that holds events for that arrived during last window time period from the external timestamp, and gets updated on every monotonically increasing timestamp. E.g. from inputStream window externalTime(inputStream.timestamp, 4000) select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } } The externalTime window should only have two parameters (timestamp field, windowTime)

Field Name Data Type Default Value Description
timeInMillis int

description

windowParameters any[]

description

expiredEventQueue streams:LinkedList

description

nextProcessPointer function (streams:StreamEvent?[]) returns ()?

description

timeStamp string

description

  • <ExternalTimeWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent?[]) returns ()?
    windowParameters any[]
  • <ExternalTimeWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <ExternalTimeWindow> process(streams:StreamEvent?[] streamEvents)

    The process function process the incoming events to the events and update the current state of the window.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The array of stream events to be processed.

  • <ExternalTimeWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Returns the events(State) which match with the where condition in the join clause for a given event.

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent

    The event against which the state or the events being held by the window is matched.

    conditionFunc function (map,map) returns (boolean)?

    The function pointer to the lambda function which contain the condition logic in where clause.

    isLHSTrigger boolean true

    Specify if the join is triggered when the lhs stream received the events, if so it should be true. Most of the time it is true. In rare cases, where the join is triggered when the rhs stream receives events this should be false.

    Return Type Description
    (StreamEvent,StreamEvent)[]

    Returns an array of 2 element tuples of events. A tuple contains the matching events one from lhs stream and one from rhs stream.

  • <ExternalTimeWindow> saveState() returns (map<any>)

    Return current state to be saved as a map of any typed values.

    Return Type Description
    map

    A map of any typed values.

  • <ExternalTimeWindow> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of typed any values. This map contains the values to be restored from the persisted data.

  • <ExternalTimeWindow> getTimestamp(any val) returns (int)

    Parameter Name Data Type Default Value Description
    val any
    Return Type Description
    int

public type Filter object

The Filter object represents the where clause in a streaming query. This object takes two parameter for initialization. nextProcessorPointer is the function pointer of the next processor to be invoked once the filtering is complete. conditionFunc is a function pointer which return true if the given where clause evaluates to true.

  • <Filter> process(streams:StreamEvent?[] streamEvents)

    Process the incoming stream events. This function takes an array of stream events, iterate each of the events in the array, then call the conditionFunc on each and see if conditionFunc is evaluates to true. if so, those events will be passed to the nextPrcessorPointer which can be the process function of the next processor object ( e.g. Select, Window, Aggregator.. etc).

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The events being filtered.

public type HoppingWindow object

The hopping window releases the events in batches defined by a time period every given time interval. The batch is also determined by the time period given in the window. When the time interval the events being released and the time period it hold the events are equal, the hopping window acts as a TimeBatch window. E.g. from inputStream window hopping(5000, 4000) select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } } Hopping window should only have two parameters ( windowTime, hoppingTime)

Field Name Data Type Default Value Description
timeInMilliSeconds int

description

hoppingTime int

description

windowParameters any[]

description

nextEmitTime int -1

description

currentEventQueue streams:LinkedList

description

resetEvent streams:StreamEvent?

description

nextProcessPointer function (streams:StreamEvent?[]) returns ()?

description

scheduler streams:Scheduler

description

  • <HoppingWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent?[]) returns ()?
    windowParameters any[]
  • <HoppingWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <HoppingWindow> process(streams:StreamEvent?[] streamEvents)

    The process function process the incoming events to the events and update the current state of the window.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The array of stream events to be processed.

  • <HoppingWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Returns the events(State) which match with the where condition in the join clause for a given event.

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent

    The event against which the state or the events being held by the window is matched.

    conditionFunc function (map,map) returns (boolean)?

    The function pointer to the lambda function which contain the condition logic in where clause.

    isLHSTrigger boolean true

    Specify if the join is triggered when the lhs stream received the events, if so it should be true. Most of the time it is true. In rare cases, where the join is triggered when the rhs stream receives events this should be false.

    Return Type Description
    (StreamEvent,StreamEvent)[]

    Returns an array of 2 element tuples of events. A tuple contains the matching events one from lhs stream and one from rhs stream.

  • <HoppingWindow> saveState() returns (map<any>)

    Return current state to be saved as a map of any typed values.

    Return Type Description
    map

    A map of any typed values.

  • <HoppingWindow> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of typed any values. This map contains the values to be restored from the persisted data.

public type IntSort object

This class implements a merge sort algorithm to sort timestamp values for state persistence.

  • <IntSort> sort(int[] arr)

    Sorts a given array of int values.

    Parameter Name Data Type Default Value Description
    arr int[]

    The array of int values to be sorted.

public type LengthBatchWindow object

This is a batch (tumbling) length window, that holds up to the given length of events, and gets updated on every given number of events arrival. E.g. from inputStream window lengthBatch(5) select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } } The lengthBatch window should only have one parameter ( windowBatchLength)

Field Name Data Type Default Value Description
length int

description

windowParameters any[]

description

count int

description

resetEvent streams:StreamEvent?

description

currentEventQueue streams:LinkedList

description

nextProcessPointer function (streams:StreamEvent?[]) returns ()?

description

  • <LengthBatchWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent?[]) returns ()?
    windowParameters any[]
  • <LengthBatchWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <LengthBatchWindow> process(streams:StreamEvent?[] streamEvents)

    The process function process the incoming events to the events and update the current state of the window.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The array of stream events to be processed.

  • <LengthBatchWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Returns the events(State) which match with the where condition in the join clause for a given event.

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent

    The event against which the state or the events being held by the window is matched.

    conditionFunc function (map,map) returns (boolean)?

    The function pointer to the lambda function which contain the condition logic in where clause.

    isLHSTrigger boolean true

    Specify if the join is triggered when the lhs stream received the events, if so it should be true. Most of the time it is true. In rare cases, where the join is triggered when the rhs stream receives events this should be false.

    Return Type Description
    (StreamEvent,StreamEvent)[]

    Returns an array of 2 element tuples of events. A tuple contains the matching events one from lhs stream and one from rhs stream.

  • <LengthBatchWindow> saveState() returns (map<any>)

    Return current state to be saved as a map of any typed values.

    Return Type Description
    map

    A map of any typed values.

  • <LengthBatchWindow> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of typed any values. This map contains the values to be restored from the persisted data.

public type LengthWindow object

The LengthWindow is a sliding length window, that holds last windowLength events, and gets updated on every event arrival and expiry. E.g. from inputStream window length(5) select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } } The length window should only have one parameter ( windowLength)

Field Name Data Type Default Value Description
size int

description

linkedList streams:LinkedList

description

windowParameters any[]

description

nextProcessPointer function (streams:StreamEvent?[]) returns ()?

description

  • <LengthWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent?[]) returns ()?
    windowParameters any[]
  • <LengthWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <LengthWindow> process(streams:StreamEvent?[] streamEvents)

    The process function process the incoming events to the events and update the current state of the window.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The array of stream events to be processed.

  • <LengthWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Returns the events(State) which match with the where condition in the join clause for a given event.

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent

    The event against which the state or the events being held by the window is matched.

    conditionFunc function (map,map) returns (boolean)?

    The function pointer to the lambda function which contain the condition logic in where clause.

    isLHSTrigger boolean true

    Specify if the join is triggered when the lhs stream received the events, if so it should be true. Most of the time it is true. In rare cases, where the join is triggered when the rhs stream receives events this should be false.

    Return Type Description
    (StreamEvent,StreamEvent)[]

    Returns an array of 2 element tuples of events. A tuple contains the matching events one from lhs stream and one from rhs stream.

  • <LengthWindow> saveState() returns (map<any>)

    Return current state to be saved as a map of any typed values.

    Return Type Description
    map

    A map of any typed values.

  • <LengthWindow> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of typed any values. This map contains the values to be restored from the persisted data.

public type LinkedList object

The LinkedList object which represents the linked list data structure.

Field Name Data Type Default Value Description
first streams:Node?

description

last streams:Node?

description

curr streams:Node?

description

size int 0

description

ascend boolean true

description

  • <LinkedList> isEmpty() returns (boolean)

    Checks if the linked list is empty.

    Return Type Description
    boolean

    Returns true if the linked list is empty, otherwise returns false.

  • <LinkedList> resetToFront()

    Moves the cursoer to the front/head of the linked list.

  • <LinkedList> resetToRear()

    Moves the cursor to the end of the linked list if the cursor is not already at the last element of the linked list.

  • <LinkedList> hasNext() returns (boolean)

    Returns true if the linked list has more elements starting from the current cursor location.

    Return Type Description
    boolean

    Returns true if there are more elements onwards from the current cursor location, otherwise false.

  • <LinkedList> hasPrevious() returns (boolean)

    Returns true if there are prior elements to the current element/cursor location, otherwise false.

    Return Type Description
    boolean

    Returns true, if there are elements prior to the current cursor location, otherwise false.

  • <LinkedList> next() returns (any)

    Returns the next element of the linked list and moves the cursor to the next element.

    Return Type Description
    any

    The next element from the current cursor location.

  • <LinkedList> previous() returns (any)

    Returns the previous element of the linked list and moves the cursor to the previous element.

    Return Type Description
    any

    The previous element from the current cursor location.

  • <LinkedList> removeCurrent()

    Removes the element at the current cursor location.

  • <LinkedList> getSize() returns (int)

    Returns the current number of elements in the linked list.

    Return Type Description
    int

    The number of elements in the linked list.

  • <LinkedList> clear()

    Empties the linked list.

  • <LinkedList> removeFirstOccurrence(any elem) returns (boolean)

    Removes the first occurence of the element pass as elem and return true of the removal is successful.

    Parameter Name Data Type Default Value Description
    elem any
    Return Type Description
    boolean

    Return true if removal is successful otherwise false.

  • <LinkedList> remove(any elem) returns (boolean)

    Removes the first occurence of the element pass as elem and return true of the removal is successful.

    Parameter Name Data Type Default Value Description
    elem any
    Return Type Description
    boolean

    Return true if removal is successful otherwise false.

  • <LinkedList> getFirst() returns (any)

    Returns the first element of the linked list, without moving the cursor.

    Return Type Description
    any

    First element of the linked list.

  • <LinkedList> getLast() returns (any)

    Returns the last element of the linked list, without moving the cursor.

    Return Type Description
    any

    Last element of the linked list.

  • <LinkedList> addFirst(any data)

    Adds a new element to the front of the linked list without moving the cursor.

    Parameter Name Data Type Default Value Description
    data any

    Data to be added to the front of the linked list.

  • <LinkedList> addLast(any data)

    Adds a new element to the end of the linked list without moving the cursor.

    Parameter Name Data Type Default Value Description
    data any

    Data to be added to the end of the linked list.

  • <LinkedList> removeFirst() returns (any)

    Removes the first element in the linked list without moving the cursor.

    Return Type Description
    any

    Returns the removed element.

  • <LinkedList> removeLast() returns (any)

    Removes the last element in the linked list without moving the cursor.

    Return Type Description
    any

    Returns the removed element.

  • <LinkedList> insertBeforeCurrent(any data)

    Insert a new element before the current cursor location.

    Parameter Name Data Type Default Value Description
    data any

    Data to be inserted.

  • <LinkedList> dequeue() returns (any)

    Returns the first element which is added to the linked list.

    Return Type Description
    any

    The dequeued element.

  • <LinkedList> asArray() returns (any[])

    Creates an array from the elements in the linked list and return it. The cursor will not be changed.

    Return Type Description
    any[]

    An array of elements in the linked list.

  • <LinkedList> addAll(any[] data)

    Adds elements of an array to the current cursor location and moves the cursor to the end of the list.

    Parameter Name Data Type Default Value Description
    data any[]

    The array to be added to the linked list.

public type Max object

Aggregator to find the maximum value in a stream.

Field Name Data Type Default Value Description
iMaxQueue streams:LinkedList BLangTypeInit: new null ([])

description

fMaxQueue streams:LinkedList BLangTypeInit: new null ([])

description

iMax int? ()

description

fMax float? ()

description

  • <Max> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Updates the current maximum value and return the updated maximum value.

    Parameter Name Data Type Default Value Description
    value anydata

    Value being checked whether it is greater than the current maximum value.

    eventType CURRENT|EXPIRED|ALL|RESET|TIMER

    Type of the incoming event streams:CURRENT, streams:EXPIRED or streams:RESET.

    Return Type Description
    anydata

    Updated max value.

  • <Max> copy() returns (Aggregator)

    Returns a copy of the Max aggregator without its current state.

    Return Type Description
    Aggregator

    Returns Max aggregator.

  • <Max> saveState() returns (map<any>)

    Return current state to be saved as a map of any typed values.

    Return Type Description
    map

    A map of any typed values.

  • <Max> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of typed any values. This map contains the values to be restored from the persisted data.

public type MaxForever object

The aggregator to keep the maximum value received so far. It is similar to Max aggregator, but it keeps the maximum value it received so far, forever.

Field Name Data Type Default Value Description
iMax int? ()

description

fMax float? ()

description

  • <MaxForever> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Updates the current maximum value and return the updated maximum value.

    Parameter Name Data Type Default Value Description
    value anydata

    Value being checked whether it is greater than the current maximum value.

    eventType CURRENT|EXPIRED|ALL|RESET|TIMER

    Type of the incoming event streams:CURRENT, streams:EXPIRED or streams:RESET.

    Return Type Description
    anydata

    Updated maximum value.

  • <MaxForever> copy() returns (Aggregator)

    Returns a copy of the MaxForever aggregator.

    Return Type Description
    Aggregator

    A Aggregator object which represents MaxForever aggregator.

  • <MaxForever> saveState() returns (map<any>)

    Return current state to be saved as a map of any typed values.

    Return Type Description
    map

    A map of any typed values.

  • <MaxForever> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of typed any values. This map contains the values to be restored from the persisted data.

public type MergeSort object

This object implements the merge sort algorithm to sort the provided value arrays. fieldFuncs are function pointers which returns the field values of each stream event's data map's values. sortTypes are an array of ( streams:ASCENDING or streams:DESCENDING).

Field Name Data Type Default Value Description
fieldFuncs function (map<anydata>) returns (anydata)?[]

description

sortTypes string[]

description

  • <MergeSort> __init(function (map<anydata>) returns (anydata)?[] fieldFuncs, string[] sortTypes)

    Parameter Name Data Type Default Value Description
    fieldFuncs function (map) returns (anydata)?[]
    sortTypes string[]
  • <MergeSort> topDownMergeSort(streams:StreamEvent?[] events)

    Sorts the given stream events using the merge sort algorithm.

    Parameter Name Data Type Default Value Description
    events streams:StreamEvent?[]

    The array of stream events to be sorted.

public type Min object

Aggregator to find the minimum value in a stream.

Field Name Data Type Default Value Description
iMinQueue streams:LinkedList BLangTypeInit: new null ([])

description

fMinQueue streams:LinkedList BLangTypeInit: new null ([])

description

iMin int? ()

description

fMin float? ()

description

  • <Min> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Updates the current minimum value and return the updated minimum value.

    Parameter Name Data Type Default Value Description
    value anydata

    Value being checked whether it is lesser than the current minimum value.

    eventType CURRENT|EXPIRED|ALL|RESET|TIMER

    Type of the incoming event streams:CURRENT, streams:EXPIRED or streams:RESET.

    Return Type Description
    anydata

    Updated minimum value.

  • <Min> copy() returns (Aggregator)

    Returns a copy of the Min aggregator.

    Return Type Description
    Aggregator

    A Aggregator object which represents Min aggregator.

  • <Min> saveState() returns (map<any>)

    Return current state to be saved as a map of any typed values.

    Return Type Description
    map

    A map of any typed values.

  • <Min> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of typed any values. This map contains the values to be restored from the persisted data.

public type MinForever object

The aggregator to keep the minimum value received so far. It is similar to Min aggregator, but it keeps the minimum value it received so far, forever.

Field Name Data Type Default Value Description
iMin int? ()

description

fMin float? ()

description

  • <MinForever> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Updates the current minimum value and return the updated minimum value.

    Parameter Name Data Type Default Value Description
    value anydata

    Value being checked whether it is lesser than the current minimum value.

    eventType CURRENT|EXPIRED|ALL|RESET|TIMER

    Type of the incoming event streams:CURRENT, streams:EXPIRED or streams:RESET.

    Return Type Description
    anydata

    Updated minimum value.

  • <MinForever> copy() returns (Aggregator)

    Returns a copy of the MinForever aggregator.

    Return Type Description
    Aggregator

    A Aggregator object which represents MinForever aggregator.

  • <MinForever> saveState() returns (map<any>)

    Return current state to be saved as a map of any typed values.

    Return Type Description
    map

    A map of any typed values.

  • <MinForever> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of typed any values. This map contains the values to be restored from the persisted data.

public type Node object

The Node object represents a node in the linkedlist data structure.

Field Name Data Type Default Value Description
data any

description

next streams:Node?

description

prev streams:Node?

description

  • <Node> __init(streams:Node? prev, any data, streams:Node? next)

    Parameter Name Data Type Default Value Description
    prev streams:Node?
    data any
    next streams:Node?

public type OrderBy object

The OrderBy object represents the desugared code of order by clause of a streaming query. This object takes 3 parameters to initialize itself. nextProcessPointer is the process method of the next processor. fieldFuncs is an array of function pointers which returns the field values to be sorted. sortTypes is an array of string specifying whether the sort order (ascending or descending). Internally this processor uses a MergeSort object to sort.

Field Name Data Type Default Value Description
nextProcessorPointer function (streams:StreamEvent?[]) returns ()

description

fieldFuncs function (map<anydata>) returns (anydata)?[]

description

sortTypes string[]

description

mergeSort streams:MergeSort

description

  • <OrderBy> __init(function (streams:StreamEvent?[]) returns () nextProcessorPointer, function (map<anydata>) returns (anydata)?[] fieldFuncs, string[] sortTypes)

    Parameter Name Data Type Default Value Description
    nextProcessorPointer function (streams:StreamEvent?[]) returns ()
    fieldFuncs function (map) returns (anydata)?[]
    sortTypes string[]
  • <OrderBy> process(streams:StreamEvent?[] streamEvents)

    Sorts the given array of stream events according to the given parameters (fieldFuncs and sortTypes).

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The array of stream events to be sorted.

public type OutputProcess object

The OutputProcess object is responsible for sending the output (only the events of type streams:CURRENT to the destination stream. It takes a function pointer outputFunc which actually has the logic to process the output.

  • <OutputProcess> __init(function (map<anydata>[]) returns () outputFunc)

    Parameter Name Data Type Default Value Description
    outputFunc function (map[]) returns ()
  • <OutputProcess> process(streams:StreamEvent?[] streamEvents)

    Sends the output to the streaming action. most of the time the output is published to a destination stream at the streaming action. Only the events with type streams:CURRENT are passed to the outputFunc.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The array of stream events to be filtered out for CURRENT events.

public type Scheduler object

The Scheduler object is responsible for generating streams:TIMER events at the given timestamp. Once the event is generated, the timer event is passed to the provided processFunc function pointer. The function pointer is the process function of the target processor, to which the timer event should be sent.

  • <Scheduler> __init(function (streams:StreamEvent?[]) returns () processFunc)

    Parameter Name Data Type Default Value Description
    processFunc function (streams:StreamEvent?[]) returns ()
  • <Scheduler> notifyAt(int timestamp)

    Schedule to send a timer events at the given timestamp.

    Parameter Name Data Type Default Value Description
    timestamp int

    The timestamp at which the timer event will be generated and passed to the provided processFunc.

  • <Scheduler> sendTimerEvents() returns (error?<>)

    Creates the timer events.

    Return Type Description
    error?<>

    Returns error if sending timer events failed.

public type Select object

The Select object represents the select clause. Anything that comes under select clause like aggregator function invocations are also handled in this processor. Further, grouping of the events (provided by the groupby clause) is also performed in this processor. aggregatorArr is an array of aggregators which are used in the select clause. groupbyFuncArray is an array of function pointers which returns the values being grouped. selectFunc is a function pointer to a lambda function which creates the data field of the output stream event. scopeName is used as a breadcrumb to identify the select clause if there are multiple forever blocks.

  • <Select> process(streams:StreamEvent?[] streamEvents)

    Selects only the selected fields in the select clause.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The array of stream events passed to the select clause.

  • <Select> getGroupByKey(function (streams:StreamEvent) returns (anydata)?[]? groupbyFunctionArray, streams:StreamEvent e) returns (string)

    Creates a unique key for each group with the given values in the group by clause.

    Parameter Name Data Type Default Value Description
    groupbyFunctionArray function (streams:StreamEvent) returns (anydata)?[]?

    Function pointer array to the lambda function which returns each group by field.

    e streams:StreamEvent

    Stream Event object being grouped.

    Return Type Description
    string

    Returns a unique groupby key by which the event e is grouped.

public type Snapshotable object

Abstract Snapshotable to be referenced by all snapshotable objects.

  • <Snapshotable> saveState() returns (map<any>)

    Function to return the current state of a snapshotable object.

    Return Type Description
    map

    A map<any> that represents the current state of the snapshotable instance.

  • <Snapshotable> restoreState(map state)

    Function to restore a previous state intoa a snapshotable object.

    Parameter Name Data Type Default Value Description
    state map

    A map<any> state that can be used to restore snapshotable instance to a previous state.

public type SortWindow object

The sort window hold a given number of events and emit the expired events in the ordered by the given fields. E.g. from inputStream window sort(10, inputStream.age, "ascending", inputStream.name, "descending") select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } } The sort window should have three or more odd no of parameters ( windowLength, stream field, order1, stream field, order2, ...)

Field Name Data Type Default Value Description
lengthToKeep int

description

windowParameters any[]

description

sortedWindow streams:LinkedList

description

fields string[]

description

sortTypes string[]

description

nextProcessPointer function (streams:StreamEvent?[]) returns ()?

description

fieldFuncs function (map<anydata>) returns (anydata)?[]

description

mergeSort streams:MergeSort

description

  • <SortWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent?[]) returns ()?
    windowParameters any[]
  • <SortWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <SortWindow> process(streams:StreamEvent?[] streamEvents)

    The process function process the incoming events to the events and update the current state of the window.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The array of stream events to be processed.

  • <SortWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Returns the events(State) which match with the where condition in the join clause for a given event.

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent

    The event against which the state or the events being held by the window is matched.

    conditionFunc function (map,map) returns (boolean)?

    The function pointer to the lambda function which contain the condition logic in where clause.

    isLHSTrigger boolean true

    Specify if the join is triggered when the lhs stream received the events, if so it should be true. Most of the time it is true. In rare cases, where the join is triggered when the rhs stream receives events this should be false.

    Return Type Description
    (StreamEvent,StreamEvent)[]

    Returns an array of 2 element tuples of events. A tuple contains the matching events one from lhs stream and one from rhs stream.

  • <SortWindow> saveState() returns (map<any>)

    Return current state to be saved as a map of any typed values.

    Return Type Description
    map

    A map of any typed values.

  • <SortWindow> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of typed any values. This map contains the values to be restored from the persisted data.

public type StdDev object

The aggregator object to calculate standard deviation.

Field Name Data Type Default Value Description
mean float 0.0

description

stdDeviation float 0.0

description

sumValue float 0.0

description

count int 0

description

  • <StdDev> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Updates the current standard deviation as the new values come into the aggregation.

    Parameter Name Data Type Default Value Description
    value anydata

    Value being added or removed from aggregation in order to calculate the new standard deviation.

    eventType CURRENT|EXPIRED|ALL|RESET|TIMER

    Type of the incoming event streams:CURRENT, streams:EXPIRED or streams:RESET.

    Return Type Description
    anydata

    Updated standard deviation.

  • <StdDev> copy() returns (Aggregator)

    Returns a copy of the StdDev aggregator.

    Return Type Description
    Aggregator

    A Aggregator object which represents StdDev aggregator.

  • <StdDev> saveState() returns (map<any>)

    Return current state to be saved as a map of any typed values.

    Return Type Description
    map

    A map of any typed values.

  • <StdDev> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of typed any values. This map contains the values to be restored from the persisted data.

public type StreamEvent object

The StreamEvent object is a wrapper around the actual data being received to the input stream. If a record is receive to a input stream, that record is converted to a map of anydata values and set that map to a field called data in a new StreamEvent object. StreamEvent is only used internally to transmit event data from one processor to another processor. At the time the record is converted to a map, the timestamp is set. If the record is first received by the input stream, the eventType is set to streams:CURRENT. Other than stream events of type streams:CURRENT, there are 3 types of StreamEvents. They are streams:EXPIRED, streams:RESET, streams:TIMER. An expired event is used to remove the state of its respective current event. A reset event is used to completely wipe the state and a timer event is used to trigger the process method of a particular processor in timely manner.

Field Name Data Type Default Value Description
eventType CURRENT|EXPIRED|ALL|RESET|TIMER

description

timestamp int

description

data map<anydata> {}

description

  • <StreamEvent> __init((string,map<anydata>)|map<anydata> eventData, CURRENT|EXPIRED|ALL|RESET|TIMER eventType, int timestamp)

    Parameter Name Data Type Default Value Description
    eventData (string,map)|map
    eventType CURRENT|EXPIRED|ALL|RESET|TIMER
    timestamp int
  • <StreamEvent> copy() returns (StreamEvent)

    Returns a copy of the stream event instance.

    Return Type Description
    StreamEvent

    A copy of the StreamEvent object with its state.

  • <StreamEvent> addData(map<anydata> eventData)

    Adds key values pairs in a given map to the field data.

    Parameter Name Data Type Default Value Description
    eventData map

    map of anydata values to be added to field data.

  • <StreamEvent> addAttribute(string key, anydata val)

    Adds an attribute of an event to the map with its value.

    Parameter Name Data Type Default Value Description
    key string

    The key of the map entry.

    val anydata

    Respective value of the key.

public type StreamJoinProcessor object

The StreamJoinProcessor object is responsible for performing SQLish joins between two or more streams. The onConditionFunc is the lambda function which represents the where clause in the join clause. The joining happens only if the condition is statified. nextProcessor is the process function of the next processor, which can be a Select processor, Aggregator processor, Having processor.. etc. The lhsStream is the left hand side stream of the join and its attached window is 'lhsWindow. The rhsStream is the right hand side stream of the join and its attached window is 'rhsWindow. The unidirectionalStream stream defines the stream by which the joining is triggered when the events are received. Usually it is lhsStream, in rare cases it can be rhsStream. The joinType is the type of the join and it can be any value defined by streams:JoinType.

Field Name Data Type Default Value Description
lhsWindow streams:Window?

description

rhsWindow streams:Window?

description

lhsStream string?

description

rhsStream string?

description

unidirectionalStream string?

description

joinType JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN

description

lockField int 0

description

  • <StreamJoinProcessor> __init(function (streams:StreamEvent?[]) returns () nextProcessor, JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN joinType, function (map<anydata>,map<anydata>) returns (boolean)? onConditionFunc)

    Parameter Name Data Type Default Value Description
    nextProcessor function (streams:StreamEvent?[]) returns ()
    joinType JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN
    onConditionFunc function (map,map) returns (boolean)?
  • <StreamJoinProcessor> process(streams:StreamEvent?[] streamEvents)

    Process the events from both lhsStream and the rhsStream and perform the joining.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    Stream events being joined.

  • <StreamJoinProcessor> setLHS(string streamName, streams:Window windowInstance)

    Sets the left hand side stream name and the respective window instance.

    Parameter Name Data Type Default Value Description
    streamName string

    The name of the left hand side stream.

    windowInstance streams:Window

    The window attached to the left hand side stream.