מבוא ל- RSocket

1. הקדמה

במדריך זה נבחן ראשונה את RSocket וכיצד הוא מאפשר תקשורת בין שרת לקוח.

2. מה זה שקע?

RSocket הוא פרוטוקול תקשורת בינארי, נקודה לנקודה מיועד לשימוש ביישומים מבוזרים. במובן זה הוא מספק חלופה לפרוטוקולים אחרים כמו HTTP.

השוואה מלאה בין RSocket לפרוטוקולים אחרים חורגת מתחום המאמר. במקום זאת, נתמקד בתכונה מרכזית של RSocket: מודלי האינטראקציה שלה.

RSocket מספק ארבעה מודלים לאינטראקציה. עם זאת, נחקור כל אחד מהם עם דוגמה.

3. תלות Maven

לדוגמא שלנו, RSocket זקוקה לשתי תלות ישירות בלבד:

 io.rsocket rsocket-core 0.11.13 io.rsocket rsocket-transport-netty 0.11.13 

התלות rsocket-core ו- rsocket-transport-netty זמינות ב- Maven Central.

הערה חשובה היא שספריית ה- RSocket עושה שימוש תכוף בזרמים תגובתיים. ה שֶׁטֶף ו מונו שיעורים משמשים לאורך מאמר זה ולכן הבנה בסיסית בהם תועיל.

4. הגדרת שרת

ראשית, בואו ניצור את שרת מעמד:

שרת ברמה ציבורית {שרת פרטי חד פעמי סופי; שרת ציבורי () {this.server = RSocketFactory.receive () .acceptor ((setupPayload, reactiveSocket) -> Mono.just (RSocketImpl new ())) .transport (TcpServerTransport.create ("localhost", TCP_PORT)). start. (). מנוי (); } בטל ציבורי להיפטר () {this.server.dispose (); } כיתה פרטית RSocketImpl מרחיב את AbstractRSocket {}}

כאן אנו משתמשים ב- RSocketFactory להגדיר ולהאזין לשקע TCP. אנחנו עוברים במנהג שלנו RSocketImpl לטפל בבקשות של לקוחות. נוסיף שיטות ל- RSocketImpl תוך כדי.

לאחר מכן, כדי להפעיל את השרת, עלינו רק לתקן אותו:

שרת שרת = שרת חדש ();

מופע שרת יחיד יכול לטפל במספר חיבורים. כתוצאה מכך, מופע שרת אחד בלבד יתמוך בכל הדוגמאות שלנו.

כשנסיים, ה- להשליך השיטה תעצור את השרת ותשחרר את יציאת TCP.

4. מודלים לאינטראקציה

4.1. בקשת תגובה

RSocket מספק מודל בקשה / תגובה - כל בקשה זוכה למענה יחיד.

עבור מודל זה ניצור שירות פשוט המחזיר הודעה ללקוח.

נתחיל בהוספת שיטה להרחבה שלנו של תקציר שקע, RSocketImpl:

@ ביטול מונו בקשת תגובה ציבורית (מטען מטען) {נסה {להחזיר מונו.just (מטען); // משקף את המטען בחזרה לשולח} לתפוס (חריג x) {להחזיר מונו.שגיאה (x); }}

ה בקשת תגובה שיטה מחזירה תוצאה אחת לכל בקשה, כפי שאנו יכולים לראות על ידי מונו סוג תגובה.

מטען הוא המחלקה המכילה תוכן הודעה ומטא נתונים. הוא משמש את כל מודלי האינטראקציה. תוכן המטען הוא בינארי, אך ישנן שיטות נוחות התומכות חוּטתוכן מבוסס.

בשלב הבא נוכל ליצור את מחלקת הלקוחות שלנו:

מחלקה ציבורית ReqResClient {שקע RSocket סופי פרטי; ReqResClient ציבורי () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); } callBlocking מחרוזת ציבורי (מחרוזת מחרוזת) {שקע חזרה .requestResponse (DefaultPayload.create (מחרוזת)) .מפה (מטען :: getDataUtf8) .block (); } בטל פומבי להשליך () {this.socket.dispose (); }}

הלקוח משתמש ב- RSocketFactory.connect () שיטה ליזום חיבור שקע עם השרת. אנו משתמשים ב- בקשת תגובה שיטה בשקע כדי לשלוח מטען לשרת.

המטען שלנו מכיל את חוּט עבר ללקוח. כאשר מונו התגובה מגיעה אנו יכולים להשתמש ב- getDataUtf8 () שיטה לגשת ל חוּט תוכן התגובה.

לבסוף, אנו יכולים להריץ את מבחן האינטגרציה כדי לראות בקשה / תגובה בפעולה. אנו נשלח א חוּט לשרת וודא כי אותו הדבר חוּט מוחזר:

@ מבחן ציבורי בטל כאשרSendingAString_thenRevceiveTheSameString () {לקוח ReqResClient = ReqResClient חדש (); מחרוזת מחרוזת = "שלום RSocket"; assertEquals (מחרוזת, client.callBlocking (מחרוזת)); client.dispose (); }

4.2. אש-ושכח

עם המודל אש-ושכח, הלקוח לא יקבל תגובה מהשרת.

בדוגמה זו הלקוח ישלח מדידות מדומות לשרת במרווחי זמן של 50ms. השרת יפרסם את המדידות.

בואו נוסיף מטפל באש ושכח לשרת שלנו ב RSocketImpl מעמד:

@ ביטול ציבורי מונו FireAndForget (מטען מטען) {נסה {dataPublisher.publish (מטען); // העבר את החזר המטען Mono.empty (); } לתפוס (חריג x) {להחזיר מונו.שגיאה (x); }}

מטפל זה נראה דומה מאוד למטפל בבקשה / בתגובה. למרות זאת, fireAndForget החזרות מונו במקום מונו.

ה dataPublisher הוא מופע של org.reactivestreams.Publisher. לפיכך, זה הופך את המטען לרשות המנויים. נשתמש בזה בדוגמת הבקשה / הזרם.

לאחר מכן ניצור את לקוח האש והשכחה:

מחלקה ציבורית FireNForgetClient {שקע RSocket סופי פרטי; פרטי רשימת נתונים סופיים; ציבורי FireNForgetClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); } / ** שלח מהירות בינארית (צף) כל 50ms * / sendData חלל ציבורי () {data = Collections.unmodifiableList (generateData ()); Flux.interval (Duration.ofMillis (50)). Take (data.size ()) .map (this :: createFloatPayload) .flatMap (socket :: fireAndForget) .blockLast (); } // ...}

הגדרת השקע זהה לחלוטין לקודם.

ה לשלוח נתונים() השיטה משתמשת בא שֶׁטֶף זרם כדי לשלוח מספר הודעות. על כל הודעה אנו קוראים שקע :: fireAndForget.

אנחנו צריכים להירשם כמנוי ל- מונו תגובה לכל הודעה. אם נשכח להירשם כמנוי אז שקע :: fireAndForget לא יבצע.

ה flatMap המפעיל מוודא שה- בָּטֵל התגובות מועברות למנוי ואילו blockLast המפעיל פועל כמנוי.

אנו נמתין לחלק הבא כדי להריץ את מבחן האש והשכחה. בשלב זה, ניצור לקוח בקשה / זרם לקבל את הנתונים שנדחפו על ידי לקוח האש והשכחה.

4.3. בקשה / זרם

במודל הבקשה / הזרם, בקשה אחת עשויה לקבל תגובות מרובות. כדי לראות זאת בפעולה נוכל לבנות על הדוגמה של אש ושכח. לשם כך, נבקש זרם לאחזור המדידות ששלחנו בחלק הקודם.

כמו קודם, נתחיל בהוספת מאזין חדש ל- RSocketImpl בשרת:

@ ביטול השטף הציבורי הציבורית Stream (מטען מטען) {החזר Flux.from (dataPublisher); }

ה requestStream המטפל מחזיר א שֶׁטֶף זרם. כזכור מהסעיף הקודם, fireAndForget המטפל פרסם נתונים נכנסים למערכת dataPublisher. כעת ניצור שֶׁטֶף זרם באמצעות אותו dataPublisher כמקור האירוע. על ידי כך נתוני המדידה יזרמו בצורה אסינכרונית מלקוח האש והשכחה שלנו ללקוח הבקשה / זרם שלנו.

בוא ניצור את לקוח הבקשה / הזרם הבא:

מחלקה ציבורית ReqStreamClient {שקע RSocket סופי פרטי; ReqStreamClient ציבורי () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); } שטף ציבורי getDataStream () {שקע החזרה .requestStream (DefaultPayload.create (DATA_STREAM_NAME)) .map (מטען :: getData) .map (buf -> buf.getFloat ()) .onErrorReturn (null); } בטל פומבי להשליך () {this.socket.dispose (); }}

אנו מתחברים לשרת באותה צורה כמו הלקוחות הקודמים שלנו.

ב getDataStream ()אנו משתמשים socket.requestStream () לקבל זרם Flux מהשרת. מאותו זרם אנו מוציאים את לָצוּף ערכים מהנתונים הבינאריים. לבסוף, הזרם מוחזר למתקשר, ומאפשר למתקשר להירשם אליו ולעבד את התוצאות.

עכשיו בואו לבדוק. אנו נוודא את הטיול הלוך ושוב מאש ושכח לבקש / להזרים.

אנו יכולים לקבוע כי כל ערך מתקבל באותו סדר שבו נשלח. לאחר מכן נוכל לקבוע כי אנו מקבלים את אותו מספר ערכים שנשלחו:

@Test ציבורי בטל כאשרSendingStream_thenReceiveTheSameStream () {FireNForgetClient fnfClient = FireNForgetClient חדש (); ReqStreamClient streamClient = ReqStreamClient חדש (); נתוני רשימה = fnfClient.getData (); רשימת dataReceived = ArrayList חדש (); מנוי חד פעמי = streamClient.getDataStream () .index (). Subscribe (tuple -> {assertEquals ("ערך שגוי", data.get (tuple.getT1 (). IntValue ()), tuple.getT2 ()); dataReceived. הוסף (tuple.getT2 ());}, שגיאה -> LOG.error (err.getMessage ())); fnfClient.sendData (); // ... סילוק assertEquals לקוח ומנויים ("ספירת נתונים שגויה התקבלה", data.size (), dataReceived.size ()); }

4.4. עָרוּץ

מודל הערוץ מספק תקשורת דו כיוונית. במודל זה, זרמי הודעות זורמים בצורה אסינכרונית לשני הכיוונים.

בואו ליצור הדמיית משחק פשוטה כדי לבדוק זאת. במשחק זה כל צד של הערוץ יהפוך לשחקן. במהלך המשחק, שחקנים אלה ישלחו הודעות לצד השני במרווחי זמן אקראיים. הצד ההפוך יגיב להודעות.

ראשית, ניצור את המטפל בשרת. כמו קודם, אנו מוסיפים ל RSocketImpl:

@ עקוף Flux בקשת שטף ציבורית (מטעני מטען של מפרסמים) {Flux.from (מטענים)). מנוי (gameController :: processPayload); החזר Flux.from (gameController); }

ה בקשת ערוץ למטפל יש מטען זרמים הן לקלט והן לפלט. ה מוֹצִיא לָאוֹר פרמטר קלט הוא זרם מטענים שהתקבלו מהלקוח. עם הגעתם, מטענים אלה מועברים אל gameController :: processPayload פוּנקצִיָה.

בתגובה, אנו מחזירים אחר שֶׁטֶף זרם בחזרה ללקוח. זרם זה נוצר משלנו שלט משחק, שהוא גם א מוֹצִיא לָאוֹר.

הנה סיכום של שלט משחק מעמד:

GameController בכיתה ציבורית מיישמת את המו"ל {@Override מנוי בטל ציבורי (מנוי למנוי) {// שולח הודעות מטען למנוי במרווחי זמן אקראיים} תהליך בטל ציבורי תשלום (מטען מטען) {// מגיבים להודעות מהשחקן השני}}

כאשר שלט משחק מקבל מנוי הוא מתחיל לשלוח הודעות לאותו מנוי.

לאחר מכן, בואו ניצור את הלקוח:

מחלקה ציבורית ChannelClient {שקע RSocket סופי פרטי; גמר פרטי GameController gameController; ChannelClient ציבורי () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); this.gameController = GameController חדש ("נגן לקוח"); } playGame () ריק ריק () {socket.requestChannel (Flux.from (gameController)) .doOnNext (gameController :: processPayload) .blockLast (); } בטל פומבי להשליך () {this.socket.dispose (); }}

כפי שראינו בדוגמאות הקודמות שלנו, הלקוח מתחבר לשרת באותה צורה כמו הלקוחות האחרים.

הלקוח יוצר מופע משלו של ה- שלט משחק.

אנו משתמשים socket.requestChannel () לשלוח את שלנו מטען זרם לשרת. השרת מגיב בזרם מטען משל עצמו.

בתור מטעני מטען שהתקבלו מהשרת אנו מעבירים אותם אל שלנו gameController :: processPayload מטפל.

בסימולציית המשחק שלנו הלקוח והשרת הם תמונות ראי זו לזו. זה, כל צד שולח זרם של מטען וקבלת זרם של מטען מהקצה השני.

הזרמים פועלים באופן עצמאי, ללא סנכרון.

לבסוף, בואו נריץ את הסימולציה במבחן:

@ מבחן ציבורי בטל כאשר RunChannelGame_thenLogTheResults () {לקוח ChannelClient = ChannelClient חדש (); client.playGame (); client.dispose (); }

5. מסקנה

במאמר ההיכרות הזה חקרנו את מודלי האינטראקציה שמספקים RSocket. קוד המקור המלא של הדוגמאות נמצא במאגר Github שלנו.

הקפד לבדוק באתר RSocket לדיון מעמיק יותר. בפרט, השאלות הנפוצות ומסמכי המוטיבציה מספקים רקע טוב.


$config[zx-auto] not found$config[zx-overlay] not found