Original: Batch YARN Application | Translator: UniKrau | Proofreader: dyc87112

This guide walks you through the process of executing a Spring Batch job on Hadoop YARN.

What You Will Build

You will build a simple Hadoop YARN application with Spring Hadoop, Spring Batch, and Spring Boot.

What You Will Need

You do not need an existing or running Hadoop instance to try out this sample application.

How to Complete This Guide

Like most Spring Getting Started guides, you can start from scratch and complete each step, or you can skip the basic setup steps that are already familiar to you. Either way, you end up with working code.

To start from scratch, continue to Set up the project below.

To skip the basics, you can do the following:

$ cd gs-yarn-batch-processing/initial 

When you are finished, you can check your results against the code in gs-yarn-batch-processing/complete.

<h2 id="set_up">Set up the project</h2>

First, set up a basic build script. You can use any build system you like when building apps with Spring, but the build script needed here uses Gradle. If you are not familiar with Gradle, refer to Building Java Projects with Gradle.

Create the directory structure

In a project directory of your choosing, create the following subdirectory structure:

├── gs-yarn-batch-processing-appmaster
│   └── src
│       └── main
│           ├── resources
│           └── java
│               └── hello
│                   └── appmaster
├── gs-yarn-batch-processing-container
│   └── src
│       └── main
│           ├── resources
│           └── java
│               └── hello
│                   └── container
├── gs-yarn-batch-processing-client
│   └── src
│       └── main
│           ├── resources
│           └── java
│               └── hello
│                   └── client
└── gs-yarn-batch-processing-dist
    └── src
        └── test
            └── java
                └── hello

For example, on Unix or Linux systems you can create the directories with the following mkdir -p commands:

mkdir -p gs-yarn-batch-processing-appmaster/src/main/resources
mkdir -p gs-yarn-batch-processing-appmaster/src/main/java/hello/appmaster
mkdir -p gs-yarn-batch-processing-container/src/main/resources
mkdir -p gs-yarn-batch-processing-container/src/main/java/hello/container
mkdir -p gs-yarn-batch-processing-client/src/main/resources
mkdir -p gs-yarn-batch-processing-client/src/main/java/hello/client
mkdir -p gs-yarn-batch-processing-dist/src/test/java/hello

Create a Gradle build file

Below is the initial Gradle build file. Alternatively, you can import the source code directly into Spring Tool Suite (STS).

build.gradle

buildscript {
    repositories {
        maven { url "http://repo.spring.io/libs-release" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:1.3.3.RELEASE")
    }
}

allprojects {
    apply plugin: 'base'
}

subprojects { subproject ->
    apply plugin: 'java'
    apply plugin: 'eclipse'
    apply plugin: 'idea'
    version =  '0.1.0'
    repositories {
        mavenCentral()
        maven { url "http://repo.spring.io/libs-release" }
    }
    dependencies {
        compile("org.springframework.data:spring-yarn-boot:2.4.0.RELEASE")
    }
    task copyJars(type: Copy) {
        from "$buildDir/libs"
        into "$rootDir/gs-yarn-batch-processing-dist/target/gs-yarn-batch-processing-dist/"
        include "**/*.jar"
    }
    configurations {
        compile.exclude group: "org.slf4j", module: "slf4j-log4j12"
        runtime.exclude group: "org.slf4j", module: "slf4j-log4j12"
    }
    assemble.doLast {copyJars.execute()}
}

project('gs-yarn-batch-processing-client') {
    apply plugin: 'spring-boot'
}

project('gs-yarn-batch-processing-appmaster') {
    apply plugin: 'spring-boot'
    dependencies {
        compile("org.springframework.data:spring-yarn-batch:2.4.0.RELEASE")
        runtime("org.springframework.boot:spring-boot-starter-batch:1.3.3.RELEASE")
    }
}

project('gs-yarn-batch-processing-container') {
    apply plugin: 'spring-boot'
    dependencies {
        compile("org.springframework.data:spring-yarn-batch:2.4.0.RELEASE")
        runtime("org.springframework.boot:spring-boot-starter-batch:1.3.3.RELEASE")
    }
}

project('gs-yarn-batch-processing-dist') {
    dependencies {
        compile project(":gs-yarn-batch-processing-client")
        compile project(":gs-yarn-batch-processing-appmaster")
        compile project(":gs-yarn-batch-processing-container")
        testCompile("org.springframework.data:spring-yarn-boot-test:2.4.0.RELEASE")
        testCompile("org.hamcrest:hamcrest-core:1.2.1")
        testCompile("org.hamcrest:hamcrest-library:1.2.1")
    }
    test.dependsOn(':gs-yarn-batch-processing-client:assemble')
    test.dependsOn(':gs-yarn-batch-processing-appmaster:assemble')
    test.dependsOn(':gs-yarn-batch-processing-container:assemble')
    clean.doLast {ant.delete(dir: "target")}
}

task wrapper(type: Wrapper) {
    gradleVersion = '1.11'
}

With the build file above we simply create three different jars, each containing the classes for its specific role. The Spring Boot Gradle plugin then repackages these jars into executable jars.

The settings.gradle file defines the sub-projects.

settings.gradle

include 'gs-yarn-batch-processing-client','gs-yarn-batch-processing-appmaster','gs-yarn-batch-processing-container','gs-yarn-batch-processing-dist'

Introduction to Spring Batch

Many batch processing problems can be solved with single-threaded, single-process jobs, so before reaching for a more complex solution it is worth checking whether that simple model meets your needs. When it does not, Spring Batch offers a range of options for parallel processing. At a high level there are two modes of parallel processing: single process, multi-threaded; and multi-process.

Spring Hadoop adds support for running Spring Batch jobs on a Hadoop YARN cluster. For better parallel processing, Spring Batch partitions the job, and the partitioned steps are executed on YARN as remote steps.

The key point of running a Spring Batch job on YARN is that the Application Master decides whether the job is a simple, non-partitioned job or one that needs partitioning. If the job has no partitioning, the whole job runs inside the Application Master and no extra containers are launched. It may seem surprising that a job can run on YARN without launching any containers, but remember that the Application Master itself is a resource allocated from the Hadoop cluster and is itself a container running on YARN.

There are still a few constraints to keep in mind when running a Spring Batch job on a Hadoop cluster:

  • Job Context - the Application Master is the main entity where the job is run.
  • Job Repository - the Application Master needs access to a repository located either in memory or in a relational database; these are the two types that Spring Batch supports (a minimal in-memory sketch follows this list).
  • Remote Steps - because remote steps build on Spring Batch's partitioning capabilities, they need access to the job repository.
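
This guide relies on Spring Boot's Batch auto-configuration (pulled in by spring-boot-starter-batch) to set up the job repository, so the following is only a hedged illustration of what an in-memory repository looks like; the bean wiring shown here is an assumption for illustration and is not part of this guide's code:

import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.context.annotation.Bean;

// Illustration only: an in-memory JobRepository, suitable when job metadata
// does not need to survive a restart.
@Bean
public JobRepository inMemoryJobRepository() throws Exception {
    MapJobRepositoryFactoryBean factory =
            new MapJobRepositoryFactoryBean(new ResourcelessTransactionManager());
    factory.afterPropertiesSet();
    return (JobRepository) factory.getObject();  // map-backed, non-persistent repository
}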

Let's take a quick look at how Spring Batch partitioning works. Running a partitioned job involves three things: the remote steps, the partition handler, and the partitioner. Simplifying a little, from the user's point of view a remote step is just like any other step. Spring Batch itself does not contain implementations for any proprietary grid or remoting fabrics, but it does provide a PartitionHandler implementation that executes steps locally in separate threads of execution, using Spring's TaskExecutor strategy. Spring Hadoop provides a remote step implementation that executes the steps on a Hadoop cluster.
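
For comparison, the thread-based handler that ships with Spring Batch could be wired up roughly as follows. This is only a sketch of the local alternative, not part of this guide's YARN setup, and the bean shown here is an assumption for illustration:

import org.springframework.batch.core.Step;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

// Illustration only: run each partition of "remoteStep" in its own local thread.
@Bean
public PartitionHandler localPartitionHandler(Step remoteStep) throws Exception {
    TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
    handler.setStep(remoteStep);                             // step executed for every partition
    handler.setTaskExecutor(new SimpleAsyncTaskExecutor());  // separate thread per partition
    handler.setGridSize(2);                                  // number of partitions to create
    handler.afterPropertiesSet();
    return handler;
}

Later in this guide the Application Master uses Spring Hadoop's StaticPartitionHandler instead, so the partitions are executed remotely on the YARN cluster rather than in local threads.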

To learn more about Spring Batch partitioning, see the Spring Batch reference documentation.

<h2 id="remote_batch">Create a Remote Batch Step</h2>

First, create a PrintTasklet class:

gs-yarn-batch-processing-container/src/main/java/hello/container/PrintTasklet.java
package hello.container;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

public class PrintTasklet implements Tasklet {

    private static final Log log = LogFactory.getLog(PrintTasklet.class);

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
            throws Exception {
        log.info("PrintTasklet says Hello");
        return RepeatStatus.FINISHED;
    }

}

The Tasklet interface is one of the simplest concepts in Spring Batch for executing a step within a job. This tasklet is used here only to demonstrate how a partitioned step is actually executed, so there is no need for it to deal with any complex processing.

The PrintTasklet class simply writes a log message.

Next, create a ContainerApplication class:

gs-yarn-batch-processing-container/src/main/java/hello/container/ContainerApplication.java
package hello.container;

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.yarn.batch.config.EnableYarnRemoteBatchProcessing;

@Configuration
@EnableAutoConfiguration(exclude = { BatchAutoConfiguration.class })
@EnableYarnRemoteBatchProcessing
public class ContainerApplication {

    @Autowired
    private StepBuilderFactory stepBuilder;

    @Bean
    protected Tasklet tasklet() {
        return new PrintTasklet();
    }

    @Bean
    protected Step remoteStep() throws Exception {
        return stepBuilder
            .get("remoteStep")
            .tasklet(tasklet())
            .build();
    }

    public static void main(String[] args) {
        SpringApplication.run(ContainerApplication.class, args);
    }

}

The @EnableYarnRemoteBatchProcessing annotation tells Spring to enable Batch processing on the YARN container. It automatically enables @EnableBatchProcessing, which makes all the JavaConfig builders available:

  • We @Autowired the step builder, which is used to define steps as beans.
  • The PrintTasklet is defined as a bean.
  • A step bean is defined so that it knows how to execute the tasklet.

Next, create an application.yml file for the container:

gs-yarn-batch-processing-container/src/main/resources/application.yml

The content is as follows:

spring:
    batch:
        job:
            enabled: false
    hadoop:
        fsUri: hdfs://localhost:8020
    yarn:
        batch:
            enabled: true

  • Batch processing in Spring Boot core is disabled so that the YARN-specific functionality can be used instead.
  • The HDFS filesystem URI is configured; on a real cluster you would adjust this.
  • Batch processing on YARN is enabled with the spring.yarn.batch.enabled property.

Create a Batch Job

Create an AppmasterApplication class:

gs-yarn-batch-processing-appmaster/src/main/java/hello/appmaster/AppmasterApplication.java
package hello.appmaster;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersIncrementer;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.partition.support.SimplePartitioner;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.yarn.batch.config.EnableYarnBatchProcessing;
import org.springframework.yarn.batch.partition.StaticPartitionHandler;

@Configuration
@EnableAutoConfiguration
@EnableYarnBatchProcessing
public class AppmasterApplication {

    @Autowired
    private JobBuilderFactory jobFactory;

    @Autowired
    private StepBuilderFactory stepFactory;

    @Bean
    public Job job() throws Exception {
        return jobFactory.get("job")
            .incrementer(jobParametersIncrementer())
            .start(master())
            .build();
    }

    @Bean
    public JobParametersIncrementer jobParametersIncrementer() {
        return new RunIdIncrementer();
    }

    @Bean
    protected Step master() throws Exception {
        return stepFactory
            .get("master")
            .partitioner("remoteStep", partitioner())
            .partitionHandler(partitionHandler())
            .build();
    }

    @Bean
    protected Partitioner partitioner() {
        return new SimplePartitioner();
    }

    @Bean
    protected PartitionHandler partitionHandler() {
        StaticPartitionHandler handler = new StaticPartitionHandler();
        handler.setStepName("remoteStep");
        handler.setGridSize(2);
        return handler;
    }

    public static void main(String[] args) {
        SpringApplication.run(AppmasterApplication.class, args);
    }

}

  • @EnableYarnBatchProcessing tells Spring to enable Batch processing for the Application Master.
  • It also makes the builders for jobs and steps available for autowiring.

Create an application.yml file for the appmaster:

gs-yarn-batch-processing-appmaster/src/main/resources/application.yml

spring:
    batch:
        job:
            enabled: false
    hadoop:
        fsUri: hdfs://localhost:8020
        resourceManagerHost: localhost
    yarn:
        appName: gs-yarn-batch-processing
        applicationDir: /app/gs-yarn-batch-processing/
        batch:
            enabled: true
            name: job
            jobs:
              - name: job
                enabled: true
                next: true
        appmaster:
            keepContextAlive: false
            launchcontext:
                archiveFile: gs-yarn-batch-processing-container-0.1.0.jar

A few notes about this configuration:

  • Batch processing in Spring Boot core is disabled so that the YARN-specific functionality can be used instead.
  • The HDFS filesystem URI is configured; on a real cluster you would adjust this.
  • Batch processing on YARN is enabled with the spring.yarn.batch.enabled property.
  • enabled defines whether the job is executed automatically.
  • next defines whether the next run of the job is allowed.

Create a YARN Client

Create a ClientApplication class:

gs-yarn-batch-processing-client/src/main/java/hello/client/ClientApplication.java

package hello.client;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.yarn.client.YarnClient;

@EnableAutoConfiguration
public class ClientApplication {

    public static void main(String[] args) {
        SpringApplication.run(ClientApplication.class, args)
            .getBean(YarnClient.class)
            .submitApplication();
    }

}

ClientApplication looks much like the equivalent classes in the other guides; its only purpose here is to submit a YARN application.

Create an application.yml file for the client:

gs-yarn-batch-processing-client/src/main/resources/application.yml

The content is as follows:

spring:
    hadoop:
        fsUri: hdfs://localhost:8020
        resourceManagerHost: localhost
    yarn:
        appName: gs-yarn-batch-processing
        applicationDir: /app/gs-yarn-batch-processing/
        client:
            files:
              - "file:target/gs-yarn-batch-processing-dist/gs-yarn-batch-processing-container-0.1.0.jar"
              - "file:target/gs-yarn-batch-processing-dist/gs-yarn-batch-processing-appmaster-0.1.0.jar"
            launchcontext:
                archiveFile: gs-yarn-batch-processing-appmaster-0.1.0.jar

  • All the files that the client needs to submit for the application are defined here.

Build the Application

Use Gradle to clean the working directory and build the application:

./gradlew clean build

To skip the tests during the build, run:

./gradlew clean build -x test

Or, with Maven, clean the working directory and package the application:

mvn clean package

After a successful Gradle build, the following three jars are created in the target directory:

gs-yarn-batch-processing-dist/target/gs-yarn-batch-processing-dist/gs-yarn-batch-processing-client-0.1.0.jar
gs-yarn-batch-processing-dist/target/gs-yarn-batch-processing-dist/gs-yarn-batch-processing-appmaster-0.1.0.jar
gs-yarn-batch-processing-dist/target/gs-yarn-batch-processing-dist/gs-yarn-batch-processing-container-0.1.0.jar

Run the Application

Now that you have successfully built and packaged the application, it is time to try it out on Hadoop YARN.

Run the client jar:

$ cd gs-yarn-batch-processing-dist
$ java -jar target/gs-yarn-batch-processing-dist/gs-yarn-batch-processing-client-0.1.0.jar

If the application runs without errors, you can see the two step executions in YARN, one for each partition.

Create a Test Class

The following class executes the application as a JUnit test, so you do not need a running Hadoop cluster:

gs-yarn-batch-processing-dist/src/test/java/hello/AppIT.java

package hello;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import hello.client.ClientApplication;

import java.io.File;
import java.util.List;
import java.util.Scanner;

import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.junit.Test;
import org.springframework.core.io.Resource;
import org.springframework.yarn.boot.test.junit.AbstractBootYarnClusterTests;
import org.springframework.yarn.test.context.MiniYarnClusterTest;
import org.springframework.yarn.test.junit.ApplicationInfo;
import org.springframework.yarn.test.support.ContainerLogUtils;

@MiniYarnClusterTest
public class AppIT extends AbstractBootYarnClusterTests {

    @Test
    public void testAppSubmission() throws Exception {

        ApplicationInfo info = submitApplicationAndWait(ClientApplication.class, new String[0]);
        assertThat(info.getYarnApplicationState(), is(YarnApplicationState.FINISHED));

        List<Resource> resources = ContainerLogUtils.queryContainerLogs(getYarnCluster(), info.getApplicationId());
        assertThat(resources, notNullValue());
        assertThat(resources.size(), is(6));

        for (Resource res : resources) {
            File file = res.getFile();
            if (file.getName().endsWith("stdout")) {
                // there has to be some content in stdout file
                assertThat(file.length(), greaterThan(0l));
                if (file.getName().equals("Container.stdout")) {
                    Scanner scanner = new Scanner(file);
                    String content = scanner.useDelimiter("\\A").next();
                    scanner.close();
                    // this is what container will log in stdout
                    assertThat(content, containsString("PrintTasklet says Hello"));
                }
            } else if (file.getName().endsWith("stderr")) {
                String content = "";
                if (file.length() > 0) {
                    Scanner scanner = new Scanner(file);
                    content = scanner.useDelimiter("\\A").next();
                    scanner.close();
                }
                // can't have anything in stderr files
                assertThat("stderr file is not empty: " + content, file.length(), is(0l));
            }
        }
    }
}

Summary

Congratulations! You have just developed a Spring YARN application that executes a Spring Batch job.

This article was translated by the spring4all.com translation team and is licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International license.
