Wielowątkowość w Java – podstawowe kolekcje

W ostatnim artykule pokazałem, jak w wielu wątkach współdzielić dane. Dziś zajmę się tematem wbudowanych w Javę rozwiązań, które ułatwią to zadanie. Jeśli dopiero zaczynasz czytać tę serię zacznij od pierwszego wpisu.

Przykłady z tego artykułu są dostępne na GitHubie: https://github.com/Finanteq/java-multithreading-examples

Używanie zwykłych kolekcji

Dzięki poprzedniemu artykułowi znasz już podstawy związane z pracą na współdzielonych danych. Jednak w pracy z wielowątkowością spotkasz się z potrzebą użycia bardziej złożonych danych, takich jak listy, zbiory itp.

Jeśli zamierzasz użyć tego typu struktur danych w swoim wielowątkowym programie, to licz się z tym, że możesz uzyskać nieoczekiwane efekty. Wielowątkowość ma wiele haczyków i to jest jeden z nich. Nie wszystkie klasy wbudowane w Javę są przystosowane do współbieżności.

Zobacz jak to wygląda w prostym programie, który w jednym wątku do listy dopisuje liczby parzyste, a w drugim nieparzyste. Obydwa wątki powinny w sumie dodać 2000 liczb. Jeden wątek powinien dodać 1000 liczb (0, 2, 4…1998) i drugi również 1000 liczb (1, 3, 5…1999).

import java.util.ArrayList;

public class ArrayListExample {
    public static void main(String[] args) throws InterruptedException {
        var numbers = new ArrayList<>();

        // This thread adds even numbers to a list, starting with 0
        // eg. 0, 2, 4, 6...
        Thread evenThread = new Thread(() -> {
            for (int evenNumber = 0; evenNumber < 2000; evenNumber += 2) {
                numbers.add(evenNumber);
            }
        });
        evenThread.start();

        // This thread adds odd numbers to a list, starting with 1
        // eg. 1, 3, 5, 7...
        Thread oddThread = new Thread(() -> {
            for (int oddNumber = 1; oddNumber < 2000; oddNumber += 2) {
                numbers.add(oddNumber);
            }
        });
        oddThread.start();

        // wait for threads to finish
        evenThread.join();
        oddThread.join();

        // why it's not always 2000?
        System.out.println(numbers.size());
    }
}

Gdy uruchomisz ten program, to za każdym razem możesz uzyskać inny wynik. W moim przypadku rzadko kiedy w liście numbers było dokładnie 2000 liczb.

1905
2000
1989
1834

Pewnie po przeczytaniu poprzedniego artykułu domyślasz się, że problemem jest brak synchronizacji. Błąd objawia się podobnie jak błąd w klasie Counter.

Twórcy Javy przewidzieli sytuację, w której będziemy chcieli użyć listy w środowisku wielowątkowym. Dlatego w klasie Collections znajdziemy metodę synchronizedList, która dodaje synchronizację do listy. Oprócz niej jest też kilka innych metod, którymi możemy dodać synchronizację do innych kolekcji.

Poprawiony program wygląda tak:

import java.util.ArrayList;
import java.util.Collections;

public class SynchronizedArrayListExample {
    public static void main(String[] args) throws InterruptedException {
        var numbers = Collections.synchronizedList(new ArrayList<>());

        // This thread adds even numbers to a list, starting with 0
        // eg. 0, 2, 4, 6...
        Thread evenThread = new Thread(() -> {
            for (int evenNumber = 0; evenNumber < 2000; evenNumber += 2) {
                numbers.add(evenNumber);
            }
        });
        evenThread.start();

        // This thread adds odd numbers to a list, starting with 1
        // eg. 1, 3, 5, 7...
        Thread oddThread = new Thread(() -> {
            for (int oddNumber = 1; oddNumber < 2000; oddNumber += 2) {
                numbers.add(oddNumber);
            }
        });
        oddThread.start();

        // wait for threads to finish
        evenThread.join();
        oddThread.join();

        // now it's always correct
        System.out.println(numbers.size());
    }
}

W zależności od potrzeb, możesz wykorzystać następujące metody z klasy Collections do dodania synchronizacji w swoim programie:

  • Collections.synchronizedList(List<T>) – do list (np. ArrayList),
  • Collections.synchronizedSet(Set<T>) – do zbiorów (np. HashSet),
  • Collections.synchronizedMap(Map<K, V>) – do map (np. HashMap),
  • Collections.synchronizedCollection(Collection<T>) – do innych kolekcji (np. ArrayDeque).

Współbieżność a lista w Javie

Dlaczego twórcy Javy nie przewidzieli takiej sytuacji w klasie ArrayList? Otóż przewidzieli i ArrayList działa tak celowo. Klasa ArrayList zapewnia najlepszą wydajność jednowątkową. „Pod spodem” nie ma synchronizacji, aby program jednowątkowy nie tracił wydajności podczas używania tej klasy.

Jeżeli decydujemy się napisać program wielowątkowy, to możemy zadbać o synchronizację sami lub użyć innej dedykowanej do tego celu klasy. W powyższym przykładzie widzisz implementacje pierwszego rozwiązania. Jest to mało optymalne, ponieważ synchronizacja sprawia, że wątki przestają działać równolegle.

Drugim rozwiązaniem jest użycie klasy dedykowanej do użycia w aplikacjach wielowątkowych. W języku Java mamy klasę, która robi to samo co ArrayList, a jest przeznaczona do użytku w wielowątkowych programach – CopyOnWriteArrayList.

CopyOnWriteArrayList działa w taki sposób, że lista tworzy swoją kopię za każdym razem, gdy jest modyfikowana. Po aktualizacji nowa wersja zastępuje starą wersję listy. W taki sposób klasa gwarantuje spójność danych, gdy kilka wątków próbuje jednocześnie modyfikować listę.

Jak pewnie zauważyłeś, nie jest to wydajny sposób na aktualizację listy. Taka operacja potrzebuje więcej pamięci, a kopiowanie jest wolniejsze niż zwykłe wstawienie elementu do tablicy. Dlatego używaj tej klasy gdy lista będzie rzadko aktualizowana (albo kiedy nie masz wyjścia).

Dzięki temu, że CopyOnWriteArrayList wykorzystuje tablicę i synchronizację to operacje pobrania n-tego elementu, sprawdzenia rozmiaru i iteracji po elementach mają niską złożoność (O(n), a więc są szybkie).

import java.util.concurrent.CopyOnWriteArrayList;

public class CopyOnWriteArrayListExample {
    public static void main(String[] args) throws InterruptedException {
        var numbers = new CopyOnWriteArrayList<>();

        // This thread adds even numbers to a list, starting with 0
        // eg. 0, 2, 4, 6...
        Thread evenThread = new Thread(() -> {
            for (int evenNumber = 0; evenNumber < 2000; evenNumber += 2) {
                numbers.add(evenNumber);
            }
        });
        evenThread.start();

        // This thread adds odd numbers to a list, starting with 1
        // eg. 1, 3, 5, 7...
        Thread oddThread = new Thread(() -> {
            for (int oddNumber = 1; oddNumber < 2000; oddNumber += 2) {
                numbers.add(oddNumber);
            }
        });
        oddThread.start();

        // wait for threads to finish
        evenThread.join();
        oddThread.join();

        // now it's always correct
        System.out.println(numbers.size());
    }
}

Alternatywa listy

Przeróbmy trochę program – zobacz co się stanie jeśli zamiast wyświetlać rozmiar listy, wyświetlisz zawartość tej listy:

import java.util.concurrent.CopyOnWriteArrayList;

