1. 解决的问题
当一个线程执行过程中开启了新的异步线程,会导致异步线程与当前线程的traceId不一致的问题。
在线程池中,traceId可能在线程创建那一刻就已经固定了,不会跟着使用场景上下文traceId变动,在后面的线程复用环节中一直都是这个traceId,会带来traceId混乱在一起的情况,同样也会带来异步线程与当前线程的traceId不一致的问题。
最终,导致异步线程在日志上无法准确追踪到整个调用链路。
2. 环境
总的来说基于 Zipkin server + Brave library,这一块基本使用需要参考一下文档:
https://zipkin.io/pages/instrumenting.html
https://github.com/openzipkin/brave
3. Brave的currentTraceContext
以MDCCurrentTraceContext作为上下文容器进行初始化方式为例:
var Tracing = Tracing.newBuilder().endpoint(endpoint)
.spanReporter(spanReporter()).currentTraceContext(MDCCurrentTraceContext.create()).build();
当调用tracing#Tracer#nextSpan()
来开启一个新的执行片段(Span)时:
/**
* Returns a new child span if there's a {@link #currentSpan()} or a new trace if there isn't.
*
* <p>Prefer {@link #startScopedSpan(String)} if you are tracing a synchronous function or code
* block.
*/
public Span nextSpan() {
TraceContext parent = currentTraceContext.get();
return parent != null ? newChild(parent) : newTrace();
}
本质上是从currentTraceContext
中取出TraceContext
再进行后续操作,而TraceContext
中又有我们需要的traceId:TraceContext#traceIdString
因此,认为只要将这个currentTraceContext
从当前线程“传递”到异步线程中就可以满足需求。
我这里追了下代码,MDCCurrentTraceContext
是通过ThreadLocal(准确的说是InheritableThreadLocal)绑定到线程中的,TraceContext
有多种实现,也有可能有其他的方式。猜想作为框架的Brave其实应该提供一个方法来统一处理这种需求。
4. 用Brave提供的API实现
Brave提供了以下API来装饰线程(池),帮我们做这个“传递”的动作:
- brave.propagation.CurrentTraceContext#wrap(java.lang.Runnable)
- brave.propagation.CurrentTraceContext#wrap(java.util.concurrent.Callable)
- brave.propagation.CurrentTraceContext#executor
- brave.propagation.CurrentTraceContext#executorService
下文是几个Demo代码。
4.1. Runnable使用
// Ignore DI Tracing
// Ignore DI THREAD_POOL
CompletableFuture<Void> smartFuture = CompletableFuture.runAsync(tracing.currentTraceContext().wrap(() -> {
var tracer = tracing.tracer();
var span = getNextSpan(tracer, "spanName");
try (var ignored = tracer.withSpanInScope(span)) {
// biz code
} catch (Exception e) {
span.error(e);
throw e;
} finally {
span.finish();
}
}), THREAD_POOL);
4.2. Spring线程池ThreadPoolTaskExecutor中使用
// Ignore DI Tracing
@Bean
public ThreadPoolTaskExecutor getThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(20);
threadPoolTaskExecutor.setMaxPoolSize(100);
threadPoolTaskExecutor.setQueueCapacity(100);
threadPoolTaskExecutor.setKeepAliveSeconds(60);
threadPoolTaskExecutor.setThreadNamePrefix("thread-prefix");
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// decorate runnable used in thread pool
threadPoolTaskExecutor.setTaskDecorator(tracing.currentTraceContext()::wrap);
return threadPoolTaskExecutor;
}
Java自带的ThreadPoolExecutor也是一样的,重写即可,方法同Spring的ThreadPoolTaskExecutor类中的setTaskDecorator方法。