一.背景
在我们的项目中经常会遇到跟第三方通信的场景,由于三方不受我们的约束,我们不能像在我们项目中用rpc或者已feign的形式进行网络通信,最常见的方式是采用http请求。根据我们不同的业务场景,我们可能是会在我们的业务中同步发送http请求,也有可能做成异步,但无论是同步还是异步,我们都希望我们发送出去的请求是成功的。
但事实上,我们经常会遇到网络或者其他原因导致的失败,失败后只会记录一下日志,没有其他的操作。我们希望一些请求失败后能够过一段时间自我重试,有一定的自愈能力,使服务更加健壮,减少人工干预,保证服务的一个高可用性。
思路:
1.统一请求入参,将工程中零散的发送http请求代码聚合封装起来
2.在请求工具类中记录中记录相关参数,如入参、url、发送时间、场景、相应结果等,如果出现异常,需要记录异常原因,放入异常队列(重试使用),也可记录日志(定时任务重试,达到最大重试次数后人工介入修复)
3.当达到一定次数(可配置项,也可配成每分钟失败次数、比例),应该及时熔断该链路,返回友好提示信息,避免有问题的路径挤压了其他正常请求访问。
4.错误异常信息应该及时监测反馈,而不是三方收不到东西过来主动过来询问,将一些错误日志可视化,再页面上可重复操作。
5.将接受第三方请求与下发异步化,采用线程池的方式,线程池参数可先初始化给一个值,后面可压测结果动态调整。因为没有借助中间件的关系,任务队列长度不宜过大,以免过度挤压jvm空间。队列满时也可以返回友好相应,告诉第三方服务器已达到最大压力,让三方主动稍后再试,留给平台缓冲时间。
简单请求重试流程:
二.设计实现
主要模块
RetryPolicy:重试策略,可配置什么时候重试,包括最大重试次数、重试时间、重试步长等。
RetryQueue:重试队列,用于管理重试任务。
RetryTask:重试任务,包装需要重试的请求。
RetryManager:重试任务管理器。
代码实现
重试任务
我们将我们的重试http请求相关信息包装成任务类,增强其扩展性。
import lombok.Data;
import java.io.Serializable;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
@Data
public class RetryTask implements Delayed , Serializable {
private static final long serialVersionUID = 1L;
private String type;
private String url;
private String content;
//抽象策略类
private AbstractRetryPolicy retryPolicy;
public RetryTask(String url, String content, AbstractRetryPolicy retryPolicy) {
this.type = "retry";
this.url = url;
this.content = content;
this.retryPolicy = retryPolicy;
}
public RetryTask(String type, String url, String content, AbstractRetryPolicy retryPolicy) {
this.type = type;
this.url = url;
this.content = content;
this.retryPolicy = retryPolicy;
}
//重试方法
public RetryTask retry() {
//具体的重试逻辑依赖具体的重试策略类
this.retryPolicy.retry();
return this;
}
//决定是否要重试
public boolean isRetry() {
return this.retryPolicy.isRetry();
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.retryPolicy.calExpireTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
RetryTask task = (RetryTask) o;
return (int) (this.retryPolicy.calExpireTime()- task.getRetryPolicy().calExpireTime());
}
}
我们再来看看具体的重试策略有哪些
重试策略
我们的重试策略类负责我们具体重试逻辑的实现,他包含了两个个重要的参数:延迟时间、最大重试次数
import lombok.Data;
@Data
public abstract class AbstractRetryPolicy {
//延迟时间
protected Long delayMils;
//当前重试次数
protected Integer retryTimes = 0;
//最大重试次数
protected Integer maxRetryTimes;
public abstract Long calExpireTime();
public abstract Long calDelayMilse();
protected void setRetryTimes(Integer retryTimes){
this.retryTimes = retryTimes;
}
public boolean isRetry() {
return retryTimes < maxRetryTimes;
}
public void retry(){
retryTimes = retryTimes++;
}
}
具体有如下两个策略类实现
简单重试策略类
public class SimpleRetryPolicy extends AbstractRetryPolicy {
@Override
public Long calExpireTime() {
return System.currentTimeMillis() + super.delayMils;
}
@Override
public Long calDelayMilse() {
return super.getDelayMils();
}
@Override
public boolean isRetry() {
return super.isRetry();
}
@Override
public void retry() {
super.retry();
}
}
带有步长的重试策略类
package com.hesc.transfer.retry.policy;
import lombok.Data;
@Data
public class StepRetryPolicy extends AbstractRetryPolicy {
//步长
private Long stepTime;
@Override
public Long calExpireTime() {
return System.currentTimeMillis() + super.delayMils + super.retryTimes * stepTime;
}
@Override
public Long calDelayMilse() {
return super.delayMils + super.retryTimes * stepTime;
}
@Override
public boolean isRetry() {
return super.isRetry();
}
@Override
public void retry() {
super.retry();
}
}
重试队列
接下来是我们的重试队列接口,他所拥有的主要功能为添加任务和取出任务,如果无任务时应该阻塞。
public interface RetryQueue {
boolean offer(RetryTask task);
RetryTask take();
}
他有三个主要实现,基于jdk实现,基于redisson的阻塞队列实现,基于redis的zset实现。
1.jdk版本实现较为简单,可以用于比较轻量的场景,宕机有丢失消息风险。
2.基于redisson的阻塞队列实现的版本可以在有redisson框架下实现,较为方便,不会丢失消息,适合分布式架构。
2.基于redis的zset实现的版本扩展性更强,不会丢失消息,适合分布式架构。
首先看基于jdk实现的重试队列
import lombok.Data;
import org.bouncycastle.cert.ocsp.Req;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.atomic.AtomicInteger;
@Data
@Service
@Transactional
public class JDKRetryQueue implements RetryQueue {
private final static DelayQueue<RetryTask> queue = new DelayQueue<>();
private final static Integer MAX_SIZE = 1000;
private final static AtomicInteger size = new AtomicInteger(0);
/**
* 不在方法上锁是为了保证吞吐量,延迟队列底层会自己上锁
* 用原子类保证数量正确即可,不必要求强一致性size,保证最终一致即可(即允许细微超出队列长度)
*/
public boolean offer(RetryTask task) {
//并发场景下可能会超出队列(稍许),在可控范围内,故方法不上锁,保证性能
if (MAX_SIZE - size.get() > 0) {
if (queue.offer(task)) {
size.getAndIncrement();
return true;
}
}
return false;
}
/**
* take为阻塞方法,也无需上锁
* 上同
*/
public RetryTask take() {
RetryTask task = null;
try {
task = queue.take();
size.getAndDecrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
return task;
}
}
基于redisson的阻塞队列实现
import lombok.extern.slf4j.Slf4j;
import org.apache.poi.ss.formula.functions.T;
import org.redisson.Redisson;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
@Transactional
public class RedissionRetryQueue implements RetryQueue {
@Autowired
private RedissonClient redissonClient;
@Override
public boolean offer(RetryTask task) {
try {
RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque("retry");
RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
delayedQueue.offer(task, task.getRetryPolicy().calDelayMilse(), TimeUnit.MILLISECONDS);
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
@Override
public RetryTask take() {
String lockName = "ZsetQueue";
RLock lock = redissonClient.getLock(lockName);
try {
if (lock.tryLock(2000L, 2000L, TimeUnit.MILLISECONDS)) {
RBlockingDeque<Map> blockingDeque = redissonClient.getBlockingDeque("retry");
// 将队列中放入的第一个元素取出
RetryTask value = (RetryTask) blockingDeque.take();
return value;
}else {
Thread.sleep(5*1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
return take();
}
}
基于redis的zset实现
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import java.util.Collections;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@Slf4j
public class ZsetRetryQueue implements RetryQueue {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private RedissonClient redissonClient;
private static BlockingQueue queue = new LinkedBlockingQueue<>();
private static Boolean started = false;
@Override
public boolean offer(RetryTask task) {
try {
ZSetOperations<String, Object> zset = redisTemplate.opsForZSet();
// 设置score,在当前时间上加score
long scheduleTime = System.currentTimeMillis() + task.getRetryPolicy().calDelayMilse();
zset.add("retry", task, scheduleTime);
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
@Override
public RetryTask take() {
//初始化启动项
enableTimer();
RetryTask task = null;
try {
task = (RetryTask) queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
return task;
}
private void enableTimer() {
if (!started) {
new Timer().schedule(new TimerTask() {
@Override
public void run() {
ZSetOperations<String, Object> zset = redisTemplate.opsForZSet();
String lockName = "redissionQueue";
RLock lock = redissonClient.getLock(lockName);
try {
if (lock.tryLock(2000L, 2000L, TimeUnit.MILLISECONDS)) {
Long now = System.currentTimeMillis();
// 取出小于等于当前时间的元素
Set<Object> set = zset.rangeByScore("retry", 0, now);
if (CollectionUtils.isNotEmpty(set)) {
if (queue.addAll(set))
// zset中删除过期的元素
zset.removeRangeByScore("retry", 0, now);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}, 0, 1000);
}
started = true;
}
}
重试管理器
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
//@Component
@Slf4j
public class RetryManager implements InitializingBean {
@Autowired
private HttpOperationService httpOperationService;
@Autowired
private RetryQueue retryQueue;
//先不处理线程打断异常,暂无停止queue的需要,后续可以留后门停止运行的queue
@Override
public void afterPropertiesSet() throws Exception {
new Thread(()->{
while (true) {
RetryTask task = null;
task = retryQueue.take();
if (task.isRetry()) {
httpOperationService.retryPost(task);
}else{
//钉钉报警
dingdingCall(task);
//记录日志
saveLog(task);
}
}).start();
}
}
通信服务
@Slf4j
@Service
@Transactional
public class HttpOperationServiceImpl implements HttpOperationService {
@Autowired
private HttpLogService httpLogService;
@Autowired
private RetryQueue retryQueue;
...
@Override
public String retryPost(RetryTask task) {
return retryPost(task.getType(), task.getUrl(), task.getContent());
}
@Override
public String retryPost(String type, String url, String json) {
SevenProblemLogEntity logEntity = SevenProblemLogEntity.builder()
.type(type).url(url).content(json).traceId(MDC.get("TRACE_ID")).build();
String res = null;
try {
res = post(url, json);
logEntity.setMsg(oldPost);
} catch (Exception e) {
logEntity.setMsg(e.res());
//TODO 默认重试策略,后期应改成从配置文件中指定
SimpleRetryPolicy policy = new SimpleRetryPolicy();
//设置延迟时间
policy.setDelayMils(10 * 60 * 1000l);
//设置最大重试次数
policy.setMaxRetryTimes(3);
RetryTask task = new RetryTask(type, url, json, policy);
//加入重试队列
retryQueue.offer(task);
}finally {
//TODO 最好加个重试标志位
httpLogService.save(logEntity);
}
return res;
}
...
private String post(String url, String json){
...
}
ok,此时一个简单的重试队列开发就已经完成了。我们可以不用为每次发生的网络抖动或者一些三方服务升级导致的短暂不可用造成数据不一致而频繁的人工介入了。这极大增强了我们服务的高可用性,并且对一些网络请求进行追溯,当异常发生的时候,我们可以及时介入,及时处理问题。
评论区