Създаване на мащабируемо приложение за размяна на съобщения с Apache Kafka

15 юли
Автор: Георги Георгиев, Senior Java разработчик в софтуерната компания DataArt
Създаване на мащабируемо приложение за размяна на съобщения с Apache Kafka
Apache Kafka е една от най-използваните системи за изпращане и получаване на съобщения (събития), имаща за цел да осигури висока производителност и ниско забавяне при обработката на тези съобщения.

georgievКато повечето Apache библиотеки, Kafka е open-source инструмент, чието основно предимство е високото ниво на надеждност. С негова помощ могат да се правят както малки приложения за писмена комуникация, така и големи приложения, боравещи с огромни обеми от данни като електронна търговия, банков софтуер, финансов софтуер, телекомуникационен софтуер, софтуер в сферата на здравеопазването и т.н..

Събитията в Apache Kafka са разделени на т. нар. теми (topics).  Например - тема би могла да е папка с документи на име „плащания“, съдържаща събитие със следните три основни компонента:

  • Ключ на събитието: Георги
  • Съдържание на събитието: Вноска по кредит в размер на 500 лв.
  • Дата на събитието: 20.02.2020 г.

Apache Kafka е изградена на базата на разпределени системи за управление (Distributed control systems - DCS), състоящи се от сървъри и клиенти.

Клиентите могат да се абонират за определени теми, от които да получават или да изпращат потоци събития както последователно, така и паралелно, което осигурява голям дебит (bandwidth) на комуникация между клиентите през сървърите.

Сървърите, от своя страна, се грижат за запазването на тези събития и това да бъдат доставени до всеки клиент максимално бързо и с висока надеждност. Дори и някой клиент да не е онлайн, за да получи съобщението за дадено събитие, сървърът съхранява това събитие колкото време е необходимо.

Как работи Apache Kafka?

Както вече обясних, Apache Kafka работи със сървъри и клиенти. Сървърите служат за съхранение и обработване на съобщенията и могат да бъдат: високо-достъпен клъстер, data център или cloud услуга.

За комуникация на сървърите с външни системи (за импортиране / експортиране на данни) се използва Kafka Connect, което осигурява Kafka Streams. Клиентите от своя страна биха могли да са микросървисна архитектура за четене, писане и обработка на потоци от събития паралелно, мащабируемо и fault-tolerant. Кафка е изградена от пет модула и всеки отговаря за определена функционалност:

  • Producer API – публикува записи/поток от събития в една или повече Kafka теми.
  • Consumer API - всеки потребител в група може динамично да задава списък с теми, за които иска да се абонира.
  • Connector API - изпълнява многократно използваните API за producer-и и потребители, които могат да свържат темите със съществуващите приложения.
  • Streams API - това API преобразува входните потоци в изходни и произвежда резултата.
  • Admin API - използва се за управление на теми на Kafka, брокери и други обекти на Kafka.

Инсталиране и стартиране на Apache Kafka

Ще стартираме една инстанция на Apache Kafka брокер. Това е най-лесният и най-бързият начин да разберете как работи. Преди това обаче трябва да свалим и инсталираме Apache Kafka.

1. Сваляне на Apache Kafka

Можете да свалите последната версия на Kafka от тук.  Разархивирайте я на удобно за работа място на вашия компютър. ВНИМАНИЕ: Препоръчвам ви да я разархивирате в C:\kafka, за да не ползвате много дълги пътища на файлове и директории. Ако я разархивирате например в C:\Program Files\Apache\Kafka може да имате проблем със стартирането.

2. Конфигуриране и стартиране на Zookeeper

Apache Kafka се стартира винаги като Distributer application и трябва да се вземат предвид синхронизация на конфигурации, избор на лидер в клъстера и т.н. Цялата тази работа се върши от Zookeeper, той е част от пакета с Apache Kafka, който свалихте преди малко.

За да го конфигурирате, отворете файл:

C:\kafka\config\zookeeper.properties

Ако ползвате Windows, променете dataDir на:

dataDir=C:/kafka/data

Конфигурацията dataDir оказва къде да се съхраняват данните на Kafka, като може да си добавите и папката с log файловете:

dataDir=C:/kafka/dir

Така при евентуални проблеми ще знаете къде се намират логфайловете.

За да стартирате Zookeeper под Windows, трябва да влезете в папката “bin/windows” и да изпълните командата:

./zookeeper-server-start.bat ../../config/zookeeper.properties

Zookeeper се стартира с конфигурация по подразбиране на порт 2181. Той също така координира работата на отделните Kafka Брокери (Kafka Brokers) във вашия клъстер.

3. Стартиране на Kafka Broker

За да стартирате Брокер, трябва да влезете в папката “bin/windows” и да изпълните командата:

./kafka-server-start.bat ../../config/server.properties

4. Създаване на Тема (Topic) в Apache Kafka

Това е последната стъпка преди да преминем към разработване на приложението.

Ще създадем Тема „messageTopic“ със следната команда от „bin“ папката:

./kafka-topics.sh --create --topic messageTopic -zookeeper \ 
localhost:2181 --replication-factor 1 --partitions 1

5. Създаване на Java приложение, използващо Apache Kafka

Преди да преминете нататък, проверете дали имате инсталирани следните приложения:

  • Java (препоръчителна версия 8)
  • Maven (последна версия)

6. Създаване на pom.xml файл

