dzikowski.github.io

Piszę o IT po polsku, więc z góry przepraszam za zagęszczenie kolokwializmów, ale co w angielskim brzmi naturalnie, w polskim często nie ma nawet odpowiedników.

Apache Kafka

W tym artykule opisałem, jak działa i jaka jest architektura Kafki, pokazałem, w jaki sposób można uruchomić Kafkę do celów testowych i wreszcie zaprezentowałem prosty przykład wykorzystania Kafki w Javie, którego źródło możesz znaleźć w moim repozytorium na Githubie.

Apache Kafka to tak zwany message broker. Tego rodzaju aplikacje odpowiadają za odbieranie, walidację, ewentualne przekształcenia i rozsyłanie komunikatów pomiędzy aplikacjami. Umożliwiają bardzo elastyczny sposób komunikowania się luźno ze sobą powiązanych elementów systemów informatycznych.

Cechą charakterystyczną Kafki jest jej niezawodność, wydajność i zdolność do pracy w środowisku rozproszonym. Kiedy zespół LinkedIn tworzył Kafkę, ich główną motywacją było poradzenie sobie z przetwarzaniem w czasie rzeczywistym ogromnej ilości zdarzeń. W konsekwencji powstało narzędzie, które jest idealne do wspomagania przetwarzania dynamicznie zmieniających się źródeł danych o dużej objętości i zmienności, na przykład strumieni big data.

W 2014 roku w LinkedIn komunikacja za pośrednictwem Kafki przebiegała zarówno pomiędzy klastrami, jak i centrami danych. W sumie przesyłano około 200 miliardów komunikatów dzienne, osiągając 7 milionów komunikatów na sekundę, jeśli chodzi o zapis i 35 milionów na sekundę w przypadku odczytu. Z Kafki, poza LinkedIn, korzystają między innymi Netflix, Twitter, Spotify, Cisco, czy Coursera.

Architektura

Kafka umożliwia przesyłanie komunikatów (ang. message) pomiędzy aplikacjami w systemach rozproszonych. Nadawca może przesyłać komunikaty do Kafki, natomiast odbiorca pobiera wiadomości ze strumienia publikowanego przez Kafkę.

Komunikaty pogrupowane są w tzw. tematy (ang. topic). Zarówno nadawca, jak i odbiorca powiązany jest z jednym tematem. Nadawca przesyła komunikaty z określonego tematu, a odbiorca otrzymuje za pośrednictwem Kafki wszystkie komunikaty z określonego tematu, które mogą pochodzić nawet od wielu nadawców. Każdy wysłany przez dowolnego nadawcę komunikat z danego tematu trafi do każdego odbiorcy, który nasłuchuje tego tematu.

Kafka producers and consumers

Wszystko to dzieje się w środowisku rozproszonym. Kafka jest uruchamiana na wielu serwerach, podobnie jak rejestry komunikatów z danego tematu są przechowywane i replikowane na wielu maszynach jednocześnie. Pojedynczy serwer – instancja Kafki to tzw. broker (ang. broker).

Komunikaty z danego tematu dopisywane są do tzw. partycji (ang. partition). Partycja to pewien rejestr, uporządkowana sekwencja komunikatów, która nie zmienia się, oprócz tego, że nowe komunikaty mogą zostać dopisane na koniec tej sekwencji, a stare – na przykład starsze niż dwa dni – są zapominane. Aby pobrać odpowiednią sekwencję komunikatów, odbiorcy muszą jedynie znać swoją pozycję w rejestrze – indeks ostatnio odczytanego komunikatu.

Pojedyncza partycja musi w całości zmieścić się na jednym serwerze, musi być możliwe obsłużenie jej przez jednego brokera. Z innej strony, jeśli masz jedną partycję z jednego tematu, do zapisu i odczytu komunikatów będzie wykorzystywany tylko jeden broker. Większa liczba partycji pozwala na wykorzystanie większej liczby brokerów w celu zrównoleglenia zapisu i odczytu komunikatów, a tym samym na zwiększenie wydajności klastra. Twórcy gwarantują wydajne działanie Kafki nawet dla 10 000 partycji – maksymalnie dla takiej ilości przeprowadzają testy wydajnościowe.

