QQ个性网:专注于分享免费的QQ个性内容

关于我们| 网站公告| 广告服务| 联系我们| 网站地图

搜索
编程 JavaScript Java C++ Python SQL C Io ML COBOL Racket APL OCaml ABC Sed Bash Visual Basic Modula-2 Logo Delphi IDL Groovy Julia REXX Chapel X10 Forth Eiffel C# Go Rust PHP Swift Kotlin R Dart Perl Ruby TypeScript MATLAB Shell Lua Scala Objective-C F# Haskell Elixir Lisp Prolog Ada Fortran Erlang Scheme Smalltalk ABAP D ActionScript Tcl AWK IDL J PostScript IDL PL/SQL PowerShell

Java中大型集合的优化

日期:2025/04/05 12:38来源:未知 人气:53

导读:什么是集合在计算机科学中,集合是一组可变数量的数据项(也可能是0个)的组合,这些数据项可能共享某些特征,需要以某种操作方式一起进行操作。一般来讲,这些数据项的类型是相同的,或基类相同(若使用的语言支持继承)。列表(或数组)通常不被认为是集合,因为其大小固定,但事实上它常常在实现中作为某些形式的集合使用。集合的种类包括列表,集,多重集,树和图。枚举类型可以是列表或集。在Java或......

什么是集合

在计算机科学中,集合是一组可变数量的数据项(也可能是0个)的组合,这些数据项可能共享某些特征,需要以某种操作方式一起进行操作。一般来讲,这些数据项的类型是相同的,或基类相同(若使用的语言支持继承)。列表(或数组)通常不被认为是集合,因为其大小固定,但事实上它常常在实现中作为某些形式的集合使用。集合的种类包括列表,集,多重集,树和图。枚举类型可以是列表或集。

在Java或者其它编程语言中,都有一个体现集合的数据结构,它是一组表示为单个单元并且可以在其上执行一组操作的单个对象。从处理大量数据的计算程序的角度来看,涉及集合的典型操作是对其每个对象的转换。其实归根到底,无非是对数据的提取、转换和存储。

在Java1.2起,一直依赖于作为java.util.Collection其集合层次结构根的接口。在 Java 7 发布之前,要显着提高处理大型集合的性能的唯一方法是并行化此操作。然而,随着Java 8的出现,发布了新的集合包:java.util.stream。它可以支持对元素流进行功能样式操作的 Stream API。Stream API 集成到 Collections API 中,它可以对集合进行批量操作,例如顺序或并行 map-reduce 。

从那时起,考虑到应用于集合的转换操作的并行化,Java 提供了一种本地方式来尝试 获得相关的性能改进。 这种策略被认为是一种“尝试”,因为简单地使用并行流操作并不能保证更好的性能。在本文中,将对大型 Java 集合应用一个非常简单的转换操作。因此,将对三种不同的并行处理策略进行基准测试。每一个的性能将与使用串行和并行本机流所获得的结果进行比较。

变换操作

对于转换操作,定义了一个功能接口。它只需要一个 R 类型的元素,应用一个变换操作,然后返回一个 S 类型的变换对象。

@FunctionalInterfacepublic interface ElementConverter<R, S> { S apply(R param);}

操作本身包括将作为参数提供的字符串大写。创建了两个ElementConverter接口实现,一个将单个字符串转换为单个大写字符串:

public class UpperCaseConverter implements ElementConverter<String, String> { @Override public String apply(String param) { return param.toUpperCase(); }}

另一个对集合执行相同的操作:

public class CollectionUpperCaseConverter implements ElementConverter<List, List> { @Override public List apply(List param) { return param.stream().map(String::toUpperCase).collect(Collectors.toList()); }}

AsynchronousExecutor除了一些其他辅助策略之外,还为每个并行处理策略使用专用方法实现了一个类。

