Intelie Pipes

Language Reference Documentation

version 0.25.0 (for older versions, click here)

Copyright 2021 Intelie


Table of Contents

1. Quick Start
1.1. Filters
1.2. Simple pipe
1.3. Chaining pipes
1.4. The join and union pipes
1.5. Sequences and rows
1.6. Macros and user functions
2. Concepts
2.1. Events and filters
2.2. Chained computation model
2.3. Output rates and aggregation windows
2.4. Grouping and group expiry
2.5. Expressions
2.5.1. Type system
2.5.2. Type arguments
2.5.3. Literals
2.5.4. Sequences and Rows
2.5.5. Scalars and aggregations
2.5.6. Aggregations state representation
2.5.7. Aggregation TTL
2.5.8. Property access
2.5.9. Special expressions
2.5.10. Function call
2.6. Macros and User Functions
3. Filters
3.1. * Special filter that allows all records through.
3.2. <term> Selects records where one of the current fields matches <term>.
3.3. <term>~<number maxEdits> Selects records where one of the current fields matches <term> with at most <maxEdits> edits.
3.4. [<term lower> TO <term upper>] Selects records where one of the current fields is between <lower> and <upper>.
3.5. <field>: <filter> Sets the current field to <field>. Usually followed by <term> (e.g. somefield:someterm).
3.6. <filter> & <filter> Selects the intersection of two other filters.
3.7. <filter> | <filter> Selects the union of two other filters.
3.8. -<filter> Selects the complement of another filter.
4. Pipes
4.1. <named...> [by <named...>] [over <window>] [every <period> | at the end] Transforms or aggregates records over configurable data window and output.
4.2. <pipe> join [on <fields...>] <pipe> Computes the cartesian product of the outputs from two pipes with same output rates.
4.3. <pipe> union <pipe> Concatenates the outputs from two pipes with compatible output rates.
4.4. @chain [<seq expr>] Chains many sequences in a batch into a single sequence
4.5. @compile <string>, <obj... args> Compiles a constant string into a pipe
4.6. @compress <number k>, [<number k2>,] <number... y> [by <object...>] Alias to either @compress.pip (default) or @compress.uniform.
4.7. @compress.pip <number k>, [<number k2>,] <number... y> [by <object...>] Compresses the result from the previous pipe to at most k most important rows.
4.8. @compress.uniform <number k>, [<number k2>,] <number... y> [by <object...>] Compresses the result from the previous pipe to at most k rows.
4.9. @debounce <period> [by <object...>] Only outputs rows that follows <period> time without any output.
4.10. @empty [over <window>...] Creates an empty pipe (pass-through) with optional window metadata
4.11. @filter <boolean condition> [by <object...>] Filters the results from previous pipe.
4.12. @for <named(seq)>, <named... additional> Unwinds events in the seq instance
4.13. @latest Keeps the latest batch of input events and output it at the end.
4.14. @makejoin <string... props>, <pipe...> Creates a join pipe containing all pipes passed as arguments
4.15. @makeljoin <string... props>, <pipe...> Creates a left join pipe containing all pipes passed as arguments
4.16. @makelrjoin <string... props>, <pipe...> Creates a full join pipe containing all pipes passed as arguments
4.17. @makerjoin <string... props>, <pipe...> Creates a right join pipe containing all pipes passed as arguments
4.18. @makeunion <pipe...> Creates a union pipe containing all pipes passed as arguments
4.19. @meta <named...> [over <window>...] Adds custom fields to query metadata
4.20. @onchange [<object...>] [by <object...>] Outputs only events that change the last value of any expression in the group
4.21. @onfalse <boolean condition> [by <object...>] Outputs only events for which condition evaluates false and was not false in the previous event
4.22. @ontrue <boolean condition> [by <object...>] Outputs only events for which condition evaluates true and was not true in the previous event
4.23. @resample <period>, (<object>, <period>, <string method>...) [by <object...>] Performs resampling of the incoming stream to match the specified period
4.24. @seq Changes the static type of any row to the highest corresponding seq.
4.25. @set <named...> [by <object...>] Sets fields in the original event (both typed and raw)
4.26. @skip <number> [by <object...>] Skips some results from previous pipe.
4.27. @skipwhile <boolean> Skips rows from every batch while some condition is true.
4.28. @slice [<number start>], [<number end>], [<number step>] [by <object...>] Gets some rows (based on index) from previous pipe.
4.29. @sort <sortfield... expr> Sorts the results from previous pipe.
4.30. @take <number> [by <object...>] Limits the results from previous pipe.
4.31. @takewhile <boolean> Takes rows from every batch while some condition is true.
4.32. @throttle [<number k>, ] <period|seq> [by <object...>] Limits the output of the previous pipe to at most <k> rows per <period>.
4.33. @top <number k>, <sortfield... expr> [by <object...>] Sorts the results and gets the first k rows (possibly grouped) from previous pipe.
4.34. @unbatch Extracts a batch of events in many batches, one for each event.
4.35. @unsafe Marks that any pipe executed after this must run in a non-distributed environment.
4.36. @yield [<object expr>] Extracts one field of the stream to be the output event.
4.37. @zip [<seq expr>] Takes elements from a batch of seqs, one by one and make a seq of seqs.
4.38. atomic <pipe> Modify a pipe to run entirely in the same node, ignoring any distribution invariants.
5. Operators
5.1. \<filter> Returns a boolean expression that evaluates true if the filter accepts the event.
5.2. <object># → <number> Coerces the expression to number. Shorthand to <object>:number() → <number>.
5.3. <object>$ → <string> Coerces the expression to string. Shorthand to <object>:string() → <string>.
5.4. <number> + <number> → <number> Adds two numbers.
5.5. <string> + <string> → <string> Concatenates two strings.
5.6. <seq> + <seq> → <seq> Concatenates two seqs.
5.7. <map> + <map> → <map> Concatenates two maps.
5.8. <number> - <number> → <number> Subtracts one number from another.
5.9. <number> * <number> → <number> Multiplies two numbers.
5.10. <number> / <number> → <number> Divides one number by another (float division).
5.11. <number> /? <number> → <number> Divides one number by another (float division). Legacy operator that returns 0.0 when the divisor is 0.0.
5.12. <number> // <number> → <number> Divides one number by another (integer division).
5.13. <number> //? <number> → <number> Divides one number by another (integer division). Legacy operator that returns 0.0 when the divisor is 0.0.
5.14. <number> ** <number> → <number> Raises one number to another's power.
5.15. <number> % <number> → <number> Returns the rest of the division of one number by another.
5.16. <number> %? <number> → <number> Returns the rest of the division of one number by another. Legacy operator that returns 0.0 when the divisor is 0.0.
5.17. -<number> → <number> Negates one number.
5.18. <boolean> and <boolean> → <boolean> Returns the logical AND of two booleans.
5.19. <boolean> or <boolean> → <boolean> Returns the logical OR of two booleans.
5.20. <boolean> xor <boolean> → <boolean> Returns the logical XOR of two booleans.
5.21. not <boolean> → <boolean> Returns the logical NOT of a boolean.
5.22. <object> == <object> → <boolean> Checks whether two objects are equal.
5.23. <object> != <object> → <boolean> Checks whether two objects are not equal.
5.24. <comparable> < <comparable> → <boolean> Checks whether the left operand compares lesser than the right one.
5.25. <comparable> <= <comparable> → <boolean> Checks whether the left operand compares lesser than or equal to the right one.
5.26. <comparable> > <comparable> → <boolean> Checks whether the left operand compares greater than the right one.
5.27. <comparable> >= <comparable> → <boolean> Checks whether the left operand compares greater than or equal to the right one.
5.28. <object>-><identifier> → <object> Extracts a property from an object.
5.29. <object>->(<object>) → <object> Evaluates an expression in the semantic context of the target
5.30. <object>[<object...>] → <object> Accesses an object by index/key in a list/string/map.
5.31. ^<object> → <object> Evaluates the expression in the parent semantic context (if any).
5.32. <object> ?? <object> → <object> Returns the first if it is not null; otherwise, returns the second.
5.33. <boolean> ? <object>, <object> → <object> If the condition is true, returns the first object; otherwise, returns the second.
5.34. <seq> |> <&fn> → <object> Transforms a sequence into another sequence or object.
6. Scalar Functions
6.1. <boolean>:compile.if(<object if_true>, <object if_false>) → <object> Chooses one of the expressions based on a constant condition in compile-time
6.2. <map>:containskey(<object>) → <boolean> Returns whether the target map contains the argument as a key.
6.3. <number>:abs() → <number> Calculates the absolute value of a number.
6.4. <number>:acos() → <number> Returns the arc cosine of the argument to an angle in radians.
6.5. <number>:aequal(<number>, [<number epsilon>]) → <number> Returns true if a and b are approximately equal. If no number is given, epsilon defaults to 1e-6.
6.6. <number>:asin() → <number> Returns the arc sine of the argument to an angle in radians.
6.7. <number>:atan() → <number> Returns the arc tangent of the argument to an angle in radians.
6.8. <number>:atan2(<number x>) → <number> Returns the two-argument arc tangent of the argument to an angle in radians.
6.9. <number>:bytes([<number precision>]) → <string> Formats a number as the best possible byte multiple.
6.10. <number>:ceil([<number precision>]) → <number> Returns the smallest number that is greatest than or equal to the argument.
6.11. <number>:chr() → <string> Converts a unicode codepoint to a single-char string.
6.12. <number>:cos() → <number> Returns the cosine of an angle in radians.
6.13. <number>:cosh() → <number> Returns the hyperbolic cosine of an angle in radians.
6.14. <number>:dateadd(<period>) → <number> Adds <period> to timestamp argument.
6.15. <number>:datefloor(<period>) → <number> Rounds timestamp down to the nearest date that is divisible by <period>.
6.16. <number>:dateformat([<string format>], [<string tz>]) → <string> Formats timestamp using specified format
6.17. <number>:datesub(<period>) → <number> Sutracts <period> from timestamp argument.
6.18. <number>:displaynumber() → <number> Formats a number in unicode for display in small spaces
6.19. <number>:exp() → <number> Calculates the exponential of a number.
6.20. <number>:floor([<number precision>]) → <number> Returns the largest number that is lesser than or equal to the argument.
6.21. <number>:format([<string format>], [<string locale>]) → <string> Formats a number according to format string and locale.
6.22. <number>:log([<number base>]) → <number> Calculates the logarithm of a number.
6.23. <number>:milliseconds([<precision>]) → <string> Formats a number representing a timespan in milliseconds
6.24. <number>:normdist(<number mean>, <number sdev>, [<boolean acc>]) → <number> Calculates the gaussian distribution function value, optionally integrated.
6.25. <number>:pow(<number exp>) → <number> Raises one number to another.
6.26. <number>:round([<number precision>]) → <number> Rounds a number to <precision> decimal places.
6.27. <number>:select(<object... list>) → <object> Selects the ith element from a list of arguments. Or null if it doesn't exist.
6.28. <number>:sin() → <number> Returns the sine of an angle in radians.
6.29. <number>:sinh() → <number> Returns the hyperbolic sine of an angle in radians.
6.30. <number>:sqrt() → <number> Returns the correctly rounded positive square root of a double value.
6.31. <number>:tan() → <number> Returns the tangent of an angle in radians.
6.32. <number>:tanh() → <number> Returns the hyperbolic tangent of an angle in radians.
6.33. <number>:tobase(<number base>) → <string> Converts a number to a string representing it in some base
6.34. <object>:boolean() → <boolean> Converts object to boolean.
6.35. <object>:class() → <string> Returns the underlying Java class of the object.
6.36. <object>:comparable() → <comparable> Converts object to comparable.
6.37. <object>:const() → <object> Makes any scalar act as a constant in compile time
6.38. <object>:decode(<object,object... pairs>) → <object> Transforms the parameter using the translation rules defined in <pairs>.
6.39. <object>:get(<object... keys>) → <object> Much like property[keys]. Works for strings, containers and arrays.
6.40. <object>:indexin(<object... list>) → <number> Returns the first index of the value in <list>, or null if <list> does not contain it.
6.41. <object>:isin(<object... list>) → <boolean> Returns true if <list> contains the value, false otherwise.
6.42. <object>:json() → <string> Converts the object to its JSON string representation.
6.43. <object>:keep([<number ttl>]) → <object> When used in a simple pipe, delays or disable (if ttl not supplied or < 0) inactive group removal.
6.44. <object>:len() → <number> Tries to get <target>'s size. Works for strings, containers and arrays.
6.45. <object>:map() → <map> Converts object to map.
6.46. <object>:nameof() → <string> Returns a suggested field name for some expression
6.47. <object>:number() → <number> Converts object to number.
6.48. <object>:object() → <object> Casts any object to its canonical object representation.
6.49. <object>:row() → <seq> Casts any object to a row.
6.50. <object>:seq() → <seq> Casts any object to a seq.
6.51. <object>:setfield(<named... fields>) → <object> Sets fields into the object in a type-safe way
6.52. <object>:string() → <string> Converts object to string.
6.53. <object>:typeof() → <string> Returns the compile-time type of an expression.
6.54. <object>:unfold(<&fn>) → <seq> Creates a sequence by applying the same function successively to a state.
6.55. <row>:tomap() → <map> Converts a row instance to a map, using the field names as keys
6.56. <seq>:contains(<object>) → <boolean> Returns whether the target seq contains the argument.
6.57. <seq>:enumerate([<number start>]) → <seq(row)> Returns a same-sized sequence with an incrementing number before each object.
6.58. <seq>:except(<seq b>) → <seq> Creates a sequence with the same elements, except the ones that are also present in <b>
6.59. <seq>:indexof(<object>) → <number> Returns the index of position of <s> inside the target string. Returns null otherwise.
6.60. <seq>:mapnames(<&string fn>) → <row> Converts a constant sequence into a strongly-typed row in compile-time.
6.61. <seq>:qpush(<object>, [<number bound>]) → <seq> Returns the merge of a sequence and a value with a maximum size of <bound>, removing elements in a FIFO manner.
6.62. <seq>:qpushseq(<seq b>, [<number bound>]) → <seq> Returns the merge of both sequences with a maximum size of <bound>, removing elements in a FIFO manner.
6.63. <seq>:repeat(<number>) → <seq> Repeats the sequence a certain number of times.
6.64. <seq>:retain(<seq b>) → <seq> Creates a sequence with the same elements, but only the ones that are also present in <b>
6.65. <seq>:skip([<number n>]) → <seq> Returns a new sequence that skips the first <n> elements
6.66. <seq>:slice([<number start>], [<number end>], [<number step>]) → <seq> Returns the sliced sequence starting at index <start>, ending at <end> getting every <step> elements.
6.67. <seq>:take([<number n>]) → <seq> Returns a new sequence that takes only the first <n> elements
6.68. <string>:compile(<object... args>) → <number> Compiles a constant string into a pipes expression
6.69. <string>:compile.map(<object... args>) → <number> Compiles a constant string with each argument into many pipes expressions
6.70. <string>:contains(<string>) → <boolean> Returns whether the target string contains the argument.
6.71. <string>:dateparse([<string format>], [<string tz>]) → <number> Parses timestamp using specified format
6.72. <string>:endswith(<string>) → <boolean> Returns whether the target string ends with the argument.
6.73. <string>:frombase(<number base>) → <number> Converts a string representing a number in some base to the number itself
6.74. <string>:hll.eval() → <number> Evaluates compressed base64 HyperLogLog data.
6.75. <string>:indexof(<string s>, [<number fromIndex>]) → <number> Returns the first index of position of <s> inside the target string. Returns null otherwise.
6.76. <string>:jsonparse() → <object> Parses JSON string into objects.
6.77. <string>:lower() → <string> Converts string to lowercase.
6.78. <string>:ltrim([<string chars>]) → <string> Trims leading characters from a string
6.79. <string>:ord() → <number> Gets the unicode codepoint from the first char in the string.
6.80. <string>:parse([<string format>], [<string locale>]) → <number> Parses a number according to format string and locale.
6.81. <string>:property() → <object> Compiles a string into the corresponding property access object
6.82. <string>:regex(<string regex>) → <row> Returns a strongly typed row composed by all named groups in <regex>.
6.83. <string>:regexall(<string regex>) → <seq(row)> Returns a sequence of rows composed by all matches of all named groups in <regex>.
6.84. <string>:regexfind(<string regex>, [<number|string group>]) → <string> Returns the matched string by <regex> in target (or one specific group).
6.85. <string>:regexfindall(<string regex>, [<number|string group>]) → <seq(string)> Returns all the matched strings by <regex> in target (or one specific group).
6.86. <string>:regexmatch(<string regex>) → <boolean> Returns true if the target matches <regex>. False otherwise.
6.87. <string>:regexsplit(<string regex>, [<number limit>]) → <seq(string)> Splits string by regular expression <regex> in up to <limit> pieces.
6.88. <string>:regexsub(<string regex>, <string replacement>) → <string> Replaces all matches of <regex> in target by <replacement>.
6.89. <string>:repeat(<number>) → <string> Repeats the string a number of times.
6.90. <string>:replace(<string from>, <string to>) → <string> Replaces all instances of <from> with the string <to>.
6.91. <string>:rindexof(<string s>, [<number fromIndex>]) → <number> Returns the last index of position of <s> inside the target string. Returns null otherwise.
6.92. <string>:rtrim([<string chars>]) → <string> Trims trailing characters from a string
6.93. <string>:rtruncate(<number length>, [<string ellipsis>]) → <string> Truncates the string to the specified length, keeping the end, optionally adding an ellipsis at the start.
6.94. <string>:split([<string delim>], [<number limit>]) → <seq(string)> Splits string by <delim> in up to <limit> pieces.
6.95. <string>:sprintf(<object... args>) → <string> Uses the target string as format to arguments.
6.96. <string>:startswith(<string>) → <boolean> Returns whether the target string starts with the argument.
6.97. <string>:substring(<number from>, [<number to>]) → <string> Returns the substring between the indices <from> and <to>.
6.98. <string>:trim([<string chars>]) → <string> Trims a string
6.99. <string>:truncate(<number length>, [<string ellipsis>]) → <string> Truncates the string to the specified length, optionally adding an ellipsis at the end.
6.100. <string>:upper() → <string> Converts string to uppercase.
6.101. <string>:urldecode([<string encoding>]) → <string> Decodes an application/x-www-form-urlencoded unicode string.
6.102. <string>:urlencode([<string encoding>]) → <string> Encodes a string as application/x-www-form-urlencoded.
6.103. cast(<object>, <string typename>) → <object> Casts a value into another type.
6.104. compare(<comparable a>, <comparable b>) → <number> Returns a number < 0 if a < b, > 0 if a > b or 0 if a = b.
6.105. concat(<string...>) → <string> Concatenetes many strings together
6.106. cron(<string>, [<string tz>]) → <period> Constructs a period instance from a constant cron string
6.107. exprange([<number start>], <number end>, <number step>) → <object> Creates a seq that iterates through numbers exponentially
6.108. happened([<number ref>], [<number tested>], <string>, [<string tz>]) → <number> Returns true if the tested timestamp is inside the span, false otherwise.
6.109. hll.merge(<string... data>) → <string> Merge many instances of compressed base64 HyperLogLog data.
6.110. itermap(<object>, [<string keyField>], [<string valueField>]) → <seq(row)> Creates a seq that iterates through java.util.Map, seq or row entries
6.111. max(<comparable>, <comparable>, <comparable...>) → <comparable> Returns the greatest value of all supplied arguments.
6.112. metadata() → <object> Returns a representation of the previous pipe metadata.
6.113. min(<comparable>, <comparable>, <comparable...>) → <comparable> Returns the least value of all supplied arguments.
6.114. minhash.eval(<string...>) → <number> Evaluates the similarity between many MinHash encoded data.
6.115. minhash.merge(<string... data>) → <string> Merge many MinHash encoded data.
6.116. mregression.apply(<seq(number) coefficients>, <number... xs>) → <number> Applies regression coefficients to variables
6.117. newmap(<object,object... pairs>) → <map> Creates a instance of java.util.Map with the supplied keys and values.
6.118. normrandom([<number mean>], [<number stdev>]) → <number> Returns a random value from a normal (Gaussian) distribution.
6.119. period(<number>, <string unit>, [<string tz>]) → <period> Constructs a period instance from parameters
6.120. pi() → <number> Returns the constant value of pi.
6.121. random([<number min>], [<number max>]) → <number> Returns a random value between <min> and <max> (0 and 1 if not defined).
6.122. range([<number start>], <number end>, [<number step>]) → <object> Creates a seq that iterates through numbers
6.123. span([<number ref>], <string>, [<string tz>]) → <row> Calculates start and end timestamps of span based on target.
6.124. spanend([<number ref>], <string>, [<string tz>]) → <number> Calculates end timestamp of span based on target.
6.125. spanstart([<number ref>], <string>, [<string tz>]) → <number> Calculates start timestamp of span based on target.
6.126. spantest([<number ref>], [<number tested>], <string>, [<string tz>]) → <number> Returns -1 if the tested timestamp is before the span, 0 if it is inside and 1 if it is after.
6.127. timestamp() → <number> Returns the most appropriate timestamp, whether in scalar or aggregation contexts.
6.128. uuid() → <string> Returns a random UUID string.
6.129. zip(<seq... args>) → <seq(row)> Creates a seq such that each element is a row with the respective element from input seqs
6.130. System functions
6.130.1. sys.cpu() → <row> Returns the system's CPU usage info.
6.130.2. sys.disks() → <seq(row)> Returns info about all devices in the filesystem.
6.130.3. sys.fds() → <row> Returns info about process file descriptors.
6.130.4. sys.heap() → <row> Returns the system's current JVM heap info.
6.130.5. sys.hostname() → <string> Returns the system's hostname.
6.130.6. sys.memory() → <row> Returns the system's memory usage info.
6.130.7. sys.properties() → <object> Returns a list of defined system properties
6.130.8. sys.threadlist() → <seq(row)> Returns a list of threads in the JVM
6.130.9. sys.threads() → <row> Returns info about running threads in current process.
6.130.10. sys.timestamp() → <number> Returns the system's real timestamp (not the query timestamp).
6.130.11. sys.uptime() → <number> Returns the number of milliseconds since the start of the JVM
6.130.12. sys.version() → <row> Returns Pipes version info
7. Aggregation Functions
7.1. <aggregation object expr>:if(<boolean condition>) → <object> Aggregates only events that evaluates true to <condition>.
7.2. <aggregation object expr>:overall() → <object> Merges all the results from the target aggregation.
7.3. <aggregation object expr>:overlast(<number window>) → <object> Merges the results of the last <window> aggregations.
7.4. <aggregation object expr>:prev([<number prev>]) → <object> Delays and returns the previous <number>th result from target aggregation.
7.5. all(<boolean>) → <boolean> Returns true if all ocurrences evaluate true.
7.6. amap(<row>, <aggregation object>) → <row> Applies the same aggregation to all fields of a row.
7.7. any(<boolean>) → <boolean> Returns true if any ocurrence evaluates true.
7.8. areduce(<seq>, <aggregation object>) → <object> Applies all elements of a sequence to the same aggregation
7.9. avg(<number>, [<number weight>]) → <number> Calculates the (possibly weighted) average of some expression.
7.10. count([<object>]) → <number> Counts all events in a window (except those that evaluate to false or null).
7.11. dcount(<object...>) → <number> Estimates the field's cardinality (distinct count) using HyperLogLog.
7.12. describe(<aggregation object expr>) → <string> Yields a string json explaining the target aggregation's inner state representation.
7.13. first(<object>) → <object> Yields the ocurrence with least timestamp.
7.14. firstn(<object>, <number n>) → <seq> Yields the <n> values with least timestamp.
7.15. firstv(<object>) → <object> Yields the non-null ocurrence with least timestamp.
7.16. fold(<object>, [<object initial>], <&fn>, [<&merger>]) → <object> Reduces sequence of elements applying function successively
7.17. foldv(<object>, [<object initial>], <&fn>, [<&merger>]) → <object> Reduces sequence of elements applying function successively (ignores nulls)
7.18. greatest(<object>, <sortfield...>) → <object> Yields the greatest ocurrence in the window based on the sort fields.
7.19. greatestv(<object>, <sortfield...>) → <object> Yields the greatest non-null ocurrence in the window based on the sort fields.
7.20. hll(<number log2m>, <object...>) → <number> Estimates the field's cardinality (distinct count) using HyperLogLog.
7.21. hll.init(<number log2m>, <object...>) → <string> Similar to hll, but it doesn't evaluate final cardinality, just return the sketch data.
7.22. hll.merge(<string>) → <string> Performs union of many HyperLogLog encoded data in a window.
7.23. hll.mergeeval(<string>) → <number> Performs union of many HyperLogLog encoded data in a window and evaluates the result.
7.24. intervals(<number begin>, <number end>) → <seq> Creates a list of disjoint intervals
7.25. last(<object>) → <object> Yields the ocurrence with greatest timestamp.
7.26. lastn(<object>, <number n>) → <seq> Yields the <n> values with greatest timestamp.
7.27. lastv(<object>) → <object> Yields the non-null ocurrence with greatest timestamp.
7.28. least(<object>, <sortfield...>) → <object> Yields the least ocurrence in the window based on the sort fields.
7.29. leastv(<object>, <sortfield...>) → <object> Yields the least non-null ocurrence in the window based on the sort fields.
7.30. list(<object>) → <seq> Creates a java.util.List from all events in a window.
7.31. map(<object key>, <aggregation object value>) → <map> Creates a java.util.Map from all events in a window.
7.32. max(<comparable>) → <comparable> Yields the greatest ocurrence in the window.
7.33. maxv(<comparable>) → <comparable> Yields the greatest non-null ocurrence in the window.
7.34. median(<number>) → <number> Estimates the median value of the population using Count-Min Sketch.
7.35. min(<comparable>) → <comparable> Yields the least ocurrence in the window.
7.36. minhash.init(<number size>, <object...>) → <string> Returns MinHash set signature.
7.37. minhash.merge(<string>) → <string> Merge many MinHash encoded data in a window.
7.38. minhash.mergeeval(<string...>) → <number> Merge many MinHash encoded data in a window and evaluates the similarity between them.
7.39. minv(<comparable>) → <comparable> Yields the least non-null ocurrence in the window.
7.40. mregression([<number... x>], <number y>) → <seq> Computes multivariate regression for given variables.
7.41. mregression.full([<number... x>], <number y>) → <row> Computes multivariate regression statistics for given variables.
7.42. pcount(<boolean>) → <number> Aggregates the proportion of events that evaluate true to expression.
7.43. pfold(<object>, [<object initial>], <&fn>) → <object> Same as fold, but with state persisted
7.44. quantile(<number>, <number q>) → <number> Estimates the q (0..1) quantile of the population using Count-Min Sketch.
7.45. regression([<number x>], <number y>) → <row> Computes regression and correlation statistics for given variables.
7.46. set(<object>) → <seq> Creates a java.util.Set from all events in a window.
7.47. similarity(<object...>) → <number> Estimates the Jaccard similarity coefficient between many sets.
7.48. smooth(<aggregation number expr>, [<number alpha>], [<number beta>]) → <number> Smoothes the curve of another aggregation.
7.49. statistics(<number>) → <row> Aggregates sample statistics for some property.
7.50. stdev(<number>, [<number weight>]) → <number> Calculates the (possibly weighted) standard deviation of some expression.
7.51. sum(<number>) → <number> Sums all evaluations of some expression.
7.52. summary(<string>, [<string separator>], [<string lastSeparator>]) → <string> Join all the strings in a window.
7.53. top(<number k>, <object>, [<sortfield...>]) → <seq> Yields the k minimum occurrences of the target object.
7.54. variance(<number>, [<number weight>]) → <number> Calculates the (possibly weighted) variance of some expression.
7.55. when(<boolean expr>) → <number> Yields the latest timestamp inside window when some condition was true.
7.56. whenfirst(<boolean expr>) → <number> Yields the first timestamp inside window when some condition was true.
7.57. Window Meta-aggregations
7.57.1. wcount() → <number> Yields how many outputs are merged in the current window.
7.57.2. wstart() → <number> Returns the window's first allowed timestamp (or item index).
7.57.3. wend() → <number> Returns the window's last allowed timestamp (or item index).
7.57.4. ostart() → <number> Returns the output's first allowed timestamp (or item index).
7.57.5. oend() → <number> Returns the output's last allowed timestamp (or item index).
7.57.6. otimestamp() → <number> Returns the timestamp when the outputs were merged (equal to oend() on time pipes).
8. Timespan Language
8.1. Span types
8.2. Period types

1. Quick Start

This chapter is an introduction to the Pipes language through examples. Each section will contain links to other chapters that develop a little more on the subjects presented here.


1.1. Filters

The simplest Pipes query is a filter. The simplest filter is * that matches all events.

*

Another simple filter selects all events where somefield is equal to somevalue:

somefield:somevalue

Use quotes (" or ') to filter by complex strings:

somefield:"a complex value"

There are other filter types available, for example:

somefield:aaa*zzz

Filters events where somefield has a string that starts with aaa and ends with zzz.

somefield#:[42, 1000)

Filters events where somefield, converted to number, has a value between 42 (inclusive) and 1000 (exclusive).


Filters are composable with the & and | operators:

(host:A & value:30) | (host:B & value:42)

The operator & can be omitted and has greater precedence than |:

host:A value:30 | host:B value:42

The operator - has the effect of select the complement of a filter:

host:A -value:30

Select events where host equals "A", but value isn't 30.


If you do not define the field name, the default field is selected. For example, if the default field is __type, you can write:

SomeType value:42

Select events from SomeType with value equal to 42.


You can write an entire sub-filter inside some field declaration:

SomeType value:(42 | (43 -othervalue:X))

Select events from SomeType if value is 42 or 43 (but the latter only if othervalue isn't equal to X).


To know more about...


1.2. Simple pipe

After the filter, you can chain one or more pipes. The simplest pipe just selects some fields from the event.

SomeType => field1, field2

Selects only field1 and field2 from events matching SomeType


By default, references to fields from filtered events are assumed to be strings. If you want to declare a field reference as number, use the operator #:

SomeType => field1#, field2#

Selects only field1 and field2 from events matching SomeType


You can aggregate events, just like you would do in SQL:

SomeType => count() every minute

Outputs every minute the number of events matching SomeType in that minute.


You can use more than one aggregation and give a name to the output:

SomeType => count() as {Event count}, avg(value#) as mean every minute

Outputs every minute the number of events and the mean of the variable value for events matching SomeType in that minute.


You can also define a time window to aggregate over:

SomeType => count() over last 2 hours every minute

Outputs every minute the number of events matching SomeType in the last 2 hours.


If you want to group the results by some other field, use:

SomeType => count() by host every minute

Outputs every minute, for every host, the number of events matching SomeType


To know more about...


1.3. Chaining pipes

Each query in Pipes creates new events. Those events can be chained into another pipe(s) to be transformed:

SomeType
=> count() by host every minute
=> @top 5, count desc

Outputs every minute the top 5 hosts that received most events matching SomeType.


You can use as many pipes as you want:

SomeType
=> count(), avg(response_time#) as time by host every minute
=> @filter time > 3
=> @top 5, count desc

Outputs every minute hosts with average response time greater than 3 seconds, getting the top 5 by event count.


There are many different pipe types:

SomeType
=> count() every minute
=> @filter count > 1000
=> @throttle 30 minutes

Outputs the number of events for every minute that has more than 1000 events, but only if it didn't already happen in the last 30 minutes.


To know more about...


1.4. The join and union pipes

Sometimes it is important to mix information from different streams into a single event type. For example, assuming CPU and Memory metrics come in different events, but with the same event type Metrics, you can write a query with the join pipe to mix them both:

Metrics
=> [
    @filter \metric_name:CPU => avg(value#) as cpu every minute
    join
    @filter \metric_name:Memory => avg(value#) as mem every minute
]

Outputs the average cpu and memory every minute, in the same event.


You can also define a field to join the results on. For example, if you need to calculate the metrics in previous example by host, you can join on this field using:

Metrics
=> [
    @filter \metric_name:CPU => avg(value#) as cpu by host every minute
    join on host
    @filter \metric_name:Memory => avg(value#) as mem by host every minute
]

Outputs the average cpu and memory for each host every minute, in the same event.


Another example: imagine you need to create a ranking with the top 10 hosts by event count in the minute, but with the 11th row being the total count in all hosts, you can use the union pipe to do that:

SomeType
=> [
    count() by host every minute => @top 10, count desc
    union
    count() by 'Total' every minute
]

Outputs 11 rows every minute: the top 10 hosts by event count and a total row.


One last example: if you need to filter hosts with average CPU in the last minute greater than the global average, you can write:

Metrics metric_name:CPU
=> [
    avg(value#) as cpu by host every minute
    join
    avg(value#) as global_avg every minute
]
=> @filter cpu > global_avg

Outputs every minute only hosts with average CPU greater than the global average.


To know more about...


1.5. Sequences and rows

There are event properties that represents lists and tuples. Pipes has two types to represent those properties: seq and row.

A seq represents a sequence of objects. The type of each object in the sequence is denoted along with the type name. For example, a seq(number) is a number sequence. As example, the expression range(10) is a seq(number).

A row represents a named tuple, that is, a tuple where each position has a distinct name and type. These metadata is also denoted along with the type name. For example, row(number a, string b) is a row with two properties, a is a number and b is a string. An example expression with that type is (42 as a, 'John' as b). Is is important to note that every row is also a seq(T), where T is the minimum common type to all fields.

Pipes has functions and operators to deal with sequences. The main one is <seq> |> <&fn> → <object>. It transforms sequences. For example, to get only the prices from a list of products inside an event, one can do:

Sales
=> products:seq |> price#

Gets a list of products inside a Sales event and picks only the price property as a number.


If you use a aggregation in the right side of a |> operator, it applies the aggregation to all elements in the sequence and returns a single value. For example, to get the sum of all product prices.

Sales
=> products:seq |> sum(price#)

Gets a list of products inside a Sales event and sums all the prices.


You can even use other pipes inside the |> operator, for example:

Sales
=> products:seq |> @filter(name:startswith('M&M')) |> sum(price#)

Gets a list of products inside a Sales event and sums all the prices for products whose name start with "M&M".


To know more about...


1.6. Macros and user functions

In Pipes, you can define macros to reduce code duplication. For example:

def last_gt(x, v):
    last(x#):if(x# > v);

WebAccess => response_time:last_gt(2), response_time:last_gt(3) every hour

Gets the last response time greater than 2 and 3, respectively.


It is possible to define new user pipes with macros, for example:

def @last_gt(x, v every &out):
    @filter x# > v => last(x) every=out;

WebAccess
=> [
    @last_gt response_time, 2 every hour
    join
    @last_gt response_time, 3 every hour
]

Gets the last response time greater than 2 and 3, respectively.


There is one special kind of macro that can also be used inside filters: constants. They always start with @@. For example:

def @@states: ('NY', 'CA');

Orders state:@@states => sum(value#) every hour

Gets all orders from states NY or CA and yields their sum every hour.


To know more about...


2. Concepts

Pipes is a language and an engine to perform distributed aggregations over real-time streams. It enables stream processing with low latency and minimum memory footprint (constant for most operations).

The language is desined to be as intuitive as possible, expressing more explicitly the data flow. The goal was to provide a language you can read from left to right (no visual backtracking) and still keep the declarative way to express the computations. The name Pipes is an allusion to the unix pipeline, which enables a powerful and elegant functional programming model on unix shells.


2.1. Events and filters

Pipes language operates over events. Events are (usually immutable) sets of (key, value) tuples and may have some time information assigned to it. In the Pipes event model, events have no intrinsic type. They're all part of a single public stream of tuples. When a type information is applicable, it is usually encoded as a property of each event (e.g. type or __type).

The engine makes no assumption on the nature, frequency or schema of the incoming events. Every operation is written keeping in mind that the event may not have the referenced field or the value may not be of same type always. It is safe to say events are pretty well representable by schemaless JSONs. As a Java library, Pipes' default configuration understands instances of java.util.Map container.

It is recommended that events have a property called 'timestamp', with a Java timestamp, that is the number of milliseconds since 1 January 1970 UTC. But even that is not enforced nor required.

Unlike a SQL database, stream processing allows very little preprocessing in the events (e.g. index creation), so the heavy optimization is done in the queries. This is why, most of the times, the queries will run in the same engine: this way, the engine can optimize and share the most processing between them.

The first part of a Pipes query always consist in selecting some events from that stream, using a filter (see Chapter 3: Filters). Filters select values from the public stream. Each filter contains basically predicates about some fields and boolean operations between them. This has a great potential for optimization. For example, given the filters type:http status:404, type:http status:200 and type:http status:(2?? | 3??), the engine builds automata like the ones in the picture bellow:

Filters automata

Filters automata


2.2. Chained computation model

After filtering the public stream, you can chain the result through one or more pipes (see Chapter 4: Pipes). Each pipe may transform, filter or aggregate the results from the previous one. In the language, this chaining is represented by the operator =>.

Pipes computation model

Pipes computation model

Each pipe in the chain can have one or more output rates. For example, a pipe can receive an input every time the filter matches some event in the public stream, but only output events in batches of one minute. There are five different types of output: every time period, every item, every batch, every condition, and at the end. For more information, see Section 2.3: Output rates and aggregation windows.

Inside each pipe, you can interact with events using expressions that access, transforms or aggregates its properties. For more information, see Section 2.5: Expressions.

Most pipes are implemented using parallel algorithms that allow seamless distribution over multiple physical machines. Usually, each pipe can be classified in three groups:

  • safe: pipes that can execute completelly in parallel without affecting the final result (e.g. filter pipe)
  • semi-safe: pipes that can execute most of their work in parallel, but must merge their output with another nodes (e.g. simple pipe)
  • unsafe: pipes that must execute in a single node in order to compute the correct result (e.g. compress pipe)

When writing a typical query, it's likely your pipes will distribute as follows:

Typical pipe in single machine

Typical pipe in single machine

Typical pipe in multiple machines

Typical pipe in multiple machines


2.3. Output rates and aggregation windows

Every pipe in the chain will have one or more output rates. In this section, we will focus on how to declare them in simple pipe, but the concept itself can be applied to any pipe type.

There are some types of output rates: every time period, every item, every batch, every condition, and at the end.

  • every <period>: outputs every time the pipe's internal clock ticks a constant period. E.g. every 5 seconds
  • every <number> items: outputs every time the specified amount of items is processed by the engine. E.g. every 5 items
  • every <number> batches: outputs every time the specified amount item batches is processed by the engine. E.g. every 5 batches
  • every <boolean>: outputs every time the specified condition becomes true. E.g. every (x>x:prev), every \SomeEventType
  • at the end: outputs only at the end of execution. Useful for summary queries.

The available period units are:

unitalso acceptsequivalent to
millisecondmilli, ms
secondsec1000 ms
minutemin60000 ms
hour360000 ms
day
weekwk
monthmon
bimester2 months
quarter3 months
semester6 months
yearyr

Keep in mind that in the case of simple pipe, there may be a difference between the declared rate and the effective rate. In the query bellow, for example, the declared output rate in the last pipe is every item, but effectivelly the query outputs every 10 seconds.

* => count() every 10 seconds => count, prev(count) every item

Outputs every 10 seconds

This happens because when writing a query, you declare the output with pipe's local view. But the effective output rate considers the entire query, from the filter to the last pipe. In the example above, although the last pipe outputs every time it receives two items

Also in simple pipe, it is allowed to aggregate over a period other than the output. You can, for example, output a result every minute, that corresponds to the last 5 minutes of aggregation. It is an aggregation over time window.

The window concept only applies to the simple pipe.

In traditional stream processing libraries, every event is stored inside a data window and the aggregation runs over them. This allows a fine grained configuration of outputs vs windows. In Pipes, the goal is to use the least resources possible. So, instead of storing events inside the window, we only store the aggregation outputs (in a mergeable format, see Section 2.5.6: Aggregations state representation). The picture below exemplifies:

Windows merges aggregations outputs

Windows merges aggregations outputs

This way, the memory footprint of a query is very low and predictable. Of course this approach introduces some limitations (e.g. the window size must be a multiple of the output rate) but the benefits pay it off.

There are four types of windows:

  • over last (<period>|<number> items|<number> batches): aggregates over last <period> / <output_rate> outputs. E.g. over last 5 minutes
  • over current (<period>|<number> items|<number> batches): aggregates since the beginning of the current period. E.g. over current day
  • over span <string>: aggregates over some timespan (cf. Chapter 8: Timespan Language). E.g. over span 'last 2 hours and 5 minutes'
  • over all: aggregates over all events, merges all outputs.

It is worth noticing that item-based windows can only be used with item-based outputs, but time-based windows can be used with all but at the end outputs. In fact, at the end pipes support no window definition as there is only a single output at the end. Below, there is a table with examples showing what is valid and what is not.

window/output examples every 2 minutesevery 2 itemsevery 2 batchesevery (count()==2)at the end
over last 10 minutes✔ valid✔ valid✔ valid✔ valid✖ invalid
over current 10 minutes✔ valid✔ valid✔ valid✔ valid✖ invalid
over last 10 items✖ invalid✔ valid✖ invalid✔ valid✖ invalid
over current 10 items✖ invalid✔ valid✖ invalid✔ valid✖ invalid
over last 10 batches✖ invalid✖ invalid✔ valid✖ invalid✖ invalid
over current 10 batches✖ invalid✖ invalid✔ valid✖ invalid✖ invalid
over span 'last 10 minutes'✔ valid✔ valid✔ valid✔ valid✖ invalid
over all✔ valid✔ valid✔ valid✔ valid✖ invalid

2.4. Grouping and group expiry

Some pipes, including the simple pipe, allow grouping the events by one or more properties during the processing. This can be done by appending by prop1, ..., propn in the pipe declaration.

* => avg(memory#) by host every minute

Yields, every minute, the average memory consumption of every host with an event in the last minute.

* => @onchange state by host

Only outputs events when the value of the variable state changes for that host.

Not all pipes support the grouping syntax, and each one defines the behavior of the grouping syntax, but most of the time, it will mean the engine will keep a separate state for each tuple on the grouping expression.

If the state for every tuple is kept forever, long running queries can leak memory, because once a group state is created, it will never be discarded. The engine tries its best to reclaim unused groups as soon as possible. For most simple pipes, the aggregation TTL information is used. That is, if the TTL for all agregations in a group state have expired, the group is automatically removed.

But for pipes with item outputs and some other pipes, it may be impossible to predict when a group has to expire. That's what the expiry keyword is for. It defines the maximum period a group state can be kept before it is expired and purged from memory.

* => avg(memory#) by host expiry 10 minutes over last 32 items every item
* => @onchange state by host expiry 10 minutes

In both cases, if a host doesn't receive an event in 10 minutes, its state will be purged.

The expiry keyword is available only since Pipes v0.15.


2.5. Expressions

Expressions access and transform data from events. We will introduce them here, explaing Pipes' type system and how aggregations work. For detailed information on expressions, please refer Chapter 5: Operators, Chapter 6: Scalar Functions and Chapter 7: Aggregation Functions.


2.5.1. Type system

The Pipes language is statically typed, but has a very simple type system. There are four main types: number, string, boolean and object. All types inherit from object. There are a few other types, used in some specific functions. The table below summarizes all types:

typejava equivalentexample
numberjava.lang.Double42, 42.0, 1e42
stringjava.lang.String"42"
booleanjava.lang.Booleantrue, false
objectjava.lang.Objectnull
comparablejava.lang.Comparable
periodnet.intelie.pipes.time.Period1 day, 42 minutes, 1 month {America/Sao_Paulo}
seqjava.util.Iterable(1, 2, 3, 4), range(10)
rownet.intelie.pipes.Row(123 as abc, 456 as defg)
mapjava.util.Mapnewmap('key1', 42, 'key2', 123)

Pipes does not require events to be strongly typed (although it allows them to be). Because of this, the type of a property access may be not known at compile time. Still, the language is statically typed, so some type must be assigned to the expression. The chosen type depends on the configuration, but the default is to be a string. This means that even if the value of the field in the event contains a numeric type, it will be cast as string before being used by the engine.

Types are checked in compile time. For example, trying to use a function that requires a number passing a string (like avg(someproperty)) results in error:

PipeException: Error in call avg(someproperty) with types <PropertyAccess[string]>, cause(s):
AvgAggregation: The parameter #1 <someproperty> must be 'number' instead of 'string'.

It is possible to tell the engine what type an expression must have, using the type's name as a function. E.g. number(someproperty) indicates that someproperty is a numeric value and will coerce to a number in cases it isn't. When used with other expressions, those functions also convert values: number("42") will just return 42.

The string and number types also have shorthands for these functions:

  • number(someproperty) is the same as someproperty#
  • string(someproperty) is the same as someproperty$

2.5.2. Type arguments

Some types also have arguments. In special, seq has a type argument that informs the type of all elements in the seq. E.g., the literal (1, 2, 3, 4) is a seq(number).

This is specially useful when using the pipe @for <named(seq)>, <named... additional>:

* => @for (1,2,3,4) as x

x in the query will be a number in the next pipe

The row type has an also an argument telling the field list of that row. E.g., in the query:

* => A, B# by C# => last(*) as result every minute

the field result is a row(timestamp<number>;C<number>;A<string>,B<number>)

An instance of typed row has three main uses:

The types row and seq share an intimate relation, because every row is also a seq by polymorphism. The type argument of the underlying seq depends on the types of the row. A row(number, number) is a seq(number), but a row(number, string) is a seq(comparable).

Examples

*
=> id, name, value#
=> last(*) every minute
=> @yield

Selects fields and outputs the last every minute. Please note that last(*) outputs a single field of type row with 3 fields in its metadata and the pipe @yield expands them into an event.

*
=> id, name, value#
=> expand last(*) every minute

Equivalent to above. But the pipe @yield accepts any scalar, while the expand keyword only works with row values with metadata.

* => expand name:regex(r'^(?<first>\w+)( \w+)* (?<last>\w+)$') as {extracted_*_name}

Yields an event with the fields extracted_first_name and extracted_last_name.


2.5.3. Literals

Most Pipes's literals are well known in many programming languages. In special, we can enumerate:

Numeric literals

Since Pipes has no integer data type, only number, the only numeric literals are those in the following examples: 42, 42.1, 1e42.

Boolean literals

Only true or false.

Also, not exactly a literal, but you can write boolean filters with the filter syntax using the operator \<filter>.

Null literal

null

String literals

There are some flavors of string literals:

nameexampleescaping?interpolation?useful for
Single quoted 'abcde'✔ yes✖ noordinary strings
Single quoted rawr'\(some text\)'✖ no✖ noregexes
Double quoted "abc${x#+y#}def"✔ yes✔ yesinterpolated strings

Row and seq literals

You can write row instances using the syntax as in the following examples: (1, 2, 3, 4), (123 as propa, '456' as propb).

The row will be strongly typed. And because of the polymorphism, the row will also be a strongly typed seq using the most specific type that covers all the values. For example, (1, '2', 3) will be a row(_1<number>, _2<string>, _3<number>), but also a seq(comparable).

If you wish to define a row with only one element, you MUST use a comma after the single element ((42,)). If you don't, the compiler will treat the value as an expression, i.e. (42) != (42,).


2.5.4. Sequences and Rows

Since Pipes 0.13, queries and functions can benefit from the concept of sequences and rows. The type row already existed before, but it has been completely overhauled in 0.13.

A sequence is implemented by the type seq. As the name suggests, it represents a (potentially infinite) sequence of objects. It always has a type. For example, the literal (1, 2, 3) is a seq<number>, and (1, '2', 3) is a seq<comparable> (because both numbers and strings are a subtype of comparable).

A row is a subtype of sequence. In special, it is a finite sequence where each position has a specific type and name. The literal from the last paragraph, (1, 2, 3), is actually a row<number _1, number _2, number _3>, that inherits from seq<number>. The underlying seq type of a row is always the minimum common type between all the elements.

Pipes provides many built-in functions and operators to deal with sequences and rows. Some examples: operator |>, aggregation amap(), aggregation areduce(), function mapnames().


2.5.5. Scalars and aggregations

Every Pipes expression can be either a scalar or an aggregation. We call it a level.

Scalars are expressions that receive an event and computes its result immediately. E.g. to get the length of a string, one can write len(someproperty). In this case, the whole expression is a scalar.

Aggregations are expressions that receive several events but only computes its result at the end of the window. E.g. to get the average of a property during the window, one can write avg(someproperty#). In this case, the whole expression is an aggregation.

When a scalar has only constant values (and operations over them) we can say it is a constant. E.g. the literal value "some value" is a constant. len("some value") is also a constant, and may be optimized in compile to time to the value 10.

Most functions just assume the same level as some (sometimes all) of its parameters. E.g. in the expression len(first(someproperty)) the resulting expression is an aggregation, because first(someproperty) is also an aggregation. When a function always result in an aggregation, we refer it just as aggregation.

Some functions (mostly aggregations) require parameters to have some specific level. E.g. in the aggregation avg(<number>, [<number weight>]) → <number>, the parameter must be a scalar. For example, trying to compile avg(first(someproperty#)) result in an error:

PipeException: Error in call avg(first(someproperty#)) with types <FirstAggregation[number]>, cause(s):
AvgAggregation: The parameter #1 <first(someproperty#)> must be a scalar instead of an aggregation.

For most purposes, every constant is also a scalar and every scalar may act like an aggregation, if required to.

Level set diagram

Level set diagram


2.5.6. Aggregations state representation

One of the main features in Pipes is the ability to run aggregations distributedly. To make this possible, most aggregations are written using parallel versions of the algorithms. Most of the solutions presented in this section only affect how semi-safe pipes (see Section 2.2: Chained computation model) represent their result over the wire. Additionally, this same mechanism is used by windows in simple pipe (see Section 2.3: Output rates and aggregation windows) and meta-aggregations (e.g. overlast, overall) to merge the results from many outputs.

Aggregation tree-merge-eval model

Aggregation tree-merge-eval model

Lets take as example the average aggregation. Suppose there will be two machines calculating the query type:http => avg(response_time#) every minute in parallel. They share their results every minute. Suppose:

  • Machine 1 received its events and calculated the average response time is: 237.44
  • Machine 2 received its events and calculated the average response time is: 1061.08

Using only this information, it is impossible to know the global average value. If we just take the mean value from both results (649.26) this value could be wrong, because the machine 1 could have received way more events than machine 2, so its result should weight more in the global result.

That's the reason why every aggregation has an internal state representation more complex than just its result. In the case of average aggregation its state representation has two fields: "mean" and "sumw".

  • mean holds the current result
  • sumw holds the sum of all weights (or the count, in the case of unweighted average)
Aggregations state representation

Aggregations state representation

Some aggregations have a more complex inner state, like the distinct count aggregation, that uses the HyperLogLog data structure and must share the sketch content with the other nodes to merge its results. But even that state uses constant memory for its representation.

There is a meta-aggregation in pipes called describe that operates over other aggregations, but instead of yielding the result at the end of the window, it show a json representation of the aggregation's inner state. Using the same example as before, writing describe(avg(response_time#)) would return a value like:

{"sumw":262.0,"mean":416.628854962}

2.5.7. Aggregation TTL

When using simple pipe with a non-empty group, the query tries to reduce memory consumption by removing any group that has not received any event in the last output period. Most aggregations do not hold any state other than the required in the last period, so this is fine. But some aggregations need additional state (e.g. overlast, prev) and may prevent this memory reclaim. There are even aggregations that need to be kept in memory forever to work properly (e.g. overall).

To solve this problem, every aggregation has a property called TTL (time to live). Which defines how long it should prevent row state from being deleted from a query if no events are received in that group. If there is no grouping, the property is ignored.

Most aggregations have TTL=1, but some aggregations with persistent state may have longer TTLs (even infinite). There is also the function <object>:keep([<number ttl>]) → <object> that allows to change the TTL of any expression to any desired value.


2.5.8. Property access

Property expressions are the way to access event's properties. In a glance, every identifier denotes a property access. Properties are scalar expressions, but they can also be used in aggregation pipes, yielding the last evaluated expression.

* => last(cpu#), len(host) by host

In the example above, len(host) is a scalar, but acts as aggregations and returns the length of the host name.

Sometimes, there are property names that collide with keywords in the language. Or cases where the property name doesn't match the traditional identifier syntax. In this case, you can use curly braces to define the property name, like in the example below:

* => last({CPU usage value}) as cpu, len(host) as {product} by host

The way properties are fetched is specific in each configuration, but by default, it reads values from a java.util.Map instance, always as a string (casting to it when necessary). To access it as a number, for example, use the cast operators, e.g. prop# or prop:number().

Some configurations may allow indexing properties access. It may be done by using brackets. For example, assuming the event has a property config that is itself a java.util.Map:

* => config['hostname']

This indexing is identic in syntax to the indexing operator (<object>[<object...>] → <object>), but the semantics described here only apply to property access and is useful only to apply indexing before the usual cast occurs. E.g. someprop is a string, but someprop[0] isn't the first character of the string in someprop. It has the indexing semantics defined in configuration. Usually this is necessary for multivalued fields. Or fields you already know that contain a list instance.

The property expression can also be used to access values from previous pipes. In this case, the value will have the same type as defined in the referenced pipe. Example:

*
=> avg(cpu#) as avg, stdev(cpu#) as stdev every minute
=> avg-2*stdev as lower, avg+2*stdev as upper

2.5.9. Special expressions

There are also two special expressions that act like properties, but have special semantics: _ and *.

The expression _ returns the value of the default property. In untyped pipes, this is the entire event. In typed pipes, it is the first property of the previous pipe. I.e., in untyped pipes, _ is equivalent to *, and in typed pipes, it is equivalent to {0}.

host1 | host2
=> avg(cpu#) as avg by _ every minute
=> _*2 as doubled
host:host1 | host:host2
=> avg(cpu#) as avg by * every minute
=> avg*2 as doubled

The expression * returns the raw input event. Very useful when you don't want to repeat every field present in the event (or when you don't even know them in advance).

host1 => last(*) every second => @yield

Yields only the last event every second. In the example above, the expression last(*) has the type object and is as weakly typed as the input stream.

host1
=> cpu#, memory#
=> last(*) every second
=> last->cpu, last->memory

In the (contrived) example above, the expression last(*) has the type row and is strongly typed. That's why when we use the operator 'peek' on the property last, the resulting values are also strongly typed.


2.5.10. Function call

Pipes language allows previously-configured functions to be called inside queries. The default syntax has nothing new. You can call a function by using parentheses after the function name. E.g. fn(param-1, param-2, ..., param-n). You can check the functions available by default at Chapter 6: Scalar Functions and Chapter 7: Aggregation Functions. Please note that it is possible to add other functions by configuration, so it isn't a complete list.

Other than the default syntax, there are other ways to call a function. For example, the syntax a:fn is equivalent to fn(a) and a:fn(b) is equivalent to fn(a, b). You can find a table bellow with all the alternative syntaxes for method call.

syntax equivalent to
a:fn fn(a)
a:fn(b, c) fn(a, b, c)
a#fn fn(a#)
a#fn(b, c) fn(a#, b, c)
a$fn fn(a$)
a$fn(b, c) fn(a$, b, c)
:fn fn(_)
:fn(b, c) fn(_, b, c)
#fn fn(_#)
#fn(b, c) fn(_#, b, c)
$fn fn(_$)
$fn(b, c) fn(_$, b, c)

2.6. Macros and User Functions

Since Pipes 0.14, uses can write their own functions to use on queries. The functions can be declared even inside a query (before the pipes) to reduce expression duplication. A simple function can be defined as:

def f(a, b) "sums two numbers":
    a+b;
=> f(2, 2), f(2, 3) at the end

Declares and uses the function f(a, b), that sums both arguments.

Please notice that user functions must have a semicolon at the end of its declaration. Also, the string after the function header and before the colon is used to create its documentation entry.

Despite of the name "user functions", they're actually macros. In practice, the function body is expanded with the parameters in compile time. This makes impossible to use recursion, for example.

You can also define user pipes with the same concept:

def @filtereven(a):
    @filter a%2==0;
* => @filtereven x#

Declares and uses the pipe @filtereven x, that selects only the events where the argument is even.

And, similarly, to declare constant macros:

def @@answer: 42;

=> @@answer at the end;

User functions can have optional arguments with default values, for example:

def f(a, b=2):
    a+b;
=> f(2), f(2, 3) at the end

Declares and uses the function f(a, b), where the argument b is optional.

The default expansion mode for the arguments is "at declaration", but sometimes this may cause problems with functions that receive an expression (like the amap() aggregation), so in this cases, it may be useful to pass an argument to be lazily expanded, like this:

def aggregate_both(&fn, a, b):
    (a,b):amap(fn);
* => expand aggregate_both(:last, response_time, host) at the end

Declares and uses the function aggregate_both(a, b, fn), that expands to an aggregation row with two fields, one for each previous argument.

To create a function that receives an arbitrary number of arguments, add an asterisk before the last argument. The argument will be passed as a row, with one field for each additional argument.

def aggregate_all(&fn, *fields):
    fields:amap(fn);
* => expand aggregate_all(:last, response_time, host) at the end

Declares and uses the function aggregate_both(a, b, fn), that expands to an aggregation row with as fields as passed to the function.

Since Pipes v0.19, you can use the def* syntax to define functions that will be applied successively to all arguments passed and return a tuple of results. This is specially useful for metaprogramming techniques.

def* f(name):
    count(name:property) as (name);

* => explode f('response_time', 'product_id') every hour

Compiles the pipe * => count(response_time) as response_time, count(product_id) as product_id every 1 hour.


3. Filters

Every query in Pipes is required to start with a filter. The filter runtime is optimized to allow many queries running over the same stream, sharing as much processing as possible.

The filter syntax is different from the rest of the syntax. It is inspired in the Lucene's query syntax.

Examples:

type:http status:404

Filters all records where the field "type" has a value "http" and "status" has a value "404"


3.1. *

Special filter that allows all records through.

When used in conjunction with field syntax, selects all records that contain that field.

Examples:

*

All records (no filter)

type:*

Filters records that contains any value in the field "type".


3.2. <term>

Selects records where one of the current fields matches <term>.

The match is case insensitive.

The use of wildcards (* and ?) is allowed.

Use the field syntax to change the default field.

Examples:

http

If the default field is "somefield", selects records where "somefield" has a value "http" (case insensitive)

otherfield:http

Selects records where "otherfield" has a value "http" (case insensitive)

otherfield:http*404

Selects records where "otherfield" has a value that starts with "http" and ends with "404" (case insensitive)


3.3. <term>~<number maxEdits>

Selects records where one of the current fields matches <term> with at most <maxEdits> edits.

The match is case insensitive.

Wildcards are not allowed in this filter.

Examples:

http~2

If the default field is "somefield", selects records where "somefield" has a value similar to "http" (e.g. "hxxp").


3.4. [<term lower> TO <term upper>]

Selects records where one of the current fields is between <lower> and <upper>.

The match is case insensitive.

The range filter is inclusive in both ends. To make it exclusive, change "[" to "(" and/or "]" to ")".

The TO keyword must be written in uppercase. It can also be replaced by a single comma with no change in meaning.

Wildcards are not allowed in this filter. Except for the single *. In this filter it means an unbounded end.

There are shorthand syntaxes for some versions of this query:

  • [lower TO *] can be writen as >= lower
  • (lower TO *] can be writen as > lower
  • [* TO upper] can be writen as <= upper
  • [* TO upper) can be writen as < upper

Examples:

status:[200 TO 299]

Selects records where status is between 200 and 299

status:[200, 300)

Selects records where status is between 200 and 299 (300 exclusive)

status:[200, *]

Selects records where status is greater than or equal to 200.

status:>=200

Same as the filter above.


3.5. <field>: <filter>

Sets the current field to <field>. Usually followed by <term> (e.g. somefield:someterm).

Any filter expression can be used inside a filter field. Even other filter fields.

Examples:

type:http

Selects records where the field "type" is equal to "http".

type:(http | cpu)

Selects records where the field "type" is equal to "http" or "cpu".


3.6. <filter> & <filter>

Selects the intersection of two other filters.

It's the default filter conjunction, so it can safely be omitted.

The operator & is equivalent to && and AND (must be uppercase).

Examples:

http verb:get status:404

Selects records with the default field equal to "http", "verb" equal to "get" and "status" equal to "404"

http & verb:get & status:404

Same as above, but with explicit &

tag:(important & resolved)

Selects records with the field "tag" containing both "important" and "resolved" values


3.7. <filter> | <filter>

Selects the union of two other filters.

The operator | is equivalent to || and OR (must be uppercase).

Examples:

http | verb:get | status:404

Selects records with the default field equal to "http", "verb" equal to "get" or "status" equal to "404"

tag:(important | resolved)

Selects records with the field "tag" containing either "important" or "resolved" values


3.8. -<filter>

Selects the complement of another filter.

The operator - is equivalent to ! and NOT (must be uppercase).

Examples:

-verb:get

Selects records with the field "verb" different of "get"

tag:(a* -abnormal)

Selects records with the field "tag" starting with "a", but not equal to "abnormal"


4. Pipes

Pipes are the main building blocks in this language. No surprise the language is named after them. Pipes represent one single processing element that consumes events from the input and produces other events in the output.

The first pipe reads from the filtered public stream. Then, the output from one pipe can be chained as the input to another using the operator =>.

Each kind of pipe has its own properties, like rate of output and data window type and size. Some pipes can run on a distributed environment, some cannot. Some pipes may consume very low memory, some may need a lot of memory to run. All of these properties are derived and checked in compile time.


4.1. <named...> [by <named...>] [over <window>] [every <period> | at the end]

Transforms or aggregates records over configurable data window and output.

This is the default type of pipe. It receives events, aggregates them and output the result based on what is configured on the every clause. By default, the aggregation resets its state on every output, but the event retention can be configured on the over clause. The acceptable values for both clauses are discussed at Section 2.3: Output rates and aggregation windows.

The first part of the pipe defines which aggregations will be made. It accepts all kinds of expressions and treats them all as aggregations. All expressions can act as aggregations. The expression can receive an automatic name based on its contents or you can defined custom one using the keyword as. Example:

* => sum(x#)/count()**2 as value

Outputs the sum of the variable x divided by the event count squared every second.

If the expression is of type row and has metadata associated to it, you can also use the keyword expand to expand its values. Fore more info, see Section 2.5.2: Type arguments. It's worth noting that expand keyword skips any timestamp fields the row might have.

The by clause allows one to group the results by one or more scalar values. Aggregations are not allowed in the by clause, only scalar values. When grouping, the group only outputs if at least one of its aggregations has still TTL, this usually means that it only outputs if the group received some event in the last period, but there are exceptions. For more info, see Section 2.5.7: Aggregation TTL.

All clauses are optional, so the simplest possible pipe is an empty string, which is equivalent to count() every 1 second. Examples:

* =>

Outputs how many events match the query type:http every second.

* => by host

Outputs how many events match the query type:http every second, aggregating by host.

This pipe can be classified as safe, semi-safe or unsafe, depending on which expressions, period and output are chosen. For more information on pipes safety, see Section 2.2: Chained computation model.

The simple pipe is safe if and only if all the expressions are at least scalars, it has no by nor over clauses and the every clause is ommitted or exactly every item. This means in practice the pipe is safe if it is only a stateless transformation item-by-item of the input stream. Example:

* => id, name, value# * 3.14

The simple pipe is unsafe if and only if the output type is every <n> items or it has some aggregation that forces the pipe to be unsafe (e.g. smooth). This means in practice the pipe is unsafe if it depends on some total order in the input, achieved running in a single node. This is true for item batch outputs and some aggregations. Please notice that unsafe pipes with no safe or semi-safe pipes before must be preceeded with @unsafe. Example:

* => @unsafe => avg(value#) every 10 items
* => @unsafe => smooth(value#) every 10 seconds

The simple pipe is semi-safe in all other cases, i.e., when the output is by time period and it has no aggregation that makes it unsafe.

* => avg(value#) every 10 seconds

4.2. <pipe> join [on <fields...>] <pipe>

Computes the cartesian product of the outputs from two pipes with same output rates.

The outputs from two pipes are compatible if they are same type. For example every 5 seconds is compatible with every 7 seconds, but not with every 5 items. In this case, the resulting pipe would assume both outputs as its own output definition.

The resulting pipe will have all the fields from both queries (some fields may be renamed to avoid collision).

When defining fields in the on clause, the fields must be present in both pipes and must have the same type. A compilation error will occur if these restrictions are not met. Also, fields in the on clause aren't renamed, so they'll happen only once in the resulting event.

Examples:

* => [
    sum(value#) by host
    join
    sum(value#) as total
]

Computes the sum grouped by host with a field containing the total sum in each group.

The example above would generate events like:

{"timestamp":1399927869000, "host":"host1","sum_value":42.0, "total_sum":129.0}
{"timestamp":1399927869000, "host":"host2","sum_value":43.0, "total_sum":129.0}
{"timestamp":1399927869000, "host":"host3","sum_value":44.0, "total_sum":129.0}

You can also define a field to join the results on. For example, if you need to calculate the metrics in previous example by host, you can join on this field using:

Metrics
=> [
    @filter \metric_name:CPU => avg(value#) as cpu by host every minute
    join on host
    @filter \metric_name:Memory => avg(value#) as mem by host every minute
]

Outputs the average cpu and memory for each host every minute, in the same event.

The example above would generate events like:

{"timestamp":1399927869000, "host":"host1","cpu":89.0, "mem":1314.0}
{"timestamp":1399927869000, "host":"host2","cpu":21.0, "mem":715.55}
{"timestamp":1399927869000, "host":"host3","cpu":40.0, "mem":16111.7}

Use ljoin, rjoin and lrjoin for outer joins.

Metrics
=> [
    @filter \metric_name:CPU => avg(value#) as cpu by host every minute
    lrjoin on host
    @filter \metric_name:Memory => avg(value#) as mem by host every minute
]

Outputs the average cpu and memory for each host every minute, in the same event.

The example above would generate events like:

{"timestamp":1399927869000, "host":"host1","mem":1314.0}
{"timestamp":1399927869000, "host":"host2","cpu":21.0, "mem":715.55}
{"timestamp":1399927869000, "host":"host3","cpu":40.0}

4.3. <pipe> union <pipe>

Concatenates the outputs from two pipes with compatible output rates.

The outputs from two pipes are compatible if they are same type. For example every 5 seconds is compatible with every 7 seconds, but not with every 5 items. In this case, the resulting pipe would assume both outputs as its own output definition.

If both pipes' type definitions are also compatible, the resulting pipe will assume the first pipe's types and field names. The field names must not be equal between the pipes, only their types and order.

Example:

* => [
    sum(value#) by host
    union
    'Total', sum(value#)
]

Computes the union of the sum grouped by host with the total sum. Note that the fields in the by clause come first in the final order, that's why 'Total' comes first in the second pipe.

The example above would generate events like:

{"timestamp":1399927869000, "host":"host1","sum_value":42.0}
{"timestamp":1399927869000, "host":"host2","sum_value":43.0}
{"timestamp":1399927869000, "host":"host3","sum_value":44.0}
{"timestamp":1399927869000, "host":"Total","sum_value":129.0}

4.4. @chain [<seq expr>]

Chains many sequences in a batch into a single sequence

If the expression is not defined, it is assumed to be _.

Examples:

*
=> @chain (x#, y#)

Yields a seq(number) for each batch, with 2 numbers for each event in the input.

*
=> (x1:seq, x2:seq) |> @chain

Concatenates the sequences x1 and x2 for each event.


4.5. @compile <string>, <obj... args>

Compiles a constant string into a pipe

Available since Pipes v0.14.7

The arguments are available as template macros starting from @@0.

Example:

*
=> @compile '@zip (x#, y#)'

Compiles the string into the pipe @zip (x#, y#).


4.6. @compress <number k>, [<number k2>,] <number... y> [by <object...>]

Alias to either @compress.pip (default) or @compress.uniform.


4.7. @compress.pip <number k>, [<number k2>,] <number... y> [by <object...>]

Compresses the result from the previous pipe to at most k most important rows.

Compress pipe keeps most of the previous pipe metadata, except for the output. It outputs only at the end. It is also an unsafe pipe (see Section 2.2: Chained computation model).

The pipe can optionally be grouped. In this case, it may be useful to define a global maximum to avoid that a group explosion causes the number of points to grow too much.

Examples:

*
=> count() every minute
=> @compress 300, _

Computes the event count every minute, but compress it to the 300 most important points at end of the query.

*
=> count() by host every minute
=> @compress 300, 1000, _ by host

Computes the event count by host every minute, but compress it to each host's 300 most important points at end of the query. Also limits to output at most 1000 points globally (even if there are 4 hosts or more).


4.8. @compress.uniform <number k>, [<number k2>,] <number... y> [by <object...>]

Compresses the result from the previous pipe to at most k rows.

Compress pipe keeps most of the previous pipe metadata, except for the output. It outputs only at the end. It is also an unsafe pipe (see Section 2.2: Chained computation model).

The pipe can optionally be grouped. In this case, it may be useful to define a global maximum to avoid that a group explosion causes the number of points to grow too much.

Examples:

*
=> count() every minute
=> @compress 300, _

Computes the event count every minute, but compress it to 300 points at end of the query.

*
=> count() by host every minute
=> @compress 300, 1000, _ by host

Computes the event count by host every minute, but compress it to each host's 300 points at end of the query. Also limits to output at most 1000 points globally (even if there are 4 hosts or more).


4.9. @debounce <period> [by <object...>]

Only outputs rows that follows <period> time without any output.

The exceeding rows are discarded. Due to its nature, this is an semi-safe pipe (see Section 2.2: Chained computation model). When running in distributed environment, this pipe is used as-is both in the map and reduce pipes.

Debouncing: the red events are discarded.

Debouncing: the red events are discarded.

Examples:

*
=> count() every minute
=> @filter _ > 100
=> @debounce 15 minutes

Alert when the number of events in a minute is greater than 100, but only outputs if there was a silence of at least 15 minutes.

*
=> count() by host every minute
=> @filter _ > 100
=> @debounce 15 minutes by host

Same as previous, but grouping by host.


4.10. @empty [over <window>...]

Creates an empty pipe (pass-through) with optional window metadata

Available since Pipes v0.19.3

It is useful to include additional windows to query metadata.


4.11. @filter <boolean condition> [by <object...>]

Filters the results from previous pipe.

Filter pipe does not change any pipe metadata, it just repeats previous pipe info.

Since Pipes v0.14.8, this pipe has a version that also accepts simple aggregations.

Examples:

*
=> count() by host
=> @filter _ > 10

Filter only hosts that have event count greater than 10.


4.12. @for <named(seq)>, <named... additional>

Unwinds events in the seq instance

Available since Pipes v0.13

If the seq has a type argument, the resulting field has the same type of that argument.

It is possible to use the keyword expand if the inner type is a strongly typed row.

Examples:

*
=> @for products:seq as product

Creates one event for each element inside the field products.

*
=> @for range(10) as i

Creates 10 events with timestamp<number> and i<number>.

*
=> @for expand range(10) |> (_+1 as a, _$ as b)

Creates 10 events with timestamp<number>, a<number> and b<string>.

=> 100 as upto at the end
=> @for range(2, upto) as prime
=> @for range(1, floor(prime**0.5)+1) as j, prime
=> count(prime%j==0) by prime
=> @filter _==1
=> summary(prime$) as primes

Computes prime numbers up to 100.


4.13. @latest

Keeps the latest batch of input events and output it at the end.

This pipe is a convenience to output only the latest batch of events with the output at the end. This concept of item batch doesn't exist in the syntax, so it would be harder to write queries that are only interested in the latest value for each group, for example. They are especially useful when replaying events in the past for a graph that only shows the latest value.

@latest is an unsafe pipe (see Section 2.2: Chained computation model).

Examples:

*
=> count() by host
=> @latest

Outputs only the latest batch of counts by host at the end of the execution.


4.14. @makejoin <string... props>, <pipe...>

Creates a join pipe containing all pipes passed as arguments

Available since Pipes v0.19

Example:

def* @example(&fn):
    @filter product_id != null => fn by product_id over last hour every 2 hours;

WebAccess
=> @makejoin 'product_id', explode @example(count(), avg(response_time#))

Compiles a join of two pipes.


4.15. @makeljoin <string... props>, <pipe...>

Creates a left join pipe containing all pipes passed as arguments

Available since Pipes v0.19

See @makejoin <string... props>, <pipe...>.


4.16. @makelrjoin <string... props>, <pipe...>

Creates a full join pipe containing all pipes passed as arguments

Available since Pipes v0.19

See @makejoin <string... props>, <pipe...>.


4.17. @makerjoin <string... props>, <pipe...>

Creates a right join pipe containing all pipes passed as arguments

Available since Pipes v0.19

See @makejoin <string... props>, <pipe...>.


4.18. @makeunion <pipe...>

Creates a union pipe containing all pipes passed as arguments

Available since Pipes v0.19

Example:

def* @example(prop):
    avg(prop#) as value by nameof(prop) as property over last hour every 2 hours;

WebAccess
=> @makeunion explode @example(response_time, error_count)

Compiles a union of two pipes.


4.19. @meta <named...> [over <window>...]

Adds custom fields to query metadata

Available since Pipes v0.20

All values must be constant.


4.20. @onchange [<object...>] [by <object...>]

Outputs only events that change the last value of any expression in the group

Please note that the first event is always outputed, even if the property value is null.

Examples:

*
=> @unsafe
=> @onchange valid by host

Outputs only events whose value in the property valid is different from the last event.


4.21. @onfalse <boolean condition> [by <object...>]

Outputs only events for which condition evaluates false and was not false in the previous event

Please note that the first event is always outputed.

Examples:

*
=> @unsafe
=> @onfalse valid by host

Outputs only events whose value in the property valid is false and different from the last event.


4.22. @ontrue <boolean condition> [by <object...>]

Outputs only events for which condition evaluates true and was not true in the previous event

Please note that the first event is always outputed.

Examples:

*
=> @unsafe
=> @ontrue valid by host

Outputs only events whose value in the property valid is true and different from the last event.


4.23. @resample <period>, (<object>, <period>, <string method>...) [by <object...>]

Performs resampling of the incoming stream to match the specified period

This pipe resamples input list of fields using a interpolation function for each field. Currently, the available functions are: 'LAST' and 'LINEAR'.

'LAST' just repeats the last non-expired value before each sampling point.

'LINEAR' performs a linear interpolation between two adjacent points of the sampling period. It may require waiting for the next point to arrive. That makes this function unsuited for real-time queries. It can only work correctly when the pipe clock is advanced solely by its events.

Examples:

Metric
=> @resample 30 minutes, value#, 1 hour, 'LINEAR'
   by host, type expiry null

Resamples metrics to be outputed every 30 minutes, performing a linear interpolation when there is no exact point.


4.24. @seq

Changes the static type of any row to the highest corresponding seq.

Available since Pipes v0.13

Examples:

*
=> x, y#
=> @seq

Changes the type from row(timestamp<number>;;x<string>,y<number>) to seq(comparable).


4.25. @set <named...> [by <object...>]

Sets fields in the original event (both typed and raw)

Since Pipes v0.14.8, this pipe has a version that also accepts simple aggregations.

Examples:

*
=> @set (date+time):dateparse('yyyyMMddHHmmss') as timestamp

Adds a new field to the original event that is the timestamp parsed using both date and time fields from the event.

*
=> date, time, text
=> @set (date+time):dateparse('yyyyMMddHHmmss') as timestamp

Adds a new field to the original event that is the timestamp parsed using both date and time fields from the event. Equivalent to expand *, (date+time):dateparse('yyyyMMddHHmmss') as timestamp as the previous pipe is typed.


4.26. @skip <number> [by <object...>]

Skips some results from previous pipe.

Skip pipe does not change any pipe metadata, it just repeats previous pipe info. It also doesn't make any aggregation, it just outputs as soon as it finishes processing input.

Examples:

*
=> count() by url, host
=> @sort _ desc by host
=> @skip 3 by host

Gets most accessed urls by host, except the 3 most accessed in each host.


4.27. @skipwhile <boolean>

Skips rows from every batch while some condition is true.

Available since Pipes v0.13

This pipe also accepts aggregation as condition. In this case, it will behave like every row is a batch and with a over all window.

Examples:

*
=> count() by url
=> @sort _ desc
=> @skipwhile :sum < 1000

Gets the most accessed urls, skipping until the access count sum is greater than or equal to 1000.


4.28. @slice [<number start>], [<number end>], [<number step>] [by <object...>]

Gets some rows (based on index) from previous pipe.

Available since Pipes v0.13

Slice pipe does not change any pipe metadata, it just repeats previous pipe info. It also doesn't make any aggregation, it just outputs as soon as it finishes processing input.

Examples:

*
=> count() by url, host
=> @sort _ desc by host
=> @slice 2, 10, 3 by host

Gets the 2nd, 5th and 8th most accessed urls by host


4.29. @sort <sortfield... expr>

Sorts the results from previous pipe.

Sort pipe does not change any pipe metadata, it just repeats previous pipe info. It also doesn't make any aggregation, it just outputs as soon as it finishes processing input.

The expressions can be any comparable, optionally followed by asc or desc to make the sort order explicit.

Examples:

*
=> count() by host
=> @sort _ desc, host

Sorts the hosts by event count descending, and then by host (when the counts are equal).


4.30. @take <number> [by <object...>]

Limits the results from previous pipe.

Take pipe does not change any pipe metadata, it just repeats previous pipe info. It also doesn't make any aggregation, it just outputs as soon as it finishes processing input.

Examples:

*
=> count() by url, host
=> @sort _ desc by host
=> @skip 3 by host
=> @take 2 by host

Gets the 4th and 5th most accessed urls by host


4.31. @takewhile <boolean>

Takes rows from every batch while some condition is true.

Available since Pipes v0.13

This pipe also accepts aggregation as condition. In this case, it will behave like every row is a batch and with a over all window.

Examples:

*
=> count() by url
=> @sort _ desc
=> @takewhile :sum < 1000

Gets the most accessed urls, while the access count sum is lower than 1000.


4.32. @throttle [<number k>, ] <period|seq> [by <object...>]

Limits the output of the previous pipe to at most <k> rows per <period>.

The exceeding rows are discarded. This is an unsafe pipe (see Section 2.2: Chained computation model).

Throttling: the red events are discarded.

Throttling: the red events are discarded.

When a <seq> instance is passed, the throttle period will increase as the backpressure grows. This behavior exists since Pipes v0.14.7.

Examples:

*
=> count() every minute
=> @filter _ > 100
=> @throttle 15 minutes

Alert when the number of events in a minute is greater than 100, but throttles the output to once in every 15 minutes.

*
=> count() by host every minute
=> @filter _ > 100
=> @throttle 2, 15 minutes by host

Same as previous, but allows at most two events by host every 15 minutes.


4.33. @top <number k>, <sortfield... expr> [by <object...>]

Sorts the results and gets the first k rows (possibly grouped) from previous pipe.

Top pipe does not change any pipe metadata, it just repeats previous pipe info. It also doesn't make any aggregation, it just outputs as soon as it finishes processing input.

The expressions can be any comparable, optionally followed by asc or desc to make the sort order explicit.

Examples:

*
=> count() by host
=> @top 10, _ desc

Outputs the 10 hosts with the most events.

*
=> count() by url, host
=> @top 10, _ desc by host

Outputs, for every host, the 10 urls with the most events.


4.34. @unbatch

Extracts a batch of events in many batches, one for each event.

Available since Pipes v0.13

Specially useful if used in conjunction with batch output type.


4.35. @unsafe

Marks that any pipe executed after this must run in a non-distributed environment.

The engine forces that each unsafe pipe must be preceeded with a safe or semi-safe pipe. It is recommended to avoid excessive communication between nodes. When you really must write a query that breaks this rule, the unsafe pipe must be preceeded with this pipe, that is in fact an empty pipe that forces the pipe split, making everything after it to run in the reduce node. For more information, see Section 2.2: Chained computation model.

Examples:

*
=> @unsafe
=> count() over all every item

Outputs the total count of events since the beginning of the query every time an item arrives.


4.36. @yield [<object expr>]

Extracts one field of the stream to be the output event.

If the expression is not defined, it is assumed to be _.

If the expression type is row and it has metadata, the pipe assumes the type's metadata. See concepts-expression-typeargs for more information.

If the expression is an aggregation, the pipe will yield the all the input rows applied to the aggregation argument.

Examples:

*
=> name:regex(r'^(?<first>\w+)( \w+)* (?<last>\w+)$') as name
=> @yield name

Yields an event with the fields first and last.

*
=> name:regex(r'^(?<first>\w+)( \w+)* (?<last>\w+)$') as name
=> @yield list(name)

Yields a single event for every batch with a list of names in that batch.


4.37. @zip [<seq expr>]

Takes elements from a batch of seqs, one by one and make a seq of seqs.

Available since Pipes v0.13

If the expression is not defined, it is assumed to be _.

Examples:

*
=> @zip (x#, y#)

Yields 2 sequences every batch, one with all the x# in the batch, other with all the y#.

*
=> (x1:seq, x2:seq) |> @zip

Generates a sequence like ((x1[0], x2[0]), (x1[1], x2[1]), ... ).


4.38. atomic <pipe>

Modify a pipe to run entirely in the same node, ignoring any distribution invariants.

This modifier is useful when the data is known to be already partitioned. For example, if you are sure that each host will be consistently sent to the same node, you can avoid distribution costs by writing aggregations using this modifier.

Compare the following picture with the one described in Section 2.2: Chained computation model.

Typical atomic pipe in single machine

Typical atomic pipe in single machine

Typical atomic pipe in multiple machines

Typical atomic pipe in multiple machines

Examples:

* => atomic count() by host every minute

Outputs the event count by host every minute. Each output will be processed in the node the event arrived, never being merged between nodes (because it assumes that the data is already partitioned by host)


5. Operators

Operators are elements of syntax that allows simple manipulation of scalar and aggregation values.

Every operator translates directly to a function call. E.g. the expression 2+2==4 is equivalent to .eq(.add(2, 2), 4).

You can find bellow a table with the operator group precedence, from highest to lowest. Operators inside same group have the same precedence.

Group Operators
Primary <object>-><identifier> → <object>
<object>->(<object>) → <object>
<object># → <number>
<object>$ → <string>
\<filter>
<object>[<object...>] → <object>
Unary not <boolean> → <boolean>
-<number> → <number>
^<object> → <object>
Null coaslescing <object> ?? <object> → <object>
Power <number> ** <number> → <number>
Multiplicative <number> * <number> → <number>
<number> / <number> → <number>
<number> // <number> → <number>
<number> % <number> → <number>
Additive <number> - <number> → <number>
<number> + <number> → <number>
<string> + <string> → <string>
<seq> + <seq> → <seq>
<map> + <map> → <map>
Comparative <comparable> > <comparable> → <boolean>
<comparable> >= <comparable> → <boolean>
<comparable> < <comparable> → <boolean>
<comparable> <= <comparable> → <boolean>
Equality <object> == <object> → <boolean>
<object> != <object> → <boolean>
Logical AND <boolean> and <boolean> → <boolean>
Logical XOR <boolean> xor <boolean> → <boolean>
Logical OR <boolean> or <boolean> → <boolean>
Ternary <boolean> ? <object>, <object> → <object>
Sequence transformation <seq> |> <&fn> → <object>

Only the ternary operator is right-associative. All other operators are left-associative. You can change both procedence and associativity by using parentheses. E.g. a+b*c is equivalent to a+(b*c).


5.1. \<filter>

Returns a boolean expression that evaluates true if the filter accepts the event.

The filter syntax is the same described in Chapter 3: Filters.

Everything after the \ is considered part of the filter, until it finds a ], ), , or =>.

Sometimes it is necessary to enclose the expression in parentheses to avoid ambiguities. E.g. \field:value as something should be (\field:value) as something. This is so because as is a valid expression in filter language and means something different there.

Examples:

CPU | Memory
=> [
    @filter \CPU => max(value#) as cpu every minute
    join
    @filter \Memory => max(value#) as mem every minute
]

Yields the maximum value every minute for both streams on the same event.

CPU
=> max(value#):if(\host:A) as cpuA,
   max(value#):if(\host:B) as cpuB
   every minute

Yields the maximum CPU value every minute for both hosts on the same event.


5.2. <object># → <number>

Coerces the expression to number. Shorthand to <object>:number() → <number>.

The expression a# is equivalent to number(a).

Example:

* => avg(response_time#) every minute

Yields the average of the field response_time every minute.


5.3. <object>$ → <string>

Coerces the expression to string. Shorthand to <object>:string() → <string>.

The expression a$ is equivalent to string(a).

Example:

* => 'Count: ' + count()$ every minute

Yields the number of events every minute as a string 'Count: X'.


5.4. <number> + <number> → <number>

Adds two numbers.

The expression a+b is equivalent to .add(a, b).


5.5. <string> + <string> → <string>

Concatenates two strings.

The expression a+b is equivalent to .add(a, b).


5.6. <seq> + <seq> → <seq>

Concatenates two seqs.

Available since Pipes v0.13

The expression a+b is equivalent to .add(a, b).


5.7. <map> + <map> → <map>

Concatenates two maps.

Available since Pipes v0.17.1

The expression a+b is equivalent to .add(a, b).


5.8. <number> - <number> → <number>

Subtracts one number from another.

The expression a-b is equivalent to .sub(a, b).


5.9. <number> * <number> → <number>

Multiplies two numbers.

The expression a*b is equivalent to .mul(a, b).


5.10. <number> / <number> → <number>

Divides one number by another (float division).

The expression a/b is equivalent to .div(a, b).


5.11. <number> /? <number> → <number>

Divides one number by another (float division). Legacy operator that returns 0.0 when the divisor is 0.0.

The expression a/?b is equivalent to .divzero(a, b).


5.12. <number> // <number> → <number>

Divides one number by another (integer division).

The expression a//b is equivalent to .intdiv(a, b).


5.13. <number> //? <number> → <number>

Divides one number by another (integer division). Legacy operator that returns 0.0 when the divisor is 0.0.

The expression a//?b is equivalent to .intdivzero(a, b).


5.14. <number> ** <number> → <number>

Raises one number to another's power.

The expression a**b is equivalent to .pow(a, b) or even pow(a, b) .


5.15. <number> % <number> → <number>

Returns the rest of the division of one number by another.

The expression a%b is equivalent to .mod(a, b).


5.16. <number> %? <number> → <number>

Returns the rest of the division of one number by another. Legacy operator that returns 0.0 when the divisor is 0.0.

The expression a%?b is equivalent to .modzero(a, b).


5.17. -<number> → <number>

Negates one number.

The expression -a is equivalent to .neg(a).


5.18. <boolean> and <boolean> → <boolean>

Returns the logical AND of two booleans.

The expression a and b is equivalent to a&b, a&&b and .and(a, b).


5.19. <boolean> or <boolean> → <boolean>

Returns the logical OR of two booleans.

The expression a or b is equivalent to a|b, a||b and .or(a, b).


5.20. <boolean> xor <boolean> → <boolean>

Returns the logical XOR of two booleans.

The expression a xor b is equivalent to a^b and .xor(a, b).


5.21. not <boolean> → <boolean>

Returns the logical NOT of a boolean.

The expression not a is equivalent to !a and .not(a).


5.22. <object> == <object> → <boolean>

Checks whether two objects are equal.

The expression a==b is equivalent to .eq(a, b).


5.23. <object> != <object> → <boolean>

Checks whether two objects are not equal.

The expression a!=b is equivalent to .neq(a, b).


5.24. <comparable> < <comparable> → <boolean>

Checks whether the left operand compares lesser than the right one.

The expression a<b is equivalent to .lt(a, b).


5.25. <comparable> <= <comparable> → <boolean>

Checks whether the left operand compares lesser than or equal to the right one.

The expression a<=b is equivalent to .lteq(a, b).


5.26. <comparable> > <comparable> → <boolean>

Checks whether the left operand compares greater than the right one.

The expression a>b is equivalent to .gt(a, b).


5.27. <comparable> >= <comparable> → <boolean>

Checks whether the left operand compares greater than or equal to the right one.

The expression a>=b is equivalent to .gteq(a, b).


5.28. <object>-><identifier> → <object>

Extracts a property from an object.

The expression a->identifier[123, 456] is equivalent to .property(a, 'identifier', 123, 456)

Example:

*
=> name:regex(r'^(?<first>\w+)( \w+)* (?<last>\w+)$') as match
=> '%s, %s':sprintf(match->last, match->first) as converted

Converts the name field to the format "Last, First".

*
=> last(*) as event every minute
=> event->name

Extracts name from the untyped field event.


5.29. <object>->(<object>) → <object>

Evaluates an expression in the semantic context of the target

The expression a->(b) is almost equivalent to .peek(a, b), but in the former case b is evaluated in the semantic context of a.

Example:

*
=> span('today')->(start*2, end, start+end)

Evaluates span('today') and changes it into another row object without having to go through another pipe.


5.30. <object>[<object...>] → <object>

Accesses an object by index/key in a list/string/map.

Available since Pipes v0.13

The expression a[b, c] is equivalent to .get(a, b, c).

Please note that this operator has special semantics when used directly after a property reference. So field[123] may be different from (field+field2)[123].

Example:

* => (field1+field2)[4]

Yields the 5th letter (index 4) of a string made of field1+field2.


5.31. ^<object> → <object>

Evaluates the expression in the parent semantic context (if any).

Available since Pipes v0.13

The expression ^a is almost equivalent to .hat(a), but in the former case, a is evaluated in the parent semantic context.

This operator is useful in conjunction with other operators that generate child semantic contexts, like <seq> |> <&fn> → <object> and <object>-><identifier> → <object>. In those operators, it may be necessary to evaluate some expressions in the parent context.

Example:

=> at the end
=> @for range(10)|>(range(10)|>"${^_}x${_} = ${^_*_}")|>@chain as line

Yields a multiplication table with one event for each line.


5.32. <object> ?? <object> → <object>

Returns the first if it is not null; otherwise, returns the second.

The expression a??b is equivalent to .coalesce(a, b).

Example:

* => first(name) ?? 'None' as first_name every second

Yields the first name in the window, or 'None' if no name was found.


5.33. <boolean> ? <object>, <object> → <object>

If the condition is true, returns the first object; otherwise, returns the second.

The expression a?b,c is equivalent to .iif(a, b, c).

Please note this is the only right-associative operator. It is so to allow constructions like:

AllMonitorings
=> (type == 'http' ? 'Http Monitoring',
    type == 'cpu' ? 'CPU Monitoring',
    type == 'mem' ? 'Memory Monitoring',
    'Unknown') as monitoring_type

Example:

* => count()%2==0 ? 'Even', 'Odd' as type every second

Yields 'Even' or 'Odd' every second, depending on the number of events in the window.


5.34. <seq> |> <&fn> → <object>

Transforms a sequence into another sequence or object.

Available since Pipes v0.13

The expression a|>b is almost equivalent to .transform(a, b), but in the former case, b is evaluated in the semantic context of a. E.g., in (1, 2, 3) |> _+1, _ is each element of the sequence on the left.

If the expression on the right is an aggregation, the result is the application of it into all elements of the sequence. E.g. range(10) |> sum(*) is the sum of all numbers between 0 and 9.

In the right operand, the expression _ has a special meaning. It always mean the entire object (like the * expression). This allows us to write the example above as range(10) |> :sum.

If the expression on the right is a scalar, the result is another seq with the expression applied to all the elements in the original sequence. E.g.: (1, 2, 3) |> $ casts all numbers in the sequence to string. If the original sequence is an instance of row, the result will also be a row with the same name, but with types changed, e.g. (1 as a, 2 as b) |> $ is the same as ('1' as a, '2' as b).

This operator also allows a special construction: the right operand may be a call to a pipe. In this case, all the elements in the sequence will be passed through the pipe as a single batch. E.g. range(20)|>@filter(_%2==0) returns all the even numbers between 0 and 19. Please note that the syntax requires parentheses, as it would be in a function call. The pipe syntax doesn't. If no arguments are needed, the parentheses are optional, e.g. (range(5), "abcde":split)|>@zip

Some pipes may use grouping, like @top. In this case, you can use it inside the parentheses, e.g. range(20)|>@top(2, _ desc by _%2).

The expression on the right is always evaluated on the context of the left operand. I.e. it behaves like each element of the sequence is an event with the same type as the sequence's type argument. You can use the operator ^<object> → <object> to access the parent semantic context, e.g. range(10)|>(range(10)|>^_*_) returns a multiplication table.

Example:

=> at the end
=> range(2, 100)
   |> @filter(range(2, floor(_**0.5)+1) |> not any(^_%_==0))
   as primes

Computes all primes less than 100.


6. Scalar Functions


6.1. <boolean>:compile.if(<object if_true>, <object if_false>) → <object>

Chooses one of the expressions based on a constant condition in compile-time

Available since Pipes v0.17


6.2. <map>:containskey(<object>) → <boolean>

Returns whether the target map contains the argument as a key.

Available since Pipes v0.17.1


6.3. <number>:abs() → <number>

Calculates the absolute value of a number.


6.4. <number>:acos() → <number>

Returns the arc cosine of the argument to an angle in radians.


6.5. <number>:aequal(<number>, [<number epsilon>]) → <number>

Returns true if a and b are approximately equal. If no number is given, epsilon defaults to 1e-6.

Available since Pipes v0.18.5


6.6. <number>:asin() → <number>

Returns the arc sine of the argument to an angle in radians.


6.7. <number>:atan() → <number>

Returns the arc tangent of the argument to an angle in radians.


6.8. <number>:atan2(<number x>) → <number>

Returns the two-argument arc tangent of the argument to an angle in radians.


6.9. <number>:bytes([<number precision>]) → <string>

Formats a number as the best possible byte multiple.


6.10. <number>:ceil([<number precision>]) → <number>

Returns the smallest number that is greatest than or equal to the argument.


6.11. <number>:chr() → <string>

Converts a unicode codepoint to a single-char string.

Available since Pipes v0.13


6.12. <number>:cos() → <number>

Returns the cosine of an angle in radians.


6.13. <number>:cosh() → <number>

Returns the hyperbolic cosine of an angle in radians.


6.14. <number>:dateadd(<period>) → <number>

Adds <period> to timestamp argument.

Since Pipes v0.14.7, this function accepts non-constant periods.

Example:

* => timestamp():dateadd(1 day) every second

Adds one day to the current timestamp.


6.15. <number>:datefloor(<period>) → <number>

Rounds timestamp down to the nearest date that is divisible by <period>.

Since Pipes v0.14.7, this function accepts non-constant periods.

Example:

* => timestamp():datefloor(1 day) every second

Returns the beginning of the current day.


6.16. <number>:dateformat([<string format>], [<string tz>]) → <string>

Formats timestamp using specified format

Example:

* => timestamp():dateformat("dd/MM/YYYY hh:mm", "UTC") as date every second

Formats the current timestamp assuming UTC time zone.


6.17. <number>:datesub(<period>) → <number>

Sutracts <period> from timestamp argument.

Since Pipes v0.14.7, this function accepts non-constant periods.

Example:

* => timestamp():datesub(1 day) every second

Subtracts one day to the current timestamp.


6.18. <number>:displaynumber() → <number>

Formats a number in unicode for display in small spaces

Available since Pipes v0.18.5


6.19. <number>:exp() → <number>

Calculates the exponential of a number.


6.20. <number>:floor([<number precision>]) → <number>

Returns the largest number that is lesser than or equal to the argument.


6.21. <number>:format([<string format>], [<string locale>]) → <string>

Formats a number according to format string and locale.

Examples:

* => 123.456:format(".00") every second

Format with 2 digits after decimal point.

=> 12345.678:format("#,##0.00", "pt-BR") as money every second

Format 12345.678 to '12.345,68' using Brazilian locale.


6.22. <number>:log([<number base>]) → <number>

Calculates the logarithm of a number.


6.23. <number>:milliseconds([<precision>]) → <string>

Formats a number representing a timespan in milliseconds

Available since Pipes v0.14


6.24. <number>:normdist(<number mean>, <number sdev>, [<boolean acc>]) → <number>

Calculates the gaussian distribution function value, optionally integrated.


6.25. <number>:pow(<number exp>) → <number>

Raises one number to another.


6.26. <number>:round([<number precision>]) → <number>

Rounds a number to <precision> decimal places.


6.27. <number>:select(<object... list>) → <object>

Selects the ith element from a list of arguments. Or null if it doesn't exist.

Example:

* => 1:select("a", "b", "c") as selected every second

Yields 'selected:b' every second. The list of arguments is zero-based.


6.28. <number>:sin() → <number>

Returns the sine of an angle in radians.


6.29. <number>:sinh() → <number>

Returns the hyperbolic sine of an angle in radians.


6.30. <number>:sqrt() → <number>

Returns the correctly rounded positive square root of a double value.

Available since Pipes v0.18.8


6.31. <number>:tan() → <number>

Returns the tangent of an angle in radians.


6.32. <number>:tanh() → <number>

Returns the hyperbolic tangent of an angle in radians.


6.33. <number>:tobase(<number base>) → <string>

Converts a number to a string representing it in some base

Available since Pipes v0.13.12


6.34. <object>:boolean() → <boolean>

Converts object to boolean.


6.35. <object>:class() → <string>

Returns the underlying Java class of the object.


6.36. <object>:comparable() → <comparable>

Converts object to comparable.


6.37. <object>:const() → <object>

Makes any scalar act as a constant in compile time

Available since Pipes v0.14.7


6.38. <object>:decode(<object,object... pairs>) → <object>

Transforms the parameter using the translation rules defined in <pairs>.


6.39. <object>:get(<object... keys>) → <object>

Much like property[keys]. Works for strings, containers and arrays.

This function infers the correct return type in compile time, whenever possible.


6.40. <object>:indexin(<object... list>) → <number>

Returns the first index of the value in <list>, or null if <list> does not contain it.


6.41. <object>:isin(<object... list>) → <boolean>

Returns true if <list> contains the value, false otherwise.


6.42. <object>:json() → <string>

Converts the object to its JSON string representation.


6.43. <object>:keep([<number ttl>]) → <object>

When used in a simple pipe, delays or disable (if ttl not supplied or < 0) inactive group removal.


6.44. <object>:len() → <number>

Tries to get <target>'s size. Works for strings, containers and arrays.


6.45. <object>:map() → <map>

Converts object to map.


6.46. <object>:nameof() → <string>

Returns a suggested field name for some expression

Available since Pipes v0.19


6.47. <object>:number() → <number>

Converts object to number.

Example:

* => avg(response_time:number()) every minute

Yields the average of the field response_time every minute.


6.48. <object>:object() → <object>

Casts any object to its canonical object representation.


6.49. <object>:row() → <seq>

Casts any object to a row.


6.50. <object>:seq() → <seq>

Casts any object to a seq.


6.51. <object>:setfield(<named... fields>) → <object>

Sets fields into the object in a type-safe way

Available since Pipes v0.21.1


6.52. <object>:string() → <string>

Converts object to string.

Example:

* => 'Count: ' + count():string every minute

Yields the number of events every minute as a string 'Count: X'.


6.53. <object>:typeof() → <string>

Returns the compile-time type of an expression.

Available since Pipes v0.13

This function changed name in Pipes v0.19: from typename to typeof.


6.54. <object>:unfold(<&fn>) → <seq>

Creates a sequence by applying the same function successively to a state.

The target object is the initial state value. The argument function must receive an instance of that state and return either a tuple containing the value to be generated with the next state, or null if no further values should be generated.

* => value#unfold(fun x: x<100 ? (x, x+1), null)

Unfolds value into the sequence (value..99).


6.55. <row>:tomap() → <map>

Converts a row instance to a map, using the field names as keys

Available since Pipes v0.17.1


6.56. <seq>:contains(<object>) → <boolean>

Returns whether the target seq contains the argument.

Available since Pipes v0.13.2


6.57. <seq>:enumerate([<number start>]) → <seq(row)>

Returns a same-sized sequence with an incrementing number before each object.

Available since Pipes v0.14

The returned sequence has rows with the following fields:

fieldtypedescription
indexnumberThe relative index of this object
value?The original object

6.58. <seq>:except(<seq b>) → <seq>

Creates a sequence with the same elements, except the ones that are also present in <b>

Available since Pipes v0.18.8


6.59. <seq>:indexof(<object>) → <number>

Returns the index of position of <s> inside the target string. Returns null otherwise.

Available since Pipes v0.19


6.60. <seq>:mapnames(<&string fn>) → <row>

Converts a constant sequence into a strongly-typed row in compile-time.

Available since Pipes v0.14

This function takes a constant sequence and gives a name to each element based on the element value. For example, (1, 2, 3):mapnames("field${_}") creates a row with the fields, field1, field2, and field3, all numbers.


6.61. <seq>:qpush(<object>, [<number bound>]) → <seq>

Returns the merge of a sequence and a value with a maximum size of <bound>, removing elements in a FIFO manner.

Available since Pipes v0.19


6.62. <seq>:qpushseq(<seq b>, [<number bound>]) → <seq>

Returns the merge of both sequences with a maximum size of <bound>, removing elements in a FIFO manner.

Available since Pipes v0.19


6.63. <seq>:repeat(<number>) → <seq>

Repeats the sequence a certain number of times.

Available since Pipes v0.13


6.64. <seq>:retain(<seq b>) → <seq>

Creates a sequence with the same elements, but only the ones that are also present in <b>

Available since Pipes v0.18.8


6.65. <seq>:skip([<number n>]) → <seq>

Returns a new sequence that skips the first <n> elements

Available since Pipes v0.14


6.66. <seq>:slice([<number start>], [<number end>], [<number step>]) → <seq>

Returns the sliced sequence starting at index <start>, ending at <end> getting every <step> elements.

Available since Pipes v0.14


6.67. <seq>:take([<number n>]) → <seq>

Returns a new sequence that takes only the first <n> elements

Available since Pipes v0.14


6.68. <string>:compile(<object... args>) → <number>

Compiles a constant string into a pipes expression

Available since Pipes v0.14.7

The arguments are available as template macros starting from @@0.


6.69. <string>:compile.map(<object... args>) → <number>

Compiles a constant string with each argument into many pipes expressions

Available since Pipes v0.15.3

The argument can be used in the expression as @@0.


6.70. <string>:contains(<string>) → <boolean>

Returns whether the target string contains the argument.


6.71. <string>:dateparse([<string format>], [<string tz>]) → <number>

Parses timestamp using specified format


6.72. <string>:endswith(<string>) → <boolean>

Returns whether the target string ends with the argument.


6.73. <string>:frombase(<number base>) → <number>

Converts a string representing a number in some base to the number itself

Available since Pipes v0.13.12


6.74. <string>:hll.eval() → <number>

Evaluates compressed base64 HyperLogLog data.

Useful to evaluate the final result from hll.init(<number log2m>, <object...>) → <string> aggregations.

Example

This first query precomputes the distinct users every minute, but does not yield a result.

type:http => hll.init(userid) as value every minute

The second query aggregates and evalutes the precomputed values, but with a larger window of 10 minutes. The results from the first query may have been stored in the meantime.

type:firstquery => hll.eval(hll.merge(value)) as distinct_users every 10 minutes

6.75. <string>:indexof(<string s>, [<number fromIndex>]) → <number>

Returns the first index of position of <s> inside the target string. Returns null otherwise.


6.76. <string>:jsonparse() → <object>

Parses JSON string into objects.

Available since Pipes v0.13

It parses arrays as java.util.List, objects as java.util.Map, numbers as java.lang.Double and strings as java.lang.String.


6.77. <string>:lower() → <string>

Converts string to lowercase.


6.78. <string>:ltrim([<string chars>]) → <string>

Trims leading characters from a string


6.79. <string>:ord() → <number>

Gets the unicode codepoint from the first char in the string.

Available since Pipes v0.13


6.80. <string>:parse([<string format>], [<string locale>]) → <number>

Parses a number according to format string and locale.


6.81. <string>:property() → <object>

Compiles a string into the corresponding property access object

Available since Pipes v0.19


6.82. <string>:regex(<string regex>) → <row>

Returns a strongly typed row composed by all named groups in <regex>.

This function usage is as follows:
<string>:regex(?<name of the key>pattern)
and it searches for the pattern throughout the string passed, and it returns a row object with the captured text as the value for the key 'name of the key' passed.

*
=> expand name:regex(r'^(?<first>\w+)( \w+)* (?<last>\w+)$')

Try it

Extracts the first and the last names from a single field.

*
=> name:regex(r'^(?<first>\w+)( \w+)* (?<last>\w+)$') as match
=> '%s, %s':sprintf(match->last, match->first) as converted

Try it

Converts the name field to the format "Last, First".

WebAccess 
=> uid:regex('(?<first>[A-Za-z0-9]+)([A-Za-z0-9]*-)+(?<last>[A-Za-z0-9]+)') as match_object, uid

Try it

Explanation:

(?<first>[A-Za-z0-9]+):
This pattern matches string of characters with letters 'A' to 'Z' (both uppercase and lowercase) and also matches digits 0 to 9, and the result of this pattern it creates a row structure with the key first.

([A-Za-z0-9]*-)+:
This pattern is similar to the one above, but it requires a minus sign at the end. Since there is no key being passed as in the previous case, the captured pattern will be ignored.

(?<last>[A-Za-z0-9]+):
Since it is the last pattern passed, the regex matches the last set of characters with this pattern, assign the captured string to the key last in the output event record.
WebAccess 
=> uid:regex('(?<Regex Lookaround example>(?<=[A-Za-z0-9]+-)[A-Za-z0-9\\-]*(?=-[A-Za-z0-9]+))') as match_object, uid

Try it

Explanation:

Lookbehind: (? <= Pattern1) pattern2
Returns the first characters that match the pattern2 pattern after matching the pattern1.

Lookahead: pattern2 (?= Pattern3)
Returns the last characters that match the pattern2 pattern before checking the pattern3 pattern.

[A-Za-z0-9]+:
Is a regular expression that matches both upper and lower case letters from A to Z and numbers from 0 to 9. The + symbol indicates that the single character pattern must occur at least 1 time.

[A-Za-z0-9\\-]*:
Is similar to the pattern above. It allows the minus sign - to be matched and the asterisk symbol * indicates that the pattern can occur 0 or more times.

Therefore, the regular expression (?<=[A-Za-z0-9]+-)[A-Za-z0-9\\-]*(?=-[A-Za-z0-9]+)
searches for a characters that match [A-Za-z0-9\\-]* but that occur the pattern [A-Za-z0-9]+- and before the pattern -[A-Za-z0-9]+..

6.83. <string>:regexall(<string regex>) → <seq(row)>

Returns a sequence of rows composed by all matches of all named groups in <regex>.

Available since Pipes v0.13

This method is similar to <string>:regex(<string regex>) → <row>, but returns all matches, not only the first.

WebAccess 
=> @set uid + ' ' + uid as uid
=> uid:regexall('(?<first>[A-Za-z0-9]+)([A-Za-z0-9]*-)+(?<last>[A-Za-z0-9]+)') as all_matches, uid

Try it

Explanation:

(?<first>[A-Za-z0-9]+):
This pattern matches a character sequence containing any letter from 'A' to 'Z' (both uppercase and lowercase) and any digit from 0 to 9. If there is a match it will create a row structure with the match as the value and 'first' as the key.

([A-Za-z0-9]*-)+:
Here the regex looks for patterns that have the same characteristics as the previous one and, in addition, end with a minus sign -. Since there is no named group specified, the pattern is matched but not assigned to the output.

(?<last>[A-Za-z0-9]+):
Since it is the last pattern defined, the regex attempts to match text following this pattern and, and assigns the captured text to the 'last' key for each match object.
WebAccess 
=> uid:regexall('(?<Regex Lookaround example>(?<=[A-Za-z0-9]+-)[A-Za-z0-9]*(?=-[A-Za-z0-9]+))') as regex_row, uid

Try it


6.84. <string>:regexfind(<string regex>, [<number|string group>]) → <string>

Returns the matched string by <regex> in target (or one specific group).

WebAccess 
=> uid:regexfind('(^[A-Za-z0-9]*|[A-Za-z0-9]*$)') as match_text, uid

Try it

Explanation:

(pattern1 | pattern2)
This block tells regexfind to return the pattern pattern1 or pattern2. Since regexfind only has the first recognized pattern it is a single string.
WebAccess 
=> uid:regexfind('(?<=[A-Za-z0-9]+-)[A-Za-z0-9\\-]*(?=-[A-Za-z0-9]+)') as match_text, uid

Try it

Explanation:

Lookbehind: (? <= Pattern1) pattern2
Returns the first characters that match pattern2 after matching pattern1.

Lookahead: pattern2 (? = Pattern3)
Returns the last characters that match pattern2 before matching pattern3.

[A-Za-z0-9]+:
Is a pattern that match both upper and lower case letters from A to Z and numbers from 0 to 9. The + symbol indicates that the character pattern can occur 1 time or more times.
[A-Za-z0-9\\-]*
is the same pattern as before, but it also includes the minus sign - and the asterisk symbol * indicates that the character pattern can occur 0 or more times.
Therefore, the regular expression (?<=[A-Za-z0-9]+-)[A-Za-z0-9\\-]*(?=-[A-Za-z0-9]+) searches for a block [A-Za-z0-9\\-]* that is after the pattern [A-Za-z0-9]+- and before the pattern -[A-Za-z0-9]+.

6.85. <string>:regexfindall(<string regex>, [<number|string group>]) → <seq(string)>

Returns all the matched strings by <regex> in target (or one specific group).

Available since Pipes v0.13

WebAccess 
=> uid:regexfindall('(^[A-Za-z0-9]*|[A-Za-z0-9]+$)') as match_list, uid

Try it

Explanation:

(pattern1|pattern2)
This expression matches either pattern1 or pattern2. Since regexfindall returns all the matched groups, the function returns a sequence of strings.
WebAccess
=> uid:regexfindall('(?<=[A-Za-z0-9]+-)[A-Za-z0-9]*(?=-[A-Za-z0-9]+)') as match_list, uid

Try it

Explanation:

regexfindall returns a list with all substrings that match the given pattern. In this case, text with [A-Za-z0-9]* found after the pattern [A-Za-z0-9]+- and before the -[A-Za-z0-9]+) pattern.

6.86. <string>:regexmatch(<string regex>) → <boolean>

Returns true if the target matches <regex>. False otherwise.

WebAccess
=> host,
   host:regexmatch('greenfarm') as matches_example1, 
   host:regexmatch('.*greenfarm.*') as matches_example2, 
   host:contains('greenfarm') as host_contains

Try it

Explanation:

regexmatch tests if the string matches the regular expression passed as a string argument.

  1. Tests if host is a string with content 'greenfarm' exactly.
  2. Since * means any character that appears 0 or more times. it tests if host has a substring exactly like 'greenfarm.' preceded by any characters.
  3. Tests whether host contains the substring 'greenfarm'.

6.87. <string>:regexsplit(<string regex>, [<number limit>]) → <seq(string)>

Splits string by regular expression <regex> in up to <limit> pieces.

Available since Pipes v0.13

WebAccess
=> uid, 
   uid:regexsplit('-') as regex_split1, 
   uid:regexsplit('[0-9]+') as regex_split2

Try it

Explanation:

regexsplit divides the original string into a list of substrings using the pattern as a separator.
In the first case '-' is used as a separator, while in the second case any sequence of numeric characters (one or more digits) is treated as a separator.

6.88. <string>:regexsub(<string regex>, <string replacement>) → <string>

Replaces all matches of <regex> in target by <replacement>.

WebAccess
=> urltype, urltype:regexsub('out', 'in') as regex_sub

Try it

Explanation:

regexsub finds occurrences in the first string parameter that match the regular expression and replaces each match with the string replacement.

WebAccess
=> host, url, 
   host:regexsub('(?<=[a-z0-9]\\.).+(?=farm)', 'teste') as renamed_host, 
   url:regexsub('(?<=\\.com)(?=/{0,1})', '.br') as url_with_br_suffix

Try it


6.89. <string>:repeat(<number>) → <string>

Repeats the string a number of times.

Available since Pipes v0.13


6.90. <string>:replace(<string from>, <string to>) → <string>

Replaces all instances of <from> with the string <to>.


6.91. <string>:rindexof(<string s>, [<number fromIndex>]) → <number>

Returns the last index of position of <s> inside the target string. Returns null otherwise.

Available since Pipes v0.19


6.92. <string>:rtrim([<string chars>]) → <string>

Trims trailing characters from a string


6.93. <string>:rtruncate(<number length>, [<string ellipsis>]) → <string>

Truncates the string to the specified length, keeping the end, optionally adding an ellipsis at the start.

Available since Pipes v0.13.3

The ellipsis length does count to the maximum length. If you can, use the char "\u2026" as a single-char ellipsis.

Examples:

"0123456789":truncate(10, "...")

In this case, the string won't be truncated.

"01234567890":truncate(10, "...")

But in this it will return "...34567890"


6.94. <string>:split([<string delim>], [<number limit>]) → <seq(string)>

Splits string by <delim> in up to <limit> pieces.

Available since Pipes v0.13

If the delimiter is not set, the string is then split by individual chars.


6.95. <string>:sprintf(<object... args>) → <string>

Uses the target string as format to arguments.


6.96. <string>:startswith(<string>) → <boolean>

Returns whether the target string starts with the argument.


6.97. <string>:substring(<number from>, [<number to>]) → <string>

Returns the substring between the indices <from> and <to>.


6.98. <string>:trim([<string chars>]) → <string>

Trims a string


6.99. <string>:truncate(<number length>, [<string ellipsis>]) → <string>

Truncates the string to the specified length, optionally adding an ellipsis at the end.

The ellipsis length does count to the maximum length. If you can, use the char "\u2026" as a single-char ellipsis.

Examples:

"0123456789":truncate(10, "...")

In this case, the string won't be truncated.

"01234567890":truncate(10, "...")

But in this it will return "01234567..."


6.100. <string>:upper() → <string>

Converts string to uppercase.


6.101. <string>:urldecode([<string encoding>]) → <string>

Decodes an application/x-www-form-urlencoded unicode string.

Available since Pipes v0.13.12


6.102. <string>:urlencode([<string encoding>]) → <string>

Encodes a string as application/x-www-form-urlencoded.

Available since Pipes v0.13.12


6.103. cast(<object>, <string typename>) → <object>

Casts a value into another type.

Available since Pipes v0.17


6.104. compare(<comparable a>, <comparable b>) → <number>

Returns a number < 0 if a < b, > 0 if a > b or 0 if a = b.


6.105. concat(<string...>) → <string>

Concatenetes many strings together


6.106. cron(<string>, [<string tz>]) → <period>

Constructs a period instance from a constant cron string

Example:

* => count() every cron('*/3 * * * * *')

Yields the event count every three seconds.

* => timestamp():dateadd(cron('0 0 * * * *')) every second

Outputs every second the timestamp of the next hour.


6.107. exprange([<number start>], <number end>, <number step>) → <object>

Creates a seq that iterates through numbers exponentially

Available since Pipes v0.14.7

Example:

* => @for exprange(17, 2)

(1, 2, 4, 8, 16)

* => @for exprange(4, 17, 2)

(4, 8, 16)


6.108. happened([<number ref>], [<number tested>], <string>, [<string tz>]) → <number>

Returns true if the tested timestamp is inside the span, false otherwise.

Please note that someprop:happened('today') is equivalent to happened(timestamp(), someprop, 'today').

Also, happened('today') is equivalent to happened(timestamp(), timestamp(), 'today').


6.109. hll.merge(<string... data>) → <string>

Merge many instances of compressed base64 HyperLogLog data.

Useful to merge results from hll.init(<number log2m>, <object...>) → <string> aggregations.


6.110. itermap(<object>, [<string keyField>], [<string valueField>]) → <seq(row)>

Creates a seq that iterates through java.util.Map, seq or row entries

Available since Pipes v0.13

Example:

* => @for itermap(some_map_property:object)

Creates one event for each map entry in some_map_property

* => abc, qwe# => @for itermap(*)

Creates one event for each field


6.111. max(<comparable>, <comparable>, <comparable...>) → <comparable>

Returns the greatest value of all supplied arguments.


6.112. metadata() → <object>

Returns a representation of the previous pipe metadata.

Available since Pipes v0.13


6.113. min(<comparable>, <comparable>, <comparable...>) → <comparable>

Returns the least value of all supplied arguments.


6.114. minhash.eval(<string...>) → <number>

Evaluates the similarity between many MinHash encoded data.

Available since Pipes v0.18.8


6.115. minhash.merge(<string... data>) → <string>

Merge many MinHash encoded data.

Available since Pipes v0.18.8


6.116. mregression.apply(<seq(number) coefficients>, <number... xs>) → <number>

Applies regression coefficients to variables

Available since Pipes v0.17.1

The coefficients can be obtained by applying the mregression([<number... x>], <number y>) → <seq> aggregation.


6.117. newmap(<object,object... pairs>) → <map>

Creates a instance of java.util.Map with the supplied keys and values.

Example:

=> newmap("company", "Intelie", "website", "http://intelie.com") as object at the end
    => object->company, object->website

Returns a Map equivalent to {"company": "Intelie", "website": "http://intelie.com"}


6.118. normrandom([<number mean>], [<number stdev>]) → <number>

Returns a random value from a normal (Gaussian) distribution.

Available since Pipes v0.15.6


6.119. period(<number>, <string unit>, [<string tz>]) → <period>

Constructs a period instance from parameters

Example:

* => count() every period(3, 'seconds')

Yields the event count every three seconds same as the literal 3 seconds.


6.120. pi() → <number>

Returns the constant value of pi.


6.121. random([<number min>], [<number max>]) → <number>

Returns a random value between <min> and <max> (0 and 1 if not defined).


6.122. range([<number start>], <number end>, [<number step>]) → <object>

Creates a seq that iterates through numbers

Available since Pipes v0.13

Example:

* => @for range(10)

(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)

* => @for range(5, 10)

(5, 6, 7, 8, 9)

* => @for range(7, 10, 0.5)

(7, 7.5, 8, 8.5, 9, 9.5)


6.123. span([<number ref>], <string>, [<string tz>]) → <row>

Calculates start and end timestamps of span based on target.

Available since Pipes v0.13

The result is a row with three fields: timestamp, start and end.

Example:

* => span('today')|>:dateformat

Computes and formats the start and end timestamps of current day for each event.


6.124. spanend([<number ref>], <string>, [<string tz>]) → <number>

Calculates end timestamp of span based on target.

Please note that spanend('today') is equivalent to spanend(timestamp(), 'today').


6.125. spanstart([<number ref>], <string>, [<string tz>]) → <number>

Calculates start timestamp of span based on target.

Please note that spanstart('today') is equivalent to spanstart(timestamp(), 'today').


6.126. spantest([<number ref>], [<number tested>], <string>, [<string tz>]) → <number>

Returns -1 if the tested timestamp is before the span, 0 if it is inside and 1 if it is after.

Please note that someprop:spantest('today') is equivalent to spantest(timestamp(), someprop, 'today').

Also, spantest('today') is equivalent to spantest(timestamp(), timestamp(), 'today').


6.127. timestamp() → <number>

Returns the most appropriate timestamp, whether in scalar or aggregation contexts.


6.128. uuid() → <string>

Returns a random UUID string.


6.129. zip(<seq... args>) → <seq(row)>

Creates a seq such that each element is a row with the respective element from input seqs

Available since Pipes v0.15.3

Example:

zip((1, 2), ('a', 'b'))

Returns a seq equivalent to ((1, 'a'), (2, 'b'))


6.130. System functions

Functions that return information about the system where the query is running.


6.130.1. sys.cpu() → <row>

Returns the system's CPU usage info.

Available since Pipes v0.13.1

The returned row has the following fields:

fieldtypedescription
processnumberCurrent process CPU load (in range [0..1])
systemnumberSystem's CPU load (in range [0..1])
loadavgnumberLoad average in the last minute
coresnumberNumber of processor cores installed

6.130.2. sys.disks() → <seq(row)>

Returns info about all devices in the filesystem.

Available since Pipes v0.13.1

Each row in the returned sequence has the following fields:

fieldtypedescription
namestringName of the device or partition
typestringFormat type
readonlybooleanIs read only?
unallocatednumber# of unallocated bytes in the device
usablenumber# of usable bytes (considering write permissions, etc.)
totalnumberTotal bytes in the device

6.130.3. sys.fds() → <row>

Returns info about process file descriptors.

Available since Pipes v0.13.1

The returned row has the following fields:

fieldtypedescription
opennumber# of open file descriptors
maxnumberMaximum file descriptors that can be open by this process

6.130.4. sys.heap() → <row>

Returns the system's current JVM heap info.

Available since Pipes v0.13.1

The returned row has the following fields (all values in bytes):

fieldtypedescription
usednumberUsed heap memory
maxnumberMax heap size
committednumberCommited memory by the OS for the current VM

6.130.5. sys.hostname() → <string>

Returns the system's hostname.

Available since Pipes v0.13.1


6.130.6. sys.memory() → <row>

Returns the system's memory usage info.

Available since Pipes v0.13.1

The returned row has the following fields (all values in bytes):

fieldtypedescription
usednumberUsed physical memory
totalnumberTotal physical memory
usedswapnumberUsed swap memory
totalswapnumberTotal swap memory

6.130.7. sys.properties() → <object>

Returns a list of defined system properties

Available since Pipes v0.14

The return is always an instance of a java Map.


6.130.8. sys.threadlist() → <seq(row)>

Returns a list of threads in the JVM

Available since Pipes v0.14

Each row in the returned sequence has the following fields:

fieldtypedescription
idnumberManaged thread ID
namestringThread name
cputimenumberCPU time, in seconds
statestringOne of: NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED
locknamestringLock the thread is waiting for
lockowneridnumberThread ID of the lock owner
lockownerstringThread name of the lock owner

6.130.9. sys.threads() → <row>

Returns info about running threads in current process.

Available since Pipes v0.13.1

The returned row has the following fields:

fieldtypedescription
runningnumber# of running threads
peaknumberMax # of running threads
totalnumber# of threads ever started in this JVM
daemonnumber# of daemon threads
deadlockednumber# of deadlocked threads

6.130.10. sys.timestamp() → <number>

Returns the system's real timestamp (not the query timestamp).

Available since Pipes v0.13.1


6.130.11. sys.uptime() → <number>

Returns the number of milliseconds since the start of the JVM

Available since Pipes v0.15.5


6.130.12. sys.version() → <row>

Returns Pipes version info

Available since Pipes v0.13.1

The returned row has the following fields:

fieldtypedescription
majornumberVersion's major component
minornumberVersion's minor component
patchnumberVersion's current patch
releasednumberWhether the version was officialy released or not (-SNAPSHOT)

7. Aggregation Functions


7.1. <aggregation object expr>:if(<boolean condition>) → <object>

Aggregates only events that evaluates true to <condition>.


7.2. <aggregation object expr>:overall() → <object>

Merges all the results from the target aggregation.


7.3. <aggregation object expr>:overlast(<number window>) → <object>

Merges the results of the last <window> aggregations.


7.4. <aggregation object expr>:prev([<number prev>]) → <object>

Delays and returns the previous <number>th result from target aggregation.


7.5. all(<boolean>) → <boolean>

Returns true if all ocurrences evaluate true.


7.6. amap(<row>, <aggregation object>) → <row>

Applies the same aggregation to all fields of a row.

Available since Pipes v0.14

"amap" means "aggregation map".

This aggregation takes a row (it can't be used with any sequence) and returns one aggregation row with the argument aggregation applied to each field in the original row. Here's a complete example:

=> sys.memory() as mem every minute
=> expand mem:amap(:avg) every hour

Collects memory statistics every minute and returns the average of the collected points every hour.


7.7. any(<boolean>) → <boolean>

Returns true if any ocurrence evaluates true.


7.8. areduce(<seq>, <aggregation object>) → <object>

Applies all elements of a sequence to the same aggregation

Available since Pipes v0.14

"areduce" means "aggregation reduce".

This aggregation takes a sequence and applies each element to a single aggregation. An example:

Network => (bytes_in#, bytes_out#):areduce(:sum) as total_bytes every minute

Returns the sum of both fields bytes_in and bytes_out as a single value, every minute.


7.9. avg(<number>, [<number weight>]) → <number>

Calculates the (possibly weighted) average of some expression.


7.10. count([<object>]) → <number>

Counts all events in a window (except those that evaluate to false or null).


7.11. dcount(<object...>) → <number>

Estimates the field's cardinality (distinct count) using HyperLogLog.

Equivalent to hll(12, <object...>).

The aggregation uses a probabilistic algorithm known as HyperLogLog. With a fixed log2m value of 12, the standard error will be 1.625% and will use 4KB of memory per group.

The aggregation considers all arguments, i.e., dcount(a, b) aggregates the number of distinct pairs of the tuple (a, b).

element approximate memory (more info)
state 4 KB
tree 4 KB
merger 512 KB

7.12. describe(<aggregation object expr>) → <string>

Yields a string json explaining the target aggregation's inner state representation.


7.13. first(<object>) → <object>

Yields the ocurrence with least timestamp.


7.14. firstn(<object>, <number n>) → <seq>

Yields the <n> values with least timestamp.

Available since Pipes v0.15


7.15. firstv(<object>) → <object>

Yields the non-null ocurrence with least timestamp.

Available since Pipes v0.14


7.16. fold(<object>, [<object initial>], <&fn>, [<&merger>]) → <object>

Reduces sequence of elements applying function successively

This aggregation can only be distributed or used on windows if the merger function is specified.

* => value#fold(fun(a, x): a+x, fun(a,b): a+b) every 10 minutes

Almost equivalent to sum (except for null handling).


7.17. foldv(<object>, [<object initial>], <&fn>, [<&merger>]) → <object>

Reduces sequence of elements applying function successively (ignores nulls)

Available since Pipes v0.19

This aggregation can only be distributed or used on windows if the merger function is specified.

* => value#foldv(fun(a, x): a+x) every 10 minutes

Equivalent to sum.


7.18. greatest(<object>, <sortfield...>) → <object>

Yields the greatest ocurrence in the window based on the sort fields.


7.19. greatestv(<object>, <sortfield...>) → <object>

Yields the greatest non-null ocurrence in the window based on the sort fields.


7.20. hll(<number log2m>, <object...>) → <number>

Estimates the field's cardinality (distinct count) using HyperLogLog.

The aggregation uses a probabilistic algorithm known as HyperLogLog.

log2m must be an integer between 6 and 16. The higher this value, higher will be the precision. The error will probably be within: 104/sqrt(2**log2m).

E.g.: With log2m = 14, the standard error will be 0.81%. But higher log2m values will also consume more memory. The aggregation will typically use 2**log2m bytes of memory.

The aggregation considers all arguments, i.e., hll(16, a, b) aggregates the number of distinct pairs of the tuple (a, b).

element approximate memory (more info)
state 2**log2m bytes
tree 2**log2m bytes
merger 128 * 2**log2m bytes

7.21. hll.init(<number log2m>, <object...>) → <string>

Similar to hll, but it doesn't evaluate final cardinality, just return the sketch data.

This aggregation is similar to hll(<number log2m>, <object...>) → <number>, but instead of returning the final distinct count, it yields a representation of its internal data structure, for later merges. It can be used in conjunction with the scalar functions <string>:hll.eval() → <number>, hll.merge(<string... data>) → <string> and the aggregation hll.merge(<string>) → <string>.

It returns a string with a base64'd representation of the compressed byte array. It is especially useful when some preprocessing and storage is needed.

Example

This first query precomputes the distinct users every minute, but does not yield a result.

type:http => hll.init(16, userid) as value every minute

The second query aggregates and evalutes the precomputed values, but with a larger window of 10 minutes. The results from the first query may have been stored in the meantime.

type:firstquery => hll.eval(hll.merge(value)) as distinct_users every 10 minutes

7.22. hll.merge(<string>) → <string>

Performs union of many HyperLogLog encoded data in a window.

Useful to merge results from hll.init(<number log2m>, <object...>) → <string> aggregations.

hll.eval(hll.merge(expr)) is equivalent to hll.mergeeval(expr).

Example

This first query precomputes the distinct users every minute, but does not yield a result.

type:http => hll.init(userid) as value every minute

The second query aggregates and evalutes the precomputed values, but with a larger window of 10 minutes. The results from the first query may have been stored in the meantime.

type:firstquery => hll.eval(hll.merge(value)) as distinct_users every 10 minutes

7.23. hll.mergeeval(<string>) → <number>

Performs union of many HyperLogLog encoded data in a window and evaluates the result.

Useful to merge results from hll.init(<number log2m>, <object...>) → <string> aggregations.

hll.eval(hll.merge(expr)) is equivalent to hll.mergeeval(expr).

Example

This first query precomputes the distinct users every minute, but does not yield a result.

type:http => hll.init(userid) as value every minute

The second query aggregates and evalutes the precomputed values, but with a larger window of 10 minutes. The results from the first query may have been stored in the meantime.

type:firstquery => hllmergeeval(value) as distinct_users every 10 minutes

7.24. intervals(<number begin>, <number end>) → <seq>

Creates a list of disjoint intervals

Available since Pipes v0.24


7.25. last(<object>) → <object>

Yields the ocurrence with greatest timestamp.


7.26. lastn(<object>, <number n>) → <seq>

Yields the <n> values with greatest timestamp.

Available since Pipes v0.15


7.27. lastv(<object>) → <object>

Yields the non-null ocurrence with greatest timestamp.

Available since Pipes v0.14


7.28. least(<object>, <sortfield...>) → <object>

Yields the least ocurrence in the window based on the sort fields.


7.29. leastv(<object>, <sortfield...>) → <object>

Yields the least non-null ocurrence in the window based on the sort fields.


7.30. list(<object>) → <seq>

Creates a java.util.List from all events in a window.

Available since Pipes v0.13


7.31. map(<object key>, <aggregation object value>) → <map>

Creates a java.util.Map from all events in a window.

Since Pipes v0.20.4, this function accepts aggregations as values.


7.32. max(<comparable>) → <comparable>

Yields the greatest ocurrence in the window.


7.33. maxv(<comparable>) → <comparable>

Yields the greatest non-null ocurrence in the window.


7.34. median(<number>) → <number>

Estimates the median value of the population using Count-Min Sketch.

Equivalent to quantile(<number>, 0.5).


7.35. min(<comparable>) → <comparable>

Yields the least ocurrence in the window.


7.36. minhash.init(<number size>, <object...>) → <string>

Returns MinHash set signature.

Available since Pipes v0.18.8


7.37. minhash.merge(<string>) → <string>

Merge many MinHash encoded data in a window.

Available since Pipes v0.18.8

Useful to merge results from minhash.init(<number size>, <object...>) → <string> aggregations.


7.38. minhash.mergeeval(<string...>) → <number>

Merge many MinHash encoded data in a window and evaluates the similarity between them.

Available since Pipes v0.18.8


7.39. minv(<comparable>) → <comparable>

Yields the least non-null ocurrence in the window.


7.40. mregression([<number... x>], <number y>) → <seq>

Computes multivariate regression for given variables.

Available since Pipes v0.17.1

If no arguments x are given, it is assumed to be the event timestamp (if any).

The return is a sequence of the regression coefficients. The first value is the regression intercept.


7.41. mregression.full([<number... x>], <number y>) → <row>

Computes multivariate regression statistics for given variables.

Available since Pipes v0.17.1

If no arguments x are given, it is assumed to be the event timestamp (if any).

The returned row has the following fields:

fieldtypedescription
nnumberSample size
coefficientsseq(number)Regression coefficients (the first value is the intercept).
correlationseq(seq(number))Correlation matrix.
covarianceseq(seq(number))Covariance matrix.
meanseq(number)Mean vector.

7.42. pcount(<boolean>) → <number>

Aggregates the proportion of events that evaluate true to expression.


7.43. pfold(<object>, [<object initial>], <&fn>) → <object>

Same as fold, but with state persisted

Available since Pipes v0.19

This aggregation cannot be distributed. And can only be used inside a window if an unmerger function is also specified.

Cumulative Sum

=> 1 as number every min

=> number, 
   number#:pfold(0, fun(state, value): (value#+state#)) as cumulative_sum

Try it

Creating a conditional cumulative sum:

=> 1 as number every min

=> number,
   number#:pfold(0, fun(state, value): state#>=3 ? value, (value#+state#)) as sum_resets_after_3

Try it

Keep the higher number:

=> random() as number every min

=> number, 
   number#:pfold(0, fun(a, b):(a > b ? a, b )) as higher

Try it

Caution when using:
Events with null value of the variable can break the sum, because null + number == null and then when the condition matches, will became null again.
Here you can see one example between two scenarios, one processing null values (cumulativeSumWithNullCheck) and another without processing the null values (cumulativeSumWithoutNullCheck):
=> (1,6,2,7,3,8,null,9,5,10) as series at the end

=> @for series as number 

=> @set number#:pfold(0,fun(state, value):
         state + value
        ) as cumulativeSumWithoutNullCheck,
      
        number#:pfold(0,fun(state, value):
         value == null ? state, state + value
        ) as cumulativeSumWithNullCheck

Try it


7.44. quantile(<number>, <number q>) → <number>

Estimates the q (0..1) quantile of the population using Count-Min Sketch.


7.45. regression([<number x>], <number y>) → <row>

Computes regression and correlation statistics for given variables.

Available since Pipes v0.13.3

The argument x, if not given, is assumed to be the event timestamp (if any).

The returned row has the following fields:

fieldtypedescription
nnumberSample size
slopenumberSlope of the regression line
interceptnumberWhere in the x-axis the line intercepts
correlationnumberCorrelation coefficient between x and y
xmeannumberMean value for variable x
xstdevnumberStandard deviation for x
ymeannumberMean value for variable y
ystdevnumberStandard deviation for y

7.46. set(<object>) → <seq>

Creates a java.util.Set from all events in a window.


7.47. similarity(<object...>) → <number>

Estimates the Jaccard similarity coefficient between many sets.

Available since Pipes v0.18.8


7.48. smooth(<aggregation number expr>, [<number alpha>], [<number beta>]) → <number>

Smoothes the curve of another aggregation.

The argument alpha is a (0..1) value, meaning the weight of the previous points in the final value.

The argument beta is a (0..1) value, meaning the weight of previous trends into the current trend value.

This aggregation cannot be used inside a window (because it depends on total order of the elements).


7.49. statistics(<number>) → <row>

Aggregates sample statistics for some property.

Available since Pipes v0.13.3

Please note that all statistics here are based on the sample variance, not the population variance. This behavior is distinct from the already known variance(<number>, [<number weight>]) → <number> and stdev(<number>, [<number weight>]) → <number> aggregations.

The returned row has the following fields (all values in bytes):

fieldtypedescription
nnumberSample size
meannumberSample mean value
variancenumberSample variance
stdevnumberSample standard deviation
skewnessnumberSample skewness
kurtosisnumberSample kurtosis

7.50. stdev(<number>, [<number weight>]) → <number>

Calculates the (possibly weighted) standard deviation of some expression.


7.51. sum(<number>) → <number>

Sums all evaluations of some expression.


7.52. summary(<string>, [<string separator>], [<string lastSeparator>]) → <string>

Join all the strings in a window.


7.53. top(<number k>, <object>, [<sortfield...>]) → <seq>

Yields the k minimum occurrences of the target object.

Available since Pipes v0.13.18


7.54. variance(<number>, [<number weight>]) → <number>

Calculates the (possibly weighted) variance of some expression.


7.55. when(<boolean expr>) → <number>

Yields the latest timestamp inside window when some condition was true.


7.56. whenfirst(<boolean expr>) → <number>

Yields the first timestamp inside window when some condition was true.


7.57. Window Meta-aggregations

Meta-aggregations give information on how and when the results were merged.

Meta-aggregations meaning in time axis

Meta-aggregations meaning in time axis


7.57.1. wcount() → <number>

Yields how many outputs are merged in the current window.


7.57.2. wstart() → <number>

Returns the window's first allowed timestamp (or item index).


7.57.3. wend() → <number>

Returns the window's last allowed timestamp (or item index).


7.57.4. ostart() → <number>

Returns the output's first allowed timestamp (or item index).


7.57.5. oend() → <number>

Returns the output's last allowed timestamp (or item index).


7.57.6. otimestamp() → <number>

Returns the timestamp when the outputs were merged (equal to oend() on time pipes).


8. Timespan Language

Pipes comes bundled with a timespan definition language, that helps defining relative dates with an almost natural language syntax. A timespan is an expression that when provided with a reference timestamp can calculate the start and the end of a relative period.

For example, the expression current month will return:

  • (2014-04-01 00:00:00, 2014-05-01 00:00:00) when provided with the timestamp for (2014-04-11 14:56:20)
  • (2014-05-01 00:00:00, 2014-06-01 00:00:00) when provided with the timestamp for (2014-05-11 14:56:20)

Please notice that span intervals are always right-open.


8.1. Span types

definition example expressions description
now
none
now
none
Now.
today
current|this <period>
today
current day
this month
From the beginning of the current period to the beginning of the next.
yesterday
previous|preceeding|past|yester <period>
yesterday
yesterweek
yester month
previous year
previous 5 years
Previous specified period.
tomorrow
coming|subsequent|future|following <period>
tomorrow
coming year
subsequent 5 years
Subsequent specified period.
last <period...>
last day
last 40 weeks
last 2 years and 5 months
Period of the specified length ending now.
next <period...>
next day
next 40 weeks
next 2 years and 5 months
Period of the specified length starting now.
<period...> ago
2 days ago
2 years and 5 months ago
Single point in the past.
<date> [<time>]
2014
201406
20140627
2014-06-27
2014/06/27
2014-06-27 16
2014-06-27 4pm
2014-06-27 16:17:18
2014-06-27 4:17:18pm
Constant date or time.
<time>
16
4pm
16:17:18
4:17:18pm
Specified time today.
timestamp|ts <number>
timestamp 1403893236000
ts 1403893236000
Constant timestamp
timezone|tz <string> <span>
tz 'UTC' today
timezone 'America/Sao_Paulo' today
Evaluates span using the specified timezone.
monday ... sunday
mon ... sun
monday
mon
wednesday
wed
sunday
sun
Day of week in the current week.
january ... december
jan ... dec
january
jan
april
apr
december
dec
Month in the current year.
<month> (<number>|ordinal)
january first
may 4th
april 11
Day in month in the current year.
[from|since] <span A> to|until <span B>
yesterday to today
from yesterday to today
since last year until yesterday
A range starting at the beginning of span A and ending at the end of span B.
from|since <span A>
since last year
A range starting at the beginning of span A and ending now.
until|to <span B>
until yesterday
A range starting at the the Big Bang (known as 1970) and ending at the end of span B.
union of <spanA>, <spanB>, ... and <spanC>
union of 2018 and this year
A span that has the earliest start and latest end between all spans.
intersection of <spanA>, <spanB>, ... and <spanC>
intersection of 2018 and this year
A span that has the latest start and earliest end between all spans.
<period...> before <span>
2 years before this month
2 days and an hour before this month
A period of specified length ending exactly in the start of specified span.
before|start|begining|left|exactly [of] <span>
before this week
start of this month
beginning of december
left of yesterday
exactly today
The point in the start of specified span.
window [of] <span>
window of yesterday
window of 2 days before today
The same as the argument span, but from the view of a timestamp inside it.
<period...> after <span>
2 days after previous month
2 days and an hour after previous month
A period of specified length starting exactly in the end of specified span.
after|end|right [of] <span>
after this week
right of yesterday
end of this month
The point in the end of specified span.
<ordinal> <period> of <span>
first quarter of previous year
3rd week of this month
first hour today
Some discrete period inside the specified span.
[the] <period> [of] <span>
the week of 2 days ago
the quarter of last week
Expands the point at the start of a span to the entire period defined.
<span> shifted [left|right] by <percentage>
<span> shifted [left|right] by <period...>
this week shifted by 2 days
previous year shifted right by 2 days
previous year shifted right by 10%
Calculates the span referent to now, then shifts it some period or percentage in the past (left) or future(right).
<span A> shifted to <span B>
<span A> relative to <span B>
<span A> of <span B>
<span B> at <span A>
sunday shifted to previous week
sunday relative to previous week
sunday of previous week
previous week at sunday
sunday @8
Calculates a span A as if "now" was the start of span B.
<span A> aligned [left|right] to <span B>
this month aligned to 7 months ago
this month aligned left to 7 months ago
this month aligned right to 7 months ago
Calculates the span A and aligns either the left or the right ends of the first with the span B.
<span> extend[ed] [left|right] by <percentage>
<span> extend[ed] [left|right] by <period...>
this month extend left by 20%
this month extend right by 2 days
Calculates the span and then extends either proportionally or by a fixed period.

8.2. Period types

The periods are the units of time that powers span definitions.

Some spans accept only a single period definition (e.g. current day). Others accept multiple spans definition (e.g. last 4 days, 2 hours and 1 minute). In this documentation <period> means single periods only and <period...> means one or more periods.

A single period is composed of (optionally) an amount and an unit, e.g.:

day
1 day
a day
the day

A multiple period is composed of many single periods separated by commas or 'and', e.g.: 1 day and 5 hours, a day, an hour and 5 minutes.

1 day and 5 hours
a day, an hour and 5 minutes

The available period units are:

unitalso acceptsequivalent to
millisecondmilli ms
secondsec1000 ms
minutemin60000 ms
hour360000 ms
day
weekwk
month
bimester2 months
quarter3 months
semester6 months
yearyr

Page generated at 09-Sep-2021 12:43:08