스프링 부트에서 WebClient를 통해 외부 이미지를 조회하는 서비스를 제공한다.
로컬에서 테스트 시 요청 1건당 평균 100ms가 걸리며, WebFlux로 개발되어 있다.
MVC에서 사용하는 @Cacheable은 Mono, Flux를 지원하지 않기 때문에 용도에 맞는 캐시 처리를 직접 개발해야 하는 상황이다.
build.gradle
...
// Centralised dependency versions. caffeineVersion and reactExtraVersion are interpolated
// into the coordinates declared in the dependencies block; the remaining versions are
// presumably consumed by dependencies elided from this snippet.
ext {
swaggerVersion = "2.10.5"
commonsIoVersion = "2.7"
gsonVersion = "2.8.6"
jwtVersion = "0.9.1"
reactExtraVersion = "3.3.3.RELEASE"
caffeineVersion = "2.8.5"
}
dependencies {
// spring boot
implementation 'org.springframework.boot:spring-boot-starter-webflux'
// AOP support, needed for the aspect that intercepts the custom cache annotation
implementation 'org.springframework.boot:spring-boot-starter-aop'
// Spring cache abstraction, used to configure the cache (timeout) through a CacheManager
implementation 'org.springframework.boot:spring-boot-starter-cache'
// caffeine: cache implementation providing the expiry / maximum-size configuration
implementation "com.github.ben-manes.caffeine:caffeine:${caffeineVersion}"
// reactor-extra: provides CacheMono / CacheFlux
implementation "io.projectreactor.addons:reactor-extra:${reactExtraVersion}"
...
}
캐시 커스터마이징을 위해 필요한 것은 aop, reactor-extra 이며,
starter-cache, caffeine은 캐시 timeout을 위한 config를 위해 필요하다. (스프링부트는 더이상 GuavaCacheManager를 지원하지 않는다)
CacheConfig
import com.github.benmanes.caffeine.cache.Caffeine;
import com.kep.sol.docsconverter.utils.Constants;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.caffeine.CaffeineCacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.TimeUnit;
/**
 * Enables Spring's cache abstraction and backs it with Caffeine.
 *
 * <p>The Caffeine builder is exposed as its own bean so that the expiry and size limits are
 * defined in one place and handed to the {@link CacheManager}.
 */
@Configuration
@EnableCaching
public class CacheConfig extends CachingConfigurerSupport {

    /**
     * Builds the Caffeine specification used for the caches.
     *
     * <p>Entries expire {@code Constants.JWT_TOKEN_VALIDITY} milliseconds after they are written
     * and the cache holds at most {@code Constants.CACHE_MAXIMUM_SIZE} entries.
     *
     * @return the configured (not yet built) Caffeine builder
     */
    @Bean
    public Caffeine<Object, Object> caffeineConfig() {
        // Parameterized instead of raw: this is the type newBuilder() returns and setCaffeine() expects.
        return Caffeine.newBuilder()
                .expireAfterWrite(Constants.JWT_TOKEN_VALIDITY, TimeUnit.MILLISECONDS)
                .maximumSize(Constants.CACHE_MAXIMUM_SIZE);
    }

    /**
     * Creates the cache manager that applies the given Caffeine specification.
     *
     * @param caffeine the Caffeine builder produced by {@link #caffeineConfig()}
     * @return a Caffeine-backed cache manager
     */
    @Bean
    public CacheManager cacheManager(Caffeine<Object, Object> caffeine) {
        CaffeineCacheManager caffeineCacheManager = new CaffeineCacheManager();
        caffeineCacheManager.setCaffeine(caffeine);
        return caffeineCacheManager;
    }
}
caffeine 의 만료시간을 설정한다. (해당 서비스는 jwt token을 이용한 rest 보안을 사용하기에 유효기간을 동일하게 맞추었다)
최대 저장 개수를 2,000개로 지정하였다. 이미지 하나당 200KB 정도이므로 2,000 × 200KB ≈ 400MB이며, 여유를 두어 500MB 정도를 기본 사이즈로 잡아보았다.
ReactorCacheables
import java.lang.annotation.*;
/**
 * Marks a method whose reactive result should be served from a named cache.
 *
 * <p>The annotation is retained at runtime and may only be placed on methods.
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface ReactorCacheables {

    /**
     * Name of the cache to use. Mandatory: no default value is provided.
     *
     * @return the cache name
     */
    String name();
}
webflux 를 위한 새로운 어노테이션을 생성한다. name을 필수로 받도록 한다.
ReactorCacheManager
import lombok.RequiredArgsConstructor;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.stereotype.Component;
import reactor.cache.CacheFlux;
import reactor.cache.CacheMono;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
import java.util.List;
import java.util.function.Supplier;
/**
 * Bridges Spring's {@link CacheManager} with reactor-extra's {@link CacheMono} / {@link CacheFlux}
 * so that {@link Mono} and {@link Flux} results can be cached.
 *
 * <p>Both lookups read the cache first and only subscribe to the supplied retriever on a miss.
 */
@RequiredArgsConstructor
@Component
public class ReactorCacheManager {

    private final CacheManager cacheManager;

    /**
     * Returns the cached value for {@code key}, or runs {@code retriever} and caches its value.
     *
     * @param cacheName name of the cache to read from and write to
     * @param key       cache key
     * @param retriever supplies the source {@link Mono}; only invoked (lazily, via defer) on a cache miss
     * @param classType expected type of the cached value, passed to {@link Cache#get(Object, Class)}
     * @param <T>       value type
     * @return a Mono emitting the cached or freshly retrieved value
     * @throws IllegalArgumentException if no cache is available under {@code cacheName}
     */
    public <T> Mono<T> findCachedMono(String cacheName, Object key, Supplier<Mono<T>> retriever, Class<T> classType) {
        Cache cache = requireCache(cacheName);
        return CacheMono
                .lookup(k -> {
                    T result = cache.get(k, classType);
                    return Mono.justOrEmpty(result).map(Signal::next);
                }, key)
                .onCacheMissResume(Mono.defer(retriever))
                .andWriteWith((k, signal) -> Mono.fromRunnable(() -> {
                    // Only onNext signals carry a value; skipping onError and onComplete avoids
                    // storing a null entry when the source Mono completes empty.
                    if (signal.hasValue()) {
                        cache.put(k, signal.get());
                    }
                }));
    }

    /**
     * Returns the cached elements for {@code key}, or runs {@code retriever} and caches the
     * collected elements as a {@link List}.
     *
     * @param cacheName name of the cache to read from and write to
     * @param key       cache key
     * @param retriever supplies the source {@link Flux}; only invoked (lazily, via defer) on a cache miss
     * @param <T>       element type
     * @return a Flux emitting the cached or freshly retrieved elements
     * @throws IllegalArgumentException if no cache is available under {@code cacheName}
     */
    public <T> Flux<T> findCachedFlux(String cacheName, Object key, Supplier<Flux<T>> retriever) {
        Cache cache = requireCache(cacheName);
        return CacheFlux
                .lookup(k -> {
                    // The writer below only ever stores a List of elements under this key, so the
                    // raw List read back here is treated as List<T>.
                    @SuppressWarnings("unchecked")
                    List<T> result = cache.get(k, List.class);
                    return Mono.justOrEmpty(result)
                            .flatMap(list -> Flux.fromIterable(list).materialize().collectList());
                }, key)
                .onCacheMissResume(Flux.defer(retriever))
                .andWriteWith((k, signalList) -> Flux.fromIterable(signalList)
                        .dematerialize()
                        .collectList()
                        .doOnNext(list -> cache.put(k, list))
                        .then());
    }