Bardzo prawdopodobne jest to, że w systemie obsługującym znaczne ilości danych, rejestr wszystkich komunikatów z danego tematu nie zmieści się na pojedynczej maszynie. Dlatego Kafka pozwala na sytuację, w której jeden temat podzielony jest na wiele partycji, które mogą być obsługiwane przez różnych brokerów.

Wreszcie, partycje Kafki są replikowane, to znaczy, że ich kopie mogą znajdować się na wielu serwerach. Dla każdej partycji istnieje jeden serwer, który jest tzw. liderem (ang. leader) i który obsługuje wszystkie operacje odczytu i zapisu danej partycji. Dla każdej partycji mogą istnieć także serwery (ang. followers), który jedynie kopiują dane od lidera. Jeśli na przykład ustawisz współczynnik replikacji na 3, to znaczy, że każda partycja będzie miała jednego lidera i 2 serwery (dwóch brokerów), którzy jedynie kopiują dane, zwiększając tym samym bezpieczeństwo klastra na awarie.

Tym samym w kontekście działania Kafki w środowisku rozproszonym, warto zapamiętać następujące punkty:

  1. Każda partycja związana jest tylko z jednym tematem.
  2. Rejestr komunikatów z danego tematu może być rozbity na wiele partycji.
  3. Każda partycja musi zmieścić się na pojedynczej instancji Kafki (pojedynczym brokerze).
  4. Partycja może być replikowana.
  5. Większa liczba partycji pozwala na zwiększenie wydajności operacji odczytu i zapisu komunikatów.
  6. Większy współczynnik replikacji pozwala na zwiększenie bezpieczeństwa danych (na zwiększenie odporności klastra na awarie).

Poniżej pokazuję jeszcze na rysunku przykład działania klastra Kafki składającego się z trzech brokerów: B1, B2, B3. W tym przykładzie dla każdego tematu tworzona jest jedna partycja – stąd też w sumie są dwie partycje A i B, odpowiadające tematom A i B. Współczynnik replikacji został ustawiony na poziomie 2, to znaczy, że każda partycja pojawia się na dwóch serwerach.

Kafka cluster partitions

Broker B2 jest liderem dla tematu A, to znaczy, że w rzeczywistości to on jest odpowiedzialny za wszystkie operacje zapisu i odczytu do partycji A. Partycje na instancjach kafki B1 i B3 są jedynie kopiami. Analogicznie broker B1 jest liderem dla tematu B, a partycje A na B2 i B3, zawierają kopie danych z partycji na B1.

W przypadku awarii B2, rolę lidera dla tematu A przejmie broker B3. Dodatkowo zostaną utworzone nowe kopie partycji A i B na działających maszynach.

Kafka cluster partitions 2

Jak już wspomniałem, to, na ilu serwerach znajdzie się kopia każdej partycji jest regulowane przez tzw. współczynnik replikacji. Jeśli współczynnik replikacji ma wartość N, to znaczy, że kopia każdej partycji będzie znajdowała się N różnych serwerach, a Kafka może działać normalnie dla maksymalnie N-1 awarii.

Gwarancje

W środowisku rozproszonym bardzo ważna jest odporność na awarie. System rozproszony, działający na wielu urządzeniach jednocześnie, powinien pracować bez zakłóceń nawet wtedy, gdy kilka serwerów przestanie działać.

Dodatkowo można mówić o gwarancjach dotyczących sposobu dostarczania komunikatów. Rozróżnia się trzy rodzaje takich gwarancji:

  • Co najwyżej raz – komunikaty mogą zostać utracone, ale na pewno nie dojdzie do sytuacji, kiedy komuniat zostanie dostarczony wielokrotnie.
  • Przynajmniej raz – komunikat może zostać odebrany ponownie, ale za to żaden nie zostanie utracony.
  • Dokładnie jeden raz – każdy komunikat jest dostarczony jeden raz do odbiorcy.

Kafka domyślnie gwarantuje dostarczenie komunikatu przynajmniej raz, ale możliwe jest takie jej skonfigurowanie, by, przy odpowiedniej implementacji nadawców i odbiorców, umożliwić gwarancję dostarczenia co najwyżej raz, albo dokładnie jeden raz. Niesie to ze sobą jednak określone koszty, związane z wydajnością.

W Kafce komunikaty wysłane przez nadawcę w ramach określonego tematu i dla określonej zostaną zachowane w partycji w takiej kolejności, w jakiej są wysyłane. Poszczególni odbiorcy mają z kolei dostęp do komunikatów w takiej kolejności, w jakiej jak są one przechowywane. Wreszcie, jeśli współczynnik replikacji zostanie ustawiony na poziomie N, to znaczy, że nawet jeśli w klastrze przestanie działać N-1 serwerów, system będzie zachowywał się normalne.

Instalacja i uruchomienie

Korzystałem z Kafki w wersji 2.11-0.8.2-beta. Aby uruchomić Kafkę do celów testowych, wystarczy pobrać odpowiednie archiwum i rozpakować je w dowolnym miejscu, a następnie wywołać odpowiednie skrypty z folderu bin (dla Linuksa), albo z folderu bin/windows (dla Windowsa).

Ważny jest też folder conf, w którym znajdują się odpowiednie pliki konfiguracyjne dla instancji Zookeepera oraz Kafki.

Zookeper to usługa, która służy do koordynacji rozproszonych aplikacji. Do tego służy też Zookeper w Kafce: utrzymywania wspólnego stanu dla wszystkich instancji Kafki, zarządzania konfiguracją, monitorowania. Aby uruchomić Kafkę, konieczna jest działająca instancja Zookepera.

Ponieważ chciałbym pokazać, jak działa Kafka, i że świetnie sobie radzi w wypadku awarii, w sumie będę uruchamiać trzy instancje Kafki (inaczej: trzech brokerów, albo trzy serwery). Każda z nich wymaga osobnego pliku konfiguracyjnego. Przygotowałem takie pliki osobno dla Linuksa i Windows.

To, co jest w tej chwili najważniejsze, to następujące pozycje z konfiguracji brokerów Kafki:

broker.id=1

port=9092

log.dirs=C:/kafka-logs/kafka-logs-1

num.partitions=3

Pierwsze trzy powinny być różne dla poszczególnych instancji. Ostatnią, liczbę partycji ustawiłem na trzy, ponieważ zamierzam uruchomić trzech brokerów Kafki i chcę, żeby możliwy był odczyt komunikatów jednocześnie w trzech wątkach.

Aby uruchomić Kafkę dla celów testowych, najlepiej skopiować przygotowane przeze mnie pliki konfiguracyjne Kafki i Zookeepera do folderu conf, znajdującego się w dystrybucji Kafki, a następnie uruchomić przygotowany przeze mnie skrypt. Przygotowałem dwie wersje tego skryptu – dla Linuksa i dla Windows.

Wersja skryptu dla Windows (plik .bat):

cd C:\kafka_2.11-0.8.2-beta
start bin\windows\zookeeper-server-start.bat config\zookeeper.properties
start bin\windows\kafka-server-start.bat config\server1.properties

TIMEOUT /T 5

start bin\windows\kafka-server-start.bat config\server2.properties
start bin\windows\kafka-server-start.bat config\server3.properties

call start bin\windows\kafka-topics.bat --create --zookeeper localhost:2181    ^
        --replication-factor 2 --partitions 3 --topic coolkafka-in
call start bin\windows\kafka-topics.bat --create --zookeeper localhost:2181    ^
        --replication-factor 2 --partitions 3 --topic coolkafka-out

TIMEOUT /T 3

start bin\windows\kafka-console-producer.bat --topic coolkafka-in              ^
        --broker-list localhost:9092,localhost:9093,localhost:9094
start bin\windows\kafka-console-consumer.bat --topic coolkafka-in              ^
        --zookeeper localhost:2181
start bin\windows\kafka-console-consumer.bat --topic coolkafka-out             ^
        --zookeeper localhost:2181

Wersja skryptu dla Linuksa (plik .sh):

cd ~/kafka_2.11-0.8.2-beta
gnome-terminal -e "bin/zookeeper-server-start.sh config/zookeeper.properties"
gnome-terminal -e "bin/kafka-server-start.sh config/server1.properties"

