Spring Cloud Gateway 记录请求应答数据日志操作
我就废话不多说了,大家还是直接看代码吧~
public class GatewayContext { public static final String CACHE_GATEWAY_CONTEXT = 'cacheGatewayContext'; /** * cache json body */ private String cacheBody; /** * cache formdata */ private MultiValueMap<String, String> formData; /** * cache reqeust path */ private String path; public String getCacheBody() { return cacheBody; } public void setCacheBody(String cacheBody) { this.cacheBody = cacheBody; } public MultiValueMap<String, String> getFormData() { return formData; } public void setFormData(MultiValueMap<String, String> formData) { this.formData = formData; } public String getPath() { return path; } public void setPath(String path) { this.path = path; }}
import java.io.UnsupportedEncodingException;import java.net.URLEncoder;import java.nio.charset.Charset;import java.nio.charset.StandardCharsets;import java.util.List;import java.util.Map;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.cloud.gateway.filter.GatewayFilterChain;import org.springframework.cloud.gateway.filter.GlobalFilter;import org.springframework.core.io.ByteArrayResource;import org.springframework.core.io.buffer.DataBuffer;import org.springframework.core.io.buffer.DataBufferUtils;import org.springframework.core.io.buffer.NettyDataBufferFactory;import org.springframework.http.HttpHeaders;import org.springframework.http.HttpMethod;import org.springframework.http.MediaType;import org.springframework.http.codec.HttpMessageReader;import org.springframework.http.server.reactive.ServerHttpRequest;import org.springframework.http.server.reactive.ServerHttpRequestDecorator;import org.springframework.stereotype.Component;import org.springframework.util.MultiValueMap;import org.springframework.web.reactive.function.server.HandlerStrategies;import org.springframework.web.reactive.function.server.ServerRequest;import org.springframework.web.server.ServerWebExchange;import io.netty.buffer.ByteBufAllocator;import reactor.core.publisher.Flux;import reactor.core.publisher.Mono;// https://segmentfault.com/a/1190000017898354@Componentpublic class LogRequestGlobalFilter implements GlobalFilter { /** * default HttpMessageReader */ private static final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders(); private Logger log = LoggerFactory.getLogger(LogRequestGlobalFilter.class); @Override public Mono<Void> filter( ServerWebExchange exchange, GatewayFilterChain chain) { /** * save request path and serviceId into gateway context */ ServerHttpRequest request = exchange.getRequest(); String path = request.getPath().pathWithinApplication().value(); GatewayContext gatewayContext = new GatewayContext(); gatewayContext.setPath(path); /** * save gateway context into exchange */ exchange.getAttributes().put(GatewayContext.CACHE_GATEWAY_CONTEXT, gatewayContext); HttpHeaders headers = request.getHeaders(); MediaType contentType = headers.getContentType(); log.info('start-------------------------------------------------'); log.info('HttpMethod:{},Url:{}', request.getMethod(), request.getURI().getRawPath()); log.info('Headers token: {}', headers.getFirst('token')); if (request.getMethod() == HttpMethod.GET) { log.info('end-------------------------------------------------'); } if (request.getMethod() == HttpMethod.POST) { Mono<Void> voidMono = null; if (MediaType.APPLICATION_JSON.equals(contentType) || MediaType.APPLICATION_JSON_UTF8.equals(contentType)) { voidMono = readBody(exchange, chain, gatewayContext); } if (MediaType.APPLICATION_FORM_URLENCODED.equals(contentType)) { voidMono = readFormData(exchange, chain, gatewayContext); } return voidMono; } /* log.debug( '[GatewayContext]ContentType:{},Gateway context is set with {}', contentType, gatewayContext);*/ return chain.filter(exchange); } /** * ReadFormData * * @param exchange * @param chain * @return */ private Mono<Void> readFormData( ServerWebExchange exchange, GatewayFilterChain chain, GatewayContext gatewayContext) { final ServerHttpRequest request = exchange.getRequest(); HttpHeaders headers = request.getHeaders(); return exchange.getFormData() .doOnNext(multiValueMap -> { gatewayContext.setFormData(multiValueMap); log.info('Post x-www-form-urlencoded:{}', multiValueMap); log.info( 'end-------------------------------------------------'); }) .then(Mono.defer(() -> { Charset charset = headers.getContentType().getCharset(); charset = charset == null ? StandardCharsets.UTF_8 : charset; String charsetName = charset.name(); MultiValueMap<String, String> formData = gatewayContext.getFormData(); /** * formData is empty just return */ if (null == formData || formData.isEmpty()) { return chain.filter(exchange); } StringBuilder formDataBodyBuilder = new StringBuilder(); String entryKey; List<String> entryValue; try { /** * repackage form data */ for (Map.Entry<String, List<String>> entry : formData.entrySet()) { entryKey = entry.getKey(); entryValue = entry.getValue(); if (entryValue.size() > 1) {for (String value : entryValue) { formDataBodyBuilder.append(entryKey).append('=') .append( URLEncoder.encode(value, charsetName)) .append('&');} } else {formDataBodyBuilder .append(entryKey).append('=').append(URLEncoder .encode(entryValue.get(0), charsetName)) .append('&'); } } } catch (UnsupportedEncodingException e) { // ignore URLEncode Exception } /** * substring with the last char ’&’ */ String formDataBodyString = ''; if (formDataBodyBuilder.length() > 0) { formDataBodyString = formDataBodyBuilder.substring(0,formDataBodyBuilder.length() - 1); } /** * get data bytes */ byte[] bodyBytes = formDataBodyString.getBytes(charset); int contentLength = bodyBytes.length; ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator( request) {/** * change content-length * * @return */@Overridepublic HttpHeaders getHeaders() { HttpHeaders httpHeaders = new HttpHeaders(); httpHeaders.putAll(super.getHeaders()); if (contentLength > 0) { httpHeaders.setContentLength(contentLength); } else { httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, 'chunked'); } return httpHeaders;}/** * read bytes to Flux<Databuffer> * * @return */@Overridepublic Flux<DataBuffer> getBody() { return DataBufferUtils .read(new ByteArrayResource(bodyBytes), new NettyDataBufferFactory( ByteBufAllocator.DEFAULT), contentLength);} }; ServerWebExchange mutateExchange = exchange.mutate().request(decorator).build(); /* log.info('[GatewayContext]Rewrite Form Data :{}', formDataBodyString);*/ return chain.filter(mutateExchange); })); } /** * ReadJsonBody * * @param exchange * @param chain * @return */ private Mono<Void> readBody( ServerWebExchange exchange, GatewayFilterChain chain, GatewayContext gatewayContext) { /** * join the body */ return DataBufferUtils.join(exchange.getRequest().getBody()) .flatMap(dataBuffer -> { /* * read the body Flux<DataBuffer>, and release the buffer * //TODO when SpringCloudGateway Version Release To G.SR2,this can be update with the new version’s feature * see PR https://github.com/spring-cloud/spring-cloud-gateway/pull/1095 */ byte[] bytes = new byte[dataBuffer.readableByteCount()]; dataBuffer.read(bytes); DataBufferUtils.release(dataBuffer); Flux<DataBuffer> cachedFlux = Flux.defer(() -> { DataBuffer buffer =exchange.getResponse().bufferFactory().wrap(bytes); DataBufferUtils.retain(buffer); return Mono.just(buffer); }); /** * repackage ServerHttpRequest */ ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {@Overridepublic Flux<DataBuffer> getBody() { return cachedFlux;} }; /** * mutate exchage with new ServerHttpRequest */ ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build(); /** * read body string with default messageReaders */ return ServerRequest.create(mutatedExchange, messageReaders) .bodyToMono(String.class) .doOnNext(objectValue -> {log.info('PostBody:{}', objectValue);log.info( 'end-------------------------------------------------');gatewayContext.setCacheBody(objectValue); /* log.debug('[GatewayContext]Read JsonBody:{}',objectValue);*/ }).then(chain.filter(mutatedExchange)); }); }}
import lombok.extern.slf4j.Slf4j;import org.reactivestreams.Publisher;import org.springframework.cloud.gateway.filter.GatewayFilterChain;import org.springframework.cloud.gateway.filter.GlobalFilter;import org.springframework.core.Ordered;import org.springframework.core.io.buffer.DataBuffer;import org.springframework.core.io.buffer.DataBufferFactory;import org.springframework.http.HttpHeaders;import org.springframework.http.HttpMethod;import org.springframework.http.server.reactive.ServerHttpRequest;import org.springframework.http.server.reactive.ServerHttpResponse;import org.springframework.http.server.reactive.ServerHttpResponseDecorator;import org.springframework.stereotype.Component;import org.springframework.web.server.ServerWebExchange;import reactor.core.publisher.Flux;import reactor.core.publisher.Mono;import java.net.InetSocketAddress;import java.net.URI;import java.nio.CharBuffer;import java.nio.charset.Charset;import java.nio.charset.StandardCharsets;import java.util.concurrent.atomic.AtomicReference;@Component@Slf4jpublic class LogResponseGlobalFilter implements GlobalFilter, Ordered { private static final String REQUEST_PREFIX = 'Request Info [ '; private static final String REQUEST_TAIL = ' ]'; private static final String RESPONSE_PREFIX = 'Response Info [ '; private static final String RESPONSE_TAIL = ' ]'; private StringBuilder normalMsg = new StringBuilder(); @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); ServerHttpResponse response = exchange.getResponse(); DataBufferFactory bufferFactory = response.bufferFactory(); normalMsg.append(RESPONSE_PREFIX); ServerHttpResponseDecorator decoratedResponse = new ServerHttpResponseDecorator(response) { @Override public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) { if (body instanceof Flux) { Flux<? extends DataBuffer> fluxBody = (Flux<? extends DataBuffer>) body; return super.writeWith(fluxBody.map(dataBuffer -> { // probably should reuse buffers byte[] content = new byte[dataBuffer.readableByteCount()]; dataBuffer.read(content); String responseResult = new String(content, Charset.forName('UTF-8')); normalMsg.append('status=').append(this.getStatusCode()); normalMsg.append(';header=').append(this.getHeaders()); normalMsg.append(';responseResult=').append(responseResult); normalMsg.append(RESPONSE_TAIL); log.info(normalMsg.toString()); return bufferFactory.wrap(content); })); } return super.writeWith(body); // if body is not a flux. never got there. } }; return chain.filter(exchange.mutate().response(decoratedResponse).build()); } @Override public int getOrder() { return -2; }}
补充知识:Spring Cloud Gateway 2.x 打印 Log
场景
在服务网关层面,需要打印出用户每次的请求body和其他的参数,gateway使用的是Reactor响应式编程,和Zuul网关获取流的写法还有些不同,
不过基本的思路是一样的,都是在filter中读取body流,然后缓存回去,因为body流,框架默认只允许读取一次。
思路
1. 添加一个filter做一次请求的拦截
GatewayConfig.java
添加一个配置类,配置一个高优先级的filter,并且注入一个PayloadServerWebExchangeDecorator 对request和response做包装的类。
package com.demo.gateway2x.config;import com.demo.gateway2x.decorator.PayloadServerWebExchangeDecorator;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.core.Ordered;import org.springframework.core.annotation.Order;import org.springframework.web.server.WebFilter;@Configurationpublic class GatewayConfig { @Bean @Order(Ordered.HIGHEST_PRECEDENCE) //过滤器顺序 public WebFilter webFilter() { return (exchange, chain) -> chain.filter(new PayloadServerWebExchangeDecorator(exchange)); }}
PayloadServerWebExchangeDecorator.java
这个类中,我们实现了框架的ServerWebExchangeDecorator类,同时注入了自定义的两个类,PartnerServerHttpRequestDecorator 和 PartnerServerHttpResponseDecorator ,
这两个类用于后面对请求与响应的拦截。
package com.demo.gateway2x.decorator;import org.springframework.http.server.reactive.ServerHttpRequest;import org.springframework.http.server.reactive.ServerHttpResponse;import org.springframework.web.server.ServerWebExchange;import org.springframework.web.server.ServerWebExchangeDecorator;public class PayloadServerWebExchangeDecorator extends ServerWebExchangeDecorator { private PartnerServerHttpRequestDecorator requestDecorator; private PartnerServerHttpResponseDecorator responseDecorator; public PayloadServerWebExchangeDecorator(ServerWebExchange delegate) { super(delegate); requestDecorator = new PartnerServerHttpRequestDecorator(delegate.getRequest()); responseDecorator = new PartnerServerHttpResponseDecorator(delegate.getResponse()); } @Override public ServerHttpRequest getRequest() { return requestDecorator; } @Override public ServerHttpResponse getResponse() { return responseDecorator; }}
2. 在请求进入时,对request做一次拦截
PartnerServerHttpRequestDecorator.java
这个类实现了 ServerHttpRequestDecorator , 并在构造函数中,使用响应式编程,调用了打印log的方法,注意关注 Mono<DataBuffer> mono = DataBufferUtils.join(flux); ,
这里将Flux合并成了一个Mono,因为如果不这么做,body内容过多,将会被分段打印,这里是一个恒重要的点,
在打印RequestParamsHandle.chain打印过日志后,我们又返回了一个dataBuffer,用作向下传递,否则dataBuffer被读取过一次后就不能继续使用了。
package com.demo.gateway2x.decorator;import lombok.extern.slf4j.Slf4j;import org.springframework.core.io.buffer.DataBuffer;import org.springframework.core.io.buffer.DataBufferUtils;import org.springframework.http.server.reactive.ServerHttpRequest;import org.springframework.http.server.reactive.ServerHttpRequestDecorator;import reactor.core.publisher.Flux;import reactor.core.publisher.Mono;import static reactor.core.scheduler.Schedulers.single;@Slf4jpublic class PartnerServerHttpRequestDecorator extends ServerHttpRequestDecorator { private Flux<DataBuffer> body; public PartnerServerHttpRequestDecorator(ServerHttpRequest delegate) { super(delegate); Flux<DataBuffer> flux = super.getBody(); if (ParamsUtils.CHAIN_MEDIA_TYPE.contains(delegate.getHeaders().getContentType())) { Mono<DataBuffer> mono = DataBufferUtils.join(flux); body = mono.publishOn(single()).map(dataBuffer -> RequestParamsHandle.chain(delegate, log, dataBuffer)).flux(); } else { body = flux; } } @Override public Flux<DataBuffer> getBody() { return body; }}
RequestParamsHandle.java
这个类主要用来读取dataBuffer并做了日志打印处理,也可以做一些其他的例如参数校验等使用。
package com.demo.gateway2x.decorator;import com.alibaba.fastjson.JSON;import org.slf4j.Logger;import org.springframework.core.io.buffer.DataBuffer;import org.springframework.http.server.reactive.ServerHttpRequest;import org.springframework.util.StringUtils;import java.util.HashMap;import java.util.Map;public class RequestParamsHandle { public static <T extends DataBuffer> T chain(ServerHttpRequest delegate, Logger log, T buffer) { ParamsUtils.BodyDecorator bodyDecorator = ParamsUtils.buildBodyDecorator(buffer); // 参数校验 和 参数打印 log.info('Payload: {}', JSON.toJSONString(validParams(getParams(delegate, bodyDecorator.getBody())))); return (T) bodyDecorator.getDataBuffer(); } public static Map<String,Object> getParams(ServerHttpRequest delegate, String body) { // 整理参数 Map<String,Object> params = new HashMap<>(); if (delegate.getQueryParams() != null) { params.putAll(delegate.getQueryParams()); } if (!StringUtils.isEmpty(body)) { params.putAll(JSON.parseObject(body)); } return params; } public static Map<String,Object> validParams(Map<String,Object> params) { // todo 参数校验 return params; }}
3. 在结果返回时,对response做一次拦截
PartnerServerHttpResponseDecorator.java
这个类和上面的request的异曲同工,拦截响应流,并做记录入处理。
package com.demo.gateway2x.decorator;import lombok.extern.slf4j.Slf4j;import org.reactivestreams.Publisher;import org.springframework.core.io.buffer.DataBuffer;import org.springframework.core.io.buffer.DataBufferUtils;import org.springframework.http.MediaType;import org.springframework.http.server.reactive.ServerHttpResponse;import org.springframework.http.server.reactive.ServerHttpResponseDecorator;import reactor.core.publisher.Flux;import reactor.core.publisher.Mono;import static reactor.core.scheduler.Schedulers.single;@Slf4jpublic class PartnerServerHttpResponseDecorator extends ServerHttpResponseDecorator { PartnerServerHttpResponseDecorator(ServerHttpResponse delegate) { super(delegate); } @Override public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) { return super.writeAndFlushWith(body); } @Override public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) { final MediaType contentType = super.getHeaders().getContentType(); if (ParamsUtils.CHAIN_MEDIA_TYPE.contains(contentType)) { if (body instanceof Mono) { final Mono<DataBuffer> monoBody = (Mono<DataBuffer>) body; return super.writeWith(monoBody.publishOn(single()).map(dataBuffer -> ResponseParamsHandle.chain(log, dataBuffer))); } else if (body instanceof Flux) { Mono<DataBuffer> mono = DataBufferUtils.join(body); final Flux<DataBuffer> monoBody = mono.publishOn(single()).map(dataBuffer -> ResponseParamsHandle.chain(log, dataBuffer)).flux(); return super.writeWith(monoBody); } } return super.writeWith(body); }}
ResponseParamsHandle.java
响应流的日志打印
package com.demo.gateway2x.decorator;import org.slf4j.Logger;import org.springframework.core.io.buffer.DataBuffer;public class ResponseParamsHandle { public static <T extends DataBuffer> T chain(Logger log, T buffer) { ParamsUtils.BodyDecorator bodyDecorator = ParamsUtils.buildBodyDecorator(buffer); // 参数校验 和 参数打印 log.info('Payload: {}', bodyDecorator.getBody()); return (T) bodyDecorator.getDataBuffer(); }}
下面是实际操作,发送一次http请求:
控制台log结果:
github源码地址:https://github.com/qiaomengnan16/gateway-2x-log-demo
总结
gateway和zuul打印参数的方式思路是一致的,只是gateway采用的是reactor,写法上与zuul的直接读取流有些不同,这里需要知道的是Flux需要转换为Mono这个地方,如果不转换容易分多批打印。
以上这篇Spring Cloud Gateway 记录请求应答数据日志操作就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持好吧啦网。
相关文章: