博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
HDFS源码分析心跳汇报之数据块汇报
阅读量:7208 次
发布时间:2019-06-29

本文共 7632 字,大约阅读时间需要 25 分钟。

        在《HDFS源码分析心跳汇报之数据块增量汇报》一文中,我们详细介绍了数据块增量汇报的内容,了解到它是时间间隔更长的正常数据块汇报周期内一个smaller的数据块汇报,它负责将DataNode上数据块的变化情况及时汇报给NameNode。那么,时间间隔更长的正常数据块汇报都做了些什么呢?本文,我们将开始研究下时间间隔更长的正常数据块汇报。

        首先,看下正常数据块汇报是如何发起的?我们先看下BPServiceActor工作线程的offerService()方法:

/**   * Main loop for each BP thread. Run until shutdown,   * forever calling remote NameNode functions.   */  private void offerService() throws Exception {    //    // Now loop for a long time....    //    while (shouldRun()) {// 又是一个利用shouldRun()判断的while循环      try {        // 省略部分代码        ...        // 调用blockReport()方法,进行数据块汇报,放返回来自名字节点NameNode的相关命令cmds        List
cmds = blockReport(); // 调用processCommand()方法处理来自名字节点NameNode的相关命令cmds processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()])); // 省略部分代码 // // There is no work to do; sleep until hearbeat timer elapses, // or work arrives, and then iterate again. // 计算等待时间waitTime:心跳时间间隔减去上次心跳后截至到现在已过去的时间 long waitTime = dnConf.heartBeatInterval - (Time.now() - lastHeartbeat); synchronized(pendingIncrementalBRperStorage) { if (waitTime > 0 && !sendImmediateIBR) {// 如果等待时间大于0,且不是立即发送数据块增量汇报 try { // 利用pendingIncrementalBRperStorage进行等待,并加synchronized关键字进行同步 pendingIncrementalBRperStorage.wait(waitTime); } catch (InterruptedException ie) { LOG.warn("BPOfferService for " + this + " interrupted"); } } } // synchronized } catch(RemoteException re) {
       // 省略部分代码
} catch (IOException e) {
// 省略部分代码
} } // while (shouldRun())

          可以看出,在BPServiceActor工作线程offerService()方法的while循环内,数据块汇报blockReport()方法执行时,仅有下面的waitTime的等待时间,其他情况下都是立即执行的。那么等待时间waitTime是如何计算的呢?它就是心跳时间间隔减去上次心跳后截至到现在已过去的时间,并且,如果等待时间waitTime大于0,且不是立即发送数据块增量汇报(标志位sendImmediateIBR为false),那么才会利用pendingIncrementalBRperStorage进行等待,并加synchronized关键字进行同步。在这里,我们就可以大胆猜测,数据块汇报的时间间隔应该是大于心跳时间间隔的,并且两者之间的距离肯定不小。 

        那么,我们开始研究实现正常数据块汇报的blockReport()方法吧,代码如下:

/**   * Report the list blocks to the Namenode   * @return DatanodeCommands returned by the NN. May be null.   * @throws IOException   */  List
blockReport() throws IOException { // send block report if timer has expired. // 到期就发送数据块汇报 // 取当前开始时间startTime final long startTime = now(); // 如果当前时间startTime减去上次数据块汇报时间小于数据节点配置的数据块汇报时间间隔的话,直接返回null, // 数据节点配置的数据块汇报时间间隔取参数dfs.blockreport.intervalMsec,参数未配置的话默认为6小时 if (startTime - lastBlockReport <= dnConf.blockReportInterval) { return null; } // 构造数据节点命令ArrayList列表cmds,存储数据块汇报返回的命令DatanodeCommand ArrayList
cmds = new ArrayList
(); // Flush any block information that precedes the block report. Otherwise // we have a chance that we will miss the delHint information // or we will report an RBW replica after the BlockReport already reports // a FINALIZED one. // 调用reportReceivedDeletedBlocks()方法发送数据块增量汇报 reportReceivedDeletedBlocks(); // 记录上次数据块增量汇报时间lastDeletedReport lastDeletedReport = startTime; // 设置数据块汇报起始时间brCreateStartTime为当前时间 long brCreateStartTime = now(); // 从数据节点DataNode根据线程对应块池ID获取数据块汇报集合perVolumeBlockLists, // key为数据节点存储DatanodeStorage,value为数据节点存储所包含的Long类数据块数组BlockListAsLongs Map
perVolumeBlockLists = dn.getFSDataset().getBlockReports(bpos.getBlockPoolId()); // Convert the reports to the format expected by the NN. int i = 0; int totalBlockCount = 0; // 创建数据块汇报数组StorageBlockReport,大小为上述perVolumeBlockLists的大小 StorageBlockReport reports[] = new StorageBlockReport[perVolumeBlockLists.size()]; // 遍历perVolumeBlockLists for(Map.Entry
kvPair : perVolumeBlockLists.entrySet()) { // 取出value:BlockListAsLongs BlockListAsLongs blockList = kvPair.getValue(); // 将BlockListAsLongs封装成StorageBlockReport加入数据块汇报数组reports, // StorageBlockReport包含数据节点存储DatanodeStorage和其上数据块数组 reports[i++] = new StorageBlockReport( kvPair.getKey(), blockList.getBlockListAsLongs()); // 累加数据块数目totalBlockCount totalBlockCount += blockList.getNumberOfBlocks(); } // Send the reports to the NN. int numReportsSent; long brSendStartTime = now(); // 根据数据块总数目判断是否需要多次发送消息 if (totalBlockCount < dnConf.blockReportSplitThreshold) {// 如果数据块总数目在split阈值之下,则将所有的数据块汇报信息放在一个消息中发送 // split阈值取参数dfs.blockreport.split.threshold,参数未配置的话默认为1000*1000 // Below split threshold, send all reports in a single message. // 发送的数据块汇报消息数numReportsSent设置为1 numReportsSent = 1; // 通过NameNode代理bpNamenode的blockReport()方法向NameNode发送数据块汇报信息 DatanodeCommand cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), reports); // 将数据块汇报后返回的命令cmd加入到命令列表cmds if (cmd != null) { cmds.add(cmd); } } else { // Send one block report per message. // 发送的数据块汇报消息数numReportsSent设置为1 numReportsSent = i; // 遍历reports,取出每个StorageBlockReport for (StorageBlockReport report : reports) { StorageBlockReport singleReport[] = { report }; // 通过NameNode代理bpNamenode的blockReport()方法向NameNode发送数据块汇报信息 DatanodeCommand cmd = bpNamenode.blockReport( bpRegistration, bpos.getBlockPoolId(), singleReport); // 将数据块汇报后返回的命令cmd加入到命令列表cmds if (cmd != null) { cmds.add(cmd); } } } // Log the block report processing stats from Datanode perspective // 计算数据块汇报耗时并记录在日志Log、数据节点Metrics指标体系中 long brSendCost = now() - brSendStartTime; long brCreateCost = brSendStartTime - brCreateStartTime; dn.getMetrics().addBlockReport(brSendCost); LOG.info("Sent " + numReportsSent + " blockreports " + totalBlockCount + " blocks total. Took " + brCreateCost + " msec to generate and " + brSendCost + " msecs for RPC and NN processing. " + " Got back commands " + (cmds.size() == 0 ? "none" : Joiner.on("; ").join(cmds))); // 调用scheduleNextBlockReport()方法,调度下一次数据块汇报 scheduleNextBlockReport(startTime); // 返回命令cmds return cmds.size() == 0 ? null : cmds; }
        数据块汇报的blockReport()方法处理流程大体如下:

        1、取当前开始时间startTime;

        2、如果当前时间startTime减去上次数据块汇报时间小于数据节点配置的数据块汇报时间间隔的话,直接返回null:

              数据节点配置的数据块汇报时间间隔取参数dfs.blockreport.intervalMsec,参数未配置的话默认为6小时;

        3、构造数据节点命令ArrayList列表cmds,存储数据块汇报返回的命令DatanodeCommand;

        4、调用reportReceivedDeletedBlocks()方法发送数据块增量汇报;

        5、记录上次数据块增量汇报时间lastDeletedReport;

        6、设置数据块汇报起始时间brCreateStartTime为当前时间;

        7、从数据节点DataNode根据线程对应块池ID获取数据块汇报集合perVolumeBlockLists:

              key为数据节点存储DatanodeStorage,value为数据节点存储所包含的Long类数据块数组BlockListAsLongs;

        8、创建数据块汇报数组StorageBlockReport,大小为上述perVolumeBlockLists的大小;

        9、取出value:BlockListAsLongs:

              9.1、取出value:BlockListAsLongs;

              9.2、将BlockListAsLongs封装成StorageBlockReport加入数据块汇报数组reports,StorageBlockReport包含数据节点存储DatanodeStorage和其上数据块数组;

              9.3、累加数据块数目totalBlockCount;

        10、根据数据块总数目判断是否需要多次发送消息:

                10.1、如果数据块总数目在split阈值之下,则将所有的数据块汇报信息放在一个消息中发送(split阈值取参数dfs.blockreport.split.threshold,参数未配置的话默认为1000*1000):

                           10.1.1、发送的数据块汇报消息数numReportsSent设置为1;

                           10.1.2、通过NameNode代理bpNamenode的blockReport()方法向NameNode发送数据块汇报信息;

                           10.1.3、将数据块汇报后返回的命令cmd加入到命令列表cmds;

                 10.2、如果数据块总数目在split阈值之上,将数据块汇报按照DatanodeStorage分多个消息来发送:

                            10.2.1、发送的数据块汇报消息数numReportsSent设置为i,即DatanodeStorage数目;

                            10.2.2、遍历reports,取出每个StorageBlockReport:

                                           10.2.2.1、通过NameNode代理bpNamenode的blockReport()方法向NameNode发送数据块汇报信息;

                                           10.2.2.2、将数据块汇报后返回的命令cmd加入到命令列表cmds;

        11、计算数据块汇报耗时并记录在日志Log、数据节点Metrics指标体系中;

        12、调用scheduleNextBlockReport()方法,调度下一次数据块汇报;

        13、返回命令cmds。

转载地址:http://telum.baihongyu.com/

你可能感兴趣的文章
阿里云服务器Linux CentOS安装配置(六)resin多端口配置、安装、部署
查看>>
jQuery对象与DOM对象之间的转换(转)
查看>>
asp.net跳转页面的三种方法比较
查看>>
Bzoj1076 [SCOI2008]奖励关
查看>>
JCo 指南
查看>>
git使用--pull代码时冲突
查看>>
1-3-1动态随屏幕变化而变化
查看>>
Reading papers_6(Pattern Recognition And Machine Learning一书,ing...)
查看>>
java mybatis 新增记录 与 insertSelective 保存问题
查看>>
cocos2d-x 搭建android开发环境记录
查看>>
列表与元组的区别
查看>>
关于野指针、空指针
查看>>
key_buffer_size设置注意事项
查看>>
C#对各种文件的操作-ini(2)
查看>>
JavaScript入门学习笔记(二)
查看>>
Project Euler 13 Large sum
查看>>
自动工作量资料档案库(AWR)报告的获得
查看>>
virtualBox中的centOS虚拟机硬盘扩容
查看>>
Android应用目录结构分析
查看>>
动画总结(UIView的动画)
查看>>