Flux Query Reference
Basic Query Structure
Flux is a functional data scripting language. Queries are built by piping data from one function into the next with the pipe-forward operator (|>).
// Read from a bucket with time range.
// The three predicates are combined into a single filter call;
// this returns exactly the same rows as chaining one filter per predicate.
from(bucket: "telegraf/autogen")
    |> range(start: -1h)
    |> filter(fn: (r) =>
        r._measurement == "cpu" and
        r._field == "usage_user" and
        r.host == "server01",
    )
// Absolute time range
// Timestamps are RFC3339 literals; in range(), `start` is inclusive
// and `stop` is exclusive, so this covers all of January 2024.
from(bucket: "metrics")
|> range(start: 2024-01-01T00:00:00Z, stop: 2024-02-01T00:00:00Z)
// Multiple field filters
// Written as two chained filter calls instead of one compound predicate;
// the result set is identical either way.
from(bucket: "sensors")
    |> range(start: -30m)
    |> filter(fn: (r) => r._measurement == "temperature")
    |> filter(fn: (r) => r.location == "room1" or r.location == "room2")
aggregateWindow
Downsample data into regular time windows with an aggregate function.
// 5-minute averages
// Measurement and field predicates are split into separate filter calls;
// same rows reach aggregateWindow as with the single compound filter.
from(bucket: "metrics")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> filter(fn: (r) => r._field == "usage_user")
    |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
// Other aggregation functions
// Each line below is a drop-in replacement for the aggregateWindow call
// above; `fn` takes any aggregate that reduces a window to a single row.
|> aggregateWindow(every: 1h, fn: sum) // total per window
|> aggregateWindow(every: 1h, fn: max) // window maximum
|> aggregateWindow(every: 1h, fn: min) // window minimum
|> aggregateWindow(every: 1h, fn: count) // number of rows per window
|> aggregateWindow(every: 1h, fn: last) // newest value in each window
|> aggregateWindow(every: 1h, fn: first) // oldest value in each window
// Custom function in aggregateWindow
// Fixes two defects in the original example:
//  - `percentile(percentile: 0.99)` is the pre-rename API; the function is
//    now `quantile(q: ...)`.
//  - aggregateWindow passes a `column` argument to `fn`; the lambda must
//    forward it, otherwise the caller's column selection is ignored.
|> aggregateWindow(
    every: 1h,
    fn: (column, tables=<-) => tables |> quantile(q: 0.99, column: column),
)
map, keep, drop, rename
// map: transform each row
// `r with` copies every existing column of the row, then overrides or
// adds the columns listed after it.
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "temperature")
|> map(fn: (r) => ({
r with
_value: (r._value * 9.0 / 5.0) + 32.0, // Celsius to Fahrenheit
unit: "fahrenheit" // adds a new column to every row
}))
// keep: keep only specific columns (all other columns are discarded)
|> keep(columns: ["_time", "_value", "host", "_field"])
// drop: remove specific columns, keeping everything else
|> drop(columns: ["_start", "_stop"])
// rename: rename columns (map of old name -> new name)
|> rename(columns: {_value: "metric_value", host: "server"})
join & pivot
// Join two streams
// Each input stream is filtered down to a single field so the join
// matches one cpu row against one mem row per host per timestamp.
cpu = from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
mem = from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent")
// Rows pair up when both _time and host are equal; columns that exist in
// both inputs (like _value) are suffixed with the table key from `tables`,
// producing _value_cpu and _value_mem.
join(
tables: {cpu: cpu, mem: mem},
on: ["_time", "host"]
)
|> map(fn: (r) => ({ r with ratio: r._value_cpu / r._value_mem }))
// pivot: reshape field rows into columns
// Produces one output row per unique rowKey combination (_time, host);
// each distinct _field value becomes its own column, filled from _value.
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> pivot(
rowKey: ["_time", "host"],
columnKey: ["_field"],
valueColumn: "_value"
)
// Result columns: _time, host, usage_user, usage_system, usage_idle
Tasks & Variables
// Flux variables
// Task options: run every hour; `offset` delays each run by 5m
// (commonly used so late-arriving data is included — confirm for your setup).
option task = { name: "hourly_downsampling", every: 1h, offset: 5m }
// Query exactly one task interval back from now (-1h here).
start = -task.every
data = from(bucket: "raw_metrics")
|> range(start: start)
|> filter(fn: (r) => r._measurement == "cpu")
|> aggregateWindow(every: 1h, fn: mean)
// Write the downsampled result into a separate bucket.
data |> to(bucket: "downsampled_metrics", org: "myorg")
// Useful functions
// NOTE: these operate on each input table independently, not on the
// stream as a whole (e.g. limit keeps the first N rows of EACH table).
|> limit(n: 100) // take first N rows
|> sort(columns: ["_time"], desc: true) // sort by time desc
|> unique(column: "host") // unique values
|> distinct(column: "_field") // distinct field names
|> tail(n: 10) // last N rows
|> count() // count rows
|> mean() // mean of _value