yml配置
1 2 3 4 5 6 7 |
# Thread-pool settings bound to the SystemConfig bean (prefix "tieasy").
tieasy:
  threadPool:
    corePoolSize: 100
    maxPoolSize: 400
    # Bounded work queue; extra threads beyond corePoolSize are only created once it is full.
    queueCapacity: 10000
    keepAliveSeconds: 60
    # Passed to the thread factory as the name prefix of the pool's worker threads.
    threadNamePrefix: tieasy-thread-pool
对应映射bean
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
@Getter @Setter @Component @ConfigurationProperties(prefix = "tieasy") public class SystemConfig { private ThreadPool threadPool; @Data public static class ThreadPool { private int corePoolSize; private int maxPoolSize; private int queueCapacity; private int keepAliveSeconds; private String threadNamePrefix; } } |
eventBus配置类
这里需要重点关注 ExecutorService（线程池）的配置：
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
import com.google.common.eventbus.AsyncEventBus; import com.google.common.eventbus.EventBus; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @Configuration @Slf4j public class EventBusConfig { @Bean public EventBus eventPublisherEventBus(ExecutorService es){ return new AsyncEventBus(es, new CustomSubscriberExceptionHandler()); } @Bean public ExecutorService executorService(SystemConfig systemConfig) { SystemConfig.ThreadPool threadPool = systemConfig.getThreadPool(); CustomizableThreadFactory threadFactory = new CustomizableThreadFactory(threadPool.getThreadNamePrefix); return new ThreadPoolExecutor(threadPool.getCorePoolSize(), threadPool.getMaxPoolSize(), threadPool.getKeepAliveSeconds(), TimeUnit.SECONDS, new LinkedBlockingQueue<>(threadPool.getQueueCapacity()), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy()) { protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); printException(r, t); } }; } private static void printException(Runnable r, Throwable t) { if (t == null && r instanceof Future<?>) { try { Future<?> future = (Future<?>) r; if (future.isDone()) future.get(); } catch (CancellationException ce) { t = ce; } catch (ExecutionException ee) { t = ee.getCause(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } if (t != null) log.error("publish occur error: ", t); } } |
订阅异常处理类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
import com.google.common.eventbus.SubscriberExceptionContext; import com.google.common.eventbus.SubscriberExceptionHandler; import lombok.extern.slf4j.Slf4j; /** * @author 791202.com * @date 1477/01/01 */ @Slf4j public class CustomSubscriberExceptionHandler implements SubscriberExceptionHandler { @Override public void handleException(Throwable exception, SubscriberExceptionContext context) { log.error("Could not dispatch event: " + context.getSubscriber() + " to " + context.getSubscriberMethod(), exception); } } |
0