distributedshell|distributedshell yarn编程指南

hadoop yarn是一个独立的调度框架,我们自己也可以通用yarn提供的api编写程序将我们自己的写的程序用yarn来调度运行
hadoop 官提供了一个distributedshell yarn程序,该程序用途是实现使用yarn调度框架执行shell,Hadoop提供这个程序的目的是为了通过这个简单的例子说明如果自己写一个yarn程序
具体代码地址:
https://github.com/apache/hadoop/tree/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell
编写yarn程序一般需要

  • 编写一个客户端,客户端定义了启动ApplicationMaster的方式,提交application到RM
  • 编写自己的ApplicationMaster,在ApplicationMaster中创建与RM,NN交互的客户端用于向RM申请资源并且在NN中启动容器运行任务
接口 底层接口 ApplicationClientProtocol
clientsResourceManager之间用于提交/中断job和获取 application ,集群metrics ,node queueACLs信息的底层协议
ApplicationMasterProtocol
AM和RM通信的底层协议,ApplicationMasterProtocol.allocate用于AM和RM心跳
ContainerManagementProtocol
AM和NN通信的底层协议,用于启动和停止容器以及获取运行容器的状态信息
高层接口 Client<-->ResourceManager
使用 YarnClient 对象
ApplicationMaster<-->ResourceManager
用于向RM申请Container,使用AMRMClientAsync对象, AMRMClientAsync.CallbackHandler 用于异步事件处理
ApplicationMaster<-->NodeManager
用于在NodeManager上启动容器,使用NMClientAsyncNodeManager通信,NMClientAsync.CallbackHandler用于异步事件处理
distributedshell 下面我们来看一下hadoop的这个例子程序distributedshell是怎么编写的
编写Client org.apache.hadoop.yarn.applications.distributedshell.Client这个是客户端入口程序用于和RM交互
  1. 程序首先初始化了YarnClient
yarnClient = YarnClient.createYarnClient(); yarnClient.init(conf);

然后调用yarnClient.createApplication方法创建App,获取application id,底层api使用的是ApplicationClientProtocol
// Get a new application id YarnClientApplication app = yarnClient.createApplication(); GetNewApplicationResponse appResponse = app.getNewApplicationResponse();

根据请求RM获取到的application id 构造ApplicationSubmissionContext,ApplicationSubmissionContext代表了RM启动ApplicationMaster所需要的信息,客户端需要在这个上下文中设置如下信息:
  • Application 信息: id, name
  • Queue, priority 信息:提交application到哪个队列, 优先级.
  • User: 提交application的用户
  • ContainerLaunchContext: AM将要运行的容器的所有信息的定义,比如Local Resources (binaries, jars, files etc.), Environment settings (CLASSPATH etc.), 需要执行的Command 和 security Tokens (RECT).
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); ApplicationId appId = appContext.getApplicationId(); ... //设置appName appContext.setApplicationName(appName); ... //准备Local Resources ,Environment ,和Command ... // 通过准备的信息构造 application master的 ContainerLaunchContext ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance( localResources, env, commands, null, null, null); ... //设置启动资源信息 Resource capability = Resource.newInstance(amMemory, amVCores); appContext.setResource(capability); //设置ContainerLaunchContext appContext.setAMContainerSpec(amContainer); //设置优先级 Priority pri = Priority.newInstance(amPriority); appContext.setPriority(pri); //设置队列 appContext.setQueue(amQueue); //提交应用 yarnClient.submitApplication(appContext);

举个例子帮助理解
假如以test用户运行hadoop-yarn-applications-distributedshell, 运行命令如下:
hadoop jar hadoop/yarn/hadoop-yarn-applications-distributedshell-2.6.0-cdh5.15.2.jar -jar hadoop/yarn/hadoop-yarn-applications-distributedshell-2.6.0-cdh5.15.2.jar-queue root.download -shell_script /tmp/a.sh

假设yarnClient.createApplication()
  • 申请到的appid为 application_1576067711791_1132781
  • Client会将 -jar参数 传过来的jar路径即hadoop/yarn/hadoop-yarn-applications-distributedshell-2.6.0-cdh5.15.2.jar作为为本地资源(Local Resources),放到test用户的如下hdfs 路径
/user/test/DistributedShell/application_1576067711791_1132781/AppMaster.jar

  • 对应的执行脚本/tmp/a.sh也会放到对于hadoop家目录对于application 的hdfs路径下:
/user/test/DistributedShell/application_1576067711791_1132781/ExecScript.sh

其中运行ApplicationMastercomond为:
org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster --container_memory 10 --container_vcores 1 --num_containers 1 --priority 0 1>/AppMaster.stdout 2>/AppMaster.stderr

客户端与RM交互图 distributedshell|distributedshell yarn编程指南
文章图片
Client<->RM ApplicationSubmissionContext结构图 distributedshell|distributedshell yarn编程指南
文章图片
image.png 编写 ApplicationMaster (AM) AM使用ApplicationAttemptId与RM交互
可以从传进来的env环境变量中获取 Container信息,进一步获取ApplicationAttemptId
ContainerId containerId = ConverterUtils.toContainerId(envs .get(Environment.CONTAINER_ID.name())); appAttemptID = containerId.getApplicationAttemptId();

在AM完全初始化自身之后,我们可以启动两个客户端:一个与RM通信,一个与NM通信。并设置相关的事件处理函数
//与RM通信并设置相关的事件处理函数 AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); amRMClient.init(conf); amRMClient.start(); //与NM通信并设置相关的事件处理函数 containerListener = createNMCallbackHandler(); nmClientAsync = new NMClientAsyncImpl(containerListener); nmClientAsync.init(conf); nmClientAsync.start();

AM必须向RM发出心跳信息
// Register self with ResourceManager // This will start heartbeating to the RM appMasterHostname = NetUtils.getHostname(); RegisterApplicationMasterResponse response = amRMClient .registerApplicationMaster(appMasterHostname, appMasterRpcPort, appMasterTrackingUrl);

AM向RM申请container资源
// Dump out information about cluster capability as seen by the // resource manager int maxMem = response.getMaximumResourceCapability().getMemory(); LOG.info("Max mem capabililty of resources in this cluster " + maxMem); int maxVCores = response.getMaximumResourceCapability().getVirtualCores(); LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores); // A resource ask cannot exceed the max. if (containerMemory > maxMem) { LOG.info("Container memory specified above max threshold of cluster." + " Using max value." + ", specified=" + containerMemory + ", max=" + maxMem); containerMemory = maxMem; }if (containerVirtualCores > maxVCores) { LOG.info("Container virtual cores specified above max threshold of cluster." + " Using max value." + ", specified=" + containerVirtualCores + ", max=" + maxVCores); containerVirtualCores = maxVCores; }List previousAMRunningContainers = response.getContainersFromPreviousAttempts(); LOG.info(appAttemptID + " received " + previousAMRunningContainers.size() + " previous attempts' running containers on AM registration."); numAllocatedContainers.addAndGet(previousAMRunningContainers.size()); int numTotalContainersToRequest = numTotalContainers - previousAMRunningContainers.size(); // Setup ask for containers from RM // Send request for containers to RM // Until we get our fully allocated quota, we keep on polling RM for // containers // Keep looping until all the containers are launched and shell script // executed on them ( regardless of success/failure). //申请启动container for (int i = 0; i < numTotalContainersToRequest; ++i) { ContainerRequest containerAsk = setupContainerAskForRM(); amRMClient.addContainerRequest(containerAsk); } numRequestedContainers.set(numTotalContainers);

AMRMClientAsync.CallbackHandler onContainersAllocated回调函数启动LaunchContainerRunnable线程执行
//containerListener是 containerListener = createNMCallbackHandler(); //containerListener是NMCallbackHandler类型的 LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(allocatedContainer, containerListener); Thread launchThread = new Thread(runnableLaunchContainer);

LaunchContainerRunnable 【distributedshell|distributedshell yarn编程指南】LaunchContainerRunnablerun方法里面构造了需要在container中运行shellContainerLaunchContext,并且绑定containerListener回调函数,然后使用nmClientAsync 异步启动container
//封装用于启动shell脚本的ctx ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( localResources, shellEnv, commands, null, allTokens.duplicate(), null); //注册NMCallbackHandler回调函数 containerListener.addContainer(container.getId(), container); //使用nmClientAysnc异步启动container nmClientAsync.startContainerAsync(container, ctx);

整体交互流程图 distributedshell|distributedshell yarn编程指南
文章图片
yarn 交互

    推荐阅读