Java 101: Java паралелност без болка, Част 2

Предишна 1 2 3 4 Страница 3 Следваща Страница 3 от 4

Атомни променливи

Многопоточните приложения, работещи на многоядрени процесори или многопроцесорни системи, могат да постигнат добро използване на хардуера и да бъдат силно мащабируеми. Те могат да постигнат тези цели, като техните нишки прекарват по-голямата част от времето си в извършване на работа, вместо да чакат да свърши работата или да чакат да придобият ключалки, за да имат достъп до споделени структури от данни.

Традиционният механизъм за синхронизация на Java, който налага взаимно изключване (нишката, която държи ключалката, която пази набор от променливи, има изключителен достъп до тях) и видимостта (промените в защитените променливи стават видими за други нишки, които впоследствие придобиват заключването), въздейства хардуерно използване и мащабируемост, както следва:

  • Съсредоточената синхронизация (множество нишки постоянно се конкурират за заключване) е скъпа и в резултат на това пропускателната способност страда. Основна причина за разходите е честото превключване на контекста, което се извършва; операцията за превключване на контекст може да отнеме много цикли на процесора, за да завърши. За разлика от това, неограничената синхронизация е евтина за съвременните JVM.
  • Когато нишка, съдържаща заключване, се забави (напр. Поради забавяне на планирането), нито една нишка, която изисква заключването, не прави никакъв напредък и хардуерът не се използва толкова добре, колкото би било в противен случай.

Може би си мислите, че можете да използвате volatileкато алтернатива за синхронизация. Въпреки това, volatileпроменливи решават само проблема с видимост. Те не могат да се използват за безопасно внедряване на атомните последователности за четене-промяна-запис, които са необходими за безопасно внедряване на броячи и други обекти, които изискват взаимно изключване.

Java 5 представи алтернатива за синхронизация, която предлага взаимно изключване, комбинирано с производителността на volatile. Тази алтернативна атомна променлива се основава на инструкцията за сравнение и замяна на микропроцесор и до голяма степен се състои от типовете в java.util.concurrent.atomicпакета.

Разбиране на сравнение и размяна

Инструкцията за сравнение и размяна (CAS) е непрекъсваема инструкция, която чете място в паметта, сравнява прочетената стойност с очаквана стойност и съхранява нова стойност в местоположението на паметта, когато прочетената стойност съвпада с очакваната стойност. В противен случай нищо не се прави. Действителната инструкция за микропроцесора може да се различава до известна степен (напр. Върнете true, ако CAS е успял или false по друг начин, вместо прочетената стойност).

Инструкции за микропроцесор CAS

Съвременните микропроцесори предлагат някаква CAS инструкция. Например, микропроцесорите Intel предлагат cmpxchgсемейството инструкции, докато PowerPC микропроцесорите предлагат инструкции за връзка за натоварване (напр. lwarx) И условия за съхранение (напр. stwcx) За същата цел.

CAS дава възможност да се поддържат атомни последователности за четене-промяна-запис. Обикновено бихте използвали CAS, както следва:

  1. Прочетете стойност v от адрес X.
  2. Извършете многоетапно изчисление, за да извлечете нова стойност v2.
  3. Използвайте CAS, за да промените стойността на X от v на v2. CAS успява, когато стойността на X не се е променила, докато изпълнявате тези стъпки.

За да видите как CAS предлага по-добра производителност (и мащабируемост) при синхронизация, помислете за пример за брояч, който ви позволява да прочетете текущата му стойност и да увеличите брояча. Следният клас реализира брояч, базиран на synchronized:

Листинг 4. Counter.java (версия 1)

public class Counter { private int value; public synchronized int getValue() { return value; } public synchronized int increment() { return ++value; } }

Високата конкуренция за заключването на монитора ще доведе до прекомерно превключване на контекста, което може да забави всички нишки и да доведе до приложение, което не се мащабира добре.

Алтернативата CAS изисква прилагане на инструкцията за сравнение и размяна. Следващият клас емулира CAS. Той използва synchronizedвместо действителната хардуерна инструкция за опростяване на кода:

Листинг 5. EmulatedCAS.java

