## mx.interware.caudal.streams.stateful

### acum-stats

`(acum-stats [state-key avg-key variance-key stdev-key count-key] & children)`

Stream function that acumulates stats (mean,variance,stdev,count) recomputing them, it receives an event that is decorated with mean,stdev,variance and count and takes information from the state and the resultin mixing is stored in the state, then the event with the new values is propagated to children

Arguments:state-key: Key name to store accumulating mean,stdev,variance and countavg-key: Key where the mean is in the event and in the statevariance-key: Key where the variance is in the event and in the statestdev-key: Key where the stdev is in the event and in the statecount-key: Key where the count is in the event and in the statechildren: List of streams to propagate result

– if event is missign required fields, the event is propagated with :error set to message error

### batch

`(batch [state-key n delta] & children)`

Streamer function that propagates a vector of events once n elements are stored in or dt time has passed

Arguments:state-key: Key name to storen: Max number of elements storeddt: Max time to propagatechildren: Children streamer functions to be propagated

### changed

`(changed [state-key change-pred] & children)`

Streamer function that checks if a value of an event has changed using a predicate

Arguments:

state-key: State key to store

change-pred: predicate for evaluating

children: Children streamer functions to be propagated

### concurrent-meter

`(concurrent-meter [state-key init-tx-pred end-tx-pred tx-id-fn concurrent-fld-cntr] & children)`

Stream function that stores in event the current number of transactions opened

when an initial event passes, the counter increments, if an end event paseses, the counter decrements

Arguments:state-key: State key name to store the counterinit-tx-pred: Predicate to detect the initial event of an operationinit-tx-end: Predicate to detect the finish event of an operation

*tx-id-fn: Function to get the transaction id

*[]: list that contains: - event key name to store the meter

- children functions to propagate

### counter

`(counter [state-key event-counter-key] & children)`

Streamer function that appends to an event the count of passed events

Arguments:

state-key: State key to store the count

event-counter-key: Event key to hold the count dafaults to :countchildren: Children streamer functions to be propagated

### dump-every

`(dump-every [state-key file-name-prefix date-format cycle dir-path])`

Stream function that writes to the file system information stored y the state.

Arguments:state-key: Key of the state to write to the file systemfile-name-prefix,date-formatanddir-path: are used to copute the file name like this:

- Caudal take the JVM time and formats it using a java.text.SimpleDateFormat withdate-formatand - concatenates it with ‘-’ and file-name-prefix, then appends to it the current‘by string list’then - appends ‘.edn’ the file will be located at thedir-pathdirectory.cycle: represents a long number for milliseconds or a vetor with a number ant time messure like:

- [5 :minutes] or [1 :day]

-cyclewill tell Caudal to write this file width this frequency posibly overwriting it.

### dumper-fn

`(dumper-fn file-name-prefix date-format dir-path state by-path d-k)`

### ewma-timeless

`(ewma-timeless [state-key r metric-key] & children)`

Streamer function that normalizes the metric value, it calculates the ewma which represents the exponential waited moving average and is calculated by the next equation:

(last-metric * r) + (metric-history * (1-r)), events with no metric will be ignoredÂ

Arguments:state-key: state key to store the metricr: Ratio value(metric-key): Optional for key holding metric, default :metricchildrenthe streamer functions to propagate

### matcher

`(matcher [state-key init-pred end-pred tx-id-fn timeout-delta-or-pred ts-key] & children)`

Stream function that correlates initial and end events, extract the metric value and calculates the metric difference

Arguments:state-key: Key name to store the timeinit-tx-pred: Predicate to detect the initial event of an operationinit-tx-end: Predicate to detect the finish event of an operationtx-id-fn: Function to get the transaction idtimeout-delta-or-pred: Time or predicate to wait the end event in milliseconds, when timeout, it will discard the event[]:List that contains the key name to store the difference miliseconds and childs to propagate

### mixer

`(mixer [state-key delay ts-key priority-fn] & children)`

### moving-time-window

`(moving-time-window [state-key n time-key] & children)`

Streamer function that propagate a vector of events to streamers using a moving time window

Arguments:state-key: State keyn: Max number of events(time-key): Optional key holding the time i millis, default :time& children: functions to propagate

### rate

`(rate [state-key ts-key rate-key days bucket-size] & children)`

Stream function that matains a matrix fo *days* rows and 60*24 columns, and stores it in the state, then for each event that passes through increments the number in the position of row 0 and collum equals to the number of minute in the day, remainder rows contain past *days* minus 1 (days shold be less than 60 ‘2 months’).

Arguments:state-key: Key of the state to hold the matrixts-key: keyword of the timestamp in millisrate-key: keyword holding the matrix when the outgoing event is propagated tochildrendays: Number of days to hold (number of rows in the matrix)bucket-size: number of minutes to group, larger numbers require less memory (ex: 15 means we know how many events are dividing the hour in 4 (15 minutes) it is required that this number divides 60 with no fraction(MEJORAR).children: List of streams to propagate result

– if event dosent have ‘ts-key’ event is ignored!!

### reduce-with

`(reduce-with [state-key reduce-fn] & children)`

Streamer function that reduces values of an event using a function and propagates it

Arguments:state-key: Key name to reducereduce-fn: function that reduces the valueschildren: Children streamer functions to be propagated

### rollup

`(rollup [state-key n delta] & children)`

Streamer function that propagates n events each dt time

Arguments:state-key: Key name to roll backn: Max number of elements storeddelta: Max time to perform operationschildren: Children streamer functions to be propagated

### uneven-tx

`(uneven-tx [state-key init-pred end-pred thread-id tx-id ts-id timeout unstarted-key unended-key] & children)`

### welford

`(welford [state-key metric-key mean-key variance-key stdev-key sum-of-squares-key count-key {:keys [prefix date-fmt freq path]}] & children)`

Stream function that implements Welford’s Algorithm to calculate online mean, variance, standard deviation and a count of associated events.

*state-key*key name to store current mean, standard deviation, variance and count*metric-key*numeric value to calculate statistics*mean-key*key to store the calculated mean into each event*variance-key*key to store the calculated variance into each event*stdev-key*key to store the calculated standard deviation into each event*sum-of-squares-key*key to store the calculated sum of squared differences (in algorithm, S of the kth iteration) into each event*count-key*key to store the calculated count into each event

For more info see this link