    /**
     * Resolves the named cache, failing with a descriptive exception instead of relying on
     * {@code assert}, which is disabled unless the JVM is started with {@code -ea}.
     */
    private Cache requireCache(String cacheName) {
        Cache cache = cacheManager.getCache(cacheName);
        if (cache == null) {
            throw new IllegalArgumentException("No cache available for name: " + cacheName);
        }
        return cache;
    }
}
새롭게 정의한 CacheManager를 사용하며, 실제 캐싱이 필요한 mono/flux function은 retriever 로 들어온다.
cache를 먼저 찾아보고 해당 정보를 리턴하거나, 정보가 없다면 retriever 를 defer로 실행한다.
defer 에 대해서는 http://wonwoo.ml/index.php/post/category/web/spring 를 참고하자.
ReactorCacheAspect
import com.kep.sol.docsconverter.annotation.ReactorCacheables;
import com.kep.sol.docsconverter.utils.ReactorCacheManager;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.ResolvableType;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
 * Aspect that applies caching to methods annotated with {@link ReactorCacheables}.
 *
 * <p>The annotated method must return {@link Mono} or {@link Flux}. The method itself is only
 * invoked when the cache has no entry for the key derived from the call arguments.
 */
@Slf4j
@RequiredArgsConstructor
@Aspect
@Component
public class ReactorCacheAspect {

    private final ReactorCacheManager reactorCacheManager;

    /**
     * Matches every method carrying {@link ReactorCacheables}.
     * The fully qualified name must be the same annotation type this class imports.
     */
    @Pointcut("@annotation(com.kep.sol.docsconverter.annotation.ReactorCacheables)")
    public void pointcut() {
    }

    /**
     * Wraps the intercepted call in a cache lookup.
     *
     * @param joinPoint the intercepted method invocation
     * @return a {@link Mono} or {@link Flux} (matching the method's return type) backed by the cache
     * @throws IllegalArgumentException if the intercepted method does not return Mono or Flux
     */
    @Around("pointcut()")
    public Object around(ProceedingJoinPoint joinPoint) {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();

        // Inspect the erased return type so that non-generic and raw return types reach the
        // IllegalArgumentException below instead of failing on a ParameterizedType cast.
        Class<?> rawReturnType = method.getReturnType();
        boolean returnsMono = Mono.class.equals(rawReturnType);
        if (!returnsMono && !Flux.class.equals(rawReturnType)) {
            throw new IllegalArgumentException("The return type is not Mono/Flux. Use Mono/Flux for return type. method: " + method.getName());
        }

        ReactorCacheables reactorCacheable = method.getAnnotation(ReactorCacheables.class);
        String cacheName = reactorCacheable.name();
        Object[] args = joinPoint.getArgs();
        String cacheKey = generateKey(args);

        if (returnsMono) {
            // Element type declared inside Mono<...>; resolves to null when it cannot be
            // determined (e.g. a raw Mono), in which case the cache value is read without a type check.
            @SuppressWarnings("unchecked")
            Class<Object> elementType =
                    (Class<Object>) ResolvableType.forMethodReturnType(method).getGeneric(0).resolve();
            // Safe: the return type was verified to be Mono above.
            @SuppressWarnings("unchecked")
            ThrowingSupplier<Mono<Object>> retriever = () -> (Mono<Object>) joinPoint.proceed(args);
            return reactorCacheManager
                    .findCachedMono(cacheName, cacheKey, retriever, elementType)
                    .doOnError(e -> log.error("Failed to processing mono cache. method: " + method.getName(), e));
        }

        // Safe: the return type was verified to be Flux above.
        @SuppressWarnings("unchecked")
        ThrowingSupplier<Flux<Object>> retriever = () -> (Flux<Object>) joinPoint.proceed(args);
        return reactorCacheManager
                .findCachedFlux(cacheName, cacheKey, retriever)
                .doOnError(e -> log.error("Failed to processing flux cache. method: " + method.getName(), e));
    }

