時序數據庫 DolphinDB 線程簡介
本文基于 DolphinDB server 最新版 2.00.X,從任務管理、存儲引擎、流數據、集群管理、高可用幾個方面,為大家簡單介紹 DolphinDB 在運行中可能使用到的各種線程,及其相關的配置項和函數,以便用戶掌握 DolphinDB 的線程運行情況。
1.任務管理
工作線程(Worker)接收客戶端請求,將任務分解為多個小任務,根據任務的粒度自己執行或者發送給執行線程 localExecutor 或 remoteExecutor 執行。
Worker
常規交互作業的工作線程。每個節點都存在 Worker 線程,可以分為以下幾類。
- ZeroWorker, FirstWorker, SecondWorker, ThirdWorker, ForthWorker
客戶端提交至節點的作業為0級,由 ZeroWorker 處理。根據作業所涉及到的分區,ZeroWorker 將其分解為多個子任務。其中本地節點上的子任務由 ZeroWorker 與 localExecutor 并行執行;需要由遠程節點執行的子任務則降低為1級,并通過 remoteExecutor 發送到對應節點上的 FirstWorker 處理。
以此類推,若某個級別的子任務需要進一步拆解,則拆分出來的由遠程節點執行的子任務降低一級,發送至遠程節點上對應層級的 Worker 處理。
ZeroWorker 和 FirstWorker 的數量由 workerNum 決定,默認值為機器上的 CPU 核數,最大值不超過 license 中的最大 核數。其余層級的 Work 數量為上級的0.75倍,最小個數為1。
- UrgentWorker
處理緊急任務,只接收一些特殊的系統級任務,譬如登錄,取消作業等。由 urgentWorkerNum 配置,默認值為1,最大值為 CPU 內核數。
- WebWorker
處理 HTTP 請求,由 webWorkerNum 配置。默認為1,最大值為 CPU 內核數。
- InfraWorker
開啟高可用后,用于接收 raft 心跳匯報的線程,防止集群負載大時,心跳信息無法及時匯報。默認有2個該線程。
LocalExecutor
本地執行線程。Worker 拆解完任務后生成的本地子任務隊列,由同一節點下的 localExecutor 執行。所有 Worker共享本地執行線程,每個 localExecutor 一次只能處理一個子任務。通過 localExecutors
配置線程個數,默認為機器上的 CPU 核數減1。
RemoteExecutor
將遠程任務發送到遠程節點的線程,在非 single 模式的節點上可以通過 remoteExecutors
配置線程個數。默認值為集群中節點個數和本地 Worker 的較小值。
AsynchronousRemoteExecutor
接收對其他節點發起的遠程調用(Remote Procedure Call, RPC)任務的線程,并將收到的遠程調用放到 RemoteExecutor 的任務隊列中。每個非 single 模式的節點上有且僅有一個該線程。
RemoteTaskDispatcher
在遠程調用出錯需要重試時,或者一個被關閉的連接上仍有未完成的任務時,這些任務會先放到一個隊列里,由 RemoteTaskDispatcher 從這個隊列取任務并重新交由 AsynchronousRemoteExecutor 去發起遠程調用。
DynamicWorkerManager 和 DynamicWorker
DynamicWorker 是動態工作線程,作為 Worker 的補充。DynamicWorkerManager 是創建 DynamicWorker 的線程,每個節點有且僅有一個該線程。如果所有的工作線程被占滿,有新任務到來時,通過該線程創建 DynamicWorker 來執行新任務。根據系統并發任務的繁忙程度,總共可以創建三組動態工作線程,每一個級別可以創建 maxDynamicWorker 個動態工作線程。
動態工作線程在任務執行完后若閑置60秒則會被系統自動回收,不再占用系統資源。maxDynamicWorker 的默認值為 workerNum。
DynamicExecutorManager 和 DynamicExecutor
與 DynamicWorkerManager 和 DynamicWorker 類似,DynamicExecutor 是由 DynamicWorkerManager 動態創建的執行線程。DynamicExecutor 線程數量上限由 maxDynamicLocalExecutor
決定,默認值為 localExecutors。DynamicWorkerManager 最多可以創建3組動態執行線程,每組線程最多為 maxDynamicLocalExecutor 個。
DynamicExecutor 在閑置60秒后被自動回收。
BlockIOWorker
執行對硬盤讀寫任務的線程。通過 diskIOConcurrencyLevel
控制線程數量,默認值為1。
BatchJobWorker
執行批處理作業任務的工作線程。其上限通過 maxBatchJobWorker
設置,默認值是 workerNum。該線程在任務執行完后若閑置60秒會被系統自動回收,不再占用系統資源。
2.存儲引擎
存儲相關的線程在 server 啟動過程中創建。其主要負責數據的寫入與落盤,并在各種情況下(如節點宕機,磁盤損壞導致數據損壞)維護各節點間數據的完整與一致性。
2.1 預寫日志
在數據節點上,OLAP 和 TSDB 都有兩個寫預寫日志(WAL)的線程 RedoLogHeadWriter 和 RedoLogDataWriter,分別負責寫 WAL 的元數據和數據。
存儲 OLAP 和 TSDB WAL 日志的目錄分別由 redoLogDir
, TSDBRedoLogDir
配置。
- RedoLogHeadWriter
事務發生時通過 RedoLogHeadWriter 同步寫事務元數據信息到 redoLog 目錄下的 header.log 中。 - RedoLogDataWriter
事務的數據會通過 RedoLogDataWriter 異步寫入 redoLog 目錄下的 tid.log 中。
2.2 OLAP 引擎
在開啟了 OLAP 的 cacheEngine 后,會創建一個 ChunkCacheEngineGCWorker 線程。
- ChunkCacheEngineGCWorker
負責將 cacheEngine 中的數據寫入磁盤的線程。用于清理 cacheEngine,并將磁盤的隨機寫變成順序寫的線程。BackgroundRoutineService 每隔60秒,或者寫入時 cacheEngine 的占用量超過 OLAPCacheEngineSize 的30%,就會觸發 ChunkCacheEngineGCWorker 將 cacheEngine 中的數據寫入磁盤。cacheEngine 中一張表的一個分區數據稱為一個 tabletCache,ChunkCacheEngineGCWorker 寫入磁盤時會根據每個 tabletCache 的大小和在 cacheEngine 中存在時間決定寫入磁盤優先級,tabletCache 的大小越大、存在時間越久寫入磁盤優先級越高。
2.3 TSDB 引擎(1.30.X 版本 server 無此類線程)
除了將數據寫入磁盤外,TSDB 引擎的線程還要負責 cacheEngine 中數據的排序與合并,并維護磁盤上的 levelFile,以提高讀寫性能。這些線程也僅在數據節點上存在。
- TableAsyncSortRunner
異步地對 TSDB cacheEngine 中的表進行排序的線程。TSDB 在寫入 cacheEngine 時,如果 cacheEngine 中的表太大,會影響查詢性能,因此需要進行排序。但若同步進行排序,會影響寫入性能。所以 DolphinDB 提供此線程異步地對表進行排序??梢酝ㄟ^TSDBAsyncSortingWorkerNum
來控制排序線程的數量,默認值為1。也可以通過函數disableTSDBAsyncSorting
和enableTSDBAsyncSorting
,來手動開啟和關閉異步排序功能。 - CacheDumpTaskDispatcher
分配 cacheEngine 寫入磁盤任務的線程。線程數量固定為1。 當 cacheEngine 的內存占用大于 TSDBCacheEngineSize 時,系統會對 cacheEngine 做一次快照,并將快照送到 CacheDumpTaskDispatcher 線程準備寫入磁盤。CacheDumpTaskDispatcher 線程將任務分配給 ChunkCacheDumpRunner 線程,由該線程寫入磁盤。若磁盤上存在需要合并的 levelFile,則交由 MergeRunner 線程進行合并。 - ChunkCacheDumpRunner
將 cacheEngine 中的數據寫入磁盤的線程。線程的個數等于volumes
的配置值。 - MergeRunner
對磁盤上 levelFile 進行合并的線程,線程的個數等于volumes
的配置值。 - DelLevelFileRunner
檢查并刪除無效的 levelFile(即已經被合并的較小 size 的文件)的線程。線程數量固定為1。每隔30秒會自動執行一次。
2.4 數據恢復
數據恢復(recovery)相關線程負責節點宕機,或者數據損壞時,數據副本間的數據恢復。
- RecoveryReportingService
在數據節點上,任何一個 chunk 發生數據錯誤或者版本號不一致都通過該線程來向控制節點匯報。每個數據節點有且僅有一個該線程。 - RecoveryWorker
發生 recovery 時,數據恢復的源節點將數據發送給目標節點的線程。該線程僅存在于數據節點,個數可由recoveryWorkers
配置,默認值為1??梢酝ㄟ^resetRecoveryWorkerNum
函數動態修改線程個數,通過getRecoveryWorkerNum
函數獲取實際 RecoveryWorker 線程數量。 - RecoverMetaLogWriter 和 RecoverRedoLogDataWriter
在線恢復(onlineRecovery)過程中,為了避免節點宕機或離線影響恢復過程,會分別通過 RecoverMetaLogWriter 和 RecoverRedoLogDataWriter 寫 recover redoLog 的元數據(Metadata)和數據(data)。與 redoLog 不同的是,recover redoLog 的 Metadata 和 data 需要進行寫磁盤時才能開始 recovery。通過enableDfsRecoverRedo
配置是否開啟 recover redoLog,默認是開啟。開啟后,在每個數據節點上存在一個相應的線程。recover redoLog 的文件目錄也可以通過recoverLogDir
配置,默認在節點根目錄下的log/recoverLog
中。 - DFSChunkRecoveryWorker
在控制節點上處理 recovery 任務的線程。同時進行 recovery 任務的數量默認為集群中數據節點個數的2倍,可由dfsRecoveryConcurrency
配置。
2.5 事務相關
如果集群中的某個節點在處理事務的過程中宕機了,那么重啟后僅依靠該節點有可能無法確定事務的最終狀態,需要在集群中進行事務決議來確定。
- UnresolvedTransactionReporter
在數據節點啟動時,如果數據節點自己不能判斷某些事務的狀態,通過該線程來向控制節點匯報并發起事務決議,判斷事務最終處于回滾還是完成狀態。該線程只有一個,且在所有需要決議的事務決議后結束。 - DFSTransactionResolutionWorker
該線程處理由數據節點發起的事務決議、控制節點啟動時回放元數據后無法決定狀態的事務或運行時超時未更新狀態的事務。在控制節點上存在一個該線程。 - ChunkNodeLogWriter
數據節點寫元數據的線程。元數據默認在各個數據節點根目錄下的storage/CHUNK_METADATA
中,可以通過配置項chunkMetaDir
修改。在數據節點上存在一個該線程。 - EditLogBatchWriter
控制節點寫元數據的線程。由于對控制節點上元數據的修改比較頻繁,所以由該線程統一將寫入緩沖區的數據寫入磁盤并同步,同時還對寫元數據失敗的情況進行回滾處理。在控制節點上存在一個該線程。
2.6 其他
- SnapshotEngineWorker
為減少開啟快照引擎對寫入的影響,而將分布式表數據異步寫入快照引擎的線程。在數據節點上存在一個該線程??梢酝ㄟ^函數registerSnapshotEngine
和unregisterSnapshotEngine
對一個分布式表注冊和取消注冊快照引擎。 - DFSChunkRebalanceWorker
節點間平衡數據或者多塊磁盤間平衡數據的任務,均交由控制節點上的 DFSChunkRebalanceWorker 線程處理。在控制節點上存在一個該線程。同時發起的數據平衡任務數量默認為集群中數據節點個數的兩倍??捎?nbsp;dfsRebalanceConcurrency
配置。通過函數rebalanceChunksWithinDataNode
和rebalanceChunksAmongDataNodes
手動觸發節點或磁盤間的數據平衡。
3.流數據
本節通過發布訂閱、計算引擎和高可用三個模塊介紹流數據相關線程。這些線程都僅在數據節點或單節點上存在。
3.1 發布訂閱
以下為數據節點上普通流表的訂閱發布流程中涉及到的線程。
- MessageThrottle
實現流數據訂閱 throttle 參數功能的線程,數量為1。系統每隔一段時間檢查當前節點上是否存在經過 throttle 時間但仍未達到 batchSize 的訂閱(subscribeTable 函數中指定了 batchSize 和 throttle )。如果存在,則觸發一次訂閱的消息處理。通過subThrottle
配置觸發檢查的間隔時間,默認值為1000,單位為毫秒。 - AsynchronousPublisher
在 AsynchronousPublisher 線程中檢查每個發布節點對每個訂閱節點建立的連接。如果這個連接對應的發布隊列有更新,就將更新的數據發布到訂閱端。通過maxPubConnections
配置發布節點連接的訂閱節點數量上限,默認值為0,表示不可以作為發布節點,即不會創建 AsynchronousPublisher 線程,大于0時會創建一個該線程。 - AsynchronousSubscriber
監聽所有的訂閱連接,接收、解析連接上收到的數據,并發送到相應的訂閱消息隊列。配置了subPort之后,會創建一個該線程。 通過maxSubConnections
配置一個訂閱節點可以連接的發布節點數量上限,默認值為64。 - LocalSubscriberImp
在 LocalSubscriberImp 線程中直接檢查有數據更新的本地訂閱,并將符合條件的本地訂閱中的數據發送到訂閱消息隊列中。配置 subPort 后,會創建一個該線程。 - StreamExecutor
StreamExecutor 線程從訂閱消息隊列中取出數據,寫入相應訂閱的 handler 中,同時維護訂閱的偏移量、消息總數等信息。每個訂閱消息隊列對應一個 StreamExecutor 線程,數量由配置項 subExecutors 決定,默認值為1,最大不超過 CPU 核數。 - PersistenceWorker
以異步方式持久化的流表會通過 PersistenceWorker 線程將數據寫到磁盤上。persistenceWorkerNum
控制持久化線程的數量,默認為1。由persistenceDir
配置開啟持久化的流表的保存路徑。 - AsynchronousReconnector
針對所有設置參數 reconnect=true 的訂閱,系統會在非正常中斷后通過該線程嘗試自動重連。在配置了subPort
之后,會創建一個該線程。
3.2 計算引擎
創建計算引擎時,若配置了如下參數,便會創建兩個線程:CheckTimeExecutor 和 SystemTimeExecutor。
- CheckTimeExecutor
包括 TimeSeriesCheckTimeExecutor, SessionCheckTimeExecutor, CSEngineCheckTimeExecutor, AsofJoinCheckTimeExecutor 和 LookupJoinCheckTimeExecutor。
在創建 TimeSeriesEngine 時設置了 updateTime、創建 SessionWindowEngine 時設置了 forceTriggerTime、創建 CrossSectionalEngine 時設置了 triggeringPattern=“interval”、創建 AsofJoinEngine 時設置了 delayedTime、創建 LookupJoinEngine 時設置了 checkTimes,那么每個引擎就會創建一個 CheckTimeExecutor 線程,表示如果經過了參數設置的時間還未觸發計算,則強制觸發一次引擎的計算。 - SystemTimeExecutor
包括 TimeSeriesSystemTimeExecutor, SessionSystemTimeExecutor, CrossSectionalEngineExecutor 和 WindowJoinSystemTimeExecutor。
在創建 TimeSeriesEngine, SessionWindowEngine, CrossSectionalEngine 和 WindowJoinEngine 時,如果設置了 useSystemTime=true,那么每個引擎就會創建一個 SystemTimeExecutor 線程,表示每隔固定的時間觸發一次引擎的計算。
3.3 流數據高可用
配置項 streamingRaftGroups
中每個 group 都會在 group 內的節點上生成下述的三個線程。
- StreamingDataFileWriter
在 raft 的 leader 節點上向流表寫數據時,要通過該線程應用 leader 上寫數據的 entryLog,向流表寫數據。 - StreamingRaftReplayWorker
當一個節點成為某個 group 的 leader 時,就會通過該線程回放此 group 的 raftLog。 - StreamingHA::CkptWorker
為節點上的 raftLog 做 checkpoint 以回收垃圾的線程。垃圾回收的間隔可由streamingHAPurgeInterval
設置,默認值為300,單位是秒。
4.集群管理
在集群中控制節點通過心跳監控其他節點的存活狀態。
- HeartBeatSender
控制數據節點或計算節點向控制節點每隔0.5秒發送一次心跳的線程。心跳信息中同時還會匯報節點當前的一些信息(如 CPU、內存、磁盤占用)給控制節點。在數據節點或計算節點上存在一個該線程。 通過 lanCluster 控制心跳采用 udp 或 tcp 協議,當為 true 時使用 udp,false 時使用 tcp,默認值為true。 - HeartBeatReceiver
僅當 lanCluster=true 時,在控制節點和數據節點、計算節點上存在的接收 udp 心跳的線程。 - HeartBeatMonitor
僅在控制節點存在的線程。每隔一秒檢查一次是否收到集群中數據節點或計算節點的心跳信息。如果一個節點連續3次檢查都沒有心跳,就認為這個節點已經宕機了。 如果數據節點配置了 datanodeRestartInterval(值大于0),那么當節點宕機時間超過設置值,就會通過 agent 重啟該數據節點。該配置項默認值為0。 - ServiceMgmtCenter
僅在控制節點存在的線程。當一個代理節點重新上線時,通過該線程將公鑰信息保存到代理節點上。當一個數據節點重新上線時,會讓其匯報節點的所有 chunk 信息,并且在數據節點上刪除控制節點上不存在的chunk。
5.控制節點高可用
本節簡述開啟控制節點高可用之后,raft 相關的線程。對于每種線程,在 raft group 內的每個控制節點上都有且僅有一個。
- RaftTimer
負責計時(心跳發送間隔和發起選舉時間)的線程。leader 通過該線程每隔一段時間向 follower 發送心跳信息,follower 如果一段時間沒有收到 leader 的心跳,將發起選舉。 通過raftElectionTick
可以設置在 [raftElectionTick, 2*raftElectionTick] 之間的一個隨機時間后未收到 leader 的心跳將發起選舉,默認值為800,單位是10ms。 - RaftInputWorker
從輸入消息隊列取出消息應用到當前節點的線程。 - RaftOutputWorker
從輸出消息隊列取出消息并應用到相應節點的線程。 - RaftProposeWorker
處理對 raftLog 讀寫請求的線程。 - SnapshotSender
將 leader 當前狀態的快照發送給其他節點的線程。 - RaftLeaderSwitchWorker
執行 raft 節點角色切換的線程。 - DFSRaftReplayWorker
將記錄的 raftLog 應用到當前節點的線程。
6.其他
- ThreadPoolSocketGroup
在 server 的端口上監聽收到的消息請求,并交由相應的工作隊列處理。每個節點有且僅有一個線程。 - BackgroundRoutineService
server 的后臺線程,每個節點會生成 4 個該線程。server 會在該線程中注冊一些函數,這些函數會在BackgroundRoutineService 線程運行過程中每隔一段時間就被調用一次。 - LogWriter
將節點運行過程中生成的 log 寫入文件的線程。每個節點都有一個該線程。 - StdConsole
啟動 server 后在命令行窗口接收命令的線程。在 server 啟動參數中如果設置 console=true,那么就會啟動一個該線程。