原文:Restartable Batch YARN Application,译者:UniKrau,校对:dyc87112
导言
本指南不仅让你知道一个Spring Batch job的执行过程以及在Hadoop YARN上的partitioned steps,而且模拟了在分区步骤过程出错的过程。凸显重启job的完整性。
你将学会什么
使用Spring Hadoop and Spring Boot构建一个简单Hadoop YARN程序。它含有一个job,当这个job在YARN partitioned steps执行完毕的时候产生两master steps。同样也模拟了每个step出错的情况,所以能知道job的重启点是在step运行失败的地方开始执行
你要准备什么
- 大约15分钟
- 文本编辑器或者是IDE
- JDK 1.6以上
- Gradle 2.3+或者Maven 3.0+
- 你也可以直接将代码导入到本地开发工具中:
- Spring Tool Suite (STS)
- IntelliJ IDEA
- 使用本地单实例模式,需要Hadoop 2.2.0以上的版本
测试样例程序没有必要在Hadoop实例上运行.
怎样完成这个指南
像大多数Spring 入门文章一样,你可以逐渐的完成每一步,也可以跳过一些你熟悉的步骤。不管怎样,最后你都将得到一份可执行的代码。
你可以往下查看配置工程
如果已经熟悉跳过一些基本步骤你可以这样做
- 下载 并解压本指南的代码库, 或者使用Git克隆: git clone https://github.com/spring-guides/gs-yarn-batch-restart.git
- 使用以下命令跳转到目录
$ cd gs-yarn-batch-restart/initial
完成上述步骤,你可以根据代码检查结果gs-yarn-batch-restart/complete
<h2 id=”set_up”>配置工程 </h2>
首先配置脚本,可以使用Gradle工具构建Spring app。如果不熟悉请参考Building Java Projects with Gradle.
创建工程目录结构
在你选择的项目目录中,创建以下子目录结构
├── gs-yarn-batch-restart-appmaster
│ └── src
│ └── main
│ ├── resources
│ └── java
│ └── hello
│ └── appmaster
├── gs-yarn-batch-restart-container
│ └── src
│ └── main
│ ├── resources
│ └── java
│ └── hello
│ └── container
├── gs-yarn-batch-restart-client
│ └── src
│ └── main
│ ├── resources
│ └── java
│ └── hello
│ └── client
└── gs-yarn-batch-restart-dist
└── src
└── test
└── java
└── hello
mkdir -p gs-yarn-batch-restart-appmaster/src/main/resources
mkdir -p gs-yarn-batch-restart-appmaster/src/main/java/hello/appmaster
mkdir -p gs-yarn-batch-restart-container/src/main/resources
mkdir -p gs-yarn-batch-restart-container/src/main/java/hello/container
mkdir -p gs-yarn-batch-restart-client/src/main/resources
mkdir -p gs-yarn-batch-restart-client/src/main/java/hello/client
mkdir -p gs-yarn-batch-restart-dist/src/test/java/hello
创建一个gradle编译文件
完整文件参考the initial Gradle build file. 可以使用Spring Tool Suite (STS)工具直接导入源码.
build.gradle
buildscript {
repositories {
maven { url "http://repo.spring.io/libs-release" }
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:1.3.3.RELEASE")
}
}
allprojects {
apply plugin: 'base'
}
subprojects { subproject ->
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
version = '0.1.0'
repositories {
mavenCentral()
maven { url "http://repo.spring.io/libs-release" }
}
dependencies {
compile("org.springframework.data:spring-yarn-boot:2.4.0.RELEASE")
}
task copyJars(type: Copy) {
from "$buildDir/libs"
into "$rootDir/gs-yarn-batch-restart-dist/target/gs-yarn-batch-restart-dist/"
include "**/*.jar"
}
configurations {
compile.exclude group: "org.slf4j", module: "slf4j-log4j12"
runtime.exclude group: "org.slf4j", module: "slf4j-log4j12"
}
assemble.doLast {copyJars.execute()}
}
project('gs-yarn-batch-restart-client') {
apply plugin: 'spring-boot'
}
project('gs-yarn-batch-restart-appmaster') {
apply plugin: 'spring-boot'
dependencies {
compile("org.springframework.data:spring-yarn-batch:2.4.0.RELEASE")
runtime("org.springframework.boot:spring-boot-starter-batch:1.3.3.RELEASE")
runtime("org.hsqldb:hsqldb:2.3.1")
runtime("commons-dbcp:commons-dbcp:1.2.2")
}
}
project('gs-yarn-batch-restart-container') {
apply plugin: 'spring-boot'
dependencies {
compile("org.springframework.data:spring-yarn-batch:2.4.0.RELEASE")
runtime("org.springframework.boot:spring-boot-starter-batch:1.3.3.RELEASE")
}
}
project('gs-yarn-batch-restart-dist') {
dependencies {
compile project(":gs-yarn-batch-restart-client")
compile project(":gs-yarn-batch-restart-appmaster")
compile project(":gs-yarn-batch-restart-container")
testCompile("org.hsqldb:hsqldb:2.3.1")
testCompile("org.springframework.data:spring-yarn-boot-test:2.4.0.RELEASE")
testCompile("org.hamcrest:hamcrest-core:1.2.1")
testCompile("org.hamcrest:hamcrest-library:1.2.1")
}
test.dependsOn(':gs-yarn-batch-restart-client:assemble')
test.dependsOn(':gs-yarn-batch-restart-appmaster:assemble')
test.dependsOn(':gs-yarn-batch-restart-container:assemble')
clean.doLast {ant.delete(dir: "target")}
}
task wrapper(type: Wrapper) {
gradleVersion = '1.11'
}
以上的gradle文件只是简单的创建类三个jar包— client, appmaster and container, 每一个类都有相应的功能角色. Spring Boot’s gradle 插件会从新把这些jars打包成可执行的jar。
还需要 settings.gradle
文件定义其子工程.
settings.gradle
include 'gs-yarn-batch-restart-client','gs-yarn-batch-restart-appmaster','gs-yarn-batch-restart-container','gs-yarn-batch-restart-dist'
<h2 id=”rbs”>创建一个Remote Batch Step </h2>
第一、创建一个HdfsTasklet
类.
gs-yarn-batch-restart-container/src/main/java/hello/container/HdfsTasklet.java
package hello.container;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.hadoop.fs.FsShell;
public class HdfsTasklet implements Tasklet {
private static final Log log = LogFactory.getLog(HdfsTasklet.class);
@Autowired
private Configuration configuration;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
String fileName = chunkContext.getStepContext().getStepName().replaceAll("\\W", "");
FsShell shell = new FsShell(configuration);
boolean exists = shell.test("/tmp/" + fileName);
shell.close();
if (exists) {
log.info("File /tmp/" + fileName + " exist");
log.info("Hello from HdfsTasklet ok");
return RepeatStatus.FINISHED;
} else {
log.info("Hello from HdfsTasklet fail");
throw new RuntimeException("File /tmp/" + fileName + " does't exist");
}
}
}
- 要使用
FsShell
首先🉐️要使用注解@AutoWired
Hadoop’sConfiguration
。 - 文件不存在HDFS的时候
RuntimeException
类抛异常。Tasklet
类实现了这个功能。如果文件不存在则会从Tasklet
返回FINISHED
值。 - 要了解文件名如何传递的,需要访问类似
remoteStep1:partition0
这样的stepname
. 然后过滤掉无关的字符。
第二、 创建 ContainerApplication
类.
gs-yarn-batch-restart-container/src/main/java/hello/container/ContainerApplication.java
package hello.container;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.yarn.batch.config.EnableYarnRemoteBatchProcessing;
@Configuration
@EnableAutoConfiguration(exclude = { BatchAutoConfiguration.class })
@EnableYarnRemoteBatchProcessing
public class ContainerApplication {
@Autowired
private StepBuilderFactory steps;
@Bean
public Step remoteStep1() throws Exception {
return steps
.get("remoteStep1")
.tasklet(tasklet())
.build();
}
@Bean
public Step remoteStep2() throws Exception {
return steps
.get("remoteStep2")
.tasklet(tasklet())
.build();
}
@Bean
public Tasklet tasklet() {
return new HdfsTasklet();
}
public static void main(String[] args) {
SpringApplication.run(ContainerApplication.class, args);
}
}
- 简单创建了两个
step
分别命名为remoteStep1
和remoteStep2
。还附加了HdfsTasklet
到这个两个step
上
第三、 为container app 创建 application.yml
文件.
gs-yarn-batch-restart-container/src/main/resources/application.yml
spring:
batch:
job:
enabled: false
hadoop:
fsUri: hdfs://localhost:8020
resourceManagerHost: localhost
yarn:
batch:
enabled: true
- 在Spring Boot core可以通过禁用job的功能来使用YARN的相关特性
- 为HDFS添加配置文件,当然要访问集群可以自定义配置文件
- 在YARN上用过启用
spring.yarn.batch.enable
属性来启用批处理功能.
Create a Batch Job
创建一个AppmasterApplication
类.
gs-yarn-batch-restart-appmaster/src/main/java/hello/appmaster/AppmasterApplication.java
package hello.appmaster;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersIncrementer;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.partition.support.SimplePartitioner;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.yarn.batch.config.EnableYarnBatchProcessing;
import org.springframework.yarn.batch.partition.StaticPartitionHandler;
@Configuration
@EnableAutoConfiguration
@EnableYarnBatchProcessing
public class AppmasterApplication {
@Autowired
private JobBuilderFactory jobFactory;
@Autowired
private StepBuilderFactory stepFactory;
@Bean
public Job job() throws Exception {
return jobFactory.get("job")
.incrementer(jobParametersIncrementer())
.start(master1())
.next(master2())
.build();
}
@Bean
public JobParametersIncrementer jobParametersIncrementer() {
return new RunIdIncrementer();
}
@Bean
protected Step master1() throws Exception {
return stepFactory.get("master1")
.partitioner("remoteStep1", partitioner())
.partitionHandler(partitionHandler1())
.build();
}
@Bean
protected Step master2() throws Exception {
return stepFactory.get("master2")
.partitioner("remoteStep2", partitioner())
.partitionHandler(partitionHandler2())
.build();
}
@Bean
protected Partitioner partitioner() {
return new SimplePartitioner();
}
@Bean
protected PartitionHandler partitionHandler1() {
StaticPartitionHandler handler = new StaticPartitionHandler();
handler.setStepName("remoteStep1");
handler.setGridSize(2);
return handler;
}
@Bean
protected PartitionHandler partitionHandler2() {
StaticPartitionHandler handler = new StaticPartitionHandler();
handler.setStepName("remoteStep2");
handler.setGridSize(2);
return handler;
}
public static void main(String[] args) {
SpringApplication.run(AppmasterApplication.class, args);
}
}
- 简单起见,分别创建
master1
和master2
两个master steps。然后配置这两个steps,以便在YARN上进行partitioned,并将grid的大小设置为2
为appmaster创建一个application.yml
文件.
gs-yarn-batch-restart-appmaster/src/main/resources/application.yml
spring:
batch:
job:
enabled: false
datasource:
url: jdbc:hsqldb:hsql://localhost:9001/testdb
driverClassName: org.hsqldb.jdbcDriver
hadoop:
fsUri: hdfs://localhost:8020
resourceManagerHost: localhost
yarn:
appName: gs-yarn-batch-restart
applicationDir: /app/gs-yarn-batch-restart/
batch:
enabled: true
name: job
jobs:
- name: job
enabled: true
next: true
failNext: false
restart: true
failRestart: false
appmaster:
keepContextAlive: false
launchcontext:
archiveFile: gs-yarn-batch-restart-container-0.1.0.jar
- 同样地,在Spring Boot core可以通过禁用job的功能来使用YARN的相关特性
- 为HDFS添加配置文件,当然要访问集群可以自定义配置文件
- 在YARN上用过启用
spring.yarn.batch.enable
属性来启用批处理功能 - 给job取个名称以便能够自动运行
- 启用job的命名功能,并且允许job下一次运行的时候,即使没有自定自增的参数也不会运行失败
- 启用job自动重启功能,使得如果job重启失败后job不会出现失败
创建一个Yarn Client
现在创建一个ClientApplication
类.
gs-yarn-batch-restart-client/src/main/java/hello/client/ClientApplication.java
package hello.client;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.yarn.client.YarnClient;
@EnableAutoConfiguration
public class ClientApplication {
public static void main(String[] args) {
SpringApplication.run(ClientApplication.class, args)
.getBean(YarnClient.class)
.submitApplication();
}
}
- @EnableAutoConfiguration 它能使Spring Boot添加各类属性配置
- auto-configuration 能使得Spring YARN 组件像一般的Spring Boot app 一样运行 这个
main()
方法使用了Spring Boot的SpringApplication.run()
方法启动application. From there, we simply request a bean of typeYarnClient
and execute itssubmitApplication()
method. What happens next depends on application configuration, which we go through later in this guide. Did you notice that there wasn’t a single line of XML?
为client创建一个application.yml
文件
gs-yarn-batch-restart-client/src/main/resources/application.yml
spring:
hadoop:
fsUri: hdfs://localhost:8020
resourceManagerHost: localhost
yarn:
appName: gs-yarn-batch-restart
applicationDir: /app/gs-yarn-batch-restart/
client:
files:
- "file:target/gs-yarn-batch-restart-dist/gs-yarn-batch-restart-container-0.1.0.jar"
- "file:target/gs-yarn-batch-restart-dist/gs-yarn-batch-restart-appmaster-0.1.0.jar"
launchcontext:
archiveFile: gs-yarn-batch-restart-appmaster-0.1.0.jar
- 在这个文件里配置了应用程序提交时需要的元数据信息
编译程序
简单的执行gradle clean和build命令
./gradlew clean build
跳过测试命令:
./gradlew clean build -x test
执行maven的clean和package命令
mvn clean package
跳过单元测试使用以下命令:
mvn clean package -DskipTests=true
gradle编译成功后会有以下三个jar包
gs-yarn-batch-restart-dist/target/gs-yarn-batch-restart-dist/gs-yarn-batch-restart-client-0.1.0.jar
gs-yarn-batch-restart-dist/target/gs-yarn-batch-restart-dist/gs-yarn-batch-restart-appmaster-0.1.0.jar
gs-yarn-batch-restart-dist/target/gs-yarn-batch-restart-dist/gs-yarn-batch-restart-container-0.1.0.jar
运行程序流程
现在准备将已经编译和打包程序在Hadoop YARN上执行。另外需要将Spring Batch job的状态保存在数据库里。HSQL
实例可以很方便的使用内存模型运行程序。在终端运行以下命令:
$ cd db/
$ unzip hsqldb-2.3.1.zip
$ cd hsqldb-2.3.1/hsqldb/data/
$ java -cp ../lib/hsqldb.jar org.hsqldb.server.Server --database.0 mem:testdb --dbname.0 testdb --silent false --trace true
备注: 可以下载HSQL源码压缩文件http://sourceforge.net/projects/hsqldb/files/hsqldb/hsqldb_2_3/,如果想从源码编译HSQL jar包。
配置程序以便运行,首先要在HDFS上创建两个空文件/tmp/remoteStep1partition0
和/tmp/remoteStep1partition1
$ hdfs dfs -touchz /tmp/remoteStep1partition0
$ hdfs dfs -touchz /tmp/remoteStep1partition1
运行程序
$ cd gs-yarn-batch-restart-dist
$ java -jar target/gs-yarn-batch-restart-dist/gs-yarn-batch-restart-client-0.1.0.jar
可以在YARN资源管理器看到程序是处于失败状态,因为第二阶段的partitioned steps导致的,在HDFS上创建/tmp/remoteStep2partition0
和/tmp/remoteStep2partition1
文件:
$ hdfs dfs -touchz /tmp/remoteStep2partition0
$ hdfs dfs -touchz /tmp/remoteStep2partition1
重新运行
$ java -jar target/dist/gs-yarn-batch-restart-client-0.1.0.jar
现在程序已经运行成功,并且只执行之前失败的partitioned steps
测试程序
这个类可以作为执行这个程序的单元测试类,而且不需要在Hadoop集群上运行
gs-yarn-batch-restart-dist/src/test/java/hello/AppIT.java
package hello;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import hello.client.ClientApplication;
import java.io.File;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.hsqldb.Server;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.core.io.Resource;
import org.springframework.data.hadoop.fs.FsShell;
import org.springframework.yarn.boot.test.junit.AbstractBootYarnClusterTests;
import org.springframework.yarn.test.context.MiniYarnClusterTest;
import org.springframework.yarn.test.junit.ApplicationInfo;
import org.springframework.yarn.test.support.ContainerLogUtils;
@MiniYarnClusterTest
public class AppIT extends AbstractBootYarnClusterTests {
@Test
public void testApp() throws Exception {
FsShell shell = new FsShell(getConfiguration());
shell.touchz("/tmp/remoteStep1partition0");
shell.touchz("/tmp/remoteStep1partition1");
ApplicationInfo info1 = submitApplicationAndWait(ClientApplication.class, new String[0], 4, TimeUnit.MINUTES);
assertThat(info1.getYarnApplicationState(), is(YarnApplicationState.FINISHED));
assertLogs(ContainerLogUtils.queryContainerLogs(getYarnCluster(), info1.getApplicationId()), 10, 2, 2);
shell.touchz("/tmp/remoteStep2partition0");
shell.touchz("/tmp/remoteStep2partition1");
shell.close();
ApplicationInfo info2 = submitApplicationAndWait(ClientApplication.class, new String[0], 4, TimeUnit.MINUTES);
assertThat(info2.getYarnApplicationState(), is(YarnApplicationState.FINISHED));
assertLogs(ContainerLogUtils.queryContainerLogs(getYarnCluster(), info2.getApplicationId()), 6, 2, 0);
}
private void assertLogs(List<Resource> resources, int numResources, int numOk, int numFail) throws Exception {
int ok = 0;
int fail = 0;
assertThat(resources, notNullValue());
assertThat(resources.size(), is(numResources));
for (Resource res : resources) {
File file = res.getFile();
String content = ContainerLogUtils.getFileContent(file);
if (file.getName().endsWith("stdout")) {
assertThat(file.length(), greaterThan(0l));
if (file.getName().equals("Container.stdout")) {
assertThat(content, containsString("Hello from HdfsTasklet"));
if (content.contains("Hello from HdfsTasklet ok")) {
ok++;
}
if (content.contains("Hello from HdfsTasklet fail")) {
fail++;
}
}
} else if (file.getName().endsWith("stderr")) {
assertThat("stderr file is not empty: " + content, file.length(), is(0l));
}
}
assertThat("Failed to find ok's from logs", ok, is(numOk));
assertThat("Failed to find fail's from logs", fail, is(numFail));
}
private Server server = null;
@Before
public void startDb() {
if (server == null) {
server = new Server();
server.setSilent(false);
server.setTrace(true);
server.setDatabaseName(0, "testdb");
server.setDatabasePath(0, "mem:testdb");
server.start();
}
}
@After
public void stopDb() {
if (server != null) {
server.stop();
server = null;
}
}
}
总结
恭喜! 你已经开发一个 Spring YARN 程序
其他类似内容
以下有用的相关链接:
- Simple YARN Application
- Simple Single Project YARN Application
- Batch YARN Application
- Building Spring YARN Projects with Maven
- Building Spring YARN Projects with Gradle
本文由spring4all.com翻译小分队创作,采用知识共享-署名-非商业性使用-相同方式共享 4.0 国际 许可 协议进行许可。
可重启的YARN批处理程序
原文:Restartable Batch YARN Application
译者:UniKrau
校对:dyc87112
导言
本指南不仅让你知道一个Spring Batch job的执行过程以及在Hadoop YARN上的partitioned steps,而且模拟了在分区步骤过程出错的过程。凸显重启job的完整性。
你将学会什么
使用Spring Hadoop and Spring Boot构建一个简单Hadoop YARN程序。它含有一个job,当这个job在YARN partitioned steps执行完毕的时候产生两master steps。同样也模拟了每个step出错的情况,所以能知道job的重启点是在step运行失败的地方开始执行
你要准备什么
- 大约15分钟
- 文本编辑器或者是IDE
- JDK 1.6以上
- Gradle 2.3+或者Maven 3.0+
- 你也可以直接将代码导入到本地开发工具中:
- Spring Tool Suite (STS)
- IntelliJ IDEA
- 使用本地单实例模式,需要Hadoop 2.2.0以上的版本
测试样例程序没有必要在Hadoop实例上运行.
怎样完成这个指南
像大多数Spring 入门文章一样,你可以逐渐的完成每一步,也可以跳过一些你熟悉的步骤。不管怎样,最后你都将得到一份可执行的代码。
你可以往下查看配置工程
如果已经熟悉跳过一些基本步骤你可以这样做
- 下载 并解压本指南的代码库, 或者使用Git克隆: git clone https://github.com/spring-guides/gs-yarn-batch-restart.git
- 使用以下命令跳转到目录
$ cd gs-yarn-batch-restart/initial
完成上述步骤,你可以根据代码检查结果gs-yarn-batch-restart/complete
<h2 id=”set_up”>配置工程 </h2>
首先配置脚本,可以使用Gradle工具构建Spring app。如果不熟悉请参考Building Java Projects with Gradle.
创建工程目录结构
在你选择的项目目录中,创建以下子目录结构
├── gs-yarn-batch-restart-appmaster
│ └── src
│ └── main
│ ├── resources
│ └── java
│ └── hello
│ └── appmaster
├── gs-yarn-batch-restart-container
│ └── src
│ └── main
│ ├── resources
│ └── java
│ └── hello
│ └── container
├── gs-yarn-batch-restart-client
│ └── src
│ └── main
│ ├── resources
│ └── java
│ └── hello
│ └── client
└── gs-yarn-batch-restart-dist
└── src
└── test
└── java
└── hello
mkdir -p gs-yarn-batch-restart-appmaster/src/main/resources
mkdir -p gs-yarn-batch-restart-appmaster/src/main/java/hello/appmaster
mkdir -p gs-yarn-batch-restart-container/src/main/resources
mkdir -p gs-yarn-batch-restart-container/src/main/java/hello/container
mkdir -p gs-yarn-batch-restart-client/src/main/resources
mkdir -p gs-yarn-batch-restart-client/src/main/java/hello/client
mkdir -p gs-yarn-batch-restart-dist/src/test/java/hello
创建一个gradle编译文件
完整文件参考the initial Gradle build file. 可以使用Spring Tool Suite (STS)工具直接导入源码.
build.gradle
buildscript {
repositories {
maven { url "http://repo.spring.io/libs-release" }
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:1.3.3.RELEASE")
}
}
allprojects {
apply plugin: 'base'
}
subprojects { subproject ->
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
version = '0.1.0'
repositories {
mavenCentral()
maven { url "http://repo.spring.io/libs-release" }
}
dependencies {
compile("org.springframework.data:spring-yarn-boot:2.4.0.RELEASE")
}
task copyJars(type: Copy) {
from "$buildDir/libs"
into "$rootDir/gs-yarn-batch-restart-dist/target/gs-yarn-batch-restart-dist/"
include "**/*.jar"
}
configurations {
compile.exclude group: "org.slf4j", module: "slf4j-log4j12"
runtime.exclude group: "org.slf4j", module: "slf4j-log4j12"
}
assemble.doLast {copyJars.execute()}
}
project('gs-yarn-batch-restart-client') {
apply plugin: 'spring-boot'
}
project('gs-yarn-batch-restart-appmaster') {
apply plugin: 'spring-boot'
dependencies {
compile("org.springframework.data:spring-yarn-batch:2.4.0.RELEASE")
runtime("org.springframework.boot:spring-boot-starter-batch:1.3.3.RELEASE")
runtime("org.hsqldb:hsqldb:2.3.1")
runtime("commons-dbcp:commons-dbcp:1.2.2")
}
}
project('gs-yarn-batch-restart-container') {
apply plugin: 'spring-boot'
dependencies {
compile("org.springframework.data:spring-yarn-batch:2.4.0.RELEASE")
runtime("org.springframework.boot:spring-boot-starter-batch:1.3.3.RELEASE")
}
}
project('gs-yarn-batch-restart-dist') {
dependencies {
compile project(":gs-yarn-batch-restart-client")
compile project(":gs-yarn-batch-restart-appmaster")
compile project(":gs-yarn-batch-restart-container")
testCompile("org.hsqldb:hsqldb:2.3.1")
testCompile("org.springframework.data:spring-yarn-boot-test:2.4.0.RELEASE")
testCompile("org.hamcrest:hamcrest-core:1.2.1")
testCompile("org.hamcrest:hamcrest-library:1.2.1")
}
test.dependsOn(':gs-yarn-batch-restart-client:assemble')
test.dependsOn(':gs-yarn-batch-restart-appmaster:assemble')
test.dependsOn(':gs-yarn-batch-restart-container:assemble')
clean.doLast {ant.delete(dir: "target")}
}
task wrapper(type: Wrapper) {
gradleVersion = '1.11'
}
以上的gradle文件只是简单的创建类三个jar包— client, appmaster and container, 每一个类都有相应的功能角色. Spring Boot’s gradle 插件会从新把这些jars打包成可执行的jar。
还需要 settings.gradle
文件定义其子工程.
settings.gradle
include 'gs-yarn-batch-restart-client','gs-yarn-batch-restart-appmaster','gs-yarn-batch-restart-container','gs-yarn-batch-restart-dist'
<h2 id=”rbs”>创建一个Remote Batch Step </h2>
第一、创建一个HdfsTasklet
类.
gs-yarn-batch-restart-container/src/main/java/hello/container/HdfsTasklet.java
package hello.container;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.hadoop.fs.FsShell;
public class HdfsTasklet implements Tasklet {
private static final Log log = LogFactory.getLog(HdfsTasklet.class);
@Autowired
private Configuration configuration;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
String fileName = chunkContext.getStepContext().getStepName().replaceAll("\\W", "");
FsShell shell = new FsShell(configuration);
boolean exists = shell.test("/tmp/" + fileName);
shell.close();
if (exists) {
log.info("File /tmp/" + fileName + " exist");
log.info("Hello from HdfsTasklet ok");
return RepeatStatus.FINISHED;
} else {
log.info("Hello from HdfsTasklet fail");
throw new RuntimeException("File /tmp/" + fileName + " does't exist");
}
}
}
- 要使用
FsShell
首先🉐️要使用注解@AutoWired
Hadoop’sConfiguration
。 - 文件不存在HDFS的时候
RuntimeException
类抛异常。Tasklet
类实现了这个功能。如果文件不存在则会从Tasklet
返回FINISHED
值。 - 要了解文件名如何传递的,需要访问类似
remoteStep1:partition0
这样的stepname
. 然后过滤掉无关的字符。
第二、 创建 ContainerApplication
类.
gs-yarn-batch-restart-container/src/main/java/hello/container/ContainerApplication.java
package hello.container;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.yarn.batch.config.EnableYarnRemoteBatchProcessing;
@Configuration
@EnableAutoConfiguration(exclude = { BatchAutoConfiguration.class })
@EnableYarnRemoteBatchProcessing
public class ContainerApplication {
@Autowired
private StepBuilderFactory steps;
@Bean
public Step remoteStep1() throws Exception {
return steps
.get("remoteStep1")
.tasklet(tasklet())
.build();
}
@Bean
public Step remoteStep2() throws Exception {
return steps
.get("remoteStep2")
.tasklet(tasklet())
.build();
}
@Bean
public Tasklet tasklet() {
return new HdfsTasklet();
}
public static void main(String[] args) {
SpringApplication.run(ContainerApplication.class, args);
}
}
- 简单创建了两个
step
分别命名为remoteStep1
和remoteStep2
。还附加了HdfsTasklet
到这个两个step
上
第三、 为container app 创建 application.yml
文件.
gs-yarn-batch-restart-container/src/main/resources/application.yml
spring:
batch:
job:
enabled: false
hadoop:
fsUri: hdfs://localhost:8020
resourceManagerHost: localhost
yarn:
batch:
enabled: true
- 在Spring Boot core可以通过禁用job的功能来使用YARN的相关特性
- 为HDFS添加配置文件,当然要访问集群可以自定义配置文件
- 在YARN上用过启用
spring.yarn.batch.enable
属性来启用批处理功能.
Create a Batch Job
创建一个AppmasterApplication
类.
gs-yarn-batch-restart-appmaster/src/main/java/hello/appmaster/AppmasterApplication.java
package hello.appmaster;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersIncrementer;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.partition.support.SimplePartitioner;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.yarn.batch.config.EnableYarnBatchProcessing;
import org.springframework.yarn.batch.partition.StaticPartitionHandler;
@Configuration
@EnableAutoConfiguration
@EnableYarnBatchProcessing
public class AppmasterApplication {
@Autowired
private JobBuilderFactory jobFactory;
@Autowired
private StepBuilderFactory stepFactory;
@Bean
public Job job() throws Exception {
return jobFactory.get("job")
.incrementer(jobParametersIncrementer())
.start(master1())
.next(master2())
.build();
}
@Bean
public JobParametersIncrementer jobParametersIncrementer() {
return new RunIdIncrementer();
}
@Bean
protected Step master1() throws Exception {
return stepFactory.get("master1")
.partitioner("remoteStep1", partitioner())
.partitionHandler(partitionHandler1())
.build();
}
@Bean
protected Step master2() throws Exception {
return stepFactory.get("master2")
.partitioner("remoteStep2", partitioner())
.partitionHandler(partitionHandler2())
.build();
}
@Bean
protected Partitioner partitioner() {
return new SimplePartitioner();
}
@Bean
protected PartitionHandler partitionHandler1() {
StaticPartitionHandler handler = new StaticPartitionHandler();
handler.setStepName("remoteStep1");
handler.setGridSize(2);
return handler;
}
@Bean
protected PartitionHandler partitionHandler2() {
StaticPartitionHandler handler = new StaticPartitionHandler();
handler.setStepName("remoteStep2");
handler.setGridSize(2);
return handler;
}
public static void main(String[] args) {
SpringApplication.run(AppmasterApplication.class, args);
}
}
- 简单起见,分别创建
master1
和master2
两个master steps。然后配置这两个steps,以便在YARN上进行partitioned,并将grid的大小设置为2
为appmaster创建一个application.yml
文件.
gs-yarn-batch-restart-appmaster/src/main/resources/application.yml
spring:
batch:
job:
enabled: false
datasource:
url: jdbc:hsqldb:hsql://localhost:9001/testdb
driverClassName: org.hsqldb.jdbcDriver
hadoop:
fsUri: hdfs://localhost:8020
resourceManagerHost: localhost
yarn:
appName: gs-yarn-batch-restart
applicationDir: /app/gs-yarn-batch-restart/
batch:
enabled: true
name: job
jobs:
- name: job
enabled: true
next: true
failNext: false
restart: true
failRestart: false
appmaster:
keepContextAlive: false
launchcontext:
archiveFile: gs-yarn-batch-restart-container-0.1.0.jar
- 同样地,在Spring Boot core可以通过禁用job的功能来使用YARN的相关特性
- 为HDFS添加配置文件,当然要访问集群可以自定义配置文件
- 在YARN上用过启用
spring.yarn.batch.enable
属性来启用批处理功能 - 给job取个名称以便能够自动运行
- 启用job的命名功能,并且允许job下一次运行的时候,即使没有自定自增的参数也不会运行失败
- 启用job自动重启功能,使得如果job重启失败后job不会出现失败
创建一个Yarn Client
现在创建一个ClientApplication
类.
gs-yarn-batch-restart-client/src/main/java/hello/client/ClientApplication.java
package hello.client;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.yarn.client.YarnClient;
@EnableAutoConfiguration
public class ClientApplication {
public static void main(String[] args) {
SpringApplication.run(ClientApplication.class, args)
.getBean(YarnClient.class)
.submitApplication();
}
}
- @EnableAutoConfiguration 它能使Spring Boot添加各类属性配置
- auto-configuration 能使得Spring YARN 组件像一般的Spring Boot app 一样运行 这个
main()
方法使用了Spring Boot的SpringApplication.run()
方法启动application. From there, we simply request a bean of typeYarnClient
and execute itssubmitApplication()
method. What happens next depends on application configuration, which we go through later in this guide. Did you notice that there wasn’t a single line of XML?
为client创建一个application.yml
文件
gs-yarn-batch-restart-client/src/main/resources/application.yml
spring:
hadoop:
fsUri: hdfs://localhost:8020
resourceManagerHost: localhost
yarn:
appName: gs-yarn-batch-restart
applicationDir: /app/gs-yarn-batch-restart/
client:
files:
- "file:target/gs-yarn-batch-restart-dist/gs-yarn-batch-restart-container-0.1.0.jar"
- "file:target/gs-yarn-batch-restart-dist/gs-yarn-batch-restart-appmaster-0.1.0.jar"
launchcontext:
archiveFile: gs-yarn-batch-restart-appmaster-0.1.0.jar
- 在这个文件里配置了应用程序提交时需要的元数据信息
编译程序
简单的执行gradle clean和build命令
./gradlew clean build
跳过测试命令:
./gradlew clean build -x test
执行maven的clean和package命令
mvn clean package
跳过单元测试使用以下命令:
mvn clean package -DskipTests=true
gradle编译成功后会有以下三个jar包
gs-yarn-batch-restart-dist/target/gs-yarn-batch-restart-dist/gs-yarn-batch-restart-client-0.1.0.jar
gs-yarn-batch-restart-dist/target/gs-yarn-batch-restart-dist/gs-yarn-batch-restart-appmaster-0.1.0.jar
gs-yarn-batch-restart-dist/target/gs-yarn-batch-restart-dist/gs-yarn-batch-restart-container-0.1.0.jar
运行程序流程
现在准备将已经编译和打包程序在Hadoop YARN上执行。另外需要将Spring Batch job的状态保存在数据库里。HSQL
实例可以很方便的使用内存模型运行程序。在终端运行以下命令:
$ cd db/
$ unzip hsqldb-2.3.1.zip
$ cd hsqldb-2.3.1/hsqldb/data/
$ java -cp ../lib/hsqldb.jar org.hsqldb.server.Server --database.0 mem:testdb --dbname.0 testdb --silent false --trace true
备注: 可以下载HSQL源码压缩文件http://sourceforge.net/projects/hsqldb/files/hsqldb/hsqldb_2_3/,如果想从源码编译HSQL jar包。
配置程序以便运行,首先要在HDFS上创建两个空文件/tmp/remoteStep1partition0
和/tmp/remoteStep1partition1
$ hdfs dfs -touchz /tmp/remoteStep1partition0
$ hdfs dfs -touchz /tmp/remoteStep1partition1
运行程序
$ cd gs-yarn-batch-restart-dist
$ java -jar target/gs-yarn-batch-restart-dist/gs-yarn-batch-restart-client-0.1.0.jar
可以在YARN资源管理器看到程序是处于失败状态,因为第二阶段的partitioned steps导致的,在HDFS上创建/tmp/remoteStep2partition0
和/tmp/remoteStep2partition1
文件:
$ hdfs dfs -touchz /tmp/remoteStep2partition0
$ hdfs dfs -touchz /tmp/remoteStep2partition1
重新运行
$ java -jar target/dist/gs-yarn-batch-restart-client-0.1.0.jar
现在程序已经运行成功,并且只执行之前失败的partitioned steps
测试程序
这个类可以作为执行这个程序的单元测试类,而且不需要在Hadoop集群上运行
gs-yarn-batch-restart-dist/src/test/java/hello/AppIT.java
package hello;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import hello.client.ClientApplication;
import java.io.File;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.hsqldb.Server;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.core.io.Resource;
import org.springframework.data.hadoop.fs.FsShell;
import org.springframework.yarn.boot.test.junit.AbstractBootYarnClusterTests;
import org.springframework.yarn.test.context.MiniYarnClusterTest;
import org.springframework.yarn.test.junit.ApplicationInfo;
import org.springframework.yarn.test.support.ContainerLogUtils;
@MiniYarnClusterTest
public class AppIT extends AbstractBootYarnClusterTests {
@Test
public void testApp() throws Exception {
FsShell shell = new FsShell(getConfiguration());
shell.touchz("/tmp/remoteStep1partition0");
shell.touchz("/tmp/remoteStep1partition1");
ApplicationInfo info1 = submitApplicationAndWait(ClientApplication.class, new String[0], 4, TimeUnit.MINUTES);
assertThat(info1.getYarnApplicationState(), is(YarnApplicationState.FINISHED));
assertLogs(ContainerLogUtils.queryContainerLogs(getYarnCluster(), info1.getApplicationId()), 10, 2, 2);
shell.touchz("/tmp/remoteStep2partition0");
shell.touchz("/tmp/remoteStep2partition1");
shell.close();
ApplicationInfo info2 = submitApplicationAndWait(ClientApplication.class, new String[0], 4, TimeUnit.MINUTES);
assertThat(info2.getYarnApplicationState(), is(YarnApplicationState.FINISHED));
assertLogs(ContainerLogUtils.queryContainerLogs(getYarnCluster(), info2.getApplicationId()), 6, 2, 0);
}
private void assertLogs(List<Resource> resources, int numResources, int numOk, int numFail) throws Exception {
int ok = 0;
int fail = 0;
assertThat(resources, notNullValue());
assertThat(resources.size(), is(numResources));
for (Resource res : resources) {
File file = res.getFile();
String content = ContainerLogUtils.getFileContent(file);
if (file.getName().endsWith("stdout")) {
assertThat(file.length(), greaterThan(0l));
if (file.getName().equals("Container.stdout")) {
assertThat(content, containsString("Hello from HdfsTasklet"));
if (content.contains("Hello from HdfsTasklet ok")) {
ok++;
}
if (content.contains("Hello from HdfsTasklet fail")) {
fail++;
}
}
} else if (file.getName().endsWith("stderr")) {
assertThat("stderr file is not empty: " + content, file.length(), is(0l));
}
}
assertThat("Failed to find ok's from logs", ok, is(numOk));
assertThat("Failed to find fail's from logs", fail, is(numFail));
}
private Server server = null;
@Before
public void startDb() {
if (server == null) {
server = new Server();
server.setSilent(false);
server.setTrace(true);
server.setDatabaseName(0, "testdb");
server.setDatabasePath(0, "mem:testdb");
server.start();
}
}
@After
public void stopDb() {
if (server != null) {
server.stop();
server = null;
}
}
}
总结
恭喜! 你已经开发一个 Spring YARN 程序
其他类似内容
以下有用的相关链接:
- Simple YARN Application
- Simple Single Project YARN Application
- Batch YARN Application
- Building Spring YARN Projects with Maven
- Building Spring YARN Projects with Gradle
本文由spring4all.com翻译小分队创作,采用知识共享-署名-非商业性使用-相同方式共享 4.0 国际 许可 协议进行许可。
暂无评论内容