起因
公司项目开发用的比较早的Spring Cloud全家桶(Zuul、Eureka、Feign、Hystrix、Spring Cloud Config、Spring Boot),新的一个项目开发时候因为涉及一些老的微服务接口改造,增加了较大量的公共参数改造,接口批量修改影响的范围比较大,所以参考调用链的方式,通过实现feign.RequestInterceptor
拦截器来修改RequestTemplate
header参数的方式将公共参数传递给下一级微服务。
问题
因为Hystrix默认而且官方强烈建议使用Thread隔离级别,所以在RequestInterceptor
的拦截器中无法直接获取到当前的Request对象,也就无法将请求中的公共参数进行传递
根据一些资料可以比较出,信号量默认因为不涉及到线程调度,开销小,但硬伤也比较明显:不支持异步请求、无法限制请求超时时间
在高并发、大流量情况下,还是默认的Thread隔离级别相对合适一些
解决方案
- 修改Hystrix隔离级别为信号量(Semaphore)模式
- 自定义Hystrix隔离隔离策略国人实现的Hystrix隔离策略补丁
最终我决定采用第二种方案
踩坑记录
首先,我封装了一个公共的Spring Boot Starter来处理公共参数及参数传递的工作。创建了一些AOP对于所有Controller参数进行拦截,并且放在自定义的ThreadLocal对象中,在请求结束的时候再从ThreadLocal中移除对象,防止内存泄露
然后实现feign.RequestInterceptor
拦截器,从ThreadLocal中获取参数,放在RequestTemplate
的header传递给下一层。
第一坑
然后就踩到了第一个坑:feign.RequestInterceptor
拦截器调试时候都很好,测试时候发现随着请求量的上升,ThreadLocal中经常出现获取不到内容的情况
一轮日志打印+Google后发现,FeignClient调用下级微服务的时候是从线程池中拿到一个新的线程来执行Feign调用,此时无法正确的拿到当前的线程,造成ThreadLocal获取值失败
尝试了阿里开源的TransmittableThreadLocal类后,发现没有像阿里说的那样『在使用线程池等会池化复用线程的执行组件情况下传递ThreadLocal』则是TransmittableThreadLocal目标场景
,能解决ThreadLocal获取不到值的情况
第二坑
因为AOP偷懒直接用了@Around
注解拦截请求,请求结束后调用ThreadLocal remove方法移除数据,实际因为公司的项目采用了抛出异常的方式来处理返回错误,在出现参数错误等场景的时候,会造成TreadLocal没有正常remove,给排查问题造成了很大困扰
最终解决方案
- TreadLocal使用JDK自带的
InheritableThreadLocal
- AOP调整为
@After
中统一remove ThreadLocal数据
- AOP拦截到参数后,将参数塞进ThreadLocal的同时,通过
request.setAttribute
方式临时将序列化好的参数保存在当前Request的Attribute中(需注意调用request.setAttribute前最好先request.removeAttribute掉)
feign.RequestInterceptor
拦截器获取当前Request对象,从Attribute中获取数据放入RequestTemplate
的header中传递给下一级
附:自定义Hystrix隔离隔离策略代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
public class RequestAttributeHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
private static final Log log = LogFactory
.getLog(RequestAttributeHystrixConcurrencyStrategy.class);
private HystrixConcurrencyStrategy delegate;
public RequestAttributeHystrixConcurrencyStrategy() {
log.info("注入 Hystrix 熔断策略");
try {
this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
if (this.delegate instanceof RequestAttributeHystrixConcurrencyStrategy) {
// Welcome to singleton hell...
return;
}
HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins
.getInstance().getCommandExecutionHook();
HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance()
.getEventNotifier();
HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance()
.getMetricsPublisher();
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance()
.getPropertiesStrategy();
this.logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher,
propertiesStrategy);
HystrixPlugins.reset();
HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
HystrixPlugins.getInstance()
.registerCommandExecutionHook(commandExecutionHook);
HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
} catch (Exception e) {
log.error("Failed to register Sleuth Hystrix Concurrency Strategy", e);
}
}
private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier,
HystrixMetricsPublisher metricsPublisher,
HystrixPropertiesStrategy propertiesStrategy) {
if (log.isDebugEnabled()) {
log.debug("Current Hystrix plugins configuration is ["
+ "concurrencyStrategy [" + this.delegate + "]," + "eventNotifier ["
+ eventNotifier + "]," + "metricPublisher [" + metricsPublisher + "],"
+ "propertiesStrategy [" + propertiesStrategy + "]," + "]");
log.debug("Registering Sleuth Hystrix Concurrency Strategy.");
}
}
@Override
public <T> Callable<T> wrapCallable(Callable<T> callable) {
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
return new WrappedCallable<>(callable, requestAttributes);
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
HystrixProperty<Integer> corePoolSize,
HystrixProperty<Integer> maximumPoolSize,
HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
keepAliveTime, unit, workQueue);
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
HystrixThreadPoolProperties threadPoolProperties) {
return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties);
}
@Override
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
return this.delegate.getBlockingQueue(maxQueueSize);
}
@Override
public <T> HystrixRequestVariable<T> getRequestVariable(
HystrixRequestVariableLifecycle<T> rv) {
return this.delegate.getRequestVariable(rv);
}
static class WrappedCallable<T> implements Callable<T> {
private final Callable<T> target;
private final RequestAttributes requestAttributes;
public WrappedCallable(Callable<T> target, RequestAttributes requestAttributes) {
this.target = target;
this.requestAttributes = requestAttributes;
}
@Override
public T call() throws Exception {
try {
RequestContextHolder.setRequestAttributes(requestAttributes);
return target.call();
} finally {
RequestContextHolder.resetRequestAttributes();
}
}
}
}
|