מבוא לנטי

1. הקדמה

במאמר זה אנו נסתכל על Netty - מסגרת יישומי רשת אסינכרונית מונעת אירועים.

המטרה העיקרית של Netty היא בניית שרתי פרוטוקולים בעלי ביצועים גבוהים המבוססים על NIO (או אולי NIO.2) עם הפרדה והצמדה רופפת של רכיבי הרשת והלוגיקה העסקית. זה עשוי ליישם פרוטוקול ידוע, כגון HTTP, או פרוטוקול ספציפי משלך.

2. מושגי ליבה

Netty היא מסגרת שאינה חוסמת. זה מוביל לתפוקה גבוהה בהשוואה לחסימת IO. הבנת IO שאינה חוסמת הינה מכרעת להבנת מרכיבי הליבה של Nety והקשרים ביניהם.

2.1. עָרוּץ

עָרוּץ הוא הבסיס של Java NIO. הוא מייצג חיבור פתוח המסוגל לפעולות IO כגון קריאה וכתיבה.

2.2. עתיד

כל פעולת IO בת עָרוּץ ב- Netty לא חוסם.

המשמעות היא שכל פעולה מוחזרת מיד לאחר השיחה. יש עתיד ממשק בספריית Java הרגילה, אך זה לא נוח למטרות Netty - אנחנו יכולים רק לשאול את עתיד לגבי השלמת הפעולה או לחסימת השרשור הנוכחי עד לסיום הפעולה.

בגלל זה לנטי יש משלו ChannelFuture מִמְשָׁק. אנחנו יכולים להעביר התקשרות חוזרת אל ChannelFuture שייקרא עם סיום הפעולה.

2.3. אירועים ומטפלים

Netty משתמש בפרדיגמת יישומים מונחת אירועים, כך שצינור עיבוד הנתונים הוא שרשרת אירועים העוברים על ידי מטפלים. אירועים ומטפלים יכולים להיות קשורים לזרימת הנתונים הנכנסת והיוצאת. אירועים נכנסים יכולים להיות הבאים:

  • הפעלת ערוץ והשבתה
  • קרא אירועי מבצע
  • אירועי חריגים
  • אירועי משתמש

אירועים יוצאים הם פשוטים יותר ובדרך כלל קשורים לפתיחה / סגירה של חיבור וכתיבת / שטיפה של נתונים.

יישומי Netty מורכבים מכמה אירועי רשת ולוגיקה של יישומים ומטפליהם. ממשקי הבסיס של מטפלי אירועי הערוץ הם ChannelHandler ואבותיו ChannelOutboundHandler ו ChannelInboundHandler.

Netty מספק היררכיה ענקית של יישומים של ChannelHandler. ראוי לציין את המתאמים שהם רק יישומים ריקים, למשל. ChannelInboundHandlerAdapter ו ChannelOutboundHandlerAdapter. נוכל להאריך את המתאמים הללו כאשר אנו צריכים לעבד רק קבוצת משנה של כל האירועים.

כמו כן, קיימות יישומים רבים של פרוטוקולים ספציפיים כגון HTTP, למשל. HttpRequestDecoder, HttpResponseEncoder, HttpObjectAggregator. זה יהיה טוב להכיר אותם בג'אבאדוק של נטי.

2.4. קודנים ומפענחים

כשאנחנו עובדים עם פרוטוקול הרשת, עלינו לבצע סדרת נתונים ועריקת נתונים. לצורך כך מציגה נטי הרחבות מיוחדות של ChannelInboundHandler ל מפענחים המסוגלים לפענח נתונים נכנסים. מעמד הבסיס של מרבית המפענחים הוא ByteToMessageDecoder.

לקידוד נתונים יוצאים, ל- Netty יש הרחבות של ChannelOutboundHandler שקוראים לו מקודדים. MessageToByteEncoder הוא הבסיס לרוב יישומי המקודדים. אנו יכולים להמיר את ההודעה מרצף בתים לאובייקט ג'אווה ולהיפך באמצעות מקודדים ומפענחים.

3. יישום שרת לדוגמא

בואו ניצור פרויקט המייצג שרת פרוטוקולים פשוט שמקבל בקשה, מבצע חישוב ושולח תגובה.

3.1. תלות

קודם כל, עלינו לספק את התלות ב- Netty שלנו pom.xml:

 io.netty netty-all 4.1.10 גמר 

אנו יכולים למצוא את הגרסה האחרונה ב- Maven Central.

3.2. מודל נתונים

מחלקת נתוני הבקשה תהיה בעלת המבנה הבא:

מחלקה ציבורית RequestData {int intalue private; מחרוזת מחרוזת פרטית; // סטרים וקובעים סטנדרטיים}

נניח שהשרת מקבל את הבקשה ומחזיר את ה- intValue כפול 2. לתגובה יהיה ערך int יחיד:

ResponseData בכיתה ציבורית {int intalue private; // סטרים וקובעים סטנדרטיים}

3.3. בקש מפענח

כעת עלינו ליצור קודנים ומפענחים להודעות הפרוטוקול שלנו.

צריך לציין ש Netty עובד עם מאגר קבלת שקעים, אשר מיוצג לא כתור אלא רק כחבורת בתים. משמעות הדבר היא כי ניתן להתקשר למטפל הנכנס שלנו כאשר ההודעה המלאה אינה מתקבלת על ידי השרת.

עלינו לוודא שקיבלנו את ההודעה המלאה לפני העיבוד וישנן דרכים רבות לעשות זאת.

קודם כל, אנחנו יכולים ליצור זמני ByteBuf ולצרף את כל הבתים הנכנסים עד שנקבל את כמות הבתים הנדרשת:

מחלקה ציבורית SimpleProcessingHandler מרחיב את ChannelInboundHandlerAdapter {פרטי ByteBuf tmp; @Override handler void public הוסיף (ChannelHandlerContext ctx) {System.out.println ("המטפל נוסף"); tmp = ctx.alloc (). חיץ (4); } @Override public void handlerRemoved (ChannelHandlerContext ctx) {System.out.println ("המטפל הוסר"); tmp.release (); tmp = null; } @Override public void channelRead (ChannelHandlerContext ctx, Object msg) {ByteBuf m = (ByteBuf) msg; tmp.writeBytes (מ '); m. שחרור (); אם (tmp.readableBytes ()> = 4) {// בקשת עיבוד RequestData requestData = RequestData חדש (); requestData.setIntValue (tmp.readInt ()); ResponseData responseData = ResponseData חדש (); responseData.setIntValue (requestData.getIntValue () * 2); ChannelFuture עתיד = ctx.writeAndFlush (responseData); future.addListener (ChannelFutureListener.CLOSE); }}}

הדוגמה המוצגת לעיל נראית מעט מוזרה אך עוזרת לנו להבין כיצד Netty עובד. כל שיטה של ​​המטפל שלנו נקראת כאשר האירוע המתאים שלה מתרחש. אז אנו מאתחלים את המאגר כאשר מתווסף המטפל, ממלאים אותו בנתוני קבלת בתים חדשים ומתחילים לעבד אותו כשאנחנו מקבלים מספיק נתונים.

בכוונה לא השתמשנו ב- stringValue - פענוח באופן כזה יהיה מורכב שלא לצורך. זו הסיבה ש- Netty מספק שיעורי מפענח שימושיים שהם יישומים של ChannelInboundHandler: ByteToMessageDecoder ו ReplayingDecoder.

כפי שציינו לעיל אנו יכולים ליצור צינור לעיבוד ערוצים עם Netty. כדי שנוכל לשים את המפענח שלנו כמטפל הראשון והמטפל בלוגיקה בעיבוד יכול לבוא אחריו.

המפענח של RequestData מוצג בהמשך:

מחלקה ציבורית RequestDecoder מרחיב את ReplayingDecoder {Charset private charset final = Charset.forName ("UTF-8"); פענוח חלל מוגן של @Override (ChannelHandlerContext ctx, ByteBuf in, List out) זורק Exception {נתוני RequestData = RequestData חדשים (); data.setIntValue (in.readInt ()); int strLen = in.readInt (); data.setStringValue (in.readCharSequence (strLen, charset) .toString ()); out.add (נתונים); }}

הרעיון של המפענח הזה הוא די פשוט. הוא משתמש ביישום של ByteBuf אשר משליך חריג כשאין מספיק נתונים במאגר לפעולת הקריאה.

כאשר היוצא מן הכלל נתפס המאגר נפרש מחדש להתחלה והמפענח ממתין לחלק חדש של נתונים. הפענוח נעצר כאשר ה- הַחוּצָה הרשימה אינה ריקה לאחר מכן לְפַעֲנֵחַ ביצוע.

3.4. מקודד תגובה

חוץ מזה פענוח RequestData אנחנו צריכים לקודד את המסר. פעולה זו פשוטה יותר מכיוון שיש לנו את נתוני ההודעות המלאים כאשר פעולת הכתיבה מתרחשת.

אנחנו יכולים לכתוב נתונים ל עָרוּץ אצל המטפל הראשי שלנו או שנוכל להפריד בין ההיגיון וליצור מטפל המרחיב MessageToByteEncoder אשר יתפוס את הכתיבה ResponseData מבצע:

מחלקה ציבורית ResponseDataEncoder מרחיב את MessageToByteEncoder {@Override קידוד ריק מוגן (ChannelHandlerContext ctx, ResponseData msg, ByteBuf out) זורק Exception {out.writeInt (msg.getIntValue ()); }}

3.5. בקש עיבוד

מכיוון שביצענו את הפענוח והקידוד אצל מטפלים נפרדים עלינו לשנות את שלנו ProcessingHandler:

מחלקה ציבורית ProcessingHandler מרחיב את ChannelInboundHandlerAdapter {@Override public void channelRead (ChannelHandlerContext ctx, Object msg) זורק חריג {RequestData requestData = (RequestData) msg; ResponseData responseData = ResponseData חדש (); responseData.setIntValue (requestData.getIntValue () * 2); ChannelFuture עתיד = ctx.writeAndFlush (responseData); future.addListener (ChannelFutureListener.CLOSE); System.out.println (requestData); }}

3.6. רצועת אתחול השרת

עכשיו בואו נרכיב את הכל ונפעיל את השרת שלנו:

מחלקה ציבורית NettyServer {יציאת אינטראקציה פרטית; // קונסטרוקטור ציבורי ריק ריק (מחרוזת [] טענות) זורק חריג {int port = args.length> 0? Integer.parseInt (טענות [0]); : 8080; NettyServer חדש (יציאה) .run (); } הפעלה בטלנית ציבורית () זורקת Exception {EventLoopGroup bossGroup = NioEventLoopGroup חדש (); EventLoopGroup workerGroup = NioEventLoopGroup חדש (); נסה {ServerBootstrap b = ServerBootstrap חדש (); b.group (bossGroup, workerGroup) .channel (NioServerSocketChannel.class) .childHandler (ChannelAnitializer חדש () {@Override public void initChannel (SocketChannel ch) זורק Exception {ch.pipeline (). addLast (חדש RequestDecoder (), ResponseDataEc חדש (), ProcessingHandler חדש ());}}). אפשרות (ChannelOption.SO_BACKLOG, 128) .childOption (ChannelOption.SO_KEEPALIVE, נכון); ChannelFuture f = b.bind (port) .sync (); f.channel (). closeFuture (). sync (); } סוף סוף {workerGroup.shutdownGracefully (); bossGroup.shutdownGracefully (); }}}

את פרטי השיעורים המשמשים בדוגמת bootstrap של השרת לעיל ניתן למצוא ב- Javadoc שלהם. החלק המעניין ביותר הוא שורה זו:

ch.pipeline (). addLast (חדש RequestDecoder (), ResponseDataEncoder חדש (), ProcessingHandler חדש ());

כאן אנו מגדירים מטפלים נכנסים ויוצאים אשר יעבדו בקשות ופלטים בסדר הנכון.

4. יישום לקוח

הלקוח צריך לבצע קידוד ופענוח הפוך, לכן עלינו להיות עם RequestDataEncoder ו ResponseDataDecoder:

מחלקה ציבורית RequestDataEncoder מרחיב את MessageToByteEncoder {charset private Charset final = Charset.forName ("UTF-8"); קידוד הריק המוגן על ידי @Override (ChannelHandlerContext ctx, RequestData msg, ByteBuf out) זורק Exception {out.writeInt (msg.getIntValue ()); out.writeInt (msg.getStringValue (). אורך ()); out.writeCharSequence (msg.getStringValue (), ערכה); }}
Class class ResponseDataDecoder מרחיב את ReplayingDecoder {@Override decid decid decid (ChannelHandlerContext ctx, ByteBuf in, List out) זורק Exception {ResponseData data = new ResponseData (); data.setIntValue (in.readInt ()); out.add (נתונים); }}

כמו כן, עלינו להגדיר א ClientHandler אשר ישלח את הבקשה ויקבל את התגובה מהשרת:

מחלקה ציבורית ClientHandler מרחיב את ChannelInboundHandlerAdapter {@Override public void channelActive (ChannelHandlerContext ctx) זורק חריג {RequestData msg = new RequestData (); msg.setIntValue (123); msg.setStringValue ("כל העבודה ואין משחק הופך את ג'ק לילד משעמם"); ChannelFuture עתיד = ctx.writeAndFlush (msg); } @Override public void channelRead (ChannelHandlerContext ctx, Object msg) זורק Exception {System.out.println ((ResponseData) msg); ctx.close (); }}

עכשיו בואו לאתחל את הלקוח:

מחלקה ציבורית NettyClient {main public public static (String [] args) זורק Exception {String host = "localhost"; יציאת int = 8080; EventLoopGroup workerGroup = NioEventLoopGroup חדש (); נסה {Bootstrap b = Bootstrap new (); b.group (workerGroup); b.channel (NioSocketChannel.class); b.option (ChannelOption.SO_KEEPALIVE, נכון); b.handler (ChannelInitializer חדש () {@Override הציבור הריק initChannel (SocketChannel ch) זורק Exception {ch.pipeline (). addLast (חדש RequestDataEncoder (), ResponseDataDecoder חדש (), ClientHandler חדש ());}}); ChannelFuture f = b.connect (מארח, יציאה). Sync (); f.channel (). closeFuture (). sync (); } סוף סוף {workerGroup.shutdownGracefully (); }}}

כפי שאנו רואים, ישנם פרטים רבים במשותף עם bootstrapping של השרת.

כעת אנו יכולים להריץ את השיטה העיקרית של הלקוח ולהסתכל על פלט המסוף. כצפוי, קיבלנו ResponseData עם intValue שווה ל 246.

5. מסקנה

במאמר זה הייתה לנו היכרות מהירה עם Netty. הראינו את מרכיבי הליבה שלה כגון עָרוּץ ו ChannelHandler. כמו כן, יצרנו עבורו שרת פרוטוקולים פשוט שאינו חוסם ולקוח.

כמו תמיד, כל דוגמאות הקוד זמינות ב- GitHub.