1 前言

欢迎访问南瓜慢说 www.pkslow.com获取更多精彩文章!

Spring相关文章:Springboot-Cloud

Spring Batch远程分区对于大量数据的处理非常擅长,它的实现有多种方式,如本地Jar包模式MQ模式Kubernetes模式。这三种模式的如下:

(1)本地Jar包模式:分区处理的worker为一个Java进程,从jar包启动,通过jvm参数和数据库传递参数;官方提供示例代码。

(2)MQ模式worker是一个常驻进程,ManagerWorker通过消息队列来传递参数;网上有不少相关示例代码。

(3)Kubernetes模式workerK8s中的PodManager直接启动Pod来处理;网上并没有找到任何示例代码。

本文将通过代码来讲解第一种模式(本地Jar包模式),其它后续再介绍。

建议先看下面文章了解一下:

Spring Batch入门:通过例子讲解Spring Batch入门,优秀的批处理框架

Spring Batch并行处理介绍:大量数据也不在话下,Spring Batch并行处理四种模式初探

2 代码讲解

本文代码中,ManagerWorker是放在一起的,在同一个项目里,也只会打一个jar包而已;我们通过profile来区别是manager还是worker,也就是通过Spring Profile实现一份代码,两份逻辑。实际上也可以拆成两份代码,但放一起更方便测试,而且代码量不大,就没有必要了。

2.1 项目准备

2.1.1 数据库

首先我们需要准备一个数据库,因为ManagerWorker都需要同步状态到DB上,不能直接使用嵌入式的内存数据库了,需要一个外部可共同访问的数据库。这里我使用的是H2 Database,安装可参考:把H2数据库从jar包部署到Kubernetes,并解决Ingress不支持TCP的问题

2.1.2 引入依赖

maven引入依赖如下所示:

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-task</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency> <dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-deployer-local</artifactId>
<version>2.4.1</version>
</dependency> <dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-integration</artifactId>
</dependency>

spring-cloud-deployer-local用于部署和启动worker,非常关键;其它就是Spring BatchTask相关的依赖;以及数据库连接。

2.1.3 主类入口

Springboot的主类入口如下:

@EnableTask
@SpringBootApplication
@EnableBatchProcessing
public class PkslowRemotePartitionJar {
public static void main(String[] args) {
SpringApplication.run(PkslowRemotePartitionJar.class, args);
}
}

Springboot的基础上,添加了Spring BatchSpring Cloud Task的支持。

2.2 关键代码编写

前面的数据库搭建和其它代码没有太多可讲的,接下来就开始关键代码的编写。

2.2.1 分区管理Partitioner

Partitioner是远程分区中的核心bean,它定义了分成多少个区、怎么分区,要把什么变量传递给worker。它会返回一组<分区名,执行上下文>的键值对,即返回Map<String, ExecutionContext>。把要传递给worker的变量放在ExecutionContext中去,支持多种类型的变量,如Stringintlong等。实际上,我们不建议通过ExecutionContext来传递太多数据;可以传递一些标识或主键,然后worker自己去拿数据即可。

具体代码如下:

private static final int GRID_SIZE = 4;
@Bean
public Partitioner partitioner() {
return new Partitioner() {
@Override
public Map<String, ExecutionContext> partition(int gridSize) { Map<String, ExecutionContext> partitions = new HashMap<>(gridSize); for (int i = 0; i < GRID_SIZE; i++) {
ExecutionContext executionContext = new ExecutionContext();
executionContext.put("partitionNumber", i);
partitions.put("partition" + i, executionContext);
} return partitions;
}
};
}

上面分成4个区,程序会启动4个worker来处理;给worker传递的参数是partitionNumber

2.2.2 分区处理器PartitionHandler

PartitionHandler也是核心的bean,它决定了怎么去启动worker,给它们传递什么jvm参数(跟之前的ExecutionContext传递不一样)。

@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer, TaskRepository taskRepository) throws Exception { Resource resource = this.resourceLoader.getResource(workerResource); DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep", taskRepository); List<String> commandLineArgs = new ArrayList<>(3);
commandLineArgs.add("--spring.profiles.active=worker");
commandLineArgs.add("--spring.cloud.task.initialize-enabled=false");
commandLineArgs.add("--spring.batch.initializer.enabled=false"); partitionHandler
.setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
partitionHandler
.setEnvironmentVariablesProvider(new SimpleEnvironmentVariablesProvider(this.environment));
partitionHandler.setMaxWorkers(2);
partitionHandler.setApplicationName("PkslowWorkerJob"); return partitionHandler;
}

