mx.interware.caudal.streams.stateful

acum-stats

(acum-stats [state-key avg-key variance-key stdev-key count-key] & children)

Stream function that accumulates statistics (mean, variance, stdev, count), recomputing them on each event. It receives an event decorated with mean, stdev, variance and count, mixes those values with the ones held in the state, and stores the result in the state. The event with the new values is then propagated to children.

Arguments:
state-key: Key name to store the accumulating mean, stdev, variance and count
avg-key: Key where the mean is in the event and in the state
variance-key: Key where the variance is in the event and in the state
stdev-key: Key where the stdev is in the event and in the state
count-key: Key where the count is in the event and in the state
children: List of streams to propagate result
– if the event is missing required fields, the event is propagated with :error set to an error message
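
The mixing step can be pictured with the standard formula for combining two summaries. This is a minimal sketch, not Caudal's implementation: merge-stats is a hypothetical helper, the map keys are illustrative, and it assumes population variance (dividing by n) and a positive combined count.

```clojure
(defn merge-stats
  "Hypothetical helper: combines two summaries, each a map with
  :mean, :variance and :count, into a single summary.
  Assumes population variance and (+ n1 n2) > 0."
  [{m1 :mean v1 :variance n1 :count}
   {m2 :mean v2 :variance n2 :count}]
  (let [n        (+ n1 n2)
        delta    (- m2 m1)
        mean     (+ m1 (/ (* delta n2) n))
        ;; squared deviations inside each part, plus a between-part term
        sum-sq   (+ (* v1 n1) (* v2 n2) (/ (* delta delta n1 n2) n))
        variance (/ sum-sq n)]
    {:mean     mean
     :variance variance
     :stdev    (Math/sqrt (double variance))
     :count    n}))

;; State summarises the values 1 and 3; the event summarises 5 and 7.
(def merged
  (merge-stats {:mean 2.0 :variance 1.0 :count 2}
               {:mean 6.0 :variance 1.0 :count 2}))
```

Here merged describes the four values 1, 3, 5 and 7 without needing the raw values themselves.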

batch

(batch [state-key n delta] & children)

Streamer function that propagates a vector of events once n elements are stored or dt time has passed

Arguments:
state-key: Key name to store
n: Max number of elements stored
delta: Max time (dt) to wait before propagating
children: Children streamer functions to be propagated
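
The flush rule (n elements stored, or dt elapsed) can be sketched as a pure step function. This is an illustration under assumptions, not Caudal's code: batch-step and its state shape are hypothetical, and elapsed time is only checked when an event arrives, whereas a real implementation would likely also flush from a timer.

```clojure
(defn batch-step
  "Hypothetical helper. Takes the batch state, an event, the current
  time in millis, the size limit n and the time limit dt.
  Returns [new-state flushed], where flushed is a vector of events
  when the batch is released and nil otherwise."
  [{:keys [buffer started] :or {buffer []}} event now n dt]
  (let [buffer  (conj buffer event)
        started (or started now)]
    (if (or (>= (count buffer) n)
            (>= (- now started) dt))
      [{:buffer [] :started nil} buffer]
      [{:buffer buffer :started started} nil])))

;; Size-based flush: the third event releases the batch.
(def by-size
  (let [[s1 _] (batch-step nil :e1 0 3 1000)
        [s2 _] (batch-step s1 :e2 10 3 1000)
        [_ out] (batch-step s2 :e3 20 3 1000)]
    out))

;; Time-based flush: the second event arrives after dt has passed.
(def by-time
  (let [[s1 _] (batch-step nil :e1 0 3 1000)
        [_ out] (batch-step s1 :e2 1500 3 1000)]
    out))
```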

changed

(changed [state-key change-pred] & children)

Streamer function that checks if a value of an event has changed using a predicate

Arguments:
state-key: State key to store
change-pred: predicate for evaluating
children: Children streamer functions to be propagated

concurrent-meter

(concurrent-meter [state-key init-tx-pred end-tx-pred tx-id-fn concurrent-fld-cntr] & children)

Stream function that stores in the event the current number of open transactions.
When an initial event passes, the counter increments; when an end event passes, the counter decrements.

Arguments:
state-key: State key name to store the counter
init-tx-pred: Predicate to detect the initial event of an operation
end-tx-pred: Predicate to detect the finish event of an operation
tx-id-fn: Function to get the transaction id
concurrent-fld-cntr: Event key name to store the meter
children: Children functions to propagate
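
The increment and decrement behaviour can be sketched without the library. This is a simplified illustration: meter-step and run-meter are hypothetical names, the :phase and :concurrent keys are made up for the example, and the sketch ignores the transaction id, which the real function also receives.

```clojure
(defn meter-step
  "Hypothetical helper. Returns [new-count decorated-event]."
  [open-count event init-tx? end-tx? field]
  (let [n (cond
            (init-tx? event) (inc open-count)
            (end-tx? event)  (dec open-count)
            :else            open-count)]
    [n (assoc event field n)]))

(defn run-meter
  "Feeds a sequence of events through meter-step, starting at zero,
  and returns the decorated events."
  [events]
  (second
    (reduce (fn [[n out] e]
              (let [[n2 e2] (meter-step n e
                                        #(= :start (:phase %))
                                        #(= :end (:phase %))
                                        :concurrent)]
                [n2 (conj out e2)]))
            [0 []]
            events)))

(def metered
  (run-meter [{:tx 1 :phase :start}
              {:tx 2 :phase :start}
              {:tx 1 :phase :end}]))
```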

counter

(counter [state-key event-counter-key] & children)

Streamer function that appends to an event the count of passed events

Arguments:
state-key: State key to store the count
event-counter-key: Event key to hold the count, defaults to :count
children: Children streamer functions to be propagated

dump-every

(dump-every [state-key file-name-prefix date-format cycle dir-path])

Stream function that writes information stored in the state to the file system.

Arguments:
state-key: Key of the state to write to the file system
file-name-prefix, date-format and dir-path: are used to compute the file name like this:
- Caudal takes the JVM time and formats it using a java.text.SimpleDateFormat with date-format
- concatenates it with ‘-’ and file-name-prefix, then appends the current ‘by string list’
- appends ‘.edn’; the file will be located in the dir-path directory.
cycle: a long number of milliseconds, or a vector with a number and a time unit, like:
- [5 :minutes] or [1 :day]
- cycle tells Caudal to write this file with this frequency, possibly overwriting it.
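
To make the two accepted shapes of cycle concrete, here is a minimal sketch that normalises either form to milliseconds. It is not taken from Caudal: cycle->millis and the unit->millis table are hypothetical, and only :minutes and :day are confirmed by the examples above; the other unit keywords are assumptions.

```clojure
(def unit->millis
  "Assumed unit table; only :minutes and :day appear in the docs."
  {:second 1000     :seconds 1000
   :minute 60000    :minutes 60000
   :hour   3600000  :hours   3600000
   :day    86400000 :days    86400000})

(defn cycle->millis
  "Hypothetical helper: accepts a number of milliseconds or a
  [amount unit] vector and returns milliseconds."
  [cycle]
  (if (number? cycle)
    (long cycle)
    (let [[amount unit] cycle
          factor (or (unit->millis unit)
                     (throw (ex-info "Unknown time unit" {:unit unit})))]
      (* amount factor))))

(cycle->millis [5 :minutes]) ; => 300000
(cycle->millis [1 :day])     ; => 86400000
(cycle->millis 1500)         ; => 1500
```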

dumper-fn

(dumper-fn file-name-prefix date-format dir-path state by-path d-k)

ewma-timeless

(ewma-timeless [state-key r metric-key] & children)

Streamer function that normalizes the metric value. It calculates the ewma (exponentially weighted moving average) using the following equation:
(last-metric * r) + (metric-history * (1-r)). Events with no metric are ignored.

Arguments:
state-key: state key to store the metric
r: Ratio value
(metric-key): Optional key holding the metric, default :metric
children: The streamer functions to propagate
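
The equation can be traced with a small pure function. This is a sketch only: ewma-step is a hypothetical name, and seeding the history with the first metric seen is an assumption about the initial state, not something the docs specify.

```clojure
(defn ewma-step
  "Hypothetical helper applying (last-metric * r) + (metric-history * (1-r)).
  Assumption: with no history yet, the first metric becomes the history."
  [history r metric]
  (if (nil? history)
    metric
    (+ (* metric r) (* history (- 1 r)))))

;; With r = 0.5: 10.0, then 20*0.5 + 10*0.5 = 15.0, then 40*0.5 + 15*0.5 = 27.5
(def smoothed
  (reduce (fn [h m] (ewma-step h 0.5 m)) nil [10.0 20.0 40.0]))
```

A larger r makes the average follow recent values more closely; a smaller r gives more weight to the history.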

matcher

(matcher [state-key init-pred end-pred tx-id-fn timeout-delta-or-pred ts-key] & children)

Stream function that correlates initial and end events, extracts the metric value and calculates the metric difference

Arguments:
state-key: Key name to store the time
init-pred: Predicate to detect the initial event of an operation
end-pred: Predicate to detect the finish event of an operation
tx-id-fn: Function to get the transaction id
timeout-delta-or-pred: Time in milliseconds, or a predicate, to wait for the end event; on timeout, the event is discarded
ts-key: Key name to store the difference in milliseconds
children: Children functions to propagate

mixer

(mixer [state-key delay ts-key priority-fn] & children)

moving-time-window

(moving-time-window [state-key n time-key] & children)

Streamer function that propagates a vector of events to streamers using a moving time window

Arguments:
state-key: State key
n: Max number of events
(time-key): Optional key holding the time in millis, default :time
children: Functions to propagate

rate

(rate [state-key ts-key rate-key days bucket-size] & children)

Stream function that maintains a matrix of days rows and 60*24 columns and stores it in the state. For each event that passes through, it increments the number at row 0 and the column equal to the minute of the day; the remaining rows (days minus 1) hold past days. days should be less than 60 (2 months).

Arguments:
state-key: Key of the state to hold the matrix
ts-key: keyword of the timestamp in millis
rate-key: keyword holding the matrix when the outgoing event is propagated to children
days: Number of days to hold (number of rows in the matrix)
bucket-size: Number of minutes to group; larger numbers require less memory (e.g. 15 means events are counted per quarter of an hour). This number must divide 60 evenly.
children: List of streams to propagate result
– if the event doesn't have ‘ts-key’, the event is ignored.
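
The row and column arithmetic can be sketched as follows. This is an illustration under assumptions, not Caudal's code: all three function names are hypothetical, the minute of the day is computed in UTC, the matrix is assumed to have 1440 / bucket-size columns once minutes are grouped, and the daily shift of rows is left out.

```clojure
(defn bucket-index
  "Hypothetical helper: column for a timestamp in millis.
  Assumption: minute of day is taken in UTC."
  [ts-millis bucket-size]
  (let [minute-of-day (quot (mod ts-millis 86400000) 60000)]
    (quot minute-of-day bucket-size)))

(defn empty-matrix
  "days rows, each with (1440 / bucket-size) zeroed columns."
  [days bucket-size]
  (vec (repeat days (vec (repeat (quot 1440 bucket-size) 0)))))

(defn record-event
  "Increments the counter in row 0 (today) for the event's bucket."
  [matrix ts-millis bucket-size]
  (update-in matrix [0 (bucket-index ts-millis bucket-size)] inc))

;; An event 90 minutes after midnight UTC, with 15-minute buckets,
;; lands in column 6 of row 0.
(def rate-matrix
  (record-event (empty-matrix 2 15) (* 90 60000) 15))
```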

reduce-with

(reduce-with [state-key reduce-fn] & children)

Streamer function that reduces values of an event using a function and propagates it

Arguments:
state-key: Key name to reduce
reduce-fn: Function that reduces the values
children: Children streamer functions to be propagated

rollup

(rollup [state-key n delta] & children)

Streamer function that propagates n events every dt (delta) time interval

Arguments:
state-key: Key name to roll back
n: Max number of elements stored
delta: Max time to perform operations
children: Children streamer functions to be propagated

uneven-tx

(uneven-tx [state-key init-pred end-pred thread-id tx-id ts-id timeout unstarted-key unended-key] & children)

welford

(welford [state-key metric-key mean-key variance-key stdev-key sum-of-squares-key count-key {:keys [prefix date-fmt freq path]}] & children)

Stream function that implements Welford’s Algorithm to calculate online mean, variance, standard deviation and a count of associated events.

  • state-key: key name to store current mean, standard deviation, variance and count
  • metric-key: key holding the numeric value used to calculate statistics
  • mean-key: key to store the calculated mean into each event
  • variance-key: key to store the calculated variance into each event
  • stdev-key: key to store the calculated standard deviation into each event
  • sum-of-squares-key: key to store the calculated sum of squared differences (in the algorithm, S of the kth iteration) into each event
  • count-key: key to store the calculated count into each event

For more info see this link
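
The update that Welford's algorithm performs for each new value can be sketched as a pure function. This is not Caudal's implementation: welford-step and the map keys are hypothetical, and the sketch assumes sample variance (dividing by count minus 1), which may differ from what the library reports.

```clojure
(defn welford-step
  "Hypothetical helper: folds one numeric value x into the running
  summary. Assumption: variance is the sample variance, S / (k - 1)."
  [{m :mean s :sum-of-squares k :count
    :or {m 0.0 s 0.0 k 0}}
   x]
  (let [n        (inc k)
        delta    (- x m)
        new-mean (+ m (/ delta n))
        ;; S_k = S_(k-1) + (x - old mean) * (x - new mean)
        new-s    (+ s (* delta (- x new-mean)))
        variance (if (> n 1) (/ new-s (dec n)) 0.0)]
    {:mean           new-mean
     :sum-of-squares new-s
     :variance       variance
     :stdev          (Math/sqrt (double variance))
     :count          n}))

(def summary
  (reduce welford-step nil [2.0 4.0 4.0 4.0 5.0 5.0 7.0 9.0]))
```

Each step needs only the previous mean, sum of squares and count, so the statistics stay current without storing past events.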