一時技癢,擼了個動態線程池,源碼放Github了

一時技癢,擼了個動態線程池,源碼放Github了

闡述背景

線程池在日常工作中用的還挺多,當需要異步,批量處理一些任務的時候我們會定義一個線程池來處理。

在使用線程池的過程中有一些問題,下面簡單介紹下之前遇到的一些問題。

場景一:實現一些批量處理數據的功能,剛開始線程池的核心線程數設的比較小,然後想調整下,只能改完后重啟應用。

場景二:有一個任務處理的應用,會接收 MQ 的消息進行任務的處理,線程池的隊列也允許緩存一定數量的任務。當任務處理的很慢的時候,想看看到底有多少沒有處理完不是很方便。當時為了快速方便,就直接啟動了一個線程去循環打印線程池隊列的大小。

正好之前在我公眾號有轉發過美團的一篇線程池應用的文章(https://mp.weixin.qq.com/s/tIWAocevZThfbrfWoJGa9w),覺得他們的思路非常好,就是沒有開放源碼,所以自己就抽時間在我的開源項目 Kitty 中增加了一個動態線程池的組件,支持了 Cat 監控,動態變更核心參數,任務堆積告警等。今天就給大家分享一下實現的方式。

項目源代碼地址:https://github.com/yinjihuan/kitty

使用方式

添加依賴

依賴線程池的組件,目前 Kitty 未發布,需要自己下載源碼 install 本地或者私有倉庫。

<dependency>
    <groupId>com.cxytiandi</groupId>
    <artifactId>kitty-spring-cloud-starter-dynamic-thread-pool</artifactId>
</dependency>

添加配置

然後在 Nacos 配置線程池的信息,我的這個整合了 Nacos。推薦一個應用創建一個單獨的線程池配置文件,比如我們這個叫 dataId 為 kitty-cloud-thread-pool.properties,group 為 BIZ_GROUP。

內容如下:

kitty.threadpools.nacosDataId=kitty-cloud-thread-pool.properties
kitty.threadpools.nacosGroup=BIZ_GROUP
kitty.threadpools.accessToken=ae6eb1e9e6964d686d2f2e8127d0ce5b31097ba23deee6e4f833bc0a77d5b71d
kitty.threadpools.secret=SEC6ec6e31d1aa1bdb2f7fd5eb5934504ce09b65f6bdc398d00ba73a9857372de00
kitty.threadpools.owner=尹吉歡
kitty.threadpools.executors[0].threadPoolName=TestThreadPoolExecutor
kitty.threadpools.executors[0].corePoolSize=4
kitty.threadpools.executors[0].maximumPoolSize=4
kitty.threadpools.executors[0].queueCapacity=5
kitty.threadpools.executors[0].queueCapacityThreshold=5
kitty.threadpools.executors[1].threadPoolName=TestThreadPoolExecutor2
kitty.threadpools.executors[1].corePoolSize=2
kitty.threadpools.executors[1].maximumPoolSize=4

nacosDataId,nacosGroup

監聽配置修改的時候需要知道監聽哪個 DataId,值就是當前配置的 DataId。

accessToken,secret

釘釘機器人的驗證信息,用於告警。

owner

這個應用的負責人,告警的消息中會显示。

threadPoolName

線程池的名稱,使用的時候需要關注。

剩下的配置就不一一介紹了,跟線程池內部的參數一致,還有一些可以查看源碼得知。

注入使用

@Autowired
private DynamicThreadPoolManager dynamicThreadPoolManager;
dynamicThreadPoolManager.getThreadPoolExecutor("TestThreadPoolExecutor").execute(() -> {
    log.info("線程池的使用");
    try {
        Thread.sleep(30000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}, "getArticle");

通過 DynamicThreadPoolManager 的 getThreadPoolExecutor 方法獲取線程池對象,然後傳入 Runnable,Callable 等。第二個參數是這個任務的名稱,之所以要擴展一個參數是因為如果任務沒有標識,那麼無法區分任務。

這個線程池組件默認集成了 Cat 打點,設置了名稱可以在 Cat 上查看這個任務相關的監控數據。

擴展功能

任務執行情況監控

在 Cat 的 Transaction 報表中會以線程池的名稱為類型显示。

詳情中會以任務的名稱显示。

核心參數動態修改

核心參數目前只支持 corePoolSize,maximumPoolSize,queueCapacity(隊列類型為 LinkedBlockingDeque 才可以修改),rejectedExecutionType,keepAliveTime,unit 這些參數的修改。

一般 corePoolSize,maximumPoolSize,queueCapacity 是最常要動態改變的。

需要改動的話直接在 Nacos 中將對應的配置值修改即可,客戶端會監聽配置的修改,然後同步修改先線程池的參數。

隊列容量告警

queueCapacityThreshold 是隊列容量告警的閥值,如果隊列中的任務數量超過了 queueCapacityThreshold 就會告警。

拒絕次數告警

當隊列容量滿了后,新進來的任務會根據用戶設置的拒絕策略去選擇對應的處理方式。如果是採用 AbortPolicy 策略,也會進行告警。相當於消費者已經超負荷了。

線程池運行情況

底層對接了 Cat,所以將線程的運行數據上報給了 Cat。我們可以在 Cat 中查看這些信息。

如果你想在自己的平台去展示,我這邊暴露了/actuator/thread-pool 端點,你可以自行拉取數據。

{
	threadPools: [{
		threadPoolName: "TestThreadPoolExecutor",
		activeCount: 0,
		keepAliveTime: 0,
		largestPoolSize: 4,
		fair: false,
		queueCapacity: 5,
		queueCapacityThreshold: 2,
		rejectCount: 0,
		waitTaskCount: 0,
		taskCount: 5,
		unit: "MILLISECONDS",
		rejectedExecutionType: "AbortPolicy",
		corePoolSize: 4,
		queueType: "LinkedBlockingQueue",
		completedTaskCount: 5,
		maximumPoolSize: 4
	}, {
		threadPoolName: "TestThreadPoolExecutor2",
		activeCount: 0,
		keepAliveTime: 0,
		largestPoolSize: 0,
		fair: false,
		queueCapacity: 2147483647,
		queueCapacityThreshold: 2147483647,
		rejectCount: 0,
		waitTaskCount: 0,
		taskCount: 0,
		unit: "MILLISECONDS",
		rejectedExecutionType: "AbortPolicy",
		corePoolSize: 2,
		queueType: "LinkedBlockingQueue",
		completedTaskCount: 0,
		maximumPoolSize: 4
	}]
}

自定義拒絕策略

平時我們使用代碼創建線程池可以自定義拒絕策略,在構造線程池對象的時候傳入即可。這裏由於創建線程池都被封裝好了,我們只能在 Nacos 配置拒絕策略的名稱來使用對應的策略。默認是可以配置 JDK 自帶的 CallerRunsPolicy,AbortPolicy,DiscardPolicy,DiscardOldestPolicy 這四種。

如果你想自定義的話也是支持的,定義方式跟以前一樣,如下:

@Slf4j
public class MyRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        log.info("進來了。。。。。。。。。");
    }
}

要讓這個策略生效的話使用的是 SPI 的方式,需要在 resources 下面創建一個 META-INF 的文件夾,然後創建一個 services 的文件夾,再創建一個 java.util.concurrent.RejectedExecutionHandler 的文件,內容為你定義的類全路徑。

自定義告警方式

默認是內部集成了釘釘機器人的告警方式,如果你不想用也可以將其關閉。或者將告警信息對接到你的監控平台去。

如果沒有告警平台也可以在項目中實現新的告警方式,比如短信等。

只需要實現 ThreadPoolAlarmNotify 這個類即可。

/**
 * 自定義短信告警通知
 *
 * @作者 尹吉歡
 * @個人微信 jihuan900
 * @微信公眾號 猿天地
 * @GitHub https://github.com/yinjihuan
 * @作者介紹 http://cxytiandi.com/about
 * @時間 2020-05-27 22:26
 */
@Slf4j
@Component
public class ThreadPoolSmsAlarmNotify implements ThreadPoolAlarmNotify {
    @Override
    public void alarmNotify(AlarmMessage alarmMessage) {
        log.info(alarmMessage.toString());
    }
}

代碼實現

具體的就不講的很細了,源碼在https://github.com/yinjihuan/kitty/tree/master/kitty-dynamic-thread-pool,大家自己去看,並不複雜。

創建線程池

根據配置創建線程池,ThreadPoolExecutor 是自定義的,因為需要做 Cat 埋點。

/**
 * 創建線程池
 * @param threadPoolProperties
 */
private void createThreadPoolExecutor(DynamicThreadPoolProperties threadPoolProperties) {
    threadPoolProperties.getExecutors().forEach(executor -> {
        KittyThreadPoolExecutor threadPoolExecutor = new KittyThreadPoolExecutor(
                executor.getCorePoolSize(),
                executor.getMaximumPoolSize(),
                executor.getKeepAliveTime(),
                executor.getUnit(),
                getBlockingQueue(executor.getQueueType(), executor.getQueueCapacity(), executor.isFair()),
                new KittyThreadFactory(executor.getThreadPoolName()),
                getRejectedExecutionHandler(executor.getRejectedExecutionType(), executor.getThreadPoolName()), executor.getThreadPoolName());
        threadPoolExecutorMap.put(executor.getThreadPoolName(), threadPoolExecutor);
    });
}

刷新線程池

首先需要監聽 Nacos 的修改。

/**
 * 監聽配置修改,spring-cloud-alibaba 2.1.0版本不支持@NacosConfigListener的監聽
 */
public void initConfigUpdateListener(DynamicThreadPoolProperties dynamicThreadPoolProperties) {
    ConfigService configService = nacosConfigProperties.configServiceInstance();
    try {
        configService.addListener(dynamicThreadPoolProperties.getNacosDataId(), dynamicThreadPoolProperties.getNacosGroup(), new AbstractListener() {
            @Override
            public void receiveConfigInfo(String configInfo) {
                new Thread(() -> refreshThreadPoolExecutor()).start();
                log.info("線程池配置有變化,刷新完成");
            }
        });
    } catch (NacosException e) {
        log.error("Nacos配置監聽異常", e);
    }
}

然後再刷新線程池的參數信息,由於監聽事件觸發的時候,這個時候配置其實還沒刷新,所以我就等待了 1 秒鐘,讓配置完成刷新然後直接從配置類取值。

雖然有點挫還是可以用,其實更好的方式是解析 receiveConfigInfo 那個 configInfo,configInfo 就是改變之後的整個配置內容。因為不太好解析成屬性文件,就沒做,後面再改吧。

/**
 * 刷新線程池
 */
private void refreshThreadPoolExecutor() {
    try {
        // 等待配置刷新完成
        Thread.sleep(1000);
    } catch (InterruptedException e) {
    }
    dynamicThreadPoolProperties.getExecutors().forEach(executor -> {
        ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorMap.get(executor.getThreadPoolName());
        threadPoolExecutor.setCorePoolSize(executor.getCorePoolSize());
        threadPoolExecutor.setMaximumPoolSize(executor.getMaximumPoolSize());
        threadPoolExecutor.setKeepAliveTime(executor.getKeepAliveTime(), executor.getUnit());
        threadPoolExecutor.setRejectedExecutionHandler(getRejectedExecutionHandler(executor.getRejectedExecutionType(), executor.getThreadPoolName()));
        BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue();
        if (queue instanceof ResizableCapacityLinkedBlockIngQueue) {
            ((ResizableCapacityLinkedBlockIngQueue<Runnable>) queue).setCapacity(executor.getQueueCapacity());
        }
    });
}

其他的刷新都是線程池自帶的,需要注意的是線程池隊列大小的刷新,目前只支持 LinkedBlockingQueue 隊列,由於 LinkedBlockingQueue 的大小是不允許修改的,所以按照美團那篇文章提供的思路,自定義了一個可以修改的隊列,其實就是把 LinkedBlockingQueue 的代碼複製了一份,改一下就可以。

往 Cat 上報運行信息

往 Cat 的 Heartbeat 報表上傳數據的代碼如下,主要還是 Cat 本身提供了擴展的能力。只需要定時去調用下面的方式上報數據即可。

public void registerStatusExtension(ThreadPoolProperties prop, KittyThreadPoolExecutor executor) {
    StatusExtensionRegister.getInstance().register(new StatusExtension() {
        @Override
        public String getId() {
            return "thread.pool.info." + prop.getThreadPoolName();
        }
        @Override
        public String getDescription() {
            return "線程池監控";
        }
        @Override
        public Map<String, String> getProperties() {
            AtomicLong rejectCount = getRejectCount(prop.getThreadPoolName());
            Map<String, String> pool = new HashMap<>();
            pool.put("activeCount", String.valueOf(executor.getActiveCount()));
            pool.put("completedTaskCount", String.valueOf(executor.getCompletedTaskCount()));
            pool.put("largestPoolSize", String.valueOf(executor.getLargestPoolSize()));
            pool.put("taskCount", String.valueOf(executor.getTaskCount()));
            pool.put("rejectCount", String.valueOf(rejectCount == null ? 0 : rejectCount.get()));
            pool.put("waitTaskCount", String.valueOf(executor.getQueue().size()));
            return pool;
        }
    });
}

定義線程池端點

通過自定義端點來暴露線程池的配置和運行的情況,可以讓外部的監控系統拉取數據做對應的處理。

@Endpoint(id = "thread-pool")
public class ThreadPoolEndpoint {
    @Autowired
    private DynamicThreadPoolManager dynamicThreadPoolManager;
    @Autowired
    private DynamicThreadPoolProperties dynamicThreadPoolProperties;
    @ReadOperation
    public Map<String, Object> threadPools() {
        Map<String, Object> data = new HashMap<>();
        List<Map> threadPools = new ArrayList<>();
        dynamicThreadPoolProperties.getExecutors().forEach(prop -> {
            KittyThreadPoolExecutor executor = dynamicThreadPoolManager.getThreadPoolExecutor(prop.getThreadPoolName());
            AtomicLong rejectCount = dynamicThreadPoolManager.getRejectCount(prop.getThreadPoolName());
            Map<String, Object> pool = new HashMap<>();
            Map config = JSONObject.parseObject(JSONObject.toJSONString(prop), Map.class);
            pool.putAll(config);
            pool.put("activeCount", executor.getActiveCount());
            pool.put("completedTaskCount", executor.getCompletedTaskCount());
            pool.put("largestPoolSize", executor.getLargestPoolSize());
            pool.put("taskCount", executor.getTaskCount());
            pool.put("rejectCount", rejectCount == null ? 0 : rejectCount.get());
            pool.put("waitTaskCount", executor.getQueue().size());
            threadPools.add(pool);
        });
        data.put("threadPools", threadPools);
        return data;
    }
}

Cat 監控線程池中線程的執行時間

本來是將監控放在 KittyThreadPoolExecutor 的 execute,submit 方法里的。後面測試下來發現有問題,數據在 Cat 上確實有了,但是執行時間都是 1 毫秒,也就是沒生效。

不說想必大家也知道,因為線程是後面單獨去執行的,所以再添加任務的地方埋點沒任務意義。

後面還是想到了一個辦法來實現埋點的功能,就是利用線程池提供的 beforeExecute 和 afterExecute 兩個方法,在線程執行之前和執行之後都會觸發這兩個方法。

@Override
protected void beforeExecute(Thread t, Runnable r) {
    String threadName = Thread.currentThread().getName();
    Transaction transaction = Cat.newTransaction(threadPoolName, runnableNameMap.get(r.getClass().getSimpleName()));
    transactionMap.put(threadName, transaction);
    super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    String threadName = Thread.currentThread().getName();
    Transaction transaction = transactionMap.get(threadName);
    transaction.setStatus(Message.SUCCESS);
    if (t != null) {
        Cat.logError(t);
        transaction.setStatus(t);
    }
    transaction.complete();
    transactionMap.remove(threadName);
}

後面的代碼大家自己去看就行了,本文到這裏就結束了。如果感覺本文還不錯的記得轉發下哦!

多謝多謝。

最後感謝美團技術團隊的那篇文章,雖然沒有分享源碼,但是思路什麼的和應用場景都講的很明白。

感興趣的 Star 下唄:https://github.com/yinjihuan/kitty

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

※推薦評價好的iphone維修中心