起因

公司项目开发用的比较早的Spring Cloud全家桶(Zuul、Eureka、Feign、Hystrix、Spring Cloud Config、Spring Boot),新的一个项目开发时候因为涉及一些老的微服务接口改造,增加了较大量的公共参数改造,接口批量修改影响的范围比较大,所以参考调用链的方式,通过实现feign.RequestInterceptor拦截器来修改RequestTemplate header参数的方式将公共参数传递给下一级微服务。

问题

因为Hystrix默认而且官方强烈建议使用Thread隔离级别,所以在RequestInterceptor的拦截器中无法直接获取到当前的Request对象,也就无法将请求中的公共参数进行传递

根据一些资料可以比较出,信号量默认因为不涉及到线程调度,开销小,但硬伤也比较明显:不支持异步请求、无法限制请求超时时间

在高并发、大流量情况下,还是默认的Thread隔离级别相对合适一些

解决方案

  1. 修改Hystrix隔离级别为信号量(Semaphore)模式
  2. 自定义Hystrix隔离隔离策略国人实现的Hystrix隔离策略补丁

最终我决定采用第二种方案

踩坑记录

首先,我封装了一个公共的Spring Boot Starter来处理公共参数及参数传递的工作。创建了一些AOP对于所有Controller参数进行拦截,并且放在自定义的ThreadLocal对象中,在请求结束的时候再从ThreadLocal中移除对象,防止内存泄露

然后实现feign.RequestInterceptor拦截器,从ThreadLocal中获取参数,放在RequestTemplate的header传递给下一层。

第一坑

然后就踩到了第一个坑:feign.RequestInterceptor拦截器调试时候都很好,测试时候发现随着请求量的上升,ThreadLocal中经常出现获取不到内容的情况

一轮日志打印+Google后发现,FeignClient调用下级微服务的时候是从线程池中拿到一个新的线程来执行Feign调用,此时无法正确的拿到当前的线程,造成ThreadLocal获取值失败

尝试了阿里开源的TransmittableThreadLocal类后,发现没有像阿里说的那样『在使用线程池等会池化复用线程的执行组件情况下传递ThreadLocal』则是TransmittableThreadLocal目标场景,能解决ThreadLocal获取不到值的情况

第二坑

因为AOP偷懒直接用了@Around注解拦截请求,请求结束后调用ThreadLocal remove方法移除数据,实际因为公司的项目采用了抛出异常的方式来处理返回错误,在出现参数错误等场景的时候,会造成TreadLocal没有正常remove,给排查问题造成了很大困扰

最终解决方案

  1. TreadLocal使用JDK自带的InheritableThreadLocal
  2. AOP调整为@After中统一remove ThreadLocal数据
  3. AOP拦截到参数后,将参数塞进ThreadLocal的同时,通过request.setAttribute方式临时将序列化好的参数保存在当前Request的Attribute中(需注意调用request.setAttribute前最好先request.removeAttribute掉)
  4. feign.RequestInterceptor拦截器获取当前Request对象,从Attribute中获取数据放入RequestTemplate的header中传递给下一级

附:自定义Hystrix隔离隔离策略代码

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
public class RequestAttributeHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
    private static final Log log = LogFactory
            .getLog(RequestAttributeHystrixConcurrencyStrategy.class);

    private HystrixConcurrencyStrategy delegate;

    public RequestAttributeHystrixConcurrencyStrategy() {
        log.info("注入 Hystrix 熔断策略");
        try {
            this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
            if (this.delegate instanceof RequestAttributeHystrixConcurrencyStrategy) {
                // Welcome to singleton hell...
                return;
            }
            HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins
                    .getInstance().getCommandExecutionHook();
            HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance()
                    .getEventNotifier();
            HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance()
                    .getMetricsPublisher();
            HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance()
                    .getPropertiesStrategy();
            this.logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher,
                    propertiesStrategy);
            HystrixPlugins.reset();
            HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
            HystrixPlugins.getInstance()
                    .registerCommandExecutionHook(commandExecutionHook);
            HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
            HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
            HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
        } catch (Exception e) {
            log.error("Failed to register Sleuth Hystrix Concurrency Strategy", e);
        }
    }

    private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier,
                                                 HystrixMetricsPublisher metricsPublisher,
                                                 HystrixPropertiesStrategy propertiesStrategy) {
        if (log.isDebugEnabled()) {
            log.debug("Current Hystrix plugins configuration is ["
                    + "concurrencyStrategy [" + this.delegate + "]," + "eventNotifier ["
                    + eventNotifier + "]," + "metricPublisher [" + metricsPublisher + "],"
                    + "propertiesStrategy [" + propertiesStrategy + "]," + "]");
            log.debug("Registering Sleuth Hystrix Concurrency Strategy.");
        }
    }

    @Override
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
        return new WrappedCallable<>(callable, requestAttributes);
    }

    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
                                            HystrixProperty<Integer> corePoolSize,
                                            HystrixProperty<Integer> maximumPoolSize,
                                            HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
                                            BlockingQueue<Runnable> workQueue) {
        return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
                keepAliveTime, unit, workQueue);
    }

    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
                                            HystrixThreadPoolProperties threadPoolProperties) {
        return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties);
    }

    @Override
    public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
        return this.delegate.getBlockingQueue(maxQueueSize);
    }

    @Override
    public <T> HystrixRequestVariable<T> getRequestVariable(
            HystrixRequestVariableLifecycle<T> rv) {
        return this.delegate.getRequestVariable(rv);
    }

    static class WrappedCallable<T> implements Callable<T> {

        private final Callable<T> target;
        private final RequestAttributes requestAttributes;

        public WrappedCallable(Callable<T> target, RequestAttributes requestAttributes) {
            this.target = target;
            this.requestAttributes = requestAttributes;
        }

        @Override
        public T call() throws Exception {
            try {
                RequestContextHolder.setRequestAttributes(requestAttributes);
                return target.call();
            } finally {
                RequestContextHolder.resetRequestAttributes();
            }
        }
    }
}