
TiKV Source Code Study Notes -- BatchSystem Creation and Initialization Flow

ylldty · Posted on 2024-03-13

Preface

In the earlier overview, we already described the batchSystem's key components:

  • FSM

    • n * normalFsm
    • controlFsm
  • MailBox

    • n * normalMailBox
    • controlMailBox
  • scheduler

    • normalScheduler
    • controlScheduler
  • poller

  • router

This article walks through how TiKV builds and initializes these BatchSystem components during startup.

In addition, the BatchSystem relies on two other important asynchronous tasks:

One is the ApplyFsm, which corresponds one-to-one with a normalFsm: whenever the raft module inside a normalFsm produces committedEntries, the ApplyFsm must apply them and finally persist the result to RocksDB.

The other is the AsyncWriter, dedicated to the Msgs and raft logs produced by multiRaft: it sends the Msgs to other TiKV instances and persists the raft logs to RocksDB through the raftLogEngine.

There are two things that make the BatchSystem code hard to read. First, the code is not particularly centralized; it is spread across:

  • components/batch-system/src/
  • components/raftstore-v2/src/
  • components/raftstore/src/

Second, the heavy use of async task pools and channels makes it hard to follow how Msgs move around.

This article focuses on the BatchSystem's initialization and startup flow, paying particular attention to what each async task pool does during initialization and to the Msg routing and dispatch logic behind each channel. With this walkthrough, readers should be able to follow the code and better understand the BatchSystem's multiRaft mechanics as well as the Region merge and split flows.

Creating the BatchSystem

The entry point for creating the BatchSystem is components/server/src/server2.rs::run_impl::init_engines. It mainly sets up the BatchSystem's skeleton, and in particular creates and initializes the control-side components such as controlFsm, controlMailBox, and controlScheduler.

High-level flow

components/server/src/server2.rs::run_impl::init_engines
: creates the node; its internal StoreSystem implements the multi-raft distributed protocol
: creates the raftLogEngine, used to store and recover the node's raft log
: creates the tablet_registry, used to store the node's KV data

-- src/server/raftkv2/node.rs::NodeV2::try_bootstrap_store
   : creates NodeV2.StoreSystem

---- components/raftstore-v2/src/batch/store.rs::create_store_batch_system
     : creates and initializes the controlFsm

------ components/batch-system/src/batch.rs::create_system
       : creates the controlMailBox, controlScheduler, normalScheduler, and router
       : creates the BatchSystem

Key code paths

init_engines

components/server/src/server2.rs

The init_engines function mainly creates the data-structure objects for TiKV's major modules. In particular:

  • the node object represents the current TiKV node instance; each TiKV process corresponds to exactly one node object
  • the raft_engine object is actually the RaftLogEngine. Any raftlog that Raft receives from other nodes, or that this node produces itself, must be written to rocksdb through this object; it is an essential guarantee for restoring consistency in the distributed system
  • the tablet_registry module is tightly coupled to rocksdb: any KV data that TiKV must persist after Raft Apply is written to rocksdb through the tablet_registry
  • the router object was described in the previous article; it contains each Raft Region's MailBox and routes RPC requests to the raftStore module
fn init_engines(
        &mut self,
        flow_listener: engine_rocks::FlowListener,
    ) -> Arc<EnginesResourceInfo> {
        ...

        // Create raft engine
        let (raft_engine, ...) = CER::build(
            &self.core.config,
            ...
        );
        ...

        let mut node = NodeV2::new(
            &self.core.config.server,
            self.pd_client.clone(),
            ...
        );
        node.try_bootstrap_store(&self.core.config.raft_store, &raft_engine)
            .unwrap_or_else(|e| fatal!("failed to bootstrap store: {:?}", e));
        let router = node.router().clone();
        ...
        

        let builder = builder.state_storage(Arc::new(StateStorage::new(
            raft_engine.clone(),
            router.clone(),
        )));
        let factory = Box::new(builder.build());
        let registry = TabletRegistry::new(factory, self.core.store_path.join("tablets"))
            .unwrap_or_else(|e| fatal!("failed to create tablet registry {:?}", e));
        self.tablet_registry = Some(registry.clone());
        ...
       

        let router = RaftRouter::new(node.id(), router);
        ...
        
        let mut engine = RaftKv2::new(router.clone(), ...);
        ...

        self.engines = Some(TikvEngines {
            raft_engine,
            engine,
        });
        self.router = Some(router);
        self.node = Some(node);
        ...

        engines_info
    }

NodeV2::try_bootstrap_store

src/server/raftkv2/node.rs

The main job of this function is to generate the unique nodeId (store_id), then use create_store_batch_system to create the raftStore objects.

pub fn try_bootstrap_store(
        &mut self,
        cfg: &raftstore_v2::Config,
        raft_engine: &ER,
    ) -> Result<()> {
        let store_id = Bootstrap::new(
            raft_engine,
            self.cluster_id,
            &*self.pd_client,
            self.logger.clone(),
        )
        .bootstrap_store()?;
        self.store.set_id(store_id);

        let (router, system) = raftstore_v2::create_store_batch_system(
            cfg,
            store_id,
            self.logger.clone(),
            self.resource_ctl.clone(),
        );
        self.system = Some((router, system));
        Ok(())
    }

raftstore_v2::create_store_batch_system

components/raftstore-v2/src/batch/store.rs

  • StoreFsm is in fact the controlFsm. No matter how many regions there are, there is only one controlFsm, so it is created up front here
  • StoreFsm::new returns two values: the fsm itself and the sending end tx of a channel. The receiving end rx is a member of the fsm, while the tx will later be bound into the controlMailBox (a minimal sketch of this tx/fsm pattern follows the code below)
  • create_system then goes on to create the raftStore's remaining component objects
pub fn create_store_batch_system<EK, ER>(
    cfg: &Config,
    store_id: u64,
    logger: Logger,
    resource_ctl: Option<Arc<ResourceController>>,
) -> (StoreRouter<EK, ER>, StoreSystem<EK, ER>)
where
    EK: KvEngine,
    ER: RaftEngine,
{
    let (store_tx, store_fsm) = StoreFsm::new(cfg, store_id, logger.clone());
    let (router, system) =
        batch_system::create_system(&cfg.store_batch_system, store_tx, store_fsm, resource_ctl);
    let system = StoreSystem {
        system,
        workers: None,
        schedulers: None,
        logger: logger.clone(),
        shutdown: Arc::new(AtomicBool::new(false)),
        node_start_time: monotonic_raw_now(),
    };
    (StoreRouter { router, logger }, system)
}
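The (tx, fsm) pattern above is worth internalizing, because every FSM in the system is built this way. Here is a minimal stand-alone sketch with toy types (not TiKV's): the fsm owns the channel's rx, and the returned tx is what a mailbox later wraps.

use std::sync::mpsc;

// Toy fsm: it owns the receiving end of its own message channel.
struct Fsm {
    receiver: mpsc::Receiver<String>,
}

// Mirrors the shape of StoreFsm::new: hand back (tx, fsm).
fn new_fsm() -> (mpsc::Sender<String>, Box<Fsm>) {
    let (tx, rx) = mpsc::channel();
    (tx, Box::new(Fsm { receiver: rx }))
}

fn main() {
    let (tx, fsm) = new_fsm();
    // The tx is what gets bound into the mailbox: anyone holding it can
    // queue messages for the fsm.
    tx.send("StoreMsg::Start".to_owned()).unwrap();
    // A poller that later owns the fsm drains its queue.
    assert_eq!(fsm.receiver.try_recv().unwrap(), "StoreMsg::Start");
}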

batch_system::create_system

components/batch-system/src/batch.rs

  • The controlFsm from the previous step is the controller parameter here, and sender is the sending end of the controlFsm's channel. So create_system first uses the controlFsm and its sender to build the controlMailBox
  • Next it creates the normal_scheduler and control_scheduler (and, later, the pollers); these components share a single channel
  • It then places the controlMailBox, normal_scheduler, and control_scheduler into the router
  • The router is used to build the BatchSystem (a toy sketch of the resulting dispatch path follows the code below)
  • pool_state_builder is generally used to dynamically grow or shrink the number of pollers
pub fn create_system<N: Fsm, C: Fsm>(
    cfg: &Config,
    sender: mpsc::LooseBoundedSender<C::Message>,
    controller: Box<C>,
    resource_ctl: Option<Arc<ResourceController>>,
) -> (BatchRouter<N, C>, BatchSystem<N, C>) {
    let control_box = BasicMailbox::new(sender, controller, ...);
    let (sender, receiver) = unbounded(resource_ctl);
    
    let normal_scheduler = NormalScheduler {
        sender: sender.clone(),
        ...
    };
    let control_scheduler = ControlScheduler {
        sender: sender.clone(),
    };

    let pool_state_builder = PoolStateBuilder {
        ...
        fsm_receiver: receiver.clone(),
        fsm_sender: sender,
        ...
    };
    let router = Router::new(control_box, normal_scheduler, control_scheduler, ...);
    let system = BatchSystem {
        ...
        router: router.clone(),
        receiver,
        ...
        workers: Arc::new(Mutex::new(Vec::new())),
        ...
        pool_state_builder: Some(pool_state_builder),
    };
    (router, system)
}
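To see why the router needs both mailboxes and schedulers, here is a toy model of the dispatch path that create_system wires up (simplified types, not TiKV's Router/BasicMailbox): sending a msg first queues it in the target fsm's own channel, then pushes the fsm's identity onto the shared channel that the pollers drain.

use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

// Toy mailbox: holds the sending end of one fsm's message channel.
struct Mailbox {
    msg_tx: mpsc::Sender<String>,
}

struct Router {
    normals: HashMap<u64, Mailbox>,
    fsm_tx: mpsc::Sender<u64>, // plays the role of normalScheduler
}

impl Router {
    fn send(&self, region_id: u64, msg: String) {
        // 1. queue the message in the fsm's own channel (the mailbox)
        self.normals[&region_id].msg_tx.send(msg).unwrap();
        // 2. schedule the fsm onto the pollers' shared channel
        self.fsm_tx.send(region_id).unwrap();
    }
}

fn main() {
    let (msg_tx, msg_rx) = mpsc::channel();
    let (fsm_tx, fsm_rx) = mpsc::channel();
    let router = Router {
        normals: HashMap::from([(1u64, Mailbox { msg_tx })]),
        fsm_tx,
    };
    // One "poller": waits for a ready fsm, then drains its messages.
    let poller = thread::spawn(move || {
        while let Ok(region_id) = fsm_rx.recv() {
            let msg = msg_rx.recv().unwrap();
            println!("region {region_id}: handling {msg}");
        }
    });
    router.send(1, "RaftCmd".to_owned());
    drop(router); // closes fsm_tx so the poller can exit
    poller.join().unwrap();
}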

Progress

After init_engines has run, the batchSystem stands as follows:

  • FSM

    • n * normalFsm (not created)
    • controlFsm (created, initialized)
  • MailBox

    • n * normalMailBox (not created)
    • controlMailBox (created, initialized)
  • scheduler

    • normalScheduler (created, initialized)
    • controlScheduler (created, initialized)
  • poller (not created, not initialized)

  • router (partially created, not initialized)

Initializing the BatchSystem

After init_engines, the rough skeleton of the batchSystem is in place and the control components are essentially fully initialized. The next stage focuses on creating and initializing the remaining components.

High-level flow

components/server/src/server2.rs::run_impl::init_servers
-- src/server/raftkv2/node.rs::NodeV2::start
---- components/raftstore-v2/src/batch/store.rs::StoreSystem::start
     : starts async_write, which loops waiting for the raftlog produced by multiRaft and writes it through the raftLogEngine
     : builds the StorePollerBuilder, and by initializing it builds the PeerFsms
     : uses BatchSystem::spawn to build the poller and StorePoller and start several poller.poll async tasks
     : builds mailboxes from the PeerFsms created by the StorePollerBuilder and registers them into the router member
     : sends PeerMsg::Start to every PeerFsm plus the control command StoreMsg::Start; handling Start creates the ApplyFsm and launches the async Apply task that applies committed entries

------ components/raftstore/src/store/async_io/write.rs::StoreWriters::spawn/increase_to
----------- components/raftstore/src/store/async_io/write.rs::Worker::run
--------------- components/raftstore/src/store/async_io/write.rs::Worker::handle_msg/write_to_db
                : creates async_write and starts an async loop that receives raft logs on the rx of the async_write channel and writes them to rocksdb
                : returns the tx of the async_write channel to the caller


------ components/raftstore-v2/src/batch/store.rs::StorePollerBuilder::new
------ components/raftstore-v2/src/batch/store.rs::StorePollerBuilder::init
       : creates each region's PeerFsm, i.e. the normalFsm
       : `PeerFsm` in turn creates a `Peer`; crucially this creates the `raftnode`, i.e. the `raft` module


------ components/batch-system/src/batch.rs::BatchSystem::spawn
---------- components/raftstore-v2/src/batch/store.rs::StorePollerBuilder::build
           : builds poll_ctx; note in particular schedulers.write (the tx of the async_write channel)
           : builds the Poller and StorePoller
---------- components/batch-system/src/batch.rs::Poller::poll
           : receives fsms and hands them to the StorePoller's handle_normal / handle_control for processing
           : raftlog/msgs produced by raft are sent to async_write via schedulers.write (the tx of the async_write channel)
           : committedEntries produced by raft are sent to the ApplyFsm via the peer's apply_scheduler (the tx of the ApplyFsm's channel)


--------------- components/raftstore-v2/src/batch/store.rs::StorePoller::handle_normal
-------------------- components/raftstore-v2/src/fsm/peer.rs::PeerFsmDelegate::on_start
------------------------- components/raftstore-v2/src/operation/command/mod.rs::Peer::schedule_apply_fsm
                          : creates the ApplyFsm and asynchronously runs handle_all_tasks, which applies committedEntries

Key code paths

init_servers

components/server/src/server2.rs

Its main job is to call NodeV2::start:

fn init_servers<F: KvFormat>(&mut self) -> Arc<VersionTrack<ServerConfig>> {
        ...
        let engines = self.engines.as_mut().unwrap();
        ...

        
        self.node
            .as_mut()
            .unwrap()
            .start(
                engines.raft_engine.clone(),
                self.tablet_registry.clone().unwrap(),
                self.router.as_ref().unwrap(),
                server.transport(),
                ...
            )
        ...

        server_config
    }

NodeV2::start

src/server/raftkv2/node.rs

Its main job is to call StoreSystem::start:

pub fn start<T>(
        &mut self,
        raft_engine: ER,
        registry: TabletRegistry<EK>,
        router: &RaftRouter<EK, ER>,
        ...
    ) -> Result<()>
    where
        T: Transport + 'static,
    {
        let store_id = self.id();
        if let Some(region) = Bootstrap::new(
            ...
        )
        .bootstrap_first_region(&self.store, store_id)?
        {
            ...
            registry.tablet_factory().open_tablet(ctx, &path).unwrap();
        }

        ...

        self.start_store(
            raft_engine,
            registry,
            router,
            ...
        )?;

        Ok(())
    }

fn start_store<T>(
        &mut self,
        raft_engine: ER,
        registry: TabletRegistry<EK>,
        router: &RaftRouter<EK, ER>,
        ...
    ) -> Result<()>
    where
        T: Transport + 'static,
    {
        ...

        let system = &mut self.system.as_mut().unwrap().1;
        system.start(
            store_id,
            store_cfg,
            raft_engine,
            registry,
            ...
        )?;
        Ok(())
    }

StoreSystem::start

components/raftstore-v2/src/batch/store.rs

  • starts async_write, which loops waiting for the raftlog produced by multiRaft and writes it through the raftLogEngine
  • builds the StorePollerBuilder, and by initializing it builds the PeerFsms
  • uses BatchSystem::spawn to build the poller and StorePoller and start several poller.poll async tasks, which wait for fsms sent over by the normalScheduler/controlScheduler and hand them to the StorePoller's handle_normal / handle_control for processing
  • builds mailboxes from the PeerFsms created by the StorePollerBuilder and registers them into the router member
  • sends PeerMsg::Start to every PeerFsm plus the control command StoreMsg::Start, which creates the ApplyFsm and launches the async Apply task that persists committed entries (a toy sketch of the register/Start pattern follows the code below)
pub fn start<T, C>(
        &mut self,
        store_id: u64,
        cfg: Arc<VersionTrack<Config>>,
        raft_engine: ER,
        tablet_registry: TabletRegistry<EK>,
        ...
    ) -> Result<()>
    where
        T: Transport + 'static,
        C: PdClient + 'static,
    {
        
        ...

        let mut workers = Workers::new(background, pd_worker, ...);
        workers
            .async_write
            .spawn(store_id, raft_engine.clone(), ...)?;

        ...

        let tablet_scheduler = workers.tablet.start_with_timer(
            "tablet-worker",
            tablet::Runner::new(
                tablet_registry.clone(),
                ...
            ),
        );

        let schedulers = Schedulers {
            read: read_scheduler,
            pd: workers.pd.scheduler(),
            tablet: tablet_scheduler,
            write: workers.async_write.senders(),
            ...
        };

        let builder = StorePollerBuilder::new(
            cfg.clone(),
            store_id,
            raft_engine.clone(),
            tablet_registry,
            router.clone(),
            schedulers.clone(),
            ...
            store_meta.clone(),
            ...
        );

        self.schedulers = Some(schedulers);
        let peers = builder.init()?;
        self.system.spawn(tag, builder.clone());

        
        let apply_pool = builder.apply_pool.clone();
        let refresh_config_runner = refresh_config::Runner::new(
            ...
            self.system.build_pool_state(builder),
            ...
        ); 
        assert!(workers.refresh_config_worker.start(refresh_config_runner));      
        self.workers = Some(workers);

        let mut mailboxes = Vec::with_capacity(peers.len());
        let mut address = Vec::with_capacity(peers.len());
        {
            let mut meta = store_meta.as_ref().lock().unwrap();
            for (region_id, (tx, mut fsm)) in peers {
                ...

                address.push(region_id);
                mailboxes.push((
                    region_id,
                    BasicMailbox::new(tx, fsm, router.state_cnt().clone()),
                ));
            }
        }
        router.register_all(mailboxes);

        // Make sure Msg::Start is the first message each FSM received.
        let watch = Arc::new(ReplayWatch::new(self.logger.clone()));
        for addr in address {
            router
                .force_send(addr, PeerMsg::Start(Some(watch.clone())))
                .unwrap();
        }
        router.send_control(StoreMsg::Start).unwrap();
        Ok(())
    }
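The tail of StoreSystem::start, registering every peer's mailbox and then making sure Start is the first message each fsm sees, can be reduced to this toy sketch (simplified types; the real force_send additionally bypasses the loose-bounded channel's capacity check):

use std::collections::HashMap;
use std::sync::mpsc;

struct Mailbox {
    msg_tx: mpsc::Sender<&'static str>,
}

struct Router {
    normals: HashMap<u64, Mailbox>,
}

impl Router {
    // like StoreRouter::register_all
    fn register_all(&mut self, mailboxes: Vec<(u64, Mailbox)>) {
        self.normals.extend(mailboxes);
    }
    // like force_send, minus the capacity bypass
    fn force_send(&self, region_id: u64, msg: &'static str) {
        self.normals[&region_id].msg_tx.send(msg).unwrap();
    }
}

fn main() {
    let mut router = Router { normals: HashMap::new() };
    let mut rxs = Vec::new();
    let mailboxes: Vec<(u64, Mailbox)> = (1u64..=2)
        .map(|region_id| {
            let (tx, rx) = mpsc::channel();
            rxs.push(rx);
            (region_id, Mailbox { msg_tx: tx })
        })
        .collect();
    router.register_all(mailboxes);
    // make sure Start is the first message every peer fsm receives
    for region_id in 1..=2 {
        router.force_send(region_id, "PeerMsg::Start");
    }
    for rx in &rxs {
        assert_eq!(rx.recv().unwrap(), "PeerMsg::Start");
    }
}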

StorePollerBuilder::new

components/raftstore-v2/src/batch/store.rs

  • builds the apply_pool, preparing for the later launch of the ApplyFsm tasks (a rough stand-in for this pool is sketched after the code below)
  • returns the StorePollerBuilder
pub fn new(
        cfg: Arc<VersionTrack<Config>>,
        store_id: u64,
        engine: ER,
        tablet_registry: TabletRegistry<EK>,
        trans: T,
        router: StoreRouter<EK, ER>,
        schedulers: Schedulers<EK, ER>,
        ...
    ) -> Self {
        ...
        let apply_pool = YatpPoolBuilder::new(DefaultTicker::default())
            ...
            .build_future_pool();
        let global_stat = GlobalStoreStat::default();
        StorePollerBuilder {
        ...
        }
    }
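apply_pool is a yatp-based future pool onto which every peer's ApplyFsm task is later spawned. As a rough stand-in for illustration (using the futures crate's ThreadPool rather than TiKV's YatpPoolBuilder, whose exact API is not shown here), the spawn-a-future-onto-a-pool idea looks like this:

use futures::channel::oneshot;
use futures::executor::{block_on, ThreadPool};

fn main() {
    // stand-in for builder.apply_pool
    let apply_pool = ThreadPool::new().unwrap();
    let (tx, rx) = oneshot::channel();
    // stand-in for apply_pool.spawn(async move { apply_fsm.handle_all_tasks().await })
    apply_pool.spawn_ok(async move {
        tx.send("applied committed entries").unwrap();
    });
    assert_eq!(block_on(rx).unwrap(), "applied committed entries");
}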

StorePollerBuilder::init

components/raftstore-v2/src/batch/store.rs

  • creates each region's PeerFsm, i.e. the normalFsm
  • PeerFsm in turn creates a Peer; crucially this creates the raftnode, i.e. the raft module (a minimal standalone RawNode construction is sketched after the code below)
fn init(&self) -> Result<HashMap<u64, SenderFsmPair<EK, ER>>> {
        let mut regions = HashMap::default();
        let cfg = self.cfg.value();
        
        self.engine
            .for_each_raft_group::<Error, _>(&mut |region_id| {       
                let storage = match Storage::new(
                    region_id,
                    self.store_id,
                    self.engine.clone(),
                    self.schedulers.read.clone(),
                    &self.logger,
                )? {
                    Some(p) => p,
                    None => return Ok(()),
                };

                ...

                let (sender, peer_fsm) = PeerFsm::new(
                    &cfg,
                    &self.tablet_registry,
                    self.key_manager.as_deref(),
                    &self.snap_mgr,
                    storage,
                )?;
                ...

                Ok(())
            })?;
        
        Ok(regions)
    }


impl<EK: KvEngine, ER: RaftEngine> PeerFsm<EK, ER> {
    pub fn new(
        cfg: &Config,
        tablet_registry: &TabletRegistry<EK>,
        key_manager: Option<&DataKeyManager>,
        snap_mgr: &TabletSnapManager,
        storage: Storage<EK, ER>,
    ) -> Result<SenderFsmPair<EK, ER>> {
        let peer = Peer::new(cfg, tablet_registry, key_manager, snap_mgr, storage)?;
        
        let (tx, rx) = mpsc::loose_bounded(cfg.notify_capacity);
        let fsm = Box::new(PeerFsm {
            peer,
            mailbox: None,
            receiver: rx,
            ...
        });
        Ok((tx, fsm))
    }
}

impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
    /// Creates a new peer.
    ///
    /// If peer is destroyed, `None` is returned.
    pub fn new(
        cfg: &Config,
        tablet_registry: &TabletRegistry<EK>,
        key_manager: Option<&DataKeyManager>,
        snap_mgr: &TabletSnapManager,
        storage: Storage<EK, ER>,
    ) -> Result<Self> {
        ...
        let raft_group = RawNode::new(&raft_cfg, storage, &logger)?;
        let region = raft_group.store().region_state().get_region().clone();
        
        let tablet_index = storage.region_state().get_tablet_index();
        if tablet_index != 0 {
            raft_group
                .store()
                .recover_tablet(tablet_registry, ...);

            let mut ctx = TabletContext::new(&region, Some(tablet_index));
            tablet_registry.load(ctx, false)?;
        }
        let cached_tablet = tablet_registry.get_or_default(region_id);
        
        let mut peer = Peer {
            tablet: cached_tablet,
            ...
            raft_group,
            ...
        };
        ...

        Ok(peer)
    }
}
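The key step inside Peer::new is RawNode::new, which instantiates the raft state machine from the raft-rs crate. Roughly, a minimal standalone construction looks like the following sketch, which substitutes raft-rs's in-memory MemStorage for TiKV's persistent Storage:

use raft::{storage::MemStorage, Config, RawNode};
use slog::o;

fn main() -> Result<(), raft::Error> {
    let config = Config {
        id: 1, // peer id; must be non-zero
        ..Default::default()
    };
    // a single-voter conf state, in place of TiKV's persistent Storage
    let storage = MemStorage::new_with_conf_state((vec![1], vec![]));
    let logger = slog::Logger::root(slog::Discard, o!());
    let mut raft_group = RawNode::new(&config, storage, &logger)?;
    raft_group.tick(); // drive election/heartbeat timers
    Ok(())
}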

BatchSystem::spawn

components/batch-system/src/batch.rs

  • builds the poller
  • builds the StorePoller
  • starts several poller.poll async tasks, which wait for fsms sent over by the normalScheduler/controlScheduler and hand them to the StorePoller's handle_normal / handle_control for processing (a toy multi-poller sketch follows the code below)
pub fn spawn<B>(&mut self, name_prefix: String, mut builder: B)
    where
        B: HandlerBuilder<N, C>,
        B::Handler: Send + 'static,
    {
        for i in 0..self.pool_size {
            self.start_poller(
                format!("{}-{}", name_prefix, i),
                Priority::Normal,
                &mut builder,
            );
        }
        ...
    }

fn start_poller<B>(&mut self, name: String, priority: Priority, builder: &mut B)
    where
        B: HandlerBuilder<N, C>,
        B::Handler: Send + 'static,
    {
        let handler = builder.build(priority);
        let receiver = match priority {
            Priority::Normal => self.receiver.clone(),
            ...
        };
        let mut poller = Poller {
            router: self.router.clone(),
            fsm_receiver: receiver,
            handler,
            ...
        };
        
        let t = thread::Builder::new()
            .name(name)
            .spawn_wrapper(move || {
                ...
                poller.poll();
            })
            .unwrap();
        self.workers.lock().unwrap().push(t);
    }

StorePollerBuilder::build

components/raftstore-v2/src/batch/store.rs

  • builds the StorePoller; its handle_normal / handle_control methods unpack each fsm, take its msgs, and hand them to the raft module for further processing
  • for the raftlog produced by the raft module, schedulers.write is used to persist the log to rocksdb via async_write/raftLogEngine
  • for the apply entries produced by the raft module, the ApplyFsm stores them to rocksdb via the tablet_registry
fn build(&mut self, _priority: batch_system::Priority) -> Self::Handler {
        
        ...
        let mut poll_ctx = StoreContext {
            
            store_id: self.store_id,
            trans: self.trans.clone(),
            ...
            router: self.router.clone(),
            ...
            schedulers: self.schedulers.clone(),
            store_meta: self.store_meta.clone(),
            ...
            engine: self.engine.clone(),
            tablet_registry: self.tablet_registry.clone(),
            apply_pool: self.apply_pool.clone(),
            ...
        };

        ...
        StorePoller::new(poll_ctx, ...)
    }

Poller::poll

The poll function loops waiting for fsms from the channel's receiving end fsm_receiver, then processes them with the handler, i.e. the StorePoller (a simplified sketch of the fetch strategy follows fetch_fsm below).

components/batch-system/src/batch.rs

pub fn poll(&mut self) {
        ...
        let mut run = true;
        while run && self.fetch_fsm(&mut batch) {
            ...
            if batch.control.is_some() {
                let len = self.handler.handle_control(batch.control.as_mut().unwrap());
                ...
            }

            for (i, p) in batch.normals.iter_mut().enumerate() {
                ...
                let res = self.handler.handle_normal(p);
                ...
            }
            let mut fsm_cnt = batch.normals.len();
            while batch.normals.len() < max_batch_size {
                if let Ok(fsm) = self.fsm_receiver.try_recv() {
                    run = batch.push(fsm);
                }
                
                if !run || fsm_cnt >= batch.normals.len() {
                    break;
                }
                let p = batch.normals[fsm_cnt].as_mut().unwrap();
                let res = self.handler.handle_normal(p);
                ...
                fsm_cnt += 1;
            }
            ...
        }
        ...
    }

fn fetch_fsm(&mut self, batch: &mut Batch<N, C>) -> bool {
        if batch.control.is_some() {
            return true;
        }

        if let Ok(fsm) = self.fsm_receiver.try_recv() {
            return batch.push(fsm);
        }

        if batch.is_empty() {
            self.handler.pause();
            if let Ok(fsm) = self.fsm_receiver.recv() {
                return batch.push(fsm);
            }
        }
        !batch.is_empty()
    }
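fetch_fsm blocks only when the batch would otherwise be empty; once something arrives, poll keeps topping the batch up with try_recv. Stripped of FSM details, the strategy is roughly:

use std::sync::mpsc;

// Block for the first fsm so an idle poller doesn't spin, then batch
// greedily with try_recv up to max_batch_size.
fn fetch_batch(rx: &mpsc::Receiver<u64>, max: usize) -> Vec<u64> {
    let mut batch = Vec::new();
    if let Ok(first) = rx.recv() {
        batch.push(first);
    }
    while batch.len() < max {
        match rx.try_recv() {
            Ok(fsm) => batch.push(fsm),
            Err(_) => break, // nothing pending; process what we have
        }
    }
    batch
}

fn main() {
    let (tx, rx) = mpsc::channel();
    for region_id in 1..=3 {
        tx.send(region_id).unwrap();
    }
    assert_eq!(fetch_batch(&rx, 8), vec![1, 2, 3]);
}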

StorePoller::handle_control

The StoreMsg::Start sent by StoreSystem::start above triggers StorePoller::handle_control; the PeerMsg::Start sent to each peer takes the analogous handle_normal path, and it is that path which ends up building the ApplyFsm:

components/raftstore-v2/src/batch/store.rs

fn handle_control(&mut self, fsm: &mut StoreFsm) -> Option<usize> {
        ...
        let mut delegate = StoreFsmDelegate::new(fsm, &mut self.poll_ctx);
        delegate.handle_msgs(&mut self.store_msg_buf);
        ...
    }

components/raftstore-v2/src/fsm/store.rs:
pub fn handle_msgs(&mut self, store_msg_buf: &mut Vec<StoreMsg>)
    where
        T: Transport,
    {
        for msg in store_msg_buf.drain(..) {
            match msg {
                StoreMsg::Start => self.on_start(),
                StoreMsg::Tick(tick) => self.on_tick(tick),
                ...
            }
        }
    }

The on_start that schedules the ApplyFsm is PeerFsmDelegate::on_start in components/raftstore-v2/src/fsm/peer.rs, reached when each PeerFsm handles its PeerMsg::Start:

fn on_start(&mut self, watch: Option<Arc<ReplayWatch>>) {
        ...
        if self.fsm.peer.storage().is_initialized() {
            self.fsm.peer.schedule_apply_fsm(self.store_ctx);
        }
        ...
    }

Peer::schedule_apply_fsm

  • builds the ApplyFsm
  • spawns the handle_all_tasks async task to receive the apply entries sent over via the apply_scheduler

components/raftstore-v2/src/operation/command/mod.rs

pub fn schedule_apply_fsm<T>(&mut self, store_ctx: &mut StoreContext<EK, ER, T>) {
        
        ...
        let (apply_scheduler, mut apply_fsm) = ApplyFsm::new(
            &store_ctx.cfg,
            self.peer().clone(),
            region_state,
            mailbox,
            ...
        );

        store_ctx
            .apply_pool
            .spawn(async move { apply_fsm.handle_all_tasks().await })
            .unwrap();
        self.set_apply_scheduler(apply_scheduler);
    }

ApplyFsm::handle_all_tasks

The async task of the Apply flow: it takes the CommittedEntries produced by raft and applies them.

components/raftstore-v2/src/fsm/apply.rs

impl<EK: KvEngine, R: ApplyResReporter> ApplyFsm<EK, R> {
    pub async fn handle_all_tasks(&mut self) {
        loop {
            ...
            let res = futures::select! {
                res = self.receiver.next().fuse() => res,
                ...
            };
            ...
            loop {
                match task {
                    // TODO: flush by buffer size.
                    ApplyTask::CommittedEntries(ce) => self.apply.apply_committed_entries(ce).await,
                    ...
                }

               ...
            }
        }
    }
}
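Conceptually, handle_all_tasks is a channel-driven loop over a futures stream. A compact runnable sketch of the same shape (toy ApplyTask type; TiKV's real select! also has other arms, elided above):

use futures::{channel::mpsc, executor, select, FutureExt, StreamExt};

enum ApplyTask {
    CommittedEntries(Vec<u64>),
}

async fn handle_all_tasks(mut rx: mpsc::UnboundedReceiver<ApplyTask>) {
    loop {
        // the real loop selects over more than one source
        let task = select! {
            t = rx.next().fuse() => t,
        };
        match task {
            Some(ApplyTask::CommittedEntries(es)) => {
                println!("applying {} committed entries", es.len());
            }
            None => break, // all senders dropped; shut down
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::unbounded();
    tx.unbounded_send(ApplyTask::CommittedEntries(vec![1, 2, 3]))
        .unwrap();
    drop(tx);
    executor::block_on(handle_all_tasks(rx));
}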

StoreWriters::spawn

This async task is dedicated to the raft log and raft msg output of raft (a toy write-worker loop is sketched after the code below).

Raft logs are written to rocksdb through raft_engine.

Raft msgs are sent to other tikv instances through trans.

components/raftstore/src/store/async_io/write.rs

impl<EK, ER> StoreWriters<EK, ER>
where
    EK: KvEngine,
    ER: RaftEngine,
{
    pub fn senders(&self) -> WriteSenders<EK, ER> {
        WriteSenders::new(self.writers.clone())
    }

    pub fn spawn<T: Transport + 'static, N: PersistedNotifier>(
        &mut self,
        store_id: u64,
        raft_engine: ER,
        kv_engine: Option<EK>,
        notifier: &N,
        trans: &T,
        cfg: &Arc<VersionTrack<Config>>,
    ) -> Result<()> {
        let pool_size = cfg.value().store_io_pool_size;
        if pool_size > 0 {
            self.increase_to(
                pool_size,
                StoreWritersContext {
                    store_id,
                    notifier: notifier.clone(),
                    raft_engine,
                    kv_engine,
                    transfer: trans.clone(),
                    cfg: cfg.clone(),
                },
            )?;
        }
        Ok(())
    }

    pub fn increase_to<T: Transport + 'static, N: PersistedNotifier>(
        &mut self,
        size: usize,
        writer_meta: StoreWritersContext<EK, ER, T, N>,
    ) -> Result<()> {
        ...
        self.writers
            .update(move |writers: &mut SharedSenders<EK, ER>| -> Result<()> {
                let mut cached_senders = writers.get();
                for i in current_size..size {
                    let (tx, rx) = bounded(
                        writer_meta.cfg.value().store_io_notify_capacity,
                    );
                    let mut worker = Worker::new(
                        ...
                        rx,
                        ...
                    );
                    
                    let t =
                        thread::Builder::new()
                            .name(thd_name!(tag))
                            .spawn_wrapper(move || {
                                set_io_type(IoType::ForegroundWrite);
                                worker.run();
                            })?;
                    cached_senders.push(tx);
                    handlers.push(t);
                }
                writers.set(cached_senders);
                Ok(())
            })?;
        Ok(())
    }
}


impl<EK, ER, N, T> Worker<EK, ER, N, T>
where
    EK: KvEngine,
    ER: RaftEngine,
    N: PersistedNotifier,
    T: Transport,
{
    fn run(&mut self) {
        let mut stopped = false;
        while !stopped {
            let handle_begin = match self.receiver.recv() {
                Ok(msg) => {
                    stopped |= self.handle_msg(msg);
                }
                ...
            };

            ...
            self.write_to_db(true);
        }
    }

    pub fn write_to_db(&mut self, notify: bool) {
        ...
        
        let mut write_raft_time = 0f64;
        if !self.batch.raft_wbs[0].is_empty() {
            ...
            for i in 0..self.batch.raft_wbs.len() {
                self.raft_engine
                    .consume_and_shrink(
                        &mut self.batch.raft_wbs[i],
                        ...
                    )
                    ...
            }
            self.batch.raft_wbs.truncate(1);
            ...
        }

        for task in &mut self.batch.tasks {
            for msg in task.messages.drain(..) {
                ...
                if let Err(e) = self.trans.send(msg) {
                    ...
                }...
            }
        }
        ...
    }
}
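The worker pattern above, a bounded channel feeding a dedicated thread that batches and persists, boils down to this toy sketch (simplified message type; the real Worker batches several msgs before each write_to_db and also drains the queued raft messages to the transport):

use std::{sync::mpsc, thread};

enum WriteMsg {
    RaftLog(String),
    Shutdown,
}

fn main() {
    // bounded channel, like store_io_notify_capacity
    let (tx, rx) = mpsc::sync_channel::<WriteMsg>(64);
    let worker = thread::spawn(move || {
        while let Ok(msg) = rx.recv() {
            match msg {
                WriteMsg::RaftLog(entry) => {
                    // stand-in for batching + write_to_db
                    println!("persisting raft log: {entry}");
                }
                WriteMsg::Shutdown => break,
            }
        }
    });
    tx.send(WriteMsg::RaftLog("term=5 index=10".into())).unwrap();
    tx.send(WriteMsg::Shutdown).unwrap();
    worker.join().unwrap();
}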

Summary

The BatchSystem code is saturated with async tasks and with channel sends and receives. Without a good grasp of its initialization, it is hard to see how a msg flows through the whole system.

With the flow laid out in this article, you should now have a reasonably solid picture of the BatchSystem as a whole, and be well equipped to study how Raft Msgs flow, or how Region Merge and Split work.


Copyright notice: This is an original article by a TiDB community user, released under the CC BY-NC-SA 4.0 license. Please include a link to the original and this notice when reposting.
