Spring WebFlux指南
响应式控制器
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.*;
@RestController
@RequestMapping("/users")
public class UserController {
private final UserRepository repo;
// 返回 Flux(0到N个元素)
@GetMapping
public Flux<User> listUsers() {
return repo.findAll();
}
// 返回 Mono(0或1个元素)
@GetMapping("/{id}")
public Mono<User> getUser(@PathVariable Long id) {
return repo.findById(id)
.switchIfEmpty(Mono.error(new NotFoundException()));
}
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
public Mono<User> createUser(@RequestBody User user) {
return repo.save(user);
}
}
Reactor 操作符
Flux<Integer> numbers = Flux.range(1, 10);
numbers
.filter(n -> n % 2 == 0) // [2,4,6,8,10]
.map(n -> n * n) // [4,16,36,64,100]
.take(3) // [4,16,36]
.flatMap(n -> fetchData(n)) // 异步映射
.onErrorResume(e -> Flux.empty()) // 错误恢复
.subscribe(System.out::println);
// 合并流
Flux.zip(stream1, stream2)
.map(tuple -> tuple.getT1() + tuple.getT2());
R2DBC 响应式仓库
// 依赖
// spring-boot-starter-data-r2dbc
// r2dbc-postgresql (or r2dbc-h2 for testing)
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
Flux<User> findByName(String name);
Mono<User> findByEmail(String email);
@Query("SELECT * FROM users WHERE active = :active")
Flux<User> findByActive(@Param("active") boolean active);
}