sleep 5

gnome-terminal -e "bin/kafka-server-start.sh config/server2.properties"
gnome-terminal -e "bin/kafka-server-start.sh config/server3.properties"

zookeeper="--zookeeper localhost:2181"
create="--create $zookeeper --replication-factor 2 --partitions 3"

bin/kafka-topics.sh $create --topic coolkafka-in
bin/kafka-topics.sh $create --topic coolkafka-out

sleep 3

brokers="--broker-list localhost:9092,localhost:9093,localhost:9094"
gnome-terminal -e "bin/kafka-console-producer.sh  $brokers --topic coolkafka-in"

gnome-terminal -e "bin/kafka-console-consumer.sh $zookeeper --topic coolkafka-in"
gnome-terminal -e "bin/kafka-console-consumer.sh $zookeeper --topic coolkafka-out"

W pierwszej kolejności uruchamiam instancję Zookeepera, którego na szczęście nie muszę instalować – dystrybucja Kafki ma wbudowaną wersję Zookeepera, która dla celów testowych w zupełności nam wystarczy. Następnie uruchamiam pierwszego brokera Kafki i czekam przez chwilę, żeby zdążył się uruchomić, co pozwoli uniknąć niektórych błędów uruchomienia. Potem uruchamiam dwóch kolejnych brokerów dla dwóch kolejnych plików konfiguracyjnych.

Teraz mogę przystąpić do stworzenia dwóch tematów, które będę wykorzystywał w tym i w kolejnych wpisach, tematów: coolkafka-in oraz coolkafka-out. Tematy wystarczy stworzyć raz, nie trzeba tego robić przy każdym uruchomieniu Kafki, zdecydowałem się jednak zamieścić tworzenie tematów w tym skrypcie, żeby był bardziej uniwersalny. Jeśli temat został już utworzony, pojawia się tylko ostrzeżenie, co w żaden sposób nie będzie nam przeszkadzać. Dzięki temu mogę wywoływać za każdym razem ten sam skrypt, nie przejmując się, czy tematy zostały już wcześniej utworzone, czy jeszcze nie.

Wreszcie przystępuję do uruchomienia oddzielnych okien terminala: jednego na nadawcę i dwóch na odbiorców. Nadawca będzie przesyłał komunikaty w temacie coolkafka-in; w tym temacie będzie też odbierał komunikaty pierwszy z odbiorców. Drugi odbiorca będzie nasłuchiwał komunikatów w temacie coolkafka-out.

W konsekwencji powinno uruchomić się siedem nowych okien terminala (wiersza poleceń):

  • Jedno z uruchomioną instancją Zookepera.
  • Trzy z działającymi brokerami Kafki.
  • Jedno, będące nadawcą komunikatów w temacie coolkafka-in.
  • Jedno, będące odbiorcą komunikatów w temacie coolkafka-in.
  • Jedno, będące odbiorcą komunikatów w temacie coolkafka-out.

Możesz sprawdzić, czy wszystko działa, wpisując w oknie nadawcy jakiś tekst i wciskając enter. Taki sam tekst powinien pojawić się w oknie pierwszego z odbiorców, ponieważ to on nasłuchuje komunikatów z tego samego tematu, co nadawca.

Kafka w Javie

Poniżej opisuję przykład wykorzystania Kafki w Javie, który można znaleźć w projekcie simple-kafka-storm-java.

Kafka została napisana w języku Scala, jednak warstwa przesyłania komunikatów jest niezależna od języka – przesyłanie odbywa się za pośrednictwem protokołu TCP. W dystrybucji Kafki dostępny jest klient dla Scali i Javy, i właśnie z klienta dla Javy korzystałem w omawianym projekcie (jeśli potrzebujesz klienta dla innego języka, zajrzyj tutaj).

Model: transakcja bankowa

Chciałbym zaprezentować działanie Kafki w Javie na podstawie przesyłania informacji dotyczących prostych transakcji bankowych. W tym celu stworzyłem trzy klasy w Javie (pakiet io.github.dzikowski.bank): Transaction, Bank oraz RandomTransactionProducer.

Pierwsza klasa reprezentuje transakcję bankową, która ma określoną datę wykonania, nadawcę, odbiorcę oraz kwotę. Zakładam, że możemy mieć do czynienia z transakcjami zwykłymi oraz specjalnymi. Zwykłe polegają na przelewaniu pewnej kwoty pieniędzy z jednego konta na inne, specjalna z kolei to wpływ pieniędzy na konto (brak nadawcy).

public final class Transaction {

    private final Date date;
    private final String from;
    private final String to;
    private final int amount;
    
    ...
    
    public boolean isSpecial() {
        return "SPECIAL".equals(from);
    }
    
    ...

Druga klasa, Bank, reprezentuje bank. Zawiera ona mapę, w której trzymam informacje o stanie kont, a także metody, które pozwalają na pobranie informacji o środkach na koncie oraz przeprowadzenie transakcji.

public class Bank implements Serializable {

    private final Map<String, Integer> accounts;

    ...

    public TransactionState make(Transaction transaction) {

        int amount = transaction.getAmount();
        String to = transaction.getTo();

        // someone earns

        if (transaction.isSpecial()) {
            accounts.put(to, currentAmount(transaction.getTo()) + amount);
            return TransactionState.SPECIAL_OK;
        }

        // money transfer

        else {
            String from = transaction.getFrom();
            int amountFrom = currentAmount(from);
            int amountTo = currentAmount(to);

            accounts.put(from, amountFrom - amount);
            accounts.put(to, amountTo + amount);

            return TransactionState.REGULAR_OK;
        }
    }

    public int currentAmount(String user) {
        Integer amount = accounts.get(user);
        return amount == null ? 0 : amount;
    }
    
    ...

Jak widzisz, nigdzie nie sprawdzam, czy można dokonać przelewu (tj. czy ktoś posiada wystarczające środki na koncie). Robię to celowo, ponieważ z kontrolowaniem stanu konta wiąże się pewien problem w środowiskach rozproszonych, o którym będe jeszcze pisał w jednym z kolejnych artykułów (o monoidach). Na razie jednak ten problem pomijam.

Ostatnia klasa, RandomTransactionProducer, służy do losowego generowania transakcji, zarówno jako obiektów klasy Transaction, jak i ich reprezentacji w formacie JSON. (Konwertuję obiekty transakcji na JSONy z wykorzystaniem biblioteki Gson).

Pojedynczą transakcję losuję w następujący sposób:

private static String[] people = new String[] { "Alice", "Bob", "Charles" };

private static int[] amounts = new int[] { 100, 200, 500, 1000 };

public static Transaction create() {
    int person1 = random.nextInt(people.length);
    int person2 = random.nextInt(people.length);
    int amount = amounts[random.nextInt(amounts.length)];

    if (person1 == person2) {
        return Transaction.special(people[person1], amount);
    } else {
        return Transaction.regular(people[person1], people[person2], amount);
    }
}

Czyli mam trojkę właścicieli kont, losuję nadawcę i odbiorcę przelewu, a jeśli wylosowana zostanie ta sama osoba, to przyjmuję, że wylosowano transakcję specjalną, czyli przelew zewnętrzny przychodzący, zwiększający środki na koncie wylosowanej osoby.

Za pośrednictwem Kafki przesyłam komunikaty, które są właśnie JSONami, reprezentującymi losowe transakcje.

Nadawca

Działanie nadawcy w Kafce mogłeś sprawdzić, wpisując w jedno z okien terminala dowolny ciąg znaków. Oczywiście jednocześnie może być więcej połączonych nadawców z danego tematu. Przykład nadawcy w Javie znajdziesz w klasie RandomTransactionProducer, a przykład jego uruchomienia w KafkaProduceExample. Spójrz może najpierw na tę drugą klasę.

class KafkaProduceExample {

    ...

    public static void start(int numberOfTransactions) {
        Runnable producer = new RandomTransactionProducer(numberOfTransactions, 
                                            Conf.inputTopic, producerConfig());
        new Thread(producer).start();
    }