public class PrintNumbersListExample {
    public static void main(String[] args) throws InterruptedException {
        var numbers = new CopyOnWriteArrayList<>();

        // This thread adds even numbers to a list, starting with 0
        // eg. 0, 2, 4, 6...
        Thread evenThread = new Thread(() -> {
            for (int evenNumber = 0; evenNumber < 2000; evenNumber += 2) {
                numbers.add(evenNumber);
            }
        });
        evenThread.start();

        // This thread adds odd numbers to a list, starting with 1
        // eg. 1, 3, 5, 7...
        Thread oddThread = new Thread(() -> {
            for (int oddNumber = 1; oddNumber < 2000; oddNumber += 2) {
                numbers.add(oddNumber);
            }
        });
        oddThread.start();

        // wait for threads to finish
        evenThread.join();
        oddThread.join();

        // let's see whats inside
        for (var number : numbers) {
            System.out.println(number);
        }
    }
}

Po uruchomieniu tego programu zobaczyłem następujące liczby:

0
2
4
6
8
10
12
14
16
…
1989
1991
1993
1995
1997
1999

Co się stało? Lista zawiera wszystkie liczby (co sprawdziliśmy w poprzednim programie). Jednak dodawanie liczb w dwóch wątkach nie gwarantuje ustawienia liczb w kolejności.

Ustawienie danych w kolejności gwarantuje zbiór (ang. set). Zbiór także gwarantuje, że w kolekcji nie będzie duplikatów. W naszym przypadku to bez znaczenia, bo generujemy ciąg liczb.

