Camel MDC Logback stale info in threads
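We have a high-load Apache Camel application that uses logback/MDC to log information. We are finding that some of the MDC information is stale on threads, as forewarned in logback's documentation. I found this SO question that addresses the problem: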
How to use MDC with thread pools?
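How should we apply this to our Camel application to avoid the stale information? Is there a simple way to globally change the default ThreadPoolExecutor to the custom variation suggested in the linked question? I see you can do it for the pools themselves, but I have not seen any examples for the executor. Keep in mind that our application is quite large and services a large number of orders daily; I would like as little impact on the existing application as possible.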
I figured it out and wanted to post what I did in case it benefits others. Please note I am using JDK 6 / Camel 2.13.2.
Camel has a DefaultExecutorServiceManager that uses a DefaultThreadPoolFactory. I extended the default factory into an MdcThreadPoolFactory.
DefaultThreadPoolFactory has methods that generate RejectableThreadPoolExecutors and RejectableScheduledThreadPoolExecutors. I extended both of these into Mdc* versions that override the execute() method to wrap the Runnable and hand off the MDC information between threads (as specified in the link in my original question).
MdcThreadPoolExecutor:
package com.mypackage.concurrent;

import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor;
import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;

/**
 * A SLF4J MDC-compatible {@link ThreadPoolExecutor}.
 * <p/>
 * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
 * logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
 * thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} that sets MDC data before each task appropriately.
 * <p/>
 * Created by broda20.
 * Date: 10/29/15
 */
public class MdcThreadPoolExecutor extends RejectableThreadPoolExecutor {

    // Captures the submitting thread's MDC. This relies on an SLF4J version whose MDC methods use a raw Map;
    // if your version declares Map<String, String>, change the Map<String, Object> types in this class to match.
    @SuppressWarnings("unchecked")
    private Map<String, Object> getContextForTask() {
        return MDC.getCopyOfContextMap();
    }

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
     * all delegate to this.
     */
    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command, getContextForTask()));
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) {
        return new Runnable() {
            @Override
            public void run() {
                // Remember what the worker thread had so it can be restored afterwards.
                Map previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            }
        };
    }
}
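To make the hand-off easier to see in isolation, here is a minimal sketch that needs neither Camel nor SLF4J. It is not the code from this answer: the class name ContextHandoffDemo, the CONTEXT thread-local (a stand-in for MDC) and the order ids are all made up for illustration. It runs two tasks on a single reused worker thread and reports what the second task sees, first without wrapping and then with it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; CONTEXT is a stand-in for MDC so the sketch has no SLF4J dependency.
class ContextHandoffDemo {
    static final ThreadLocal<Map<String, String>> CONTEXT = new ThreadLocal<Map<String, String>>();

    // Copy of the calling thread's context, or null if it has none (mirrors MDC.getCopyOfContextMap()).
    static Map<String, String> copyOfContext() {
        Map<String, String> current = CONTEXT.get();
        return current == null ? null : new HashMap<String, String>(current);
    }

    // Installs the captured context on the worker for the duration of the task, then restores the old one.
    static Runnable wrap(final Runnable task, final Map<String, String> captured) {
        return new Runnable() {
            @Override
            public void run() {
                Map<String, String> previous = CONTEXT.get();
                CONTEXT.set(captured);
                try {
                    task.run();
                } finally {
                    CONTEXT.set(previous);
                }
            }
        };
    }

    static Map<String, String> order(String id) {
        Map<String, String> map = new HashMap<String, String>();
        map.put("orderId", id);
        return map;
    }

    // Returns the orderId observed by the second of two tasks that share one pooled thread.
    static String secondTaskSees(boolean wrapTasks) {
        ExecutorService pool = Executors.newSingleThreadExecutor(); // one worker, so it is reused
        final String[] seen = new String[1];
        try {
            // The first task sets context on the worker and never clears it.
            Runnable first = new Runnable() {
                @Override
                public void run() {
                    CONTEXT.set(order("A-1"));
                }
            };
            Runnable second = new Runnable() {
                @Override
                public void run() {
                    Map<String, String> map = CONTEXT.get();
                    seen[0] = (map == null) ? null : map.get("orderId");
                }
            };
            pool.submit(wrapTasks ? wrap(first, copyOfContext()) : first).get();
            CONTEXT.set(order("B-2")); // the submitter's context for the second task
            pool.submit(wrapTasks ? wrap(second, copyOfContext()) : second).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            CONTEXT.remove();
            pool.shutdown();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(secondTaskSees(false)); // prints "A-1" (stale value left on the worker)
        System.out.println(secondTaskSees(true));  // prints "B-2" (the submitter's value)
    }
}
```

The capture has to happen on the submitting thread (inside execute() in the classes here), while the install and restore happen on the worker; capturing inside run() would only read the worker's own stale state.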
I created a bean instance of the MdcThreadPoolFactory in my application configuration; Camel picks it up automatically and uses it in the ExecutorServiceManager.
MdcScheduledThreadPoolExecutor:
package com.mypackage.concurrent;
import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor;
import org.slf4j.MDC;
import java.util.Map;
import java.util.concurrent.*;
/**
* A SLF4J MDC-compatible {@link ScheduledThreadPoolExecutor}.
* <p/>
* In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
* logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
* thread pool. This is a drop-in replacement for {@link ScheduledThreadPoolExecutor} that sets MDC data before each task appropriately.
* <p/>
* Created by broda20.
* Date: 10/29/15
*/
public class MdcScheduledThreadPoolExecutor extends RejectableScheduledThreadPoolExecutor {
@SuppressWarnings("unchecked")
private Map<String, Object> getContextForTask() {
return MDC.getCopyOfContextMap();
}
public MdcScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize);
}
public MdcScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, threadFactory);
}
public MdcScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
super(corePoolSize, handler);
}
public MdcScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, threadFactory, handler);
}
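/**
* Tasks passed to {@code execute()} will have MDC injected. Note that in the JDK's
* {@code ScheduledThreadPoolExecutor}, {@code submit()} and {@code schedule()} do not go through
* {@code execute()}, so tasks submitted that way are not wrapped by this override.
*/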
@Override
public void execute(Runnable command) {
super.execute(wrap(command, getContextForTask()));
}
public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) {
return new Runnable() {
@Override
public void run() {
Map previous = MDC.getCopyOfContextMap();
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
try {
runnable.run();
} finally {
if (previous == null) {
MDC.clear();
} else {
MDC.setContextMap(previous);
}
}
}
};
}
}
MdcThreadPoolFactory:
package com.mypackage.concurrent;
import org.apache.camel.impl.DefaultThreadPoolFactory;
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
import org.slf4j.MDC;
import java.util.Map;
import java.util.concurrent.*;
public class MdcThreadPoolFactory extends DefaultThreadPoolFactory {
@SuppressWarnings("unchecked")
private Map<String, Object> getContextForTask() {
return MDC.getCopyOfContextMap();
}
public ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize, boolean allowCoreThreadTimeOut,
RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory) throws IllegalArgumentException {
// the core pool size must be 0 or higher
if (corePoolSize < 0) {
throw new IllegalArgumentException("CorePoolSize must be >= 0, was " + corePoolSize);
}
// validate max >= core
if (maxPoolSize < corePoolSize) {
throw new IllegalArgumentException("MaxPoolSize must be >= corePoolSize, was " + maxPoolSize + " >= " + corePoolSize);
}
BlockingQueue<Runnable> workQueue;
if (corePoolSize == 0 && maxQueueSize <= 0) {
// use a synchronous queue for direct-handover (no tasks stored on the queue)
workQueue = new SynchronousQueue<Runnable>();
// and force 1 as pool size to be able to create the thread pool by the JDK
corePoolSize = 1;
maxPoolSize = 1;
} else if (maxQueueSize <= 0) {
// use a synchronous queue for direct-handover (no tasks stored on the queue)
workQueue = new SynchronousQueue<Runnable>();
} else {
// bounded task queue to store tasks on the queue
workQueue = new LinkedBlockingQueue<Runnable>(maxQueueSize);
}
ThreadPoolExecutor answer = new MdcThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue);
answer.setThreadFactory(threadFactory);
answer.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
if (rejectedExecutionHandler == null) {
rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
}
answer.setRejectedExecutionHandler(rejectedExecutionHandler);
return answer;
}
@Override
public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler();
if (rejectedExecutionHandler == null) {
rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
}
ScheduledThreadPoolExecutor answer = new MdcScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler);
//JDK7: answer.setRemoveOnCancelPolicy(true);
// need to wrap the thread pool in a sized to guard against the problem that the
// JDK created thread pool has an unbounded queue (see class javadoc), which mean
// we could potentially keep adding tasks, and run out of memory.
if (profile.getMaxPoolSize() > 0) {
return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize());
} else {
return answer;
}
}
}
And finally, the bean instance:
<bean id="mdcThreadPoolFactory" class="com.mypackage.concurrent.MdcThreadPoolFactory"/>
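To get this working with Camel 2.16.3 for the new threads required by org.apache.camel.util.component.AbstractApiProducer.process(Exchange, AsyncCallback), I also had to override java.util.concurrent.ScheduledThreadPoolExecutor.submit(Runnable) –
Cool. When I am able to upgrade our Camel –
I subsequently changed this to override public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit), which is what submit() and execute() delegate to (in JDK 8 at least). I think this would make a good contribution to core Camel. If I find time to look at this, are you able to sign over the copyright to Apache (or do whatever licensing work is needed)? –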
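The schedule() override described in the last comment could look roughly like the sketch below. It is an illustration only, not the commenter's actual code: ContextScheduledExecutor and its CONTEXT thread-local (a stand-in for MDC) are hypothetical names, and it extends the plain JDK ScheduledThreadPoolExecutor rather than Camel's RejectableScheduledThreadPoolExecutor so that it runs with the standard library alone.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical executor; CONTEXT is a stand-in for MDC so the sketch has no SLF4J dependency.
class ContextScheduledExecutor extends ScheduledThreadPoolExecutor {
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<String>();

    ContextScheduledExecutor(int corePoolSize) {
        super(corePoolSize);
    }

    // execute(Runnable) and submit(Runnable) both call schedule(Runnable, 0, NANOSECONDS) in the JDK,
    // so wrapping here covers all three entry points for Runnables.
    @Override
    public ScheduledFuture<?> schedule(final Runnable command, long delay, TimeUnit unit) {
        final String captured = CONTEXT.get(); // read on the submitting thread
        return super.schedule(new Runnable() {
            @Override
            public void run() {
                String previous = CONTEXT.get();
                CONTEXT.set(captured);
                try {
                    command.run();
                } finally {
                    CONTEXT.set(previous); // restore the worker's earlier state
                }
            }
        }, delay, unit);
    }

    // Records the context value that a task observes when it runs on a pool thread.
    static Runnable reader(final String[] seen, final int slot, final CountDownLatch done) {
        return new Runnable() {
            @Override
            public void run() {
                seen[slot] = CONTEXT.get();
                done.countDown();
            }
        };
    }

    // Returns what tasks see when submitted via execute(), submit() and schedule(), in that order.
    static String[] observe() {
        ContextScheduledExecutor pool = new ContextScheduledExecutor(1);
        final String[] seen = new String[3];
        final CountDownLatch done = new CountDownLatch(3);
        CONTEXT.set("order-42");
        try {
            pool.execute(reader(seen, 0, done));
            pool.submit(reader(seen, 1, done));
            pool.schedule(reader(seen, 2, done), 1, TimeUnit.MILLISECONDS);
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            CONTEXT.remove();
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe())); // prints [order-42, order-42, order-42]
    }
}
```

Callable submissions and the scheduleAtFixedRate()/scheduleWithFixedDelay() methods take different paths in the JDK and are not covered by this single override; they would need the same treatment if the application uses them.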