    public static void main(String... args) {
        start(3000);
    }
}

Stworzyłem metodę start, w której tworzę obiekt nadawcy, podając jako argumenty liczbę transakcji, które mają być wygenerowane, temat, do którego mają być przesyłane komunikaty (większość stałych trzymam w oddzielnym interfejsie, Conf), a także obiekt konfiguracji nadawcy. Ponieważ nadawca implementuje interfejs Runnable, mogę go uruchomić w nowym wątku.

Konfiguracja nadawcy zawarta jest w obiekcie klasy Properties, dla zachowania czystości kodu tworzę ten obiekt w osobnej metodzie.

private static ProducerConfig producerConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", Conf.brokerList);
    props.put("serializer.class", StringEncoder.class.getName());
    props.put("partitioner.class", VerbosePartitioner.class.getName());
    props.put("request.required.acks", "1");
    return new ProducerConfig(props);
}

W obiekcie konfiguracji kolejno definiuję parametry:

  • Listę broketów Kafki ("localhost:9092,localhost:9093,localhost:9094").
  • Klasę, której obiekty będą służyć do serializowania danych (w tym wypadku jest to jedna z klas Kafki i komunikaty będą traktowane jako łańcuchy znaków).
  • Klasę, której obiekty mają służyć do ustalania, do której z partycji ma trafić dany komunikat. VerbosePartitioner to napisana przeze mnie prosta klasa, która losowo rozdziela komunikaty do dostępnych partycji, a po drodze wypisuje informację o tym na standardowym wyjściu. Dzieki temu możesz się przekonać, że sam mechanizm rozdzielania działa, a także ile partycji faktycznie zostało stworzonych.
  • Ostatnia wartość informuje nadawcę, że ma czekać, aż Kafka potwierdzi, że komunikat został dostarczony.

Obiekt konfiguracji, podobnie jak temat i liczba transakcji, przekazany jest jako argument konstruktora nadawcy. To, co najciekawsze, dzieje się jednak w metodzie run.

public class RandomTransactionProducer implements Runnable {

    ...

    @Override
    public void run() {

        Producer<String, String> producer = new Producer<String, String>(producerConfig);

        for (long nEvents = 0; nEvents < eventsToSend; nEvents++) {

            // sleep

            if (nEvents % 100 == 0)
                Sleep.millis(300);

            // random transaction JSON to send as message

            String msg = RandomTransactionGenerator.createJson();

            // BTW: when key is not given, Kafka does not use our partitioner

            KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, "transaction", msg);
            producer.send(data);
        }

        producer.close();
    }
}

Najpierw tworzony jest obiekt nadawcy Kafki, następnie za pośrednictwem tego obiektu wysyłana jest określona liczba losowych transakcji. Żeby sam proces trwał dłużej – a tym samym, żeby można lepiej śledzić jego działanie – co 100 wygenerowanych transakcji, nadawca robi pauzę na 300 milisekund.

Komunikaty wysyłane są jako obiekty klasy KeyedMessage. Mają one przypisany temat, klucz komunikatu i samą treść komunikatu. Klucz może służyć między innymi do rozdzielania komunikatów na poszczególne partycje. Gdyby nie był podany, VerbosePartitioner w ogóle nie byłby wywoływany.

Aby przykład nadawcy zadziałał, najpierw musisz mieć uruchomioną Kafkę za pośrednictwem przygotowanego skryptu. Potem uruchom klasę KafkaProduceExample, a na standardowym wyjściu programu zobaczysz transakcje (w formacie JSON) oraz informacje o ich przypisaniu do określonych partycji.

Grupa odbiorców

Często w aplikacjach wykorzystujących Kafkę mamy w rzeczywistości do czynienia nie z pojedynczymi odbiorcami, ale z grupami odbiorców.

Można wyróżnić dwa podstawowe sposoby przesyłania komunikatów: publish-subscribe oraz kolejkowanie (ang. queuing). W pierwszym podejściu Kafka emituje sekwencję komunikatów, które mogą odbierać dowolni odbiorcy. Ten rodzaj przesyłania ma miejsce w przypadku emitowania komunikatów przez Kafkę i ich odbierania przez odbiorców, kiedy każdy odbiorca ma dostęp do wszystkich komunikatów z danego tematu.

Aternatywą dla publish-subscribe jest kolejkowanie (ang. queuing). W tym podejściu mamy do czynienia z kolejką komunikatów dostarczanych do odbiorców. Jeśli jeden z odbiorców odczyta komunikat, nie będzie on już dostarczany do pozostałych. W przeciwieństwie do publish-subscribe, gdzie wielu odbiorców może odczytać ten sam komunikat, w kolejkowaniu komunikat może być odczytany tylko przez pierwszego z odbiorców.

Kafka consumer group

Kolejkowanie w Kafce zastosowane jest w ramach grup odbiorców. Grupa czerpie informacje z Kafki na zasadach publish-subscribe, natomiast wewnątrz grupy są one dystrybuowane na zasadach kolejkowania. Poszczególni odbiorcy maja przypisaną nazwę grupy, a każdy komunikat z danego tematu jest dostarczony tylko do jednego z odbiorców w danej grupie. Odbiorcy mogą oczywiście działać w odrębnych procesach, a także na oddzielnych maszynach.

Przykład uruchomienia grupy odbiorców znajdziesz w klasie KafkaProduceExample.

ConsumerGroup group = new ConsumerGroup(Conf.inputTopic, consumerConfig());
group.run(3);

Sleep.seconds(20);

group.shutdown();

Na początku tworzony jest obiekt stworzonej przeze mnie klasy, reprezentującej grupę odbiorców. Konstruktor przyjmuje jako parametry temat oraz obiekt konfiguracji nadawcy. Następnie uruchamiana grupa trzech odbiorców, która nasłuchuje przez 20 sekund. Po tym czasie, grupa odbiorców jest wyłączana i program kończy działanie.

Zauważ, że celowo uruchamiam grupę składającą się trzech odbiorców w osobnych wątkach – w końcu ustaliłem w konfiguracji, że mają być trzy partycje.

Spójrz jeszcze na konfigurację grupy odbiorców, a potem na samą klasę ConsumerGroup, reprezentującą grupę odbiorców.

private static ConsumerConfig consumerConfig() {
    Properties props = new Properties();
    props.put("zookeeper.connect", Conf.zookeeper);
    props.put("group.id", Conf.group);
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");

    return new ConsumerConfig(props);
}

Najważniejsze są dwa pierwsze elementy obiektu konfiguracji:

  • Lokalizacja uruchomionej instancji Zookeepera ("localhost:2181").
  • Identyfikator grupy (dowolny).

Ten obiekt, wraz z tematem, przekazywany jest do konstruktora klasy ConsumerGroup, gdzie od razu tworzony jest obiekt klasy ConsumerConnector, który pozwala odbiorcom łączyć się z Kafką.

public class ConsumerGroup {

    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public ConsumerGroup(String a_topic, ConsumerConfig consumerConfig) {
        this.consumer = Consumer.createJavaConsumerConnector(consumerConfig);
        this.topic = a_topic;
    }

    public void run(int numberOfThreads) {

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, numberOfThreads);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // launch all the threads

        executor = Executors.newFixedThreadPool(numberOfThreads);

        // create an object to consume the messages

        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new SimpleConsumer(stream, threadNumber));
            threadNumber++;
        }
    }

    ...
}

W metodzie run najpierw tworzę mapę z informacją o tym, ile wątków będzie przypadać na (każdy) temat. Mapę wykorzystuję jako argument metody createMessageStreams obiektu klasy ConsumerConnector. Dzięki temu mogę stworzyć mapę, gdzie każdemu tematowi odpowiadać będzie lista strumieni komunikatów Kafki.

Tworzę też pulę wątków, w których będą uruchamiani kolejni odbiorcy, a następnie dla każdego ze strumieni Kafki tworzę odbiorcę – obiekt klasy SimpleConsumer – i przesyłam go do puli wątków.

SimpleConsumer musi implementować interfejs Runnable, żeby obiekty tej klasy można było uruchomić w puli wątków. Implementacja metody run jest trywialna – tak długo, dopóki są nowe komunikaty, wyświetla ich treść i informację o numerze wątku. Iterator strumienia Kafki będzie czekał, aż pojawi się następny element – metoda hasNext nigdy nie zwróci false.