W Javie istnieją dwie współbieżne implementacje zbioru:

  • CopyOnWriteArraySet – zasada działania taka sama jak CopyOnWriteArrayList, a więc przydaje się, gdy zbiór jest mały i rzadko aktualizowany. Dodatkowo ten zbiór nie ustawia elementów w kolejności.
  • ConcurrentSkipListSet – odczyt ma gorszą złożoność (O(log(n)) kosztem wyższej wydajności modyfikacji listy (uwaga: operacja size() może być kosztowna). Implementacja opiera się na SkipList i dzięki temu porządkuje zawartość zbioru.

Biorąc pod uwagę, że nasz program wstawia do kolekcji 2000 elementów i chcę ustawić liczby w kolejności, wybrałem ConcurrentSkipListSet.

Program wykorzystujący tę klasę wygląda następująco:

import java.util.concurrent.ConcurrentSkipListSet;

public class PrintNumbersSetExample {
    public static void main(String[] args) throws InterruptedException {
        var numbers = new ConcurrentSkipListSet<>();

        // This thread adds even numbers to a list, starting with 0
        // eg. 0, 2, 4, 6...
        Thread evenThread = new Thread(() -> {
            for (int evenNumber = 0; evenNumber < 2000; evenNumber += 2) {
                numbers.add(evenNumber);
            }
        });
        evenThread.start();

        // This thread adds odd numbers to a list, starting with 1
        // eg. 1, 3, 5, 7...
        Thread oddThread = new Thread(() -> {
            for (int oddNumber = 1; oddNumber < 2000; oddNumber += 2) {
                numbers.add(oddNumber);
            }
        });
        oddThread.start();

        // wait for threads to finish
        evenThread.join();
        oddThread.join();

        // let's see whats inside
        for (var number : numbers) {
            System.out.println(number);
        }
    }
}

Wynikiem działania tego programu są liczby wyświetlone w kolejności naturalnej:

0
1
2
3
4
5
6
..
1996
1997
1998
1999

Unikanie kolekcji współbieżnych

Powyższy program możesz napisać nie wykorzystując kolekcji współbieżnych w Java. Wystarczy, że zmienisz podejście, ponieważ wątki nie muszą dodawać wyników na bieżąco do listy. Mogą po prostu „zwrócić” gotowy wynik (liczby parzyste/nieparzyste), a w głównym wątku możesz te dane dodatkowo przetworzyć, tak jakby to był program jednowątkowy.

W takim scenariuszu weź pod uwagę następujące sprawy:

  1. Wątki nie zwracają danych, więc musisz przygotować jakiś „pojemnik” na wynik działania wątku. W każdym wątku, po zakończeniu obliczeń, uzupełnisz ten „pojemnik”.
  2. Wyniki z wielu wątków będziesz musiał połączyć w jeden wynik końcowy.
  3. Jeśli jeden z wątków nieoczekiwanie przestanie działać przed zakończeniem obliczeń, to nie dostaniesz żadnego wyniku, nawet częściowego (co czasami jest dobre – zamiast niepełnych danych masz pusty wynik i możesz powtórzyć obliczenia).

W poprzednich artykułach implementowałem specjalne pojemniki na dane (takie jak StringWrapper), aby przekazywać dane pomiędzy wątkami. Tym razem zamiast samodzielnie pisać klasę-pojemnik, użyję wbudowanej klasy AtomicReference. To specjalny pojemnik, który gwarantuje, że operacje zapisywania i odczytywania zmiennej są atomiczne. Pamiętaj – operacje na przechowywanej zmiennej nie będą atomiczne, wyłącznie operacje odczytu i zapisu zmiennej przechowywanej w AtomicReference są atomiczne.

Zmodyfikuję poprzedni program tak, aby wykorzystywał 2 instancje AtomicReference jako „pojemników” na wyniki (jedna instancja na wątek). Każdy wątek wstawi swoją listę do swojego AtomicReference, a po zakończeniu działania obydwu wątków połączę te listy w jeden zbiór i wyświetlę liczby.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class NoConcurrentCollectionsExample {
    public static void main(String[] args) throws InterruptedException {
        var evenNumbers = new AtomicReference<List<Integer>>();
        var oddNumbers = new AtomicReference<List<Integer>>();

        // This thread calculates even numbers
        Thread evenThread = new Thread(() -> {
            var numbers = new ArrayList<Integer>();
            for (int evenNumber = 0; evenNumber < 2000; evenNumber += 2) {
                numbers.add(evenNumber);
            }
            evenNumbers.set(numbers);
        });
        evenThread.start();

        // This thread calculates odd numbers
        Thread oddThread = new Thread(() -> {
            var numbers = new ArrayList<Integer>();
            for (int oddNumber = 1; oddNumber < 2000; oddNumber += 2) {
                numbers.add(oddNumber);
            }
            oddNumbers.set(numbers);
        });
        oddThread.start();

        // wait for threads to finish
        evenThread.join();
        oddThread.join();

        // Let's combine the result...
        var allNumbers = new HashSet<>(oddNumbers.get());
        allNumbers.addAll(evenNumbers.get());

        // ...and show me what you've got
        for (var number : allNumbers) {
            System.out.println(number);
        }
    }
}

To podejście jest bardziej skomplikowane i nie zawsze możliwe do implementacji, ale ma swoje plusy. Po pierwsze, nie musisz używać synchronizacji, więc wątki nie są spowalniane podczas obliczeń. Po drugie, nie musisz używać specjalnych kolekcji, więc nie musisz iść na kompromisy co do wydajności wątków.

Jedyny kompromis do wzięcia pod uwagę to logika łączenia wyników w jeden zbiór allNumbers.

Jeżeli zależy mi na wydajności, to możesz zoptymalizować rozwiązanie jeszcze bardziej – wystarczy ostatnie linijki programu zastąpić pętlą:

int size = oddNumbers.get().size() + evenNumbers.get().size();
for (int i = 0; i < size; i++) {
    if (i % 2 == 0) {
        System.out.println(evenNumbers.get().get(i / 2));
    } else {
        System.out.println(oddNumbers.get().get(i / 2));
    }
}

Biorąc pod uwagę to, jakie obliczenia wykonuje program, taka logika wydaje mi się niepotrzebnie skomplikowana. Tak czy inaczej – są różne podejścia co do kolekcji współbieżnych, a każde jest nastawione na optymalizację czegoś innego.

Co dalej?

W kolejnym wpisie blogowym będę kontynuował temat kolekcji i zbierania wyników z wielu wątków. Pokażę inne klasy i inne podejścia do agregacji wyników w programach wielowątkowych. Tak jak w tym materiale, zaprezentuję przykłady użycia, a w następnych artykułach zajmę się zaawansowanymi metodami blokowania i nadzorowania wątków.

Autor
Cezary Regec
Cezary Regec
Software Engineer w FINANTEQ