    /**
     * Builds the cache key by joining the {@code toString()} of every argument with {@code ":"};
     * {@code null} arguments contribute an empty string.
     *
     * <p>NOTE(review): the key relies on each argument's {@code toString()}, so array arguments
     * (identity-based {@code toString()}) will not produce stable keys, and argument values that
     * themselves contain {@code ":"} can collide.
     */
    private String generateKey(Object... objects) {
        return Arrays.stream(objects)
                .map(obj -> obj == null ? "" : obj.toString())
                .collect(Collectors.joining(":"));
    }

    /**
     * A {@link Supplier} whose body may throw any {@link Throwable}.
     *
     * @param <T> supplied type
     */
    @FunctionalInterface
    public interface ThrowingSupplier<T> extends Supplier<T> {

        /**
         * Invokes {@link #getThrows()}. Unchecked exceptions and errors propagate unchanged so
         * callers still see the original exception type; only checked throwables are wrapped in
         * a {@link RuntimeException}.
         */
        @Override
        default T get() {
            try {
                return getThrows();
            } catch (RuntimeException | Error unchecked) {
                throw unchecked;
            } catch (Throwable checked) {
                throw new RuntimeException(checked);
            }
        }

        /**
         * Produces the value.
         *
         * @return the supplied value
         * @throws Throwable any failure raised while producing the value
         */
        T getThrows() throws Throwable;
    }
}
aop 를 이용하여 ReactorCacheables 어노테이션이 사용된 곳에 적용되게 처리한다.
mono, flux에 맞게 ReactorCacheManager 정보를 이용하여 처리해준다.
서비스에 적용
...
/**
 * Fetches an image from the external resource server described by the caller's JWT.
 *
 * <p>The JWT subject is parsed as JSON into an {@code ImageConvInfo}, whose base resource path
 * is concatenated with {@code imageId} to form the request URI. Results are cached in the
 * {@code docImage} cache via {@code @ReactorCacheables}.
 *
 * <p>NOTE(review): on a cache hit this method body is not executed, so the token validation
 * below does not run for that call — confirm this is acceptable together with the cache expiry.
 *
 * @param token   authorization header value; the JWT is expected as the second space-separated part
 * @param imageId identifier appended to the base resource path
 * @return the image bytes
 * @throws UnauthorizedException if the header carries no token part or the token fails validation
 */
@ReactorCacheables(name = "docImage")
@Override
public Mono<byte[]> getImage(String token, String imageId) {
    // Treat a missing or malformed header as an authorization failure rather than letting
    // split(...)[1] fail with NullPointerException / ArrayIndexOutOfBoundsException.
    String[] headerParts = token == null ? new String[0] : token.split(" ");
    if (headerParts.length < 2 || !jwtUtils.validateToken(headerParts[1])) {
        // NOTE(review): the exception is created with INTERNAL_SERVER_ERROR; confirm whether
        // an unauthorized status was intended.
        throw new UnauthorizedException(HttpStatus.INTERNAL_SERVER_ERROR, Constants.ERROR_NO_AUTH);
    }
    String jwtToken = headerParts[1];
    String subject = jwtUtils.getSubject(jwtToken);
    Gson gson = new Gson();
    ImageConvInfo imageConvInfo = gson.fromJson(subject, ImageConvInfo.class);
    log.info("ImageConvInfo :: {} - {}", imageId, imageConvInfo);
    return WebClient.builder()
            .exchangeStrategies(exchangeStrategies)
            .build()
            .get()
            .uri(imageConvInfo.getBaseResourcePath() + imageId)
            .retrieve()
            .bodyToMono(byte[].class);
}
...
이미지를 가져오는 서비스에 어노테이션으로 등록한다.
캐시가 적용되면 해당 서비스는 실행되지 않기에 log가 찍히지 않는 것을 확인할 수 있다.
참고
https://dreamchaser3.tistory.com/m/17