public class SimpleConsumer implements Runnable {
    ...
    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            String msg = new String(it.next().message());
            ...
            System.out.println("Thread " + threadNumber + ": " + msg);
            ...

Zachęcam Cię, żebyś uruchomił Kafkę, a następnie aplikację z grupą odbiorców. Wpisz coś w oknie terminala nadawcy i zobacz, co się wydarzy.

Zobacz, co się też wydarzy, kiedy wpiszesz SHUTDOWN.

Nadawcy i odbiorcy

Aby przetestować cały proces, możesz uruchomić aplikację KafkaProduceAndConsumeExample z pakietu io.github.dzikowski.apps. Aplikacja uruchamia najpierw KafkaProduceExample dla 3000 losowych transakcji, a następnie KafkaConsumerGroupExample.

W konsekwencji w oknie terminala – odbiorcy tematu coolkafka-in pojawią się komunikaty w postaci transakcji w formacie JSON. Takie same komunikaty pojawią się też na wyjściu aplikacji, wraz z informacją o przydzieleniu do konkretnego wątku (pochodzącą z klasy VerbosePartitioner).

Na co należy uważać

W powyższym przykładzie pokazałem przykład wykorzystania Kafki do przesyłania komunikatów dotyczących losowych transakcji bankowych. Kafka dobrze nadaje się do tego celu, ponieważ gwarantuje, że dane nie zostaną utracone, a komunikaty zachowane są w kolejności publikowania. Dzięki temu łatwo odtworzyć historyczny stan konta, a historia transakcji dostępna jest od ręki.

Należy zwrócić jednak uwagę na gwarancję, jaką daje Kafka, dotyczącą dostarczania komunikatów: przynajmniej raz. Oznacza to, że komunikat może dotrzeć do odbiorcy wielokrotnie – np. w nierozważnej implementacji może dojść do sytuacji, gdzie przelew bankowy zostanie wykonany dwukrotnie.

Innym zagadnieniem jest bezpieczeństwo danych. Na obecnym etapie rozwoju, Kafka nie jest zabezpieczana w kontekście uwierzytelnienia, autoryzacji, czy szyfrowania. Zagadnienia te są jedynie na etapie propozycji. W praktyce często wykorzystuje się rozwiązanie, w którym Kafka znajduje się za zaporą, odizolowane od internetu. (Swoją drogą, jest to rozwiązanie stosowane często także dla innych narzędzi big data).

Słowniczek

  • Broker Kafki (ang. Kafka broker) Inaczej instancja Kafki, serwer Kafki.
  • Partycja (ang. partition) Wydaje mi się, że słowo to w języku polskim wygląda równie zgrabnie, zresztą “partycja” funkcjonuje w nieco innym kontekście w IT, ale ma – moim zdaniem – zbliżone znaczenie.
  • Rejestr komunikatów (ang. commit log)
  • Komunikat (ang. message) Równie dobrze mogłem użyć słowa wiadomość, jednak uważam, że jest zbyt potoczne, np. zwrot “wymiana komunikatów” jest moim zdaniem dużo lepszy, niż “wymiana wiadomości”.
  • Odbiorca (ang. consumer) Wydaje mi się, że dosłowne tłumaczenie, “konsument” w języku polskim ma nieco inne znaczenie. Skoro mówimy o przesyłaniu komunikatów, słowo “odbiorca” pasuje tu bardziej.
  • Nadawca (ang. producer) W kontekście Kafki chodziło o coś, co służy do produkowania komunikatów. “Nadawca” wydaje mi się jednak dużo lepszym określeniem, niż “producent”.

Różne źródła w języku angielskim

  • Oficjalna dokumentacja na stronie głównej projektu.
  • Ogólna prezentacja o Kafce autorstwa Michaela G. Nolla z Verisign.
  • Prezentacja o wykorzystaniu Kafki w Spotify, autorstwa Pablo Barrery.
  • Analiza wydajności Kafki, przeprowadzona na umiarkowanie wydajnych serwerach przez Jaya Krepsa, jednego z inżynierów LinkedIn.
  • Zbiór artykułów o Kafce, znów autorstwa Michaela G. Nolla. Część z nich obejmuje integrację z Apache Storm oraz Apache Spark, które pojawią się w kolejnych moich tutorialach.