如何使用JCTools实现Java并发程序
在本文中,我们将介绍JCTools(Java并发工具)库。
简单地说,这提供了许多适用于多线程环境的实用数据结构。
非阻塞算法传统上,在可变共享状态下工作的多线程代码使用锁来确保数据一致性和发布(一个线程所做的更改对另一个线程可见)。
这种方法有许多缺点:
线程在试图获取锁时可能会被阻塞,在另一个线程的操作完成之前不会取得任何进展—这有效地防止了并行性 锁争用越重,JVM处理调度线程、管理争用和等待线程队列的时间就越多,实际工作就越少 如果涉及多个锁,并且它们以错误的顺序获取/释放,则可能出现死锁 优先级反转的危险是可能的——高优先级线程被锁定,试图获得由低优先级线程持有的锁 大多数情况下,使用粗粒度锁会严重损害并行性—细粒度锁需要更仔细的设计,增加锁开销,并且更容易出错另一种方法是使用非阻塞算法,即任何线程的故障或挂起都不会导致另一个线程的故障或挂起的算法。
如果所涉及的线程中至少有一个能够在任意时间段内取得进展,即在处理过程中不会出现死锁,则非阻塞算法是无锁的。
此外,如果保证每个线程的进程,这些算法是无等待的。
下面是一个非阻塞堆栈示例,它定义了基本状态:
public class ConcurrentStack<E> { AtomicReference<Node<E>> top = new AtomicReference<Node<E>>(); private static class Node <E> { public E item; public Node<E> next; // standard constructor }}
还有一些API方法:
public void push(E item){ Node<E> newHead = new Node<E>(item); Node<E> oldHead; do { oldHead = top.get(); newHead.next = oldHead; } while(!top.compareAndSet(oldHead, newHead));}public E pop() { Node<E> oldHead; Node<E> newHead; do { oldHead = top.get(); if (oldHead == null) { return null; } newHead = oldHead.next; } while (!top.compareAndSet(oldHead, newHead)); return oldHead.item;}
我们可以看到,该算法使用细粒度比较和交换(CAS)指令,并且是无锁的(即使多个线程调用top.compareAndSet()同时,它们中的一个保证会成功)但不能无等待,因为不能保证CAS最终会对任何特定线程成功。
依赖首先,让我们将JCTools依赖项添加到pom.xml文件:
<dependency> <groupId>org.jctools</groupId> <artifactId>jctools-core</artifactId> <version>2.1.2</version></dependency>
请注意,Maven Central上提供了最新的可用版本。
JCTools队列该库提供了许多队列以在多线程环境中使用,即一个或多个线程以线程安全的无锁方式写入队列,一个或多个线程以线程安全的无锁方式从队列中读取。
所有队列实现的通用接口是org.jctools.queues.MessagePassingQueue。
队列类型所有队列都可以根据其生产者/消费者策略进行分类:
单个生产者,单个消费者?此类类使用前缀Spsc命名,例如SpscArrayQueue 单个生产者,多个消费者?使用Spmc前缀,例如SpmcArrayQueue 多个生产者,单个消费者-使用Mpsc前缀,例如MpscArrayQueue 多个生产者、多个消费者—使用Mpmc前缀,例如MpmcArrayQueue需要注意的是,在内部没有策略检查,也就是说,如果使用不正确,队列可能会无声地发生故障。
例如,下面的测试从两个线程填充单个生产者队列并通过,即使不能保证使用者看到来自不同生产者的数据:
SpscArrayQueue<Integer> queue = new SpscArrayQueue<>(2);Thread producer1 = new Thread(() -> queue.offer(1));producer1.start();producer1.join();Thread producer2 = new Thread(() -> queue.offer(2));producer2.start();producer2.join();Set<Integer> fromQueue = new HashSet<>();Thread consumer = new Thread(() -> queue.drain(fromQueue::add));consumer.start();consumer.join();assertThat(fromQueue).containsOnly(1, 2);队列实现
总结以上分类,以下是JCTools队列列表:
SpscArrayQueue?单个生产者,单个消费者,在内部使用一个数组,限制容量 SpscLinkedQueue?单个生产者,单个消费者,内部使用链表,未绑定容量 SpscChunkedArrayQueue?单生产商、单消费者,从初始容量开始,一直增长到最大容量 SpscGrowableArrayQueue?单生产者、单消费者,从初始容量开始,一直增长到最大容量。这与SpscChunkedArrayQueue是相同的契约,唯一的区别是内部块管理。建议使用SpscChunkedArrayQueue,因为它有一个简化的实现 SpscUnboundedArrayQueue?单个生产者,单个消费者,在内部使用数组,未绑定容量 SpmcArrayQueue?单个生产者、多个使用者,在内部使用一个阵列,限制容量 MpscArrayQueue—多个生产者、单个消费者在内部使用一个阵列,限制容量 MpscLinkedQueue?多个生产者,单个消费者,在内部使用链表,未绑定容量 MpmcArrayQueue—多个生产者、多个消费者在内部使用一个阵列,限制容量 原子队列前面提到的所有队列都使用sun.misc.Unsafe. 然而,随着java9和JEP-260的出现,这个API在默认情况下变得不可访问。
因此,有其他队列使用java.util.concurrent.atomic.AtomicLongFieldUpdater(公共API,性能较差)而不是sun.misc.Unsafe.
它们是从上面的队列生成的,它们的名称中间插入了单词Atomic,例如SpscChunkedAtomicArrayQueue或MpmcAtomicArrayQueue。
如果可能,建议使用“常规”队列,并且仅在sun.misc.Unsafe像Hot Java9+和JRockit一样被禁止/无效。
容量所有JCTools队列也可能具有最大容量或未绑定。当队列已满且受容量限制时,它将停止接受新元素。
在以下示例中,我们:
填满队列 确保在此之后停止接受新元素 从中排出,并确保之后可以添加更多元素请注意,为了可读性,删除了几个代码语句。
SpscChunkedArrayQueue<Integer> queue = new SpscChunkedArrayQueue<>(8, 16);CountDownLatch startConsuming = new CountDownLatch(1);CountDownLatch awakeProducer = new CountDownLatch(1);Thread producer = new Thread(() -> { IntStream.range(0, queue.capacity()).forEach(i -> { assertThat(queue.offer(i)).isTrue(); }); assertThat(queue.offer(queue.capacity())).isFalse(); startConsuming.countDown(); awakeProducer.await(); assertThat(queue.offer(queue.capacity())).isTrue();});producer.start();startConsuming.await();Set<Integer> fromQueue = new HashSet<>();queue.drain(fromQueue::add);awakeProducer.countDown();producer.join();queue.drain(fromQueue::add);assertThat(fromQueue).containsAll( IntStream.range(0, 17).boxed().collect(toSet()));其他数据结构工具
JCTools还提供了一些非队列数据结构。
它们都列在下面:
NonBlockingHashMap?一个无锁的ConcurrentHashMap替代方案,具有更好的伸缩性和通常更低的突变成本。它是实现sun.misc.Unsafe,因此,不建议在Java9+或JRockit环境中使用此类 NonBlockingHashMapLong?与NonBlockingHashMap类似,但使用基本长键 NonBlockingHashSet?一个简单的包装器,围绕着像JDK的java.util.Collections.newSetFromMap()一样的NonBlockingHashMap NonBlockingIdentityHashMap?与NonBlockingHashMap类似,但按标识比较键。 NonBlockingSetInt?一个多线程位向量集,实现为一个原始long数组。在无声自动装箱的情况下工作无效 性能测试让我们使用JMH来比较JDK的ArrayBlockingQueue和JCTools队列的性能。JMH是Sun/Oracle JVM gurus提供的一个开源微基准框架,它保护我们不受编译器/JVM优化算法的不确定性的影响。
请注意,为了提高可读性,下面的代码段遗漏了几个语句。
public class MpmcBenchmark { @Param({PARAM_UNSAFE, PARAM_AFU, PARAM_JDK}) public volatile String implementation; public volatile Queue<Long> queue; @Benchmark @Group(GROUP_NAME) @GroupThreads(PRODUCER_THREADS_NUMBER) public void write(Control control) { // noinspection StatementWithEmptyBody while (!control.stopMeasurement && !queue.offer(1L)) { // intentionally left blank } } @Benchmark @Group(GROUP_NAME) @GroupThreads(CONSUMER_THREADS_NUMBER) public void read(Control control) { // noinspection StatementWithEmptyBody while (!control.stopMeasurement && queue.poll() == null) { // intentionally left blank } }}
结果:
MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcArrayQueue sample 1052.000 ns/opMpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcAtomicArrayQueue sample 1106.000 ns/opMpmcBenchmark.MyGroup:MyGroup·p0.95 ArrayBlockingQueue sample 2364.000 ns/op
我们可以看到,MpmcArrayQueue的性能略好于MpmcAtomicArrayQueue,而ArrayBlockingQueue的速度慢了两倍。
使用JCTools的缺点使用JCTools有一个重要的缺点——不可能强制正确使用库类。例如,考虑在我们的大型成熟项目中开始使用MpscArrayQueue的情况(注意,必须有一个使用者)。
不幸的是,由于项目很大,有可能有人出现编程或配置错误,现在从多个线程读取队列。这个系统看起来像以前一样工作,但现在有可能消费者错过了一些信息。这是一个真正的问题,可能会有很大的影响,是很难调试。
理想情况下,应该可以运行具有特定系统属性的系统,该属性强制JCTools确保线程访问策略。例如,本地/测试/暂存环境(而不是生产环境)可能已启用它。遗憾的是,JCTools没有提供这样的属性。
另一个需要考虑的问题是,尽管我们确保JCTools比JDK的对应工具快得多,但这并不意味着我们的应用程序获得了与我们开始使用自定义队列实现时相同的速度。大多数应用程序不会在线程之间交换很多对象,而且大多是I/O绑定的。
结论现在,我们对JCTools提供的实用程序类有了基本的了解,并了解了它们在重载下与JDK的对应类相比的性能。
总之,只有当我们在线程之间交换大量对象时,才有必要使用该库,即使这样,也有必要非常小心地保留线程访问策略。
以上示例的完整源代码地址:https://github.com/eugenp/tutorials/tree/master/libraries-5
JCTools git地址:https://github.com/JCTools/JCTools
以上就是如何使用JCTools实现Java并发程序的详细内容,更多关于使用JCTools实现Java并发程序的资料请关注好吧啦网其它相关文章!
相关文章: