## Preface
In the earlier overview, we already described the key components of `BatchSystem`:

- FSM
  - n * `normalFsm`
  - `controlFsm`
- MailBox
  - n * `normalMailBox`
  - `controlMailBox`
- scheduler
  - `normalScheduler`
  - `controlScheduler`
- poller
- router

This article walks through how TiKV builds and initializes these `BatchSystem` components during startup.
`BatchSystem` also has two important kinds of async tasks (a toy sketch of this fan-out follows the list):

- `ApplyFsm`: one per `normalFsm`. Whenever the `raft` module inside a `normalFsm` produces `committedEntries`, the `ApplyFsm` must apply them, eventually persisting the results to RocksDB.
- `AsyncWriter`: dedicated to handling the `Msg`s and raft logs produced by multi-raft. It sends the `Msg`s to other TiKV instances and persists the raft logs to RocksDB through the `raftLogEngine`.
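To make the two flows concrete before we dive in, here is a minimal, self-contained sketch of the fan-out pattern both tasks follow. The types and names are toy stand-ins, not TiKV's API:

```rust
use std::sync::mpsc;
use std::thread;

// Toy stand-ins for the real payloads.
struct CommittedEntry(u64);
struct RaftLog(u64);

fn main() {
    // One channel feeds the "ApplyFsm"-like task, another the "AsyncWriter"-like task.
    let (apply_tx, apply_rx) = mpsc::channel::<CommittedEntry>();
    let (write_tx, write_rx) = mpsc::channel::<RaftLog>();

    // Consumer 1: applies committed entries (in TiKV this ends up in RocksDB).
    let apply = thread::spawn(move || {
        for e in apply_rx {
            println!("apply committed entry {}", e.0);
        }
    });
    // Consumer 2: persists raft logs (in TiKV this goes through the raft log
    // engine) and would also forward raft messages to peer stores.
    let write = thread::spawn(move || {
        for l in write_rx {
            println!("persist raft log {}", l.0);
        }
    });

    // The producer side: raft emitted output, so fan it out to both tasks.
    for i in 0..3 {
        apply_tx.send(CommittedEntry(i)).unwrap();
        write_tx.send(RaftLog(i)).unwrap();
    }
    drop(apply_tx);
    drop(write_tx);
    apply.join().unwrap();
    write.join().unwrap();
}
```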
The `BatchSystem` code has two pain points. First, the code is somewhat scattered, spread across:

- components/batch-system/src/
- components/raftstore-v2/src/
- components/raftstore/src/

Second, the heavy use of async task pools and channels makes it hard to follow how a `Msg` moves through the system.
This article focuses on the initialization and startup flow of `BatchSystem`, with particular attention to the role each async task pool plays during initialization and to the `Msg` routing and dispatch logic of each channel. After this walkthrough, you should be able to read the code with a much better understanding of `BatchSystem`, the multi-raft design, and the Region merge and split flows.
## Creating the BatchSystem
The entry point for creating `BatchSystem` is `components/server/src/server2.rs::run_impl::init_engines`. It mainly builds the skeleton of `BatchSystem`; in particular, it creates and initializes the control-side components such as `controlFsm`, `controlMailBox`, and `controlScheduler`.
### Overview
```text
components/server/src/server2.rs::run_impl::init_engines
: creates the node; its internal StoreSystem implements the multi-raft protocol
: creates the raftLogEngine, used to store and replay the node's raft logs
: creates the tablet_registry, used to store the node's KV data
-- src/server/raftkv2/node.rs::NodeV2::try_bootstrap_store
: creates NodeV2.StoreSystem
---- components/raftstore-v2/src/batch/store.rs::create_store_batch_system
: creates and initializes the controlFsm
------ components/batch-system/src/batch.rs::create_system
: creates the controlMailBox, controlScheduler, normalScheduler, and router
: creates the BatchSystem
```
### Key code paths
#### init_engines

`components/server/src/server2.rs`

The `init_engines` function mainly creates the data-structure objects for TiKV's major modules:

- The `node` object represents the current TiKV node instance; each TiKV process corresponds to exactly one `node` object.
- The `raft_engine` object is actually a `RaftLogEngine`. Every raft log that `Raft` accepts, whether produced by other nodes or by this node itself, must be written to RocksDB through this object; it is a key guarantee for restoring consistency in the distributed system.
- The `tablet_registry` module is tightly coupled with RocksDB: any KV data that must be persisted after a TiKV Raft `Apply` is written to RocksDB through `tablet_registry`.
- The `router` object was described in the previous article; it holds the `MailBox` of each Raft Region and is dedicated to routing RPC requests to the `raftStore` module.
```rust
fn init_engines(
    &mut self,
    flow_listener: engine_rocks::FlowListener,
) -> Arc<EnginesResourceInfo> {
    ...
    // Create raft engine
    let (raft_engine, ...) = CER::build(
        &self.core.config,
        ...
    );
    ...
    let mut node = NodeV2::new(
        &self.core.config.server,
        self.pd_client.clone(),
        ...
    );
    node.try_bootstrap_store(&self.core.config.raft_store, &raft_engine)
        .unwrap_or_else(|e| fatal!("failed to bootstrap store: {:?}", e));
    let router = node.router().clone();
    ...
    let builder = builder.state_storage(Arc::new(StateStorage::new(
        raft_engine.clone(),
        router.clone(),
    )));
    let factory = Box::new(builder.build());
    let registry = TabletRegistry::new(factory, self.core.store_path.join("tablets"))
        .unwrap_or_else(|e| fatal!("failed to create tablet registry {:?}", e));
    self.tablet_registry = Some(registry.clone());
    ...
    let router = RaftRouter::new(node.id(), router);
    ...
    let mut engine = RaftKv2::new(router.clone(), ...);
    ...
    self.engines = Some(TikvEngines {
        raft_engine,
        engine,
    });
    self.router = Some(router);
    self.node = Some(node);
    ...
    engines_info
}
```
#### NodeV2::try_bootstrap_store

`src/server/raftkv2/node.rs`

This function mainly generates the node's unique id (`store_id`) and then uses `create_store_batch_system` to create the `raftStore` objects.
```rust
pub fn try_bootstrap_store(
    &mut self,
    cfg: &raftstore_v2::Config,
    raft_engine: &ER,
) -> Result<()> {
    let store_id = Bootstrap::new(
        raft_engine,
        self.cluster_id,
        &*self.pd_client,
        self.logger.clone(),
    )
    .bootstrap_store()?;
    self.store.set_id(store_id);
    let (router, system) = raftstore_v2::create_store_batch_system(
        cfg,
        store_id,
        self.logger.clone(),
        self.resource_ctl.clone(),
    );
    self.system = Some((router, system));
    Ok(())
}
```
#### raftstore_v2::create_store_batch_system

`components/raftstore-v2/src/batch/store.rs`

- `StoreFsm` is in fact the `controlFsm`: no matter how many regions there are, there is only one `controlFsm`, and it is created up front here.
- `StoreFsm::new` returns two values: the `fsm` itself and the sending end `tx` of a channel. The receiving end `rx` is a member of the `fsm`; `tx` will later be bound into the `controlMailBox`.
- `create_system` goes on to create the remaining `raftStore` components.
```rust
pub fn create_store_batch_system<EK, ER>(
    cfg: &Config,
    store_id: u64,
    logger: Logger,
    resource_ctl: Option<Arc<ResourceController>>,
) -> (StoreRouter<EK, ER>, StoreSystem<EK, ER>)
where
    EK: KvEngine,
    ER: RaftEngine,
{
    let (store_tx, store_fsm) = StoreFsm::new(cfg, store_id, logger.clone());
    let (router, system) =
        batch_system::create_system(&cfg.store_batch_system, store_tx, store_fsm, resource_ctl);
    let system = StoreSystem {
        system,
        workers: None,
        schedulers: None,
        logger: logger.clone(),
        shutdown: Arc::new(AtomicBool::new(false)),
        node_start_time: monotonic_raw_now(),
    };
    (StoreRouter { router, logger }, system)
}
```
#### batch_system::create_system

`components/batch-system/src/batch.rs`

- The `controlFsm` from the previous step is the `controller` parameter here, and `sender` is the sending end of the `controlFsm`'s channel, so `create_system` first uses `controlFsm` and `sender` to create the `controlMailBox`.
- Next come the `normal_scheduler` and `control_scheduler`; together with the `poller`s created later, these components share one channel (see the sketch after the code below).
- The `controlMailBox`, `normal_scheduler`, and `control_scheduler` are then put into the `router`.
- The `router` is used to build the `BatchSystem`.
- `pool_state_builder` is generally used to dynamically grow or shrink the number of `poller`s.
```rust
pub fn create_system<N: Fsm, C: Fsm>(
    cfg: &Config,
    sender: mpsc::LooseBoundedSender<C::Message>,
    controller: Box<C>,
    resource_ctl: Option<Arc<ResourceController>>,
) -> (BatchRouter<N, C>, BatchSystem<N, C>) {
    let control_box = BasicMailbox::new(sender, controller, ...);
    let (sender, receiver) = unbounded(resource_ctl);
    let normal_scheduler = NormalScheduler {
        sender: sender.clone(),
        ...
    };
    let control_scheduler = ControlScheduler {
        sender: sender.clone(),
    };
    let pool_state_builder = PoolStateBuilder {
        ...
        fsm_receiver: receiver.clone(),
        fsm_sender: sender,
        ...
    };
    let router = Router::new(control_box, normal_scheduler, control_scheduler, ...);
    let system = BatchSystem {
        ...
        router: router.clone(),
        receiver,
        ...
        workers: Arc::new(Mutex::new(Vec::new())),
        ...
        pool_state_builder: Some(pool_state_builder),
    };
    (router, system)
}
```
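The subtle point here is that there are two levels of channels: every fsm owns the `rx` of its private message channel (its `tx` lives in the mailbox), while the schedulers and pollers share a second channel that carries fsms themselves. The sketch below models this with plain std channels and a single poller thread; it is a toy illustration of the design, not TiKV's types:

```rust
use std::sync::mpsc;
use std::thread;

// Toy fsm: owns the receiving end of its private message channel.
struct Fsm {
    id: u64,
    rx: mpsc::Receiver<String>,
}

fn main() {
    // Level 1: the per-fsm message channel; its tx lives in the "mailbox".
    let (msg_tx, msg_rx) = mpsc::channel();
    let fsm = Fsm { id: 1, rx: msg_rx };

    // Level 2: the channel shared by the schedulers and pollers. It carries
    // *fsms that have pending messages*, not the messages themselves.
    let (fsm_tx, fsm_rx) = mpsc::channel::<Fsm>();

    // A "poller": pulls ready fsms off the shared channel and drains each
    // fsm's private channel (the handler role played by StorePoller).
    let poller = thread::spawn(move || {
        while let Ok(f) = fsm_rx.recv() {
            while let Ok(m) = f.rx.try_recv() {
                println!("fsm {} handles {}", f.id, m);
            }
        }
    });

    // A "mailbox send": enqueue the message, then schedule the fsm so some
    // poller picks it up (in TiKV, NormalScheduler/ControlScheduler do this).
    msg_tx.send("hello".to_owned()).unwrap();
    fsm_tx.send(fsm).unwrap();

    drop(fsm_tx);
    poller.join().unwrap();
}
```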
### Progress
After `init_engines` has run, `BatchSystem` is in the following state:

- FSM
  - n * `normalFsm` (not yet created)
  - `controlFsm` (created and initialized)
- MailBox
  - n * `normalMailBox` (not yet created)
  - `controlMailBox` (created and initialized)
- scheduler
  - `normalScheduler` (created and initialized)
  - `controlScheduler` (created and initialized)
- poller (not yet created or initialized)
- router (partially created, not yet initialized)
## Initializing the BatchSystem
After `init_engines`, the rough skeleton of `BatchSystem` is in place and the control components are essentially initialized. In the next stage, `BatchSystem` focuses on creating and initializing the remaining components.
### Overview
```text
components/server/src/server2.rs::run_impl::init_servers
-- src/server/raftkv2/node.rs::NodeV2::start
---- components/raftstore-v2/src/batch/store.rs::StoreSystem::start
: starts async_write, which loops waiting for raft logs produced by multi-raft and writes them through the raftLogEngine
: builds the StorePollerBuilder, and builds the PeerFsms by initializing it
: uses BatchSystem::spawn to build the poller and StorePoller, starting multiple async poller.poll tasks
: builds mailboxes from the PeerFsms created by StorePollerBuilder and registers them into the router member
: sends the control command StoreMsg::Start, which creates the ApplyFsms and starts the async Apply tasks that apply committed entries
------ components/raftstore/src/store/async_io/write.rs::StoreWriters::spawn/increase_to
----------- components/raftstore/src/store/async_io/write.rs::Worker::run
--------------- components/raftstore/src/store/async_io/write.rs::Worker::handle_msg/write_to_db
: creates async_write and starts an async loop that receives raft logs from the rx of the async_write channel and writes them asynchronously to rocksdb
: returns the tx of the async_write channel to the caller
------ components/raftstore-v2/src/batch/store.rs::StorePollerBuilder::new
------ components/raftstore-v2/src/batch/store.rs::StorePollerBuilder::init
: creates each region's PeerFsm, i.e. the normalFsm
: PeerFsm in turn creates the Peer; crucially, this creates the raftnode, i.e. the raft module
------ components/batch-system/src/batch.rs::BatchSystem::spawn
---------- components/raftstore-v2/src/batch/store.rs::StorePollerBuilder::build
: builds the poll_ctx; note in particular schedulers.write (the tx of the async_write channel)
: builds the Poller and StorePoller
---------- components/batch-system/src/batch.rs::Poller::poll
: receives fsms and hands them to the StorePoller's handle_normal / handle_control for processing
: raft logs / msgs produced by raft are sent via schedulers.write (the tx of the async_write channel) to async_write for processing
: committedEntries produced by raft are sent via the peer's apply_scheduler (i.e. the tx of the ApplyFsm channel) to the ApplyFsm for processing
--------------- components/raftstore-v2/src/batch/store.rs::StorePoller::handle_control
-------------------- components/raftstore-v2/src/fsm/peer.rs::PeerFsmDelegate::on_start
------------------------- components/raftstore-v2/src/operation/command/mod.rs::Peer::schedule_apply_fsm
: creates the ApplyFsm and asynchronously runs handle_all_tasks, which applies the committedEntries
```
### Key code paths
#### init_servers

`components/server/src/server2.rs`

Its main job is to call `NodeV2::start`:
```rust
fn init_servers<F: KvFormat>(&mut self) -> Arc<VersionTrack<ServerConfig>> {
    ...
    let engines = self.engines.as_mut().unwrap();
    ...
    self.node
        .as_mut()
        .unwrap()
        .start(
            engines.raft_engine.clone(),
            self.tablet_registry.clone().unwrap(),
            self.router.as_ref().unwrap(),
            server.transport(),
            ...
        )
    ...
    server_config
}
```
#### NodeV2::start

`src/server/raftkv2/node.rs`

Its main job is to call `StoreSystem::start`:
```rust
pub fn start<T>(
    &mut self,
    raft_engine: ER,
    registry: TabletRegistry<EK>,
    router: &RaftRouter<EK, ER>,
    ...
) -> Result<()>
where
    T: Transport + 'static,
{
    let store_id = self.id();
    if let Some(region) = Bootstrap::new(
        ...
    )
    .bootstrap_first_region(&self.store, store_id)?
    {
        ...
        registry.tablet_factory().open_tablet(ctx, &path).unwrap();
    }
    ...
    self.start_store(
        raft_engine,
        registry,
        router,
        ...
    )?;
    Ok(())
}

fn start_store<T>(
    &mut self,
    raft_engine: ER,
    registry: TabletRegistry<EK>,
    router: &RaftRouter<EK, ER>,
    ...
) -> Result<()>
where
    T: Transport + 'static,
{
    ...
    let system = &mut self.system.as_mut().unwrap().1;
    system.start(
        store_id,
        store_cfg,
        raft_engine,
        registry,
        ...
    )?;
    Ok(())
}
```
#### StoreSystem::start

`components/raftstore-v2/src/batch/store.rs`

- Starts `async_write`, which loops waiting for raft logs produced by multi-raft and writes them through the `raftLogEngine`.
- Builds the `StorePollerBuilder` and constructs the `PeerFsm`s by initializing it.
- Uses `BatchSystem::spawn` to build the `poller` and `StorePoller`, starting multiple async `poller.poll` tasks that wait for fsms sent by the `normalScheduler`/`controlScheduler` and hand them to the `StorePoller`'s `handle_normal`/`handle_control` for processing.
- Builds `mailboxes` from the `PeerFsm`s created by `StorePollerBuilder` and registers them into the `router` member.
- Sends the control command `StoreMsg::Start`, which creates the `ApplyFsm`s and starts the async `Apply` tasks that persist committed entries.
```rust
pub fn start<T, C>(
    &mut self,
    store_id: u64,
    cfg: Arc<VersionTrack<Config>>,
    raft_engine: ER,
    tablet_registry: TabletRegistry<EK>,
    ...
) -> Result<()>
where
    T: Transport + 'static,
    C: PdClient + 'static,
{
    ...
    let mut workers = Workers::new(background, pd_worker, ...);
    workers
        .async_write
        .spawn(store_id, raft_engine.clone(), ...)?;
    ...
    let tablet_scheduler = workers.tablet.start_with_timer(
        "tablet-worker",
        tablet::Runner::new(
            tablet_registry.clone(),
            ...
        ),
    );
    let schedulers = Schedulers {
        read: read_scheduler,
        pd: workers.pd.scheduler(),
        tablet: tablet_scheduler,
        write: workers.async_write.senders(),
        ...
    };
    let builder = StorePollerBuilder::new(
        cfg.clone(),
        store_id,
        raft_engine.clone(),
        tablet_registry,
        router.clone(),
        schedulers.clone(),
        ...
        store_meta.clone(),
        ...
    );
    self.schedulers = Some(schedulers);
    let peers = builder.init()?;
    self.system.spawn(tag, builder.clone());
    let apply_pool = builder.apply_pool.clone();
    let refresh_config_runner = refresh_config::Runner::new(
        ...
        self.system.build_pool_state(builder),
        ...
    );
    assert!(workers.refresh_config_worker.start(refresh_config_runner));
    self.workers = Some(workers);
    let mut mailboxes = Vec::with_capacity(peers.len());
    let mut address = Vec::with_capacity(peers.len());
    {
        let mut meta = store_meta.as_ref().lock().unwrap();
        for (region_id, (tx, mut fsm)) in peers {
            ...
            address.push(region_id);
            mailboxes.push((
                region_id,
                BasicMailbox::new(tx, fsm, router.state_cnt().clone()),
            ));
        }
    }
    router.register_all(mailboxes);
    // Make sure Msg::Start is the first message each FSM received.
    let watch = Arc::new(ReplayWatch::new(self.logger.clone()));
    for addr in address {
        router
            .force_send(addr, PeerMsg::Start(Some(watch.clone())))
            .unwrap();
    }
    router.send_control(StoreMsg::Start).unwrap();
    Ok(())
}
```
#### StorePollerBuilder::new

`components/raftstore-v2/src/batch/store.rs`

- Builds the `apply_pool`, preparing to run the `ApplyFsm`s later (a stand-in sketch follows the code below).
- Returns the `StorePollerBuilder`.
```rust
pub fn new(
    cfg: Arc<VersionTrack<Config>>,
    store_id: u64,
    engine: ER,
    tablet_registry: TabletRegistry<EK>,
    trans: T,
    router: StoreRouter<EK, ER>,
    schedulers: Schedulers<EK, ER>,
    ...
) -> Self {
    ...
    let apply_pool = YatpPoolBuilder::new(DefaultTicker::default())
        ...
        .build_future_pool();
    let global_stat = GlobalStoreStat::default();
    StorePollerBuilder {
        ...
    }
}
```
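The `apply_pool` is a future pool: each `ApplyFsm` later gets its own long-running future spawned onto it (see `Peer::schedule_apply_fsm` below). As a rough stand-in for the yatp-based pool, the sketch below uses the `futures` crate's `ThreadPool` (its `thread-pool` feature must be enabled) to show the same spawn-a-loop-per-fsm idea; it is an analogy, not TiKV's actual pool:

```rust
use futures::channel::mpsc;
use futures::executor::ThreadPool;
use futures::StreamExt;

// Stand-in for ApplyFsm::handle_all_tasks: a long-running loop over a task channel.
async fn handle_all_tasks(mut rx: mpsc::UnboundedReceiver<u64>) {
    while let Some(task) = rx.next().await {
        println!("apply task {task}");
    }
}

fn main() {
    // Stand-in for the apply_pool built by YatpPoolBuilder above.
    let pool = ThreadPool::new().unwrap();
    let (tx, rx) = mpsc::unbounded();
    // One future per "PeerFsm" is spawned onto the shared pool.
    pool.spawn_ok(handle_all_tasks(rx));
    for i in 0..3 {
        tx.unbounded_send(i).unwrap();
    }
    drop(tx);
    // Crude way to let the pool drain in this toy example.
    std::thread::sleep(std::time::Duration::from_millis(100));
}
```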
#### StorePollerBuilder::init

`components/raftstore-v2/src/batch/store.rs`

- Creates each `region`'s `PeerFsm`, i.e. the `normalFsm`.
- `PeerFsm` in turn creates the `Peer`; crucially, this creates the raft node (`RawNode`), i.e. the `raft` module (see the standalone example after the code below).
```rust
fn init(&self) -> Result<HashMap<u64, SenderFsmPair<EK, ER>>> {
    let mut regions = HashMap::default();
    let cfg = self.cfg.value();
    self.engine
        .for_each_raft_group::<Error, _>(&mut |region_id| {
            let storage = match Storage::new(
                region_id,
                self.store_id,
                self.engine.clone(),
                self.schedulers.read.clone(),
                &self.logger,
            )? {
                Some(p) => p,
                None => return Ok(()),
            };
            ...
            let (sender, peer_fsm) = PeerFsm::new(
                &cfg,
                &self.tablet_registry,
                self.key_manager.as_deref(),
                &self.snap_mgr,
                storage,
            )?;
            ...
            Ok(())
        })?;
    Ok(regions)
}

impl<EK: KvEngine, ER: RaftEngine> PeerFsm<EK, ER> {
    pub fn new(
        cfg: &Config,
        tablet_registry: &TabletRegistry<EK>,
        key_manager: Option<&DataKeyManager>,
        snap_mgr: &TabletSnapManager,
        storage: Storage<EK, ER>,
    ) -> Result<SenderFsmPair<EK, ER>> {
        let peer = Peer::new(cfg, tablet_registry, key_manager, snap_mgr, storage)?;
        let (tx, rx) = mpsc::loose_bounded(cfg.notify_capacity);
        let fsm = Box::new(PeerFsm {
            peer,
            mailbox: None,
            receiver: rx,
            ...
        });
        Ok((tx, fsm))
    }
}

impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
    /// Creates a new peer.
    ///
    /// If peer is destroyed, `None` is returned.
    pub fn new(
        cfg: &Config,
        tablet_registry: &TabletRegistry<EK>,
        key_manager: Option<&DataKeyManager>,
        snap_mgr: &TabletSnapManager,
        storage: Storage<EK, ER>,
    ) -> Result<Self> {
        ...
        let raft_group = RawNode::new(&raft_cfg, storage, &logger)?;
        let region = raft_group.store().region_state().get_region().clone();
        let tablet_index = storage.region_state().get_tablet_index();
        if tablet_index != 0 {
            raft_group
                .store()
                .recover_tablet(tablet_registry, ...);
            let mut ctx = TabletContext::new(&region, Some(tablet_index));
            tablet_registry.load(ctx, false)?;
        }
        let cached_tablet = tablet_registry.get_or_default(region_id);
        let mut peer = Peer {
            tablet: cached_tablet,
            ...
            raft_group,
            ...
        };
        ...
        Ok(peer)
    }
}
```
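The `raft_group` created here is a `RawNode` from the raft-rs crate, which is what this article calls the `raft` module. A minimal standalone example of creating one looks roughly like this, assuming the `raft` and `slog` crates and using raft-rs's in-memory storage instead of TiKV's `Storage`:

```rust
use raft::{raw_node::RawNode, storage::MemStorage, Config};

fn main() -> Result<(), raft::Error> {
    // A single-voter configuration; TiKV assembles the equivalent raft_cfg
    // from its own Config inside Peer::new.
    let cfg = Config {
        id: 1,
        ..Default::default()
    };
    // In TiKV the storage is backed by the raft engine; here node 1 is the
    // only voter in an in-memory store.
    let storage = MemStorage::new_with_conf_state((vec![1], vec![]));
    let logger = slog::Logger::root(slog::Discard, slog::o!());
    let mut node = RawNode::new(&cfg, storage, &logger)?;
    // Drive one logical tick; in TiKV this happens on raft tick messages.
    let _ = node.tick();
    Ok(())
}
```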
#### BatchSystem::spawn

`components/batch-system/src/batch.rs`

- Builds the `poller`.
- Builds the `StorePoller`.
- Starts multiple async `poller.poll` tasks that wait for fsms sent by the `normalScheduler`/`controlScheduler` and hand them to the `StorePoller`'s `handle_normal`/`handle_control` for processing.
```rust
pub fn spawn<B>(&mut self, name_prefix: String, mut builder: B)
where
    B: HandlerBuilder<N, C>,
    B::Handler: Send + 'static,
{
    for i in 0..self.pool_size {
        self.start_poller(
            // Thread name; reconstructed here to match start_poller's signature.
            format!("{}-{}", name_prefix, i),
            Priority::Normal,
            &mut builder,
        );
    }
    ...
}

fn start_poller<B>(&mut self, name: String, priority: Priority, builder: &mut B)
where
    B: HandlerBuilder<N, C>,
    B::Handler: Send + 'static,
{
    let handler = builder.build(priority);
    let receiver = match priority {
        Priority::Normal => self.receiver.clone(),
        ...
    };
    let mut poller = Poller {
        router: self.router.clone(),
        fsm_receiver: receiver,
        handler,
        ...
    };
    let t = thread::Builder::new()
        .name(name)
        .spawn_wrapper(move || {
            ...
            poller.poll();
        })
        .unwrap();
    self.workers.lock().unwrap().push(t);
}
```
#### StorePollerBuilder::build

`components/raftstore-v2/src/batch/store.rs`

- Builds the `StorePoller`, whose `handle_normal`/`handle_control` methods parse each `fsm`, extract its `msg`s, and hand them to the `raft` module for further processing.
- Raft logs produced by the `raft` module are persisted to RocksDB via `schedulers.write`, going through `async_write`/`raftLogEngine`.
- Apply entries produced by the `raft` module are written to RocksDB by the `ApplyFsm` through `tablet_registry`.
```rust
fn build(&mut self, _priority: batch_system::Priority) -> Self::Handler {
    ...
    let mut poll_ctx = StoreContext {
        store_id: self.store_id,
        trans: self.trans.clone(),
        ...
        router: self.router.clone(),
        ...
        schedulers: self.schedulers.clone(),
        store_meta: self.store_meta.clone(),
        ...
        engine: self.engine.clone(),
        tablet_registry: self.tablet_registry.clone(),
        apply_pool: self.apply_pool.clone(),
        ...
    };
    ...
    StorePoller::new(poll_ctx, ...)
}
```
#### Poller::poll

`components/batch-system/src/batch.rs`

The `poll` function loops waiting for fsms delivered on `fsm_receiver`, the receiving end of the shared channel, and processes them with the `handler`, i.e. the `StorePoller`.
```rust
pub fn poll(&mut self) {
    ...
    let mut run = true;
    while run && self.fetch_fsm(&mut batch) {
        ...
        if batch.control.is_some() {
            let len = self.handler.handle_control(batch.control.as_mut().unwrap());
            ...
        }
        for (i, p) in batch.normals.iter_mut().enumerate() {
            ...
            let res = self.handler.handle_normal(p);
            ...
        }
        let mut fsm_cnt = batch.normals.len();
        while batch.normals.len() < max_batch_size {
            if let Ok(fsm) = self.fsm_receiver.try_recv() {
                run = batch.push(fsm);
            }
            if !run || fsm_cnt >= batch.normals.len() {
                break;
            }
            let p = batch.normals[fsm_cnt].as_mut().unwrap();
            let res = self.handler.handle_normal(p);
        }
        ...
    }
    ...
}

fn fetch_fsm(&mut self, batch: &mut Batch<N, C>) -> bool {
    if batch.control.is_some() {
        return true;
    }
    if let Ok(fsm) = self.fsm_receiver.try_recv() {
        return batch.push(fsm);
    }
    if batch.is_empty() {
        self.handler.pause();
        if let Ok(fsm) = self.fsm_receiver.recv() {
            return batch.push(fsm);
        }
    }
    !batch.is_empty()
}
```
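Stripped of the control-fsm handling and rescheduling bookkeeping, the batching structure of `poll`/`fetch_fsm` reduces to the following toy loop (plain std channels, not TiKV's types): block for the first ready fsm, top the batch up without blocking, process the batch, repeat.

```rust
use std::sync::mpsc::{channel, Receiver};

// Toy fsm and handler, standing in for the real Fsm and StorePoller.
struct Fsm(u64);

fn handle_normal(f: &mut Fsm) {
    println!("handle fsm {}", f.0);
}

// Reduced shape of Poller::poll: collect up to max_batch_size ready fsms,
// process the whole batch, and repeat until the shared channel is closed.
fn poll(rx: Receiver<Fsm>, max_batch_size: usize) {
    let mut batch: Vec<Fsm> = Vec::new();
    loop {
        // Like fetch_fsm: block until at least one fsm is ready.
        match rx.recv() {
            Ok(f) => batch.push(f),
            Err(_) => return, // channel closed: every sender is gone
        }
        // Then top the batch up without blocking.
        while batch.len() < max_batch_size {
            match rx.try_recv() {
                Ok(f) => batch.push(f),
                Err(_) => break,
            }
        }
        for f in &mut batch {
            handle_normal(f);
        }
        batch.clear();
    }
}

fn main() {
    let (tx, rx) = channel();
    for i in 0..5 {
        tx.send(Fsm(i)).unwrap();
    }
    drop(tx);
    poll(rx, 3);
}
```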
#### StorePoller::handle_control

The `StoreMsg::Start` message sent earlier by `StoreSystem::start` triggers `StorePoller::handle_control`, which in turn triggers construction of the `ApplyFsm`s:

`components/raftstore-v2/src/batch/store.rs`
```rust
fn handle_control(&mut self, fsm: &mut StoreFsm) -> Option<usize> {
    ...
    let mut delegate = StoreFsmDelegate::new(fsm, &mut self.poll_ctx);
    delegate.handle_msgs(&mut self.store_msg_buf);
    ...
}
```
`components/raftstore-v2/src/fsm/peer.rs`:
```rust
pub fn handle_msgs(&mut self, store_msg_buf: &mut Vec<StoreMsg>)
where
    T: Transport,
{
    for msg in store_msg_buf.drain(..) {
        match msg {
            StoreMsg::Start => self.on_start(),
            StoreMsg::Tick(tick) => self.on_tick(tick),
            ...
        }
    }
}

fn on_start(&mut self, watch: Option<Arc<ReplayWatch>>) {
    ...
    if self.fsm.peer.storage().is_initialized() {
        self.fsm.peer.schedule_apply_fsm(self.store_ctx);
    }
    ...
}
```
#### Peer::schedule_apply_fsm

- Builds the `ApplyFsm`.
- Starts the `handle_all_tasks` async task to receive the apply entries sent over by the `apply_scheduler`.

`components/raftstore-v2/src/operation/command/mod.rs`
```rust
pub fn schedule_apply_fsm<T>(&mut self, store_ctx: &mut StoreContext<EK, ER, T>) {
    ...
    let (apply_scheduler, mut apply_fsm) = ApplyFsm::new(
        &store_ctx.cfg,
        self.peer().clone(),
        region_state,
        mailbox,
        ...
    );
    store_ctx
        .apply_pool
        .spawn(async move { apply_fsm.handle_all_tasks().await })
        .unwrap();
    self.set_apply_scheduler(apply_scheduler);
}
```
#### ApplyFsm::handle_all_tasks

This async task drives the `Apply` flow: it takes the `CommittedEntries` produced by `raft` and applies them.

`components/raftstore-v2/src/fsm/apply.rs`
```rust
impl<EK: KvEngine, R: ApplyResReporter> ApplyFsm<EK, R> {
    pub async fn handle_all_tasks(&mut self) {
        loop {
            ...
            let res = futures::select! {
                res = self.receiver.next().fuse() => res,
                ...
            };
            ...
            loop {
                match task {
                    // TODO: flush by buffer size.
                    ApplyTask::CommittedEntries(ce) => self.apply.apply_committed_entries(ce).await,
                    ...
                }
                ...
            }
        }
    }
}
```
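The receive side of this loop is the standard futures channel-plus-`select!` pattern. Stripped of TiKV's types, the skeleton looks like this (other `select!` arms, elided in the excerpt above, are omitted here too):

```rust
use futures::{channel::mpsc, select, FutureExt, StreamExt};

// Toy stand-in for ApplyTask.
enum ApplyTask {
    CommittedEntries(u64),
}

// Reduced version of the handle_all_tasks receive loop.
async fn handle_all_tasks(mut receiver: mpsc::UnboundedReceiver<ApplyTask>) {
    loop {
        let res = select! {
            res = receiver.next().fuse() => res,
        };
        match res {
            Some(ApplyTask::CommittedEntries(ce)) => {
                println!("apply committed entries batch {ce}");
            }
            // All senders dropped: the apply fsm shuts down.
            None => return,
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::unbounded();
    tx.unbounded_send(ApplyTask::CommittedEntries(7)).unwrap();
    drop(tx);
    futures::executor::block_on(handle_all_tasks(rx));
}
```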
#### StoreWriters::spawn

This async task is dedicated to the raft logs and raft messages produced by `raft`:

- raft logs are written to RocksDB through `raft_engine`;
- raft messages are sent to other TiKV instances through `trans`.

`components/raftstore/src/store/async_io/write.rs`
```rust
impl<EK, ER> StoreWriters<EK, ER>
where
    EK: KvEngine,
    ER: RaftEngine,
{
    pub fn senders(&self) -> WriteSenders<EK, ER> {
        WriteSenders::new(self.writers.clone())
    }

    pub fn spawn<T: Transport + 'static, N: PersistedNotifier>(
        &mut self,
        store_id: u64,
        raft_engine: ER,
        kv_engine: Option<EK>,
        notifier: &N,
        trans: &T,
        cfg: &Arc<VersionTrack<Config>>,
    ) -> Result<()> {
        let pool_size = cfg.value().store_io_pool_size;
        if pool_size > 0 {
            self.increase_to(
                pool_size,
                StoreWritersContext {
                    store_id,
                    notifier: notifier.clone(),
                    raft_engine,
                    kv_engine,
                    transfer: trans.clone(),
                    cfg: cfg.clone(),
                },
            )?;
        }
        Ok(())
    }

    pub fn increase_to<T: Transport + 'static, N: PersistedNotifier>(
        &mut self,
        size: usize,
        writer_meta: StoreWritersContext<EK, ER, T, N>,
    ) -> Result<()> {
        ...
        self.writers
            .update(move |writers: &mut SharedSenders<EK, ER>| -> Result<()> {
                let mut cached_senders = writers.get();
                for i in current_size..size {
                    let (tx, rx) = bounded(
                        writer_meta.cfg.value().store_io_notify_capacity,
                    );
                    let mut worker = Worker::new(
                        ...
                        rx,
                        ...
                    );
                    let t = thread::Builder::new()
                        .name(thd_name!(tag))
                        .spawn_wrapper(move || {
                            set_io_type(IoType::ForegroundWrite);
                            worker.run();
                        })?;
                    cached_senders.push(tx);
                    handlers.push(t);
                }
                writers.set(cached_senders);
                Ok(())
            })?;
        Ok(())
    }
}

impl<EK, ER, N, T> Worker<EK, ER, N, T>
where
    EK: KvEngine,
    ER: RaftEngine,
    N: PersistedNotifier,
    T: Transport,
{
    fn run(&mut self) {
        let mut stopped = false;
        while !stopped {
            let handle_begin = match self.receiver.recv() {
                Ok(msg) => {
                    stopped |= self.handle_msg(msg);
                }
                ...
            };
            ...
            self.write_to_db(true);
        }
    }

    pub fn write_to_db(&mut self, notify: bool) {
        ...
        let mut write_raft_time = 0f64;
        if !self.batch.raft_wbs[0].is_empty() {
            ...
            for i in 0..self.batch.raft_wbs.len() {
                self.raft_engine
                    .consume_and_shrink(
                        &mut self.batch.raft_wbs[i],
                        ...
                    )
                ...
            }
            self.batch.raft_wbs.truncate(1);
            ...
        }
        for task in &mut self.batch.tasks {
            for msg in task.messages.drain(..) {
                ...
                if let Err(e) = self.trans.send(msg) {
                    ...
                }
                ...
            }
        }
        ...
    }
}
```
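Boiled down, each writer worker is a dedicated IO thread behind a bounded channel: pollers push write tasks through the tx (`schedulers.write`), and the worker drains the channel, persists the log batch, then forwards the outgoing raft messages. A toy version of that loop, with stand-in types rather than TiKV's `WriteMsg`/`Transport`:

```rust
use std::sync::mpsc;
use std::thread;

// Toy write task: a raft log entry to persist plus messages to forward.
struct WriteTask {
    log: u64,
    outgoing: Vec<String>,
}

fn main() {
    // Stand-in for the bounded channel created in increase_to; the tx side is
    // what schedulers.write hands out to the pollers.
    let (tx, rx) = mpsc::sync_channel::<WriteTask>(64);

    // Stand-in for Worker::run: drain the channel, "persist", then "send".
    let worker = thread::spawn(move || {
        while let Ok(task) = rx.recv() {
            // write_to_db persists the raft log batch first ...
            println!("persist raft log {}", task.log);
            // ... then forwards raft messages to peer stores via the transport.
            for msg in task.outgoing {
                println!("send {msg}");
            }
        }
    });

    tx.send(WriteTask {
        log: 1,
        outgoing: vec!["MsgAppend -> store 2".to_owned()],
    })
    .unwrap();
    drop(tx);
    worker.join().unwrap();
}
```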
## Summary
The `BatchSystem` code is packed with async tasks and with channel sends and receives. Without a good grasp of its initialization, it is hard to see how a `msg` flows through the whole system. With the flow laid out in this article, you should now have a solid picture of the overall `BatchSystem`, and be well prepared to study how Raft `Msg`s flow, or how Region `Merge` and `Split` work.