ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spring kafka 적용하기
    Spring 2020. 3. 11. 17:13

    해당 문서는 의존성을 추가해서 작업을 하는데 익숙하지 않은 분들을 위하여 작성하였습니다.

     

    본 내용은 How to Work with Apache Kafka in Your Spring Boot Application 와 
    How to Use Schema Registry and Avro in Spring Boot Applications 를 참고하여 작성하였습니다.


    또한 원문 제목처럼 Spring boot 기준입니다.
    되게 쉬운 영어로 되어있어서 대부분 원문을 보시고 바로 하실 수 있겠지만, 조금이라도 빠르게 하실 수 있지 않을까 싶어서 작성하였습니다.

     

    의존성 추가하기

    가장 먼저 해주어야 하는 일은 Spring boot 에 카프카 dependency 를 추가해주어야 합니다.
    아마도 maven 또는 gradle 을 이용하실 것이니 아래와 같이 dependency 를 추가해주세요.

    <dependency>
    	<groupId>org.springframework.kafka</groupId>
    	<artifactId>spring-kafka</artifactId>
    </dependency>

    쓰고/읽을 때 이용할 Java Class 를 만들어주세요

    참고한 문서에서 이용한 기본 클래스와 같은 형태로 예를 들겠습니다.
    이름과 나이 아주 간단하고 좋네요.

    public class User {
    
        private String name;
        private int age;
    
        public User(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    Configuration 설정하기

    카프카 토픽으로부터 어떻게 가져올 것인지, 어떻게 카프카 토픽에 넣어줄 것인지에 대한 설정이 필요합니다.
    설정 방법으로는 "@Configuration" 어노테이션을 이용한 클래스를 만들어주셔도 되고, application.yml 이나 application.properties 에 명시해주셔도 됩니다.


    원문에서 어노테이션을 이용하는 것보다는 파일에 명시하라고 나와있네요. 이유는 명확하지만 상황에 맞게 이용하시면 될 것 같아요.

    spring:
       kafka:
         consumer:
            bootstrap-servers: localhost:9092
            group-id: group_id
            auto-offset-reset: earliest
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
         producer:
            bootstrap-servers: localhost:9092
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer

    일단 값을 하나씩 보면서 간단히 알고만 넘어가면 될 것 같습니다.
    크게 consumer 와 producer 로 나뉘어 있고, 카프카 브로커인 bootstrap-server 를 명시하도록 되어있습니다.
    대충 쓱 보시면 아아 그렇구나 하실 것 같아요. '제대로 된 설명이 보고 싶다'면

    https://docs.spring.io/spring-boot/docs/current/reference/html/appendix-application-properties.html#integration-properties 
    를 참고하시면 됩니다!

    그리고 serializer 는 https://kafka.apache.org/10/documentation/streams/developer-guide/datatypes#available-serdes 를 참고해주세요.

    이를 함께 보시면 원하시는 설정값으로 세팅이 가능합니다.


    프로듀서를 만들어 봅시다

    '아래와 같이 만들어 줍니다' 라고 써있는데 굳이 똑같이 만들 필요 없을 꺼 같아요.
    KafkaTemplate 을 잘 주입받아서 send 하면된다 정도면 될 것 같습니다.

    @Service
    public class Producer {
    
        private static final Logger logger = LoggerFactory.getLogger(Producer.class);
        private static final String TOPIC = "users";
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        public void sendMessage(String message) {
            logger.info(String.format("#### -> Producing message -> %s", message));
            this.kafkaTemplate.send(TOPIC, message);
        }
    }

     

    컨슈머를 만들어 봅시다

    마찬가지로 아래와 같이 만들어 줍니다. '@KafkaListener' 어노테이션을 이용하여 간단하게 리스너를 구성할 수 있습니다.

    @Service
    public class Consumer {
    
        private final Logger logger = LoggerFactory.getLogger(Producer.class);
    
        @KafkaListener(topics = "users", groupId = "group_id")
        public void consume(String message) throws IOException {
            logger.info(String.format("#### -> Consumed message -> %s", message));
        }
    }

     

     

    테스트를 해보자

    참고한 문서에 보면 간단한 컨트롤러를 만들어 프로듀싱을 하고 컨슈밍을 통해 로그에 잘 찍히는가를 테스트합니다.
    물론 카프카 구성이 먼저 되어있어야 합니다.
    컨트롤러 만드는 부분은 따로 기술하지 않겠습니다. 그럼 이제 정말 간단하게 스프링에서 카프카를 이용하는 것까진 알겠는데,
    실제 일정 규모 이상의 회사에서는 스키마 레지스트리를 이용하니까 스키마 레지스트리를 이용하는 부분을 이어가겠습니다.

    스키마 레지스트리를 위한 의존성 추가

    스키마 레지스트리는 Confluent Schema Registry 을 기준으로 하겠습니다.
    스키마 레지스트리를 왜 쓰는가는 MSA 환경에서 데이터에 대한 계약을 보장할 수 있다라고 쓰여 있네요.
    아래와 같이 의존성을 추가해줍니다.

    <dependencies>
            <!-- other dependencies -->
            <dependency>
                <groupId>io.confluent</groupId>
                
    <artifactId>kafka-schema-registry-client</artifactId>   (1)
                <version>5.3.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>   (2)
                <version>1.8.2</version>
            </dependency>
            <dependency>
                <groupId>io.confluent</groupId>
                <artifactId>kafka-avro-serializer</artifactId>   (3)
                <version>5.2.1</version>
            </dependency>
            <dependency>
                <groupId>io.confluent</groupId>
                <artifactId>kafka-streams-avro-serde</artifactId>
                <version>5.3.0</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
        </dependencies>

    레파지토리도 추가를 해주시고요.

    <repositories>
            <!-- other maven repositories the project -->
            <repository>
                <id>confluent</id>   (4)   
                <url>https://packages.confluent.io/maven/</url>
            </repository>
    </repositories>

    마지막으로 플러그인까지 추가해주시면 됩니다.

    <plugins>
            <!-- other maven plugins in the project -->
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.8.2</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
    <sourceDirectory>src/main/resources/avro</sourceDirectory>   (5)
    
    <outputDirectory>${project.build.directory}/generated-sources</outputDirectory>
                            <stringType>String</stringType>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
    </plugins>

    버전은 각자 상황에 맞게 선택하시고, 플러그인에 excutions 세팅이 이렇게 되어있구나만 슬쩍 보시면 됩니다.

    sourceDirectory 에 스키마 추가하기

    7번 과정에서 플러그인 추가부분을 보면 sourceDirectory path 를 명시한 곳이 있습니다. 여기에 스키마 파일을 넣어주어야 합니다.
    방법은 여러가지가 있습니다. 목적은 여기에 스키마 파일이 생기기만 하면 됩니다.

    첫번째로 https://www.baeldung.com/java-apache-avro 를 참고하여 SchemaBuilder를 이용하는 방법이 있습니다.

    두번째는 지금 진행하는 성격이 살짝 다르지만, POJO 에서 바로 GenericRecord 만들어주는 방식이 있습니다.
    (읽어보시면 아시겠지만 지금 avro 스키마를 만드는 방향과 조금 다릅니다. 이 부분은 읽어보시고 아 이런 방식으로 진행해도 되겠구나 생각하시고 장단점을 고려하여 선택하시면 됩니다.)

    세번째는 등록된 스키마를 다운받는 방법이 있습니다.

    네번째는 직접 만드는 겁니다. 레지스트리를 보고 직접 써보셔도 되고, https://docs.confluent.io/current/control-center/topics/schema.html#topicschema 에 보면 스키마 항목별 설명이 있습니다!

    두번째 방식은 약간 지금 진행하는 방식과 다르기 때문에 잘 읽어보시고 일단은 스키마 파일을 직접 추가하는 방식으로 진행해주세요.
    예를 들어 스키마를 다음과 같이 추가하겠습니다. 그리고 2.번 스텝에서 만들었던 User class 는 삭제해주세요. 왜냐면 스키마를 이용해서 제네레이트 할꺼거든요.

    {
      "namespace": "io.confluent.developer",   (1)
      "type": "record",
      "name": "User",
      "fields": [
        {
          "name": "name",
          "type": "string",
          "avro.java.string": "String"
        },
        {
          "name": "age",
          "type": "int"
        }
      ]
    }

     

    쓱 보면 대충 형태가 보이죠. 다시 한번 말씀드리면 https://docs.confluent.io/current/control-center/topics/schema.html#topicschema 에 가면 항목별 설명이 있습니다.
    여러가지 우여곡절 끝에 소스디렉토리에 스키마가 추가되었습니다.
    avro-maven-plugin 을 이용하여 POJO 를 제네레이트 하자 avro:schema

    topic 에 대한 설정값 추가

     

    참고한 문서에는 spring boot application class 에 추가를 해놓았는데, 아마도 다들 configuration 클래스를 따로 구성하실 것 같습니다.
    취향에 맞게, 서비스에 맞게 구성해주세요.

    @SpringBootApplication
    public class SpringAvroApplication {
    
      
      @Value("${topic.name}")   (1)
      private String topicName;
    
      @Value("${topic.partitions-num}")
      private Integer partitions;
    
      @Value("${topic.replication-factor}")
      private short replicationFactor;
    
      
      @Bean
      NewTopic moviesTopic() {   (2)
        return new NewTopic(topicName, partitions, replicationFactor);
      }
    
      
      public static void main(String[] args) {
        SpringApplication.run(SpringAvroApplication.class, args);
      }
    
    }

     

    포인트는 Bean 만 추가해주면 된다는 것. 토픽의 이름 정해주시고, 파티션 갯수 정해주시고, 리플리케이션도 정해줍니다. yml 에서 가져오는 값들이 있는데 상황에 따라 쓰시면 될 것 같습니다.
    참고 문서에 보면 모든 topic 에 반응할지 말지도 yml 의 true/false 값으로 결정할 수 있다네요. 
    내용 중 리플리케이션팩터????  이런 의문이 든다면,
    https://www.popit.kr/kafka-%EC%9A%B4%EC%98%81%EC%9E%90%EA%B0%80-%EB%A7%90%ED%95%98%EB%8A%94-topic-replication/
    이 문서를 읽어보시면 많은 도움이 되더라고요.
    다음은 4,5 번 내용과 같습니다. 
    카프카템플릿을 이용하여 쓱 send 해주면 쓱 날라가고 쓱 컨슈밍까지 됩니다.


    이슈 해결

    개발하는 중에 컨슈머가 따로 필요없어 프로듀서만 작업하는 경우는 커맨드 서버에서 확인하는 방법이 있습니다. 그때 만약 인코딩이 깨진다면 위의 과정을 통해서 우린 avro 로 데이터를 생산했으므로

    kafka-console-consumer 가 아닌 kafka-avro-console-consumer로 조회하면 정상적인 값을 확인할 수 있습니다. 

     

     

    반응형

    'Spring' 카테고리의 다른 글

    Reactive Spring Boot: Part 1: Kotlin REST Service  (0) 2021.01.17
    Spring event  (1) 2020.03.16
Designed by Tistory.