博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
hadoop运行原理之Job运行(一) JobTracker启动及初始化
阅读量:7223 次
发布时间:2019-06-29

本文共 4266 字,大约阅读时间需要 14 分钟。

  这部分的计划是这样的,首先解释JobTracker的启动过程和作业从JobClient提交到JobTracker上;然后分析TaskTracker和heartbeat;最后将整个流程debug一遍来加深映象。

  在看JobTracker源代码的时候就会发现,它里边有main()方法,这就说明了它是一个独立的java进程。在hadoop根目录下的bin文件夹中的hadoop脚本中可以看到,它指定了JobTracker类。如下图所示:

  JobTracker的main()方法中最主要的是以下两条语句:

1 public static void main(String argv[] 2                           ) throws IOException, InterruptedException { 3     StringUtils.startupShutdownMessage(JobTracker.class, argv, LOG); 4      5     try { 6       if(argv.length == 0) { 7         JobTracker tracker = startTracker(new JobConf());//用来生成JobTracker对象 8         tracker.offerService();//初始化JobTracker,并启动作业调度器 9       }10       else {11         if ("-dumpConfiguration".equals(argv[0]) && argv.length == 1) {12           dumpConfiguration(new PrintWriter(System.out));13         }14         else {15           System.out.println("usage: JobTracker [-dumpConfiguration]");16           System.exit(-1);17         }18       }19     } catch (Throwable e) {20       LOG.fatal(StringUtils.stringifyException(e));21       System.exit(-1);22     }23   }

  startTracker()方法比较简单,通过几次方法调用最终生成JobTracker对象。下面重点分析offerService()方法。由于篇幅限制,只列出了最重要的部分:

1 public void offerService() throws InterruptedException, IOException { 2      ...... 3  4     // Initialize the JobTracker FileSystem within safemode 5     setSafeModeInternal(SafeModeAction.SAFEMODE_ENTER); 6     initializeFilesystem(); 7     setSafeModeInternal(SafeModeAction.SAFEMODE_LEAVE); 8      9     // Initialize JobTracker10     initialize();11     12      ......13     taskScheduler.start();

  首先进入安全模式下(SAFEMODE_ENTER),初始化文件系统,然后退出安全模式(SAFEMODE_LEAVE)。然后初始化JobTracker。最后启动作业调度器(TaskScheduler)。默认的作业调度器是JobQueueTaskScheduler,在mapred-default.xml中配置。所以taskScheduler.start()会调用JobQueueTaskScheduler的start()方法。如下所示:

  JobQueueTaskScheduler使用FIFO来对job进行调度。下面来进入到JobQueueTaskScheduler来分析start()方法。

1 @Override2   public synchronized void start() throws IOException {3     super.start();4     taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);5     eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);6     eagerTaskInitializationListener.start();7     taskTrackerManager.addJobInProgressListener(8         eagerTaskInitializationListener);9   }

  这里用到了观察者模式,JobQueueTaskScheduler向JobTracker注册了两个JobInProgressListener:EagerTaskInitializationListener和JobQueueJobInProgressListener,分别用于作业初始化和作业排序。

  这里的taskTrackerManager实际上是JobTracker,因为JobTracker的父类就是TaskTrackerManager。在JobTracker的startTracker()方法中,将JobTracker实例传递给TaskTrackerManager。如下所示:  

1 public static JobTracker startTracker(JobConf conf, String identifier, boolean initialize) 2   throws IOException, InterruptedException {3     DefaultMetricsSystem.initialize("JobTracker");4     JobTracker result = null;5     while (true) {6       try {7         result = new JobTracker(conf, identifier);8         result.taskScheduler.setTaskTrackerManager(result);9        ......

  在eagerTaskInitializationListener.start()方法启动一个线程JobInitManager,这个线程用来监控jobInitQueue,即List<JobInProgress>。当有新的job(JobInProgress)加入到队列中时,JobInitManager线程就对其进行初始化。

 

1 class JobInitManager implements Runnable { 2     3     public void run() { 4       JobInProgress job = null; 5       while (true) { 6         try { 7           synchronized (jobInitQueue) { 8             while (jobInitQueue.isEmpty()) { 9               jobInitQueue.wait();10             }11             job = jobInitQueue.remove(0); //从队列中拿出一个job12           }13           threadPool.execute(new InitJob(job)); //对job进行初始化14         } catch (InterruptedException t) {15           LOG.info("JobInitManagerThread interrupted.");16           break;17         } 18       }19       LOG.info("Shutting down thread pool");20       threadPool.shutdownNow();21     }22   }23 24 class InitJob implements Runnable {25   26     private JobInProgress job;27     28     public InitJob(JobInProgress job) {29       this.job = job;30     }31     32     public void run() {33       ttm.initJob(job); //实质上调用JobTracker的initJob()方法进行初始化34     }35   }

 

  这里JobInitManager线程最终调用了JobTracker的initJob()方法来对job进行初始化。具体过程下篇文章中再写。

   最后画个流程图来总结一下,画的不好,将就看一下吧。

   本文基于hadoop1.2.1

   如有错误,还请指正

   参考文章:《Hadoop技术内幕 深入理解MapReduce架构设计与实现原理》 董西成

  转载请注明出处: 

转载于:https://www.cnblogs.com/gwgyk/p/3998753.html

你可能感兴趣的文章
python基础 - 文件读写
查看>>
成大事必备9种能力、9种手段、9种心态
查看>>
php 依赖注入容器
查看>>
matlab设定mex接驳的C/C++编译器
查看>>
【NLP】干货!Python NLTK结合stanford NLP工具包进行文本处理
查看>>
Linux系统备份与恢复
查看>>
Conversion to Dalvik format failed: Unable to execute dex: Multiple dex files define ...
查看>>
输入一个整数,将这个整数以字符串的形式逆序输出 程序不考虑负数的情况,若数字含有0,则逆序形式也含有0,如输入为100,则输出为001...
查看>>
机场打车有感
查看>>
利用数组创建的顺序表实现各种功能
查看>>
POJ - 1062 昂贵的聘礼(最短路Dijkstra)
查看>>
tomcat运行模式APR安装
查看>>
【菜鸟也疯狂UML系列】——概述
查看>>
Oracle成长点点滴滴(2)— 权限管理
查看>>
文件包含漏洞检测工具fimap
查看>>
详细解说 STL 排序(Sort)(转)
查看>>
P1197 [JSOI2008]星球大战(并查集判断连通块+正难则反)
查看>>
Maven单独构建多模块项目中的单个模块
查看>>
Xamarin Essentials教程剪贴板Clipboard
查看>>
BTA 常问的 Java基础40道常见面试题及详细答案
查看>>