본문 바로가기

Spring Boot

[webflux] 이미지 조회 Cache 처리

스프링 부트에서 webclient 를 통해 외부의 이미지를 조회하는 서비스를 제공한다.

로컬에서 테스트시 1 request 에 대해서 평균 100ms 이 걸리며 webflux 로 개발되어 있다.

 

mvc에서 사용되는 @Cacheable 은 mono, flux에 지원되지 않기에 사용에 맞게 개발이 필요한 상황이다.

 

build.gradle

...
ext {
    swaggerVersion = "2.10.5"
    commonsIoVersion = "2.7"
    gsonVersion = "2.8.6"
    jwtVersion = "0.9.1"
    reactExtraVersion = "3.3.3.RELEASE"
    caffeineVersion = "2.8.5"
}

dependencies {
    // spring boot
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'org.springframework.boot:spring-boot-starter-aop'
    implementation 'org.springframework.boot:spring-boot-starter-cache'

    // caffeine
    implementation "com.github.ben-manes.caffeine:caffeine:${caffeineVersion}"

    // monoCache fluxCache
    implementation "io.projectreactor.addons:reactor-extra:${reactExtraVersion}"
...
}

캐시 커스터마이징을 위해 필요한 것은 aop, reactor-extra 이며, 

starter-cache, caffeine은 캐시 timeout을 위한 config를 위해 필요하다. (스프링부트는 더이상 GuavaCacheManager를 지원하지 않는다)

 

CacheConfig

import com.github.benmanes.caffeine.cache.Caffeine;
import com.kep.sol.docsconverter.utils.Constants;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.caffeine.CaffeineCacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.TimeUnit;

@Configuration
@EnableCaching
public class CacheConfig extends CachingConfigurerSupport {

    @Bean
    public Caffeine caffeineConfig() {
        return Caffeine.newBuilder()
                .expireAfterWrite(Constants.JWT_TOKEN_VALIDITY, TimeUnit.MILLISECONDS)
                .maximumSize(Constants.CACHE_MAXIMUM_SIZE);
    }

    @Bean
    public CacheManager cacheManager(Caffeine caffeine) {
        CaffeineCacheManager caffeineCacheManager = new CaffeineCacheManager();
        caffeineCacheManager.setCaffeine(caffeine);
        return caffeineCacheManager;
    }
}

caffeine 의 만료시간을 설정한다. (해당 서비스는 jwt token을 이용한 rest 보안을 사용하기에 유효기간을 동일하게 맞추었다)

최대 저장 개수를 2,000개로 지정하였다. 이미지 하나당 200KB 정도여서 500MB 정도를 기본 사이즈로 잡아보았다.

 

ReactorCacheables

import java.lang.annotation.*;

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ReactorCacheables {

    String name();
}

webflux 를 위한 새로운 어노테이션을 생성한다. name을 필수로 받도록 한다.

 

ReactorCacheManager

import lombok.RequiredArgsConstructor;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.stereotype.Component;
import reactor.cache.CacheFlux;
import reactor.cache.CacheMono;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;

import java.util.List;
import java.util.function.Supplier;

@RequiredArgsConstructor
@Component
public class ReactorCacheManager {

    private final CacheManager cacheManager;

    public <T> Mono<T> findCachedMono(String cacheName, Object key, Supplier<Mono<T>> retriever, Class<T> classType) {
        Cache cache = cacheManager.getCache(cacheName);
        assert cache != null;
        return CacheMono
                .lookup(k -> {
                    T result = cache.get(k, classType);
                    return Mono.justOrEmpty(result).map(Signal::next);
                }, key)
                .onCacheMissResume(Mono.defer(retriever))
                .andWriteWith((k, signal) -> Mono.fromRunnable(() -> {
                    if (!signal.isOnError()) {
                        cache.put(k, signal.get());
                    }
                }));
    }

    public <T> Flux<T> findCachedFlux(String cacheName, Object key, Supplier<Flux<T>> retriever) {
        Cache cache = cacheManager.getCache(cacheName);
        assert cache != null;
        return CacheFlux
                .lookup(k -> {
                    List<T> result = cache.get(k, List.class);
                    return Mono.justOrEmpty(result)
                            .flatMap(list -> Flux.fromIterable(list).materialize().collectList());
                }, key)
                .onCacheMissResume(Flux.defer(retriever))
                .andWriteWith((k, signalList) -> Flux.fromIterable(signalList)
                        .dematerialize()
                        .collectList()
                        .doOnNext(list -> {
                            cache.put(k, list);
                        })
                        .then());
    }
}

새롭게 정의한 CacheManager를 사용하며, 실제 캐싱이 필요한 mono/flux function은 retriever 로 들어온다.

cache를 먼저 찾아보고 해당 정보를 리턴하거나, 정보가 없다면 retriever 를 defer로 실행한다.

defer 에 대해서는 http://wonwoo.ml/index.php/post/category/web/spring 를 참고하자.

 

ReactorCacheAspect

import com.kep.sol.docsconverter.annotation.ReactorCacheables;
import com.kep.sol.docsconverter.utils.ReactorCacheManager;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.ResolvableType;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.function.Supplier;
import java.util.stream.Collectors;

@Slf4j
@RequiredArgsConstructor
@Aspect
@Component
public class ReactorCacheAspect {

    private final ReactorCacheManager reactorCacheManager;

    @Pointcut("@annotation(com.test.annotation.ReactorCacheables)")
    public void pointcut() {
    }

    @Around("pointcut()")
    public Object around(ProceedingJoinPoint joinPoint) {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();

        ParameterizedType parameterizedType = (ParameterizedType) method.getGenericReturnType();
        Type rawType = parameterizedType.getRawType();

        if (!rawType.equals(Mono.class) && !rawType.equals(Flux.class)) {
            throw new IllegalArgumentException("The return type is not Mono/Flux. Use Mono/Flux for return type. method: " + method.getName());
        }
        ReactorCacheables reactorCacheable = method.getAnnotation(ReactorCacheables.class);
        String cacheName = reactorCacheable.name();
        Object[] args = joinPoint.getArgs();

        ThrowingSupplier retriever = () -> joinPoint.proceed(args);

        if (rawType.equals(Mono.class)) {
            Type returnTypeInsideMono = parameterizedType.getActualTypeArguments()[0];
            Class<?> returnClass = ResolvableType.forType(returnTypeInsideMono).resolve();
            return reactorCacheManager
                    .findCachedMono(cacheName, generateKey(args), retriever, returnClass)
                    .doOnError(e -> log.error("Failed to processing mono cache. method: " + method.getName(), e));
        } else {
            return reactorCacheManager
                    .findCachedFlux(cacheName, generateKey(args), retriever)
                    .doOnError(e -> log.error("Failed to processing flux cache. method: " + method.getName(), e));
        }
    }

    private String generateKey(Object... objects) {
        return Arrays.stream(objects)
                .map(obj -> obj == null ? "" : obj.toString())
                .collect(Collectors.joining(":"));
    }

    @FunctionalInterface
    public interface ThrowingSupplier<T> extends Supplier<T> {
        @Override
        default T get() {
            try {
                return getThrows();
            } catch (Throwable th) {
                throw new RuntimeException(th);
            }
        }

        T getThrows() throws Throwable;
    }
}

aop 를 이용하여 ReactorCacheables 어노테이션이 사용된 곳에 적용되게 처리한다.

mono, flux에 맞게 ReactorCacheManager 정보를 이용하여 처리해준다.

 

서비스에 적용

...
    @ReactorCacheables(name = "docImage")
    @Override
    public Mono<byte[]> getImage(String token, String imageId) {
        String jwtToken = token.split(" ")[1];
        if (!jwtUtils.validateToken(jwtToken))
            throw new UnauthorizedException(HttpStatus.INTERNAL_SERVER_ERROR, Constants.ERROR_NO_AUTH);

        String subject = jwtUtils.getSubject(jwtToken);
        Gson gson = new Gson();
        ImageConvInfo imageConvInfo = gson.fromJson(subject, ImageConvInfo.class);

        log.info("ImageConvInfo :: {} - {}", imageId, imageConvInfo.toString());
        return WebClient.builder()
                .exchangeStrategies(exchangeStrategies)
                .build()
                .get()
                .uri(imageConvInfo.getBaseResourcePath()+imageId)
                .retrieve()
                .bodyToMono(byte[].class);
    }
...

이미지를 가져오는 서비스에 어노테이션으로 등록한다.

캐시가 적용되면 해당 서비스는 실행되지 않기에 log가 찍히지 않는것을 확인할 수 있다.

 

참고

https://dreamchaser3.tistory.com/m/17

 

Spring Webflux Cache

개요 스프링 웹플럭스와 Reactor를 사용하여 웹서버를 개발할 때 고민이 되는 부분이 있다. 바로 캐시이다. 기존 mvc 모델에서는 스프링 캐시를 사용하여 캐싱을 쉽게 할 수 있었지만, 웹플럭스 모�

dreamchaser3.tistory.com

https://cnpnote.tistory.com/entry/SPRING-Cacheable%EC%97%90-TTL%EC%9D%84-%EC%84%A4%EC%A0%95%ED%95%A0-%EC%88%98-%EC%9E%88%EC%8A%B5%EB%8B%88%EA%B9%8C

 

[SPRING] @Cacheable에 TTL을 설정할 수 있습니까?

@Cacheable에 TTL을 설정할 수 있습니까? 나는 Spring 3.1에 대한 @Cacheable 주석 지원을 시도하고 있으며 TTL을 설정하여 캐시 된 데이터를 지우는 방법이 있는지 궁금하십니까? 지금 당장 볼 수 있듯이 @C

cnpnote.tistory.com