十九:从库MTS多线程并行回放(一)(笔记)

一、分发调用流程

->ev->apply_event(rli); Log_event::apply_event 这里如果是非MTS进行应用 如果MTS  如果是GTID event 进行WORKER线程的分配 ,如果不是则获取WORKER线程
      
      -> 是否是进行 MTS recovery if (rli->is_mts_recovery())
         根据 bitmap 设置进行跳过处理 
         
          if (rli->is_mts_recovery())//如果是恢复 这个地方就是前面恢复扫描出来的位置
               {
                 bool skip=
                   bitmap_is_set(&rli->recovery_groups, rli->mts_recovery_index) &&
                   (get_mts_execution_mode(::server_id,
                                           rli->mts_group_status ==
                                           Relay_log_info::MTS_IN_GROUP,
                                           rli->current_mts_submode->get_type() ==
                                           MTS_PARALLEL_TYPE_DB_NAME)
                    == EVENT_EXEC_PARALLEL);
                 if (skip)
                 {
                   DBUG_RETURN(0);
                 }
                 else
                 {
                   DBUG_RETURN(do_apply_event(rli));
                 }
               }
         
         
      -> 如果是单线程直接调用 do_apply_event
      
      -> 如果是多线程MTS !!!!!!!!!!!!!!!!
        ->Log_event::get_slave_worker 主要是根据不同的EVENT进行不同的操作 包含1、判定是否可以并发 2、判定由哪一个worker进行执行
          ->如果是GTID event !!!匿名GTID Event也可以
             ->is_gtid_event
           ->初始化一个组 Slave_job_group
           ->在GAQ中分配队列序号
           ->rli->mts_groups_assigned++ 增加
           ->使用GTID-event的位置和mts_groups_assigned将GROUP中的master_log_pos位置 和 total_seqno初始化
           ->将GROUP加到 GAQ并且分配的 序号 gaq->assigned_group_index= gaq->en_queue(&group);
           ->初始化一个Slave_job_item 
           ->加入到rli->curr_group_da.push_back中
           ->进行GTID 模式下 判定是否可以并发
            ->schedule_next_event
              ->Mts_submode_logical_clock::schedule_next_event 基于COMMIT_ORDER和WRITE SET的都使用这个方法 
                主要判断是否可以进行并发并且进行等待
               ->获取GTID EVENT中的last commit和seq number
               ->如果不能进行并发则需要等待last commit > LWM SEQ NUMBER(最新一次除没有提交事物之前的一个事物的seq number)
                 ->wait_for_last_committed_trx 进入等待 他会设置一个min_waited_timestamp 作为
                   其他事物提交时更新LWM SEQ NUMBER的标记,等待直到last commit<=LWM SEQ NUMBER
                   等待标记为
                   stage_worker_waiting_for_commit_parent 
                   Waiting for dependent transaction to commit
                   
                   同时还会更新 mts_total_wait_overlap
                   my_atomic_add64(&rli->mts_total_wait_overlap, diff_timespec(&ts[1], &ts[0]));
                  

                    获取 LWM SEQ NUMBER 的源码注释: 
                                   
                                  the last time index containg lwm
                                      +------+
                                      | LWM  |
                                      |  |   |
                                      V  V   V
                       GAQ:x  xoooooxxxxxXXXXX...X
                                    ^   ^
                                    |   | LWM+1
                                    |
                                    +- tne new current_lwm
                    
                             <---- logical (commit) time ----
                             
                       here `x' stands for committed, `X' for committed and discarded from
                       the running range of the queue, `o' for not committed.
                         
               
               
               
         ->如果 query event !!!
           ->初始化一个Slave_job_item 
           ->将其加入到rli->curr_group_da.push_back(job_item);中
           ->设置 rli->curr_group_seen_begin= true; 说明找到了query event
           ->进行DATABASE模式的分配 不考虑
           
         ->如果是MAP EVENT
           ->开始获取WORKER线程到这里已经可以并发执行了,需要进行WORKER线程的获取
            ret_worker=rli->current_mts_submode->get_least_occupied_worker(rli, &rli->workers,this); 
            Mts_submode_logical_clock::get_least_occupied_worker
            -> 第一次rli->last_assigned_worker为空 这需要新分配
               -> Mts_submode_logical_clock::get_free_worker 进行分配
                  ->循环每一个worker线程,看是否有正在等待处理的event,找到一个没有任何工作的worker线程
                    这里也能出是轮询每一个worker线程找到空闲的worker线程就可以了。判断标准就是
                     if (w_i->jobs.len == 0)
                                     
                  -> 如果没有找到,分配失败,进行等待等待为
                    stage_slave_waiting_for_workers_to_process_queue
                    Waiting for slave workers to process their queues
                  -> 循环获取work线程,直到成功
                  -> 获取成功后更新信息
                     等待的时间:rli->mts_total_wait_worker_avail += diff_timespec(&ts[1], &ts[0]);
                     增加一次等待次数:rli->mts_wq_no_underrun_cnt++;
                  ->如果开启了参数 slave_preserve_commit_order=1 注册事物
                    rli->get_commit_order_manager()->register_trx(worker);
                  ->ptr_group->worker_id= ret_worker->id;//设置本次事物组的worker_id 就是分配的工作线程
           ->伴随着Woker线程的分配,如果是开启了参数slave_preserve_commit_order需要注册这个事务
             if (rli->get_commit_order_manager() != NULL && worker != NULL)
               rli->get_commit_order_manager()->register_trx(worker);//注册事物

        ->如果是DEL event
             步骤同上 只是不需要分配work线程了因为已经分配了
        ->如果是XID event
             步骤同上 不过还需要更新group 的checkpoint信息 如下:
               if (!ret_worker->checkpoint_notified) //将GROUP中填写 checkpoint信息
               {
                 if (!ptr_group)
                   ptr_group= gaq->get_job_group(rli->gaq->assigned_group_index);
                 ptr_group->checkpoint_log_name= 
                   my_strdup(key_memory_log_event, rli->get_group_master_log_name(), MYF(MY_WME));
                 ptr_group->checkpoint_log_pos= rli->get_group_master_log_pos();
                 ptr_group->checkpoint_relay_log_name=
                   my_strdup(key_memory_log_event, rli->get_group_relay_log_name(), MYF(MY_WME));
                 ptr_group->checkpoint_relay_log_pos= rli->get_group_relay_log_pos();
                 ptr_group->shifted= ret_worker->bitmap_shifted; //checkpoint 后 移动的个数 用于后面提交的时候改变参考Slave_worker::commit_positions    设置参考mts_checkpoint_routine()
                 ret_worker->bitmap_shifted= 0;//重置移动量
                 ret_worker->checkpoint_notified= TRUE;
               }
               ptr_group->checkpoint_seqno= rli->checkpoint_seqno; //获取seqno 这个值会在chkpt后减去偏移量
               ptr_group->ts= common_header->when.tv_sec + (time_t) exec_time; // Seconds_behind_master related  //checkpoint的时候会将这个值再次传递 mts_checkpoint_routine()
               rli->checkpoint_seqno++;//增加seqno     
               到这里 Log_event::get_slave_worker 每个event的处理流程完成,每次都会回到
               Log_event::apply_event
       ->Log_event::apply_event 返回到 apply_event_and_update_pos      
    ->回到apply_event_and_update_pos 下面逻辑MTS才进行 也就是入队到woker中去
      开始进入worker 队列,GTID和QUERY EVNET会跟随 MAP EVENT一起进入队列加入了li->curr_group_da中
      初始化map event的Slave_job_item 
      设置ev属于在GAP中的位置 ev->mts_group_idx= rli->gaq->assigned_group_index;
      如果是map event的话还会帮助GTID和QUERY event入队
      然后自己入队(append_item_to_jobs(job_item, w, rli))
      
      其他event 比如delete event和xid event则自己调用(append_item_to_jobs(job_item, w, rli))
      入队
      -> append_item_to_jobs(job_item, w, rli)
        ->如果入队的event 因为worker线程的队列已经满了则等待:
          进入状态stage_slave_waiting_worker_queue
          Waiting for Slave Worker queue

          wroker队列的大小为:mts_slave_worker_queue_len_max= 16384;
          每次等待增加一次
          worker->jobs.overfill= TRUE;
          worker->jobs.waited_overfill++;
          rli->mts_wq_overfill_cnt++;
      
      
      
      
      (rli->is_parallel_exec() && rli->mts_events_assigned % 1024 == 1) 
      如果每个event的前面的操作操作120秒 则会出现通知 这个警告经常遇到:
      从上面我们看到的等待来讲超过120秒的可能有3种
      
      1、由于上一组并发有大事物没有提交
         导致不能并发worker线程的等待时间
      2、worker线程都在完成工作及在应用上一个事物的event,没有新的worker线程以供新分配
      3、worker线程已经分配,但是由于worker线程的分配队列为16384,如果应用比较慢则可能入不了
         分配队列,一般也是大事物造成的。
      
      
      
sql_print_information("Multi-threaded slave statistics%s: "
                "seconds elapsed = %lu; "
                "events assigned = %llu; "
                "worker queues filled over overrun level = %lu; "
                "waited due a Worker queue full = %lu; "
                "waited due the total size = %lu; "
                "waited at clock conflicts = %llu "
                "waited (count) when Workers occupied = %lu "
                "waited when Workers occupied = %llu",
                rli->get_for_channel_str(),
                static_cast<unsigned long>
                (my_now - rli->mts_last_online_stat),//消耗总时间 单位秒
                rli->mts_events_assigned,//总的event分配的个数
                rli->mts_wq_overrun_cnt,// worker线程分配队列大于 90%的次数 当前硬编码  14746
                rli->mts_wq_overfill_cnt,    //由于work 分配队列已满造成的等待次数 当前硬编码 16384
                rli->wq_size_waits_cnt, //大Event的个数 一般不会存在
                rli->mts_total_wait_overlap,//由于上一组并行有大事物没有提交导致不能分配worker线程的等待时间 单位纳秒
                rli->mts_wq_no_underrun_cnt, //work线程由于没有空闲的而等待的次数
                rli->mts_total_wait_worker_avail);//work线程由于没有空闲的而等待的时间   单位纳秒
                                
    ->回到apply_event_and_update_pos 下面 进行pos的更新 这个pos是 event_relay_log_pos ,不会出现在show slave或者其他地方  
      更新内部变量读取到的relay log位置和名字 这个不用于外部访问
         uint event_relay_log_number; 这两个是正在执行的relay log的位置
         ulonglong event_relay_log_pos;                        

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 159,015评论 4 362
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 67,262评论 1 292
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 108,727评论 0 243
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 43,986评论 0 205
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 52,363评论 3 287
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 40,610评论 1 219
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 31,871评论 2 312
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 30,582评论 0 198
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 34,297评论 1 242
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 30,551评论 2 246
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 32,053评论 1 260
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 28,385评论 2 253
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 33,035评论 3 236
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 26,079评论 0 8
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 26,841评论 0 195
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 35,648评论 2 274
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 35,550评论 2 270

推荐阅读更多精彩内容