先把栗子放上,让大家方便测试用: Service端 public static void main(String[] args) { ServerConfig serverConfig = new ServerConfig() .setProtocol("bolt") // 设置一个协议,默认bolt .setPort(12200) // 设置一个端口,默认12200 .setDaemon(false); // 非守护线程 ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>() .setInterfaceId(HelloService.class.getName()) // 指定接口 .setRef(new HelloServiceImpl()) // 指定实现 .setServer(serverConfig); // 指定服务端 providerConfig.export(); // 发布服务 } public class HelloServiceImpl implements HelloService { private final static Logger LOGGER = LoggerFactory.getLogger(HelloServiceImpl.class); @Override public String sayHello(String string) { LOGGER.info("Server receive: " + string); // 获取请求透传数据并打印 System.out.println("service receive reqBag -> " + RpcInvokeContext.getContext().getRequestBaggage("req_bag")); // 设置响应透传数据到当前线程的上下文中 RpcInvokeContext.getContext().putResponseBaggage("req_bag", "s2c"); return "hello " + string + " !"; } } client端 public static void main(String[] args) { ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>() .setInterfaceId(HelloService.class.getName()) // 指定接口 .setProtocol("bolt") // 指定协议 .setDirectUrl("bolt://127.0.0.1:12200") // 指定直连地址 .setConnectTimeout(10 * 1000); RpcInvokeContext.getContext().putRequestBaggage("req_bag", "a2bbb"); HelloService helloService = consumerConfig.refer(); while (true) { System.out.println("service receive reqBag -> " + RpcInvokeContext.getContext().getResponseBaggage("req_bag")); try { LOGGER.info(helloService.sayHello("world")); } catch (Exception e) { e.printStackTrace(); } try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } } 通过上面的栗子我们可以看出整个流程应该是: 客户端把需要透传的数据放入到requestBaggage中,然后调用服务端 服务端在HelloServiceImpl中获取请求透传数据并打印,并把响应数据放入到responseBaggage中 客户端收到透传数据 所以下面我们从客户端开始源码讲解。 客户端数据透传给服务端 首先客户端在引用之前要设置putRequestBaggage,然后在客户端引用的时候会调用ClientProxyInvoker#invoke方法。 如下: ClientProxyInvoker#invoke public SofaResponse invoke(SofaRequest request) throws SofaRpcException { .... // 包装请求 decorateRequest(request); .... } 通过调用decorateRequest会调用到子类DefaultClientProxyInvoker的decorateRequest方法。 DefaultClientProxyInvoker#decorateRequest protected void decorateRequest(SofaRequest request) { .... RpcInvokeContext invokeCtx = RpcInvokeContext.peekContext(); RpcInternalContext internalContext = RpcInternalContext.getContext(); if (invokeCtx != null) { .... // 如果用户指定了透传数据 if (RpcInvokeContext.isBaggageEnable()) { // 需要透传 BaggageResolver.carryWithRequest(invokeCtx, request); internalContext.setAttachment(HIDDEN_KEY_INVOKE_CONTEXT, invokeCtx); } } .... } 在decorateRequest方法里首先会校验有没有开启透传数据,如果开启了,那么就调用BaggageResolver#carryWithRequest,把要透传的数据放入到request里面 BaggageResolver#carryWithRequest public static void carryWithRequest(RpcInvokeContext context, SofaRequest request) { if (context != null) { //获取所有的透传数据 Map<String, String> requestBaggage = context.getAllRequestBaggage(); if (CommonUtils.isNotEmpty(requestBaggage)) { // 需要透传 request.addRequestProp(RemotingConstants.RPC_REQUEST_BAGGAGE, requestBaggage); } } } 这个方法里面要做的就是获取所有的透传数据,然后放置到RequestProp里面,这样在发送请求的时候就会传送到服务端。 服务端接受透传数据 服务端的调用流程如下: BoltServerProcessor->FilterChain->ProviderExceptionFilter->FilterInvoker->RpcServiceContextFilter->FilterInvoker->ProviderBaggageFilter->FilterInvoker->ProviderTracerFilter->ProviderInvoker 所以从上面的调用链可以知道,在服务端引用的时候会经过ProviderBaggageFilter过滤器,我们下面看看这个过滤器做了什么事情: ProviderBaggageFilter#invoke public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws SofaRpcException { SofaResponse response = null; try { //从request中获取透传数据存入到requestBaggage中 BaggageResolver.pickupFromRequest(RpcInvokeContext.peekContext(), request, true); response = invoker.invoke(request); } finally { if (response != null) { BaggageResolver.carryWithResponse(RpcInvokeContext.peekContext(), response); } } return response; } ProviderBaggageFilter会调用BaggageResolver#pickupFromRequest从request中获取数据 BaggageResolver#pickupFromRequest public static void pickupFromRequest(RpcInvokeContext context, SofaRequest request, boolean init) { if (context == null && !init) { return; } // 解析请求 Map<String, String> requestBaggage = (Map<String, String>) request .getRequestProp(RemotingConstants.RPC_REQUEST_BAGGAGE); if (CommonUtils.isNotEmpty(requestBaggage)) { if (context == null) { context = RpcInvokeContext.getContext(); } context.putAllRequestBaggage(requestBaggage); } } 最后会在ProviderBaggageFilter invoke方法的finally里面调用BaggageResolver#carryWithResponse把响应透传数据回写到response里面。 public static void carryWithResponse(RpcInvokeContext context, SofaResponse response) { if (context != null) { Map<String, String> responseBaggage = context.getAllResponseBaggage(); if (CommonUtils.isNotEmpty(responseBaggage)) { String prefix = RemotingConstants.RPC_RESPONSE_BAGGAGE + "."; for (Map.Entry<String, String> entry : responseBaggage.entrySet()) { response.addResponseProp(prefix + entry.getKey(), entry.getValue()); } } } } 客户端收到响应透传数据 最后客户端会在ClientProxyInvoker#invoke方法里调用decorateResponse获取response回写的数据。 public SofaResponse invoke(SofaRequest request) throws SofaRpcException { .... // 包装响应 decorateResponse(response); .... } decorateResponse是在子类DefaultClientProxyInvoker实现的: DefaultClientProxyInvoker#decorateResponse protected void decorateResponse(SofaResponse response) { .... //如果开启了透传 if (RpcInvokeContext.isBaggageEnable()) { BaggageResolver.pickupFromResponse(invokeCtx, response, true); } .... } 这个方法里面会调用BaggageResolver#pickupFromResponse public static void pickupFromResponse(RpcInvokeContext context, SofaResponse response, boolean init) { if (context == null && !init) { return; } Map<String, String> responseBaggage = response.getResponseProps(); if (CommonUtils.isNotEmpty(responseBaggage)) { String prefix = RemotingConstants.RPC_RESPONSE_BAGGAGE + "."; for (Map.Entry<String, String> entry : responseBaggage.entrySet()) { if (entry.getKey().startsWith(prefix)) { if (context == null) { context = RpcInvokeContext.getContext(); } //因为entry的key里面会包含rpc_resp_baggage,所以需要截取掉 context.putResponseBaggage(entry.getKey().substring(prefix.length()), entry.getValue()); } } } } 这个方法里面response获取所有的透传数据,然后放入到ResponseBaggage中。 到这里SOFARPC数据透传就分析完毕了
转载自://www.cnblogs.com/luozhiyun/p/11388450.html