This chapter is an introduction to the Pipes language through examples. Each section contains links to other chapters that cover the subjects presented here in more depth.
Version 0.13
Copyright 2016 Intelie
1. Quick Start | |
1.1. Filters | |
1.2. Simple pipe | |
1.3. Chaining pipes | |
1.4. The join and union pipes | |
2. Concepts | |
2.1. Events and filters | |
2.2. Chained computation model | |
2.3. Output rates and aggregation windows | |
2.4. Expressions | |
2.4.1. Type system | |
2.4.2. Type arguments | |
2.4.3. Literals | |
2.4.4. Scalars and aggregations | |
2.4.5. Aggregations state representation | |
2.4.6. Aggregation TTL | |
2.4.7. Property access | |
2.4.8. Special expressions | |
2.4.9. Function call | |
3. Filters | |
3.1. * | Special filter that allows all records through. |
3.2. <term> | Selects records where one of the current fields matches <term>. |
3.3. <term>~<number maxEdits> | Selects records where one of the current fields matches <term> with at most <maxEdits> edits. |
3.4. [<term lower> TO <term upper>] | Selects records where one of the current fields is between <lower> and <upper>. |
3.5. <field>: <filter> | Sets the current field to <field>. Usually followed by <term> (e.g. somefield:someterm). |
3.6. <filter> & <filter> | Selects the intersection of two other filters. |
3.7. <filter> | <filter> | Selects the union of two other filters. |
3.8. -<filter> | Selects the complement of another filter. |
4. Pipes | |
4.1. Stateful pipes | |
4.1.1. <named...> [by <named...>] [over <window>] [every <period> | at the end] | Transforms or aggregates records over a configurable data window and output rate. |
4.1.2. <pipe> join [on <fields...>] <pipe> | Computes the cartesian product of the outputs from two pipes with the same output rates. |
4.1.3. <pipe> union <pipe> | Concatenates the outputs from two pipes with compatible output rates. |
4.1.4. @compress <number k>, [<number k2>,] <number... y> [by <object...>] | Compresses the result from the previous pipe to at most k most important rows. |
4.1.5. @debounce <period> [by <object...>] | Only outputs rows that follow <period> of time without any output. |
4.1.6. @latest | Keeps the latest batch of input events and outputs it at the end. |
4.1.7. @onchange <object...> [by <object...>] | Outputs only events that change the last value of any expression in the group |
4.1.8. @onfalse <boolean condition> [by <object...>] | Outputs only events for which condition evaluates false and was not false in the previous event |
4.1.9. @ontrue <boolean condition> [by <object...>] | Outputs only events for which condition evaluates true and was not true in the previous event |
4.1.10. @throttle [<number k>, ] <period> [by <object...>] | Limits the output of the previous pipe to at most <k> rows per <period>. |
4.1.11. @unbatch | Splits a batch of events into many batches, one for each event. |
4.1.12. @unsafe | Marks that any pipe executed after this must run in a non-distributed environment. |
4.1.13. atomic <pipe> | Modifies a pipe to run entirely on the same node, ignoring any distribution invariants. |
4.2. Stateless pipes | |
4.2.1. @chain [<seq expr>] | Chains many sequences in a batch into a single sequence |
4.2.2. @filter <boolean condition> | Filters the results from previous pipe. |
4.2.3. @for <named(seq)>, <named... additional> | Unwinds events in the seq instance |
4.2.4. @seq | Changes the static type of any row to the highest corresponding seq. |
4.2.5. @set <named...> | Sets fields in the original event (both typed and raw) |
4.2.6. @skip <number> [by <object...>] | Skips some results from previous pipe. |
4.2.7. @skipwhile <boolean> | Skips rows from every batch while some condition is true. |
4.2.8. @slice [<number start>], [<number end>], [<number step>] [by <object...>] | Gets some rows (based on index) from previous pipe. |
4.2.9. @sort <sortfield... expr> | Sorts the results from previous pipe. |
4.2.10. @take <number> [by <object...>] | Limits the results from previous pipe. |
4.2.11. @takewhile <boolean> | Takes rows from every batch while some condition is true. |
4.2.12. @top <number k>, <sortfield... expr> [by <object...>] | Sorts the results and gets the first k rows (possibly grouped) from previous pipe. |
4.2.13. @yield [<object expr>] | Extracts one field of the stream to be the output event. |
4.2.14. @zip [<seq expr>] | Takes elements from a batch of seqs, one by one, and makes a seq of seqs. |
5. Operators | |
5.1. \<filter> | Returns a boolean expression that evaluates true if the filter accepts the event. |
5.2. <object># → <number> | Coerces the expression to number. Shorthand for <object>:number() → <number> . |
5.3. <object>$ → <string> | Coerces the expression to string. Shorthand for <object>:string() → <string> . |
5.4. <number> + <number> → <number> | Adds two numbers. |
5.5. <string> + <string> → <string> | Concatenates two strings. |
5.6. <seq> + <seq> → <seq> | Concatenates two seqs. |
5.7. <number> - <number> → <number> | Subtracts one number from another. |
5.8. <number> * <number> → <number> | Multiplies two numbers. |
5.9. <number> / <number> → <number> | Divides one number by another (float division). |
5.10. <number> // <number> → <number> | Divides one number by another (integer division). |
5.11. <number> ** <number> → <number> | Raises one number to another's power. |
5.12. <number> % <number> → <number> | Returns the remainder of the division of one number by another. |
5.13. -<number> → <number> | Negates one number. |
5.14. <boolean> and <boolean> → <boolean> | Returns the logical AND of two booleans. |
5.15. <boolean> or <boolean> → <boolean> | Returns the logical OR of two booleans. |
5.16. <boolean> xor <boolean> → <boolean> | Returns the logical XOR of two booleans. |
5.17. not <boolean> → <boolean> | Returns the logical NOT of a boolean. |
5.18. <object> == <object> → <boolean> | Checks whether two objects are equal. |
5.19. <object> != <object> → <boolean> | Checks whether two objects are not equal. |
5.20. <comparable> < <comparable> → <boolean> | Checks whether the left operand compares less than the right one. |
5.21. <comparable> <= <comparable> → <boolean> | Checks whether the left operand compares less than or equal to the right one. |
5.22. <comparable> > <comparable> → <boolean> | Checks whether the left operand compares greater than the right one. |
5.23. <comparable> >= <comparable> → <boolean> | Checks whether the left operand compares greater than or equal to the right one. |
5.24. <object>-><identifier> → <object> | Extracts a property from an object. |
5.25. <object>->(<object>) → <object> | Evaluates an expression in the semantic context of the target |
5.26. <object>[<object...>] → <object> | Accesses an object by index/key in a list/string/map. |
5.27. ^<object> → <object> | Evaluates the expression in the parent semantic context (if any). |
5.28. <object> ?? <object> → <object> | Returns the first if it is not null; otherwise, returns the second. |
5.29. <boolean> ? <object>, <object> → <object> | If the condition is true, returns the first object; otherwise, returns the second. |
5.30. <seq> |> <object> → <object> | Transforms a sequence into another sequence or object. |
6. Scalar Functions | |
6.1. <number>:abs() → <number> | Calculates the absolute value of a number. |
6.2. <number>:acos() → <number> | Returns the arc cosine of the argument, as an angle in radians. |
6.3. <number>:asin() → <number> | Returns the arc sine of the argument, as an angle in radians. |
6.4. <number>:atan() → <number> | Returns the arc tangent of the argument, as an angle in radians. |
6.5. <number>:bytes([<number precision>]) → <string> | Formats a number as the best possible byte multiple. |
6.6. <number>:ceil([<number precision>]) → <number> | Returns the smallest number that is greater than or equal to the argument. |
6.7. <number>:chr() → <string> | Converts a unicode codepoint to a single-char string. |
6.8. <number>:cos() → <number> | Returns the cosine of an angle in radians. |
6.9. <number>:dateadd(<period>) → <number> | Adds <period> to timestamp argument. |
6.10. <number>:datefloor(<period>) → <number> | Rounds timestamp down to the nearest date that is divisible by <period>. |
6.11. <number>:dateformat([<string format>], [<string tz>]) → <string> | Formats timestamp using specified format |
6.12. <number>:datesub(<period>) → <number> | Subtracts <period> from timestamp argument. |
6.13. <number>:exp() → <number> | Calculates the exponential of a number. |
6.14. <number>:floor([<number precision>]) → <number> | Returns the largest number that is less than or equal to the argument. |
6.15. <number>:format([<string format>], [<string locale>]) → <string> | Formats a number according to format string and locale. |
6.16. <number>:log([<number base>]) → <number> | Calculates the logarithm of a number. |
6.17. <number>:normdist(<number mean>, <number sdev>, [<boolean acc>]) → <number> | Calculates the gaussian distribution function value, optionally integrated. |
6.18. <number>:pow(<number exp>) → <number> | Raises one number to another. |
6.19. <number>:round([<number precision>]) → <number> | Rounds a number to <precision> decimal places. |
6.20. <number>:select(<object... list>) → <object> | Selects the ith element from a list of arguments, or null if it doesn't exist. |
6.21. <number>:sin() → <number> | Returns the sine of an angle in radians. |
6.22. <number>:tan() → <number> | Returns the tangent of an angle in radians. |
6.23. <number>:tobase(<number base>) → <string> | Converts a number to a string representing it in some base |
6.24. <object>:boolean() → <boolean> | Converts object to boolean. |
6.25. <object>:class() → <string> | Returns the underlying Java class of the object. |
6.26. <object>:comparable() → <comparable> | Converts object to comparable. |
6.27. <object>:decode(<object,object... pairs>) → <object> | Transforms the parameter using the translation rules defined in <pairs>. |
6.28. <object>:get(<object... keys>) → <object> | Much like property[keys]. Works for strings, containers and arrays. |
6.29. <object>:indexin(<object... list>) → <number> | Returns the first index of the value in <list>, or null if <list> does not contain it. |
6.30. <object>:isin(<object... list>) → <boolean> | Returns true if <list> contains the value, false otherwise. |
6.31. <object>:json() → <string> | Converts the object to its JSON string representation. |
6.32. <object>:keep([<number ttl>]) → <object> | When used in a simple pipe, delays or disables (if ttl is not supplied or is < 0) inactive group removal. |
6.33. <object>:len() → <number> | Tries to get <target>'s size. Works for strings, containers and arrays. |
6.34. <object>:number() → <number> | Converts object to number. |
6.35. <object>:object() → <object> | Casts any object to its canonical object representation. |
6.36. <object>:row() → <seq> | Casts any object to a row. |
6.37. <object>:seq() → <seq> | Casts any object to a seq. |
6.38. <object>:string() → <string> | Converts object to string. |
6.39. <object>:typename() → <string> | Returns the compile-time type of an expression. |
6.40. <seq>:contains(<object>) → <boolean> | Returns whether the target seq contains the argument. |
6.41. <seq>:repeat(<number>) → <seq> | Repeats the sequence a certain number of times. |
6.42. <string>:contains(<string>) → <boolean> | Returns whether the target string contains the argument. |
6.43. <string>:dateparse([<string format>], [<string tz>]) → <number> | Parses timestamp using specified format |
6.44. <string>:endswith(<string>) → <boolean> | Returns whether the target string ends with the argument. |
6.45. <string>:frombase(<number base>) → <number> | Converts a string representing a number in some base to the number itself |
6.46. <string>:hlleval() → <number> | Evaluates compressed base64 HyperLogLog data. |
6.47. <string>:indexof(<string s>, [<number fromIndex>]) → <number> | Returns the position of <s> inside the target string, or null if it is not found. |
6.48. <string>:jsonparse() → <object> | Parses JSON string into objects. |
6.49. <string>:lower() → <string> | Converts string to lowercase. |
6.50. <string>:ltrim([<string chars>]) → <string> | Trims leading characters from a string |
6.51. <string>:ord() → <number> | Gets the unicode codepoint from the first char in the string. |
6.52. <string>:parse([<string format>], [<string locale>]) → <number> | Parses a number according to format string and locale. |
6.53. <string>:regex(<string regex>) → <row> | Returns a strongly typed row composed of all named groups in <regex>. |
6.54. <string>:regexall(<string regex>) → <seq(row)> | Returns a sequence of rows composed of all matches of all named groups in <regex>. |
6.55. <string>:regexfind(<string regex>, [<number|string group>]) → <string> | Returns the string matched by <regex> in the target (or one specific group). |
6.56. <string>:regexfindall(<string regex>, [<number|string group>]) → <seq(string)> | Returns all the strings matched by <regex> in the target (or one specific group). |
6.57. <string>:regexmatch(<string regex>) → <boolean> | Returns true if the target matches <regex>. False otherwise. |
6.58. <string>:regexsplit(<string regex>, [<number limit>]) → <seq(string)> | Splits string by regular expression <regex> in up to <limit> pieces. |
6.59. <string>:regexsub(<string regex>, <string replacement>) → <string> | Replaces all matches of <regex> in target by <replacement>. |
6.60. <string>:repeat(<number>) → <string> | Repeats the string a number of times. |
6.61. <string>:replace(<string from>, <string to>) → <string> | Replaces all instances of <from> with the string <to>. |
6.62. <string>:rtrim([<string chars>]) → <string> | Trims trailing characters from a string |
6.63. <string>:rtruncate(<number length>, [<string ellipsis>]) → <string> | Truncates the string to the specified length, keeping the end, optionally adding an ellipsis at the start. |
6.64. <string>:split([<string delim>], [<number limit>]) → <seq(string)> | Splits string by <delim> in up to <limit> pieces. |
6.65. <string>:sprintf(<object... args>) → <string> | Uses the target string as the format string for the arguments. |
6.66. <string>:startswith(<string>) → <boolean> | Returns whether the target string starts with the argument. |
6.67. <string>:substring(<number from>, [<number to>]) → <string> | Returns the substring between the indices <from> and <to>. |
6.68. <string>:trim([<string chars>]) → <string> | Trims a string |
6.69. <string>:truncate(<number length>, [<string ellipsis>]) → <string> | Truncates the string to the specified length, optionally adding an ellipsis at the end. |
6.70. <string>:upper() → <string> | Converts string to uppercase. |
6.71. <string>:urldecode([<string encoding>]) → <string> | Decodes an application/x-www-form-urlencoded unicode string. |
6.72. <string>:urlencode([<string encoding>]) → <string> | Encodes a string as application/x-www-form-urlencoded. |
6.73. compare(<comparable a>, <comparable b>) → <number> | Returns a number < 0 if a < b, > 0 if a > b or 0 if a = b. |
6.74. concat(<string...>) → <string> | Concatenates many strings together |
6.75. cron(<string>, [<string tz>]) → <period> | Constructs a period instance from a constant cron string |
6.76. happened([<number>], [<number tested>], <string>, [<string tz>]) → <number> | Returns true if the tested timestamp is inside the span, false otherwise. |
6.77. hllmerge(<string... data>) → <string> | Merge many instances of compressed base64 HyperLogLog data. |
6.78. itermap(<object>, [<string keyField>], [<string valueField>]) → <seq(row)> | Creates a seq that iterates through java.util.Map or row entries |
6.79. max(<comparable>, <comparable>, <comparable...>) → <comparable> | Returns the greatest value of all supplied arguments. |
6.80. metadata() → <object> | Returns a representation of the previous pipe metadata. |
6.81. min(<comparable>, <comparable>, <comparable...>) → <comparable> | Returns the least value of all supplied arguments. |
6.82. newmap(<object,object... pairs>) → <object> | Creates an instance of java.util.Map with the supplied keys and values. |
6.83. period(<number>, <string unit>, [<string tz>]) → <period> | Constructs a period instance from constant parameters |
6.84. pi() → <number> | Returns the constant value of pi. |
6.85. random([<number min>], [<number max>]) → <number> | Returns a random value between <min> and <max> (0 and 1 if not defined). |
6.86. range([<number start>], <number end>, [<number step>]) → <object> | Creates a seq that iterates through numbers |
6.87. span([<number>], <string>, [<string tz>]) → <row> | Calculates start and end timestamps of span based on target. |
6.88. spanend([<number>], <string>, [<string tz>]) → <number> | Calculates end timestamp of span based on target. |
6.89. spanstart([<number>], <string>, [<string tz>]) → <number> | Calculates start timestamp of span based on target. |
6.90. spantest([<number>], <number tested>, <string>, [<string tz>]) → <number> | Returns -1 if the tested timestamp is before the span, 0 if it is inside and 1 if it is after. |
6.91. timestamp() → <number> | Returns the most appropriate timestamp, whether in scalar or aggregation contexts. |
6.92. uuid() → <string> | Returns a random UUID string. |
6.93. System functions | |
6.93.1. sys.cpu() → <row> | Returns the system's CPU usage info. |
6.93.2. sys.disks() → <seq(row)> | Returns info about all devices in the filesystem. |
6.93.3. sys.fds() → <row> | Returns info about process file descriptors. |
6.93.4. sys.heap() → <row> | Returns the system's current JVM heap info. |
6.93.5. sys.hostname() → <string> | Returns the system's hostname. |
6.93.6. sys.memory() → <row> | Returns the system's memory usage info. |
6.93.7. sys.threads() → <row> | Returns info about running threads in current process. |
6.93.8. sys.timestamp() → <number> | Returns the system's real timestamp (not the query timestamp). |
6.93.9. sys.version() → <row> | Returns Pipes version info |
7. Aggregation Functions | |
7.1. <aggregation object expr>:if(<boolean condition>) → <object> | Aggregates only events for which <condition> evaluates true. |
7.2. <aggregation object expr>:overall() → <object> | Merges all the results from the target aggregation. |
7.3. <aggregation object expr>:overlast(<number window>) → <object> | Merges the results of the last <window> aggregations. |
7.4. <aggregation object expr>:prev([<number prev>]) → <object> | Delays and returns the <prev>th previous result from the target aggregation. |
7.5. all(<boolean>) → <boolean> | Returns true if all occurrences evaluate true. |
7.6. any(<boolean>) → <boolean> | Returns true if any occurrence evaluates true. |
7.7. avg(<number>, [<number weight>]) → <number> | Calculates the (possibly weighted) average of some expression. |
7.8. count([<object>]) → <number> | Counts all events in a window (except those that evaluate to false or null). |
7.9. dcount(<object...>) → <number> | Estimates the field's cardinality (distinct count) using HyperLogLog. |
7.10. describe(<aggregation object expr>) → <string> | Yields a JSON string describing the target aggregation's inner state representation. |
7.11. first(<object>) → <object> | Yields the occurrence with the least timestamp. |
7.12. greatest(<object>, <sortfield...>) → <object> | Yields the greatest occurrence in the window based on the sort fields. |
7.13. hll(<number log2m>, <object...>) → <number> | Estimates the field's cardinality (distinct count) using HyperLogLog. |
7.14. hllmerge(<string>) → <string> | Performs union of many HyperLogLog encoded data in a window. |
7.15. hllmergeeval(<string>) → <number> | Performs union of many HyperLogLog encoded data in a window and evaluates the result. |
7.16. hllset(<number log2m>, <object...>) → <string> | Similar to hll, but it doesn't evaluate the final cardinality; it just returns the sketch data. |
7.17. last(<object>) → <object> | Yields the occurrence with the greatest timestamp. |
7.18. least(<object>, <sortfield...>) → <object> | Yields the least occurrence in the window based on the sort fields. |
7.19. list(<object>) → <seq> | Creates a java.util.List from all events in a window. |
7.20. map(<object key>, <object value>) → <object> | Creates a java.util.Map from all events in a window. |
7.21. max(<comparable>) → <comparable> | Yields the greatest occurrence in the window. |
7.22. median(<number>, [<number weight>]) → <number> | Estimates the median value of the population using Count-Min Sketch. |
7.23. min(<comparable>) → <comparable> | Yields the least occurrence in the window. |
7.24. pcount(<boolean>) → <number> | Aggregates the proportion of events for which the expression evaluates true. |
7.25. quantile(<number>, <number q>, [<number weight>]) → <number> | Estimates the q (0..1) quantile of the population using Count-Min Sketch. |
7.26. regression([<number x>], <number y>) → <row> | Computes regression and correlation statistics for given variables. |
7.27. set(<object>) → <seq> | Creates a java.util.Set from all events in a window. |
7.28. smooth(<aggregation number expr>, [<number alpha>], [<number beta>]) → <number> | Smooths the curve of another aggregation. |
7.29. statistics(<number>) → <row> | Aggregates sample statistics for some property. |
7.30. stdev(<number>, [<number weight>]) → <number> | Calculates the (possibly weighted) standard deviation of some expression. |
7.31. sum(<number>) → <number> | Sums all evaluations of some expression. |
7.32. summary(<string>, [<string separator>], [<string lastSeparator>]) → <string> | Joins all the strings in a window. |
7.33. top(<number k>, <object>, [<sortfield...>]) → <seq> | Yields the k minimum occurrences of the target object. |
7.34. variance(<number>, [<number weight>]) → <number> | Calculates the (possibly weighted) variance of some expression. |
7.35. when(<boolean expr>) → <number> | Yields the latest timestamp inside window when some condition was true. |
7.36. whenfirst(<boolean expr>) → <number> | Yields the first timestamp inside window when some condition was true. |
7.37. Window Meta-aggregations | |
7.37.1. wcount() → <number> | Yields how many outputs are merged in the current window. |
7.37.2. wstart() → <number> | Returns the window's first allowed timestamp (or item index). |
7.37.3. wend() → <number> | Returns the window's last allowed timestamp (or item index). |
7.37.4. ostart() → <number> | Returns the output's first allowed timestamp (or item index). |
7.37.5. oend() → <number> | Returns the output's last allowed timestamp (or item index). |
7.37.6. otimestamp() → <number> | Returns the timestamp when the outputs were merged (equal to oend() on time pipes). |
8. Timespan Language | |
8.1. Span types | |
8.2. Period types |
The simplest Pipes query is a filter. The simplest filter is *, which matches all events:
*
Another simple filter selects all events where somefield is equal to somevalue:
somefield:somevalue
Use quotes (" or ') to filter by complex strings:
somefield:"a complex value"
There are other filter types available, for example:
somefield:aaa*zzz
Filters events where somefield has a string value that starts with aaa and ends with zzz.
somefield#:[42, 1000)
Filters events where somefield, converted to number, has a value between 42 (inclusive) and 1000 (exclusive).
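As a rough mental model only (the engine itself compiles filters into automata, as described in Chapter 2), the two filters above behave like the Python predicates below over events represented as dicts. The helper names are hypothetical, fnmatch is only an approximation of the wildcard syntax, and treating missing or non-numeric values as non-matching is an assumption of this sketch.

```python
import fnmatch
from typing import Any, Mapping


def matches_wildcard(event: Mapping[str, Any], field: str, pattern: str) -> bool:
    """Models somefield:aaa*zzz, where * stands for any run of characters."""
    value = event.get(field)
    if value is None:
        return False  # assumption: a missing field never matches
    return fnmatch.fnmatchcase(str(value), pattern)


def matches_half_open_range(event: Mapping[str, Any], field: str,
                            lower: float, upper: float) -> bool:
    """Models somefield#:[lower, upper): lower bound inclusive, upper bound exclusive."""
    try:
        number = float(event.get(field))  # the # coercion to number
    except (TypeError, ValueError):
        return False  # assumption: values that cannot be read as numbers never match
    return lower <= number < upper
```

For example, matches_half_open_range({"somefield": "42"}, "somefield", 42, 1000) is true, while a value of 1000 falls outside the range.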
Filters are composable with the & and | operators:
(host:A & value:30) | (host:B & value:42)
The & operator can be omitted and has higher precedence than |:
host:A value:30 | host:B value:42
The - operator selects the complement of a filter:
host:A -value:30
Selects events where host equals "A", but value isn't 30.
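The composition rules can be pictured as predicate combinators. This Python sketch is a conceptual model, not the engine's implementation; the names term, both, either and negate are hypothetical, and comparing values by their string form is an assumption. It shows why host:A value:30 | host:B value:42 groups as two & pairs, and that -value:30 is a plain complement, so an event without value passes it.

```python
from typing import Any, Callable, Mapping

Event = Mapping[str, Any]
Filter = Callable[[Event], bool]


def term(field: str, value: Any) -> Filter:
    """field:value, comparing string representations (assumption of this sketch)."""
    return lambda e: field in e and str(e[field]) == str(value)


def both(a: Filter, b: Filter) -> Filter:    # a & b, also written by juxtaposition
    return lambda e: a(e) and b(e)


def either(a: Filter, b: Filter) -> Filter:  # a | b
    return lambda e: a(e) or b(e)


def negate(f: Filter) -> Filter:             # -f, the complement of f
    return lambda e: not f(e)


# host:A value:30 | host:B value:42  ==  (host:A & value:30) | (host:B & value:42)
q1 = either(both(term("host", "A"), term("value", 30)),
            both(term("host", "B"), term("value", 42)))

# host:A -value:30
q2 = both(term("host", "A"), negate(term("value", 30)))
```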
If you do not specify a field name, the default field is used. For example, if the default field is __type, you can write:
SomeType value:42
Selects SomeType events with value equal to 42.
You can write an entire sub-filter inside a field declaration:
SomeType value:(42 | (43 -othervalue:X))
Selects SomeType events whose value is 42 or 43 (the latter only if othervalue isn't equal to X).
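To make the field scoping in the last example concrete, here is a hand-expanded Python model of SomeType value:(42 | (43 -othervalue:X)). It assumes, as the example does, that the default field is __type; the helper names are hypothetical and the sketch is not the engine's evaluator. Bare terms inside value:( ... ) are tested against value, while othervalue:X names its own field.

```python
from typing import Any, Mapping

DEFAULT_FIELD = "__type"  # assumption: the default field configured in the example


def field_is(event: Mapping[str, Any], field: str, value: Any) -> bool:
    """field:value, comparing string representations (assumption of this sketch)."""
    return field in event and str(event[field]) == str(value)


def example_query(event: Mapping[str, Any]) -> bool:
    """Models: SomeType value:(42 | (43 -othervalue:X))"""
    # A bare term with no field name is tested against the default field.
    is_some_type = field_is(event, DEFAULT_FIELD, "SomeType")
    # Inside value:( ... ) bare terms refer to 'value'; othervalue:X overrides the field.
    value_ok = field_is(event, "value", 42) or (
        field_is(event, "value", 43) and not field_is(event, "othervalue", "X")
    )
    return is_some_type and value_ok
```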
To know more about filters, please see Chapter 3: Filters and \<filter>.
After the filter, you can chain one or more pipes. The simplest pipe just selects some fields from the event:
SomeType => field1, field2
Selects only field1 and field2 from events matching SomeType.
By default, references to fields from filtered events are assumed to be strings.
If you want to declare a field reference as a number, use the # operator:
SomeType => field1#, field2#
Selects only field1 and field2 from events matching SomeType, both converted to numbers.
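A minimal Python sketch of what the two projections produce for a single event. It assumes that a missing field yields null (None) and that a value that cannot be read as a number also yields null; the real coercion rules are those of <object># → <number>, and the function names here are hypothetical.

```python
from typing import Any, Dict, List, Mapping, Optional


def as_string(value: Any) -> Optional[str]:
    """Default interpretation of a field reference: a string (or null)."""
    return None if value is None else str(value)


def as_number(value: Any) -> Optional[float]:
    """Models the # coercion; non-numeric values become null (assumption)."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def project(event: Mapping[str, Any], fields: List[str], numeric: bool) -> Dict[str, Any]:
    """Models `=> field1, field2` (numeric=False) or `=> field1#, field2#` (numeric=True)."""
    convert = as_number if numeric else as_string
    return {name: convert(event.get(name)) for name in fields}
```

Only the selected fields survive the projection; every other property of the input event is dropped.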
You can aggregate events, just like you would do in SQL:
SomeType => count() every minute
Outputs, every minute, the number of events matching SomeType in that minute.
You can use more than one aggregation and give names to the outputs:
SomeType => count() as {Event count}, avg(value#) as mean every minute
Outputs, every minute, the number of events and the mean of the variable value for events matching SomeType in that minute.
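Conceptually, every minute acts as a tumbling bucket that holds a count and the running data for the average. The Python sketch below models count() as {Event count}, avg(value#) as mean every minute over a finite list of events. It assumes each event carries a millisecond timestamp field, that buckets are aligned to minute boundaries, and that non-numeric values are ignored by avg; none of this describes the engine's real clock handling, and the function name is hypothetical.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Mapping

MINUTE_MS = 60_000


def count_and_avg_every_minute(events: Iterable[Mapping[str, Any]]) -> List[Dict[str, Any]]:
    """Models: SomeType => count() as {Event count}, avg(value#) as mean every minute"""
    counts: Dict[int, int] = defaultdict(int)
    sums: Dict[int, float] = defaultdict(float)
    numeric: Dict[int, int] = defaultdict(int)
    for event in events:
        start = event["timestamp"] // MINUTE_MS * MINUTE_MS  # floor to the minute
        counts[start] += 1
        try:
            value = float(event.get("value"))
        except (TypeError, ValueError):
            continue  # assumption: non-numeric values do not contribute to avg
        sums[start] += value
        numeric[start] += 1
    return [
        {"minute": start,
         "Event count": counts[start],
         "mean": sums[start] / numeric[start] if numeric[start] else None}
        for start in sorted(counts)
    ]
```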
You can also define a time window to aggregate over:
SomeType => count() over last 2 hours every minute
Outputs, every minute, the number of events matching SomeType in the last 2 hours.
If you want to group the results by some other field, use:
SomeType => count() by host every minute
Outputs, every minute, the number of events matching SomeType for every host.
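Adding by host turns each minute bucket into one group per host. In this Python model of count() by host every minute, the group key is the pair (minute, host). Millisecond timestamps, minute-aligned buckets and grouping events without a host under None are assumptions of the sketch, and the function name is hypothetical.

```python
from collections import Counter
from typing import Any, Dict, Iterable, Mapping, Tuple

MINUTE_MS = 60_000


def count_by_host_every_minute(events: Iterable[Mapping[str, Any]]) -> Dict[Tuple[int, Any], int]:
    """Models: SomeType => count() by host every minute"""
    counts: Counter = Counter()
    for event in events:
        minute = event["timestamp"] // MINUTE_MS * MINUTE_MS
        counts[(minute, event.get("host"))] += 1  # one group per (minute, host)
    return dict(counts)
```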
To know more about the simple pipe, please see <named...> [by <named...>] [over <window>] [every <period> | at the end].
Each query in Pipes creates new events. Those events can be chained into other pipes to be transformed:
SomeType => count() by host every minute => @top 5, count desc
Outputs, every minute, the top 5 hosts that received the most events matching SomeType.
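Each minute, the first pipe emits one batch of (host, count) rows, and the second pipe only ever sees that batch. The Python sketch below models @top 5, count desc applied to one such batch, represented as a dict from host to count. The function name is hypothetical and the tie-breaking behaviour of the real pipe is not modelled.

```python
import heapq
from typing import Dict, List, Tuple


def top_hosts(counts_in_minute: Dict[str, int], k: int = 5) -> List[Tuple[str, int]]:
    """Models the second pipe, @top 5, count desc, applied to one minute's output."""
    return heapq.nlargest(k, counts_in_minute.items(), key=lambda item: item[1])
```

Using a heap keeps the cost proportional to the batch size while only k rows are retained, which fits a pipe that receives a whole batch at once.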
You can use as many pipes as you want:
SomeType => count(), avg(response_time#) as time by host every minute => @filter time > 3 => @top 5, count desc
Outputs every minute hosts with average response time greater than 3 seconds, getting the top 5 by event count.
There are many different pipe types:
SomeType => count() every minute => @filter count > 1000 => @throttle 30 minutes
Outputs the number of events for every minute that has more than 1000 events, but only if it didn't already happen in the last 30 minutes.
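The last two pipes of that query can be modelled over a time-ordered list of per-minute counts. This Python sketch assumes that @throttle 30 minutes lets a row through only if at least 30 minutes have passed since the last row it let through; the actual pipe is described as limiting output to at most k rows per period, and its exact window alignment may differ. The function name is hypothetical.

```python
from typing import Iterable, List, Optional, Tuple

MINUTE_MS = 60_000


def throttled_alerts(per_minute_counts: Iterable[Tuple[int, int]],
                     threshold: int = 1000,
                     period_ms: int = 30 * MINUTE_MS) -> List[Tuple[int, int]]:
    """Models: ... => @filter count > 1000 => @throttle 30 minutes

    per_minute_counts holds (minute_timestamp_ms, count) pairs in time order.
    """
    emitted: List[Tuple[int, int]] = []
    last_emit: Optional[int] = None
    for minute, count in per_minute_counts:
        if count <= threshold:  # @filter count > 1000 drops this row
            continue
        # @throttle (assumed semantics): at most one row per period since the last one let through
        if last_emit is None or minute - last_emit >= period_ms:
            emitted.append((minute, count))
            last_emit = minute
    return emitted
```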
Sometimes it is important to mix information from different streams into a single event type.
For example, assuming CPU and Memory metrics come in different events, but with the same event type Metrics, you can write a query with the join pipe to combine them:
Metrics => [ @filter \metric_name:CPU => avg(value#) as cpu every minute join @filter \metric_name:Memory => avg(value#) as mem every minute ]
Outputs the average cpu and memory every minute, in the same event.
You can also define a field to join the results on. For example, if you need to calculate the metrics in previous example by host, you can join on this field using:
Metrics => [ @filter \metric_name:CPU => avg(value#) as cpu by host every minute join on host @filter \metric_name:Memory => avg(value#) as mem by host every minute ]
Outputs the average cpu and memory for each host every minute, in the same event.
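For one output of each side, the join can be modelled as the cartesian product of the two batches, restricted to pairs that agree on the join fields. The Python sketch below assumes inner-join behaviour (rows without a partner are dropped) and merges each matching pair into a single row; the function name is hypothetical and this is not the engine's implementation.

```python
from typing import Any, Dict, List


def join_on(left: List[Dict[str, Any]], right: List[Dict[str, Any]], *keys: str) -> List[Dict[str, Any]]:
    """Models `<pipe> join [on keys] <pipe>` for one output of each side.

    With no keys this is the plain cartesian product of both batches;
    with keys, only pairs that agree on every key are combined.
    """
    joined: List[Dict[str, Any]] = []
    for l_row in left:
        for r_row in right:
            if all(l_row.get(k) == r_row.get(k) for k in keys):
                joined.append({**l_row, **r_row})  # one merged event per matching pair
    return joined
```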
Another example: imagine you need to create a ranking with the top 10 hosts by event count in the minute, with an 11th row holding the total count for all hosts. You can use the union pipe to do that:
SomeType => [ count() by host every minute => @top 10, count desc union count() by 'Total' every minute ]
Outputs 11 rows every minute: the top 10 hosts by event count and a total row.
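The key point of this query is that the total comes from a separate branch that sees every host, not just the ten kept by @top. This Python model of one minute of the query illustrates that. It assumes union places the left branch's rows before the right branch's; the function name is hypothetical.

```python
from typing import Dict, List, Tuple


def top_with_total(counts_in_minute: Dict[str, int], k: int = 10) -> List[Tuple[str, int]]:
    """Models one minute of:
    [ count() by host every minute => @top 10, count desc
      union count() by 'Total' every minute ]
    """
    ranking = sorted(counts_in_minute.items(), key=lambda item: item[1], reverse=True)[:k]
    total = ("Total", sum(counts_in_minute.values()))  # computed over all hosts, not only the top k
    return ranking + [total]  # union concatenates both outputs for the same minute
```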
One last example: if you need to filter hosts with average CPU in the last minute greater than the global average, you can write:
Metrics metric_name:CPU => [ avg(value#) as cpu by host every minute join avg(value#) as global_avg every minute ] => @filter cpu > global_avg
Outputs every minute only hosts with average CPU greater than the global average.
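Because the right-hand branch has no by clause, it emits a single row per minute, so the join without on simply attaches global_avg to every host row before the filter runs. The Python sketch below models one minute of that query from (host, value) samples. Note that global_avg is the average over all samples, not the average of the per-host averages. The function name is hypothetical.

```python
from statistics import mean
from typing import Dict, List, Tuple


def hosts_above_global_avg(samples: List[Tuple[str, float]]) -> Dict[str, float]:
    """Models one minute of:
    [ avg(value#) as cpu by host every minute join avg(value#) as global_avg every minute ]
    => @filter cpu > global_avg
    """
    by_host: Dict[str, List[float]] = {}
    for host, value in samples:
        by_host.setdefault(host, []).append(value)
    per_host = {host: mean(values) for host, values in by_host.items()}  # left pipe
    global_avg = mean(value for _, value in samples)                      # right pipe: one row
    # Join without 'on': each host row is paired with the single global row,
    # then @filter keeps only hosts above the global average.
    return {host: cpu for host, cpu in per_host.items() if cpu > global_avg}
```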
To know more about the join pipe, please see <pipe> join [on <fields...>] <pipe>. To know more about the union pipe, please see <pipe> union <pipe>.
Pipes is a language and an engine to perform distributed aggregations over real-time streams. It enables stream processing with low latency and a minimal memory footprint (constant for most operations).
The language is designed to be as intuitive as possible, expressing the data flow explicitly. The goal was to provide a language you can read from left to right (no visual backtracking) while keeping a declarative way to express the computations. The name Pipes is an allusion to the Unix pipeline, which enables a powerful and elegant functional programming model in Unix shells.
The Pipes language operates over events. Events are (usually immutable) sets of (key, value) tuples and may have some time information assigned to them. In the Pipes event model, events have no intrinsic type. They are all part of a single public stream of tuples. When type information is applicable, it is usually encoded as a property of each event (e.g. type or __type).
The engine makes no assumptions about the nature, frequency or schema of the incoming events. Every operation is written with the understanding that an event may not have the referenced field, or that the value may not always be of the same type. Events are well represented by schemaless JSON documents. As a Java library, Pipes' default configuration understands events that are instances of java.util.Map.
It is recommended that events have a property called 'timestamp' containing a Java timestamp, that is, the number of milliseconds since 1 January 1970 UTC. Even this is neither enforced nor required.
Unlike in a SQL database, stream processing allows very little preprocessing of the events (e.g. index creation), so most of the optimization work is done on the queries. This is why, most of the time, queries run in the same engine: this way, the engine can optimize them together and share as much processing as possible between them.
The first part of a Pipes query always consists of selecting some events from the stream, using a filter (see Chapter 3: Filters). Filters select events from the public stream. Each filter basically contains predicates about some fields and boolean operations between them. This has great potential for optimization. For example, given the filters type:http status:404, type:http status:200 and type:http status:(2?? | 3??), the engine builds automata like the ones in the picture below:
Filters automata
After filtering the public stream, you can chain the result through one or more pipes (see Chapter 4: Pipes). Each pipe may transform, filter or aggregate the results from the previous one. In the language, this chaining is represented by the => operator.
Pipes computation model
Each pipe in the chain can have one or more output rates. For example, a pipe can receive an input every time the filter matches some event in the public stream, but only output events in batches of one minute. There are four different types of output: every time period, every item, every batch and at the end. For more information, see Section 2.3: Output rates and aggregation windows.
Inside each pipe, you can interact with events using expressions that access, transform or aggregate their properties. For more information, see Section 2.4: Expressions.
Most pipes are implemented using parallel algorithms that allow seamless distribution over multiple physical machines. Usually, each pipe can be classified into one of three groups:
When writing a typical query, it's likely your pipes will distribute as follows:
Typical pipe in single machine
Typical pipe in multiple machines
Every pipe in the chain has one or more output rates. In this section, we focus on how to declare them in the simple pipe, but the concept itself applies to any pipe type.
There are four types of output rates: every time period, every item, every batch and at the end.
every <period>
: outputs every time the pipe's internal clock ticks a constant period. E.g. every 5 seconds
every <number> items
: outputs every time the specified number of items is processed by the engine. E.g. every 5 items
every <number> batches
: outputs every time the specified number of item batches is processed by the engine. E.g. every 5 batches
at the end
: outputs only at the end of execution. Useful for summary queries.
The available period units are:
unit | also accepts | equivalent to |
---|---|---|
millisecond | milli, ms | |
second | sec | 1000 ms |
minute | min | 60000 ms |
hour | | 3600000 ms |
day | | |
week | wk | |
month | mon | |
bimester | | 2 months |
quarter | | 3 months |
semester | | 6 months |
year | yr | |
Keep in mind that, in the case of the simple pipe, there may be a difference between the declared rate and the effective rate. In the query below, for example, the declared output rate in the last pipe is every item, but effectively the query outputs every 10 seconds.
* => count() every 10 seconds => count, prev(count) every item
Outputs every 10 seconds
This happens because, when writing a query, you declare the output from the pipe's local point of view, while the effective output rate considers the entire query, from the filter to the last pipe. In the example above, although the last pipe outputs every time it receives an item, it only receives items every 10 seconds, so the query effectively outputs every 10 seconds.
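To make this concrete, here is a small Python model of the query above. It is a sketch under simplifying assumptions, not how the engine is implemented. Events are plain, sorted timestamps in seconds. Empty 10-second buckets are skipped. prev(count) is modeled as the value from the previously received row. The function names are hypothetical.

```python
from typing import Iterable, Iterator, Optional, Tuple


def count_every(period: int, timestamps: Iterable[int]) -> Iterator[Tuple[int, int]]:
    """Toy model of 'count() every <period> seconds': one (bucket_end, count) row per bucket."""
    bucket_end: Optional[int] = None
    count = 0
    for t in timestamps:                      # timestamps are assumed to be sorted
        end = (t // period + 1) * period
        if bucket_end is not None and end != bucket_end:
            yield bucket_end, count           # the clock ticked: flush the previous bucket
            count = 0
        bucket_end = end
        count += 1
    if bucket_end is not None:
        yield bucket_end, count               # flush the last bucket


def with_prev_every_item(rows: Iterable[Tuple[int, int]]) -> Iterator[Tuple[int, int, Optional[int]]]:
    """Toy model of 'count, prev(count) every item': one output per received row."""
    prev: Optional[int] = None
    for ts, count in rows:
        yield ts, count, prev
        prev = count


events = [1, 2, 3, 11, 12, 25]                # hypothetical event timestamps, in seconds
out = list(with_prev_every_item(count_every(10, events)))
```

The second stage is declared per item. Even so, consecutive outputs in out are 10 seconds apart, because that is how often the first stage feeds it.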
Also in the simple pipe, you can aggregate over a period other than the output period. You can, for example, output a result every minute that corresponds to the last 5 minutes of aggregation. This is an aggregation over a time window.
The window concept only applies to the simple pipe.
In traditional stream processing libraries, every event is stored inside a data window and the aggregation runs over the stored events. This allows fine-grained configuration of outputs vs. windows. In Pipes, the goal is to use as few resources as possible. So, instead of storing events inside the window, we only store the aggregation outputs (in a mergeable format, see Section 2.4.5: Aggregations state representation). The picture below illustrates this:
Windows merge aggregation outputs
This way, the memory footprint of a query is very low and predictable. Of course, this approach introduces some limitations (e.g. the window size must be a multiple of the output rate), but the benefits outweigh them.
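The following Python sketch models this idea for an "over last <window> every <output>" average. It assumes a hypothetical (sum, count) state per output period. The window keeps only one such state per output, never the raw events, which is also why the window size must be a multiple of the output rate. The class and method names are illustrative, not part of Pipes.

```python
from collections import deque
from typing import Deque, Tuple

State = Tuple[float, int]  # hypothetical mergeable average state: (sum of values, number of values)


class LastWindowAvg:
    """Model of 'avg(...) over last <window> every <output>' storing one state per output."""

    def __init__(self, window: int, output: int) -> None:
        if window % output != 0:
            raise ValueError("window size must be a multiple of the output rate")
        # Only window // output partial states are ever kept in memory.
        self.states: Deque[State] = deque(maxlen=window // output)
        self.current: State = (0.0, 0)

    def add(self, value: float) -> None:
        total, n = self.current
        self.current = (total + value, n + 1)

    def tick(self) -> float:
        """Called on every output: store this period's state and merge the whole window."""
        self.states.append(self.current)     # the oldest state falls out automatically
        self.current = (0.0, 0)
        total = sum(s for s, _ in self.states)
        n = sum(c for _, c in self.states)
        return total / n if n else float("nan")
```

Memory stays bounded by the number of outputs in the window, whatever the event rate.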
There are four types of windows:
over last (<period>|<number> items|<number> batches)
: aggregates over the last <period> / <output_rate> outputs. E.g. over last 5 minutes
over current (<period>|<number> items|<number> batches)
: aggregates since the beginning of the current period. E.g. over current day
over span <string>
: aggregates over some timespan (cf. Chapter 8: Timespan Language). E.g. over span 'last 2 hours and 5 minutes'
over all
: aggregates over all events, merges all outputs.
It is worth noting that item-based windows can only be used with item-based outputs (and batch-based windows with batch-based outputs), while time-based windows can be used with all outputs except at the end. In fact, at the end pipes support no window definition, as there is only a single output at the end. The table below shows examples of what is valid and what is not.
window/output examples | every 2 minutes | every 2 items | every 2 batches | at the end |
---|---|---|---|---|
over last 10 minutes | ✔ valid | ✔ valid | ✔ valid | ✖ invalid |
over current 10 minutes | ✔ valid | ✔ valid | ✔ valid | ✖ invalid |
over last 10 items | ✖ invalid | ✔ valid | ✖ invalid | ✖ invalid |
over current 10 items | ✖ invalid | ✔ valid | ✖ invalid | ✖ invalid |
over last 10 batches | ✖ invalid | ✖ invalid | ✔ valid | ✖ invalid |
over current 10 batches | ✖ invalid | ✖ invalid | ✔ valid | ✖ invalid |
over span 'last 10 minutes' | ✔ valid | ✔ valid | ✔ valid | ✖ invalid |
over all | ✔ valid | ✔ valid | ✔ valid | ✖ invalid |
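The rules in the table fit in a small predicate. This Python sketch uses hypothetical category names ("time", "items", "batches", "all" for windows; "period", "items", "batches", "end" for outputs) rather than any Pipes API.

```python
def window_output_valid(window_kind: str, output_kind: str) -> bool:
    """Encodes the window/output table above.

    window_kind: 'time' (over last/current <period>, over span), 'items', 'batches' or 'all'
    output_kind: 'period', 'items', 'batches' or 'end'
    """
    if output_kind == "end":
        return False                      # at the end pipes accept no window at all
    if window_kind in ("time", "all"):
        return True                       # valid with every other output type
    if window_kind == "items":
        return output_kind == "items"     # item windows need item outputs
    if window_kind == "batches":
        return output_kind == "batches"   # batch windows need batch outputs
    raise ValueError(f"unknown window kind: {window_kind}")
```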
Expressions access and transform data from events. We introduce them here, explaining Pipes' type system and how aggregations work. For detailed information on expressions, please refer to Chapter 5: Operators, Chapter 6: Scalar Functions and Chapter 7: Aggregation Functions.
The Pipes language is statically typed, but has a very simple type system. There are four main types: number, string, boolean and object. All types inherit from object. There are a few other types, used in some specific functions. The table below summarizes all types:
type | java equivalent | example |
---|---|---|
number | java.lang.Double | 42 , 42.0 , 1e42 |
string | java.lang.String | "42" |
boolean | java.lang.Boolean | true , false |
object | java.lang.Object | null |
comparable | java.lang.Comparable | |
period | net.intelie.pipes.time.Period | 1 day , 42 minutes , 1 month {America/Sao_Paulo} |
seq | java.util.Iterable | (1, 2, 3, 4) , range(10) |
row | net.intelie.pipes.Row | (123 as abc, 456 as def) |
Pipes does not require events to be strongly typed (although it allows them to be). Because of this, the type of a property access may not be known at compile time. Still, the language is statically typed, so some type must be assigned to the expression. The chosen type depends on the configuration, but the default is string. This means that even if the field in the event contains a numeric value, it will be converted to a string before being used by the engine.
Types are checked at compile time. For example, passing a string to a function that requires a number (like avg(someproperty)) results in an error:
PipeException: Error in call avg(someproperty) with types <PropertyAccess[string]>, cause(s): AvgAggregation: The parameter #1 <someproperty> must be 'number' instead of 'string'.
It is possible to tell the engine what type an expression must have by using the type's name as a function. E.g. number(someproperty) indicates that someproperty is a numeric value and coerces it to a number when it isn't one. When applied to other expressions, these functions also convert values: number("42") just returns 42.
The string and number types also have shorthands for these functions:
number(someproperty) is the same as someproperty#
string(someproperty) is the same as someproperty$
Some types also have arguments. In particular, seq has a type argument that specifies the type of all its elements. E.g., the literal (1, 2, 3, 4) is a seq(number).
This is especially useful when using the pipe @for <named(seq)>, <named... additional>:
* => @for (1,2,3,4) as x
In the query above, x will be a number in the next pipe.
The row type also has an argument listing the fields of that row. E.g., in the query:
* => A, B# by C# => last(*) as result every minute
the field result is a row(timestamp<number>, C<number>, A<string>, B<number>).
An instance of a typed row has three main uses:
- the pipe @yield [<object expr>];
- the expand keyword in a select or by clause (see simple pipe);
- the operator <object>-><identifier> → <object>.
The types row and seq are closely related, because every row is also a seq by polymorphism. The type argument of the underlying seq depends on the types of the row's fields: a row(number, number) is a seq(number), but a row(number, string) is a seq(comparable).
Examples
* => id, name, value# => last(*) every minute => @yield
Selects fields and outputs the last event every minute.
Please note that last(*) outputs a single field of type row with 3 fields in its metadata, and the pipe @yield expands them into an event.
* => id, name, value# => expand last(*) every minute
Equivalent to the above. However, the pipe @yield accepts any scalar, while the expand keyword only works with row values with metadata.
* => expand name:regex(r'^(?<first>\w+)( \w+)* (?<last>\w+)$') as {extracted_*_name}
Yields an event with the fields extracted_first_name and extracted_last_name.
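The Python sketch below models what the last example computes. It assumes that each named regex group becomes one field, whose name replaces the * in {extracted_*_name}. Returning an empty result when the regex does not match is also an assumption, since this section doesn't say what Pipes does in that case. Python spells named groups (?P<name>...), whereas the Pipes example uses (?<name>...).

```python
import re
from typing import Dict

# Same pattern as the Pipes example, written with Python's named-group syntax.
NAME_RE = re.compile(r"^(?P<first>\w+)( \w+)* (?P<last>\w+)$")


def expand_named_groups(value: str, template: str = "extracted_*_name") -> Dict[str, str]:
    """Hypothetical model of 'expand regex(...) as {extracted_*_name}'."""
    match = NAME_RE.match(value)
    if match is None:
        return {}                                   # assumption: no match yields no fields
    # groupdict() only contains the named groups, so the unnamed ( \w+)* is ignored.
    return {template.replace("*", group): text for group, text in match.groupdict().items()}
```

For "John Ronald Reuel Tolkien", the middle names are consumed by the unnamed group. Only first and last become fields.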
Most Pipes literals will be familiar from other programming languages. In particular:
Numeric literals
Since Pipes has no integer data type, only number, the only numeric literals are those like the following examples: 42, 42.1, 1e42.
Boolean literals
Only true or false.
Also, although it is not exactly a literal, you can write boolean expressions with the filter syntax using the operator \<filter>.
Null literal
null
String literals
There are some flavors of string literals:
name | example | escaping? | interpolation? | useful for |
---|---|---|---|---|
Single quoted | 'abcde' | ✔ yes | ✖ no | ordinary strings |
Single quoted raw | r'\(some text\)' | ✖ no | ✖ no | regexes |
Double quoted | "abc${x#+y#}def" | ✔ yes | ✔ yes | interpolated strings |
Row and seq literals
You can write row instances using the syntax in the following examples: (1, 2, 3, 4), (123 as propa, '456' as propb).
The row will be strongly typed. Because of polymorphism, the row will also be a strongly typed seq, using the most specific type that covers all the values. For example, (1, '2', 3) will be a row(_1<number>, _2<string>, _3<number>), but also a seq(comparable).
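The "most specific type that covers all the values" can be modeled as a walk up a type hierarchy. This Python sketch assumes a simplified hierarchy, which is not taken from the Pipes implementation: number, string and boolean are comparable, and everything is an object.

```python
from typing import Dict, List, Optional, Sequence

# Hypothetical, simplified hierarchy: number, string and boolean are comparable,
# and every type is an object.
PARENT: Dict[str, Optional[str]] = {
    "number": "comparable",
    "string": "comparable",
    "boolean": "comparable",
    "comparable": "object",
    "object": None,
}


def ancestors(t: Optional[str]) -> List[str]:
    """The type itself followed by its supertypes, most specific first."""
    chain: List[str] = []
    while t is not None:
        chain.append(t)
        t = PARENT[t]
    return chain


def seq_element_type(field_types: Sequence[str]) -> str:
    """Most specific type covering all field types: the seq type argument of a row."""
    if not field_types:
        return "object"
    common = ancestors(field_types[0])
    for t in field_types[1:]:
        supertypes = set(ancestors(t))
        common = [c for c in common if c in supertypes]   # keep only shared supertypes
    return common[0]
```

With this model, (1, '2', 3) gives comparable and (1, 2, 3, 4) gives number, matching the examples above.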
If you wish to define a row with only one element, you MUST put a comma after the single element ((42,)). If you don't, the compiler will treat the value as a parenthesized expression, i.e. (42) != (42,).
Every Pipes expression is either a scalar or an aggregation. We call this its level.
Scalars are expressions that receive an event and compute their result immediately. E.g. to get the length of a string, one can write len(someproperty). In this case, the whole expression is a scalar.
Aggregations are expressions that receive several events but only compute their result at the end of the window. E.g. to get the average of a property during the window, one can write avg(someproperty#). In this case, the whole expression is an aggregation.
When a scalar has only constant values (and operations over them), we say it is a constant. E.g. the literal value "some value" is a constant. len("some value") is also a constant, and may be optimized at compile time to the value 10.
When a scalar does not depend on the value of the event, but isn't a constant either, it is called an unbound scalar. The most notable examples are the system functions and the random() function.
Most functions just assume the same level as some (sometimes all) of their parameters. E.g. the expression len(first(someproperty)) is an aggregation, because first(someproperty) is also an aggregation. When a function always results in an aggregation, we simply call it an aggregation.
Some functions (mostly aggregations) require parameters to have a specific level. E.g. in the aggregation avg(<number>, [<number weight>]) → <number>, the parameter must be a scalar.
For example, trying to compile avg(first(someproperty#)) results in an error:
PipeException: Error in call avg(first(someproperty#)) with types <FirstAggregation[number]>, cause(s): AvgAggregation: The parameter #1 <first(someproperty#)> must be a scalar instead of an aggregation.
For most purposes, every constant is also an unbound scalar (one that always returns the same value), every unbound scalar is also a scalar (it just ignores the event) and every scalar may act like an aggregation, if required to.
Level set diagram
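The level rules above can be summarized as an ordering, where each level can stand in for any higher one. The Python sketch below is a simplification of this section, not Pipes' internal classes. The names are hypothetical, and "take the highest parameter level" models only the "most functions" case. Zero-argument unbound scalars such as random() are not covered.

```python
from enum import IntEnum
from typing import Sequence


class Level(IntEnum):
    # Ordered so that each level can be used wherever a higher one is expected.
    CONSTANT = 0
    UNBOUND_SCALAR = 1
    SCALAR = 2
    AGGREGATION = 3


def call_level(arg_levels: Sequence[Level], always_aggregation: bool = False) -> Level:
    """Level of f(args): aggregations (avg, first, ...) always produce an aggregation;
    most other functions take the highest level among their parameters."""
    if always_aggregation:
        return Level.AGGREGATION
    return max(arg_levels, default=Level.CONSTANT)


def check_scalar_param(name: str, level: Level) -> None:
    """Mimics the check made by avg: its parameter must be at most a scalar."""
    if level > Level.SCALAR:
        raise TypeError(f"The parameter {name} must be a scalar instead of an aggregation.")
```

For instance, first(someproperty) is an aggregation. So len(first(someproperty)) is one too, and passing first(...) to avg fails the scalar check.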
One of the main features of Pipes is the ability to run aggregations in a distributed fashion. To make this possible, most aggregations are written using parallel versions of the algorithms. Most of the solutions presented in this section only affect how semi-safe pipes (see Section 2.2: Chained computation model) represent their results over the wire. Additionally, the same mechanism is used by windows in the simple pipe (see Section 2.3: Output rates and aggregation windows) and by meta-aggregations (e.g. overlast, overall) to merge the results from many outputs.
Aggregation tree-merge-eval model
Let's take the average aggregation as an example. Suppose two machines compute the query type:http => avg(response_time#) every minute in parallel and share their results every minute. Suppose:
Using only this information, it is impossible to know the global average. If we just take the mean of both results (649.26), the value could be wrong, because machine 1 could have received many more events than machine 2, in which case its result should weigh more in the global result.
That's why every aggregation has an internal state representation that is more complex than just its result. In the case of the average aggregation, the state representation has two fields: "mean" and "sumw".
Aggregations state representation
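The Python sketch below shows how two partial averages could be merged using the two fields "mean" and "sumw". It models the idea only and is not the engine's code. The incremental update in add is a standard running-mean formula chosen for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AvgState:
    """Sketch of a mergeable average state with the fields 'mean' and 'sumw'."""
    mean: float = 0.0
    sumw: float = 0.0          # total weight; equals the event count for unweighted averages

    def add(self, value: float, weight: float = 1.0) -> "AvgState":
        sumw = self.sumw + weight
        if sumw == 0:
            return self
        # running-mean update: move the mean toward the new value by weight / sumw
        return AvgState(self.mean + (value - self.mean) * weight / sumw, sumw)

    def merge(self, other: "AvgState") -> "AvgState":
        sumw = self.sumw + other.sumw
        if sumw == 0:
            return AvgState()
        # weighted mean of the partial means: the node with more events weighs more
        return AvgState((self.mean * self.sumw + other.mean * other.sumw) / sumw, sumw)


machine1 = AvgState().add(10.0)                         # one event
machine2 = AvgState().add(20.0).add(20.0).add(20.0)     # three events
merged = machine1.merge(machine2)
```

Here merged.mean is 17.5 with sumw 4.0. Naively averaging the two partial results would give 15.0.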
Some aggregations have a more complex inner state, like the distinct count aggregation, which uses the HyperLogLog data structure and must share the sketch content with the other nodes to merge its results. But even that state uses constant memory for its representation.
There is a meta-aggregation in Pipes called describe that operates over other aggregations, but instead of yielding the result at the end of the window, it shows a JSON representation of the aggregation's inner state. Using the same example as before, writing describe(avg(response_time#)) would return a value like:
{"sumw":262.0,"mean":416.628854962}
When using the simple pipe with a non-empty group, the query tries to reduce memory consumption by removing any group that has not received any event in the last output period. Most aggregations do not hold any state beyond what is required for the last period, so this is fine. But some aggregations need additional state (e.g. overlast, prev) and may prevent this memory reclaim. Some aggregations even need to be kept in memory forever to work properly (e.g. overall).
To solve this problem, every aggregation has a property called TTL (time to live), which defines how long it should prevent a row's state from being deleted from a query when no events are received in that group. If there is no grouping, the property is ignored.
Most aggregations have TTL=1, but some aggregations with persistent state may have longer TTLs (even infinite).
There is also the function <object>:keep([<number ttl>]) → <object>, which allows changing the TTL of any expression to any desired value.
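A hypothetical model of this eviction policy is sketched below in Python. It assumes that TTL is counted in output periods and that math.inf stands for an infinite TTL. It also assumes that a row is kept while at least one of its aggregations still has TTL left. The engine's exact period accounting may differ.

```python
import math
from typing import Dict, Iterable, List, Set


class GroupTable:
    """Hypothetical model of group eviction driven by aggregation TTLs (in output periods)."""

    def __init__(self, aggregation_ttls: Iterable[float]) -> None:
        # A row is kept while at least one of its aggregations still has TTL left.
        self.ttl = max(aggregation_ttls, default=1)
        self.remaining: Dict[str, float] = {}
        self.seen: Set[str] = set()

    def on_event(self, group: str) -> None:
        self.seen.add(group)

    def on_output(self) -> List[str]:
        """Ends an output period and returns the groups whose state is still kept."""
        for group in self.seen:
            self.remaining[group] = self.ttl          # new events renew the TTL
        for group in list(self.remaining):
            if group not in self.seen:
                self.remaining[group] -= 1            # one more period without events
                if self.remaining[group] <= 0:
                    del self.remaining[group]         # state is reclaimed
        self.seen.clear()
        return sorted(self.remaining)


plain = GroupTable([1])                  # e.g. only avg(...): TTL=1
forever = GroupTable([1, math.inf])      # e.g. avg(...) next to an overall-like aggregation
```

With TTL=1, a group disappears after one output period without events. An infinite TTL keeps it forever. A value such as keep(3) would keep it for three idle periods.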
Property expressions are the way to access an event's properties. At a glance, every identifier denotes a property access. Properties are scalar expressions, but they can also be used in aggregation pipes, yielding the last evaluated value.
* => last(cpu#), len(host) by host
In the example above, len(host) is a scalar, but it acts as an aggregation and returns the length of the host name.
Sometimes, property names collide with keywords in the language, or don't match the traditional identifier syntax. In these cases, you can use curly braces to define the property name, like in the example below:
* => last({CPU usage value}) as cpu, len(host) as {product} by host
The way properties are fetched is specific to each configuration, but by default, values are read from a java.util.Map instance, always as a string (converting them when necessary). To access a property as a number, for example, use the cast operators, e.g. prop# or prop:number().
Some configurations may allow indexed property access, using brackets. For example, assuming the event has a property config that is itself a java.util.Map:
* => config['hostname']
This indexing has the same syntax as the indexing operator (<object>[<object...>] → <object>), but the semantics described here only apply to property access, and are useful only to apply indexing before the usual cast occurs. E.g. someprop is a string, but someprop[0] isn't the first character of the string in someprop: it has the indexing semantics defined in the configuration.
This is usually necessary for multivalued fields, or for fields you already know contain a list instance.
The property expression can also be used to access values from previous pipes. In this case, the value will have the same type as defined in the referenced pipe. Example:
* => avg(cpu#) as avg, stdev(cpu#) as stdev every minute => avg-2*stdev as lower, avg+2*stdev as upper
There are also two special expressions that act like properties, but have special semantics: _ and *.
The expression _ returns the value of the default property. In untyped pipes, this is the entire event. In typed pipes, it is the first property of the previous pipe. I.e., in untyped pipes, _ is equivalent to *, and in typed pipes, it is equivalent to {0}.
host1 | host2 => avg(cpu#) as avg by _ every minute => _*2 as doubled
host:host1 | host:host2 => avg(cpu#) as avg by * every minute => avg*2 as doubled
The expression * returns the raw input event. This is very useful when you don't want to repeat every field present in the event (or when you don't even know them in advance).
host1 => last(*) every second => @yield
Yields only the last event every second. In the example above, the expression last(*) has the type object and is as weakly typed as the input stream.
host1 => cpu#, memory# => last(*) every second => last->cpu, last->memory
In the (contrived) example above, the expression last(*) has the type row and is strongly typed. That's why, when we use the 'peek' operator (->) on the property last, the resulting values are also strongly typed.
The Pipes language allows previously configured functions to be called inside queries. The default syntax is the usual one: you call a function by writing parentheses after the function name, e.g. fn(param-1, param-2, ..., param-n). You can check the functions available by default in Chapter 6: Scalar Functions and Chapter 7: Aggregation Functions. Please note that other functions can be added by configuration, so those chapters aren't a complete list.
Besides the default syntax, there are other ways to call a function. For example, the syntax a:fn is equivalent to fn(a), and a:fn(b) is equivalent to fn(a, b).
The table below lists all the alternative syntaxes for function calls.
syntax | equivalent to |
---|---|
a:fn | fn(a) |
a:fn(b, c) | fn(a, b, c) |
a#fn | fn(a#) |
a#fn(b, c) | fn(a#, b, c) |
a$fn | fn(a$) |
a$fn(b, c) | fn(a$, b, c) |
:fn | fn(_) |
:fn(b, c) | fn(_, b, c) |
#fn | fn(_#) |
#fn(b, c) | fn(_#, b, c) |
$fn | fn(_$) |
$fn(b, c) | fn(_$, b, c) |
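The table follows a regular pattern. A missing receiver means the default property _. The sigil # or $ is appended to the receiver, while : adds nothing. Any extra arguments follow the receiver. The Python sketch below applies this pattern to the textual forms in the table. It is an illustration only, not Pipes' parser, and it handles only simple identifiers as receivers.

```python
import re

# receiver (possibly empty), sigil, function name, optional argument list
CALL_RE = re.compile(r"^(?P<recv>\w*)(?P<sigil>[:#$])(?P<fn>\w+)(?:\((?P<args>.*)\))?$")


def desugar(call: str) -> str:
    """Rewrites an alternative call syntax from the table into the default fn(...) form."""
    m = CALL_RE.match(call)
    if m is None:
        raise ValueError(f"not an alternative call syntax: {call!r}")
    recv = m["recv"] or "_"                  # missing receiver means the default property _
    if m["sigil"] != ":":
        recv += m["sigil"]                   # a#fn passes a# (number), a$fn passes a$ (string)
    args = m["args"]
    return f"{m['fn']}({recv}, {args})" if args else f"{m['fn']}({recv})"
```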
Every query in Pipes is required to start with a filter. The filter runtime is optimized to allow many queries running over the same stream, sharing as much processing as possible.
The filter syntax is different from the rest of the language. It is inspired by Lucene's query syntax.
Examples:
type:http status:404
Filters all records where the field "type" has a value "http" and "status" has a value "404"
Special filter that allows all records through.
When used in conjunction with field syntax, selects all records that contain that field.
Examples:
*
All records (no filter)
type:*
Filters records that contain any value in the field "type".
Selects records where one of the current fields matches <term>.
The match is case insensitive.
The use of wildcards (* and ?) is allowed.
Use the field syntax to change the default field.
Examples:
http
If the default field is "somefield", selects records where "somefield" has a value "http" (case insensitive)
otherfield:http
Selects records where "otherfield" has a value "http" (case insensitive)
otherfield:http*404
Selects records where "otherfield" has a value that starts with "http" and ends with "404" (case insensitive)
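The wildcard semantics can be sketched as a case-insensitive regular expression: * matches any run of characters and ? matches exactly one. This Python sketch treats each field value as a single term. How the engine tokenizes or analyzes field values is not covered here, so treat it as an approximation.

```python
import re


def term_matches(value: str, term: str) -> bool:
    """Case-insensitive term match where * matches any run of characters and ? exactly one."""
    pattern = "".join(
        ".*" if ch == "*" else "." if ch == "?" else re.escape(ch)
        for ch in term
    )
    return re.fullmatch(pattern, value, flags=re.IGNORECASE | re.DOTALL) is not None
```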
Selects records where one of the current fields matches <term> with at most <maxEdits> edits.
The match is case insensitive.
Wildcards are not allowed in this filter.
Examples:
http~2
If the default field is "somefield", selects records where "somefield" has a value similar to "http" (e.g. "hxxp").
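Fuzzy matching is based on edit distance. The Python sketch below uses the classic Levenshtein distance (insertions, deletions and substitutions). That choice is an assumption: the engine's exact metric is not specified here, for instance whether a transposition counts as a single edit.

```python
def edit_distance(a: str, b: str) -> int:
    """Classic Levenshtein distance, computed one row of the DP table at a time."""
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, start=1):
        cur = [i]
        for j, cb in enumerate(b, start=1):
            cur.append(min(prev[j] + 1,                # deletion
                           cur[j - 1] + 1,             # insertion
                           prev[j - 1] + (ca != cb)))  # substitution (free if equal)
        prev = cur
    return prev[-1]


def fuzzy_matches(value: str, term: str, max_edits: int) -> bool:
    """term~max_edits: case-insensitive match within max_edits edits."""
    return edit_distance(value.lower(), term.lower()) <= max_edits
```

With this metric, "hxxp" is two substitutions away from "http", so http~2 matches it but http~1 does not.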
Selects records where one of the current fields is between <lower> and <upper>.
The match is case insensitive.
The range filter is inclusive in both ends. To make it exclusive, change "[" to "(" and/or "]" to ")".
The TO keyword must be written in uppercase. It can also be replaced by a single comma with no change in meaning.
Wildcards are not allowed in this filter, except for a single *, which means an unbounded end.
There are shorthand syntaxes for some forms of this filter:
[lower TO *] can be written as >= lower
(lower TO *] can be written as > lower
[* TO upper] can be written as <= upper
[* TO upper) can be written as < upper
Examples:
status:[200 TO 299]
Selects records where status is between 200 and 299
status:[200, 300)
Selects records where status is between 200 and 299 (300 exclusive)
status:[200, *]
Selects records where status is greater than or equal to 200.
status:>=200
Same as the filter above.
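The bracket rules reduce to two optional bounds, each inclusive or exclusive, with * meaning "no bound". The Python sketch below encodes them with None for *. It compares plain numbers. Whether the engine compares numerically or as terms is not specified in this section, so that part is an assumption.

```python
from typing import Optional


def in_range(value: float, lower: Optional[float], upper: Optional[float],
             include_lower: bool = True, include_upper: bool = True) -> bool:
    """[lower TO upper]: '[' / ']' are inclusive, '(' / ')' exclusive; None stands for *."""
    if lower is not None and (value < lower or (value == lower and not include_lower)):
        return False
    if upper is not None and (value > upper or (value == upper and not include_upper)):
        return False
    return True


# Shorthands map onto the same function:
#   >= lower  ->  in_range(v, lower, None)
#   >  lower  ->  in_range(v, lower, None, include_lower=False)
#   <= upper  ->  in_range(v, None, upper)
#   <  upper  ->  in_range(v, None, upper, include_upper=False)
```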
Sets the current field to <field>. Usually followed by <term> (e.g. somefield:someterm).
Any filter expression can be used inside a filter field, even other filter fields.
Examples:
type:http
Selects records where the field "type" is equal to "http".
type:(http | cpu)
Selects records where the field "type" is equal to "http" or "cpu".
Selects the intersection of two other filters.
It's the default filter conjunction, so it can safely be omitted.
The operator & is equivalent to && and AND (must be uppercase).
Examples:
http verb:get status:404
Selects records with the default field equal to "http", "verb" equal to "get" and "status" equal to "404"
http & verb:get & status:404
Same as above, but with explicit &
tag:(important & resolved)
Selects records with the field "tag" containing both "important" and "resolved" values
Selects the union of two other filters.
The operator | is equivalent to || and OR (must be uppercase).
Examples:
http | verb:get | status:404
Selects records with the default field equal to "http", "verb" equal to "get" or "status" equal to "404"
tag:(important | resolved)
Selects records with the field "tag" containing either "important" or "resolved" values
Selects the complement of another filter.
The operator - is equivalent to ! and NOT (must be uppercase).
Examples:
-verb:get
Selects records with the field "verb" different from "get"
tag:(a* -abnormal)
Selects records with the field "tag" starting with "a", but not equal to "abnormal"
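The three boolean operators compose filters as predicates over a record. The Python sketch below is a hypothetical illustration: records are plain string dictionaries, terms are compared case-insensitively without wildcards, and multivalued fields are not modeled.

```python
from typing import Callable, Dict

Record = Dict[str, str]
Filter = Callable[[Record], bool]


def field_is(field: str, term: str) -> Filter:
    """field:term, compared case-insensitively (wildcards omitted for brevity)."""
    return lambda rec: rec.get(field, "").lower() == term.lower()


def all_of(*filters: Filter) -> Filter:      # a b, a & b, a && b, a AND b
    return lambda rec: all(f(rec) for f in filters)


def any_of(*filters: Filter) -> Filter:      # a | b, a || b, a OR b
    return lambda rec: any(f(rec) for f in filters)


def negate(f: Filter) -> Filter:             # -a, !a, NOT a
    return lambda rec: not f(rec)


# type:http status:(404 | 500) -verb:get
query = all_of(field_is("type", "http"),
               any_of(field_is("status", "404"), field_is("status", "500")),
               negate(field_is("verb", "get")))
```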
Pipes are the main building blocks of this language; unsurprisingly, the language is named after them. A pipe represents a single processing element that consumes events from its input and produces other events in its output.
The first pipe reads from the filtered public stream. Then, the output from one pipe can be chained as the input to another using the operator =>.
Each kind of pipe has its own properties, like output rate and data window type and size. Some pipes can run in a distributed environment, some cannot. Some pipes consume very little memory, while others may need a lot of memory to run. All of these properties are derived and checked at compile time.
In this chapter, we split pipes into two types: stateful and stateless. The main difference between them is that stateless pipes consume no additional memory and can be used to transform sequence expressions through the operator <seq> |> <object> → <object>.
Transforms or aggregates records over configurable data window and output.
This is the default type of pipe. It receives events, aggregates them and outputs the result as configured in the every clause. By default, the aggregation resets its state on every output, but the event retention can be configured in the over clause. The acceptable values for both clauses are discussed in Section 2.3: Output rates and aggregation windows.
The first part of the pipe defines which aggregations will be computed. It accepts all kinds of expressions and treats them all as aggregations, since every expression can act as an aggregation.
Each expression receives an automatic name based on its contents, or you can define a custom one using the keyword as. Example:
* => sum(x#)/count()**2 as value
Outputs the sum of the variable x divided by the squared event count, every second.
If the expression is of type row and has metadata associated with it, you can also use the keyword expand to expand its values. For more info, see Section 2.4.2: Type arguments. It's worth noting that the expand keyword skips any timestamp fields the row might have.
The by clause allows one to group the results by one or more scalar values. Aggregations are not allowed in the by clause, only scalar values. When grouping, a group only outputs if at least one of its aggregations still has TTL left. This usually means that it only outputs if the group received some event in the last period, but there are exceptions. For more info, see Section 2.4.6: Aggregation TTL.
All clauses are optional, so the simplest possible pipe is an empty string, which is equivalent to count() every 1 second. Examples:
* =>
Outputs how many events match the filter (here *, i.e. all events) every second.
* => by host
Outputs how many events match the filter every second, aggregating by host.
This pipe can be classified as safe, semi-safe or unsafe, depending on which expressions, window and output are chosen. For more information on pipe safety, see Section 2.2: Chained computation model.
The simple pipe is safe if and only if all the expressions are scalars (no aggregations), it has neither by nor over clauses, and the every clause is omitted or is exactly every item. In practice, this means the pipe is safe if it is only a stateless, item-by-item transformation of the input stream. Example:
* => id, name, value# * 3.14
The simple pipe is unsafe if and only if the output type is every <n> items or it has some aggregation that forces the pipe to be unsafe (e.g. smooth). In practice, this means the pipe is unsafe if it depends on some total order of the input, which can only be achieved by running on a single node. This is true for item-based outputs and some aggregations. Please note that an unsafe pipe with no safe or semi-safe pipe before it must be preceded by @unsafe. Example:
* => @unsafe => avg(value#) every 10 items
* => @unsafe => smooth(value#) every 10 seconds
The simple pipe is semi-safe in all other cases, i.e., when the output is by time period and it has no aggregation that makes it unsafe.
* => avg(value#) every 10 seconds
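The three rules above can be read as a small decision procedure. In the Python sketch below, the SimplePipe fields are a hypothetical description of a pipe, just detailed enough to apply the rules. They are not an API of Pipes.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SimplePipe:
    """Hypothetical description of a simple pipe, just enough to apply the safety rules."""
    has_aggregation: bool = False   # some expression is an aggregation, not just a scalar
    has_by: bool = False
    has_over: bool = False
    every: Optional[str] = None     # None (omitted), 'items', 'period', 'batches' or 'end'
    every_n: int = 1                # the <n> in 'every <n> items'
    forces_unsafe: bool = False     # uses an aggregation such as smooth()


def safety(p: SimplePipe) -> str:
    stateless = not (p.has_aggregation or p.has_by or p.has_over)
    if stateless and (p.every is None or (p.every == "items" and p.every_n == 1)):
        return "safe"               # item-by-item, stateless transformation
    if p.every == "items" or p.forces_unsafe:
        return "unsafe"             # needs a total order: single node only
    return "semi-safe"              # e.g. aggregations output by time period
```

The examples above classify as expected. "* => id, name, value# * 3.14" is safe. "avg(value#) every 10 items" and "smooth(value#) every 10 seconds" are unsafe. "avg(value#) every 10 seconds" is semi-safe.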
Computes the cartesian product of the outputs from two pipes with the same output rates.
The outputs from two pipes are compatible if they are of the same type. For example, every 5 seconds is compatible with every 7 seconds, but not with every 5 items. When the outputs are compatible, the resulting pipe assumes both outputs as its own output definition.
The resulting pipe will have all the fields from both pipes (some fields may be renamed to avoid collisions).
When defining fields in the on clause, the fields must be present in both pipes and must have the same type; otherwise, a compilation error occurs. Also, fields in the on clause aren't renamed, so they appear only once in the resulting event.
Examples:
* => [ sum(value#) by host join sum(value#) as total ]
Computes the sum grouped by host with a field containing the total sum in each group.
The example above would generate events like:
{"timestamp":1399927869000, "host":"host1","sum_value":42.0, "total_sum":129.0}
{"timestamp":1399927869000, "host":"host2","sum_value":43.0, "total_sum":129.0}
{"timestamp":1399927869000, "host":"host3","sum_value":44.0, "total_sum":129.0}
You can also define a field to join the results on. For example, to compute the average CPU and memory of each host in the same event, you can join on the host field:
Metrics => [ @filter \metric_name:CPU => avg(value#) as cpu by host every minute join on host @filter \metric_name:Memory => avg(value#) as mem by host every minute ]
Outputs the average cpu and memory for each host every minute, in the same event.
The example above would generate events like:
{"timestamp":1399927869000, "host":"host1","cpu":89.0, "mem":1314.0}
{"timestamp":1399927869000, "host":"host2","cpu":21.0, "mem":715.55}
{"timestamp":1399927869000, "host":"host3","cpu":40.0, "mem":16111.7}
Use ljoin, rjoin and lrjoin for outer joins.
Metrics => [ @filter \metric_name:CPU => avg(value#) as cpu by host every minute lrjoin on host @filter \metric_name:Memory => avg(value#) as mem by host every minute ]
Outputs the average cpu and memory for each host every minute, in the same event, including hosts that appear in only one of the pipes.
The example above would generate events like:
{"timestamp":1399927869000, "host":"host1","mem":1314.0}
{"timestamp":1399927869000, "host":"host2","cpu":21.0, "mem":715.55}
{"timestamp":1399927869000, "host":"host3","cpu":40.0}
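The difference between join on and lrjoin on can be modeled with dictionaries. The Python sketch below assumes that each side outputs at most one row per key in a given period, and it omits the field renaming that the engine applies to colliding fields. The function name is hypothetical.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def join_on(key: str, left: List[Row], right: List[Row], outer: bool = False) -> List[Row]:
    """Joins two outputs of the same period on one field.

    outer=False keeps only keys present on both sides (like 'join on');
    outer=True keeps keys from either side (like the 'lrjoin on' example)."""
    left_by = {row[key]: row for row in left}
    right_by = {row[key]: row for row in right}
    keys = list(left_by) + [k for k in right_by if k not in left_by]
    result: List[Row] = []
    for k in keys:
        if k in left_by and k in right_by:
            result.append({**left_by[k], **right_by[k]})   # the key field appears only once
        elif outer:
            result.append(dict(left_by.get(k) or right_by[k]))
    return result


cpu = [{"host": "host2", "cpu": 21.0}, {"host": "host3", "cpu": 40.0}]
mem = [{"host": "host1", "mem": 1314.0}, {"host": "host2", "mem": 715.55}]
```

Joining cpu and mem on host keeps only host2. The outer variant also keeps host1 and host3, each with the single metric available, as in the lrjoin output above.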
Concatenates the outputs from two pipes with compatible output rates.
The outputs from two pipes are compatible if they are of the same type. For example, every 5 seconds is compatible with every 7 seconds, but not with every 5 items. When the outputs are compatible, the resulting pipe assumes both outputs as its own output definition.
If both pipes' type definitions are also compatible, the resulting pipe will assume the first pipe's types and field names. The field names do not need to be equal between the pipes; only their types and order must match.
Example:
* => [ sum(value#) by host union 'Total', sum(value#) ]
Computes the union of the sum grouped by host with the total sum. Note that the fields in the by clause come first in the final order; that's why 'Total' comes first in the second pipe.
The example above would generate events like:
{"timestamp":1399927869000, "host":"host1","sum_value":42.0}
{"timestamp":1399927869000, "host":"host2","sum_value":43.0}
{"timestamp":1399927869000, "host":"host3","sum_value":44.0}
{"timestamp":1399927869000, "host":"Total","sum_value":129.0}
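Because only types and order must match, the second pipe's values take the first pipe's field names by position. The Python sketch below models just that renaming, using the values from the example above. It checks only the field count, and its name and signature are hypothetical.

```python
from typing import Any, Dict, List, Sequence, Tuple


def union(first_fields: Sequence[str], first_rows: List[Tuple[Any, ...]],
          second_rows: List[Tuple[Any, ...]]) -> List[Dict[str, Any]]:
    """Concatenates two outputs; the second pipe's values take the first pipe's
    field names by position (type compatibility checks are omitted)."""
    for row in second_rows:
        if len(row) != len(first_fields):
            raise ValueError("both pipes must produce the same number of fields")
    return [dict(zip(first_fields, row)) for row in [*first_rows, *second_rows]]


# sum(value#) by host   union   'Total', sum(value#)
rows = union(["host", "sum_value"],
             [("host1", 42.0), ("host2", 43.0), ("host3", 44.0)],
             [("Total", 129.0)])
```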
Compresses the result from the previous pipe to at most k most important rows.
The compress pipe keeps most of the previous pipe's metadata, except for the output: it outputs only at the end. It is also an unsafe pipe (see Section 2.2: Chained computation model).
The pipe can optionally be grouped. In this case, it may be useful to define a global maximum to prevent a group explosion from making the number of points grow too much.
Examples:
* => count() every minute => @compress 300, _
Computes the event count every minute, but compresses it to the 300 most important points at the end of the query.
* => count() by host every minute => @compress 300, 1000, _ by host
Computes the event count by host every minute, but compresses it to each host's 300 most important points at the end of the query. It also limits the output to at most 1000 points globally, which matters when there are 4 or more hosts (4 * 300 > 1000).
Only outputs rows that follow at least <period> without any output.
The exceeding rows are discarded. Due to its nature, this is a semi-safe pipe (see Section 2.2: Chained computation model). When running in a distributed environment, this pipe is used as-is in both the map and reduce pipes.
Debouncing: the red events are discarded.
Examples:
* => count() every minute => @filter _ > 100 => @debounce 15 minutes
Alerts when the number of events in a minute is greater than 100, but only outputs if there was a silence of at least 15 minutes.
* => count() by host every minute => @filter _ > 100 => @debounce 15 minutes by host
Same as previous, but grouping by host.
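The Python sketch below is a hypothetical model of debouncing by group, with timestamps in minutes. It assumes that every incoming row restarts the silence, whether that row is output or discarded. This is one plausible reading of "without any output" from the previous pipe, not a confirmed description of the engine.

```python
from typing import Dict, Hashable, Iterable, Iterator, Tuple


def debounce(rows: Iterable[Tuple[float, Hashable]],
             period: float) -> Iterator[Tuple[float, Hashable]]:
    """Emits a (timestamp, group) row only after `period` of silence in its group.

    Assumption: every incoming row, emitted or discarded, restarts the silence."""
    last_seen: Dict[Hashable, float] = {}
    for ts, group in rows:
        previous = last_seen.get(group)
        last_seen[group] = ts
        if previous is None or ts - previous >= period:
            yield ts, group
```

Under this model, a steady stream of alerts every 10 minutes with a 15-minute debounce outputs only the first one, because the silence never lasts long enough.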
Keeps the latest batch of input events and outputs it at the end.
This pipe is a convenience to output only the latest batch of events, with the output at the end. This concept doesn't otherwise exist in the syntax, so without this pipe it would be harder to write queries that are only interested in the latest value for each group, for example. It is especially useful when replaying past events for a graph that only shows the latest value.
@latest is an unsafe pipe (see Section 2.2: Chained computation model).
Examples:
* => count() by host => @latest
Outputs only the latest batch of counts by host at the end of the execution.
Outputs only events that change the last value of any expression in the group.
Please note that the first event is always output, even if the property value is null.
Examples:
* => @unsafe => @onchange valid by host
Outputs only events whose value in the property valid is different from the last event's.
Outputs only events for which the condition evaluates to false and was not false in the previous event.
Please note that the first event is always output.
Examples:
* => @unsafe => @onfalse valid by host
Outputs only events whose value in the property valid is false and differs from that of the last event for the same host.
Outputs only events for which the condition evaluates to true and did not evaluate to true in the previous event.
Please note that the first event is always output.
Examples:
* => @unsafe => @ontrue valid by host
Outputs only events whose value in the property valid is true and differs from that of the last event for the same host.
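Both @ontrue and @onfalse are edge detectors on a condition. A minimal Python sketch of that idea follows; it is an assumption-laden model, not the Pipes code. It follows the text literally (the first event of each group is always emitted, even when its condition does not hold) and treats null as neither true nor false; `on_value`, `ontrue`, `onfalse` and their parameters are hypothetical names.

```python
from typing import Any, Callable, Dict, Hashable, Iterable, Iterator

Event = Dict[str, Any]


def on_value(events: Iterable[Event],
             condition: Callable[[Event], Any],
             key_of: Callable[[Event], Hashable],
             target: bool) -> Iterator[Event]:
    """Yield events whose condition becomes `target` after not being `target`."""
    previous: Dict[Hashable, bool] = {}  # group key -> did the last event hit `target`?
    for event in events:
        key = key_of(event)
        hit = condition(event) is target  # None (null) never counts as true or false
        if key not in previous or (hit and not previous[key]):
            yield event  # the first event of each group is always emitted
        previous[key] = hit


def ontrue(events, condition, key_of):
    return on_value(events, condition, key_of, True)


def onfalse(events, condition, key_of):
    return on_value(events, condition, key_of, False)


events = [{"i": i, "host": "a", "valid": v}
          for i, v in enumerate([False, True, True, False, True])]
print([e["i"] for e in ontrue(events, lambda e: e["valid"], lambda e: e["host"])])
print([e["i"] for e in onfalse(events, lambda e: e["valid"], lambda e: e["host"])])
```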
Limits the output of the previous pipe to at most <k> rows per <period>.
The exceeding rows are discarded. This is an unsafe pipe (see Section 2.2: Chained computation model).
Throttling: the red events are discarded.
Examples:
* => count() every minute => @filter _ > 100 => @throttle 15 minutes
Alerts when the number of events in a minute is greater than 100, but throttles the output to at most once every 15 minutes.
* => count() by host every minute => @filter _ > 100 => @throttle 2, 15 minutes by host
Same as previous, but allows at most two events by host every 15 minutes.
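A throttle can be modeled by remembering, per group, when the last rows were emitted. This Python sketch assumes a sliding window (a row passes if fewer than `k` rows of its group were emitted during the preceding `period`); Pipes may align its windows differently, and `throttle` plus the `(timestamp, group)` row shape are hypothetical.

```python
from collections import defaultdict, deque
from typing import Hashable, Iterable, Iterator, Tuple

Row = Tuple[float, Hashable]  # hypothetical (timestamp in seconds, group key)


def throttle(rows: Iterable[Row], k: int, period: float) -> Iterator[Row]:
    """Emit at most `k` rows per group within any sliding window of `period`."""
    emitted = defaultdict(deque)  # group key -> timestamps of recently emitted rows
    for ts, key in rows:
        window = emitted[key]
        while window and ts - window[0] >= period:
            window.popleft()  # this emission is older than `period`: forget it
        if len(window) < k:
            window.append(ts)
            yield ts, key
        # otherwise the row exceeds the limit and is discarded


rows = [(0, "h1"), (30, "h2"), (60, "h1"), (120, "h1"), (900, "h1"), (960, "h1")]
print(list(throttle(rows, k=2, period=15 * 60)))
```

Unlike the debounce sketch, only emitted rows count here, so a steady stream of alerts still produces up to `k` rows per period.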
Splits a batch of events into many batches, one for each event.
Available since Pipes v0.13
Especially useful in conjunction with the batch output type.
Marks that any pipe executed after this must run in a non-distributed environment.
The engine requires each unsafe pipe to be preceded by a safe or semi-safe pipe, which helps avoid excessive communication between nodes. When you really must write a query that breaks this rule, precede the unsafe pipe with this pipe: it is in fact an empty pipe that forces the pipe split, making everything after it run in the reduce node. For more information, see Section 2.2: Chained computation model.
Examples:
* => @unsafe => count() over all every item
Outputs the total count of events since the beginning of the query every time an item arrives.
Modifies a pipe to run entirely in the same node, ignoring any distribution invariants.
This modifier is useful when the data is known to be already partitioned. For example, if you are sure that each host will be consistently sent to the same node, you can avoid distribution costs by writing aggregations using this modifier.
Compare the following picture with the one described in Section 2.2: Chained computation model.
Typical atomic pipe on a single machine
Typical atomic pipe on multiple machines
Examples:
* => atomic count() by host every minute
Outputs the event count by host every minute. Each output is processed in the node where the event arrived and is never merged between nodes (because it assumes that the data is already partitioned by host).
Chains many sequences in a batch into a single sequence.
If the expression is not defined, it is assumed to be _.
Examples:
* => @chain (x#, y#)
Yields a seq(number) for each batch, with 2 numbers for each event in the input.
* => (x1:seq, x2:seq) |> @chain
Concatenates the sequences x1 and x2 for each event.
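In plain Python, @chain corresponds roughly to flattening one level of nesting with `itertools.chain.from_iterable`. The sketch below is only an analogy: `chain_batch` is a hypothetical helper, a batch is modeled as a list, and the default expression (identity) stands in for `_`.

```python
from itertools import chain
from typing import Any, Callable, Iterable, List


def chain_batch(batch: Iterable[Any],
                expr: Callable[[Any], Iterable[Any]] = lambda event: event) -> List[Any]:
    """Concatenate the sequence produced by `expr` for every event of one batch."""
    return list(chain.from_iterable(expr(event) for event in batch))


batch = [{"x": 1, "y": 2}, {"x": 3, "y": 4}]
# Like `@chain (x#, y#)`: two numbers per input event, one flat sequence per batch.
print(chain_batch(batch, lambda e: (e["x"], e["y"])))
# Like `(x1:seq, x2:seq) |> @chain`: the default expression plays the role of `_`.
print(chain_batch([[1, 2], [3]]))
```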
Filters the results from previous pipe.
Filter pipe does not change any pipe metadata; it just repeats the previous pipe's info.
Examples:
* => count() by host => @filter _ > 10
Keeps only the hosts that have an event count greater than 10.
Unwinds events in the seq instance.
Available since Pipes v0.13
If the seq has a type argument, the resulting field has the same type as that argument.
It is possible to use the keyword expand if the inner type is a strongly typed row.
Examples:
* => @for products:seq as product
Creates one event for each element inside the field products.
* => @for range(10) as i
Creates 10 events with timestamp<number> and i<number>.
* => @for expand range(10) |> (_+1 as a, _$ as b)
Creates 10 events with timestamp<number>, a<number> and b<string>.
=> 100 as upto at the end => @for range(2, upto) as prime => @for range(1, floor(prime**0.5)+1) as j, prime => count(prime%j==0) by prime => @filter _==1 => summary(prime$) as primes
Computes prime numbers up to 100.
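To see what each stage of the prime query does, here is a rough Python translation, one statement per pipe. It is a sketch under stated assumptions: `unwind` is a hypothetical stand-in for @for that keeps the other fields of the input event, events are plain dicts, and `count(prime%j==0) by prime` is modeled with a `Counter` that only counts true conditions.

```python
import math
from collections import Counter
from typing import Any, Callable, Dict, Iterable, Iterator

Event = Dict[str, Any]


def unwind(events: Iterable[Event], seq_of: Callable[[Event], Iterable[Any]],
           name: str) -> Iterator[Event]:
    """Rough analogue of `@for <seq> as <name>`: one event per element of the seq."""
    for event in events:
        for element in seq_of(event):
            yield {**event, name: element}  # assumption: other fields are kept


# => 100 as upto at the end
events: Iterable[Event] = [{"upto": 100}]
# => @for range(2, upto) as prime
events = unwind(events, lambda e: range(2, e["upto"]), "prime")
# => @for range(1, floor(prime**0.5)+1) as j, prime
events = unwind(events, lambda e: range(1, math.floor(e["prime"] ** 0.5) + 1), "j")

# => count(prime%j==0) by prime  (number of divisors j <= sqrt(prime))
divisors = Counter()
for e in events:
    divisors[e["prime"]] += int(e["prime"] % e["j"] == 0)

# => @filter _==1  (only j = 1 divides a prime)
primes = sorted(p for p, n in divisors.items() if n == 1)
print(primes)
```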
Changes the static type of any row to the highest corresponding seq.
Available since Pipes v0.13
Examples:
* => x, y# => @seq
Changes the type from row(timestamp<number>;;x<string>,y<number>) to seq(comparable).
Sets fields in the original event (both typed and raw).
Examples:
* => @set (date+time):dateparse('yyyyMMddHHmmss') as timestamp
Adds a new field to the original event: the timestamp parsed from both the date and time fields of the event.
* => date, time, text => @set (date+time):dateparse('yyyyMMddHHmmss') as timestamp
Adds a new field to the original event: the timestamp parsed from both the date and time fields of the event. Since the previous pipe is typed, this is equivalent to expand *, (date+time):dateparse('yyyyMMddHHmmss') as timestamp.
Skips some results from previous pipe.
Skip pipe does not change any pipe metadata; it just repeats the previous pipe's info. It also doesn't perform any aggregation; it just outputs as soon as it finishes processing input.
Examples:
* => count() by url, host => @sort _ desc by host => @skip 3 by host
Gets most accessed urls by host, except the 3 most accessed in each host.
Skips rows from every batch while some condition is true.
Available since Pipes v0.13
This pipe also accepts an aggregation as the condition. In this case, it behaves as if every row were a batch, with an over all window.
Examples:
* => count() by url => @sort _ desc => @skipwhile :sum < 1000
Gets the most accessed urls, skipping until the access count sum is greater than or equal to 1000.
Gets some rows (based on index) from previous pipe.
Available since Pipes v0.13
Slice pipe does not change any pipe metadata; it just repeats the previous pipe's info. It also doesn't perform any aggregation; it just outputs as soon as it finishes processing input.
Examples:
* => count() by url, host => @sort _ desc by host => @slice 2, 10, 3 by host
Gets the 2nd, 5th and 8th most accessed urls by host.
Sorts the results from previous pipe.
Sort pipe does not change any pipe metadata; it just repeats the previous pipe's info. It also doesn't perform any aggregation; it just outputs as soon as it finishes processing input.
The expressions can be any comparable, optionally followed by asc or desc to make the sort order explicit.
Examples:
* => count() by host => @sort _ desc, host
Sorts the hosts by event count descending, and then by host (when the counts are equal).
Limits the results from previous pipe.
Take pipe does not change any pipe metadata; it just repeats the previous pipe's info. It also doesn't perform any aggregation; it just outputs as soon as it finishes processing input.
Examples:
* => count() by url, host => @sort _ desc by host => @skip 3 by host => @take 2 by host
Gets the 4th and 5th most accessed urls by host.
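The grouped @sort, @skip and @take chain in the example above amounts to slicing each group after sorting it. A Python sketch of that idea, assuming the rows come from `count() by url, host` as hypothetical `(url, host, count)` tuples; `skip_take_by` is an illustrative helper, not a Pipes API.

```python
from collections import defaultdict
from typing import Callable, Dict, Hashable, List, Tuple

Row = Tuple[str, str, int]  # hypothetical (url, host, count) rows


def skip_take_by(rows: List[Row], group_of: Callable[[Row], Hashable],
                 sort_key: Callable[[Row], int], skip: int,
                 take: int) -> Dict[Hashable, List[Row]]:
    """Per group: sort descending, drop the first `skip` rows, keep the next `take`."""
    groups: Dict[Hashable, List[Row]] = defaultdict(list)
    for row in rows:
        groups[group_of(row)].append(row)
    return {
        group: sorted(members, key=sort_key, reverse=True)[skip:skip + take]
        for group, members in groups.items()
    }


rows = [(f"/u{i}", "h1", c) for i, c in enumerate([60, 50, 40, 30, 20, 10], start=1)]
rows += [("/x", "h2", 5)]
print(skip_take_by(rows, lambda r: r[1], lambda r: r[2], skip=3, take=2))
```

A host with three or fewer urls (like "h2" here) ends up with no rows at all.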
Takes rows from every batch while some condition is true.
Available since Pipes v0.13
This pipe also accepts an aggregation as the condition. In this case, it behaves as if every row were a batch, with an over all window.
Examples:
* => count() by url => @sort _ desc => @takewhile :sum < 1000
Gets the most accessed urls, while the access count sum is lower than 1000.
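With an aggregation such as :sum as the condition, @takewhile and @skipwhile behave like Python's `takewhile` and `dropwhile` over a running total. This sketch assumes the running sum includes the current row (so a row that pushes the total to 1000 or more is the first one skipped by @takewhile); `takewhile_sum` and `skipwhile_sum` are hypothetical helpers.

```python
from itertools import accumulate, dropwhile, takewhile
from typing import List


def takewhile_sum(counts: List[int], limit: int) -> List[int]:
    """Like `@takewhile :sum < limit` on rows already sorted by count."""
    running = zip(counts, accumulate(counts))  # (row value, sum up to and including it)
    return [c for c, _ in takewhile(lambda pair: pair[1] < limit, running)]


def skipwhile_sum(counts: List[int], limit: int) -> List[int]:
    """Like `@skipwhile :sum < limit`: the rows that takewhile_sum leaves out."""
    running = zip(counts, accumulate(counts))
    return [c for c, _ in dropwhile(lambda pair: pair[1] < limit, running)]


counts = [600, 300, 200, 50]  # running sums: 600, 900, 1100, 1150
print(takewhile_sum(counts, 1000), skipwhile_sum(counts, 1000))
```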
Sorts the results and gets the first k rows (possibly grouped) from previous pipe.
Top pipe does not change any pipe metadata; it just repeats the previous pipe's info. It also doesn't perform any aggregation; it just outputs as soon as it finishes processing input.
The expressions can be any comparable, optionally followed by asc or desc to make the sort order explicit.
Examples:
* => count() by host => @top 10, _ desc
Outputs the 10 hosts with the most events.
* => count() by url, host => @top 10, _ desc by host
Outputs, for every host, the 10 urls with the most events.
Extracts one field of the stream to be the output event.
If the expression is not defined, it is assumed to be _.
If the expression type is row and it has metadata, the pipe assumes the type's metadata. See Section 2.4.2: Type arguments for more information.
If the expression is an aggregation, the pipe yields all the input rows applied to the aggregation argument.
Examples:
* => name:regex(r'^(?<first>\w+)( \w+)* (?<last>\w+)$') as name => @yield name
Yields an event with the fields first and last.
* => name:regex(r'^(?<first>\w+)( \w+)* (?<last>\w+)$') as name => @yield list(name)
Yields a single event for every batch with a list of names in that batch.
Takes elements from a batch of seqs, one by one, and makes a seq of seqs.
Available since Pipes v0.13
If the expression is not defined, it is assumed to be _.
Examples:
* => @zip (x#, y#)
Yields 2 sequences every batch: one with all the x# in the batch and the other with all the y#.
* => (x1:seq, x2:seq) |> @zip
Generates a sequence like ((x1[0], x2[0]), (x1[1], x2[1]), ...).
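@zip is essentially a transpose, much like Python's `zip(*seqs)`. The sketch below is an analogy only: `zip_batch` is hypothetical, and it stops at the shortest input, which is Python's behavior; how Pipes handles sequences of different lengths is not stated here.

```python
from typing import Any, List, Sequence


def zip_batch(seqs: Sequence[Sequence[Any]]) -> List[List[Any]]:
    """The i-th output sequence holds the i-th element of every input sequence."""
    return [list(column) for column in zip(*seqs)]  # stops at the shortest input


# `@zip (x#, y#)` over a batch of three events with x = 1, 3, 5 and y = 2, 4, 6:
print(zip_batch([(1, 2), (3, 4), (5, 6)]))
# `(x1:seq, x2:seq) |> @zip` for one event:
print(zip_batch([[1, 2, 3], ["a", "b", "c"]]))
```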
Operators are elements of syntax that allow simple manipulation of scalar and aggregation values.
Every operator translates directly to a function call. E.g. the expression 2+2==4 is equivalent to .eq(.add(2, 2), 4).
The operator groups are listed below by precedence, from highest to lowest. Operators inside the same group have the same precedence.
Only the ternary operator is right-associative. All other operators are left-associative.
You can change both precedence and associativity by using parentheses. E.g. a+b*c is equivalent to a+(b*c), whereas (a+b)*c performs the addition first.
Returns a boolean expression that evaluates true if the filter accepts the event.
The filter syntax is the same described in Chapter 3: Filters.
Everything after the \ is considered part of the filter, until it finds a ], a ), a comma (,) or =>.
Sometimes it is necessary to enclose the expression in parentheses to avoid ambiguities.
E.g. \field:value as something should be written as (\field:value) as something.
This is because as is a valid expression in the filter language and means something different there.
Examples:
CPU | Memory => [ @filter \CPU => max(value#) as cpu every minute join @filter \Memory => max(value#) as mem every minute ]
Yields the maximum value every minute for both streams on the same event.
CPU => max(value#):if(\host:A) as cpuA, max(value#):if(\host:B) as cpuB every minute
Yields the maximum CPU value every minute for both hosts on the same event.
Coerces the expression to number. Shorthand for <object>:number() → <number>.
The expression a# is equivalent to number(a).
Example:
* => avg(response_time#) every minute
Yields the average of the field response_time every minute.
Coerces the expression to string. Shorthand for <object>:string() → <string>.
The expression a$ is equivalent to string(a).
Example:
* => 'Count: ' + count()$ every minute
Yields the number of events every minute as a string 'Count: X'.
Adds two numbers.
The expression a+b is equivalent to .add(a, b).
Concatenates two strings.
The expression a+b is equivalent to .add(a, b).
Concatenates two seqs.
Available since Pipes v0.13
The expression a+b is equivalent to .add(a, b).
Subtracts one number from another.
The expression a-b is equivalent to .sub(a, b).
Multiplies two numbers.
The expression a*b is equivalent to .mul(a, b).
Divides one number by another (float division).
The expression a/b is equivalent to .div(a, b).
Divides one number by another (integer division).
The expression a//b is equivalent to .intdiv(a, b).
Raises one number to the power of another.
The expression a**b is equivalent to .pow(a, b) or even pow(a, b).
Returns the remainder of the division of one number by another.
The expression a%b is equivalent to .mod(a, b).
Negates one number.
The expression -a is equivalent to .neg(a).
Returns the logical AND of two booleans.
The expression a and b is equivalent to a&b, a&&b and .and(a, b).
Returns the logical OR of two booleans.
The expression a or b is equivalent to a|b, a||b and .or(a, b).
Returns the logical XOR of two booleans.
The expression a xor b is equivalent to a^b and .xor(a, b).
Returns the logical NOT of a boolean.
The expression not a is equivalent to !a and .not(a).
Checks whether two objects are equal.
The expression a==b is equivalent to .eq(a, b).
Checks whether two objects are not equal.
The expression a!=b is equivalent to .neq(a, b).
Checks whether the left operand compares less than the right one.
The expression a<b is equivalent to .lt(a, b).
Checks whether the left operand compares less than or equal to the right one.
The expression a<=b is equivalent to .lteq(a, b).
Checks whether the left operand compares greater than the right one.
The expression a>b is equivalent to .gt(a, b).
Checks whether the left operand compares greater than or equal to the right one.
The expression a>=b is equivalent to .gteq(a, b).
Extracts a property from an object.
The expression a->identifier[123, 456] is equivalent to .property(a, 'identifier', 123, 456).
Example:
* => name:regex(r'^(?<first>\w+)( \w+)* (?<last>\w+)$') as match => '%s, %s':sprintf(match->last, match->first) as converted
Converts the name field to the format "Last, First".
* => last(*) as event every minute => event->name
Extracts name from the untyped field event.
Evaluates an expression in the semantic context of the target.
The expression a->(b) is almost equivalent to .peek(a, b), but in the former case b is evaluated in the semantic context of a.
Example:
* => span('today')->(start*2, end, start+end)
Evaluates span('today') and changes it into another row object without having to go through another pipe.
Accesses an object by index/key in a list/string/map.
Available since Pipes v0.13
The expression a[b, c] is equivalent to .get(a, b, c).
Please note that this operator has special semantics when used directly after a property reference, so field[123] may be different from (field+field2)[123].
Example:
* => (field1+field2)[4]
Yields the 5th character (index 4) of a string made of field1+field2.
Evaluates the expression in the parent semantic context (if any).
Available since Pipes v0.13
The expression ^a is almost equivalent to .hat(a), but in the former case, a is evaluated in the parent semantic context.
This operator is useful in conjunction with other operators that generate child semantic contexts, like <seq> |> <object> → <object> and <object>-><identifier> → <object>. In those operators, it may be necessary to evaluate some expressions in the parent context.
Example:
=> at the end => @for range(10)|>(range(10)|>"${^_}x${_} = ${^_*_}")|>@chain as line
Yields a multiplication table with one event for each line.
Returns the first if it is not null; otherwise, returns the second.
The expression a??b is equivalent to .coalesce(a, b).
Example:
* => first(name) ?? 'None' as first_name every second
Yields the first name in the window, or 'None' if no name was found.
If the condition is true, returns the first object; otherwise, returns the second.
The expression a?b,c is equivalent to .iif(a, b, c).
Please note that this is the only right-associative operator. This allows constructions like:
AllMonitorings => (type == 'http' ? 'Http Monitoring', type == 'cpu' ? 'CPU Monitoring', type == 'mem' ? 'Memory Monitoring', 'Unknown') as monitoring_type
Example:
* => count()%2==0 ? 'Even', 'Odd' as type every second
Yields 'Even' or 'Odd' every second, depending on the number of events in the window.
Transforms a sequence into another sequence or object.
Available since Pipes v0.13
The expression a|>b is almost equivalent to .transform(a, b), but in the former case, b is evaluated in the semantic context of a.
E.g., in (1, 2, 3) |> _+1, _ is each element of the sequence on the left.
If the expression on the right is an aggregation, the result is the application of the aggregation to all elements of the sequence. E.g. range(10) |> sum(*) is the sum of all numbers between 0 and 9.
In the right operand, the expression _ has a special meaning: it always means the entire object (like the * expression). This allows us to write the example above as range(10) |> :sum.
If the expression on the right is a scalar, the result is another seq with the expression applied to all the elements in the original sequence. E.g. (1, 2, 3) |> $ casts all numbers in the sequence to string.
If the original sequence is an instance of row, the result will also be a row with the same name, but with the types changed, e.g. (1 as a, 2 as b) |> $ is the same as ('1' as a, '2' as b).
This operator also allows a special construction: the right operand may be a call to a stateless pipe. In this case, all the elements in the sequence are passed through the pipe as a single batch. E.g. range(20)|>@filter(_%2==0) returns all the even numbers between 0 and 19.
Please note that this construction requires parentheses, as a function call would; the regular pipe syntax doesn't. If no arguments are needed, the parentheses are optional, e.g. (range(5), "abcde":split)|>@zip.
Some stateless pipes may use grouping, like @top. In this case, you can use it inside the parentheses, e.g. range(20)|>@top(2, _ desc by _%2).
The expression on the right is always evaluated in the context of the left operand, i.e. it behaves as if each element of the sequence were an event with the same type as the sequence's type argument. You can use the operator ^<object> → <object> to access the parent semantic context, e.g. range(10)|>(range(10)|>^_*_) returns a multiplication table.
Example:
=> at the end => range(2, 100) |> @filter(range(2, floor(_**0.5)+1) |> not any(^_%_==0)) as primes
Computes all primes less than 100.
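The nesting of semantic contexts maps naturally onto nested Python comprehensions: the inner `_` is the innermost loop variable and `^_` is the variable of the enclosing loop. The following is only an analogy for the two expressions in this section, not how Pipes evaluates them.

```python
import math

# range(2, 100) |> @filter(range(2, floor(_**0.5)+1) |> not any(^_%_==0))
primes = [
    n  # the outer `_`, seen as `^_` inside the inner sequence
    for n in range(2, 100)
    # `^_ % _ == 0`, where d plays the role of the inner `_`
    if not any(n % d == 0 for d in range(2, math.floor(n ** 0.5) + 1))
]

# range(10) |> (range(10) |> ^_*_)
table = [[outer * inner for inner in range(10)] for outer in range(10)]

print(primes)
print(table[3][7])
```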
Calculates the absolute value of a number.
Returns the arc cosine of the argument, as an angle in radians.
Returns the arc sine of the argument, as an angle in radians.
Returns the arc tangent of the argument, as an angle in radians.
Formats a number as the best possible byte multiple.
Returns the smallest integer value that is greater than or equal to the argument.
Converts a unicode codepoint to a single-char string.
Available since Pipes v0.13
Returns the cosine of an angle in radians.
Adds <period> to the timestamp argument.
Example:
* => timestamp():dateadd(1 day) every second
Adds one day to the current timestamp.
Rounds timestamp down to the nearest date that is divisible by <period>.
Example:
* => timestamp():datefloor(1 day) every second
Returns the beginning of the current day.
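For a fixed-length period, rounding down to a multiple of the period is a single modulo operation. This Python sketch assumes timestamps are milliseconds since the Unix epoch and that alignment is relative to the epoch in UTC, so a "day" boundary here is midnight UTC; Pipes' handling of time zones and calendar periods may differ, and `datefloor`/`DAY_MS` are hypothetical names.

```python
from datetime import datetime, timezone

DAY_MS = 24 * 60 * 60 * 1000  # length of `1 day` in milliseconds


def datefloor(ts_ms: int, period_ms: int) -> int:
    """Round an epoch-millisecond timestamp down to a multiple of `period_ms`."""
    return ts_ms - ts_ms % period_ms  # Python's % also floors negative timestamps


floored = datefloor(1_500_000_000_000, DAY_MS)  # input is 2017-07-14T02:40:00Z
print(floored, datetime.fromtimestamp(floored / 1000, tz=timezone.utc).isoformat())
```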
Formats the timestamp using the specified format.
Example:
* => timestamp():dateformat("dd/MM/YYYY hh:mm", "UTC") as date every second
Formats the current timestamp assuming UTC time zone.
Subtracts <period> from the timestamp argument.
Example:
* => timestamp():datesub(1 day) every second
Subtracts one day from the current timestamp.
Calculates the exponential of a number.
Returns the largest integer value that is less than or equal to the argument.
Formats a number according to format string and locale.
Examples:
* => 123.456:format(".00") every second
Formats with 2 digits after the decimal point.
=> 12345.678:format("0,000.00", "pt-BR") as money every second
Formats 12345.678 as '12.345,68' using the Brazilian locale.
Calculates the logarithm of a number.
Calculates the gaussian distribution function value, optionally integrated.
Raises one number to the power of another.
Rounds a number to <precision> decimal places.
Selects the ith element from a list of arguments, or null if it doesn't exist.
Example:
* => 1:select("a", "b", "c") as selected every second
Yields 'selected:b' every second. The list of arguments is zero-based.
Returns the sine of an angle in radians.
Returns the tangent of an angle in radians.
Converts a number to a string representing it in some base.
Available since Pipes v0.13.12
Converts object to boolean.
Returns the underlying Java class of the object.
Converts object to comparable.
Transforms the parameter using the translation rules defined in <pairs>.
Much like property[keys]. Works for strings, containers and arrays.
This function infers the correct return type in compile time, whenever possible.
Returns the first index of the value in <list>, or null if <list> does not contain it.
Returns true if <list> contains the value, false otherwise.
Converts the object to its JSON string representation.
When used in a simple pipe, delays or disables (if ttl is not supplied or is < 0) the removal of inactive groups.
Tries to get <target>'s size. Works for strings, containers and arrays.
Converts object to number.
Example:
* => avg(response_time:number()) every minute
Yields the average of the field response_time every minute.
Casts any object to its canonical object representation.
Casts any object to a row.
Casts any object to a seq.
Converts object to string.
Example:
* => 'Count: ' + count():string every minute
Yields the number of events every minute as a string 'Count: X'.
Returns the compile-time type of an expression.
Available since Pipes v0.13
Returns whether the target seq contains the argument.
Available since Pipes v0.13.2
Repeats the sequence a certain number of times.
Available since Pipes v0.13
Returns whether the target string contains the argument.
Parses a timestamp using the specified format.
Returns whether the target string ends with the argument.
Converts a string representing a number in some base to the number itself.
Available since Pipes v0.13.12
Evaluates compressed base64 HyperLogLog data.
Useful to evaluate the final result from hllset(<number log2m>, <object...>) → <string> aggregations.
Example
This first query precomputes the distinct users every minute, but does not yield a result.
type:http => hllset(userid) as value every minute
The second query aggregates and evaluates the precomputed values, but with a larger window of 10 minutes. The results from the first query may have been stored in the meantime.
type:firstquery => hlleval(hllmerge(value)) as distinct_users every 10 minutes
Returns the index of <s> inside the target string, or null if it is not found.
Parses JSON string into objects.
Available since Pipes v0.13
It parses arrays as java.util.List, objects as java.util.Map, numbers as java.lang.Double and strings as java.lang.String.
Converts string to lowercase.
Trims leading characters from a string.
Gets the unicode codepoint from the first char in the string.
Available since Pipes v0.13
Parses a number according to format string and locale.
Returns a strongly typed row composed of all named groups in <regex>.
The result is a row value with as many fields as there are named groups in the specified regex.
Examples:
* => expand name:regex(r'^(?<first>\w+)( \w+)* (?<last>\w+)$')
Extract the first and the last names from a single field.
* => name:regex(r'^(?<first>\w+)( \w+)* (?<last>\w+)$') as match => '%s, %s':sprintf(match->last, match->first) as converted
Converts the name field to the format "Last, First".
Returns a sequence of rows composed of all matches of all named groups in <regex>.
Available since Pipes v0.13
This method is similar to <string>:regex(<string regex>) → <row>, but returns all matches, not only the first.
Returns the string matched by <regex> in the target (or one specific group).
Returns all the strings matched by <regex> in the target (or one specific group).
Available since Pipes v0.13
Returns true if the target matches <regex>. False otherwise.
Splits string by regular expression <regex> in up to <limit> pieces.
Available since Pipes v0.13
Replaces all matches of <regex> in target by <replacement>.
Repeats the string a number of times.
Available since Pipes v0.13
Replaces all instances of <from> with the string <to>.
Trims trailing characters from a string.
Truncates the string to the specified length, keeping the end, optionally adding an ellipsis at the start.
Available since Pipes v0.13.3
The ellipsis length counts toward the maximum length. If you can, use the char "\u2026" as a single-char ellipsis.
Examples:
"0123456789":truncate(10, "...")
In this case, the string won't be truncated.
"01234567890":truncate(10, "...")
But in this case, it returns "...34567890"
Splits string by <delim> in up to <limit> pieces.
Available since Pipes v0.13
If the delimiter is not set, the string is split into individual chars.
Uses the target string as a format string for the arguments.
Returns whether the target string starts with the argument.
Returns the substring between the indices <from> and <to>.
Trims a string.
Truncates the string to the specified length, optionally adding an ellipsis at the end.
The ellipsis length counts toward the maximum length. If you can, use the char "\u2026" as a single-char ellipsis.
Examples:
"0123456789":truncate(10, "...")
In this case, the string won't be truncated.
"01234567890":truncate(10, "...")
But in this case, it returns "01234567..."
Converts string to uppercase.
Decodes an application/x-www-form-urlencoded unicode string.
Available since Pipes v0.13.12
Encodes a string as application/x-www-form-urlencoded.
Available since Pipes v0.13.12
Returns a number < 0 if a < b, > 0 if a > b or 0 if a = b.
Concatenates many strings together.
Constructs a period instance from a constant cron string.
Example:
* => count() every cron('*/3 * * * * *')
Yields the event count every three seconds.
* => timestamp():dateadd(cron('0 0 * * * *')) every second
Outputs every second the timestamp of the next hour.
Returns true if the tested timestamp is inside the span, false otherwise.
Please note that someprop:happened('today') is equivalent to happened(timestamp(), someprop, 'today').
Also, happened('today') is equivalent to happened(timestamp(), timestamp(), 'today').
Merges many instances of compressed base64 HyperLogLog data.
Useful to merge results from hllset(<number log2m>, <object...>) → <string> aggregations.
Creates a seq that iterates through java.util.Map or row entries.
Available since Pipes v0.13
Example:
* => @for itermap(some_map_property:object)
Creates one event for each map entry in some_map_property.
* => abc, qwe# => @for itermap(*)
Creates one event for each field.
Returns the greatest value of all supplied arguments.
Returns a representation of the previous pipe metadata.
Available since Pipes v0.13
Returns the least value of all supplied arguments.
Creates an instance of java.util.Map with the supplied keys and values.
Constructs a period instance from constant parameters.
Example:
* => count() every period(3, 'seconds')
Yields the event count every three seconds, the same as the literal 3 seconds.
Returns the constant value of pi.
Returns a random value between <min> and <max> (0 and 1 if not defined).
Creates a seq that iterates through numbers.
Available since Pipes v0.13
Example:
* => @for range(10)
(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
* => @for range(5, 10)
(5, 6, 7, 8, 9)
* => @for range(7, 10, 0.5)
(7, 7.5, 8, 8.5, 9, 9.5)
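The three forms of range above (end only; start and end; start, end and step, with the end always excluded) can be mirrored in Python. This is a sketch: `pipes_range` is a hypothetical name, it only supports positive steps, and it computes each value as `start + i * step` rather than accumulating, so fractional steps do not drift.

```python
from typing import Iterator, Union

Number = Union[int, float]


def pipes_range(*args: Number) -> Iterator[Number]:
    """range(end), range(start, end) or range(start, end, step); end is exclusive."""
    if len(args) == 1:
        start, end, step = 0, args[0], 1
    elif len(args) == 2:
        start, end = args
        step = 1
    elif len(args) == 3:
        start, end, step = args
    else:
        raise TypeError("expected 1 to 3 arguments")
    if step <= 0:
        raise ValueError("this sketch only handles positive steps")
    i = 0
    while start + i * step < end:
        yield start + i * step  # multiply instead of accumulating to limit float drift
        i += 1


print(list(pipes_range(10)))
print(list(pipes_range(5, 10)))
print(list(pipes_range(7, 10, 0.5)))
```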
Calculates start and end timestamps of span based on target.
Available since Pipes v0.13
The result is a row with three fields: timestamp, start and end.
Example:
* => span('today')|>:dateformat
Computes and formats the start and end timestamps of current day for each event.
Calculates end timestamp of span based on target.
Please note that spanend('today') is equivalent to spanend(timestamp(), 'today').
Calculates start timestamp of span based on target.
Please note that spanstart('today') is equivalent to spanstart(timestamp(), 'today').
Returns -1 if the tested timestamp is before the span, 0 if it is inside and 1 if it is after.
Please note that someprop:spantest('today') is equivalent to spantest(timestamp(), someprop, 'today').
Also, spantest('today') is equivalent to spantest(timestamp(), timestamp(), 'today').
Returns the most appropriate timestamp, whether in scalar or aggregation contexts.
Returns a random UUID string.
Functions that return information about the system where the query is running.
Returns the system's CPU usage info.
Available since Pipes v0.13.1
The returned row has the following fields:
field | type | description |
---|---|---|
process | number | Current process CPU load (in range [0..1]) |
system | number | System's CPU load (in range [0..1]) |
loadavg | number | Load average in the last minute |
cores | number | Number of processor cores installed |
Returns info about all devices in the filesystem.
Available since Pipes v0.13.1
Each row in the returned sequence has the following fields:
field | type | description |
---|---|---|
name | string | Name of the device or partition |
type | string | Format type |
readonly | boolean | Is read only? |
unallocated | number | # of unallocated bytes in the device |
usable | number | # of usable bytes (considering write permissions, etc.) |
total | number | Total bytes in the device |
Returns info about process file descriptors.
Available since Pipes v0.13.1
Each row in the returned sequence has the following fields:
field | type | description |
---|---|---|
open | number | # of open file descriptors |
max | number | Maximum file descriptors that can be open by this process |
Returns the system's current JVM heap info.
Available since Pipes v0.13.1
The returned row has the following fields (all values in bytes):
field | type | description |
---|---|---|
used | number | Used heap memory |
max | number | Max heap size |
committed | number | Committed memory by the OS for the current VM |
Returns the system's hostname.
Available since Pipes v0.13.1
Returns the system's memory usage info.
Available since Pipes v0.13.1
The returned row has the following fields (all values in bytes):
field | type | description |
---|---|---|
used | number | Used physical memory |
total | number | Total physical memory |
usedswap | number | Used swap memory |
totalswap | number | Total swap memory |
Returns info about running threads in current process.
Available since Pipes v0.13.1
The returned row has the following fields:
field | type | description |
---|---|---|
running | number | # of running threads |
peak | number | Max # of running threads |
total | number | # of threads ever started in this JVM |
daemon | number | # of daemon threads |
deadlocked | number | # of deadlocked threads |
Returns the system's real timestamp (not the query timestamp).
Available since Pipes v0.13.1
Returns Pipes version info.
Available since Pipes v0.13.1
Each row in the returned sequence has the following fields:
field | type | description |
---|---|---|
major | number | Version's major component |
minor | number | Version's minor component |
patch | number | Version's current patch |
released | number | Whether the version was officially released or not (-SNAPSHOT) |
Aggregates only events for which <condition> evaluates to true.
Merges all the results from the target aggregation.
Merges the results of the last <window> aggregations.
Delays and returns the previous <number>th result from target aggregation.
Returns true if all occurrences evaluate to true.
Returns true if any occurrence evaluates to true.
Calculates the (possibly weighted) average of some expression.
Counts all events in a window (except those that evaluate to false or null).
Estimates the field's cardinality (distinct count) using HyperLogLog.
Equivalent to hll(12, <object...>).
The aggregation uses a probabilistic algorithm known as HyperLogLog. With a fixed log2m value of 12, the standard error is 1.625% and the aggregation uses 4 KB of memory per group.
The aggregation considers all arguments, i.e., dcount(a, b) aggregates the number of distinct pairs of the tuple (a, b).
element | approximate memory |
---|---|
state | 4 KB |
tree | 4 KB |
merger | 512 KB |
Yields a JSON string explaining the target aggregation's inner state representation.
Yields the occurrence with the least timestamp.
Yields the greatest ocurrence in the window based on the sort fields.
Estimates the field's cardinality (distinct count) using HyperLogLog.
The aggregation uses a probabilistic algorithm known as HyperLogLog.
log2m must be an integer between 6 and 16. The higher this value, the higher the precision: the standard error is approximately 1.04/sqrt(2**log2m), i.e. 104/sqrt(2**log2m) percent.
E.g. with log2m = 14, the standard error is 0.81%. But higher log2m values also consume more memory: the aggregation typically uses 2**log2m bytes of memory.
The aggregation considers all arguments, i.e., hll(16, a, b) aggregates the number of distinct pairs of the tuple (a, b).
element | approximate memory |
---|---|
state | 2**log2m bytes |
tree | 2**log2m bytes |
merger | 128 * 2**log2m bytes |
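The error formula and the memory table above are easy to evaluate for a given log2m. This small Python helper is only a calculator for the figures stated in this section (standard error 1.04/sqrt(2**log2m), state and tree 2**log2m bytes, merger 128 * 2**log2m bytes); the function names are hypothetical.

```python
import math


def hll_standard_error(log2m: int) -> float:
    """Approximate relative standard error: 1.04 / sqrt(2**log2m)."""
    if not 6 <= log2m <= 16:
        raise ValueError("log2m must be an integer between 6 and 16")
    return 1.04 / math.sqrt(2 ** log2m)


def hll_memory_bytes(log2m: int) -> dict:
    """Approximate memory per element, following the table above."""
    m = 2 ** log2m
    return {"state": m, "tree": m, "merger": 128 * m}


for log2m in (12, 14, 16):
    print(log2m, f"{hll_standard_error(log2m):.3%}", hll_memory_bytes(log2m))
```

For log2m = 12 this reproduces the dcount figures (1.625%, 4 KB of state, 512 KB for the merger).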
Performs the union of many HyperLogLog-encoded sketches in a window.
Useful to merge results from hllset(<number log2m>, <object...>) → <string> aggregations.
hlleval(hllmerge(expr)) is equivalent to hllmergeeval(expr).
Example
The first query precomputes the distinct users every minute, but yields only a sketch, not the final count.
type:http => hllset(userid) as value every minute
The second query merges and evaluates the precomputed values over a larger window of 10 minutes. The results from the first query may have been stored in the meantime.
type:firstquery => hlleval(hllmerge(value)) as distinct_users every 10 minutes
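Merging works because the HyperLogLog sketch of a union is the element-wise maximum of the input sketches' registers, so per-minute sketches can be combined into a larger window without losing information. The Python sketch below assumes the textbook register layout (a 64-bit blake2b hash, one integer per register); Pipes' internal encoding may differ, and registers_for and merge_registers are hypothetical names.

```python
import hashlib


def registers_for(items, log2m: int = 10) -> list[int]:
    """HyperLogLog registers for an iterable of items (textbook layout)."""
    m = 1 << log2m
    regs = [0] * m
    for item in items:
        h = int.from_bytes(hashlib.blake2b(repr(item).encode(), digest_size=8).digest(), "big")
        index = h >> (64 - log2m)
        rest = h & ((1 << (64 - log2m)) - 1)
        regs[index] = max(regs[index], (64 - log2m) - rest.bit_length() + 1)
    return regs


def merge_registers(*sketches: list[int]) -> list[int]:
    """Union of HyperLogLog sketches: element-wise maximum of the registers."""
    if len({len(s) for s in sketches}) != 1:
        raise ValueError("all sketches must exist and use the same log2m")
    return [max(values) for values in zip(*sketches)]


# Two per-minute sketches with overlapping users (user400..user599 appear in both)...
minute1 = registers_for(f"user{i}" for i in range(0, 600))
minute2 = registers_for(f"user{i}" for i in range(400, 1000))
# ...merge into exactly the sketch of all users seen in either minute.
merged = merge_registers(minute1, minute2)
assert merged == registers_for(f"user{i}" for i in range(1000))
```

Because the maximum is associative and commutative, the order in which stored sketches are merged does not matter.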
Performs the union of many HyperLogLog-encoded sketches in a window and evaluates the result.
Useful to merge results from hllset(<number log2m>, <object...>) → <string> aggregations.
hlleval(hllmerge(expr)) is equivalent to hllmergeeval(expr).
Example
The first query precomputes the distinct users every minute, but yields only a sketch, not the final count.
type:http => hllset(userid) as value every minute
The second query merges and evaluates the precomputed values over a larger window of 10 minutes. The results from the first query may have been stored in the meantime.
type:firstquery => hllmergeeval(value) as distinct_users every 10 minutes
Similar to hll, but it doesn't evaluate the final cardinality; it just returns the sketch data.
This aggregation is similar to hll(<number log2m>, <object...>) → <number>, but instead of returning the final distinct count, it yields a representation of its internal data structure for later merges. It can be used in conjunction with the scalar functions <string>:hlleval() → <number> and hllmerge(<string... data>) → <string>, and with the aggregation hllmerge(<string>) → <string>.
It returns a string with a base64-encoded representation of the compressed byte array. It is especially useful when some preprocessing and storage is needed.
Example
The first query precomputes the distinct users every minute, but yields only a sketch, not the final count.
type:http => hllset(userid) as value every minute
The second query merges and evaluates the precomputed values over a larger window of 10 minutes. The results from the first query may have been stored in the meantime.
type:firstquery => hlleval(hllmerge(value)) as distinct_users every 10 minutes
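One way to turn a sketch into a storable string is shown below in Python. It assumes one byte per register and zlib as the compressor; the real byte layout and compression scheme used by hllset are not specified here, so encode_sketch and decode_sketch are hypothetical illustrations of the store-then-merge workflow, not a decoder for Pipes output.

```python
import base64
import zlib


def encode_sketch(registers: list[int]) -> str:
    """Pack one register per byte, compress with zlib, and base64-encode (assumed format)."""
    return base64.b64encode(zlib.compress(bytes(registers))).decode("ascii")


def decode_sketch(data: str) -> list[int]:
    """Inverse of encode_sketch: base64-decode, decompress, unpack the registers."""
    return list(zlib.decompress(base64.b64decode(data)))


registers = [0] * 4096              # a mostly empty sketch with log2m = 12
registers[7], registers[42] = 3, 5
stored = encode_sketch(registers)
print(len(stored), "characters")    # sparse sketches compress well
assert decode_sketch(stored) == registers
```

Register values stay small (at most 65 - log2m), so a single byte per register is enough in this layout.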
Yields the occurrence with the greatest timestamp.
Yields the least ocurrence in the window based on the sort fields.
Creates a java.util.List from all events in a window.
Available since Pipes v0.13
Creates a java.util.Map from all events in a window.
Yields the greatest occurrence in the window.
Estimates the median value of the population using Count-Min Sketch.
Equivalent to quantile(<number>, 0.5, [<number weight>]).
Yields the least occurrence in the window.
Aggregates the proportion of events for which the expression evaluates to true.
Estimates the q (0..1) quantile of the population using Count-Min Sketch.
Computes regression and correlation statistics for given variables.
Available since Pipes v0.13.3
The argument x, if not given, is assumed to be the event timestamp (if any).
The returned row has the following fields:
field | type | description |
---|---|---|
n | number | Sample size |
slope | number | Slope of the regression line |
intercept | number | Value of y where the regression line crosses the y-axis (x = 0) |
correlation | number | Correlation coefficient between x and y |
xmean | number | Mean value for variable x |
xstdev | number | Standard deviation for x |
ymean | number | Mean value for variable y |
ystdev | number | Standard deviation for y |
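The fields listed above correspond to ordinary least-squares regression. The Python sketch below computes them under stated assumptions: standard deviations use the sample (n - 1) divisor, intercept is the usual y-intercept, and item indexes stand in for event timestamps when x is omitted. Pipes' exact conventions may differ, and linreg is a hypothetical name.

```python
import math


def linreg(ys, xs=None) -> dict[str, float]:
    """Least-squares regression of y on x, returning the fields listed above.

    If xs is omitted, 0, 1, 2, ... are used in place of event timestamps.
    Standard deviations use the sample (n - 1) divisor, which is an assumption.
    Constant x (or constant y for the correlation) raises ZeroDivisionError.
    """
    ys = list(ys)
    xs = list(range(len(ys))) if xs is None else list(xs)
    n = len(ys)
    if n < 2 or len(xs) != n:
        raise ValueError("need at least two (x, y) pairs of equal length")
    xmean, ymean = sum(xs) / n, sum(ys) / n
    sxx = sum((x - xmean) ** 2 for x in xs)
    syy = sum((y - ymean) ** 2 for y in ys)
    sxy = sum((x - xmean) * (y - ymean) for x, y in zip(xs, ys))
    slope = sxy / sxx
    return {
        "n": n,
        "slope": slope,
        "intercept": ymean - slope * xmean,
        "correlation": sxy / math.sqrt(sxx * syy),
        "xmean": xmean,
        "xstdev": math.sqrt(sxx / (n - 1)),
        "ymean": ymean,
        "ystdev": math.sqrt(syy / (n - 1)),
    }


stats = linreg([3, 5, 7, 9], xs=[1, 2, 3, 4])   # points on the line y = 2x + 1
print(stats)
```

For points lying exactly on y = 2x + 1, the slope is 2, the intercept is 1 and the correlation is 1.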
Creates a java.util.Set from all events in a window.
Smooths the curve of another aggregation.
The argument alpha is a value in (0..1) representing the weight of the previous points in the final value.
The argument beta is a value in (0..1) representing the weight of the previous trends in the current trend value.
This aggregation cannot be used inside a window, because it depends on the total order of the elements.
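A minimal Python sketch of Holt-style double exponential smoothing, written to follow the alpha/beta meanings stated above (alpha weighs the previous level, beta the previous trend). Initializing the trend from the first two points is an assumption, and holt is a hypothetical name; note that textbook Holt usually assigns alpha and beta to the new observation instead, i.e. uses the complements of the weights here. Each output depends on every earlier one, which is why the element order matters.

```python
def holt(values, alpha: float, beta: float) -> list[float]:
    """Double exponential smoothing with alpha/beta weighting the *previous* state."""
    values = list(values)
    if not values:
        return []
    if not (0 < alpha < 1 and 0 < beta < 1):
        raise ValueError("alpha and beta must be in (0, 1)")
    level = values[0]
    trend = values[1] - values[0] if len(values) > 1 else 0.0  # assumed initialization
    smoothed = [level]
    for x in values[1:]:
        previous = level
        # New level: previous level projected by the trend, blended with the new point.
        level = alpha * (level + trend) + (1 - alpha) * x
        # New trend: previous trend blended with the change just observed.
        trend = beta * trend + (1 - beta) * (level - previous)
        smoothed.append(level)
    return smoothed


print(holt([10, 12, 9, 14, 13], alpha=0.5, beta=0.5))
```

A perfectly linear series is reproduced exactly, since the projected level already matches every new point.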
Aggregates sample statistics for some property.
Available since Pipes v0.13.3
Note that all statistics here are based on the sample variance, not the population variance.
This behavior differs from the existing variance(<number>, [<number weight>]) → <number> and stdev(<number>, [<number weight>]) → <number> aggregations.
The returned row has the following fields:
field | type | description |
---|---|---|
n | number | Sample size |
mean | number | Sample mean value |
variance | number | Sample variance |
stdev | number | Sample standard deviation |
skewness | number | Sample skewness |
kurtosis | number | Sample kurtosis |
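The difference between the sample statistics above and population statistics is only the divisor (n - 1 versus n). Python's standard statistics module exposes both, which makes the contrast easy to see; this illustrates the definitions only and says nothing about how Pipes computes them internally.

```python
import statistics

data = [2, 4, 4, 4, 5, 5, 7, 9]   # squared deviations from the mean (5) sum to 32

# Population statistics divide the sum of squared deviations by n...
population_variance = statistics.pvariance(data)   # 32 / 8
# ...while sample statistics divide by n - 1 (Bessel's correction).
sample_variance = statistics.variance(data)        # 32 / 7

print(statistics.mean(data), population_variance, sample_variance)
print(statistics.pstdev(data), statistics.stdev(data))
```

The gap between the two shrinks as the sample grows, but it is noticeable for small windows.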
Calculates the (possibly weighted) standard deviation of some expression.
Sums all evaluations of some expression.
Joins all the strings in a window.
Yields the k minimum occurrences of the target object.
Available since Pipes v0.13.18
Calculates the (possibly weighted) variance of some expression.
Yields the latest timestamp inside window when some condition was true.
Yields the first timestamp inside window when some condition was true.
Meta-aggregations give information on how and when the results were merged.
Figure: meaning of the meta-aggregations on the time axis.
Yields how many outputs are merged in the current window.
Returns the window's first allowed timestamp (or item index).
Returns the window's last allowed timestamp (or item index).
Returns the output's first allowed timestamp (or item index).
Returns the output's last allowed timestamp (or item index).
Returns the timestamp when the outputs were merged (equal to oend() on time pipes).
Pipes comes bundled with a timespan definition language that helps define relative dates with an almost natural-language syntax. A timespan is an expression that, given a reference timestamp, calculates the start and the end of a relative period.
For example, the expression current month returns the period from the beginning of the current month to the beginning of the next one.
Note that span intervals are always right-open: the start is included and the end is excluded.
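As an illustration of the right-open convention, the Python sketch below computes the span for current month from a reference timestamp. It uses naive datetime values (no timezone handling), and current_month and contains are hypothetical helpers, not part of Pipes.

```python
from datetime import datetime


def current_month(reference: datetime) -> tuple[datetime, datetime]:
    """Right-open span [start, end) covering the month of `reference`."""
    start = reference.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    if start.month == 12:
        end = start.replace(year=start.year + 1, month=1)
    else:
        end = start.replace(month=start.month + 1)
    return start, end


def contains(span: tuple[datetime, datetime], moment: datetime) -> bool:
    """Right-open membership test: start <= moment < end."""
    start, end = span
    return start <= moment < end


span = current_month(datetime(2014, 6, 27, 16, 17, 18))
print(span)
print(contains(span, datetime(2014, 7, 1)))  # the end itself is excluded
```

Because the end is excluded, consecutive spans such as this month and next month never overlap.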
definition | example expressions | description |
---|---|---|
now, none | now, none | Now. |
today, current\|this <period> | today, current day, this month | From the beginning of the current period to the beginning of the next. |
yesterday, yester\|previous <period> | yesterday, yesterweek, yester month, previous year, previous 5 years | The previous specified period. |
last <period...> | last day, last 40 weeks, last 2 years and 5 months | Period of the specified length ending now. |
<period...> ago | 2 days ago, 2 years and 5 months ago | Single point in the past. |
<date> [<time>] | 2014, 201406, 20140627, 2014-06-27, 2014/06/27, 2014-06-27 16, 2014-06-27 4pm, 2014-06-27 16:17:18, 2014-06-27 4:17:18pm | Constant date or time. |
<time> | 16, 4pm, 16:17:18, 4:17:18pm | Specified time today. |
timestamp\|ts <number> | timestamp 1403893236000, ts 1403893236000 | Constant timestamp. |
timezone\|tz <string> <span> | tz 'UTC' today, timezone 'America/Sao_Paulo' today | Evaluates the span using the specified timezone. |
monday ... sunday, mon ... sun | monday, mon, wednesday, wed, sunday, sun | Day of the week in the current week. |
january ... december, jan ... dec | january, jan, april, apr, december, dec | Month in the current year. |
[from\|since] <span A> to\|until <span B> | yesterday to today, from yesterday to today, since last year until yesterday | A range starting at the beginning of span A and ending at the end of span B. |
from\|since <span A> | since last year | A range starting at the beginning of span A and ending now. |
until\|to <span B> | until yesterday | A range starting at the beginning of time (the Unix epoch, 1970) and ending at the end of span B. |
<period...> before <span> | 2 years before this month, 2 days and an hour before this month | A period of the specified length ending exactly at the start of the specified span. |
before\|start\|beginning\|left\|exactly [of] <span> | before this week, start of this month, beginning of december, left of yesterday, exactly today | The point at the start of the specified span. |
window [of] <span> | window of yesterday, window of 2 days before today | The same as the argument span, but from the point of view of a timestamp inside it. |
<period...> after <span> | 2 days after previous month, 2 days and an hour after previous month | A period of the specified length starting exactly at the end of the specified span. |
after\|end\|right [of] <span> | after this week, right of yesterday, end of this month | The point at the end of the specified span. |
<ordinal> <period> of <span> | first quarter of previous year, 3rd week of this month, first hour today | Some discrete period inside the specified span. |
[the] <period> [of] <span> | the week of 2 days ago, the quarter of last week | Expands the point at the start of a span to the entire period defined. |
<span> shifted by <period...> | this week shifted by 2 days | Calculates the span relative to now, then shifts it into the past by the specified period. |
<span A> shifted to <span B>, <span A> of <span B>, <span B> at <span A> | sunday shifted to previous week, sunday of previous week, previous week at sunday, sunday at 8 | Calculates span A as if "now" were the start of span B. |
<span A> aligned [left\|right] to <span B> | this month aligned to 7 months ago, this month aligned left to 7 months ago, this month aligned right to 7 months ago | Calculates span A and aligns either its left or its right end with span B. |
<span> extend[ed] [left\|right] by <percentage>, <span> extend[ed] [left\|right] by <period...> | this month extend left by 20%, this month extend right by 2 days | Calculates the span and then extends it either proportionally or by a fixed period. |
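A few of the span operators in the table (last, ago, shifted by and extend) can be approximated with Python's datetime arithmetic, as below. This is a sketch under simplifying assumptions: naive datetimes, spans as (start, end) tuples, fixed-length periods given as timedelta, and an explicit left or right side for extend (the behavior when no side is given is not covered). The helper names are hypothetical and calendar edge cases such as DST are ignored.

```python
from datetime import datetime, timedelta
from typing import Tuple, Union

Span = Tuple[datetime, datetime]  # right-open [start, end)


def last(now: datetime, period: timedelta) -> Span:
    """last <period...>: a span of the given length ending now."""
    return now - period, now


def ago(now: datetime, period: timedelta) -> Span:
    """<period...> ago: a single point in the past, as a zero-length span."""
    point = now - period
    return point, point


def shifted_by(span: Span, period: timedelta) -> Span:
    """<span> shifted by <period...>: move the whole span into the past."""
    start, end = span
    return start - period, end - period


def extend(span: Span, amount: Union[float, timedelta], side: str) -> Span:
    """<span> extend left|right by <percentage> (as a fraction) or by <period...>."""
    if side not in ("left", "right"):
        raise ValueError("side must be 'left' or 'right'")
    start, end = span
    delta = amount if isinstance(amount, timedelta) else (end - start) * amount
    return (start - delta, end) if side == "left" else (start, end + delta)


now = datetime(2014, 6, 27, 16, 0)
this_month = (datetime(2014, 6, 1), datetime(2014, 7, 1))
print(last(now, timedelta(days=2)))                    # last 2 days
print(shifted_by(this_month, timedelta(days=2)))       # this month shifted by 2 days
print(extend(this_month, 0.20, "left"))                # this month extend left by 20%
print(extend(this_month, timedelta(days=2), "right"))  # this month extend right by 2 days
```

June has 30 days, so extending it left by 20% moves its start 6 days earlier.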
The periods are the units of time that power span definitions. Some spans accept only a single period definition (e.g. current day). Others accept multiple period definitions (e.g. last 4 days, 2 hours and 1 minute). In this documentation, <period> means single periods only and <period...> means one or more periods.
A single period is composed of an optional amount and a unit, e.g.: day, 1 day, a day, the day.
A multiple period is composed of several single periods separated by commas or 'and', e.g.: '1 day and 5 hours' or 'a day, an hour and 5 minutes'.
The available period units are:
unit | also accepts | equivalent to |
---|---|---|
millisecond | milli, ms | |
second | sec | 1000 ms |
minute | min | 60000 ms |
hour | | 3600000 ms |
day | | |
week | wk | |
month | | |
bimester | | 2 months |
quarter | | 3 months |
semester | | 6 months |
year | yr | |
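Periods with a fixed length can be converted to milliseconds as in the Python sketch below. It assumes a 24-hour day and a 7-day week (the table leaves both blank, and real calendar arithmetic depends on timezone and DST), and it rejects months and longer units because their length varies. period_to_ms is a hypothetical helper, not a Pipes function.

```python
import re

# Fixed-length units only. A 24-hour day and a 7-day week are assumptions (no DST);
# month, bimester, quarter, semester and year vary in length and are rejected.
UNIT_MS = {
    "millisecond": 1, "milli": 1, "ms": 1,
    "second": 1_000, "sec": 1_000,
    "minute": 60_000, "min": 60_000,
    "hour": 3_600_000,
    "day": 86_400_000,
    "week": 604_800_000, "wk": 604_800_000,
}

# Optional amount ("5", "a", "an", "the") followed by a unit word.
SINGLE_PERIOD = re.compile(r"^(\d+|an?|the)?\s*([a-z]+)$")


def period_to_ms(text: str) -> int:
    """Convert a multiple period such as '1 day and 5 hours' to milliseconds."""
    total = 0
    for part in re.split(r",|\band\b", text.lower()):
        part = part.strip()
        if not part:
            continue
        match = SINGLE_PERIOD.match(part)
        if not match:
            raise ValueError(f"cannot parse period: {part!r}")
        amount_text, word = match.groups()
        # Accept plurals such as "hours" while keeping "ms" intact.
        unit = word if word in UNIT_MS else word[:-1] if word.endswith("s") else word
        if unit not in UNIT_MS:
            raise ValueError(f"unsupported or variable-length unit: {word!r}")
        amount = int(amount_text) if amount_text and amount_text.isdigit() else 1
        total += amount * UNIT_MS[unit]
    return total


print(period_to_ms("1 day and 5 hours"))
print(period_to_ms("a day, an hour and 5 minutes"))
```

Keeping variable-length units out of the table avoids silently treating every month as 30 days.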
Page generated at 26-Feb-2016 11:09:45