public class EmulatedCAS { private int value; public synchronized int getValue() { return value; } public synchronized int compareAndSwap(int expectedValue, int newValue) { int readValue = value; if (readValue == expectedValue) value = newValue; return readValue; } }

Тук valueидентифицира място в паметта, което може да бъде извлечено от getValue(). Също така, compareAndSwap()изпълнява CAS алгоритъма.

Следният клас използва EmulatedCASза внедряване на synchronizedброяч (преструва се, че EmulatedCASне изисква synchronized):

Листинг 6. Counter.java (версия 2)

public class Counter { private EmulatedCAS value = new EmulatedCAS(); public int getValue() { return value.getValue(); } public int increment() { int readValue = value.getValue(); while (value.compareAndSwap(readValue, readValue+1) != readValue) readValue = value.getValue(); return readValue+1; } }

Counterкапсулира EmulatedCASекземпляр и декларира методи за извличане и увеличаване на броячна стойност с помощта на този екземпляр. getValue()извлича "текущата стойност на брояча" на екземпляра и increment()безопасно увеличава стойността на брояча.

increment()многократно се извиква, compareAndSwap()докато readValueстойността на 'не се промени. След това е безплатно да промените тази стойност. Когато не е включено заключване, спорът се избягва заедно с прекомерното превключване на контекста. Производителността се подобрява и кодът е по-мащабируем.

ReentrantLock и CAS

По-рано научихте, че ReentrantLockпредлага по-добра производителност, отколкото synchronizedпри високи нишки. За да се повиши производителността, ReentrantLockсинхронизирането на се управлява от подклас на абстрактния java.util.concurrent.locks.AbstractQueuedSynchronizerклас. На свой ред този клас използва недокументирания sun.misc.Unsafeклас и неговия compareAndSwapInt()CAS метод.

Проучване на пакета за атомни променливи

Не е нужно да внедрявате compareAndSwap()чрез непоносимия Java Native Interface. Вместо това, Java 5 предлага тази поддръжка чрез java.util.concurrent.atomic: набор от класове, използвани за програмиране без заключване, безопасно за нишки на единични променливи.

Според java.util.concurrent.atomicJavadoc тези класове

разширява понятието volatileстойности, полета и елементи на масив до тези, които също осигуряват атомна условна операция за актуализиране на формуляра boolean compareAndSet(expectedValue, updateValue). Този метод (който варира в типовете аргументи в различните класове) атомично задава променлива на updateValueако тя в момента съдържа expectedValue, отчитане вярно за успех.

This package offers classes for Boolean (AtomicBoolean), integer (AtomicInteger), long integer (AtomicLong) and reference (AtomicReference) types. It also offers array versions of integer, long integer, and reference (AtomicIntegerArray, AtomicLongArray, and AtomicReferenceArray), markable and stamped reference classes for atomically updating a pair of values (AtomicMarkableReference and AtomicStampedReference), and more.

Implementing compareAndSet()

Java implements compareAndSet() via the fastest available native construct (e.g., cmpxchg or load-link/store-conditional) or (in the worst case) spin locks.

Consider AtomicInteger, which lets you update an int value atomically. We can use this class to implement the counter shown in Listing 6. Listing 7 presents the equivalent source code.

Listing 7. Counter.java (version 3)

import java.util.concurrent.atomic.AtomicInteger; public class Counter { private AtomicInteger value = new AtomicInteger(); public int getValue() { return value.get(); } public int increment() { int readValue = value.get(); while (!value.compareAndSet(readValue, readValue+1)) readValue = value.get(); return readValue+1; } }

Listing 7 is very similar to Listing 6 except that it replaces EmulatedCAS with AtomicInteger. Incidentally, you can simplify increment() because AtomicInteger supplies its own int getAndIncrement() method (and similar methods).

Fork/Join framework

Computer hardware has evolved significantly since Java's debut in 1995. Back in the day, single-processor systems dominated the computing landscape and Java's synchronization primitives, such as synchronized and volatile, as well as its threading library (the Thread class, for example) were generally adequate.

