This chapter is an introduction to the Pipes language through examples. Each section links to other chapters that develop the subjects presented here in more detail.
version 0.25.0
Copyright 2021 Intelie
1. Quick Start | |
1.1. Filters | |
1.2. Simple pipe | |
1.3. Chaining pipes | |
1.4. The join and union pipes | |
1.5. Sequences and rows | |
1.6. Macros and user functions | |
2. Concepts | |
2.1. Events and filters | |
2.2. Chained computation model | |
2.3. Output rates and aggregation windows | |
2.4. Grouping and group expiry | |
2.5. Expressions | |
2.5.1. Type system | |
2.5.2. Type arguments | |
2.5.3. Literals | |
2.5.4. Sequences and Rows | |
2.5.5. Scalars and aggregations | |
2.5.6. Aggregations state representation | |
2.5.7. Aggregation TTL | |
2.5.8. Property access | |
2.5.9. Special expressions | |
2.5.10. Function call | |
2.6. Macros and User Functions | |
3. Filters | |
3.1. * | Special filter that allows all records through. |
3.2. <term> | Selects records where one of the current fields matches <term>. |
3.3. <term>~<number maxEdits> | Selects records where one of the current fields matches <term> with at most <maxEdits> edits. |
3.4. [<term lower> TO <term upper>] | Selects records where one of the current fields is between <lower> and <upper>. |
3.5. <field>: <filter> | Sets the current field to <field>. Usually followed by <term> (e.g. somefield:someterm). |
3.6. <filter> & <filter> | Selects the intersection of two other filters. |
3.7. <filter> | <filter> | Selects the union of two other filters. |
3.8. -<filter> | Selects the complement of another filter. |
4. Pipes | |
4.1. <named...> [by <named...>] [over <window>] [every <period> | at the end] | Transforms or aggregates records over configurable data window and output. |
4.2. <pipe> join [on <fields...>] <pipe> | Computes the cartesian product of the outputs from two pipes with same output rates. |
4.3. <pipe> union <pipe> | Concatenates the outputs from two pipes with compatible output rates. |
4.4. @chain [<seq expr>] | Chains many sequences in a batch into a single sequence |
4.5. @compile <string>, <obj... args> | Compiles a constant string into a pipe |
4.6. @compress <number k>, [<number k2>,] <number... y> [by <object...>] | Alias to either @compress.pip (default) or @compress.uniform . |
4.7. @compress.pip <number k>, [<number k2>,] <number... y> [by <object...>] | Compresses the result from the previous pipe to at most k most important rows. |
4.8. @compress.uniform <number k>, [<number k2>,] <number... y> [by <object...>] | Compresses the result from the previous pipe to at most k rows. |
4.9. @debounce <period> [by <object...>] | Only outputs rows that follow <period> time without any output. |
4.10. @empty [over <window>...] | Creates an empty pipe (pass-through) with optional window metadata |
4.11. @filter <boolean condition> [by <object...>] | Filters the results from previous pipe. |
4.12. @for <named(seq)>, <named... additional> | Unwinds events in the seq instance |
4.13. @latest | Keeps the latest batch of input events and outputs it at the end. |
4.14. @makejoin <string... props>, <pipe...> | Creates a join pipe containing all pipes passed as arguments |
4.15. @makeljoin <string... props>, <pipe...> | Creates a left join pipe containing all pipes passed as arguments |
4.16. @makelrjoin <string... props>, <pipe...> | Creates a full join pipe containing all pipes passed as arguments |
4.17. @makerjoin <string... props>, <pipe...> | Creates a right join pipe containing all pipes passed as arguments |
4.18. @makeunion <pipe...> | Creates a union pipe containing all pipes passed as arguments |
4.19. @meta <named...> [over <window>...] | Adds custom fields to query metadata |
4.20. @onchange [<object...>] [by <object...>] | Outputs only events that change the last value of any expression in the group |
4.21. @onfalse <boolean condition> [by <object...>] | Outputs only events for which condition evaluates false and was not false in the previous event |
4.22. @ontrue <boolean condition> [by <object...>] | Outputs only events for which condition evaluates true and was not true in the previous event |
4.23. @resample <period>, (<object>, <period>, <string method>...) [by <object...>] | Performs resampling of the incoming stream to match the specified period |
4.24. @seq | Changes the static type of any row to the highest corresponding seq. |
4.25. @set <named...> [by <object...>] | Sets fields in the original event (both typed and raw) |
4.26. @skip <number> [by <object...>] | Skips some results from previous pipe. |
4.27. @skipwhile <boolean> | Skips rows from every batch while some condition is true. |
4.28. @slice [<number start>], [<number end>], [<number step>] [by <object...>] | Gets some rows (based on index) from previous pipe. |
4.29. @sort <sortfield... expr> | Sorts the results from previous pipe. |
4.30. @take <number> [by <object...>] | Limits the results from previous pipe. |
4.31. @takewhile <boolean> | Takes rows from every batch while some condition is true. |
4.32. @throttle [<number k>, ] <period|seq> [by <object...>] | Limits the output of the previous pipe to at most <k> rows per <period>. |
4.33. @top <number k>, <sortfield... expr> [by <object...>] | Sorts the results and gets the first k rows (possibly grouped) from previous pipe. |
4.34. @unbatch | Splits a batch of events into many batches, one for each event. |
4.35. @unsafe | Marks that any pipe executed after this must run in a non-distributed environment. |
4.36. @yield [<object expr>] | Extracts one field of the stream to be the output event. |
4.37. @zip [<seq expr>] | Takes elements from a batch of seqs, one by one, and makes a seq of seqs. |
4.38. atomic <pipe> | Modifies a pipe to run entirely in the same node, ignoring any distribution invariants. |
5. Operators | |
5.1. \<filter> | Returns a boolean expression that evaluates true if the filter accepts the event. |
5.2. <object># → <number> | Coerces the expression to number. Shorthand to <object>:number() → <number> . |
5.3. <object>$ → <string> | Coerces the expression to string. Shorthand to <object>:string() → <string> . |
5.4. <number> + <number> → <number> | Adds two numbers. |
5.5. <string> + <string> → <string> | Concatenates two strings. |
5.6. <seq> + <seq> → <seq> | Concatenates two seqs. |
5.7. <map> + <map> → <map> | Concatenates two maps. |
5.8. <number> - <number> → <number> | Subtracts one number from another. |
5.9. <number> * <number> → <number> | Multiplies two numbers. |
5.10. <number> / <number> → <number> | Divides one number by another (float division). |
5.11. <number> /? <number> → <number> | Divides one number by another (float division). Legacy operator that returns 0.0 when the divisor is 0.0. |
5.12. <number> // <number> → <number> | Divides one number by another (integer division). |
5.13. <number> //? <number> → <number> | Divides one number by another (integer division). Legacy operator that returns 0.0 when the divisor is 0.0. |
5.14. <number> ** <number> → <number> | Raises one number to another's power. |
5.15. <number> % <number> → <number> | Returns the remainder of the division of one number by another. |
5.16. <number> %? <number> → <number> | Returns the remainder of the division of one number by another. Legacy operator that returns 0.0 when the divisor is 0.0. |
5.17. -<number> → <number> | Negates one number. |
5.18. <boolean> and <boolean> → <boolean> | Returns the logical AND of two booleans. |
5.19. <boolean> or <boolean> → <boolean> | Returns the logical OR of two booleans. |
5.20. <boolean> xor <boolean> → <boolean> | Returns the logical XOR of two booleans. |
5.21. not <boolean> → <boolean> | Returns the logical NOT of a boolean. |
5.22. <object> == <object> → <boolean> | Checks whether two objects are equal. |
5.23. <object> != <object> → <boolean> | Checks whether two objects are not equal. |
5.24. <comparable> < <comparable> → <boolean> | Checks whether the left operand compares less than the right one. |
5.25. <comparable> <= <comparable> → <boolean> | Checks whether the left operand compares less than or equal to the right one. |
5.26. <comparable> > <comparable> → <boolean> | Checks whether the left operand compares greater than the right one. |
5.27. <comparable> >= <comparable> → <boolean> | Checks whether the left operand compares greater than or equal to the right one. |
5.28. <object>-><identifier> → <object> | Extracts a property from an object. |
5.29. <object>->(<object>) → <object> | Evaluates an expression in the semantic context of the target |
5.30. <object>[<object...>] → <object> | Accesses an object by index/key in a list/string/map. |
5.31. ^<object> → <object> | Evaluates the expression in the parent semantic context (if any). |
5.32. <object> ?? <object> → <object> | Returns the first if it is not null; otherwise, returns the second. |
5.33. <boolean> ? <object>, <object> → <object> | If the condition is true, returns the first object; otherwise, returns the second. |
5.34. <seq> |> <&fn> → <object> | Transforms a sequence into another sequence or object. |
6. Scalar Functions | |
6.1. <boolean>:compile.if(<object if_true>, <object if_false>) → <object> | Chooses one of the expressions based on a constant condition in compile-time |
6.2. <map>:containskey(<object>) → <boolean> | Returns whether the target map contains the argument as a key. |
6.3. <number>:abs() → <number> | Calculates the absolute value of a number. |
6.4. <number>:acos() → <number> | Returns the arc cosine of the argument to an angle in radians. |
6.5. <number>:aequal(<number>, [<number epsilon>]) → <number> | Returns true if the target and the argument are approximately equal. If epsilon is not given, it defaults to 1e-6. |
6.6. <number>:asin() → <number> | Returns the arc sine of the argument to an angle in radians. |
6.7. <number>:atan() → <number> | Returns the arc tangent of the argument to an angle in radians. |
6.8. <number>:atan2(<number x>) → <number> | Returns the two-argument arc tangent of the argument to an angle in radians. |
6.9. <number>:bytes([<number precision>]) → <string> | Formats a number as the best possible byte multiple. |
6.10. <number>:ceil([<number precision>]) → <number> | Returns the smallest number that is greater than or equal to the argument. |
6.11. <number>:chr() → <string> | Converts a unicode codepoint to a single-char string. |
6.12. <number>:cos() → <number> | Returns the cosine of an angle in radians. |
6.13. <number>:cosh() → <number> | Returns the hyperbolic cosine of an angle in radians. |
6.14. <number>:dateadd(<period>) → <number> | Adds <period> to timestamp argument. |
6.15. <number>:datefloor(<period>) → <number> | Rounds timestamp down to the nearest date that is divisible by <period>. |
6.16. <number>:dateformat([<string format>], [<string tz>]) → <string> | Formats timestamp using specified format |
6.17. <number>:datesub(<period>) → <number> | Subtracts <period> from timestamp argument. |
6.18. <number>:displaynumber() → <number> | Formats a number in unicode for display in small spaces |
6.19. <number>:exp() → <number> | Calculates the exponential of a number. |
6.20. <number>:floor([<number precision>]) → <number> | Returns the largest number that is less than or equal to the argument. |
6.21. <number>:format([<string format>], [<string locale>]) → <string> | Formats a number according to format string and locale. |
6.22. <number>:log([<number base>]) → <number> | Calculates the logarithm of a number. |
6.23. <number>:milliseconds([<precision>]) → <string> | Formats a number representing a timespan in milliseconds |
6.24. <number>:normdist(<number mean>, <number sdev>, [<boolean acc>]) → <number> | Calculates the gaussian distribution function value, optionally integrated. |
6.25. <number>:pow(<number exp>) → <number> | Raises one number to another. |
6.26. <number>:round([<number precision>]) → <number> | Rounds a number to <precision> decimal places. |
6.27. <number>:select(<object... list>) → <object> | Selects the ith element from a list of arguments, or null if it doesn't exist. |
6.28. <number>:sin() → <number> | Returns the sine of an angle in radians. |
6.29. <number>:sinh() → <number> | Returns the hyperbolic sine of an angle in radians. |
6.30. <number>:sqrt() → <number> | Returns the correctly rounded positive square root of a double value. |
6.31. <number>:tan() → <number> | Returns the tangent of an angle in radians. |
6.32. <number>:tanh() → <number> | Returns the hyperbolic tangent of an angle in radians. |
6.33. <number>:tobase(<number base>) → <string> | Converts a number to a string representing it in some base |
6.34. <object>:boolean() → <boolean> | Converts object to boolean. |
6.35. <object>:class() → <string> | Returns the underlying Java class of the object. |
6.36. <object>:comparable() → <comparable> | Converts object to comparable. |
6.37. <object>:const() → <object> | Makes any scalar act as a constant in compile time |
6.38. <object>:decode(<object,object... pairs>) → <object> | Transforms the parameter using the translation rules defined in <pairs>. |
6.39. <object>:get(<object... keys>) → <object> | Much like property[keys]. Works for strings, containers and arrays. |
6.40. <object>:indexin(<object... list>) → <number> | Returns the first index of the value in <list>, or null if <list> does not contain it. |
6.41. <object>:isin(<object... list>) → <boolean> | Returns true if <list> contains the value, false otherwise. |
6.42. <object>:json() → <string> | Converts the object to its JSON string representation. |
6.43. <object>:keep([<number ttl>]) → <object> | When used in a simple pipe, delays or disables (if ttl not supplied or < 0) inactive group removal. |
6.44. <object>:len() → <number> | Tries to get <target>'s size. Works for strings, containers and arrays. |
6.45. <object>:map() → <map> | Converts object to map. |
6.46. <object>:nameof() → <string> | Returns a suggested field name for some expression |
6.47. <object>:number() → <number> | Converts object to number. |
6.48. <object>:object() → <object> | Casts any object to its canonical object representation. |
6.49. <object>:row() → <seq> | Casts any object to a row. |
6.50. <object>:seq() → <seq> | Casts any object to a seq. |
6.51. <object>:setfield(<named... fields>) → <object> | Sets fields into the object in a type-safe way |
6.52. <object>:string() → <string> | Converts object to string. |
6.53. <object>:typeof() → <string> | Returns the compile-time type of an expression. |
6.54. <object>:unfold(<&fn>) → <seq> | Creates a sequence by applying the same function successively to a state. |
6.55. <row>:tomap() → <map> | Converts a row instance to a map, using the field names as keys |
6.56. <seq>:contains(<object>) → <boolean> | Returns whether the target seq contains the argument. |
6.57. <seq>:enumerate([<number start>]) → <seq(row)> | Returns a same-sized sequence with an incrementing number before each object. |
6.58. <seq>:except(<seq b>) → <seq> | Creates a sequence with the same elements, except the ones that are also present in <b> |
6.59. <seq>:indexof(<object>) → <number> | Returns the index of the argument inside the target seq. Returns null otherwise. |
6.60. <seq>:mapnames(<&string fn>) → <row> | Converts a constant sequence into a strongly-typed row in compile-time. |
6.61. <seq>:qpush(<object>, [<number bound>]) → <seq> | Returns the merge of a sequence and a value with a maximum size of <bound>, removing elements in a FIFO manner. |
6.62. <seq>:qpushseq(<seq b>, [<number bound>]) → <seq> | Returns the merge of both sequences with a maximum size of <bound>, removing elements in a FIFO manner. |
6.63. <seq>:repeat(<number>) → <seq> | Repeats the sequence a certain number of times. |
6.64. <seq>:retain(<seq b>) → <seq> | Creates a sequence with the same elements, but only the ones that are also present in <b> |
6.65. <seq>:skip([<number n>]) → <seq> | Returns a new sequence that skips the first <n> elements |
6.66. <seq>:slice([<number start>], [<number end>], [<number step>]) → <seq> | Returns the sliced sequence starting at index <start>, ending at <end> getting every <step> elements. |
6.67. <seq>:take([<number n>]) → <seq> | Returns a new sequence that takes only the first <n> elements |
6.68. <string>:compile(<object... args>) → <number> | Compiles a constant string into a pipes expression |
6.69. <string>:compile.map(<object... args>) → <number> | Compiles a constant string with each argument into many pipes expressions |
6.70. <string>:contains(<string>) → <boolean> | Returns whether the target string contains the argument. |
6.71. <string>:dateparse([<string format>], [<string tz>]) → <number> | Parses timestamp using specified format |
6.72. <string>:endswith(<string>) → <boolean> | Returns whether the target string ends with the argument. |
6.73. <string>:frombase(<number base>) → <number> | Converts a string representing a number in some base to the number itself |
6.74. <string>:hll.eval() → <number> | Evaluates compressed base64 HyperLogLog data. |
6.75. <string>:indexof(<string s>, [<number fromIndex>]) → <number> | Returns the index of the first occurrence of <s> inside the target string. Returns null otherwise. |
6.76. <string>:jsonparse() → <object> | Parses JSON string into objects. |
6.77. <string>:lower() → <string> | Converts string to lowercase. |
6.78. <string>:ltrim([<string chars>]) → <string> | Trims leading characters from a string |
6.79. <string>:ord() → <number> | Gets the unicode codepoint from the first char in the string. |
6.80. <string>:parse([<string format>], [<string locale>]) → <number> | Parses a number according to format string and locale. |
6.81. <string>:property() → <object> | Compiles a string into the corresponding property access object |
6.82. <string>:regex(<string regex>) → <row> | Returns a strongly typed row composed by all named groups in <regex>. |
6.83. <string>:regexall(<string regex>) → <seq(row)> | Returns a sequence of rows composed by all matches of all named groups in <regex>. |
6.84. <string>:regexfind(<string regex>, [<number|string group>]) → <string> | Returns the matched string by <regex> in target (or one specific group). |
6.85. <string>:regexfindall(<string regex>, [<number|string group>]) → <seq(string)> | Returns all the matched strings by <regex> in target (or one specific group). |
6.86. <string>:regexmatch(<string regex>) → <boolean> | Returns true if the target matches <regex>. False otherwise. |
6.87. <string>:regexsplit(<string regex>, [<number limit>]) → <seq(string)> | Splits string by regular expression <regex> in up to <limit> pieces. |
6.88. <string>:regexsub(<string regex>, <string replacement>) → <string> | Replaces all matches of <regex> in target by <replacement>. |
6.89. <string>:repeat(<number>) → <string> | Repeats the string a number of times. |
6.90. <string>:replace(<string from>, <string to>) → <string> | Replaces all instances of <from> with the string <to>. |
6.91. <string>:rindexof(<string s>, [<number fromIndex>]) → <number> | Returns the index of the last occurrence of <s> inside the target string. Returns null otherwise. |
6.92. <string>:rtrim([<string chars>]) → <string> | Trims trailing characters from a string |
6.93. <string>:rtruncate(<number length>, [<string ellipsis>]) → <string> | Truncates the string to the specified length, keeping the end, optionally adding an ellipsis at the start. |
6.94. <string>:split([<string delim>], [<number limit>]) → <seq(string)> | Splits string by <delim> in up to <limit> pieces. |
6.95. <string>:sprintf(<object... args>) → <string> | Uses the target string as format to arguments. |
6.96. <string>:startswith(<string>) → <boolean> | Returns whether the target string starts with the argument. |
6.97. <string>:substring(<number from>, [<number to>]) → <string> | Returns the substring between the indices <from> and <to>. |
6.98. <string>:trim([<string chars>]) → <string> | Trims a string |
6.99. <string>:truncate(<number length>, [<string ellipsis>]) → <string> | Truncates the string to the specified length, optionally adding an ellipsis at the end. |
6.100. <string>:upper() → <string> | Converts string to uppercase. |
6.101. <string>:urldecode([<string encoding>]) → <string> | Decodes an application/x-www-form-urlencoded unicode string. |
6.102. <string>:urlencode([<string encoding>]) → <string> | Encodes a string as application/x-www-form-urlencoded. |
6.103. cast(<object>, <string typename>) → <object> | Casts a value into another type. |
6.104. compare(<comparable a>, <comparable b>) → <number> | Returns a number < 0 if a < b, > 0 if a > b or 0 if a = b. |
6.105. concat(<string...>) → <string> | Concatenates many strings together |
6.106. cron(<string>, [<string tz>]) → <period> | Constructs a period instance from a constant cron string |
6.107. exprange([<number start>], <number end>, <number step>) → <object> | Creates a seq that iterates through numbers exponentially |
6.108. happened([<number ref>], [<number tested>], <string>, [<string tz>]) → <number> | Returns true if the tested timestamp is inside the span, false otherwise. |
6.109. hll.merge(<string... data>) → <string> | Merges many instances of compressed base64 HyperLogLog data. |
6.110. itermap(<object>, [<string keyField>], [<string valueField>]) → <seq(row)> | Creates a seq that iterates through java.util.Map, seq or row entries |
6.111. max(<comparable>, <comparable>, <comparable...>) → <comparable> | Returns the greatest value of all supplied arguments. |
6.112. metadata() → <object> | Returns a representation of the previous pipe metadata. |
6.113. min(<comparable>, <comparable>, <comparable...>) → <comparable> | Returns the least value of all supplied arguments. |
6.114. minhash.eval(<string...>) → <number> | Evaluates the similarity between many MinHash encoded data. |
6.115. minhash.merge(<string... data>) → <string> | Merges many MinHash encoded data. |
6.116. mregression.apply(<seq(number) coefficients>, <number... xs>) → <number> | Applies regression coefficients to variables |
6.117. newmap(<object,object... pairs>) → <map> | Creates an instance of java.util.Map with the supplied keys and values. |
6.118. normrandom([<number mean>], [<number stdev>]) → <number> | Returns a random value from a normal (Gaussian) distribution. |
6.119. period(<number>, <string unit>, [<string tz>]) → <period> | Constructs a period instance from parameters |
6.120. pi() → <number> | Returns the constant value of pi. |
6.121. random([<number min>], [<number max>]) → <number> | Returns a random value between <min> and <max> (0 and 1 if not defined). |
6.122. range([<number start>], <number end>, [<number step>]) → <object> | Creates a seq that iterates through numbers |
6.123. span([<number ref>], <string>, [<string tz>]) → <row> | Calculates start and end timestamps of span based on target. |
6.124. spanend([<number ref>], <string>, [<string tz>]) → <number> | Calculates end timestamp of span based on target. |
6.125. spanstart([<number ref>], <string>, [<string tz>]) → <number> | Calculates start timestamp of span based on target. |
6.126. spantest([<number ref>], [<number tested>], <string>, [<string tz>]) → <number> | Returns -1 if the tested timestamp is before the span, 0 if it is inside and 1 if it is after. |
6.127. timestamp() → <number> | Returns the most appropriate timestamp, whether in scalar or aggregation contexts. |
6.128. uuid() → <string> | Returns a random UUID string. |
6.129. zip(<seq... args>) → <seq(row)> | Creates a seq such that each element is a row with the respective element from input seqs |
6.130. System functions | |
6.130.1. sys.cpu() → <row> | Returns the system's CPU usage info. |
6.130.2. sys.disks() → <seq(row)> | Returns info about all devices in the filesystem. |
6.130.3. sys.fds() → <row> | Returns info about process file descriptors. |
6.130.4. sys.heap() → <row> | Returns the system's current JVM heap info. |
6.130.5. sys.hostname() → <string> | Returns the system's hostname. |
6.130.6. sys.memory() → <row> | Returns the system's memory usage info. |
6.130.7. sys.properties() → <object> | Returns a list of defined system properties |
6.130.8. sys.threadlist() → <seq(row)> | Returns a list of threads in the JVM |
6.130.9. sys.threads() → <row> | Returns info about running threads in current process. |
6.130.10. sys.timestamp() → <number> | Returns the system's real timestamp (not the query timestamp). |
6.130.11. sys.uptime() → <number> | Returns the number of milliseconds since the start of the JVM |
6.130.12. sys.version() → <row> | Returns Pipes version info |
7. Aggregation Functions | |
7.1. <aggregation object expr>:if(<boolean condition>) → <object> | Aggregates only events for which <condition> evaluates to true. |
7.2. <aggregation object expr>:overall() → <object> | Merges all the results from the target aggregation. |
7.3. <aggregation object expr>:overlast(<number window>) → <object> | Merges the results of the last <window> aggregations. |
7.4. <aggregation object expr>:prev([<number prev>]) → <object> | Delays and returns the previous <number>th result from target aggregation. |
7.5. all(<boolean>) → <boolean> | Returns true if all occurrences evaluate true. |
7.6. amap(<row>, <aggregation object>) → <row> | Applies the same aggregation to all fields of a row. |
7.7. any(<boolean>) → <boolean> | Returns true if any occurrence evaluates true. |
7.8. areduce(<seq>, <aggregation object>) → <object> | Applies all elements of a sequence to the same aggregation |
7.9. avg(<number>, [<number weight>]) → <number> | Calculates the (possibly weighted) average of some expression. |
7.10. count([<object>]) → <number> | Counts all events in a window (except those that evaluate to false or null). |
7.11. dcount(<object...>) → <number> | Estimates the field's cardinality (distinct count) using HyperLogLog. |
7.12. describe(<aggregation object expr>) → <string> | Yields a string json explaining the target aggregation's inner state representation. |
7.13. first(<object>) → <object> | Yields the occurrence with least timestamp. |
7.14. firstn(<object>, <number n>) → <seq> | Yields the <n> values with least timestamp. |
7.15. firstv(<object>) → <object> | Yields the non-null occurrence with least timestamp. |
7.16. fold(<object>, [<object initial>], <&fn>, [<&merger>]) → <object> | Reduces sequence of elements applying function successively |
7.17. foldv(<object>, [<object initial>], <&fn>, [<&merger>]) → <object> | Reduces sequence of elements applying function successively (ignores nulls) |
7.18. greatest(<object>, <sortfield...>) → <object> | Yields the greatest occurrence in the window based on the sort fields. |
7.19. greatestv(<object>, <sortfield...>) → <object> | Yields the greatest non-null occurrence in the window based on the sort fields. |
7.20. hll(<number log2m>, <object...>) → <number> | Estimates the field's cardinality (distinct count) using HyperLogLog. |
7.21. hll.init(<number log2m>, <object...>) → <string> | Similar to hll, but it doesn't evaluate the final cardinality; it just returns the sketch data. |
7.22. hll.merge(<string>) → <string> | Performs union of many HyperLogLog encoded data in a window. |
7.23. hll.mergeeval(<string>) → <number> | Performs union of many HyperLogLog encoded data in a window and evaluates the result. |
7.24. intervals(<number begin>, <number end>) → <seq> | Creates a list of disjoint intervals |
7.25. last(<object>) → <object> | Yields the occurrence with greatest timestamp. |
7.26. lastn(<object>, <number n>) → <seq> | Yields the <n> values with greatest timestamp. |
7.27. lastv(<object>) → <object> | Yields the non-null occurrence with greatest timestamp. |
7.28. least(<object>, <sortfield...>) → <object> | Yields the least occurrence in the window based on the sort fields. |
7.29. leastv(<object>, <sortfield...>) → <object> | Yields the least non-null occurrence in the window based on the sort fields. |
7.30. list(<object>) → <seq> | Creates a java.util.List from all events in a window. |
7.31. map(<object key>, <aggregation object value>) → <map> | Creates a java.util.Map from all events in a window. |
7.32. max(<comparable>) → <comparable> | Yields the greatest occurrence in the window. |
7.33. maxv(<comparable>) → <comparable> | Yields the greatest non-null occurrence in the window. |
7.34. median(<number>) → <number> | Estimates the median value of the population using Count-Min Sketch. |
7.35. min(<comparable>) → <comparable> | Yields the least occurrence in the window. |
7.36. minhash.init(<number size>, <object...>) → <string> | Returns MinHash set signature. |
7.37. minhash.merge(<string>) → <string> | Merges many MinHash encoded data in a window. |
7.38. minhash.mergeeval(<string...>) → <number> | Merges many MinHash encoded data in a window and evaluates the similarity between them. |
7.39. minv(<comparable>) → <comparable> | Yields the least non-null occurrence in the window. |
7.40. mregression([<number... x>], <number y>) → <seq> | Computes multivariate regression for given variables. |
7.41. mregression.full([<number... x>], <number y>) → <row> | Computes multivariate regression statistics for given variables. |
7.42. pcount(<boolean>) → <number> | Aggregates the proportion of events for which the expression evaluates to true. |
7.43. pfold(<object>, [<object initial>], <&fn>) → <object> | Same as fold, but with state persisted |
7.44. quantile(<number>, <number q>) → <number> | Estimates the q (0..1) quantile of the population using Count-Min Sketch. |
7.45. regression([<number x>], <number y>) → <row> | Computes regression and correlation statistics for given variables. |
7.46. set(<object>) → <seq> | Creates a java.util.Set from all events in a window. |
7.47. similarity(<object...>) → <number> | Estimates the Jaccard similarity coefficient between many sets. |
7.48. smooth(<aggregation number expr>, [<number alpha>], [<number beta>]) → <number> | Smooths the curve of another aggregation. |
7.49. statistics(<number>) → <row> | Aggregates sample statistics for some property. |
7.50. stdev(<number>, [<number weight>]) → <number> | Calculates the (possibly weighted) standard deviation of some expression. |
7.51. sum(<number>) → <number> | Sums all evaluations of some expression. |
7.52. summary(<string>, [<string separator>], [<string lastSeparator>]) → <string> | Joins all the strings in a window. |
7.53. top(<number k>, <object>, [<sortfield...>]) → <seq> | Yields the k minimum occurrences of the target object. |
7.54. variance(<number>, [<number weight>]) → <number> | Calculates the (possibly weighted) variance of some expression. |
7.55. when(<boolean expr>) → <number> | Yields the latest timestamp inside window when some condition was true. |
7.56. whenfirst(<boolean expr>) → <number> | Yields the first timestamp inside window when some condition was true. |
7.57. Window Meta-aggregations | |
7.57.1. wcount() → <number> | Yields how many outputs are merged in the current window. |
7.57.2. wstart() → <number> | Returns the window's first allowed timestamp (or item index). |
7.57.3. wend() → <number> | Returns the window's last allowed timestamp (or item index). |
7.57.4. ostart() → <number> | Returns the output's first allowed timestamp (or item index). |
7.57.5. oend() → <number> | Returns the output's last allowed timestamp (or item index). |
7.57.6. otimestamp() → <number> | Returns the timestamp when the outputs were merged (equal to oend() on time pipes). |
8. Timespan Language | |
8.1. Span types | |
8.2. Period types |
The simplest Pipes query is a filter. The simplest filter is *, which matches all events.
*
Another simple filter selects all events where somefield is equal to somevalue:
somefield:somevalue
Use quotes ("
or '
) to filter by complex strings:
somefield:"a complex value"
There are other filter types available, for example:
somefield:aaa*zzz
Filters events where somefield has a string that starts with aaa and ends with zzz.
somefield#:[42, 1000)
Filters events where somefield, converted to a number, has a value between 42 (inclusive) and 1000 (exclusive).
Filters are composable with the & and | operators:
(host:A & value:30) | (host:B & value:42)
The operator & can be omitted and has greater precedence than |:
host:A value:30 | host:B value:42
The operator - selects the complement of a filter:
host:A -value:30
Selects events where host equals "A", but value isn't 30.
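The composition rules above can be mimicked outside Pipes with ordinary predicates. The following Python sketch is only an illustration of the semantics, not how the engine evaluates filters; the helper names (field, both, either, complement) are hypothetical, and only plain equality on string values is modelled.

```python
from typing import Callable, Mapping

Event = Mapping[str, str]
Filter = Callable[[Event], bool]


def field(name: str, value: str) -> Filter:
    """Predicate for `name:value` (plain equality only, no wildcards)."""
    return lambda event: event.get(name) == value


def both(left: Filter, right: Filter) -> Filter:
    """Intersection of two filters, like the & operator."""
    return lambda event: left(event) and right(event)


def either(left: Filter, right: Filter) -> Filter:
    """Union of two filters, like the | operator."""
    return lambda event: left(event) or right(event)


def complement(inner: Filter) -> Filter:
    """Complement of a filter, like the - operator."""
    return lambda event: not inner(event)


# host:A value:30 | host:B value:42
# (& binds tighter than |, so each side of | is an intersection)
matches = either(
    both(field("host", "A"), field("value", "30")),
    both(field("host", "B"), field("value", "42")),
)

# host:A -value:30
host_a_not_30 = both(field("host", "A"), complement(field("value", "30")))
```

Values are compared as strings here because, by default, Pipes treats field references as strings.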
If you do not define the field name, the default field is selected. For example, if the default field is __type, you can write:
SomeType value:42
Select events from SomeType with value equal to 42.
You can write an entire sub-filter inside some field declaration:
SomeType value:(42 | (43 -othervalue:X))
Select events from SomeType if value is 42 or 43 (but the latter only if othervalue isn't equal to X).
To know more about...
\<filter>.
After the filter, you can chain one or more pipes. The simplest pipe just selects some fields from the event.
SomeType => field1, field2
Selects only field1 and field2 from events matching SomeType.
By default, references to fields from filtered events are assumed to be strings.
If you want to declare a field reference as a number, use the operator #:
SomeType => field1#, field2#
Selects only field1 and field2, both as numbers, from events matching SomeType.
You can aggregate events, just like you would do in SQL:
SomeType => count() every minute
Outputs every minute the number of events matching SomeType in that minute.
You can use more than one aggregation and give a name to the output:
SomeType => count() as {Event count}, avg(value#) as mean every minute
Outputs every minute the number of events and the mean of the variable value for events matching SomeType in that minute.
You can also define a time window to aggregate over:
SomeType => count() over last 2 hours every minute
Outputs every minute the number of events matching SomeType in the last 2 hours.
If you want to group the results by some other field, use:
SomeType => count() by host every minute
Outputs every minute, for every host, the number of events matching SomeType.
To know more about...
<named...> [by <named...>] [over <window>] [every <period> | at the end].
Each query in Pipes creates new events. Those events can be chained into other pipes to be transformed:
SomeType => count() by host every minute => @top 5, count desc
Outputs every minute the top 5 hosts that received the most events matching SomeType.
You can use as many pipes as you want:
SomeType => count(), avg(response_time#) as time by host every minute => @filter time > 3 => @top 5, count desc
Outputs every minute hosts with average response time greater than 3 seconds, getting the top 5 by event count.
There are many different pipe types:
SomeType => count() every minute => @filter count > 1000 => @throttle 30 minutes
Outputs the number of events for every minute that has more than 1000 events, but only if it didn't already happen in the last 30 minutes.
Sometimes it is important to mix information from different streams into a single event type. For example, assuming CPU and Memory metrics come in different events, but with the same event type Metrics, you can write a query with the join pipe to mix them both:
Metrics => [ @filter \metric_name:CPU => avg(value#) as cpu every minute join @filter \metric_name:Memory => avg(value#) as mem every minute ]
Outputs the average cpu and memory every minute, in the same event.
You can also define a field to join the results on. For example, if you need to calculate the metrics in the previous example by host, you can join on this field using:
Metrics => [ @filter \metric_name:CPU => avg(value#) as cpu by host every minute join on host @filter \metric_name:Memory => avg(value#) as mem by host every minute ]
Outputs the average cpu and memory for each host every minute, in the same event.
Another example: if you need to create a ranking with the top 10 hosts by event count in the minute, with an 11th row holding the total count across all hosts, you can use the union pipe:
SomeType => [ count() by host every minute => @top 10, count desc union count() by 'Total' every minute ]
Outputs 11 rows every minute: the top 10 hosts by event count and a total row.
One last example: if you need to filter hosts with average CPU in the last minute greater than the global average, you can write:
Metrics metric_name:CPU => [ avg(value#) as cpu by host every minute join avg(value#) as global_avg every minute ] => @filter cpu > global_avg
Outputs every minute only hosts with average CPU greater than the global average.
To know more about...
the join pipe, please see <pipe> join [on <fields...>] <pipe>.
the union pipe, please see <pipe> union <pipe>.
There are event properties that represent lists and tuples. Pipes has two types to represent those properties: seq and row.
A seq represents a sequence of objects. The type of each object in the sequence is denoted along with the type name. For example, a seq(number) is a number sequence.
As an example, the expression range(10) is a seq(number).
A row represents a named tuple, that is, a tuple where each position has a distinct name and type. This metadata is also denoted along with the type name. For example, row(number a, string b) is a row with two properties: a is a number and b is a string. An example expression with that type is (42 as a, 'John' as b). It is important to note that every row is also a seq(T), where T is the minimum common type to all fields.
Pipes has functions and operators to deal with sequences. The main one is <seq> |> <&fn> → <object>, which transforms sequences. For example, to get only the prices from a list of products inside an event, one can do:
Sales => products:seq |> price#
Gets a list of products inside a Sales event and picks only the price property as a number.
If you use an aggregation on the right side of a |> operator, it applies the aggregation to all elements in the sequence and returns a single value. For example, to get the sum of all product prices:
Sales => products:seq |> sum(price#)
Gets a list of products inside a Sales event and sums all the prices.
You can even use other pipes inside the |> operator, for example:
Sales => products:seq |> @filter(name:startswith('M&M')) |> sum(price#)
Gets a list of products inside a Sales event and sums all the prices for products whose name starts with "M&M".
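For readers more familiar with general-purpose languages, the three |> queries above correspond roughly to a map, a reduce, and a filter followed by a reduce. The Python sketch below only illustrates that correspondence; the sale event and its product values are made up for the example, and prices are stored as strings to mirror the default string typing of properties.

```python
# Hypothetical Sales event; names and prices are illustrative only.
sale = {
    "products": [
        {"name": "M&M Peanut", "price": "2.5"},
        {"name": "Gum", "price": "1.25"},
        {"name": "M&M Crispy", "price": "4"},
    ]
}


def prices(event):
    """Like `products:seq |> price#`: pick the price of each product as a number."""
    return [float(product["price"]) for product in event["products"]]


def total_price(event):
    """Like `products:seq |> sum(price#)`: aggregate the sequence into one value."""
    return sum(prices(event))


def total_price_for_prefix(event, prefix):
    """Like filtering by name first, then summing the remaining prices."""
    return sum(
        float(product["price"])
        for product in event["products"]
        if product["name"].startswith(prefix)
    )
```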
To know more about...
the |> operator, please see <seq> |> <&fn> → <object>.
In Pipes, you can define macros to reduce code duplication. For example:
def last_gt(x, v): last(x#):if(x# > v); WebAccess => response_time:last_gt(2), response_time:last_gt(3) every hour
Gets the last response time greater than 2 and 3, respectively.
It is possible to define new user pipes with macros, for example:
def @last_gt(x, v every &out): @filter x# > v => last(x) every=out; WebAccess => [ @last_gt response_time, 2 every hour join @last_gt response_time, 3 every hour ]
Gets the last response time greater than 2 and 3, respectively.
There is one special kind of macro that can also be used inside filters: constants. They always start with @@. For example:
def @@states: ('NY', 'CA'); Orders state:@@states => sum(value#) every hour
Gets all orders from states NY or CA and yields their sum every hour.
Pipes is a language and an engine to perform distributed aggregations over real-time streams. It enables stream processing with low latency and minimum memory footprint (constant for most operations).
The language is designed to be as intuitive as possible, expressing the data flow more explicitly. The goal was to provide a language you can read from left to right (no visual backtracking) while keeping a declarative way to express the computations. The name Pipes is an allusion to the Unix pipeline, which enables a powerful and elegant functional programming model on Unix shells.
The Pipes language operates over events. Events are (usually immutable) sets of (key, value) tuples and may have some time information assigned to them. In the Pipes event model, events have no intrinsic type. They're all part of a single public stream of tuples. When type information is applicable, it is usually encoded as a property of each event (e.g. type or __type).
The engine makes no assumption on the nature, frequency or schema of the incoming events. Every operation is written keeping in mind that the event may not have the referenced field, or that the value may not always be of the same type. It is safe to say events are well represented by schemaless JSON. As a Java library, Pipes' default configuration understands instances of the java.util.Map container.
It is recommended that events have a property called 'timestamp', with a Java timestamp, that is the number of milliseconds since 1 January 1970 UTC. But even that is not enforced nor required.
Unlike a SQL database, stream processing allows very little preprocessing of the events (e.g. index creation), so the heavy optimization is done in the queries. This is why, most of the time, the queries will run in the same engine: this way, the engine can optimize and share as much processing as possible between them.
The first part of a Pipes query always consists of selecting some events from that stream, using a filter (see Chapter 3: Filters). Filters select values from the public stream. Each filter basically contains predicates about some fields and boolean operations between them. This has great potential for optimization. For example, given the filters type:http status:404, type:http status:200 and type:http status:(2?? | 3??), the engine builds automata like the ones in the picture below:
Filters automata
After filtering the public stream, you can chain the result through one or more pipes (see Chapter 4: Pipes). Each pipe may transform, filter or aggregate the results from the previous one. In the language, this chaining is represented by the operator =>.
Pipes computation model
Each pipe in the chain can have one or more output rates. For example, a pipe can receive an input every time the filter matches some event in the public stream, but only output events in batches of one minute. There are five different types of output: every time period, every item, every batch, every condition, and at the end. For more information, see Section 2.3: Output rates and aggregation windows.
Inside each pipe, you can interact with events using expressions that access, transform or aggregate their properties. For more information, see Section 2.5: Expressions.
Most pipes are implemented using parallel algorithms that allow seamless distribution over multiple physical machines. Usually, each pipe can be classified into one of three groups:
When writing a typical query, it's likely your pipes will distribute as follows:
Typical pipe in single machine
Typical pipe in multiple machines
Every pipe in the chain will have one or more output rates. In this section, we will focus on how to declare them in the simple pipe, but the concept itself can be applied to any pipe type.
There are five types of output rates: every time period, every item, every batch, every condition, and at the end.
every <period>: outputs every time the pipe's internal clock ticks a constant period. E.g. every 5 seconds
every <number> items: outputs every time the specified amount of items is processed by the engine. E.g. every 5 items
every <number> batches: outputs every time the specified amount of item batches is processed by the engine. E.g. every 5 batches
every <boolean>: outputs every time the specified condition becomes true. E.g. every (x>x:prev), every \SomeEventType
at the end: outputs only at the end of execution. Useful for summary queries.
The available period units are:
unit | also accepts | equivalent to |
---|---|---|
millisecond | milli, ms | |
second | sec | 1000 ms |
minute | min | 60000 ms |
hour | | 3600000 ms |
day | | |
week | wk | |
month | mon | |
bimester | | 2 months |
quarter | | 3 months |
semester | | 6 months |
year | yr | |
Keep in mind that in the case of the simple pipe, there may be a difference between the declared rate and the effective rate. In the query below, for example, the declared output rate in the last pipe is every item, but effectively the query outputs every 10 seconds.
* => count() every 10 seconds => count, prev(count) every item
Outputs every 10 seconds
This happens because, when writing a query, you declare the output rate from the pipe's local view, but the effective output rate considers the entire query, from the filter to the last pipe. In the example above, although the last pipe outputs every time it receives an item, it only receives an item every 10 seconds.
Also in the simple pipe, you can aggregate over a period other than the output period. You can, for example, output a result every minute that corresponds to the last 5 minutes of aggregation. This is an aggregation over a time window.
The window concept only applies to the simple pipe.
In traditional stream processing libraries, every event is stored inside a data window and the aggregation runs over them. This allows a fine grained configuration of outputs vs windows. In Pipes, the goal is to use the least resources possible. So, instead of storing events inside the window, we only store the aggregation outputs (in a mergeable format, see Section 2.5.6: Aggregations state representation). The picture below exemplifies:
Windows merges aggregations outputs
This way, the memory footprint of a query is very low and predictable. Of course, this approach introduces some limitations (e.g. the window size must be a multiple of the output rate), but the benefits outweigh them.
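The idea of storing one mergeable output per period, rather than every event, can be sketched in a few lines of Python. This is a simplified illustration for a count aggregation only, under the assumption that merging two counts is plain addition; the class and function names are hypothetical and do not correspond to the engine's internals.

```python
from collections import deque


def outputs_in_window(window_ms: int, period_ms: int) -> int:
    """Number of stored outputs needed to cover the window."""
    if window_ms % period_ms != 0:
        raise ValueError("window size must be a multiple of the output period")
    return window_ms // period_ms


class LastWindowCount:
    """Keeps one partial count per output period instead of the raw events."""

    def __init__(self, window_ms: int, period_ms: int) -> None:
        # Older partial counts fall out automatically once the deque is full.
        self._partials = deque(maxlen=outputs_in_window(window_ms, period_ms))
        self._current = 0

    def on_event(self) -> None:
        self._current += 1

    def on_tick(self) -> int:
        """Close the current period and merge the partials still in the window."""
        self._partials.append(self._current)
        self._current = 0
        return sum(self._partials)


def simulate(events_per_period, window_ms, period_ms):
    """Feed a number of events per period and collect the output of each tick."""
    window = LastWindowCount(window_ms, period_ms)
    results = []
    for amount in events_per_period:
        for _ in range(amount):
            window.on_event()
        results.append(window.on_tick())
    return results
```

With a 3-minute window and a 1-minute output period, memory use is bounded by three partial counts regardless of how many events arrive.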
There are four types of windows:
over last (<period>|<number> items|<number> batches): aggregates over the last <period> / <output_rate> outputs. E.g. over last 5 minutes
over current (<period>|<number> items|<number> batches): aggregates since the beginning of the current period. E.g. over current day
over span <string>: aggregates over some timespan (cf. Chapter 8: Timespan Language). E.g. over span 'last 2 hours and 5 minutes'
over all: aggregates over all events, merging all outputs.
It is worth noting that item-based windows can only be used with item-based outputs, but time-based windows can be used with all but at the end outputs. In fact, at the end pipes support no window definition, as there is only a single output at the end. Below, there is a table with examples showing what is valid and what is not.
window/output examples | every 2 minutes | every 2 items | every 2 batches | every (count()==2) | at the end |
---|---|---|---|---|---|
over last 10 minutes | ✔ valid | ✔ valid | ✔ valid | ✔ valid | ✖ invalid |
over current 10 minutes | ✔ valid | ✔ valid | ✔ valid | ✔ valid | ✖ invalid |
over last 10 items | ✖ invalid | ✔ valid | ✖ invalid | ✔ valid | ✖ invalid |
over current 10 items | ✖ invalid | ✔ valid | ✖ invalid | ✔ valid | ✖ invalid |
over last 10 batches | ✖ invalid | ✖ invalid | ✔ valid | ✖ invalid | ✖ invalid |
over current 10 batches | ✖ invalid | ✖ invalid | ✔ valid | ✖ invalid | ✖ invalid |
over span 'last 10 minutes' | ✔ valid | ✔ valid | ✔ valid | ✔ valid | ✖ invalid |
over all | ✔ valid | ✔ valid | ✔ valid | ✔ valid | ✖ invalid |
Some pipes, including the simple pipe, allow grouping the events by one or more properties during the processing. This can be done by appending by prop1, ..., propn in the pipe declaration.
* => avg(memory#) by host every minute
Yields, every minute, the average memory consumption of every host with an event in the last minute.
* => @onchange state by host
Only outputs events when the value of the variable state changes for that host.
Not all pipes support the grouping syntax, and each one defines the behavior of the grouping syntax, but most of the time, it will mean the engine will keep a separate state for each tuple on the grouping expression.
If the state for every tuple is kept forever, long-running queries can leak memory, because once a group state is created, it will never be discarded. The engine tries its best to reclaim unused groups as soon as possible. For most simple pipes, the aggregation TTL information is used. That is, if the TTL for all aggregations in a group state has expired, the group is automatically removed.
But for pipes with item outputs and some other pipes, it may be impossible to predict when a group has to expire. That's what the expiry keyword is for. It defines the maximum period a group state can be kept before it is expired and purged from memory.
* => avg(memory#) by host expiry 10 minutes over last 32 items every item
* => @onchange state by host expiry 10 minutes
In both cases, if a host doesn't receive an event in 10 minutes, its state will be purged.
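The expiry behavior can be pictured as a per-group "last seen" timestamp that is checked against the configured period. The Python sketch below is a minimal illustration under that assumption; when and how the real engine schedules the purge is not described here, and the class name and methods are hypothetical.

```python
class GroupedAverage:
    """Per-group running average whose idle groups are purged after expiry_ms."""

    def __init__(self, expiry_ms: int) -> None:
        self.expiry_ms = expiry_ms
        self._groups = {}  # key -> [sum, count, last_seen_ms]

    def on_event(self, key, value: float, now_ms: int) -> None:
        # Assumption: purging is triggered on every incoming event.
        self.purge(now_ms)
        state = self._groups.setdefault(key, [0.0, 0, now_ms])
        state[0] += value
        state[1] += 1
        state[2] = now_ms

    def purge(self, now_ms: int) -> None:
        """Drop every group that has not received an event within expiry_ms."""
        expired = [
            key
            for key, state in self._groups.items()
            if now_ms - state[2] >= self.expiry_ms
        ]
        for key in expired:
            del self._groups[key]

    def average(self, key) -> float:
        total, count, _ = self._groups[key]
        return total / count

    def keys(self):
        return set(self._groups)
```

Without the purge step, the dictionary would grow with every new host ever seen, which is the memory leak the expiry keyword is meant to prevent.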
The expiry keyword is available only since Pipes v0.15.
Expressions access and transform data from events. We will introduce them here, explaining Pipes' type system and how aggregations work. For detailed information on expressions, please refer to Chapter 5: Operators, Chapter 6: Scalar Functions and Chapter 7: Aggregation Functions.
The Pipes language is statically typed, but has a very simple type system. There are four main types: number, string, boolean and object. All types inherit from object. There are a few other types, used in some specific functions. The table below summarizes all types:
type | java equivalent | example |
---|---|---|
number | java.lang.Double | 42 , 42.0 , 1e42 |
string | java.lang.String | "42" |
boolean | java.lang.Boolean | true , false |
object | java.lang.Object | null |
comparable | java.lang.Comparable | |
period | net.intelie.pipes.time.Period | 1 day , 42 minutes , 1 month {America/Sao_Paulo} |
seq | java.util.Iterable | (1, 2, 3, 4) , range(10) |
row | net.intelie.pipes.Row | (123 as abc, 456 as defg) |
map | java.util.Map | newmap('key1', 42, 'key2', 123) |
Pipes does not require events to be strongly typed (although it allows them to be). Because of this, the type of a property access may not be known at compile time. Still, the language is statically typed, so some type must be assigned to the expression. The chosen type depends on the configuration, but the default is string. This means that even if the value of the field in the event contains a numeric type, it will be cast to a string before being used by the engine.
Types are checked at compile time. For example, trying to use a function that requires a number while passing a string (like avg(someproperty)) results in an error:
PipeException: Error in call avg(someproperty) with types <PropertyAccess[string]>, cause(s): AvgAggregation: The parameter #1 <someproperty> must be 'number' instead of 'string'.
It is possible to tell the engine what type an expression must have, using the type's name as a function. E.g. number(someproperty) indicates that someproperty is a numeric value and will coerce it to a number in case it isn't. When used with other expressions, those functions also convert values: number("42") will just return 42.
The string and number types also have shorthands for these functions:
number(someproperty) is the same as someproperty#
string(someproperty) is the same as someproperty$
Some types also have arguments. In particular, seq has a type argument that informs the type of all elements in the seq. E.g., the literal (1, 2, 3, 4) is a seq(number).
This is especially useful when using the pipe @for <named(seq)>, <named... additional>:
* => @for (1,2,3,4) as x
x in the query will be a number in the next pipe.
The row type also has an argument describing the field list of that row. E.g., in the query:
* => A, B# by C# => last(*) as result every minute
the field result is a row(timestamp<number>;C<number>;A<string>,B<number>).
An instance of a typed row has three main uses:
the pipe @yield [<object expr>]
the expand keyword in a select or by clause (see simple pipe)
the <object>-><identifier> → <object> operator
The types row and seq share an intimate relation, because every row is also a seq by polymorphism. The type argument of the underlying seq depends on the types of the row. A row(number, number) is a seq(number), but a row(number, string) is a seq(comparable).
Examples
* => id, name, value# => last(*) every minute => @yield
Selects fields and outputs the last every minute.
Please note that last(*) outputs a single field of type row with 3 fields in its metadata and the pipe @yield expands them into an event.
* => id, name, value# => expand last(*) every minute
Equivalent to the above, but the pipe @yield accepts any scalar, while the expand keyword only works with row values with metadata.
* => expand name:regex(r'^(?<first>\w+)( \w+)* (?<last>\w+)$') as {extracted_*_name}
Yields an event with the fields extracted_first_name and extracted_last_name.
Most of Pipes' literals are familiar from other programming languages. In particular, we can enumerate:
Numeric literals
Since Pipes has no integer data type, only number, the only numeric literals are those in the following examples: 42, 42.1, 1e42.
Boolean literals
Only true or false.
Also, not exactly a literal, but you can write boolean filters with the filter syntax using the operator \<filter>.
Null literal
null
String literals
There are some flavors of string literals:
name | example | escaping? | interpolation? | useful for |
---|---|---|---|---|
Single quoted | 'abcde' | ✔ yes | ✖ no | ordinary strings |
Single quoted raw | r'\(some text\)' | ✖ no | ✖ no | regexes |
Double quoted | "abc${x#+y#}def" | ✔ yes | ✔ yes | interpolated strings |
Row and seq literals
You can write row instances using the syntax as in the following examples: (1, 2, 3, 4), (123 as propa, '456' as propb).
The row will be strongly typed. And because of the polymorphism, the row will also be a strongly typed seq using the most specific type that covers all the values. For example, (1, '2', 3) will be a row(_1<number>, _2<string>, _3<number>), but also a seq(comparable).
If you wish to define a row with only one element, you MUST use a comma after the single element ((42,)). If you don't, the compiler will treat the value as an expression, i.e. (42) != (42,).
Since Pipes 0.13, queries and functions can benefit from the concept of sequences and rows. The type row already existed before, but it has been completely overhauled in 0.13.
A sequence is implemented by the type seq. As the name suggests, it represents a (potentially infinite) sequence of objects. It always has a type. For example, the literal (1, 2, 3) is a seq<number>, and (1, '2', 3) is a seq<comparable> (because both numbers and strings are subtypes of comparable).
A row is a subtype of sequence. In particular, it is a finite sequence where each position has a specific type and name. The literal from the last paragraph, (1, 2, 3), is actually a row<number _1, number _2, number _3>, that inherits from seq<number>. The underlying seq type of a row is always the minimum common type between all the elements.
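The "minimum common type" rule can be illustrated with a small lookup over a type hierarchy. The Python sketch below models only the part of the hierarchy stated in this chapter (number and string are subtypes of comparable, and everything inherits from object); where the remaining Pipes types sit in the hierarchy is not assumed, and the function names are hypothetical.

```python
# Parent of each modelled type; only the relations stated in the text are included.
PARENT = {
    "number": "comparable",
    "string": "comparable",
    "comparable": "object",
    "object": None,
}


def ancestors(type_name: str) -> list:
    """The type itself followed by its supertypes, most specific first."""
    chain = []
    current = type_name
    while current is not None:
        chain.append(current)
        current = PARENT[current]
    return chain


def common_type(type_names) -> str:
    """Most specific type that is a supertype of (or equal to) every given type."""
    names = list(type_names)
    if not names:
        raise ValueError("at least one type is required")
    candidates = ancestors(names[0])
    for name in names[1:]:
        others = set(ancestors(name))
        candidates = [candidate for candidate in candidates if candidate in others]
    # "object" is an ancestor of everything, so candidates is never empty.
    return candidates[0]
```

For the literal (1, 2, 3) the element types are all number, so the underlying sequence type is number; for (1, '2', 3) the most specific shared type is comparable.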
Pipes provides many built-in functions and operators to deal with sequences and rows. Some examples: operator |>, aggregation amap(), aggregation areduce(), function mapnames().
Every Pipes expression can be either a scalar or an aggregation. We call this its level.
Scalars are expressions that receive an event and compute their result immediately. E.g. to get the length of a string, one can write len(someproperty). In this case, the whole expression is a scalar.
Aggregations are expressions that receive several events but only compute their result at the end of the window. E.g. to get the average of a property during the window, one can write avg(someproperty#). In this case, the whole expression is an aggregation.
When a scalar has only constant values (and operations over them), we say it is a constant. E.g. the literal value "some value" is a constant. len("some value") is also a constant, and may be optimized at compile time to the value 10.
Most functions just assume the same level as some (sometimes all) of their parameters. E.g. in the expression len(first(someproperty)), the resulting expression is an aggregation, because first(someproperty) is also an aggregation. When a function always results in an aggregation, we refer to it simply as an aggregation.
Some functions (mostly aggregations) require parameters to have some specific level. E.g. in the aggregation avg(<number>, [<number weight>]) → <number>, the parameter must be a scalar. For example, trying to compile avg(first(someproperty#)) results in an error:
PipeException: Error in call avg(first(someproperty#)) with types <FirstAggregation[number]>, cause(s): AvgAggregation: The parameter #1 <first(someproperty#)> must be a scalar instead of an aggregation.
For most purposes, every constant is also a scalar and every scalar may act like an aggregation, if required to.
Level set diagram
One of the main features in Pipes is the ability to run aggregations in a distributed fashion. To make this possible, most aggregations are written using parallel versions of the algorithms. Most of the solutions presented in this section only affect how semi-safe pipes (see Section 2.2: Chained computation model) represent their result over the wire. Additionally, this same mechanism is used by windows in the simple pipe (see Section 2.3: Output rates and aggregation windows) and meta-aggregations (e.g. overlast, overall) to merge the results from many outputs.
Aggregation tree-merge-eval model
Let's take as an example the average aggregation. Suppose there will be two machines calculating the query type:http => avg(response_time#) every minute in parallel. They share their results every minute. Suppose:
Using only this information, it is impossible to know the global average value. If we just take the mean value from both results (649.26), this value could be wrong, because machine 1 could have received many more events than machine 2, so its result should weigh more in the global result.
That's the reason why every aggregation has an internal state representation more complex than just its result. In the case of the average aggregation, its state representation has two fields: "mean" and "sumw".
Aggregations state representation
Some aggregations have a more complex inner state, like the distinct count aggregation, that uses the HyperLogLog data structure and must share the sketch content with the other nodes to merge its results. But even that state uses constant memory for its representation.
There is a meta-aggregation in Pipes called describe that operates over other aggregations, but instead of yielding the result at the end of the window, it shows a JSON representation of the aggregation's inner state. Using the same example as before, writing describe(avg(response_time#)) would return a value like:
{"sumw":262.0,"mean":416.628854962}
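To make the role of the two state fields concrete, here is a minimal Python sketch of a mergeable average. It assumes that "sumw" is the sum of the weights seen so far and that two states are merged with a weighted mean; that merge formula is the standard one for averages, not something taken from the engine's source. The class name and the sample values are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AvgState:
    """Mergeable state of an average: the current mean and the sum of weights."""

    mean: float = 0.0
    sumw: float = 0.0

    def add(self, value: float, weight: float = 1.0) -> "AvgState":
        """Fold a single observation into the state."""
        return self.merge(AvgState(mean=value, sumw=weight))

    def merge(self, other: "AvgState") -> "AvgState":
        """Combine two partial states, weighting each mean by its sumw."""
        total = self.sumw + other.sumw
        if total == 0:
            return AvgState()
        mean = (self.mean * self.sumw + other.mean * other.sumw) / total
        return AvgState(mean=mean, sumw=total)


def state_of(values) -> AvgState:
    """Build the state a single machine would hold after seeing `values`."""
    state = AvgState()
    for value in values:
        state = state.add(value)
    return state


# Illustrative values: machine 1 saw three events, machine 2 saw one.
machine1 = state_of([100.0, 100.0, 100.0])
machine2 = state_of([500.0])
merged = machine1.merge(machine2)
naive = (machine1.mean + machine2.mean) / 2
```

Here the naive mean of the two results is 300, while the merged state yields 200, which is the true average of the four events.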
When using the simple pipe with a non-empty group, the query tries to reduce memory consumption by removing any group that has not received any event in the last output period. Most aggregations do not hold any state other than what is required in the last period, so this is fine. But some aggregations need additional state (e.g. overlast, prev) and may prevent this memory reclaim. There are even aggregations that need to be kept in memory forever to work properly (e.g. overall).
To solve this problem, every aggregation has a property called TTL (time to live), which defines how long it should prevent row state from being deleted from a query if no events are received in that group. If there is no grouping, the property is ignored.
Most aggregations have TTL=1, but some aggregations with persistent state may have longer TTLs (even infinite).
There is also the function <object>:keep([<number ttl>]) → <object>, which allows changing the TTL of any expression to any desired value.
Property expressions are the way to access an event's properties. At a glance, every identifier denotes a property access. Properties are scalar expressions, but they can also be used in aggregation pipes, yielding the last evaluated expression.
* => last(cpu#), len(host) by host
In the example above, len(host) is a scalar, but acts as an aggregation and returns the length of the host name.
Sometimes, there are property names that collide with keywords in the language. Or cases where the property name doesn't match the traditional identifier syntax. In this case, you can use curly braces to define the property name, like in the example below:
* => last({CPU usage value}) as cpu, len(host) as {product} by host
The way properties are fetched is specific to each configuration, but by default, it reads values from a java.util.Map instance, always as a string (casting to it when necessary). To access it as a number, for example, use the cast operators, e.g. prop# or prop:number().
Some configurations may allow indexed property access. It may be done by using brackets. For example, assuming the event has a property config that is itself a java.util.Map:
* => config['hostname']
This indexing is identical in syntax to the indexing operator (<object>[<object...>] → <object>), but the semantics described here only apply to property access and are useful only to apply indexing before the usual cast occurs. E.g. someprop is a string, but someprop[0] isn't the first character of the string in someprop. It has the indexing semantics defined in configuration.
Usually this is necessary for multivalued fields, or for fields you already know contain a list instance.
The property expression can also be used to access values from previous pipes. In this case, the value will have the same type as defined in the referenced pipe. Example:
* => avg(cpu#) as avg, stdev(cpu#) as stdev every minute => avg-2*stdev as lower, avg+2*stdev as upper
There are also two special expressions that act like properties, but have special semantics: _ and *.
The expression _ returns the value of the default property. In untyped pipes, this is the entire event. In typed pipes, it is the first property of the previous pipe. I.e., in untyped pipes, _ is equivalent to *, and in typed pipes, it is equivalent to {0}.
host1 | host2 => avg(cpu#) as avg by _ every minute => _*2 as doubled
host:host1 | host:host2 => avg(cpu#) as avg by * every minute => avg*2 as doubled
The expression * returns the raw input event. Very useful when you don't want to repeat every field present in the event (or when you don't even know them in advance).
host1 => last(*) every second => @yield
Yields only the last event every second. In the example above, the expression last(*) has the type object and is as weakly typed as the input stream.
host1 => cpu#, memory# => last(*) every second => last->cpu, last->memory
In the (contrived) example above, the expression last(*) has the type row and is strongly typed. That's why when we use the operator 'peek' on the property last, the resulting values are also strongly typed.
The Pipes language allows previously configured functions to be called inside queries. The default syntax has nothing new. You can call a function by using parentheses after the function name. E.g. fn(param-1, param-2, ..., param-n). You can check the functions available by default at Chapter 6: Scalar Functions and Chapter 7: Aggregation Functions. Please note that it is possible to add other functions by configuration, so it isn't a complete list.
Other than the default syntax, there are other ways to call a function. For example, the syntax a:fn is equivalent to fn(a) and a:fn(b) is equivalent to fn(a, b).
The table below lists all the alternative syntaxes for function calls.
syntax | equivalent to |
---|---|
a:fn | fn(a) |
a:fn(b, c) | fn(a, b, c) |
a#fn | fn(a#) |
a#fn(b, c) | fn(a#, b, c) |
a$fn | fn(a$) |
a$fn(b, c) | fn(a$, b, c) |
:fn | fn(_) |
:fn(b, c) | fn(_, b, c) |
#fn | fn(_#) |
#fn(b, c) | fn(_#, b, c) |
$fn | fn(_$) |
$fn(b, c) | fn(_$, b, c) |
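The rewriting rules in the table of alternative syntaxes can be sketched in Python. This is only an illustration of the mapping, not how the Pipes compiler works: the helper name desugar is hypothetical, and it only handles a plain identifier (or nothing) before the :, # or $.

```python
import re

# Hypothetical helper: rewrites one alternative call syntax into the default
# fn(...) form. Only simple identifiers are supported as the receiver.
_CALL = re.compile(
    r"^(?P<recv>\w*)(?P<op>[:#$])(?P<fn>\w+)(?:\((?P<args>.*)\))?$"
)


def desugar(expr: str) -> str:
    m = _CALL.match(expr.strip())
    if m is None:
        raise ValueError(f"not an alternative call syntax: {expr!r}")
    # A missing receiver stands for the default property "_".
    receiver = m.group("recv") or "_"
    # '#' and '$' are kept as a coercion suffix on the receiver; ':' is dropped.
    suffix = "" if m.group("op") == ":" else m.group("op")
    args = [receiver + suffix]
    if m.group("args"):
        args.append(m.group("args"))
    return f"{m.group('fn')}({', '.join(args)})"
```

For instance, desugar("a#fn(b, c)") follows the fourth row of the table and desugar(":fn") follows the seventh.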
Since Pipes 0.14, users can write their own functions to use in queries. The functions can be declared even inside a query (before the pipes) to reduce expression duplication. A simple function can be defined as:
def f(a, b) "sums two numbers": a+b; => f(2, 2), f(2, 3) at the end
Declares and uses the function f(a, b)
, that sums both arguments.
Please note that a user function must have a semicolon at the end of its declaration. Also, the string after the function header and before the colon is used to create its documentation entry.
Despite the name "user functions", they're actually macros. In practice, the function body is expanded with the parameters at compile time. This makes it impossible to use recursion, for example.
You can also define user pipes with the same concept:
def @filtereven(a): @filter a%2==0; * => @filtereven x#
Declares and uses the pipe @filtereven x
, that selects only the events where the argument is even.
And, similarly, to declare constant macros:
def @@answer: 42; => @@answer at the end;
User functions can have optional arguments with default values, for example:
def f(a, b=2): a+b; => f(2), f(2, 3) at the end
Declares and uses the function f(a, b)
, where the argument b
is optional.
The default expansion mode for the arguments is "at declaration", but sometimes this may cause problems with functions that receive an expression (like the amap() aggregation). In these cases, it may be useful to pass an argument to be lazily expanded, like this:
def aggregate_both(&fn, a, b): (a,b):amap(fn); * => expand aggregate_both(:last, response_time, host) at the end
Declares and uses the function aggregate_both(fn, a, b), which expands to an aggregation row with two fields, one for each of the arguments a and b.
To create a function that receives an arbitrary number of arguments, add an asterisk before the last argument. The argument will be passed as a row, with one field for each additional argument.
def aggregate_all(&fn, *fields): fields:amap(fn); * => expand aggregate_all(:last, response_time, host) at the end
Declares and uses the function aggregate_all(fn, *fields), which expands to an aggregation row with as many fields as are passed to the function.
Since Pipes v0.19, you can use the def* syntax to define functions that will be applied successively to all arguments passed and return a tuple of results. This is especially useful for metaprogramming techniques.
def* f(name): count(name:property) as (name); * => explode f('response_time', 'product_id') every hour
Compiles the pipe * => count(response_time) as response_time, count(product_id) as product_id every 1 hour
.
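The "apply to every argument and return a tuple" behaviour of def* can be pictured with a small Python sketch. It works on strings only and is not the real macro expander; the names expand_def_star and count_as are hypothetical.

```python
def expand_def_star(template, *args):
    """Apply a one-argument template to each argument; return a tuple of results."""
    return tuple(template(arg) for arg in args)


def count_as(name):
    # Stands in for the body `count(name:property) as (name)` after expansion.
    return f"count({name}) as {name}"


fields = expand_def_star(count_as, "response_time", "product_id")
# Joining the tuple mimics what `explode` does with the results in the pipe.
pipe = "* => " + ", ".join(fields) + " every hour"
```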
Every query in Pipes is required to start with a filter. The filter runtime is optimized to allow many queries running over the same stream, sharing as much processing as possible.
The filter syntax is different from the rest of the syntax. It is inspired by Lucene's query syntax.
Examples:
type:http status:404
Filters all records where the field "type" has a value "http" and "status" has a value "404"
Special filter that allows all records through.
When used in conjunction with field syntax, selects all records that contain that field.
Examples:
*
All records (no filter)
type:*
Filters records that contain any value in the field "type".
Selects records where one of the current fields matches <term>.
The match is case insensitive.
The use of wildcards (* and ?) is allowed.
Use the field syntax to change the default field.
Examples:
http
If the default field is "somefield", selects records where "somefield" has a value "http" (case insensitive)
otherfield:http
Selects records where "otherfield" has a value "http" (case insensitive)
otherfield:http*404
Selects records where "otherfield" has a value that starts with "http" and ends with "404" (case insensitive)
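The wildcard rules above can be sketched in Python as follows. This is a rough model that compares the whole field value against the term; how the real filter runtime tokenizes values is not covered here, and the helper name term_matches is hypothetical.

```python
import re


def term_matches(value: str, term: str) -> bool:
    """Case-insensitive match: '*' is any run of characters, '?' is exactly one."""
    pattern = "".join(
        ".*" if ch == "*" else "." if ch == "?" else re.escape(ch)
        for ch in term
    )
    return re.fullmatch(pattern, value, flags=re.IGNORECASE | re.DOTALL) is not None
```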
Selects records where one of the current fields matches <term> with at most <maxEdits> edits.
The match is case insensitive.
Wildcards are not allowed in this filter.
Examples:
http~2
If the default field is "somefield", selects records where "somefield" has a value similar to "http" (e.g. "hxxp").
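To make "at most <maxEdits> edits" concrete, here is a Python sketch that assumes an edit is a single-character insertion, deletion or substitution (plain Levenshtein distance). The exact edit model used by Pipes is not stated in this section, so treat that as an assumption; both function names are hypothetical.

```python
def edit_distance(a: str, b: str) -> int:
    """Levenshtein distance, computed one row at a time."""
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, start=1):
        curr = [i]
        for j, cb in enumerate(b, start=1):
            cost = 0 if ca == cb else 1
            curr.append(min(prev[j] + 1,          # delete from a
                            curr[j - 1] + 1,      # insert into a
                            prev[j - 1] + cost))  # substitute (or keep)
        prev = curr
    return prev[-1]


def fuzzy_matches(value: str, term: str, max_edits: int) -> bool:
    # The match is case insensitive, so both sides are lowercased first.
    return edit_distance(value.lower(), term.lower()) <= max_edits
```

Under this model "hxxp" is two substitutions away from "http", which is why http~2 accepts it.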
Selects records where one of the current fields is between <lower> and <upper>.
The match is case insensitive.
The range filter is inclusive in both ends. To make it exclusive, change "[" to "(" and/or "]" to ")".
The TO
keyword must be written in uppercase. It can also be replaced by a single
comma with no change in meaning.
Wildcards are not allowed in this filter, except for the single *. In this filter it means an unbounded end.
There are shorthand syntaxes for some versions of this query:
[lower TO *] can be written as >= lower
(lower TO *] can be written as > lower
[* TO upper] can be written as <= upper
[* TO upper) can be written as < upper
Examples:
status:[200 TO 299]
Selects records where status is between 200 and 299
status:[200, 300)
Selects records where status is between 200 and 299 (300 exclusive)
status:[200, *]
Selects records where status is greater than or equal to 200.
status:>=200
Same as the filter above.
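The bound handling of the range filter can be sketched in Python. The sketch assumes numeric values and uses None for the * bound; the function name in_range and its parameters are hypothetical.

```python
from typing import Optional


def in_range(value: float,
             lower: Optional[float],
             upper: Optional[float],
             lower_inclusive: bool = True,
             upper_inclusive: bool = True) -> bool:
    """None plays the role of '*', i.e. an unbounded end."""
    if lower is not None:
        if value < lower or (value == lower and not lower_inclusive):
            return False
    if upper is not None:
        if value > upper or (value == upper and not upper_inclusive):
            return False
    return True
```

With this model, status:[200, 300) corresponds to in_range(status, 200, 300, upper_inclusive=False) and status:>=200 to in_range(status, 200, None).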
Sets the current field to <field>. Usually followed by <term> (e.g. somefield:someterm).
Any filter expression can be used inside a filter field. Even other filter fields.
Examples:
type:http
Selects records where the field "type" is equal to "http".
type:(http | cpu)
Selects records where the field "type" is equal to "http" or "cpu".
Selects the intersection of two other filters.
It's the default filter conjunction, so it can safely be omitted.
The operator &
is equivalent to &&
and AND
(must be uppercase).
Examples:
http verb:get status:404
Selects records with the default field equal to "http", "verb" equal to "get" and "status" equal to "404"
http & verb:get & status:404
Same as above, but with explicit &
tag:(important & resolved)
Selects records with the field "tag" containing both "important" and "resolved" values
Selects the union of two other filters.
The operator |
is equivalent to ||
and OR
(must be uppercase).
Examples:
http | verb:get | status:404
Selects records with the default field equal to "http", "verb" equal to "get" or "status" equal to "404"
tag:(important | resolved)
Selects records with the field "tag" containing either "important" or "resolved" values
Selects the complement of another filter.
The operator -
is equivalent to !
and NOT
(must be uppercase).
Examples:
-verb:get
Selects records with the field "verb" different from "get"
tag:(a* -abnormal)
Selects records with the field "tag" starting with "a", but not equal to "abnormal"
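The three combinators (&, | and -) behave like ordinary boolean operations on predicates. The Python sketch below models records as dictionaries and supports only exact terms and a trailing *; all helper names are hypothetical and the real filter runtime is certainly more elaborate.

```python
def has(field, term):
    """Predicate: some value of `field` equals `term` (a trailing * means prefix)."""
    wanted = term.lower()

    def predicate(record):
        values = record.get(field, [])
        if not isinstance(values, list):
            values = [values]
        for value in values:
            text = str(value).lower()
            if wanted.endswith("*"):
                if text.startswith(wanted[:-1]):
                    return True
            elif text == wanted:
                return True
        return False

    return predicate


def and_(*predicates):
    return lambda record: all(p(record) for p in predicates)


def or_(*predicates):
    return lambda record: any(p(record) for p in predicates)


def not_(predicate):
    return lambda record: not predicate(record)


# tag:(important & resolved) and tag:(a* -abnormal) in this toy model:
both_tags = and_(has("tag", "important"), has("tag", "resolved"))
a_but_not_abnormal = and_(has("tag", "a*"), not_(has("tag", "abnormal")))
```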
Pipes are the main building blocks in this language. No surprise the language is named after them. Pipes represent one single processing element that consumes events from the input and produces other events in the output.
The first pipe reads from the filtered public stream. Then, the output from one pipe can be chained as the
input to another using the operator =>
.
Each kind of pipe has its own properties, like rate of output and data window type and size. Some pipes can run on a distributed environment, some cannot. Some pipes may consume very little memory, some may need a lot of memory to run. All of these properties are derived and checked at compile time.
Transforms or aggregates records over configurable data window and output.
This is the default type of pipe. It receives events, aggregates them and outputs
the result based on what is configured on the every
clause. By default, the aggregation
resets its state on every output, but the event retention can be configured on the over
clause. The acceptable values for both clauses are discussed at Section 2.3: Output rates and aggregation windows.
The first part of the pipe defines which aggregations will be made. It accepts all kinds
of expressions and treats them all as aggregations.
All expressions can act as aggregations.
The expression can receive an automatic name based on its contents, or you can define a custom one using the keyword as. Example:
* => sum(x#)/count()**2 as value
Outputs the sum of the variable x
divided by the event count squared every second.
If the expression is of type row
and has metadata associated to it, you can also use the keyword
expand
to expand its values. For more info, see Section 2.5.2: Type arguments. It's worth noting that the expand keyword skips any timestamp fields the row might have.
The by
clause allows one to group the results by one or more scalar values.
Aggregations are not allowed in the by
clause, only scalar values. When grouping,
the group only outputs if at least one of its aggregations still has TTL. This usually means
that it only outputs if the group received some event in the last period, but there
are exceptions. For more info, see Section 2.5.7: Aggregation TTL.
All clauses are optional, so the simplest possible pipe is an empty string, which
is equivalent to count() every 1 second
. Examples:
* =>
Outputs how many events match the query type:http
every second.
* => by host
Outputs how many events match the query type:http
every second, aggregating by host.
This pipe can be classified as safe, semi-safe or unsafe, depending on which expressions, period and output are chosen. For more information on pipes safety, see Section 2.2: Chained computation model.
The simple pipe is safe if and only if all the expressions are
at least scalars, it has no by
nor over
clauses and the every
clause
is omitted or exactly every item
. This means in practice the pipe is safe if
it is only a stateless transformation item-by-item of the input stream. Example:
* => id, name, value# * 3.14
The simple pipe is unsafe if and only if the output type is
every <n> items
or it has some aggregation that forces the pipe to be unsafe
(e.g. smooth). This means in practice the pipe is unsafe if it depends
on some total order in the input, achieved running in a single node. This is true for
item batch outputs and some aggregations. Please notice that unsafe pipes with no
safe or semi-safe pipes before them must be preceded by @unsafe
. Example:
* => @unsafe => avg(value#) every 10 items
* => @unsafe => smooth(value#) every 10 seconds
The simple pipe is semi-safe in all other cases, i.e., when the output is by time period and it has no aggregation that makes it unsafe.
* => avg(value#) every 10 seconds
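The three safety rules above can be summarized as a small decision function. This Python sketch only restates the rules of this section; the function classify and the way the every clause is encoded are hypothetical.

```python
def classify(all_scalars, has_by=False, has_over=False,
             every=None, unsafe_aggregation=False):
    """Classify a simple pipe as "safe", "semi-safe" or "unsafe".

    every is None (clause omitted), "item", ("items", n) or ("time", seconds).
    """
    item_batch = isinstance(every, tuple) and every[0] == "items"
    # Unsafe: item batch output, or an aggregation such as smooth.
    if item_batch or unsafe_aggregation:
        return "unsafe"
    # Safe: a stateless item-by-item transformation.
    if all_scalars and not has_by and not has_over and every in (None, "item"):
        return "safe"
    # Everything else is semi-safe.
    return "semi-safe"
```

For example, the pipe id, name, value# * 3.14 maps to classify(True), and avg(value#) every 10 items maps to classify(False, every=("items", 10)).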
Computes the cartesian product of the outputs from two pipes with same output rates.
The outputs from two pipes are compatible if they are of the same type. For example every 5 seconds
is compatible with every 7 seconds
, but not with every 5 items
. In this case, the
resulting pipe would assume both outputs as its own output definition.
The resulting pipe will have all the fields from both queries (some fields may be renamed to avoid collision).
When defining fields in the on
clause, the fields must be present in both pipes and
must have the same type. A compilation error will occur if these restrictions are not met.
Also, fields in the on
clause aren't renamed, so they'll appear only once in the
resulting event.
Examples:
* => [ sum(value#) by host join sum(value#) as total ]
Computes the sum grouped by host with a field containing the total sum in each group.
The example above would generate events like:
{"timestamp":1399927869000, "host":"host1","sum_value":42.0, "total_sum":129.0}
{"timestamp":1399927869000, "host":"host2","sum_value":43.0, "total_sum":129.0}
{"timestamp":1399927869000, "host":"host3","sum_value":44.0, "total_sum":129.0}
You can also define a field to join the results on. For example, if you need to calculate the metrics in previous example by host, you can join on this field using:
Metrics => [ @filter \metric_name:CPU => avg(value#) as cpu by host every minute join on host @filter \metric_name:Memory => avg(value#) as mem by host every minute ]
Outputs the average cpu and memory for each host every minute, in the same event.
The example above would generate events like:
{"timestamp":1399927869000, "host":"host1","cpu":89.0, "mem":1314.0}
{"timestamp":1399927869000, "host":"host2","cpu":21.0, "mem":715.55}
{"timestamp":1399927869000, "host":"host3","cpu":40.0, "mem":16111.7}
Use ljoin
, rjoin
and lrjoin
for outer joins.
Metrics => [ @filter \metric_name:CPU => avg(value#) as cpu by host every minute lrjoin on host @filter \metric_name:Memory => avg(value#) as mem by host every minute ]
Outputs the average cpu and memory for each host every minute, in the same event.
The example above would generate events like:
{"timestamp":1399927869000, "host":"host1","mem":1314.0}
{"timestamp":1399927869000, "host":"host2","cpu":21.0, "mem":715.55}
{"timestamp":1399927869000, "host":"host3","cpu":40.0}
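The difference between join on and the outer variants can be sketched in Python. The sketch assumes that ljoin, rjoin and lrjoin correspond to left, right and full outer joins, that each side has one row per key, and it leaves the timestamp out. The function join_on is hypothetical.

```python
def join_on(left, right, key, how="inner"):
    """Join two lists of dict rows on `key`; the key field appears only once."""
    left_by_key = {row[key]: row for row in left}
    right_by_key = {row[key]: row for row in right}
    if how == "inner":
        keys = [k for k in left_by_key if k in right_by_key]
    elif how == "left":
        keys = list(left_by_key)
    elif how == "right":
        keys = list(right_by_key)
    elif how == "full":
        keys = sorted(set(left_by_key) | set(right_by_key))
    else:
        raise ValueError(f"unknown join type: {how!r}")
    # A side with no row for the key simply contributes no fields.
    return [{**left_by_key.get(k, {}), **right_by_key.get(k, {})} for k in keys]


cpu = [{"host": "host2", "cpu": 21.0}, {"host": "host3", "cpu": 40.0}]
mem = [{"host": "host1", "mem": 1314.0}, {"host": "host2", "mem": 715.55}]
full = join_on(cpu, mem, "host", how="full")
inner = join_on(cpu, mem, "host")
```

Here full has one row per host, with cpu or mem missing where one side had no data, while inner keeps only host2.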
Concatenates the outputs from two pipes with compatible output rates.
The outputs from two pipes are compatible if they are of the same type. For example every 5 seconds
is compatible with every 7 seconds
, but not with every 5 items
. In this case, the
resulting pipe would assume both outputs as its own output definition.
If both pipes' type definitions are also compatible, the resulting pipe will assume the first pipe's types and field names. The field names need not be equal between the pipes; only their types and order must match.
Example:
* => [ sum(value#) by host union 'Total', sum(value#) ]
Computes the union of the sum grouped by host with the total sum. Note that the fields in the by
clause come first in the final order, that's why 'Total'
comes first in the second pipe.
The example above would generate events like:
{"timestamp":1399927869000, "host":"host1","sum_value":42.0}
{"timestamp":1399927869000, "host":"host2","sum_value":43.0}
{"timestamp":1399927869000, "host":"host3","sum_value":44.0}
{"timestamp":1399927869000, "host":"Total","sum_value":129.0}
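The positional nature of union can be sketched in Python: rows from both pipes are concatenated and labelled with the first pipe's field names. Type checking and timestamps are left out, and the function name union is hypothetical.

```python
def union(first_names, first_rows, second_rows):
    """Concatenate rows by position, naming fields after the first pipe."""
    out = []
    for row in list(first_rows) + list(second_rows):
        if len(row) != len(first_names):
            raise ValueError("rows must have the same number of fields")
        out.append(dict(zip(first_names, row)))
    return out


by_host = [("host1", 42.0), ("host2", 43.0), ("host3", 44.0)]
total = [("Total", 129.0)]  # 'Total' sits in the position of the host field
rows = union(("host", "sum_value"), by_host, total)
```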
Chains many sequences in a batch into a single sequence
If the expression is not defined, it is assumed to be _
.
Examples:
* => @chain (x#, y#)
Yields a seq(number)
for each batch, with 2 numbers for each event in the input.
* => (x1:seq, x2:seq) |> @chain
Concatenates the sequences x1
and x2
for each event.
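For readers who know Python, both examples above behave roughly like itertools.chain. The sketch models a batch as a list of dictionaries, which is an assumption made only for illustration.

```python
from itertools import chain

# `* => @chain (x#, y#)`: one flat sequence per batch, two numbers per event.
batch = [{"x": 1, "y": 2}, {"x": 3, "y": 4}]
flat = list(chain.from_iterable((event["x"], event["y"]) for event in batch))

# `* => (x1:seq, x2:seq) |> @chain`: concatenates both sequences of one event.
event = {"x1": [1, 2], "x2": [3]}
joined = list(chain(event["x1"], event["x2"]))
```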
Compiles a constant string into a pipe
Available since Pipes v0.14.7
The arguments are available as template macros starting from @@0
.
Example:
* => @compile '@zip (x#, y#)'
Compiles the string into the pipe @zip (x#, y#)
.
Alias to either @compress.pip
(default) or @compress.uniform
.
Compresses the result from the previous pipe to at most k most important rows.
Compress pipe keeps most of the previous pipe metadata, except for the output. It
outputs only at the end
. It is also an unsafe pipe (see
Section 2.2: Chained computation model).
The pipe can optionally be grouped. In this case, it may be useful to define a global maximum to avoid that a group explosion causes the number of points to grow too much.
Examples:
* => count() every minute => @compress 300, _
Computes the event count every minute, but compresses it to the 300 most important points at the end of the query.
* => count() by host every minute => @compress 300, 1000, _ by host
Computes the event count by host every minute, but compresses it to each host's 300 most important points at the end of the query. It also limits the output to at most 1000 points globally (even if there are 4 hosts or more).
Compresses the result from the previous pipe to at most k rows.
Compress pipe keeps most of the previous pipe metadata, except for the output. It
outputs only at the end
. It is also an unsafe pipe (see
Section 2.2: Chained computation model).
The pipe can optionally be grouped. In this case, it may be useful to define a global maximum to avoid that a group explosion causes the number of points to grow too much.
Examples:
* => count() every minute => @compress 300, _
Computes the event count every minute, but compresses it to 300 points at the end of the query.
* => count() by host every minute => @compress 300, 1000, _ by host
Computes the event count by host every minute, but compresses it to each host's 300 points at the end of the query. It also limits the output to at most 1000 points globally (even if there are 4 hosts or more).
Only outputs rows that follow <period> time without any output.
The exceeding rows are discarded. Due to its nature, this is a semi-safe pipe (see Section 2.2: Chained computation model). When running in a distributed environment, this pipe is used as-is both in the map and reduce pipes.
Debouncing: the red events are discarded.
Examples:
* => count() every minute => @filter _ > 100 => @debounce 15 minutes
Alert when the number of events in a minute is greater than 100, but only outputs if there was a silence of at least 15 minutes.
* => count() by host every minute => @filter _ > 100 => @debounce 15 minutes by host
Same as previous, but grouping by host.
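One possible reading of the debounce rule is sketched below in Python: a row is kept only if the previous incoming row is at least <period> older. Whether the silence is measured against incoming or emitted rows, and whether the very first row is kept, are assumptions of this sketch; the function name debounce is hypothetical.

```python
def debounce(timestamps, period):
    """Keep a timestamp only if it follows a silence of at least `period`."""
    kept = []
    previous = None
    for ts in timestamps:
        # Assumption: the first row has nothing before it, so it is kept.
        if previous is None or ts - previous >= period:
            kept.append(ts)
        # Every incoming row, kept or not, restarts the silence.
        previous = ts
    return kept


# Alerts arriving at these seconds, debounced with a 15 minute (900 s) period.
quiet_alerts = debounce([0, 60, 120, 1200, 1260, 3000], 900)
```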
Creates an empty pipe (pass-through) with optional window metadata
Available since Pipes v0.19.3
It is useful to include additional windows to query metadata.
Filters the results from previous pipe.
Filter pipe does not change any pipe metadata, it just repeats previous pipe info.
Since Pipes v0.14.8, this pipe has a version that also accepts simple aggregations.
Examples:
* => count() by host => @filter _ > 10
Keeps only the hosts whose event count is greater than 10.
Unwinds events in the seq
instance
Available since Pipes v0.13
If the seq
has a type argument, the resulting field has the same type of that argument.
It is possible to use the keyword expand
if the inner type is a strongly typed row.
Examples:
* => @for products:seq as product
Creates one event for each element inside the field products
.
* => @for range(10) as i
Creates 10 events with timestamp<number>
and i<number>
.
* => @for expand range(10) |> (_+1 as a, _$ as b)
Creates 10 events with timestamp<number>
, a<number>
and b<string>
.
=> 100 as upto at the end => @for range(2, upto) as prime => @for range(1, floor(prime**0.5)+1) as j, prime => count(prime%j==0) by prime => @filter _==1 => summary(prime$) as primes
Computes prime numbers up to 100.
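The prime query above may be easier to follow next to an imperative version. This Python sketch assumes that range in Pipes excludes its upper bound, as Python's does; the function name primes_below is hypothetical.

```python
import math


def primes_below(upto):
    primes = []
    for candidate in range(2, upto):                  # @for range(2, upto) as prime
        limit = math.floor(candidate ** 0.5) + 1      # floor(prime**0.5)+1
        divisors = sum(1 for j in range(1, limit)     # @for range(1, ...) as j
                       if candidate % j == 0)         # count(prime%j==0) by prime
        if divisors == 1:                             # @filter _==1 (only j == 1 divides)
            primes.append(candidate)
    return primes
```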
Keeps the latest batch of input events and output it at the end.
This pipe is a convenience to output only the latest batch of events with the output
at the end
. This concept of item batch doesn't exist in the syntax, so it would
be harder to write queries that are only interested in the latest value for each group,
for example. They are especially useful when replaying events in the past for a graph
that only shows the latest value.
@latest
is an unsafe pipe (see Section 2.2: Chained computation model).
Examples:
* => count() by host => @latest
Outputs only the latest batch of counts by host at the end of the execution.
Creates a join pipe containing all pipes passed as arguments
Available since Pipes v0.19
Example:
def* @example(&fn): @filter product_id != null => fn by product_id over last hour every 2 hours; WebAccess => @makejoin 'product_id', explode @example(count(), avg(response_time#))
Compiles a join of two pipes.
Creates a left join pipe containing all pipes passed as arguments
Available since Pipes v0.19
Creates a full join pipe containing all pipes passed as arguments
Available since Pipes v0.19
Creates a right join pipe containing all pipes passed as arguments
Available since Pipes v0.19
Creates a union pipe containing all pipes passed as arguments
Available since Pipes v0.19
Example:
def* @example(prop): avg(prop#) as value by nameof(prop) as property over last hour every 2 hours; WebAccess => @makeunion explode @example(response_time, error_count)
Compiles a union of two pipes.
Adds custom fields to query metadata
Available since Pipes v0.20
All values must be constant.
Outputs only events that change the last value of any expression in the group
Please note that the first event is always output, even if the property value is null.
Examples:
* => @unsafe => @onchange valid by host
Outputs only events whose value in the property valid
is different from the last event.
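The per-group change detection can be sketched in Python with a dictionary of last seen values. Events are modelled as dictionaries and the function name onchange is hypothetical.

```python
_MISSING = object()  # sentinel that differs from every real value, including None


def onchange(events, value_key, group_key):
    """Yield only the events whose value differs from the last one in their group."""
    last = {}
    for event in events:
        group = event.get(group_key)
        value = event.get(value_key)
        # The first event of a group compares against the sentinel, so it is
        # always emitted, even when its value is None.
        if last.get(group, _MISSING) != value:
            yield event
        last[group] = value


events = [
    {"host": "a", "valid": None},
    {"host": "a", "valid": None},
    {"host": "a", "valid": True},
    {"host": "b", "valid": True},
    {"host": "a", "valid": True},
    {"host": "b", "valid": False},
]
changed = list(onchange(events, "valid", "host"))
```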
Outputs only events for which condition evaluates false and was not false in the previous event
Please note that the first event is always output.
Examples:
* => @unsafe => @onfalse valid by host
Outputs only events whose value in the property valid
is false
and different from the last event.
Outputs only events for which condition evaluates true and was not true in the previous event
Please note that the first event is always output.
Examples:
* => @unsafe => @ontrue valid by host
Outputs only events whose value in the property valid
is true
and different from the last event.
Performs resampling of the incoming stream to match the specified period
This pipe resamples the input list of fields using an interpolation function for each field. Currently, the available functions are: 'LAST' and 'LINEAR'.
'LAST' just repeats the last non-expired value before each sampling point.
'LINEAR' performs a linear interpolation between two adjacent points of the sampling period. It may require waiting for the next point to arrive. That makes this function unsuited for real-time queries. It can only work correctly when the pipe clock is advanced solely by its events.
Examples:
Metric => @resample 30 minutes, value#, 1 hour, 'LINEAR' by host, type expiry null
Resamples metrics to be output every 30 minutes, performing a linear interpolation when there is no exact point.
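The two interpolation functions can be compared with a short Python sketch. It ignores expiry and grouping, assumes strictly increasing timestamps, and starts the sampling grid at the first point, which is an assumption of this sketch rather than documented behaviour. The function name resample is hypothetical.

```python
import bisect


def resample(points, period, method="LAST"):
    """points: non-empty list of (timestamp, value) sorted by timestamp."""
    if method not in ("LAST", "LINEAR"):
        raise ValueError(f"unknown interpolation function: {method!r}")
    times = [t for t, _ in points]
    out = []
    t = times[0]
    while t <= times[-1]:
        i = bisect.bisect_right(times, t) - 1   # last point at or before t
        t0, v0 = points[i]
        if method == "LAST" or t0 == t:
            value = v0                          # repeat (or hit) the last value
        else:
            t1, v1 = points[i + 1]              # LINEAR needs the next point too
            value = v0 + (v1 - v0) * (t - t0) / (t1 - t0)
        out.append((t, value))
        t += period
    return out
```

The need to look at points[i + 1] is what makes 'LINEAR' wait for the next point to arrive.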
Changes the static type of any row to the highest corresponding seq.
Available since Pipes v0.13
Examples:
* => x, y# => @seq
Changes the type from row(timestamp<number>;;x<string>,y<number>)
to seq(comparable)
.
Sets fields in the original event (both typed and raw)
Since Pipes v0.14.8, this pipe has a version that also accepts simple aggregations.
Examples:
* => @set (date+time):dateparse('yyyyMMddHHmmss') as timestamp
Adds a new field to the original event that is the timestamp parsed using
both date
and time
fields from the event.
* => date, time, text => @set (date+time):dateparse('yyyyMMddHHmmss') as timestamp
Adds a new field to the original event that is the timestamp parsed using
both date
and time
fields from the event. Equivalent to expand *, (date+time):dateparse('yyyyMMddHHmmss') as timestamp
as the previous pipe is typed.
Skips some results from previous pipe.
Skip pipe does not change any pipe metadata, it just repeats previous pipe info. It also doesn't make any aggregation, it just outputs as soon as it finishes processing input.
Examples:
* => count() by url, host => @sort _ desc by host => @skip 3 by host
Gets most accessed urls by host, except the 3 most accessed in each host.
Skips rows from every batch while some condition is true.
Available since Pipes v0.13
This pipe also accepts aggregation as condition. In this case, it
will behave like every row is a batch and with a over all
window.
Examples:
* => count() by url => @sort _ desc => @skipwhile :sum < 1000
Gets the most accessed urls, skipping until the access count sum is greater than or equal to 1000.
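With an aggregation as the condition, the example above behaves like dropping rows while a running sum stays below the limit. A Python sketch of that reading follows; it assumes the sum includes the current row, and the function name is hypothetical.

```python
from itertools import accumulate, dropwhile


def skip_while_sum_below(counts, limit):
    """Drop leading rows while the running sum (current row included) is below limit."""
    pairs = zip(counts, accumulate(counts))
    return [count for count, _ in dropwhile(lambda pair: pair[1] < limit, pairs)]


remaining = skip_while_sum_below([600, 300, 200, 50], 1000)
```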
Gets some rows (based on index) from previous pipe.
Available since Pipes v0.13
Slice pipe does not change any pipe metadata, it just repeats previous pipe info. It also doesn't make any aggregation, it just outputs as soon as it finishes processing input.
Examples:
* => count() by url, host => @sort _ desc by host => @slice 2, 10, 3 by host
Gets the 2nd, 5th and 8th most accessed urls by host
Sorts the results from previous pipe.
Sort pipe does not change any pipe metadata, it just repeats previous pipe info. It also doesn't make any aggregation, it just outputs as soon as it finishes processing input.
The expressions can be any comparable
, optionally followed by asc
or desc
to make the sort order explicit.
Examples:
* => count() by host => @sort _ desc, host
Sorts the hosts by event count descending, and then by host (when the counts are equal).
Limits the results from previous pipe.
Take pipe does not change any pipe metadata, it just repeats previous pipe info. It also doesn't make any aggregation, it just outputs as soon as it finishes processing input.
Examples:
* => count() by url, host => @sort _ desc by host => @skip 3 by host => @take 2 by host
Gets the 4th and 5th most accessed urls by host
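The combination of @sort, @skip and @take by group corresponds to a sort followed by a slice inside each group. A Python sketch, with rows modelled as (url, host, count) tuples and a hypothetical function name:

```python
from collections import defaultdict


def sort_skip_take(rows, skip, take):
    """rows: (url, host, count) tuples; returns the selected rows per host."""
    by_host = defaultdict(list)
    for row in rows:
        by_host[row[1]].append(row)
    selected = []
    for group in by_host.values():
        ranked = sorted(group, key=lambda r: r[2], reverse=True)  # @sort _ desc
        selected.extend(ranked[skip:skip + take])                 # @skip, then @take
    return selected


rows = [(f"/u{i}", "h1", count)
        for i, count in enumerate([60, 50, 40, 30, 20, 10], start=1)]
picked = sort_skip_take(rows, skip=3, take=2)
```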
Takes rows from every batch while some condition is true.
Available since Pipes v0.13
This pipe also accepts aggregation as condition. In this case, it
will behave like every row is a batch and with a over all
window.
Examples:
* => count() by url => @sort _ desc => @takewhile :sum < 1000
Gets the most accessed urls, while the access count sum is lower than 1000.
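With an aggregation as the condition, the example above behaves like keeping rows while a running sum stays below the limit. A Python sketch of that reading follows; it assumes the sum includes the current row, and the function name is hypothetical.

```python
from itertools import accumulate, takewhile


def take_while_sum_below(counts, limit):
    """Keep leading rows while the running sum (current row included) is below limit."""
    pairs = zip(counts, accumulate(counts))
    return [count for count, _ in takewhile(lambda pair: pair[1] < limit, pairs)]


kept = take_while_sum_below([600, 300, 200, 50], 1000)
```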
Limits the output of the previous pipe to at most <k> rows per <period>.
The exceeding rows are discarded. This is an unsafe pipe (see Section 2.2: Chained computation model).
Throttling: the red events are discarded.
When a <seq>
instance is passed, the throttle period will increase as
the backpressure grows. This behavior exists since Pipes v0.14.7.
Examples:
* => count() every minute => @filter _ > 100 => @throttle 15 minutes
Alert when the number of events in a minute is greater than 100, but throttles the output to once in every 15 minutes.
* => count() by host every minute => @filter _ > 100 => @throttle 2, 15 minutes by host
Same as previous, but allows at most two events by host every 15 minutes.
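A simple way to picture throttling is a fixed window that admits at most k rows. The Python sketch below assumes the window opens at the first admitted row and lasts exactly <period>; how Pipes aligns its windows is not described here, so this is only one plausible model. The function name throttle is hypothetical.

```python
def throttle(timestamps, k, period):
    """Keep at most k timestamps per window of length `period`."""
    kept = []
    window_start = None
    used = 0
    for ts in timestamps:
        # Open a new window when there is none or the current one has elapsed.
        if window_start is None or ts - window_start >= period:
            window_start, used = ts, 0
        if used < k:
            kept.append(ts)
            used += 1
        # Otherwise the row exceeds the quota and is discarded.
    return kept


# At most two alerts every 15 minutes (900 s).
allowed = throttle([0, 60, 120, 900, 960, 1000], 2, 900)
```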
Sorts the results and gets the first k rows (possibly grouped) from previous pipe.
Top pipe does not change any pipe metadata, it just repeats previous pipe info. It also doesn't make any aggregation, it just outputs as soon as it finishes processing input.
The expressions can be any comparable
, optionally followed by asc
or desc
to make the sort order explicit.
Examples:
* => count() by host => @top 10, _ desc
Outputs the 10 hosts with the most events.
* => count() by url, host => @top 10, _ desc by host
Outputs, for every host, the 10 urls with the most events.
Splits a batch of events into many batches, one for each event.
Available since Pipes v0.13
Especially useful if used in conjunction with batch output type.
Marks that any pipe executed after this must run in a non-distributed environment.
The engine requires each unsafe pipe to be preceded by a safe or semi-safe pipe. It is recommended to avoid excessive communication between nodes. When you really must write a query that breaks this rule, the unsafe pipe must be preceded by this pipe, which is in fact an empty pipe that forces the pipe split, making everything after it run in the reduce node. For more information, see Section 2.2: Chained computation model.
Examples:
* => @unsafe => count() over all every item
Outputs the total count of events since the beginning of the query every time an item arrives.
Extracts one field of the stream to be the output event.
If the expression is not defined, it is assumed to be _
.
If the expression type is row
and it has metadata, the pipe assumes the type's metadata.
See Section 2.5.2: Type arguments for more information.
If the expression is an aggregation, the pipe will yield all the input rows applied to the aggregation argument.
Examples:
* => name:regex(r'^(?<first>\w+)( \w+)* (?<last>\w+)$') as name => @yield name
Yields an event with the fields first
and last
.
* => name:regex(r'^(?<first>\w+)( \w+)* (?<last>\w+)$') as name => @yield list(name)
Yields a single event for every batch with a list of names in that batch.
Takes elements from a batch of seqs, one by one, and makes a seq of seqs.
Available since Pipes v0.13
If the expression is not defined, it is assumed to be _
.
Examples:
* => @zip (x#, y#)
Yields 2 sequences every batch, one with all the x#
in the batch, other with all the y#
.
* => (x1:seq, x2:seq) |> @zip
Generates a sequence like ((x1[0], x2[0]), (x1[1], x2[1]), ... )
.
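Both @zip examples above resemble Python's built-in zip. The sketch models a batch as a list of dictionaries, which is an assumption made only for illustration.

```python
# `* => @zip (x#, y#)`: two sequences per batch, one with every x and one with every y.
batch = [{"x": 1, "y": 10}, {"x": 2, "y": 20}, {"x": 3, "y": 30}]
xs, ys = zip(*((event["x"], event["y"]) for event in batch))

# `* => (x1:seq, x2:seq) |> @zip`: pairs the elements of both sequences by index.
x1, x2 = [1, 2], ["a", "b"]
pairs = list(zip(x1, x2))
```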
Modifies a pipe to run entirely in the same node, ignoring any distribution invariants.
This modifier is useful when the data is known to be already partitioned. For example, if you are sure that each host will be consistently sent to the same node, you can avoid distribution costs by writing aggregations using this modifier.
Compare the following picture with the one described in Section 2.2: Chained computation model.
Typical atomic pipe in single machine
Typical atomic pipe in multiple machines
Examples:
* => atomic count() by host every minute
Outputs the event count by host every minute. Each output will be processed in the node the event arrived, never being merged between nodes (because it assumes that the data is already partitioned by host)
Operators are elements of syntax that allow simple manipulation of scalar and aggregation values.
Every operator translates directly to a function call. E.g. the expression 2+2==4
is equivalent to .eq(.add(2, 2), 4)
.
You can find below a table with the operator group precedence, from highest to lowest. Operators inside the same group have the same precedence.
Only the ternary operator is right-associative. All other operators are left-associative.
You can change both precedence and associativity by using parentheses. E.g. a+b*c
is equivalent to a+(b*c)
.
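The translation from operators to function calls can be illustrated with Python's ast module. This borrows Python's own grammar as a stand-in for the Pipes parser, which is a simplification: it covers only the arithmetic and comparison operators that look the same in both languages, and the function name to_calls is hypothetical.

```python
import ast

_BINARY = {ast.Add: "add", ast.Sub: "sub", ast.Mult: "mul",
           ast.Div: "div", ast.FloorDiv: "intdiv", ast.Mod: "mod"}
_COMPARE = {ast.Eq: "eq", ast.NotEq: "neq", ast.Lt: "lt",
            ast.LtE: "lteq", ast.Gt: "gt", ast.GtE: "gteq"}


def _render(node):
    if isinstance(node, ast.BinOp) and type(node.op) in _BINARY:
        name = _BINARY[type(node.op)]
        return f".{name}({_render(node.left)}, {_render(node.right)})"
    if (isinstance(node, ast.Compare) and len(node.ops) == 1
            and type(node.ops[0]) in _COMPARE):
        name = _COMPARE[type(node.ops[0])]
        return f".{name}({_render(node.left)}, {_render(node.comparators[0])})"
    if isinstance(node, ast.Name):
        return node.id
    if isinstance(node, ast.Constant):
        return repr(node.value)
    raise ValueError(f"unsupported expression: {ast.dump(node)}")


def to_calls(source: str) -> str:
    """Render an operator expression as nested function calls."""
    return _render(ast.parse(source, mode="eval").body)
```

For example, to_calls("2+2==4") gives the same call form quoted above, and to_calls("a-b-c") shows left associativity as nested .sub calls.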
Returns a boolean expression that evaluates true if the filter accepts the event.
The filter syntax is the same described in Chapter 3: Filters.
Everything after the \
is considered part of the filter, until it finds a ]
,
)
, ,
or =>
.
Sometimes it is necessary to enclose the expression in parentheses to avoid ambiguities.
E.g. \field:value as something
should be (\field:value) as something
.
This is so because as
is a valid expression in filter language and means something
different there.
Examples:
CPU | Memory => [ @filter \CPU => max(value#) as cpu every minute join @filter \Memory => max(value#) as mem every minute ]
Yields the maximum value every minute for both streams on the same event.
CPU => max(value#):if(\host:A) as cpuA, max(value#):if(\host:B) as cpuB every minute
Yields the maximum CPU value every minute for both hosts on the same event.
Coerces the expression to number. Shorthand to <object>:number() → <number>
.
The expression a#
is equivalent to number(a)
.
Example:
* => avg(response_time#) every minute
Yields the average of the field response_time every minute.
Coerces the expression to string. Shorthand to <object>:string() → <string>
.
The expression a$
is equivalent to string(a)
.
Example:
* => 'Count: ' + count()$ every minute
Yields the number of events every minute as a string 'Count: X'.
Adds two numbers.
The expression a+b
is equivalent to .add(a, b)
.
Concatenates two strings.
The expression a+b
is equivalent to .add(a, b)
.
Concatenates two seqs.
Available since Pipes v0.13
The expression a+b
is equivalent to .add(a, b)
.
Concatenates two maps.
Available since Pipes v0.17.1
The expression a+b
is equivalent to .add(a, b)
.
Subtracts one number from another.
The expression a-b
is equivalent to .sub(a, b)
.
Multiplies two numbers.
The expression a*b
is equivalent to .mul(a, b)
.
Divides one number by another (float division).
The expression a/b
is equivalent to .div(a, b)
.
Divides one number by another (float division). Legacy operator that returns 0.0 when the divisor is 0.0.
The expression a/?b
is equivalent to .divzero(a, b)
.
Divides one number by another (integer division).
The expression a//b
is equivalent to .intdiv(a, b)
.
Divides one number by another (integer division). Legacy operator that returns 0.0 when the divisor is 0.0.
The expression a//?b
is equivalent to .intdivzero(a, b)
.
Raises one number to another's power.
The expression a**b
is equivalent to .pow(a, b)
or even pow(a, b)
.
Returns the remainder of the division of one number by another.
The expression a%b
is equivalent to .mod(a, b)
.
Returns the remainder of the division of one number by another. Legacy operator that returns 0.0 when the divisor is 0.0.
The expression a%?b
is equivalent to .modzero(a, b)
.
Returns the logical AND of two booleans.
The expression a and b
is equivalent to a&b
, a&&b
and .and(a, b)
.
Returns the logical OR of two booleans.
The expression a or b
is equivalent to a|b
, a||b
and .or(a, b)
.
Returns the logical XOR of two booleans.
The expression a xor b
is equivalent to a^b
and .xor(a, b)
.
Returns the logical NOT of a boolean.
The expression not a
is equivalent to !a
and .not(a)
.
Checks whether two objects are equal.
The expression a==b
is equivalent to .eq(a, b)
.
Checks whether two objects are not equal.
The expression a!=b
is equivalent to .neq(a, b)
.
Checks whether the left operand compares less than the right one.
The expression a<b
is equivalent to .lt(a, b)
.
Checks whether the left operand compares less than or equal to the right one.
The expression a<=b
is equivalent to .lteq(a, b)
.
Checks whether the left operand compares greater than the right one.
The expression a>b
is equivalent to .gt(a, b)
.
Checks whether the left operand compares greater than or equal to the right one.
The expression a>=b
is equivalent to .gteq(a, b)
.
Extracts a property from an object.
The expression a->identifier[123, 456]
is equivalent to .property(a, 'identifier', 123, 456)
Example:
* => name:regex(r'^(?<first>\w+)( \w+)* (?<last>\w+)$') as match => '%s, %s':sprintf(match->last, match->first) as converted
Converts the name field to the format "Last, First".
* => last(*) as event every minute => event->name
Extracts name
from the untyped field event
.
Evaluates an expression in the semantic context of the target
The expression a->(b)
is almost equivalent to .peek(a, b)
, but
in the former case b
is evaluated in the semantic context of a
.
Example:
* => span('today')->(start*2, end, start+end)
Evaluates span('today')
and changes it into another row
object without having to go through another pipe.
Accesses an object by index/key in a list/string/map.
Available since Pipes v0.13
The expression a[b, c]
is equivalent to .get(a, b, c)
.
Please note that this operator has special semantics when used directly after a
property reference. So field[123]
may be different from (field+field2)[123]
.
Example:
* => (field1+field2)[4]
Yields the 5th letter (index 4) of a string made of field1+field2
.
Evaluates the expression in the parent semantic context (if any).
Available since Pipes v0.13
The expression ^a
is almost equivalent to .hat(a)
, but
in the former case, a
is evaluated in the parent semantic context.
This operator is useful in conjunction with other operators that generate child semantic contexts,
like <seq> |> <&fn> → <object>
and <object>-><identifier> → <object>
. In those operators, it may be necessary to
evaluate some expressions in the parent context.
Example:
=> at the end => @for range(10)|>(range(10)|>"${^_}x${_} = ${^_*_}")|>@chain as line
Yields a multiplication table with one event for each line.
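To make the role of the parent context concrete, here is a rough Python equivalent of the multiplication table query. It is only an analogy: the outer loop variable plays the role of ^_ (the parent context) and the inner one plays the role of _ (the current context). The function name is hypothetical.

```python
def multiplication_table(n: int = 10) -> list[str]:
    """Build one line per (outer, inner) pair, like the nested |> above."""
    lines = []
    for outer in range(n):      # corresponds to ^_ inside the inner transform
        for inner in range(n):  # corresponds to _ inside the inner transform
            lines.append(f"{outer}x{inner} = {outer * inner}")
    return lines                # flattening corresponds to the final @chain


table = multiplication_table()
print(table[12])  # 1x2 = 2
```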
Returns the first if it is not null; otherwise, returns the second.
The expression a??b
is equivalent to .coalesce(a, b)
.
Example:
* => first(name) ?? 'None' as first_name every second
Yields the first name in the window, or 'None' if no name was found.
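As an illustration of the null-coalescing rule, a minimal Python model follows, with None standing in for null. The function is named after .coalesce for readability only; it is a sketch, not the Pipes implementation.

```python
def coalesce(a, b):
    """Return a unless it is None (null); otherwise return b."""
    return a if a is not None else b


print(coalesce(None, "None"))  # None  (the fallback string)
print(coalesce("Ana", "None"))  # Ana
print(coalesce(0, 5))           # 0, because 0 is not null
```

Note that only null triggers the fallback in this model; falsy values such as 0 or an empty string are returned unchanged.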
If the condition is true, returns the first object; otherwise, returns the second.
The expression a?b,c
is equivalent to .iif(a, b, c)
.
Please note this is the only right-associative operator. It is right-associative to allow chained constructions like:
AllMonitorings => (type == 'http' ? 'Http Monitoring', type == 'cpu' ? 'CPU Monitoring', type == 'mem' ? 'Memory Monitoring', 'Unknown') as monitoring_type
Example:
* => count()%2==0 ? 'Even', 'Odd' as type every second
Yields 'Even' or 'Odd' every second, depending on the number of events in the window.
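Right associativity means each "else" branch may itself be another conditional. Python's conditional expression groups the same way, so the AllMonitorings example can be mirrored as the sketch below. The function name is hypothetical; the labels are taken from the example above.

```python
def monitoring_type(kind: str) -> str:
    """Chained conditional: each else-branch is itself a conditional."""
    return (
        "Http Monitoring" if kind == "http" else
        "CPU Monitoring" if kind == "cpu" else
        "Memory Monitoring" if kind == "mem" else
        "Unknown"
    )


print(monitoring_type("cpu"))   # CPU Monitoring
print(monitoring_type("disk"))  # Unknown
```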
Transforms a sequence into another sequence or object.
Available since Pipes v0.13
The expression a|>b
is almost equivalent to .transform(a, b)
, but
in the former case, b
is evaluated in the semantic context of a
.
E.g., in (1, 2, 3) |> _+1
, _
is each element of the sequence on the left.
If the expression on the right is an aggregation, the result is the application
of it into all elements of the sequence. E.g. range(10) |> sum(*)
is the
sum of all numbers between 0 and 9.
In the right operand, the expression _
has a special meaning. It always means
the entire object (like the *
expression). This allows us to write the example above
as range(10) |> :sum
.
If the expression on the right is a scalar, the result is another seq
with the expression applied to all the elements in the original sequence.
E.g.: (1, 2, 3) |> $
casts all numbers in the sequence to string.
If the original sequence is an instance of row
, the result will
also be a row with the same name, but with types changed, e.g. (1 as a, 2 as b) |> $
is the same as ('1' as a, '2' as b)
.
This operator also allows a special construction: the right operand may be
a call to a pipe. In this case, all the elements in
the sequence will be passed through the pipe as a single batch. E.g.
range(20)|>@filter(_%2==0)
returns all the even numbers between 0 and 19.
Please note that the syntax requires parentheses, as it would be in a function call.
The pipe syntax doesn't. If no arguments are needed, the parentheses are
optional, e.g. (range(5), "abcde":split)|>@zip
Some pipes may use grouping, like @top
. In this case, you can
use it inside the parentheses, e.g. range(20)|>@top(2, _ desc by _%2)
.
The expression on the right is always evaluated in the context of the left operand. I.e.
it behaves as if each element of the sequence were an event with the same type as the
sequence's type argument. You can use the operator ^<object> → <object>
to access
the parent semantic context, e.g. range(10)|>(range(10)|>^_*_)
returns
a multiplication table.
Example:
=> at the end => range(2, 100) |> @filter(range(2, floor(_**0.5)+1) |> not any(^_%_==0)) as primes
Computes all primes less than 100.
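The primes query combines a filter pipe, a nested transform and the ^ operator. A rough Python transcription may help read it; it assumes, as the range examples in this chapter show, that the upper bound of range is exclusive. The function name is hypothetical.

```python
import math


def primes_below(limit: int) -> list[int]:
    """Keep each candidate that has no divisor in 2..floor(sqrt(candidate))."""
    result = []
    for candidate in range(2, limit):                           # range(2, 100)
        divisors = range(2, math.floor(candidate ** 0.5) + 1)   # inner range
        # "not any(^_ % _ == 0)": candidate is ^_, each divisor is _
        if not any(candidate % d == 0 for d in divisors):
            result.append(candidate)
    return result


print(primes_below(30))  # [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
```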
Chooses one of the expressions based on a constant condition in compile-time
Available since Pipes v0.17
Returns whether the target map contains the argument as a key.
Available since Pipes v0.17.1
Returns true if a and b are approximately equal. If no number is given, epsilon defaults to 1e-6.
Available since Pipes v0.18.5
Returns the two-argument arc tangent of the arguments, as an angle in radians.
Formats a number as the best possible byte multiple.
Returns the smallest number that is greater than or equal to the argument.
Converts a unicode codepoint to a single-char string.
Available since Pipes v0.13
Adds <period> to timestamp argument.
Since Pipes v0.14.7, this function accepts non-constant periods.
Example:
* => timestamp():dateadd(1 day) every second
Adds one day to the current timestamp.
Rounds timestamp down to the nearest date that is divisible by <period>.
Since Pipes v0.14.7, this function accepts non-constant periods.
Example:
* => timestamp():datefloor(1 day) every second
Returns the beginning of the current day.
Formats timestamp using specified format
Example:
* => timestamp():dateformat("dd/MM/YYYY hh:mm", "UTC") as date every second
Formats the current timestamp assuming UTC time zone.
Subtracts <period> from timestamp argument.
Since Pipes v0.14.7, this function accepts non-constant periods.
Example:
* => timestamp():datesub(1 day) every second
Subtracts one day from the current timestamp.
Formats a number in unicode for display in small spaces
Available since Pipes v0.18.5
Returns the largest number that is less than or equal to the argument.
Formats a number according to format string and locale.
Examples:
* => 123.456:format(".00") every second
Format with 2 digits after decimal point.
=> 12345.678:format("#,##0.00", "pt-BR") as money every second
Format 12345.678 to '12.345,68' using Brazilian locale.
Formats a number representing a timespan in milliseconds
Available since Pipes v0.14
Calculates the gaussian distribution function value, optionally integrated.
Rounds a number to <precision> decimal places.
Selects the ith element from a list of arguments. Or null if it doesn't exist.
Example:
* => 1:select("a", "b", "c") as selected every second
Yields 'selected:b' every second. The list of arguments is zero-based.
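The zero-based selection rule can be sketched in Python as follows. This is a model only: the treatment of negative or fractional indexes is an assumption (here they are truncated and out-of-range values yield None), since this section only states that a missing element yields null.

```python
def select(index, *options):
    """Return the option at the zero-based index, or None if it doesn't exist."""
    i = int(index)  # assumption: fractional indexes are truncated
    return options[i] if 0 <= i < len(options) else None


print(select(1, "a", "b", "c"))  # b
print(select(3, "a", "b", "c"))  # None
```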
Returns the correctly rounded positive square root of a double value.
Available since Pipes v0.18.8
Converts a number to a string representing it in some base
Available since Pipes v0.13.12
Makes any scalar act as a constant in compile time
Available since Pipes v0.14.7
Transforms the parameter using the translation rules defined in <pairs>.
Much like property[keys]. Works for strings, containers and arrays.
This function infers the correct return type in compile time, whenever possible.
Returns the first index of the value in <list>, or null if <list> does not contain it.
Returns true if <list> contains the value, false otherwise.
When used in a simple pipe, delays or disables (if ttl is not supplied or is < 0) inactive group removal.
Tries to get <target>'s size. Works for strings, containers and arrays.
Returns a suggested field name for some expression
Available since Pipes v0.19
Converts object to number.
Example:
* => avg(response_time:number()) every minute
Yields the average of the field response_time every minute.
Sets fields into the object in a type-safe way
Available since Pipes v0.21.1
Converts object to string.
Example:
* => 'Count: ' + count():string every minute
Yields the number of events every minute as a string 'Count: X'.
Returns the compile-time type of an expression.
Available since Pipes v0.13
This function changed name in Pipes v0.19: from typename
to typeof
.
Creates a sequence by applying the same function successively to a state.
The target object is the initial state value. The argument function must receive an instance of that state and return either a tuple containing the value to be generated with the next state, or null if no further values should be generated.
* => value#unfold(fun x: x<100 ? (x, x+1), null)
Unfolds value into the sequence (value..99).
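The unfold contract described above (the function returns a pair of generated value and next state, or null to stop) can be modelled with a Python generator. This is a sketch of the technique, not the Pipes implementation; the names are illustrative.

```python
def unfold(state, step):
    """Yield values produced by step(state) until step returns None.

    step must return either (value, next_state) or None.
    """
    while True:
        result = step(state)
        if result is None:
            return
        value, state = result
        yield value


# Mirrors: value#unfold(fun x: x<100 ? (x, x+1), null) with value = 95
print(list(unfold(95, lambda x: (x, x + 1) if x < 100 else None)))
# [95, 96, 97, 98, 99]
```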
Converts a row instance to a map, using the field names as keys
Available since Pipes v0.17.1
Returns whether the target seq contains the argument.
Available since Pipes v0.13.2
Returns a same-sized sequence with an incrementing number before each object.
Available since Pipes v0.14
The returned sequence has rows with the following fields:
field | type | description |
---|---|---|
index | number | The relative index of this object |
value | ? | The original object |
Creates a sequence with the same elements, except the ones that are also present in <b>
Available since Pipes v0.18.8
Returns the index of <s> inside the target string, or null if it is not found.
Available since Pipes v0.19
Converts a constant sequence into a strongly-typed row in compile-time.
Available since Pipes v0.14
This function takes a constant sequence and gives a name to each element based on the element value.
For example, (1, 2, 3):mapnames("field${_}")
creates a row
with the fields, field1
,
field2
, and field3
, all numbers.
Returns the merge of a sequence and a value with a maximum size of <bound>, removing elements in a FIFO manner.
Available since Pipes v0.19
Returns the merge of both sequences with a maximum size of <bound>, removing elements in a FIFO manner.
Available since Pipes v0.19
Repeats the sequence a certain number of times.
Available since Pipes v0.13
Creates a sequence with the same elements, but only the ones that are also present in <b>
Available since Pipes v0.18.8
Returns a new sequence that skips the first <n> elements
Available since Pipes v0.14
Returns the sliced sequence starting at index <start>, ending at <end> getting every <step> elements.
Available since Pipes v0.14
Returns a new sequence that takes only the first <n> elements
Available since Pipes v0.14
Compiles a constant string into a pipes expression
Available since Pipes v0.14.7
The arguments are available as template macros starting from @@0
.
Compiles a constant string with each argument into many pipes expressions
Available since Pipes v0.15.3
The argument can be used in the expression as @@0
.
Returns whether the target string contains the argument.
Parses timestamp using specified format
Returns whether the target string ends with the argument.
Converts a string representing a number in some base to the number itself
Available since Pipes v0.13.12
Evaluates compressed base64 HyperLogLog data.
Useful to evaluate the final result from hll.init(<number log2m>, <object...>) → <string>
aggregations.
Example
This first query precomputes the distinct users every minute, but does not yield a result.
type:http => hll.init(userid) as value every minute
The second query aggregates and evaluates the precomputed values, but with a larger window of 10 minutes. The results from the first query may have been stored in the meantime.
type:firstquery => hll.eval(hll.merge(value)) as distinct_users every 10 minutes
Returns the first index of <s> inside the target string, or null if it is not found.
Parses JSON string into objects.
Available since Pipes v0.13
It parses arrays as java.util.List, objects as java.util.Map, numbers as java.lang.Double and strings as java.lang.String.
Gets the unicode codepoint from the first char in the string.
Available since Pipes v0.13
Parses a number according to format string and locale.
Compiles a string into the corresponding property access object
Available since Pipes v0.19
Returns a strongly typed row composed by all named groups in <regex>.
This function is used as follows:
<string>:regex(?<name of the key>pattern)
It searches for the pattern throughout the target string and returns a row object with the captured text as the value for the key 'name of the key'.
* => expand name:regex(r'^(?<first>\w+)( \w+)* (?<last>\w+)$')
Extracts the first and the last names from a single field.
* => name:regex(r'^(?<first>\w+)( \w+)* (?<last>\w+)$') as match => '%s, %s':sprintf(match->last, match->first) as converted
Converts the name field to the format "Last, First".
WebAccess => uid:regex('(?<first>[A-Za-z0-9]+)([A-Za-z0-9]*-)+(?<last>[A-Za-z0-9]+)') as match_object, uid
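Explanation:
(?<first>[A-Za-z0-9]+): matches one or more alphanumeric characters and captures them in the named group first.
([A-Za-z0-9]*-)+: matches one or more blocks of alphanumeric characters, each followed by a hyphen. Since this group is not named, it is matched but not assigned to the output.
(?<last>[A-Za-z0-9]+): matches one or more alphanumeric characters and captures them in the named group last in the output event record.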
WebAccess => uid:regex('(?<Regex Lookaround example>(?<=[A-Za-z0-9]+-)[A-Za-z0-9\\-]*(?=-[A-Za-z0-9]+))') as match_object, uid
Explanation:
Lookbehind: (?<=pattern1)pattern2 matches pattern2 only after matching pattern1.
Lookahead: pattern2(?=pattern3) matches pattern2 only before matching pattern3.
[A-Za-z0-9]+: the + symbol indicates that the single character pattern must occur at least 1 time.
[A-Za-z0-9\\-]*: the escaped hyphen allows - to be matched, and the asterisk symbol * indicates that the pattern can occur 0 or more times.
(?<=[A-Za-z0-9]+-)[A-Za-z0-9\\-]*(?=-[A-Za-z0-9]+) searches for a block [A-Za-z0-9\\-]* that occurs after the pattern [A-Za-z0-9]+- and before the pattern -[A-Za-z0-9]+.
Returns a sequence of rows composed by all matches of all named groups in <regex>.
Available since Pipes v0.13
This method is similar to <string>:regex(<string regex>) → <row>
, but returns all matches, not only the first.
WebAccess => @set uid + ' ' + uid as uid => uid:regexall('(?<first>[A-Za-z0-9]+)([A-Za-z0-9]*-)+(?<last>[A-Za-z0-9]+)') as all_matches, uid
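Explanation:
(?<first>[A-Za-z0-9]+): matches one or more alphanumeric characters and captures them in the named group first.
([A-Za-z0-9]*-)+: matches one or more blocks of alphanumeric characters followed by -. Since there is no named group specified, the pattern is matched but not assigned to the output.
(?<last>[A-Za-z0-9]+): matches one or more alphanumeric characters and captures them in the named group last.
WebAccess => uid:regexall('(?<Regex Lookaround example>(?<=[A-Za-z0-9]+-)[A-Za-z0-9]*(?=-[A-Za-z0-9]+))') as regex_row, uid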
Returns the matched string by <regex> in target (or one specific group).
WebAccess => uid:regexfind('(^[A-Za-z0-9]*|[A-Za-z0-9]*$)') as match_text, uid
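Explanation:
(pattern1|pattern2) matches either pattern1 or pattern2. Here, that is a run of alphanumeric characters at the start of the string (^[A-Za-z0-9]*) or at the end of it ([A-Za-z0-9]*$).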
WebAccess => uid:regexfind('(?<=[A-Za-z0-9]+-)[A-Za-z0-9\\-]*(?=-[A-Za-z0-9]+)') as match_text, uid
Explanation:
Lookbehind: (?<=pattern1)pattern2 matches pattern2 only after matching pattern1.
Lookahead: pattern2(?=pattern3) matches pattern2 only before matching pattern3.
[A-Za-z0-9]+: the + symbol indicates that the single character pattern must occur at least 1 time.
[A-Za-z0-9\\-]*: the escaped hyphen allows - to be matched, and the asterisk symbol * indicates that the character pattern can occur 0 or more times.
(?<=[A-Za-z0-9]+-)[A-Za-z0-9\\-]*(?=-[A-Za-z0-9]+) searches for a block [A-Za-z0-9\\-]* that is after the pattern [A-Za-z0-9]+- and before the pattern -[A-Za-z0-9]+.
Returns all the matched strings by <regex> in target (or one specific group).
Available since Pipes v0.13
WebAccess => uid:regexfindall('(^[A-Za-z0-9]*|[A-Za-z0-9]+$)') as match_list, uid
Explanation:
(pattern1|pattern2) matches either pattern1 or pattern2. Since regexfindall returns all the matched groups, the function returns a sequence of strings.
WebAccess => uid:regexfindall('(?<=[A-Za-z0-9]+-)[A-Za-z0-9]*(?=-[A-Za-z0-9]+)') as match_list, uid
Explanation:
regexfindall returns a list with all substrings that match the given pattern. In this case, that is text matching [A-Za-z0-9]* found after the pattern [A-Za-z0-9]+- and before the pattern -[A-Za-z0-9]+.
Returns true if the target matches <regex>. False otherwise.
WebAccess => host, host:regexmatch('greenfarm') as matches_example1, host:regexmatch('.*greenfarm.*') as matches_example2, host:contains('greenfarm') as host_contains
Explanation:
regexmatch tests if the string matches the regular expression passed as a string argument.
.* means any character that appears 0 or more times. The second example tests if host has a substring exactly like 'greenfarm', preceded and followed by any characters.
Splits string by regular expression <regex> in up to <limit> pieces.
Available since Pipes v0.13
WebAccess => uid, uid:regexsplit('-') as regex_split1, uid:regexsplit('[0-9]+') as regex_split2
Explanation:
regexsplit divides the original string into a list of substrings using the pattern as a separator.
Replaces all matches of <regex> in target by <replacement>.
WebAccess => urltype, urltype:regexsub('out', 'in') as regex_sub
Explanation:
regexsub
finds occurrences in the first string parameter that match the regular expression and replaces each match with the string replacement.
WebAccess => host, url, host:regexsub('(?<=[a-z0-9]\\.).+(?=farm)', 'teste') as renamed_host, url:regexsub('(?<=\\.com)(?=/{0,1})', '.br') as url_with_br_suffix
Repeats the string a number of times.
Available since Pipes v0.13
Replaces all instances of <from> with the string <to>.
Returns the last index of <s> inside the target string, or null if it is not found.
Available since Pipes v0.19
Truncates the string to the specified length, keeping the end, optionally adding an ellipsis at the start.
Available since Pipes v0.13.3
The ellipsis length counts toward the maximum length. If you can, use the char "\u2026"
as a single-char ellipsis.
Examples:
"0123456789":truncate(10, "...")
In this case, the string won't be truncated.
"01234567890":truncate(10, "...")
But in this case it will return "...34567890"
Splits string by <delim> in up to <limit> pieces.
Available since Pipes v0.13
If the delimiter is not set, the string is then split by individual chars.
Uses the target string as format to arguments.
Returns whether the target string starts with the argument.
Returns the substring between the indices <from> and <to>.
Truncates the string to the specified length, optionally adding an ellipsis at the end.
The ellipsis length counts toward the maximum length. If you can, use the char "\u2026"
as a single-char ellipsis.
Examples:
"0123456789":truncate(10, "...")
In this case, the string won't be truncated.
"01234567890":truncate(10, "...")
But in this case it will return "01234567..."
Decodes an application/x-www-form-urlencoded unicode string.
Available since Pipes v0.13.12
Encodes a string as application/x-www-form-urlencoded.
Available since Pipes v0.13.12
Casts a value into another type.
Available since Pipes v0.17
Returns a number < 0 if a < b, > 0 if a > b or 0 if a = b.
Constructs a period instance from a constant cron string
Example:
* => count() every cron('*/3 * * * * *')
Yields the event count every three seconds.
* => timestamp():dateadd(cron('0 0 * * * *')) every second
Outputs every second the timestamp of the next hour.
Creates a seq that iterates through numbers exponentially
Available since Pipes v0.14.7
Example:
* => @for exprange(17, 2)
(1, 2, 4, 8, 16)
* => @for exprange(4, 17, 2)
(4, 8, 16)
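The two call forms shown above can be modelled with a small Python generator. This sketch assumes the argument order exprange(end, base) and exprange(start, end, base), and that the end is exclusive like in range; both assumptions are consistent with the examples but are not stated explicitly here.

```python
def exprange(*args):
    """Yield start, start*base, start*base**2, ... while below end.

    Accepts (end, base) with start defaulting to 1, or (start, end, base).
    """
    if len(args) == 2:
        start, (end, base) = 1, args
    elif len(args) == 3:
        start, end, base = args
    else:
        raise TypeError("expected (end, base) or (start, end, base)")
    if start <= 0 or base <= 1:
        raise ValueError("start must be > 0 and base must be > 1")
    value = start
    while value < end:  # assumption: end is exclusive
        yield value
        value *= base


print(list(exprange(17, 2)))     # [1, 2, 4, 8, 16]
print(list(exprange(4, 17, 2)))  # [4, 8, 16]
```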
Returns true if the tested timestamp is inside the span, false otherwise.
Please note that someprop:happened('today')
is equivalent to happened(timestamp(), someprop, 'today')
.
Also, happened('today')
is equivalent to happened(timestamp(), timestamp(), 'today')
.
Merges many instances of compressed base64 HyperLogLog data.
Useful to merge results from hll.init(<number log2m>, <object...>) → <string>
aggregations.
Creates a seq that iterates through java.util.Map, seq or row entries
Available since Pipes v0.13
Example:
* => @for itermap(some_map_property:object)
Creates one event for each map entry in some_map_property
* => abc, qwe# => @for itermap(*)
Creates one event for each field
Returns the greatest value of all supplied arguments.
Returns a representation of the previous pipe metadata.
Available since Pipes v0.13
Returns the least value of all supplied arguments.
Evaluates the similarity between many MinHash encoded data.
Available since Pipes v0.18.8
Merges many MinHash encoded data.
Available since Pipes v0.18.8
Applies regression coefficients to variables
Available since Pipes v0.17.1
The coefficients can be obtained by applying the mregression([<number... x>], <number y>) → <seq>
aggregation.
Creates an instance of java.util.Map with the supplied keys and values.
Example:
=> newmap("company", "Intelie", "website", "http://intelie.com") as object at the end => object->company, object->website
Returns a Map equivalent to {"company": "Intelie", "website": "http://intelie.com"}
Returns a random value from a normal (Gaussian) distribution.
Available since Pipes v0.15.6
Constructs a period instance from parameters
Example:
* => count() every period(3, 'seconds')
Yields the event count every three seconds, the same as the literal 3 seconds
.
Returns a random value between <min> and <max> (0 and 1 if not defined).
Creates a seq that iterates through numbers
Available since Pipes v0.13
Example:
* => @for range(10)
(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
* => @for range(5, 10)
(5, 6, 7, 8, 9)
* => @for range(7, 10, 0.5)
(7, 7.5, 8, 8.5, 9, 9.5)
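Unlike Python's built-in range, the third example above uses a fractional step. A small Python model that reproduces all three examples follows; it assumes an exclusive end and a positive step, and the name pipes_range is hypothetical.

```python
def pipes_range(*args):
    """Yield numbers from start (default 0) up to, but excluding, end."""
    if len(args) == 1:
        start, end, step = 0, args[0], 1
    elif len(args) == 2:
        start, end = args
        step = 1
    elif len(args) == 3:
        start, end, step = args
    else:
        raise TypeError("expected 1 to 3 arguments")
    if step <= 0:
        raise ValueError("this sketch only supports positive steps")
    count = 0
    # Multiply instead of accumulating to limit floating-point drift.
    while start + count * step < end:
        yield start + count * step
        count += 1


print(list(pipes_range(5, 10)))      # [5, 6, 7, 8, 9]
print(list(pipes_range(7, 10, 0.5)))  # [7.0, 7.5, 8.0, 8.5, 9.0, 9.5]
```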
Calculates start and end timestamps of span based on target.
Available since Pipes v0.13
The result is a row with three fields: timestamp
, start
and end
.
Example:
* => span('today')|>:dateformat
Computes and formats the start and end timestamps of current day for each event.
Calculates end timestamp of span based on target.
Please note that spanend('today')
is equivalent to spanend(timestamp(), 'today')
.
Calculates start timestamp of span based on target.
Please note that spanstart('today')
is equivalent to spanstart(timestamp(), 'today')
.
Returns -1 if the tested timestamp is before the span, 0 if it is inside and 1 if it is after.
Please note that someprop:spantest('today')
is equivalent to spantest(timestamp(), someprop, 'today')
.
Also, spantest('today')
is equivalent to spantest(timestamp(), timestamp(), 'today')
.
Returns the most appropriate timestamp, whether in scalar or aggregation contexts.
Creates a seq such that each element is a row with the respective element from input seqs
Available since Pipes v0.15.3
Example:
zip((1, 2), ('a', 'b'))
Returns a seq equivalent to ((1, 'a'), (2, 'b'))
Functions that return information about the system where the query is running.
Returns the system's CPU usage info.
Available since Pipes v0.13.1
The returned row has the following fields:
field | type | description |
---|---|---|
process | number | Current process CPU load (in range [0..1]) |
system | number | System's CPU load (in range [0..1]) |
loadavg | number | Load average in the last minute |
cores | number | Number of processor cores installed |
Returns info about all devices in the filesystem.
Available since Pipes v0.13.1
Each row in the returned sequence has the following fields:
field | type | description |
---|---|---|
name | string | Name of the device or partition |
type | string | Format type |
readonly | boolean | Is read only? |
unallocated | number | # of unallocated bytes in the device |
usable | number | # of usable bytes (considering write permissions, etc.) |
total | number | Total bytes in the device |
Returns info about process file descriptors.
Available since Pipes v0.13.1
The returned row has the following fields:
field | type | description |
---|---|---|
open | number | # of open file descriptors |
max | number | Maximum file descriptors that can be open by this process |
Returns the system's current JVM heap info.
Available since Pipes v0.13.1
The returned row has the following fields (all values in bytes):
field | type | description |
---|---|---|
used | number | Used heap memory |
max | number | Max heap size |
committed | number | Memory committed by the OS for the current VM |
Returns the system's memory usage info.
Available since Pipes v0.13.1
The returned row has the following fields (all values in bytes):
field | type | description |
---|---|---|
used | number | Used physical memory |
total | number | Total physical memory |
usedswap | number | Used swap memory |
totalswap | number | Total swap memory |
Returns a list of defined system properties
Available since Pipes v0.14
The returned value is always an instance of a Java Map.
Returns a list of threads in the JVM
Available since Pipes v0.14
Each row in the returned sequence has the following fields:
field | type | description |
---|---|---|
id | number | Managed thread ID |
name | string | Thread name |
cputime | number | CPU time, in seconds |
state | string | One of: NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED |
lockname | string | Lock the thread is waiting for |
lockownerid | number | Thread ID of the lock owner |
lockowner | string | Thread name of the lock owner |
Returns info about running threads in current process.
Available since Pipes v0.13.1
The returned row has the following fields:
field | type | description |
---|---|---|
running | number | # of running threads |
peak | number | Max # of running threads |
total | number | # of threads ever started in this JVM |
daemon | number | # of daemon threads |
deadlocked | number | # of deadlocked threads |
Returns the system's real timestamp (not the query timestamp).
Available since Pipes v0.13.1
Returns the number of milliseconds since the start of the JVM
Available since Pipes v0.15.5
Returns Pipes version info
Available since Pipes v0.13.1
The returned row has the following fields:
field | type | description |
---|---|---|
major | number | Version's major component |
minor | number | Version's minor component |
patch | number | Version's current patch |
released | number | Whether the version was officially released or not (-SNAPSHOT) |
Aggregates only events that evaluate to true for <condition>.
Merges all the results from the target aggregation.
Merges the results of the last <window> aggregations.
Delays and returns the previous <number>th result from target aggregation.
Applies the same aggregation to all fields of a row.
Available since Pipes v0.14
"amap" means "aggregation map".
This aggregation takes a row
(it can't be used with any sequence) and returns one aggregation
row with the argument aggregation applied to each field in the original row. Here's a complete example:
=> sys.memory() as mem every minute => expand mem:amap(:avg) every hour
Collects memory statistics every minute and returns the average of the collected points every hour.
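Conceptually, amap applies one aggregation column by column. The Python sketch below models rows as dictionaries and the aggregation as a plain function over a list of values; it illustrates the idea only and is not how Pipes evaluates aggregations. The sample numbers are made up.

```python
from statistics import mean


def amap(rows, aggregation):
    """Apply the same aggregation to every field of a list of same-shaped rows."""
    if not rows:
        return {}
    return {
        field: aggregation([row[field] for row in rows])
        for field in rows[0].keys()
    }


# Hypothetical per-minute memory samples (field names follow sys.memory()).
samples = [
    {"used": 10, "total": 100},
    {"used": 30, "total": 100},
]
print(amap(samples, mean))
```

The result keeps the original field names, which is why the query above can expand it back into separate output fields.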
Applies all elements of a sequence to the same aggregation
Available since Pipes v0.14
"areduce" means "aggregation reduce".
This aggregation takes a sequence and applies each element to a single aggregation. An example:
Network => (bytes_in#, bytes_out#):areduce(:sum) as total_bytes every minute
Returns the sum of both fields bytes_in
and bytes_out
as a single value, every minute.
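By contrast with amap, areduce feeds every element of the sequence into one shared aggregation. A minimal Python model, again with events as dictionaries and made-up values:

```python
def areduce(events, fields, aggregation):
    """Feed the listed fields of every event into a single aggregation."""
    values = [event[field] for event in events for field in fields]
    return aggregation(values)


# Hypothetical Network events for one minute.
events = [
    {"bytes_in": 10, "bytes_out": 5},
    {"bytes_in": 1, "bytes_out": 2},
]
print(areduce(events, ("bytes_in", "bytes_out"), sum))  # 18
```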
Calculates the (possibly weighted) average of some expression.
Counts all events in a window (except those that evaluate to false or null).
Estimates the field's cardinality (distinct count) using HyperLogLog.
Equivalent to hll(12, <object...>)
.
The aggregation uses a probabilistic algorithm known as HyperLogLog. With a fixed
log2m
value of 12, the standard error will be 1.625% and will use 4KB of
memory per group.
The aggregation considers all arguments, i.e., dcount(a, b)
aggregates the number of
distinct pairs of the tuple (a, b).
element | approximate memory (more info) |
---|---|
state | 4 KB |
tree | 4 KB |
merger | 512 KB |
Yields a JSON string explaining the target aggregation's inner state representation.
Yields the <n> values with least timestamp.
Available since Pipes v0.15
Yields the non-null occurrence with least timestamp.
Available since Pipes v0.14
Reduces sequence of elements applying function successively
This aggregation can only be distributed or used on windows if the merger function is specified.
* => value#fold(fun(a, x): a+x, fun(a,b): a+b) every 10 minutes
Almost equivalent to sum (except for null handling).
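The reason a merger function is needed can be seen in a small Python model: when events are split across partitions (nodes or window slices), each partition is folded on its own and the partial states must then be combined. This sketch assumes each partition is seeded with its first element, which may differ from how Pipes initializes the state; the function name is hypothetical.

```python
from functools import reduce
from operator import add


def fold_partitions(partitions, step, merger):
    """Fold each non-empty partition with step, then combine partials with merger."""
    partials = [reduce(step, part) for part in partitions if part]
    if not partials:
        return None
    return reduce(merger, partials)


# Three partitions of the same stream; step and merger are both addition,
# as in fold(fun(a, x): a+x, fun(a, b): a+b).
print(fold_partitions([[1, 2, 3], [4, 5], [6]], add, add))  # 21
```

Without the merger there is no way to combine the three partial sums, which is why the single-function form cannot be distributed or windowed.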
Reduces sequence of elements applying function successively (ignores nulls)
Available since Pipes v0.19
This aggregation can only be distributed or used on windows if the merger function is specified.
* => value#foldv(fun(a, x): a+x) every 10 minutes
Equivalent to sum.
Yields the greatest occurrence in the window based on the sort fields.
Yields the greatest non-null occurrence in the window based on the sort fields.
Estimates the field's cardinality (distinct count) using HyperLogLog.
The aggregation uses a probabilistic algorithm known as HyperLogLog.
log2m
must be an integer between 6 and 16. The higher this value,
the higher the precision. The error will probably be within:
104/sqrt(2**log2m)
.
E.g.: With log2m = 14
, the standard error will be
0.81%. But higher log2m
values will also consume more memory. The aggregation
will typically use 2**log2m
bytes of memory.
The aggregation considers all arguments, i.e., hll(16, a, b)
aggregates the number of
distinct pairs of the tuple (a, b).
element | approximate memory (more info) |
---|---|
state | 2**log2m bytes |
tree | 2**log2m bytes |
merger | 128 * 2**log2m bytes |
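The figures quoted for hll and dcount follow directly from the formulas above. The Python sketch below simply evaluates them (error as a percentage, memory in bytes) so that the trade-off for a given log2m can be checked; the function names are illustrative.

```python
import math


def hll_standard_error_percent(log2m: int) -> float:
    """Expected standard error in percent: 104 / sqrt(2**log2m)."""
    if not 6 <= log2m <= 16:
        raise ValueError("log2m must be an integer between 6 and 16")
    return 104 / math.sqrt(2 ** log2m)


def hll_state_bytes(log2m: int) -> int:
    """Typical memory of one state (or tree) element: 2**log2m bytes."""
    return 2 ** log2m


def hll_merger_bytes(log2m: int) -> int:
    """Approximate memory of the merger: 128 * 2**log2m bytes."""
    return 128 * 2 ** log2m


print(hll_standard_error_percent(12))            # 1.625
print(round(hll_standard_error_percent(14), 2))  # 0.81
print(hll_state_bytes(12))                       # 4096
print(hll_merger_bytes(12) // 1024)              # 512
```

With log2m = 12 this reproduces the dcount numbers: 1.625% error, 4 KB per state and 512 KB for the merger.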
Similar to hll, but it doesn't evaluate the final cardinality; it just returns the sketch data.
This aggregation is similar to hll(<number log2m>, <object...>) → <number>
, but instead of returning
the final distinct count, it yields a representation of its internal data structure, for
later merges. It can be used in conjunction with the scalar functions <string>:hll.eval() → <number>
,
hll.merge(<string... data>) → <string>
and the aggregation hll.merge(<string>) → <string>
.
It returns a string with a base64'd representation of the compressed byte array. It is especially useful when some preprocessing and storage is needed.
Example
This first query precomputes the distinct users every minute, but does not yield a result.
type:http => hll.init(16, userid) as value every minute
The second query aggregates and evaluates the precomputed values, but with a larger window of 10 minutes. The results from the first query may have been stored in the meantime.
type:firstquery => hll.eval(hll.merge(value)) as distinct_users every 10 minutes
Performs union of many HyperLogLog encoded data in a window.
Useful to merge results from hll.init(<number log2m>, <object...>) → <string>
aggregations.
hll.eval(hll.merge(expr))
is equivalent to hll.mergeeval(expr)
.
Example
This first query precomputes the distinct users every minute, but does not yield a result.
type:http => hll.init(userid) as value every minute
The second query aggregates and evaluates the precomputed values, but with a larger window of 10 minutes. The results from the first query may have been stored in the meantime.
type:firstquery => hll.eval(hll.merge(value)) as distinct_users every 10 minutes
Performs union of many HyperLogLog encoded data in a window and evaluates the result.
Useful to merge results from hll.init(<number log2m>, <object...>) → <string>
aggregations.
hll.eval(hll.merge(expr))
is equivalent to hll.mergeeval(expr)
.
Example
This first query precomputes the distinct users every minute, but does not yield a result.
type:http => hll.init(userid) as value every minute
The second query aggregates and evaluates the precomputed values, but with a larger window of 10 minutes. The results from the first query may have been stored in the meantime.
type:firstquery => hll.mergeeval(value) as distinct_users every 10 minutes
Creates a list of disjoint intervals
Available since Pipes v0.24
Yields the <n> values with greatest timestamp.
Available since Pipes v0.15
Yields the non-null occurrence with greatest timestamp.
Available since Pipes v0.14
Yields the least occurrence in the window based on the sort fields.
Yields the least non-null occurrence in the window based on the sort fields.
Creates a java.util.List from all events in a window.
Available since Pipes v0.13
Creates a java.util.Map from all events in a window.
Since Pipes v0.20.4, this function accepts aggregations as values.
Estimates the median value of the population using Count-Min Sketch.
Equivalent to quantile(<number>, 0.5).
Returns MinHash set signature.
Available since Pipes v0.18.8
Merges many MinHash encoded data in a window.
Available since Pipes v0.18.8
Useful to merge results from minhash.init(<number size>, <object...>) → <string> aggregations.
Merges many MinHash encoded data in a window and evaluates the similarity between them.
Available since Pipes v0.18.8
Computes multivariate regression for given variables.
Available since Pipes v0.17.1
If no arguments x are given, x is assumed to be the event timestamp (if any).
The return is a sequence of the regression coefficients. The first value is the regression intercept.
Computes multivariate regression statistics for given variables.
Available since Pipes v0.17.1
If no arguments x are given, x is assumed to be the event timestamp (if any).
The returned row has the following fields:
field | type | description |
---|---|---|
n | number | Sample size |
coefficients | seq(number) | Regression coefficients (the first value is the intercept). |
correlation | seq(seq(number)) | Correlation matrix. |
covariance | seq(seq(number)) | Covariance matrix. |
mean | seq(number) | Mean vector. |
Aggregates the proportion of events for which the expression evaluates to true.
Same as fold, but with state persisted
Available since Pipes v0.19
This aggregation cannot be distributed, and it can only be used inside a window if an unmerger function is also specified.
Cumulative Sum
=> 1 as number every min => number, number#:pfold(0, fun(state, value): (value#+state#)) as cumulative_sum
Creating a conditional cumulative sum:
=> 1 as number every min => number, number#:pfold(0, fun(state, value): state#>=3 ? value, (value#+state#)) as sum_resets_after_3
Keep the higher number:
=> random() as number every min => number, number#:pfold(0, fun(a, b):(a > b ? a, b )) as higher
Caution when using: a null value of the variable can break the sum, because null + number == null, and then whenever the condition matches, the result will become null again.
The following query computes one cumulative sum that checks for null values (cumulativeSumWithNullCheck) and another without processing the null values (cumulativeSumWithoutNullCheck):
=> (1,6,2,7,3,8,null,9,5,10) as series at the end => @for series as number => @set number#:pfold(0,fun(state, value): state + value ) as cumulativeSumWithoutNullCheck, number#:pfold(0,fun(state, value): value == null ? state, state + value ) as cumulativeSumWithNullCheck
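The effect of the null check can be traced outside Pipes with a small Python sketch. It is only a model of the behavior described above: the helper names pfold and null_add are hypothetical, None stands in for null, and the assumption is that the persisted state is emitted once per incoming value.

```python
from itertools import accumulate


def null_add(a, b):
    # Models the documented rule: null + number == null.
    return None if a is None or b is None else a + b


def pfold(values, initial, step):
    """Return the running state after each value, like a persisted fold."""
    return list(accumulate(values, step, initial=initial))[1:]


series = [1, 6, 2, 7, 3, 8, None, 9, 5, 10]

# Without the check, the first null poisons the state for good.
without_check = pfold(series, 0, null_add)
# -> [1, 7, 9, 16, 19, 27, None, None, None, None]

# With the check, null values leave the state untouched.
with_check = pfold(
    series, 0, lambda state, value: state if value is None else state + value
)
# -> [1, 7, 9, 16, 19, 27, 27, 36, 41, 51]
```

Once the state becomes null it never recovers, because every later step adds a number to null, so the check has to guard the incoming value before it reaches the state.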
Estimates the q (0..1) quantile of the population using Count-Min Sketch.
Computes regression and correlation statistics for given variables.
Available since Pipes v0.13.3
The argument x, if not given, is assumed to be the event timestamp (if any).
The returned row has the following fields:
field | type | description |
---|---|---|
n | number | Sample size |
slope | number | Slope of the regression line |
intercept | number | Value of y where the regression line crosses the y-axis (x = 0) |
correlation | number | Correlation coefficient between x and y |
xmean | number | Mean value for variable x |
xstdev | number | Standard deviation for x |
ymean | number | Mean value for variable y |
ystdev | number | Standard deviation for y |
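As a rough guide to how these fields relate to each other, the sketch below computes them with ordinary least squares in Python. The function name regression is hypothetical, and the use of the sample (n - 1) standard deviation for xstdev and ystdev is an assumption, since the text above does not say which estimator Pipes applies.

```python
import math


def regression(points):
    """Least-squares statistics for a list of (x, y) pairs (needs n >= 2)."""
    n = len(points)
    xmean = sum(x for x, _ in points) / n
    ymean = sum(y for _, y in points) / n
    sxx = sum((x - xmean) ** 2 for x, _ in points)
    syy = sum((y - ymean) ** 2 for _, y in points)
    sxy = sum((x - xmean) * (y - ymean) for x, y in points)
    slope = sxy / sxx
    return {
        "n": n,
        "slope": slope,
        "intercept": ymean - slope * xmean,       # y value at x = 0
        "correlation": sxy / math.sqrt(sxx * syy),
        "xmean": xmean,
        "xstdev": math.sqrt(sxx / (n - 1)),       # assumed: sample stdev
        "ymean": ymean,
        "ystdev": math.sqrt(syy / (n - 1)),       # assumed: sample stdev
    }


# Points on the line y = 2x + 1.
stats = regression([(1, 3), (2, 5), (3, 7), (4, 9)])
```

For points that lie exactly on a line, the slope and intercept recover the line and the correlation is 1.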
Estimates the Jaccard similarity coefficient between many sets.
Available since Pipes v0.18.8
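The MinHash functions in this chapter follow an init, merge and compare pattern that can be sketched in Python as follows. The names minhash_init, minhash_merge and similarity, the seeded SHA-1 hashes and the plain list signature are illustrative assumptions, not the encoding used by Pipes.

```python
import hashlib


def _hash(seed, item):
    # One hash function per seed (illustrative choice).
    data = f"{seed}:{item}".encode("utf-8")
    return int.from_bytes(hashlib.sha1(data).digest()[:8], "big")


def minhash_init(size, items):
    """Signature of a non-empty set: the minimum hash for each of size seeds."""
    items = list(items)
    return [min(_hash(seed, item) for item in items) for seed in range(size)]


def minhash_merge(*signatures):
    """Signature of the union: element-wise minimum of the signatures."""
    return [min(column) for column in zip(*signatures)]


def similarity(*signatures):
    """Estimated Jaccard coefficient: share of positions where all agree."""
    columns = list(zip(*signatures))
    matches = sum(1 for column in columns if len(set(column)) == 1)
    return matches / len(columns)


sig_a = minhash_init(128, range(0, 200))
sig_b = minhash_init(128, range(100, 300))
estimate = similarity(sig_a, sig_b)   # true Jaccard coefficient is 1/3
```

A larger signature size lowers the variance of the estimate at the cost of a longer stored value.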
Smooths the curve of another aggregation.
The argument alpha is a (0..1) value, meaning the weight of the previous points in the final value.
The argument beta is a (0..1) value, meaning the weight of previous trends into the current trend value.
This aggregation cannot be used inside a window (because it depends on total order of the elements).
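The roles of alpha and beta resemble Holt's double exponential smoothing. Assuming that model, a minimal Python sketch looks like the one below. The recurrence, the initialization from the first two points and the function name smooth are assumptions made for illustration and may differ from what Pipes computes.

```python
def smooth(values, alpha, beta):
    """Holt-style double exponential smoothing (assumed recurrence)."""
    values = list(values)
    if len(values) < 2:
        return values
    level = values[0]
    trend = values[1] - values[0]
    result = [level]
    for x in values[1:]:
        previous = level
        # alpha is the weight of the previous (projected) level.
        level = (1 - alpha) * x + alpha * (previous + trend)
        # beta is the weight of the previous trend.
        trend = (1 - beta) * (level - previous) + beta * trend
        result.append(level)
    return result


# A noisy upward series, smoothed with equal weights.
smoothed = smooth([1, 3, 2, 5, 4, 7], 0.5, 0.5)
```

Each output depends on every earlier point in order, which illustrates why a total order of the elements is required.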
Aggregates sample statistics for some property.
Available since Pipes v0.13.3
Please note that all statistics here are based on the sample variance, not the population variance.
This behavior is distinct from the already known variance(<number>, [<number weight>]) → <number> and stdev(<number>, [<number weight>]) → <number> aggregations.
The returned row has the following fields:
field | type | description |
---|---|---|
n | number | Sample size |
mean | number | Sample mean value |
variance | number | Sample variance |
stdev | number | Sample standard deviation |
skewness | number | Sample skewness |
kurtosis | number | Sample kurtosis |
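The difference between sample and population statistics can be checked with Python's standard statistics module. This sketch covers only n, mean, variance and stdev. Mapping the population functions to the variance and stdev aggregations is an inference from the note above, not something stated explicitly.

```python
import statistics

data = [2, 4, 4, 4, 5, 5, 7, 9]

# Sample statistics divide the squared deviations by n - 1.
sample = {
    "n": len(data),
    "mean": statistics.mean(data),
    "variance": statistics.variance(data),   # 32 / 7
    "stdev": statistics.stdev(data),
}

# Population statistics divide by n instead.
population = {
    "variance": statistics.pvariance(data),  # 32 / 8
    "stdev": statistics.pstdev(data),
}
```

The gap between the two shrinks as the sample size grows, so it matters most for small windows.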
Calculates the (possibly weighted) standard deviation of some expression.
Joins all the strings in a window.
Yields the k minimum occurrences of the target object.
Available since Pipes v0.13.18
Calculates the (possibly weighted) variance of some expression.
Yields the latest timestamp inside window when some condition was true.
Yields the first timestamp inside window when some condition was true.
Meta-aggregations give information on how and when the results were merged.
Meta-aggregations meaning in time axis
Returns the timestamp when the outputs were merged (equal to oend() on time pipes).
Pipes comes bundled with a timespan definition language, which helps define relative dates with an almost natural language syntax. A timespan is an expression that, when provided with a reference timestamp, can calculate the start and the end of a relative period.
For example, the expression current month
will return:
Please notice that span intervals are always right-open.
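A right-open interval includes its start but excludes its end. The Python sketch below shows what that means for a span such as current month. The helper names current_month and contains are hypothetical, and timezone handling is left out.

```python
from datetime import datetime


def current_month(reference):
    """Return (start, end) of the month containing the reference instant."""
    start = reference.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    if start.month == 12:
        end = start.replace(year=start.year + 1, month=1)
    else:
        end = start.replace(month=start.month + 1)
    return start, end


def contains(span, instant):
    # Right-open: the start belongs to the span, the end does not.
    start, end = span
    return start <= instant < end


span = current_month(datetime(2014, 6, 27, 16, 17, 18))
# span == (datetime(2014, 6, 1), datetime(2014, 7, 1))
```

With this convention, consecutive months never overlap: the end of June is exactly the start of July, and that instant belongs only to July.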
definition | example expressions | description |
---|---|---|
now; none | now; none | Now. |
today; current|this <period> | today; current day; this month | From the beginning of the current period to the beginning of the next. |
yesterday; previous|preceeding|past|yester <period> | yesterday; yesterweek; yester month; previous year; previous 5 years | Previous specified period. |
tomorrow; coming|subsequent|future|following <period> | tomorrow; coming year; subsequent 5 years | Subsequent specified period. |
last <period...> | last day; last 40 weeks; last 2 years and 5 months | Period of the specified length ending now. |
next <period...> | next day; next 40 weeks; next 2 years and 5 months | Period of the specified length starting now. |
<period...> ago | 2 days ago; 2 years and 5 months ago | Single point in the past. |
<date> [<time>] | 2014; 201406; 20140627; 2014-06-27; 2014/06/27; 2014-06-27 16; 2014-06-27 4pm; 2014-06-27 16:17:18; 2014-06-27 4:17:18pm | Constant date or time. |
<time> | 16; 4pm; 16:17:18; 4:17:18pm | Specified time today. |
timestamp|ts <number> | timestamp 1403893236000; ts 1403893236000 | Constant timestamp. |
timezone|tz <string> <span> | tz 'UTC' today; timezone 'America/Sao_Paulo' today | Evaluates span using the specified timezone. |
monday ... sunday; mon ... sun | monday; mon; wednesday; wed; sunday; sun | Day of week in the current week. |
january ... december; jan ... dec | january; jan; april; apr; december; dec | Month in the current year. |
<month> (<number>|ordinal) | january first; may 4th; april 11 | Day in month in the current year. |
[from|since] <span A> to|until <span B> | yesterday to today; from yesterday to today; since last year until yesterday | A range starting at the beginning of span A and ending at the end of span B. |
from|since <span A> | since last year | A range starting at the beginning of span A and ending now. |
until|to <span B> | until yesterday | A range starting at the Big Bang (known as 1970) and ending at the end of span B. |
union of <spanA>, <spanB>, ... and <spanC> | union of 2018 and this year | A span that has the earliest start and latest end among all spans. |
intersection of <spanA>, <spanB>, ... and <spanC> | intersection of 2018 and this year | A span that has the latest start and earliest end among all spans. |
<period...> before <span> | 2 years before this month; 2 days and an hour before this month | A period of specified length ending exactly at the start of the specified span. |
before|start|beginning|left|exactly [of] <span> | before this week; start of this month; beginning of december; left of yesterday; exactly today | The point at the start of the specified span. |
window [of] <span> | window of yesterday; window of 2 days before today | The same as the argument span, but from the view of a timestamp inside it. |
<period...> after <span> | 2 days after previous month; 2 days and an hour after previous month | A period of specified length starting exactly at the end of the specified span. |
after|end|right [of] <span> | after this week; right of yesterday; end of this month | The point at the end of the specified span. |
<ordinal> <period> of <span> | first quarter of previous year; 3rd week of this month; first hour today | Some discrete period inside the specified span. |
[the] <period> [of] <span> | the week of 2 days ago; the quarter of last week | Expands the point at the start of a span to the entire period defined. |
<span> shifted [left|right] by <percentage>; <span> shifted [left|right] by <period...> | this week shifted by 2 days; previous year shifted right by 2 days; previous year shifted right by 10% | Calculates the span relative to now, then shifts it by some period or percentage into the past (left) or future (right). |
<span A> shifted to <span B>; <span A> relative to <span B>; <span A> of <span B>; <span B> at <span A> | sunday shifted to previous week; sunday relative to previous week; sunday of previous week; previous week at sunday; sunday @8 | Calculates a span A as if "now" was the start of span B. |
<span A> aligned [left|right] to <span B> | this month aligned to 7 months ago; this month aligned left to 7 months ago; this month aligned right to 7 months ago | Calculates the span A and aligns either its left or its right end with the span B. |
<span> extend[ed] [left|right] by <percentage>; <span> extend[ed] [left|right] by <period...> | this month extend left by 20%; this month extend right by 2 days | Calculates the span and then extends it either proportionally or by a fixed period. |
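Several of the definitions above reduce to simple arithmetic on (start, end) pairs. The Python sketch below models a few of them with millisecond timestamps. The function names are hypothetical, only fixed-length periods are covered, and the result of intersecting disjoint spans is left as computed, since the table does not say how Pipes treats that case.

```python
def last(period_ms, now):
    # Period of the specified length ending now.
    return (now - period_ms, now)


def next_(period_ms, now):
    # Period of the specified length starting now.
    return (now, now + period_ms)


def ago(period_ms, now):
    # Single point in the past: start and end coincide.
    return (now - period_ms, now - period_ms)


def union(*spans):
    # Earliest start and latest end among all spans.
    return (min(start for start, _ in spans), max(end for _, end in spans))


def intersection(*spans):
    # Latest start and earliest end among all spans.
    return (max(start for start, _ in spans), min(end for _, end in spans))


HOUR = 3_600_000
now = 1403893236000                       # the timestamp used in the table
recent = union(last(2 * HOUR, now), next_(HOUR, now))
```

Note that union as defined here also covers any gap between the spans, because only the outermost start and end are kept.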
The periods are the units of time that power span definitions.
Some spans accept only a single period definition (e.g. current day). Others accept multiple period definitions (e.g. last 4 days, 2 hours and 1 minute). In this documentation, <period> means single periods only and <period...> means one or more periods.
A single period is composed of (optionally) an amount and a unit, e.g.:
day
1 day
a day
the day
A multiple period is composed of many single periods separated by commas or 'and', e.g.:
1 day and 5 hours
a day, an hour and 5 minutes
The available period units are:
unit | also accepts | equivalent to |
---|---|---|
millisecond | milli, ms | |
second | sec | 1000 ms |
minute | min | 60000 ms |
hour | | 3600000 ms |
day | | |
week | wk | |
month | | |
bimester | | 2 months |
quarter | | 3 months |
semester | | 6 months |
year | yr | |
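To illustrate how a multiple period breaks down into single periods, the Python sketch below converts period text into milliseconds. It is not the Pipes parser: the function names are hypothetical, plural handling is a guess based on the examples in this chapter, and only units with a fixed length in milliseconds are supported, because days and longer units depend on the calendar.

```python
import re

# Fixed-length units and their aliases (one hour is 3,600,000 ms).
UNIT_MS = {
    "millisecond": 1, "milli": 1, "ms": 1,
    "second": 1000, "sec": 1000,
    "minute": 60000, "min": 60000,
    "hour": 3600000,
}
ARTICLES = {"a", "an", "the"}


def single_period_ms(text):
    """Convert a single period such as 'hour', 'an hour' or '5 hours'."""
    tokens = text.split()
    if len(tokens) == 1:
        amount, unit = 1, tokens[0]
    elif len(tokens) == 2:
        amount = 1 if tokens[0] in ARTICLES else int(tokens[0])
        unit = tokens[1]
    else:
        raise ValueError(f"not a single period: {text!r}")
    if unit not in UNIT_MS and unit.endswith("s"):
        unit = unit[:-1]                  # accept plurals such as 'hours'
    return amount * UNIT_MS[unit]


def period_ms(text):
    """Convert one or more periods separated by commas or 'and'."""
    parts = re.split(r",|\band\b", text.lower())
    return sum(single_period_ms(part) for part in parts if part.strip())


total = period_ms("an hour, a minute and 5 seconds")   # 3665000
```

Splitting on commas and 'and' first keeps the single-period rule small: each part is then just an optional amount followed by a unit.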
Page generated at 09-Sep-2021 12:43:08