public class AsynchronousExecutor<T, E> { private static final Integer MINUTES_WAITING_THREADS = 1; private Integer numThreads; private ExecutorService executor; private List outputList; public AsynchronousExecutor(int threads) { this.numThreads = threads; this.executor = Executors.newFixedThreadPool(this.numThreads); this.outputList = new ArrayList<>(); } // Methods for each parallel processing strategy public void shutdown() { this.executor.shutdown(); try { this.executor.awaitTermination(MINUTES_WAITING_THREADS, TimeUnit.MINUTES); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } }

子列表分区

第一个在集合上提升转换操作的并行策略是基于java.util.AbstractList. 简而言之,CollectionPartitioner将源集合拆分为子列表,其大小是根据将在处理中使用的线程数计算的。首先,通过获取源集合大小与线程数之间的商来计算块大小。然后根据索引对从源集合中复制每个子列表,这些索引(fromIndex, toIndex)的值被同步计算 为:

fromIndex =thread.id +chunk.size;toIndex =MIN(fromIndex +chunk.size, source collection.size);public final class CollectionPartitioner extends AbstractList<List> { private final List list; private final int chunkSize; public CollectionPartitioner(List list, int numThreads) { this.list = list; this.chunkSize = (list.size() % numThreads == 0) ? (list.size() / numThreads) : (list.size() / numThreads) + 1; } @Override public synchronized List get(int index) { var fromIndex = index * chunkSize; var toIndex = Math.min(fromIndex + chunkSize, list.size()); if (fromIndex > toIndex) { return Collections.emptyList(); // Index out of allowed interval } return this.list.subList(fromIndex, toIndex); } @Override public int size() { return (int) Math.ceil((double) list.size() / (double) chunkSize); }}

一旦每个线程将变换操作应用于其各自子列表的所有对象,它必须将修改后的对象同步添加 到输出列表中。这些步骤由AsynchronousExecutor类的特定方法指导。

public class AsynchronousExecutor<T, E> { public void processSublistPartition(List inputList, ElementConverter<List, List> converter) { var partitioner = new CollectionPartitioner(inputList, numThreads); IntStream.range(0, numThreads).forEach(t -> this.executor.execute(() -> { var thOutput = converter.apply(partitioner.get(t)); if (Objects.nonNull(thOutput) && !thOutput.isEmpty()) { synchronized (this.outputList) { this.outputList.addAll(thOutput); } } })); }}

浅分区

第二种并行处理策略采用了浅拷贝概念背后的思想。事实上,进程中涉及的线程并没有收到从源集合复制的子列表。相反,每个线程使用子列表分区策略的相同代数计算其各自的索引对(fromIndex,toIndex),并直接对源集合进行操作。但是,假定源集合不能被修改作为问题的要求。在这种情况下,线程读取与源集合切片相关的对象,并将新转换的对象存储在与原始集合大小相同的新集合中。

请注意,该策略在变换操作期间没有任何同步执行点,即所有线程完全独立地执行各自的任务。但是可以使用至少两种不同的方法来组装输出集合。

基于列表的浅分区

在这种方法中,在处理集合之前会创建一个由默认元素组成的新列表。这个新列表的不相交切片 - 由索引对(fromIndex,toIndex)分隔 - 由线程访问。它们存储从源集合中读取相应切片生成的每个新对象。该类的一个新方法AsynchronousExecutor专门用于这种方法。

public class AsynchronousExecutor<T, E> { public void processShallowPartitionList(List inputList, ElementConverter<T, E> converter) { var chunkSize = (inputList.size() % this.numThreads == 0) ? (inputList.size() / this.numThreads) : (inputList.size() / this.numThreads) + 1; this.outputList = new ArrayList<>(Collections.nCopies(inputList.size(), null)); IntStream.range(0, numThreads).forEach(t -> this.executor.execute(() -> { var fromIndex = t * chunkSize; var toIndex = Math.min(fromIndex + chunkSize, inputList.size()); if (fromIndex > toIndex) { fromIndex = toIndex; } IntStream.range(fromIndex, toIndex) .forEach(i -> this.outputList.set(i, converter.apply(inputList.get(i)))); })); }}

基于数组的浅分区

这种方法与前一种方法的不同之处在于线程使用数组而不是列表来存储转换后的新对象。毕竟,线程完成了它们的操作,数组被转换为输出列表。同样,一个新的方法被添加到AsynchronousExecutor这个策略的类中。

public class AsynchronousExecutor<T, E> { public void processShallowPartitionArray(List inputList, ElementConverter<T, E> converter) var chunkSize = (inputList.size() % this.numThreads == 0) ? (inputList.size() / this.numThreads) : (inputList.size() / this.numThreads) + 1; Object[] outputArr = new Object[inputList.size()]; IntStream.range(0,numThreads). forEach(t ->this.executor.execute(()-> { var fromIndex = t * chunkSize; var toIndex = Math.min(fromIndex + chunkSize, inputList.size()); if (fromIndex > toIndex) { fromIndex = toIndex; } IntStream.range(fromIndex, toIndex) .forEach(i -> outputArr[i] = converter.apply(inputList.get(i))); })); this. shutdown(); this.outputList =(List)Arrays.asList(outputArr); }}

对标策略

每个策略的 CPU 时间是通过取 5 次执行的平均值来计算的,每次执行都会生成 1,000,000 和 10,000,000 个随机 String 对象的集合。该代码在运行 Ubuntu 20.04 LTS 64 位操作系统的机器上执行,该操作系统具有 12GB 的 RAM 和 3.40GHz 的 CPU Intel Xeon E3-1240 V3,每个插槽有 4 个内核(每个插槽 2 个线程)。结果如下表所示:

第一个预期结果是本机串行流 实现了最高的 CPU 时间。实际上,它被添加到建立初始性能参数的测试中。简单地将策略更改为原生并行流 ,1M 对象的集合和 10M 对象的 44% 的集合提高了约 34.4%。因此,从现在开始,本机并行流策略的性能将作为其他三种策略的参考。

考虑到 100 万个对象的集合,基于列表的浅分区 策略没有观察到 CPU 时间的相关减少——只有大约 7% 的细微改进——而子列表分区策略 的性能更差。亮点是基于阵列的浅分区 ,它减少了大约 35.4% 的 CPU 时间。另一方面,对于 10 倍大的集合,所有三种策略都超过了并行流时间。子列表分区 实现了最佳性能改进- 它将执行时间减少了约 20.5%。但是,基于阵列的浅分区 也观察到了非常相似的性能——它将 CPU 时间提高了近 20%。

由于基于数组的浅分区策略在两种集合大小下都表现出相关的性能,因此对其加速比进行了分析。加速比通过 T(1)/T(p) 的比率计算,其中 T是用p 个线程运行程序的 CPU 时间; T(1) 对应于顺序执行程序所经过的时间。下面是为该策略绘制Speed Up X Number of Threads 图表的结果。

加快处理 1,000,000 个对象的集合

由于所有测试都是在 4 核和每核 2 个线程的机器上进行的,因此预计该策略的加速率会随着使用多达 8 个线程而增加。尽管图表反映了这种行为,但该算法达到的最大加速比为 4.4X 。1000 万个对象的集合达到了非常相似的比率——这就是没有绘制新图表的原因。这意味着该策略不会根据使用的线程数线性提高其性能。理想情况下,通过使用 8 个线程,加速比应该对应于 CPU 时间的 8 倍改进。

结论

原生并行流的使用提供了一个可靠的初始阈值来加速大型集合的处理。尽管如此,值得尝试替代并行化策略以实现更好的性能率。本文介绍了三种不同的算法,可用于克服并行流的性能。

daxing

关于我们|网站公告|广告服务|联系我们| 网站地图

Copyright © 2002-2023 某某QQ个性网 版权所有 | 备案号:粤ICP备xxxxxxxx号

声明: 本站非腾讯QQ官方网站 所有软件和文章来自互联网 如有异议 请与本站联系 本站为非赢利性网站 不接受任何赞助和广告