Multiprocessor systems became cheaper and developers found themselves needing to create Java applications that effectively exploited the hardware parallelism that these systems offered. However, they soon discovered that Java's low-level threading primitives and library were very difficult to use in this context, and the resulting solutions were often riddled with errors.

What is parallelism?

Parallelism is the simultaneous execution of multiple threads/tasks via some combination of multiple processors and processor cores.

The Java Concurrency Utilities framework simplifies the development of these applications; however, the utilities offered by this framework do not scale to thousands of processors or processor cores. In our many-core era, we need a solution for achieving a finer-grained parallelism, or we risk keeping processors idle even when there is lots of work for them to handle.

Professor Doug Lea presented a solution to this problem in his paper introducing the idea for a Java-based fork/join framework. Lea describes a framework that supports "a style of parallel programming in which problems are solved by (recursively) splitting them into subtasks that are solved in parallel." The Fork/Join framework was eventually included in Java 7.

Overview of the Fork/Join framework

The Fork/Join framework is based on a special executor service for running a special kind of task. It consists of the following types that are located in the java.util.concurrent package:

  • ForkJoinPool: an ExecutorService implementation that runs ForkJoinTasks. ForkJoinPool provides task-submission methods, such as void execute(ForkJoinTask task), along with management and monitoring methods, such as int getParallelism() and long getStealCount().
  • ForkJoinTask: an abstract base class for tasks that run within a ForkJoinPool context. ForkJoinTask describes thread-like entities that have a much lighter weight than normal threads. Many tasks and subtasks can be hosted by very few actual threads in a ForkJoinPool instance.
  • ForkJoinWorkerThread: a class that describes a thread managed by a ForkJoinPool instance. ForkJoinWorkerThread is responsible for executing ForkJoinTasks.
  • RecursiveAction: an abstract class that describes a recursive resultless ForkJoinTask.
  • RecursiveTask: an abstract class that describes a recursive result-bearing ForkJoinTask.

The ForkJoinPool executor service is the entry-point for submitting tasks that are typically described by subclasses of RecursiveAction or RecursiveTask. Behind the scenes, the task is divided into smaller tasks that are forked (distributed among different threads for execution) from the pool. A task waits until joined (its subtasks finish so that results can be combined).

ForkJoinPool manages a pool of worker threads, where each worker thread has its own double-ended work queue (deque). When a task forks a new subtask, the thread pushes the subtask onto the head of its deque. When a task tries to join with another task that hasn't finished, the thread pops another task off the head of its deque and executes the task. If the thread's deque is empty, it tries to steal another task from the tail of another thread's deque. This work stealing behavior maximizes throughput while minimizing contention.

Using the Fork/Join framework

Fork/Join was designed to efficiently execute divide-and-conquer algorithms, which recursively divide problems into sub-problems until they are simple enough to solve directly; for example, a merge sort. The solutions to these sub-problems are combined to provide a solution to the original problem. Each sub-problem can be executed independently on a different processor or core.

Lea's paper presents the following pseudocode to describe the divide-and-conquer behavior:

Result solve(Problem problem) { if (problem is small) directly solve problem else { split problem into independent parts fork new subtasks to solve each part join all subtasks compose result from subresults } }

The pseudocode presents a solve method that's called with some problem to solve and which returns a Result that contains the problem's solution. If the problem is too small to solve via parallelism, it's solved directly. (The overhead of using parallelism on a small problem exceeds any gained benefit.) Otherwise, the problem is divided into subtasks: each subtask independently focuses on part of the problem.

Operation fork launches a new fork/join subtask that will execute in parallel with other subtasks. Operation join delays the current task until the forked subtask finishes. At some point, the problem will be small enough to be executed sequentially, and its result will be combined along with other subresults to achieve an overall solution that's returned to the caller.

The Javadoc for the RecursiveAction and RecursiveTask classes presents several divide-and-conquer algorithm examples implemented as fork/join tasks. For RecursiveAction the examples sort an array of long integers, increment each element in an array, and sum the squares of each element in an array of doubles. RecursiveTask's solitary example computes a Fibonacci number.

Листинг 8 представя приложение, което демонстрира примера за сортиране в не-fork / join, както и fork / join контексти. Той също така представя информация за времето, за да контрастира на скоростите на сортиране.