分布式限流配置管理 - API与推送机制
2025/11/13大约 9 分钟
分布式限流配置管理 - API与推送机制
一、Controller层实现
4.1 GatewayRateLimitController
/**
* 网关限流配置管理接口
*/
@Slf4j
@RestController
@RequestMapping("/gateway-rate-limit")
public class GatewayRateLimitController {
@Resource
private GatewayRateLimitService rateLimitService;
/**
* 创建限流配置
*/
@PostMapping("/create")
public Result<String> createRateLimitConfig(@RequestBody RateLimitConfigReqVO reqVO) {
log.info("创建限流配置请求: {}", reqVO);
try {
String id = rateLimitService.createRateLimitConfig(reqVO);
return Result.success(id);
} catch (Exception e) {
log.error("创建限流配置失败", e);
return Result.error(e.getMessage());
}
}
/**
* 更新限流配置
*/
@PutMapping("/update")
public Result<Boolean> updateRateLimitConfig(@RequestBody RateLimitConfigReqVO reqVO) {
log.info("更新限流配置请求: {}", reqVO);
try {
Boolean success = rateLimitService.updateRateLimitConfig(reqVO);
return Result.success(success);
} catch (Exception e) {
log.error("更新限流配置失败", e);
return Result.error(e.getMessage());
}
}
/**
* 删除限流配置
*/
@DeleteMapping("/delete/{id}")
public Result<Boolean> deleteRateLimitConfig(@PathVariable String id) {
log.info("删除限流配置请求,ID: {}", id);
try {
Boolean success = rateLimitService.deleteRateLimitConfig(id);
return Result.success(success);
} catch (Exception e) {
log.error("删除限流配置失败", e);
return Result.error(e.getMessage());
}
}
/**
* 启用/禁用限流配置
*/
@PutMapping("/status/{id}/{status}")
public Result<Boolean> updateStatus(@PathVariable String id, @PathVariable Integer status) {
log.info("更新限流配置状态请求,ID: {}, status: {}", id, status);
try {
Boolean success = rateLimitService.updateStatus(id, status);
return Result.success(success);
} catch (Exception e) {
log.error("更新限流配置状态失败", e);
return Result.error(e.getMessage());
}
}
/**
* 查询限流配置详情
*/
@GetMapping("/detail/{id}")
public Result<GatewayRateLimitDO> getRateLimitConfig(@PathVariable String id) {
log.info("查询限流配置详情,ID: {}", id);
try {
GatewayRateLimitDO config = rateLimitService.getRateLimitConfig(id);
return Result.success(config);
} catch (Exception e) {
log.error("查询限流配置详情失败", e);
return Result.error(e.getMessage());
}
}
/**
* 分页查询限流配置列表
*/
@GetMapping("/list")
public Result<PageResult<GatewayRateLimitDO>> listRateLimitConfigs(
@RequestParam(defaultValue = "1") Integer pageNum,
@RequestParam(defaultValue = "10") Integer pageSize,
@RequestParam(required = false) String limitType) {
log.info("分页查询限流配置列表,pageNum: {}, pageSize: {}, limitType: {}", pageNum, pageSize, limitType);
try {
PageResult<GatewayRateLimitDO> result = rateLimitService.listRateLimitConfigs(pageNum, pageSize, limitType);
return Result.success(result);
} catch (Exception e) {
log.error("分页查询限流配置列表失败", e);
return Result.error(e.getMessage());
}
}
/**
* 刷新所有网关节点的限流配置
*/
@PostMapping("/refresh")
public Result<Void> refreshAllGatewayConfigs() {
log.info("刷新所有网关节点的限流配置");
try {
rateLimitService.refreshAllGatewayConfigs();
return Result.success();
} catch (Exception e) {
log.error("刷新限流配置失败", e);
return Result.error(e.getMessage());
}
}
}
### 4.2 API接口文档
#### 1. 创建限流配置
**接口地址:** `POST /gateway-rate-limit/create`
**请求头:**Content-Type: application/json
**请求参数:**
```json
{
"ruleName": "全局限流",
"limitType": "GLOBAL",
"limitTarget": "GLOBAL",
"limitCount": 10000,
"timeWindow": 1,
"status": 1,
"strategy": "TOKEN_BUCKET",
"mode": "DISTRIBUTED",
"localBatchSize": 100,
"localCapacityMultiplier": 1.0
}参数说明:
| 参数 | 类型 | 必填 | 说明 |
|---|---|---|---|
| ruleName | String | 是 | 规则名称 |
| limitType | String | 是 | 限流类型:GLOBAL/SERVICE/INTERFACE/IP |
| limitTarget | String | 是 | 限流目标 |
| limitCount | Integer | 是 | 限流阈值(每秒请求数) |
| timeWindow | Integer | 是 | 时间窗口(秒) |
| status | Integer | 否 | 状态:0-禁用,1-启用(默认1) |
| strategy | String | 是 | 限流策略:TOKEN_BUCKET/SLIDING_WINDOW |
| mode | String | 否 | 限流模式:DISTRIBUTED/LOCAL_DISTRIBUTED(默认DISTRIBUTED) |
| localBatchSize | Integer | 否 | 本地批量获取令牌数(仅在LOCAL_DISTRIBUTED模式下使用,默认100) |
| localCapacityMultiplier | Double | 否 | 本地容量倍数(仅在LOCAL_DISTRIBUTED模式下使用,默认1.0) |
响应示例:
{
"code": 200,
"msg": "success",
"data": 1234567890
}2. 更新限流配置
接口地址: PUT /gateway-rate-limit/update
请求参数:
{
"id": 1234567890,
"ruleName": "全局限流(已更新)",
"limitType": "GLOBAL",
"limitTarget": "GLOBAL",
"limitCount": 15000,
"timeWindow": 1,
"status": 1,
"strategy": "TOKEN_BUCKET",
"mode": "DISTRIBUTED",
"localBatchSize": 100,
"localCapacityMultiplier": 1.0
}响应示例:
{
"code": 200,
"msg": "success",
"data": true
}3. 删除限流配置
接口地址: DELETE /gateway-rate-limit/delete/{id}
路径参数:
id: 配置ID
响应示例:
{
"code": 200,
"msg": "success",
"data": true
}4. 启用/禁用限流配置
接口地址: PUT /gateway-rate-limit/status/{id}/{status}
路径参数:
id: 配置IDstatus: 状态(0-禁用,1-启用)
响应示例:
{
"code": 200,
"msg": "success",
"data": true
}5. 查询限流配置详情
接口地址: GET /gateway-rate-limit/detail/{id}
路径参数:
id: 配置ID
响应示例:
{
"code": 200,
"msg": "success",
"data": {
"id": 1234567890,
"ruleName": "全局限流",
"limitType": "GLOBAL",
"limitTarget": "GLOBAL",
"limitCount": 10000,
"timeWindow": 1,
"status": 1,
"strategy": "TOKEN_BUCKET",
"mode": "DISTRIBUTED",
"localBatchSize": 100,
"localCapacityMultiplier": 1.0,
"createTime": "2025-11-12 10:00:00",
"updateTime": "2025-11-12 10:00:00"
}
}6. 分页查询限流配置列表
接口地址: GET /gateway-rate-limit/list
请求参数:
| 参数 | 类型 | 必填 | 说明 |
|---|---|---|---|
| pageNum | Integer | 否 | 页码(默认1) |
| pageSize | Integer | 否 | 每页大小(默认10) |
| limitType | String | 否 | 限流类型(可选) |
响应示例:
{
"code": 200,
"msg": "success",
"data": {
"list": [
{
"id": 1,
"ruleName": "全局限流",
"limitType": "GLOBAL",
"limitTarget": "GLOBAL",
"limitCount": 10000,
"timeWindow": 1,
"status": 1,
"strategy": "TOKEN_BUCKET",
"mode": "DISTRIBUTED",
"localBatchSize": 100,
"localCapacityMultiplier": 1.0,
"createTime": "2025-11-12 10:00:00",
"updateTime": "2025-11-12 10:00:00"
}
],
"total": 10,
"pageNum": 1,
"pageSize": 10
}
}7. 刷新所有网关节点配置
接口地址: POST /gateway-rate-limit/refresh
响应示例:
{
"code": 200,
"msg": "success",
"data": null
}二、配置推送机制
5.1 推送架构
┌─────────────────────────────────────────────────────────┐
│ 网关中心 │
│ │
│ 配置变更 → 保存数据库 → 同步Redis → 发布消息 │
└─────────────────────────────────────────────────────────┘
↓
Redis Pub/Sub
↓
┌─────────────────────────────────────────────────────────┐
│ 网关核心节点集群 │
│ │
│ Gateway-1 ← 订阅消息 ← Redis │
│ Gateway-2 ← 订阅消息 ← Redis │
│ Gateway-3 ← 订阅消息 ← Redis │
│ │
│ 接收消息 → 解析配置 → 更新本地限流器 → 配置生效 │
└─────────────────────────────────────────────────────────┘5.2 消息格式
单个配置更新消息
{
"id": 1234567890,
"ruleName": "全局限流",
"limitType": "GLOBAL",
"limitTarget": "GLOBAL",
"limitCount": 10000,
"timeWindow": 1,
"status": 1,
"strategy": "TOKEN_BUCKET",
"mode": "DISTRIBUTED",
"localBatchSize": 100,
"localCapacityMultiplier": 1.0
}字段说明:
| 字段 | 类型 | 说明 |
|---|---|---|
| id | Long | 配置ID |
| ruleName | String | 规则名称 |
| limitType | String | 限流类型 |
| limitTarget | String | 限流目标 |
| limitCount | Integer | 限流阈值 |
| timeWindow | Integer | 时间窗口 |
| status | Integer | 状态:0-禁用,1-启用 |
| strategy | String | 限流策略 |
| mode | String | 限流模式 |
| localBatchSize | Integer | 本地批量大小 |
| localCapacityMultiplier | Double | 本地容量倍数 |
全量重载消息
RELOAD_ALL触发场景:
- 管理员手动触发刷新
- 系统启动时初始化
- 配置异常时恢复
5.3 网关节点处理流程
配置监听器
@Component
@Slf4j
public class RateLimitConfigListener implements MessageListener {
@Resource
private DistributedRateLimiter rateLimiter;
@Resource
private RedisTemplate<String, Object> redisTemplate;
/**
* 初始化时加载所有限流配置
*/
@PostConstruct
public void init() {
log.info("初始化限流配置");
loadAllRateLimitConfigs();
}
/**
* 处理Redis消息
*/
@Override
public void onMessage(Message message, byte[] pattern) {
String channel = new String(message.getChannel());
log.info("收到限流配置更新消息,频道: {}", channel);
if ("rate-limit-config-update".equals(channel)) {
handleConfigUpdate(message);
}
}
/**
* 处理配置更新消息
*/
private void handleConfigUpdate(Message message) {
try {
String body = new String(message.getBody());
log.info("限流配置更新内容: {}", body);
if ("RELOAD_ALL".equals(body)) {
// 重新加载所有配置
loadAllRateLimitConfigs();
} else {
// 更新单个配置
RateLimitConfig config = JSON.parseObject(body, RateLimitConfig.class);
if (config != null) {
String key = buildConfigKey(config);
rateLimiter.updateConfig(key, config);
log.info("限流配置已更新: {}", key);
}
}
} catch (Exception e) {
log.error("处理限流配置更新失败", e);
}
}
/**
* 从Redis加载所有限流配置
*/
private void loadAllRateLimitConfigs() {
try {
log.info("开始加载所有限流配置");
// 清空现有配置
rateLimiter.clearAllConfigs();
// 从Redis获取所有限流配置
Set<String> keys = redisTemplate.keys("rate_limit_config:*");
if (keys == null || keys.isEmpty()) {
log.info("未找到限流配置");
return;
}
int count = 0;
for (String key : keys) {
try {
Map<Object, Object> configMap = redisTemplate.opsForHash().entries(key);
if (!configMap.isEmpty()) {
RateLimitConfig config = convertToConfig(configMap);
if (config != null && config.getEnabled()) {
String configKey = buildConfigKey(config);
rateLimiter.updateConfig(configKey, config);
count++;
}
}
} catch (Exception e) {
log.error("加载限流配置失败: {}", key, e);
}
}
log.info("限流配置加载完成,共加载 {} 条配置", count);
} catch (Exception e) {
log.error("加载所有限流配置失败", e);
}
}
/**
* 将Map转换为RateLimitConfig
*/
private RateLimitConfig convertToConfig(Map<Object, Object> configMap) {
try {
RateLimitConfig.RateLimitConfigBuilder builder = RateLimitConfig.builder()
.id(Long.valueOf(configMap.get("id").toString()))
.ruleName((String) configMap.get("ruleName"))
.limitType((String) configMap.get("limitType"))
.limitTarget((String) configMap.get("limitTarget"))
.limitCount(Integer.valueOf(configMap.get("limitCount").toString()))
.timeWindow(Integer.valueOf(configMap.get("timeWindow").toString()))
.enabled(Boolean.valueOf(configMap.get("enabled").toString()))
.strategy((String) configMap.get("strategy"));
// 处理新增的配置字段
Object mode = configMap.get("mode");
if (mode != null) {
builder.mode((String) mode);
}
Object localBatchSize = configMap.get("localBatchSize");
if (localBatchSize != null) {
builder.localBatchSize(Integer.valueOf(localBatchSize.toString()));
}
Object localCapacityMultiplier = configMap.get("localCapacityMultiplier");
if (localCapacityMultiplier != null) {
builder.localCapacityMultiplier(Double.valueOf(localCapacityMultiplier.toString()));
}
return builder.build();
} catch (Exception e) {
log.error("转换限流配置失败", e);
return null;
}
}
/**
* 构建配置键
*/
private String buildConfigKey(RateLimitConfig config) {
String limitType = config.getLimitType();
String limitTarget = config.getLimitTarget();
if ("GLOBAL".equals(limitType)) {
return "GLOBAL";
} else if ("SERVICE".equals(limitType)) {
return "SERVICE:" + limitTarget;
} else if ("INTERFACE".equals(limitType)) {
return "INTERFACE:" + limitTarget;
} else if ("IP".equals(limitType)) {
return "IP:" + limitTarget;
}
return limitType + ":" + limitTarget;
}
}Redis配置
Redis 消息监听容器由 Spring Boot 自动配置,RateLimitConfigListener 作为 MessageListener 的实现会自动被注册到 Redis 消息监听容器中。
配置说明:
- 订阅频道:
rate-limit-config-update - 监听器:
RateLimitConfigListener - 消息格式:JSON 字符串或特殊消息
RELOAD_ALL
5.4 推送时序图
5.5 推送可靠性保障
1. 配置持久化
MySQL(主存储) + Redis(缓存)
- MySQL:配置持久化,保证数据不丢失
- Redis:配置缓存,提高查询性能2. 启动时加载
@PostConstruct
public void init() {
// 网关节点启动时从Redis加载所有配置
loadAllRateLimitConfigs();
}保障:
- 即使错过消息推送,启动时也能加载最新配置
- 避免配置丢失
3. 全量重载机制
public void refreshAllGatewayConfigs() {
// 发布全量重载消息
redisTemplate.convertAndSend("rate-limit-config-update", "RELOAD_ALL");
}触发场景:
- 管理员手动触发
- 配置异常时恢复
- 定期同步(可选)
4. 异常处理
@Override
public void onMessage(Message message, byte[] pattern) {
try {
// 处理消息
} catch (Exception e) {
log.error("处理限流配置更新消息失败", e);
// 不抛出异常,避免影响其他消息处理
}
}保障:
- 单个配置更新失败不影响其他配置
- 记录错误日志,便于排查问题
5.6 推送性能优化
1. 增量更新
只推送变更的配置,而非全量配置
- 减少消息大小
- 降低网络开销
- 提高更新速度2. 本地缓存
// 配置缓存
private final Map<String, RateLimitConfig> configCache = new ConcurrentHashMap<>();
// 本地限流器缓存
private final Map<String, RateLimiter> localLimiters = new ConcurrentHashMap<>();优势:
- 避免频繁查询Redis
- 提高限流判断性能
3. 异步处理
@Async
public void handleConfigUpdate(String body) {
// 异步处理配置更新
}优势:
- 不阻塞消息接收
- 提高系统吞吐量
三、监控与告警
6.1 关键指标
| 指标 | 说明 | 告警阈值 |
|---|---|---|
| 配置更新延迟 | 从发布到生效的时间 | > 1秒 |
| 配置加载失败率 | 加载失败的配置数量 | > 0 |
| 限流拒绝率 | 被限流拒绝的请求比例 | > 10% |
| Redis异常率 | Redis限流异常的比例 | > 1% |
6.2 日志记录
// 配置创建日志
log.info("创建限流配置成功: id={}, type={}, target={}, count={}",
id, limitType, limitTarget, limitCount);
// 配置更新日志
log.info("更新限流配置成功: id={}, oldCount={}, newCount={}",
id, oldCount, newCount);
// 配置推送日志
log.info("发布配置更新消息: type={}, target={}", limitType, limitTarget);
// 配置接收日志
log.info("收到限流配置更新消息: type={}, target={}", limitType, limitTarget);
// 配置生效日志
log.info("限流配置生效: key={}, count={}", key, limitCount);
// 异常日志
log.error("处理限流配置更新消息失败: message={}", message, e);6.3 健康检查
@GetMapping("/health")
public Result<Map<String, Object>> health() {
Map<String, Object> health = new HashMap<>();
// 检查数据库连接
health.put("database", checkDatabase());
// 检查Redis连接
health.put("redis", checkRedis());
// 检查配置数量
health.put("configCount", getConfigCount());
return Result.success(health);
}