上面代码中:

resourceworkerjar包地址,表示将启动该程序;

workerStepworker将要执行的step

commandLineArgs定义了启动workerjvm参数,如--spring.profiles.active=worker

environmentmanager的系统环境变量,可以传递给worker,当然也可以选择不传递;

MaxWorkers是最多能同时启动多少个worker,类似于线程池大小;设置为2,表示最多同时有2个worker来处理4个分区。

2.2.3 Manager和Worker的Batch定义

完成了分区相关的代码,剩下的就只是如何定义ManagerWorker的业务代码了。

Manager作为管理者,不用太多业务逻辑,代码如下:

@Bean
@Profile("!worker")
public Job partitionedJob(PartitionHandler partitionHandler) throws Exception {
Random random = new Random();
return this.jobBuilderFactory.get("partitionedJob" + random.nextInt())
.start(step1(partitionHandler))
.build();
} @Bean
public Step step1(PartitionHandler partitionHandler) throws Exception {
return this.stepBuilderFactory.get("step1")
.partitioner(workerStep().getName(), partitioner())
.step(workerStep())
.partitionHandler(partitionHandler)
.build();
}

Worker主要作用是处理数据,是我们的业务代码,这里就演示一下如何获取Manager传递过来的partitionNumber

@Bean
public Step workerStep() {
return this.stepBuilderFactory.get("workerStep")
.tasklet(workerTasklet(null, null))
.build();
} @Bean
@StepScope
public Tasklet workerTasklet(final @Value("#{stepExecutionContext['partitionNumber']}") Integer partitionNumber) {
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
Thread.sleep(6000); //增加延时,查看效果,通过jps:在jar情况下会新起java进程
System.out.println("This tasklet ran partition: " + partitionNumber); return RepeatStatus.FINISHED;
}
};
}

通过表达式@Value("#{stepExecutionContext['partitionNumber']}") 获取Manager传递过来的变量;注意要加注解@StepScope

3 程序运行

因为我们分为ManagerWorker,但都是同一份代码,所以我们先打包一个jar出来,不然manager无法启动。配置数据库和Workerjar包地址如下:

spring.datasource.url=jdbc:h2:tcp://localhost:9092/test
spring.datasource.username=pkslow
spring.datasource.password=pkslow
spring.datasource.driver-class-name=org.h2.Driver pkslow.worker.resource=file://pkslow/target/remote-partitioning-jar-1.0-SNAPSHOT.jar

执行程序如下:

可以看到启动了4次Java程序,还给出日志路径。

通过jps命令查看,能看到一个Manager进程,还有两个worker进程:

4 复杂变量传递

前面讲了Manager可以通过ExecutionContext传递变量,如简单的Stringlong等。但其实它也是可以传递复杂的Java对象的,但对应的类需要可序列化,如:

import java.io.Serializable;

public class Person implements Serializable {
private Integer age;
private String name;
private String webSite;
//getter and setter
}

Manager传递:

executionContext.put("person", new Person(0, "pkslow", "www.pkslow.com"));

Worker接收:

@Value("#{stepExecutionContext['person']}") Person person

5 总结

本文介绍了Spring Batch远程分区的本地Jar包模式,只能在一台机器上运行,所以也是无法真正发挥出远程分区的作用。但它对我们后续理解更复杂的模式是有很大帮助的;同时,我们也可以使用本地模式进行开发测试,毕竟它只需要一个数据库就行了,依赖很少。


欢迎关注微信公众号<南瓜慢说>,将持续为你更新...

多读书,多分享;多写作,多整理。

Spring Batch远程分区的本地Jar包模式的更多相关文章

  1. Spring Batch 远程分区和远程分块的区别

    Partitioning is a master/slave step configuration that allows for partitions of data to be processed ...

  2. 如何将本地jar包放入本地maven仓库和远程私服仓库

    1.将本地jar包放入本地仓库.只需执行如下命令即可: mvn install:install-file -Dfile=D:/demo/fiber.jar -DgroupId=com.sure -Da ...

  3. (转)如何在maven的pom.xml中添加本地jar包

    1 maven本地仓库认识 maven本地仓库中的jar目录一般分为三层:图中的1 2 3分别如下所示: 1 groupId 2 artifactId 3 version 4 jar包的依赖 如果要将 ...

  4. (转)如何在maven的pom.xml中添加本地jar包

    转载自: https://www.cnblogs.com/lixuwu/p/5855031.html 1 maven本地仓库认识 maven本地仓库中的jar目录一般分为三层:图中的1 2 3分别如下 ...

  5. maven 把本地jar包打进本地仓库

    maven 把本地jar包打进本地仓库 1.本地有自己写的项目jar包,但是需要用maven依赖对其进行引用: 2.某个jar包在远程仓库没有,导致pom.xml报错,此时可以从网上单独下载此jar包 ...

  6. maven引用本地jar包

    教程:http://www.cnblogs.com/dcba1112/archive/2011/05/01/2033805.html 安装 到下载maven: http://maven.apache. ...

  7. 本地jar包上传docker容器

    先安装docker的注册服务器: [root@VM_0_7_centos ~]# docker run -d -p : --restart=always --name registry2 regist ...

  8. Maven中安装本地Jar包到仓库中或将本地jar包上传

    摘要 maven install 本地jar 命令格式 mvn install:install-file -DgroupId=<group_name> -DartifactId=<a ...

  9. maven管理本地jar包注意事项

    今天lucene中集成第三方中文分词器IKAnalyzer的时候遇到了相似的问题:lucene版本4.9.IKAnalyzer版本2012FF_hf1 直接去maven仓库下载,pom配置如下: &l ...

  10. maven添加本地jar包

    今天遇到一个mavan仓库中没有的jar包, 故只能添加本地jar包, 花了不少时间找资料,终于OK.故在此记录. 1. 第一次,在网上看到说可以用<systemPath> 解决, 如下: ...

随机推荐

  1. hashMap的数据结构

    HashMap底层实现还是数组,只是数组的每一项都是一条链.

  2. Mybatis的分页插件PageHelper

    Mybatis的分页插件PageHelper 项目地址:http://git.oschina.net/free/Mybatis_PageHelper  文档地址:http://git.oschina. ...

  3. SC命令详解

    我们知道在MStools SDK,也就是在Resource Kit有一个很少有人知道的命令行软件,SC.exe,这个软件向所有的Windows NT和Windows 2000要求控制他们的API函数. ...

  4. 关于正则表达式的转义 PHP

    如正则的函数 preg_replace($patern, $replacement, $content) 等等 其中如果 $content 中要替换 \ 成 /,必须在 $patern中写成 \\\\ ...

  5. C# group 子句

    group 子句返回一个 IGrouping<TKey,TElement> 对象序列,这些对象包含零个或更多与该组的键值匹配的项. 例如,可以按照每个字符串中的第一个字母对字符串序列进行分 ...

  6. iOS动态性 运行时runtime初探(强制获取并修改私有变量,强制增加及修改私有方法等)

    借助前辈的力量综合一下资料. OC是运行时语言,只有在程序运行时,才会去确定对象的类型,并调用类与对象相应的方法.利用runtime机制让我们可以在程序运行时动态修改类.对象中的所有属性.方法,就算是 ...

  7. C语言所有作业练习题

    2015.08.11 1.计算十进制 42 转换为二进制.八进制.十六进制分别对应的值 2.计算二进制 11010110 对应的十进制值 3.计算八进制 075 对应的十进制值 4.计算十六进制 0x ...

  8. cocos2dx-lua http请求下载图片,使用XMLHttpRequest类

    HttpFileDownLoadSimple.lua local downloader = {} --数据拆分,以没1024*5字节拆成一段,打包写入文件 (拆完再拼接,转成字符串) local fu ...

  9. transient和synchronized的使用

    transient和synchronized这两个关键字没什么联系,这两天用到了它们,所以总结一下,两个关键字做个伴! transient 持久化时不被存储,当你的对象实现了Serializable接 ...

  10. vue环境的搭建与第一个demo

    参考两个博客 1 2 git.npm和淘宝镜像的安装过程过程省略了,直接开始webpack + vue-cli + 创建demo 首先,在磁盘创建一个文件夹,命名为vue-projects,里面再建一 ...