Locust Setup Guide
Basic locustfile.py
from locust import HttpUser, task, between
class WebsiteUser(HttpUser):
    """Simulated site visitor: logs in once, then browses, views and searches.

    Task weights are 3 (browse) : 2 (search) : 1 (view a single product).
    """

    # Wait between 1 and 3 seconds between tasks
    wait_time = between(1, 3)
    # Host — can also be set via --host flag
    host = "https://api.example.com"

    def on_start(self):
        """Called when a new user starts (login, setup).

        Logs in and stores the returned token as a Bearer ``Authorization``
        header on the client so later requests from this user carry it.

        Raises:
            Exception: whatever ``raise_for_status()`` raises when the login
                request did not succeed.
        """
        response = self.client.post("/auth/login", json={
            "username": "testuser",
            "password": "testpass",
        })
        # Surface a failed login as an HTTP error here instead of an opaque
        # KeyError / JSON decoding error on the next line.
        response.raise_for_status()
        token = response.json()["token"]
        self.client.headers.update({"Authorization": f"Bearer {token}"})

    @task(3)  # weight 3 — picked 3x as often as a weight-1 task
    def browse_products(self):
        """Fetch the product listing."""
        self.client.get("/products")

    @task(1)
    def view_product(self):
        """Fetch the product listing, then open its first product.

        Does nothing further when the listing is empty.
        """
        products = self.client.get("/products").json()
        if not products:
            # An empty catalogue has no first item to open; skip instead of
            # raising IndexError on every run of this task.
            return
        product_id = products[0]["id"]
        # Group all product-detail URLs under one name in the statistics.
        self.client.get(f"/products/{product_id}", name="/products/[id]")

    @task(2)
    def search(self):
        """Run a fixed search query."""
        self.client.get("/search?q=widget")
TaskSet — Grouped Behavior
from locust import HttpUser, TaskSet, task, between, constant
class UserBrowsingBehavior(TaskSet):
    """Grouped listing tasks: view all (weight 5), view one (2), create (1)."""

    @task(5)
    def view_listing(self):
        """Request the collection of listings."""
        self.client.get("/listings")

    @task(2)
    def view_single(self):
        """Request the listing with id 1."""
        self.client.get("/listings/1")

    @task(1)
    def create_listing(self):
        """Submit a new listing with a fixed test title."""
        payload = {"title": "Test"}
        self.client.post("/listings", json=payload)

    def on_stop(self):
        """Called when this TaskSet stops; intentionally does nothing."""
class BrowsingUser(HttpUser):
    """User whose behavior is delegated entirely to UserBrowsingBehavior."""

    # Wait between 0.5 and 2 seconds between tasks
    wait_time = between(0.5, 2)
    tasks = [UserBrowsingBehavior]
# Nested TaskSets
class AdminBehavior(TaskSet):
# Maps each nested TaskSet class to an integer (3 and 1 here) — presumably its
# relative selection weight, as with @task(n); confirm against the Locust docs.
# NOTE(review): AdminTaskSet is not defined anywhere in this guide; it must be
# defined (as another TaskSet) before this class, or this line raises NameError.
tasks = {UserBrowsingBehavior: 3, AdminTaskSet: 1}
# wait_time options
from locust import constant, constant_pacing
# Alternatives — each assignment below overrides the previous one, so keep only one.
wait_time = constant(1) # always wait exactly 1 second between tasks
wait_time = constant_pacing(2) # each user starts a task at most once every 2 sec (task run time counts toward the interval)
wait_time = between(1, 5) # random wait of 1-5 seconds between tasks
Running Locust
# Web UI mode (open http://localhost:8089)
locust -f locustfile.py
# Headless mode
locust -f locustfile.py \
--headless \
--users 100 \
--spawn-rate 10 \
--run-time 2m \
--host https://api.example.com
# Export CSV results and an HTML report
locust -f locustfile.py \
--headless -u 50 -r 5 -t 1m \
--csv results/run1 \
--html results/report.html
# Exit with the given code if any request failed (1 is already the default)
locust -f locustfile.py \
--headless -u 100 -r 10 -t 5m \
--exit-code-on-error 1
# Config file: locust.conf
# headless = true
# users = 100
# spawn-rate = 10
Custom Load Shape
from locust import LoadTestShape
class StepLoadShape(LoadTestShape):
    """Ramp up in steps.

    Starts with ``step_load`` users and adds another ``step_load`` each time
    ``step_time`` of run time elapses, until ``time_limit`` is exceeded.
    """

    step_time = 30    # run time spent on each step
    step_load = 20    # users added per step
    spawn_rate = 10   # spawn rate returned with every tick
    time_limit = 300  # run time after which the shape stops the test

    def tick(self):
        """Return ``(user_count, spawn_rate)`` for the current step.

        Returns:
            A ``(user_count, spawn_rate)`` tuple, or ``None`` to stop the test
            once the run time exceeds ``time_limit``.
        """
        run_time = self.get_run_time()
        if run_time > self.time_limit:
            return None  # stop
        # Steps are 1-based: a 0-based step would request 0 users for the
        # whole first step_time. int() keeps the user count an integer even
        # when run_time is a float.
        current_step = int(run_time // self.step_time) + 1
        return (current_step * self.step_load, self.spawn_rate)
class DoubleWaveShape(LoadTestShape):
    """Load shape driven by a fixed schedule of stages forming two peaks.

    Each stage applies while the elapsed run time is below its ``duration``;
    stages are checked in list order and the first match wins.
    """

    stages = [
        {"duration": 60, "users": 10, "spawn_rate": 5},
        {"duration": 120, "users": 50, "spawn_rate": 10},
        {"duration": 180, "users": 10, "spawn_rate": 5},
        {"duration": 240, "users": 70, "spawn_rate": 20},
        {"duration": 300, "users": 0, "spawn_rate": 10},
    ]

    def tick(self):
        """Return ``(users, spawn_rate)`` of the active stage, or ``None``.

        ``None`` is returned once the run time has passed every stage's
        ``duration``.
        """
        elapsed = self.get_run_time()
        active = next(
            (entry for entry in self.stages if elapsed < entry["duration"]),
            None,
        )
        if active is None:
            return None
        return (active["users"], active["spawn_rate"])
Distributed Testing
# Master node (starts web UI and distributes work)
locust -f locustfile.py --master --host https://api.example.com
# Worker nodes (run on separate machines)
locust -f locustfile.py --worker --master-host=192.168.1.100
# Multiple workers on same machine
locust -f locustfile.py --worker --master-host=localhost &
locust -f locustfile.py --worker --master-host=localhost &
locust -f locustfile.py --worker --master-host=localhost &
# Docker Compose distributed setup
# master:
# image: locustio/locust
# ports: ["8089:8089"]
# command: -f /mnt/locust/locustfile.py --master
# worker:
# image: locustio/locust
# command: -f /mnt/locust/locustfile.py --worker --master-host master
# deploy:
# replicas: 4
Events & Custom Assertions
from locust import events
@events.test_start.add_listener
def on_test_start(environment, **kwargs):
    """Announce on stdout that the load test is starting."""
    banner = "Load test starting!"
    print(banner)
@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
    """Print aggregate totals and set a failing exit code on too many errors.

    The process exit code is set to 1 when the overall failure ratio is
    above 1%.
    """
    totals = environment.stats.total
    print(f"Total requests: {totals.num_requests}")
    print(f"Failure rate: {totals.fail_ratio:.1%}")
    # Fail if error rate too high
    too_many_failures = totals.fail_ratio > 0.01
    if too_many_failures:
        environment.process_exit_code = 1
# Request event
@events.request.add_listener
def on_request(request_type, name, response_time, response_length,
               exception, **kwargs):
    """Print a line for every request that finished with an exception."""
    if not exception:
        return
    print(f"Request failed: {name} — {exception}")