成人在线亚洲_国产日韩视频一区二区三区_久久久国产精品_99国内精品久久久久久久

您的位置:首頁技術文章
文章詳情頁

Springboot整合Netty實現RPC服務器的示例代碼

瀏覽:4日期:2023-03-30 17:51:24
一、什么是RPC?

RPC(Remote Procedure Call)遠程過程調用,是一種進程間的通信方式,其可以做到像調用本地方法那樣調用位于遠程的計算機的服務。其實現的原理過程如下:

本地的進程通過接口進行本地方法調用。 RPC客戶端將調用的接口名、接口方法、方法參數等信息利用網絡通信發送給RPC服務器。 RPC服務器對請求進行解析,根據接口名、接口方法、方法參數等信息找到對應的方法實現,并進行本地方法調用,然后將方法調用結果響應給RPC客戶端。二、實現RPC需要解決那些問題?1. 約定通信協議格式

RPC分為客戶端與服務端,就像HTTP一樣,我們需要定義交互的協議格式。主要包括三個方面:

請求格式 響應格式 網絡通信時數據的序列化方式

RPC請求

@Datapublic class RpcRequest { /** * 請求ID 用來標識本次請求以匹配RPC服務器的響應 */ private String requestId; /** * 調用的類(接口)權限定名稱 */ private String className; /** * 調用的方法名 */ private String methodName; /** * 方法參類型列表 */ private Class<?>[] parameterTypes; /** * 方法參數 */ private Object[] parameters;}

RPC響應

@Datapublic class RpcResponse { /** * 響應對應的請求ID */ private String requestId; /** * 調用是否成功的標識 */ private boolean success = true; /** * 調用錯誤信息 */ private String errorMessage; /** * 調用結果 */ private Object result;}2. 序列化方式

序列化方式可以使用JDK自帶的序列化方式或者一些第三方的序列化方式,JDK自帶的由于性能較差所以不推薦。我們這里選擇JSON作為序列化協議,即將請求和響應對象序列化為JSON字符串后發送到對端,對端接收到后反序列為相應的對象,這里采用阿里的 fastjson 作為JSON序列化框架。

3. TCP粘包、拆包

TCP是個“流”協議,所謂流,就是沒有界限的一串數據。大家可以想想河里的流水,是連成一片的,其間并沒有分界線。TCP底層并不了解上層業務數據的具體含義,它會根據TCP緩沖區的實際情況進行包的劃分,所以在業務上認為,一個完整的包可能會被TCP拆分成多個包進行發送,也有可能把多個小的包封裝成一個大的數據包發送,這就是所謂的TCP粘包和拆包問題。粘包和拆包需要應用層程序來解決。

我們采用在請求和響應的頭部保存消息體的長度的方式解決粘包和拆包問題。請求和響應的格式如下:

+--------+----------------+ | Length | Content | | 4字節 | Length個字節 | +--------+----------------+4. 網絡通信框架的選擇

出于性能的考慮,RPC一般選擇異步非阻塞的網絡通信方式,JDK自帶的NIO網絡編程操作繁雜,Netty是一款基于NIO開發的網絡通信框架,其對java NIO進行封裝對外提供友好的API,并且內置了很多開箱即用的組件,如各種編碼解碼器。所以我們采用Netty作為RPC服務的網絡通信框架。

三、RPC服務端

RPC分為客戶端和服務端,它們有一個共同的服務接口API,我們首先定義一個接口 HelloService

public interface HelloService { String sayHello(String name);}

然后服務端需要提供該接口的實現類,然后使用自定義的@RpcService注解標注,該注解擴展自@Component,被其標注的類可以被Spring的容器管理。

@RpcServicepublic class HelloServiceImp implements HelloService { @Override public String sayHello(String name) { return 'Hello ' + name; }}

@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Componentpublic @interface RpcService { }

RPC服務器類

我們實現了ApplicationContextAware接口,以便從bean容器中取出@RpcService實現類,存入我們的map容器中。

@Component@Slf4jpublic class RpcServer implements ApplicationContextAware, InitializingBean { // RPC服務實現容器 private Map<String, Object> rpcServices = new HashMap<>(); @Value('${rpc.server.port}') private int port; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String, Object> services = applicationContext.getBeansWithAnnotation(RpcService.class); for (Map.Entry<String, Object> entry : services.entrySet()) { Object bean = entry.getValue(); Class<?>[] interfaces = bean.getClass().getInterfaces(); for (Class<?> inter : interfaces) { rpcServices.put(inter.getName(), bean); } } log.info('加載RPC服務數量:{}', rpcServices.size()); } @Override public void afterPropertiesSet() { start(); } private void start(){ new Thread(() -> { EventLoopGroup boss = new NioEventLoopGroup(1); EventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, worker) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new IdleStateHandler(0, 0, 60));pipeline.addLast(new JsonDecoder());pipeline.addLast(new JsonEncoder());pipeline.addLast(new RpcInboundHandler(rpcServices)); } }) .channel(NioServerSocketChannel.class); ChannelFuture future = bootstrap.bind(port).sync(); log.info('RPC 服務器啟動, 監聽端口:' + port); future.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); boss.shutdownGracefully(); worker.shutdownGracefully(); } }).start(); }}

RpcServerInboundHandler 負責處理RPC請求

@Slf4jpublic class RpcServerInboundHandler extends ChannelInboundHandlerAdapter { private Map<String, Object> rpcServices; public RpcServerInboundHandler(Map<String, Object> rpcServices){ this.rpcServices = rpcServices; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info('客戶端連接成功,{}', ctx.channel().remoteAddress()); } public void channelInactive(ChannelHandlerContext ctx) { log.info('客戶端斷開連接,{}', ctx.channel().remoteAddress()); ctx.channel().close(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg){ RpcRequest rpcRequest = (RpcRequest) msg; log.info('接收到客戶端請求, 請求接口:{}, 請求方法:{}', rpcRequest.getClassName(), rpcRequest.getMethodName()); RpcResponse response = new RpcResponse(); response.setRequestId(rpcRequest.getRequestId()); Object result = null; try { result = this.handleRequest(rpcRequest); response.setResult(result); } catch (Exception e) { e.printStackTrace(); response.setSuccess(false); response.setErrorMessage(e.getMessage()); } log.info('服務器響應:{}', response); ctx.writeAndFlush(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.info('連接異常'); ctx.channel().close(); super.exceptionCaught(ctx, cause); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent){ IdleStateEvent event = (IdleStateEvent)evt; if (event.state()== IdleState.ALL_IDLE){ log.info('客戶端已超過60秒未讀寫數據, 關閉連接.{}',ctx.channel().remoteAddress()); ctx.channel().close(); } }else{ super.userEventTriggered(ctx,evt); } } private Object handleRequest(RpcRequest rpcRequest) throws Exception{ Object bean = rpcServices.get(rpcRequest.getClassName()); if(bean == null){ throw new RuntimeException('未找到對應的服務: ' + rpcRequest.getClassName()); } Method method = bean.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes()); method.setAccessible(true); return method.invoke(bean, rpcRequest.getParameters()); }}四、RPC客戶端

/** * RPC遠程調用的客戶端 */@Slf4j@Componentpublic class RpcClient { @Value('${rpc.remote.ip}') private String remoteIp; @Value('${rpc.remote.port}') private int port; private Bootstrap bootstrap; // 儲存調用結果 private final Map<String, SynchronousQueue<RpcResponse>> results = new ConcurrentHashMap<>(); public RpcClient(){ } @PostConstruct public void init(){ bootstrap = new Bootstrap().remoteAddress(remoteIp, port); NioEventLoopGroup worker = new NioEventLoopGroup(1); bootstrap.group(worker) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new IdleStateHandler(0, 0, 10)); pipeline.addLast(new JsonEncoder()); pipeline.addLast(new JsonDecoder()); pipeline.addLast(new RpcClientInboundHandler(results)); } }); } public RpcResponse send(RpcRequest rpcRequest) { RpcResponse rpcResponse = null; rpcRequest.setRequestId(UUID.randomUUID().toString()); Channel channel = null; try { channel = bootstrap.connect().sync().channel(); log.info('連接建立, 發送請求:{}', rpcRequest); channel.writeAndFlush(rpcRequest); SynchronousQueue<RpcResponse> queue = new SynchronousQueue<>(); results.put(rpcRequest.getRequestId(), queue); // 阻塞等待獲取響應 rpcResponse = queue.take(); results.remove(rpcRequest.getRequestId()); } catch (InterruptedException e) { e.printStackTrace(); } finally { if(channel != null && channel.isActive()){ channel.close(); } } return rpcResponse; }}

RpcClientInboundHandler負責處理服務端的響應

@Slf4jpublic class RpcClientInboundHandler extends ChannelInboundHandlerAdapter { private Map<String, SynchronousQueue<RpcResponse>> results; public RpcClientInboundHandler(Map<String, SynchronousQueue<RpcResponse>> results){ this.results = results; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { RpcResponse rpcResponse = (RpcResponse) msg; log.info('收到服務器響應:{}', rpcResponse); if(!rpcResponse.isSuccess()){ throw new RuntimeException('調用結果異常,異常信息:' + rpcResponse.getErrorMessage()); } // 取出結果容器,將response放進queue中 SynchronousQueue<RpcResponse> queue = results.get(rpcResponse.getRequestId()); queue.put(rpcResponse); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent){ IdleStateEvent event = (IdleStateEvent)evt; if (event.state() == IdleState.ALL_IDLE){ log.info('發送心跳包'); RpcRequest request = new RpcRequest(); request.setMethodName('heartBeat'); ctx.channel().writeAndFlush(request); } }else{ super.userEventTriggered(ctx, evt); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){ log.info('異常:{}', cause.getMessage()); ctx.channel().close(); }}接口代理

為了使客戶端像調用本地方法一樣調用遠程服務,我們需要對接口進行動態代理。

代理類實現

@Componentpublic class RpcProxy implements InvocationHandler { @Autowired private RpcClient rpcClient; @Override public Object invoke(Object proxy, Method method, Object[] args){ RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setClassName(method.getDeclaringClass().getName()); rpcRequest.setMethodName(method.getName()); rpcRequest.setParameters(args); rpcRequest.setParameterTypes(method.getParameterTypes()); RpcResponse rpcResponse = rpcClient.send(rpcRequest); return rpcResponse.getResult(); }}

實現FactoryBean接口,將生產動態代理類納入 Spring 容器管理。

public class RpcFactoryBean<T> implements FactoryBean<T> { private Class<T> interfaceClass; @Autowired private RpcProxy rpcProxy; public RpcFactoryBean(Class<T> interfaceClass){ this.interfaceClass = interfaceClass; } @Override public T getObject(){ return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, rpcProxy); } @Override public Class<?> getObjectType() { return interfaceClass; }}

自定義類路徑掃描器,掃描包下的RPC接口,動態生產代理類,納入 Spring 容器管理

public class RpcScanner extends ClassPathBeanDefinitionScanner { public RpcScanner(BeanDefinitionRegistry registry) { super(registry); } @Override protected Set<BeanDefinitionHolder> doScan(String... basePackages) { Set<BeanDefinitionHolder> beanDefinitionHolders = super.doScan(basePackages); for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) { GenericBeanDefinition beanDefinition = (GenericBeanDefinition)beanDefinitionHolder.getBeanDefinition(); beanDefinition.getConstructorArgumentValues().addGenericArgumentValue(beanDefinition.getBeanClassName()); beanDefinition.setBeanClassName(RpcFactoryBean.class.getName()); } return beanDefinitionHolders; } @Override protected boolean isCandidateComponent(MetadataReader metadataReader) throws IOException { return true; } @Override protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) { return beanDefinition.getMetadata().isInterface() && beanDefinition.getMetadata().isIndependent(); }}

@Componentpublic class RpcBeanDefinitionRegistryPostProcessor implements BeanDefinitionRegistryPostProcessor { @Override public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException { RpcScanner rpcScanner = new RpcScanner(registry); // 傳入RPC接口所在的包名 rpcScanner.scan('com.ygd.rpc.common.service'); } @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { }}

JSON編解碼器

/** * 將 RpcRequest 編碼成字節序列發送 * 消息格式: Length + Content * Length使用int存儲,標識消息體的長度 * * +--------+----------------+ * | Length | Content | * | 4字節 | Length個字節 | * +--------+----------------+ */public class JsonEncoder extends MessageToByteEncoder<RpcRequest> { @Override protected void encode(ChannelHandlerContext ctx, RpcRequest rpcRequest, ByteBuf out){ byte[] bytes = JSON.toJSONBytes(rpcRequest); // 將消息體的長度寫入消息頭部 out.writeInt(bytes.length); // 寫入消息體 out.writeBytes(bytes); }}

/** * 將響應消息解碼成 RpcResponse */public class JsonDecoder extends LengthFieldBasedFrameDecoder { public JsonDecoder(){ super(Integer.MAX_VALUE, 0, 4, 0, 4); } @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { ByteBuf msg = (ByteBuf) super.decode(ctx, in); byte[] bytes = new byte[msg.readableBytes()]; msg.readBytes(bytes); RpcResponse rpcResponse = JSON.parseObject(bytes, RpcResponse.class); return rpcResponse; }}

測試我們編寫一個Controller進行測試

@RestController@RequestMapping('/hello')public class HelloController { @Autowired private HelloService helloService; @GetMapping('/sayHello') public String hello(String name){ return helloService.sayHello(name); }}

通過 PostMan調用 controller 接口 http://localhost:9998/hello/sayHello?name=小明

響應: Hello 小明

總結

本文實現了一個簡易的、具有基本概念的RPC,主要涉及的知識點如下:

網絡通信及通信協議的編碼、解碼 Java對象的序列化及反序列化 通信鏈路心跳檢測 Java反射 JDK動態代理

項目完整代碼詳見:https://github.com/yinguodong/netty-rpc

到此這篇關于Springboot整合Netty實現RPC服務器的示例代碼的文章就介紹到這了,更多相關Springboot RPC服務器內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!

標簽: Spring
相關文章:
成人在线亚洲_国产日韩视频一区二区三区_久久久国产精品_99国内精品久久久久久久
亚洲经典在线看| 毛片av一区二区| 琪琪一区二区三区| 日韩视频在线一区二区三区| 久久久久99精品国产片| 国产成人aaa| 这里只有精品99re| 精品一区二区三区久久久| 一本色道a无线码一区v| 亚洲二区在线视频| 亚洲主播在线| 亚洲精品中文字幕乱码三区| 亚洲精品系列| 亚洲综合久久久久| 久久成人一区| 日韩精品一级中文字幕精品视频免费观看| 美女国产精品| 青草av.久久免费一区| 欧美午夜片在线观看| 美脚の诱脚舐め脚责91| 欧美日韩三级视频| 国产精品18久久久久| 日韩一二三区不卡| 不卡在线视频中文字幕| 久久久国产一区二区三区四区小说| av电影在线观看完整版一区二区| 日韩三级视频中文字幕| 成人免费视频网站在线观看| 精品久久久久久久久久久久久久久久久 | 久久综合成人精品亚洲另类欧美| 国产凹凸在线观看一区二区| 欧美精品一区二区不卡 | 国产美女精品一区二区三区| 91精品欧美综合在线观看最新| 国内偷窥港台综合视频在线播放| 欧美久久久久久蜜桃| 国产成人亚洲精品青草天美| 欧美大片在线观看| 欧美日韩在线播放一区二区| 亚洲视频每日更新| 午夜在线a亚洲v天堂网2018| 蜜臀va亚洲va欧美va天堂| 在线成人午夜影院| av欧美精品.com| 成人免费视频在线观看| 亚洲男人影院| 精品在线你懂的| 欧美成人乱码一区二区三区| 午夜性色一区二区三区免费视频| 国产精品美女久久久久久| 一区二区欧美日韩| 免费高清成人在线| 日韩一区二区免费在线观看| 91视频观看免费| 亚洲啪啪综合av一区二区三区| 国产一区二区三区成人欧美日韩在线观看 | 欧美另类女人| 亚洲另类在线视频| 在线视频一区二区三区| 国产 日韩 欧美大片| 国产精品欧美一级免费| 麻豆久久婷婷| 福利一区福利二区| 自拍偷拍国产亚洲| 欧美做爰猛烈大尺度电影无法无天| 国产91在线观看丝袜| 国产精品美女久久久久久2018 | 欧美精品播放| 亚洲午夜三级在线| 67194成人在线观看| 午夜欧美理论片| 亚洲成人一二三| 日韩女优av电影| 日韩视频三区| 国产一区不卡视频| 亚洲欧洲日本在线| 欧美性猛片xxxx免费看久爱| 91视频免费播放| 亚洲成人黄色小说| 精品理论电影在线观看| 国产精品日本欧美一区二区三区| 国产米奇在线777精品观看| 国产精品初高中害羞小美女文| 色婷婷av一区二区三区gif| 99久久国产综合精品麻豆| 亚洲精选视频免费看| 欧美一级黄色大片| 国产精品免费一区二区三区在线观看 | 91免费看`日韩一区二区| 一区二区三区美女视频| 欧美精品一二三| 黄色综合网站| 国产在线一区二区| 亚洲丝袜美腿综合| 91麻豆精品国产综合久久久久久| 国产一区二区三区四区老人| 日本欧美加勒比视频| 日本一区二区动态图| 欧美日韩精品一区视频| 亚洲欧洲一区| 成人a免费在线看| 婷婷开心激情综合| 亚洲国产精品高清| 在线不卡免费欧美| 亚洲中午字幕| 欧美日韩亚洲一区| 国产一区中文字幕| 一区二区三区视频在线看| 欧美tk—视频vk| 久久一区二区精品| 国内一区二区在线视频观看 | 影院欧美亚洲| 国产美女在线精品| 亚洲香蕉伊在人在线观| 久久亚洲影视婷婷| 欧美日韩久久不卡| 亚洲一区区二区| 欧美久久一区| 国产麻豆精品在线| 五月婷婷激情综合| ...av二区三区久久精品| 欧美成人一区二区三区在线观看| 一本大道久久精品懂色aⅴ| 在线播放日韩| a美女胸又www黄视频久久| 免费日韩伦理电影| 一区二区三区在线观看视频| 国产日产亚洲精品系列| 91精品国模一区二区三区| 91国偷自产一区二区开放时间 | 欧美视频中文一区二区三区在线观看| 91久久久久| 成人av集中营| 麻豆国产精品官网| 夜夜嗨av一区二区三区中文字幕 | 国产精品久久久久精k8| 日韩午夜电影在线观看| 欧亚一区二区三区| 久久久久久穴| 亚洲一区二区三区精品在线观看| 激情久久久久久久| 色综合久久中文综合久久牛| 国产a区久久久| 国内精品自线一区二区三区视频| 午夜视黄欧洲亚洲| 亚洲三级视频在线观看| 久久久99精品免费观看不卡| 制服丝袜在线91| 欧美日韩视频第一区| 久久亚洲午夜电影| 先锋影音久久久| 亚洲国产精品第一区二区三区| 色综合色综合色综合色综合色综合| 国产不卡视频一区| 韩国欧美国产1区| 久久福利资源站| 美女网站一区二区| 美女在线视频一区| 日本伊人午夜精品| 午夜精品久久一牛影视| 一区二区三区欧美| 亚洲欧美偷拍另类a∨色屁股| 国产精品欧美一级免费| 国产欧美日韩精品一区| 久久久www成人免费无遮挡大片| 精品日韩99亚洲| 日韩欧美精品三级| 日韩欧美在线123| 欧美一区三区二区| 欧美日韩国产精选| 欧美高清性hdvideosex| 91精品中文字幕一区二区三区| 欧美无砖砖区免费| 欧美在线一区二区三区| 欧美性做爰猛烈叫床潮| 在线免费观看日韩欧美| 在线亚洲人成电影网站色www| 色婷婷综合在线| 欧美在线小视频| 欧美日韩日日夜夜| 欧美一区二区三区日韩视频| 欧美一级欧美一级在线播放| 91精品国产综合久久国产大片| 337p亚洲精品色噜噜| 日韩精品一区二区三区在线 | 秋霞电影一区二区| 蜜臀av一区二区| 激情欧美一区二区| 国产成人精品午夜视频免费| 国产精华液一区二区三区| 成人午夜又粗又硬又大| 不卡一区二区中文字幕| 91免费看`日韩一区二区| 亚洲图片欧洲图片日韩av| 99av国产精品欲麻豆| 久久国产一区二区| 欧美日韩一区在线| 欧美一区二区三区免费| 久久亚洲捆绑美女| 国产精品福利一区二区|