最近开发一个新的需求,虽然磕磕碰碰需求写完了,但是问题也随之而来:可能查询的数据会有上千万之多,所以可能会用到分批去查询处理,所以也就聚集了一些办法:

首先,分批处理必然会去遍历一个拥有某些前置条件的 List , 一般都会去遍历这个拥有全数据的 List , 所以分批处理的逻辑也从该 List 开始

# 1. List 分组实现

添加依赖

1
2
3
4
5
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>21.0</version>
</dependency>

在工具包的 Lists 报下有一个 partition 方法

1
2
3
4
5
6
7
8
9
10
public static <T> List<List<T>> partition(List<T> list, int size) {
checkNotNull(list);
checkArgument(size > 0);
return (list instanceof RandomAccess)
? new RandomAccessPartition<T>(list, size)
: new Partition<T>(list, size);
}

/*******************************************************************/
List<List<String>> partition = Lists.partition(list, 1000);

partition 中就是分组后的数据

# 2. 多线程实现分批

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
CompletableFuture[] completableFuture = partition.stream()
.map(m -> CompletableFuture.runAsync(() -> {
Map<String, DriverSettlement> driverSettlementMap = m.stream()
.map(driversMap::get).collect(Collectors.toMap(DriverSettlement::getDriverId, t -> t));
//业务实现
}, ExecutorUtil.getExcelExecutor())).toArray(CompletableFuture[]::new);
//所有线程执行完毕,否则处于阻塞状态
CompletableFuture.allOf(completableFuture).join();

/******************************************************************/

public class ExecutorUtil {
private ExecutorUtil(){}

private static ThreadPoolExecutor excelExecutor =
new ThreadPoolExecutor(5, 5,
60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10000),
new NamedThreadFactory("pay-verify-thread-%d"),
new ThreadPoolExecutor.CallerRunsPolicy());

public static ThreadPoolExecutor getExcelExecutor(){
return excelExecutor;
}
}
  • 所有子线程都需要完成后再执行主线程
1
CompletableFuture.allOf().join()
  • 其中任何一个子线程完成后就执行主线程
1
ComPletableFuture.anyOf()

线程池线程数量参考:

Edited on Views times