Всяко Maven базирано приложение използва конфигурационен файл pom.xml. За да го създадете, може да използвате вградения в Maven генератор на базови проекти. Ето как:

  • Отворете едно CMD и изпълнете следната команда:
mvn archetype:generate \ 
-DgroupId=com.dataart.messaging.app \
-DartifactId=messaging-app \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DarchetypeVersion=1.4 \
-DinteractiveMode=false

Така ще създадете Maven проект в папка messaging-app.

  • За по-лесно конфигуриране на pom.xml файла, разгледайте и копирайте съдържанието:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>

 <groupId>net.javaguides.springboot</groupId>
 <artifactId>Springboot-helloworld-application</artifactId>
 <version>0.0.1-SNAPSHOT</version>
 <packaging>jar</packaging>

 <name>Springboot-helloworld-application</name>
 <description>Demo project for Spring Boot</description>

 <parent>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-parent</artifactId>
     <version>2.0.5.RELEASE</version>
     <relativePath/> <!-- lookup parent from repository -->
 </parent>

 <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
     <java.version>1.8</java.version>
 </properties>

 <dependencies>
     <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
     </dependency>

     <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-test</artifactId>
          <scope>test</scope>
     </dependency>
 </dependencies>

 <build>
     <plugins>
         <plugin>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-maven-plugin</artifactId>
         </plugin>
     </plugins>
 </build>
</project>

Допълнителните елементи във файла са за използване на дипендънситата на Spring Boot и Apache Kafka, както и parent project-а на Spring Boot.

Може да преименувате main файла App.java на MessagingApp.java със следното съдържание:

package com.dataart.messaging.app;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;

@SpringBootApplication(exclude = {SecurityAutoConfiguration.class})
public class MessagingApp
{
     public static void main( String[] args )
     {
        SpringApplication.run(MessagingApp.class, args);
     }

Както знаете, в него е входната точка за изпълнение на програмата.

Следващата стъпка е да създадем събитието (съобщението), което ще се изпраща и получава. За тази цел създайте package:

com.dataart.messaging.app.component

и в него създайте MessageComponent.java файл и поставете следното съдържание:

package com.dataart.messaging.app.component;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

@Component
public class MessageComponent {

    private final List<String> messages = new ArrayList><();

    @KafkaListener(topics = "messagingTopic", groupId = "kafka-sandbox")
    public void listen(String message) {
        synchronized (messages) {
            messages.add(message);
        }
    }

    public List<String> getMessages() {
        return messages;
    }

}

Както се вижда, този компонент има задачата да вземе съобщенията от Topic-а на Kafka и да ги запази в един динамичен масив, който ще бъде използван по-късно.

Следващата стъпка е създаването на конфигурации за получател и изпращач. За тази цел създайте package:

com.dataart.messaging.app.configuration

И в него сложете следните конфигурации за изпращане и получаване на съобщения. За целта създайте файл ReceiveConfiguration.java и в него сложете кода на получаване на съобщения:

package com.dataart.messaging.app.configuration; 

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class ReceiverConfiguration {

    private static final String KAFKA_BROKER = "localhost:9092";
    private static final String GROUP_ID = "kafka-sandbox";

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigurations());
    }

    @Bean
    public Map<String, Object> consumerConfigurations() {
        Map<String, Object> configurations = new HashMap<>();

        configurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER);
        configurations.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return configurations;
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

Създайте файл ReceiveConfiguration.java и в него сложете кода на изпращане на съобщения:

package com.dataart.messaging.app.configuration;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class SenderConfiguration {

    private static final String KAFKA_BROKER = "localhost:9092";

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigurations());
    }

    @Bean
    public Map<String, Object> producerConfigurations() {
        Map<String, Object> configurations = new HashMap<>();

        configurations.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER);
        configurations.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configurations.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return configurations;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

Както виждате, в тези конфигурации има информация за Apache Kafka брокера, който да се ползва, както и порта, на който да се свържат изпращача и получателя.

Ще използваме REST service за изпращане и получаване на съобщенията, като за целта можете да създадете package:

com.dataart.messaging.app.controller

…и създайте файл KafkaController.java в него сложете контролера на приложението:

package com.dataart.messaging.app.controller; 

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.dataart.messaging.app.component.*;

import java.util.List;

@RestController
public class KafkaController {

    private KafkaTemplate<String, String> template;
    private MessageComponent messageComponent;

    public KafkaController(KafkaTemplate<String, String> template, MessageComponent messageComponent) {
        this.template = template;
        this.messageComponent = messageComponent;
    }

    @GetMapping("/kafka/send")
    public void produce(@RequestParam String message) {
        template.send("messagingTopic", message);
    }

    @GetMapping("/kafka/messages")
    public List<String> getMessages() {
        return messageComponent.getMessages();
    }

 Ето с тази команда можете да стартирате приложението:

mvn spring-boot:run

След което можете да изпратите съобщение през браузъра:

Както и да видите какви съобщения вече са изпратени:

Сега имате завършено приложение, в което можете да изпращате съобщения и да получавате такива.

На базата на тази примерна интеграция на Apache Kafka, може да помислите как да направите приложение, така че да се изпращат и получава съобщения от два отделни компютъра (например - да ползвате две различни теми). Бихте могли и да подобрите съобщенията, така че изпращачът да има възможност да вижда дали дадено съобщение е получено и прочетено (например - всяко съобщение да е с различни статуси или да влиза в различни теми).