זרמי תגובה 9 של ג'אווה

1. סקירה כללית

במאמר זה נבחן את זרמי התגובה של Java 9. במילים פשוטות, נוכל להשתמש ב- זְרִימָה class, אשר מצרף את אבני הבניין העיקריות לבניית לוגיקה לעיבוד זרם תגובתי.

זרמים ריאקטיביים הוא תקן לעיבוד זרם אסינכרוני עם לחץ גב לא חוסם. מפרט זה מוגדר ב מניפסט תגובתי, ויש מימושים שונים של זה, למשל, RxJava אוֹ עכו-נחלים.

2. סקירה API ריאקטיבית

לבנות א זְרִימָה, נוכל להשתמש בשלוש מופשטים עיקריים ולחבר אותם לוגיקת עיבוד אסינכרונית.

כֹּל זְרִימָה צריך לעבד אירועים שמתפרסמים אליו על ידי מופע של מפרסם; ה מוֹצִיא לָאוֹר יש שיטה אחת - הירשם כמנוי ().

אם מישהו מהמנויים רוצה לקבל אירועים שפורסמו על ידו, עליהם להירשם למנוי מוֹצִיא לָאוֹר.

מקלט ההודעות צריך ליישם את מָנוּי מִמְשָׁק. בדרך כלל זה הסוף לכל אחד זְרִימָה עיבוד כי המופע שלו לא שולח הודעות הלאה.

אנחנו יכולים לחשוב על מָנוּי כ כִּיוֹר. יש לכך ארבע שיטות שצריך לבטל - onSubscribe (), onNext (), onError (), ו onComplete (). אנו נסתכל על אלה בסעיף הבא.

אם אנו רוצים להפוך את ההודעה הנכנסת ולהעביר אותה הלאה להבא מָנוּי, אנחנו צריכים ליישם את מעבד מִמְשָׁק. זה פועל גם כ- מָנוּי כי זה מקבל הודעות, וכמו מוֹצִיא לָאוֹר מכיוון שהוא מעבד את המסרים האלה ושולח אותם לעיבוד נוסף.

3. פרסום וצריכת הודעות

נניח שאנחנו רוצים ליצור פשוט זְרִימָה, בו יש לנו א מוֹצִיא לָאוֹר פרסום הודעות, ופשוט מָנוּי צורכת הודעות כשהן מגיעות - אחת בכל פעם.

בואו ניצור EndSubscriber מעמד. עלינו ליישם את מָנוּי מִמְשָׁק. לאחר מכן נעקוף את השיטות הנדרשות.

ה onSubscribe () שיטה נקראת לפני תחילת העיבוד. המקרה של מִנוּי מועבר כוויכוח. זהו מחלקה המשמשת לשליטה בזרימת המסרים בין מָנוּי וה מוֹצִיא לָאוֹר:

מחלקה ציבורית ציבורית EndSubscriber מיישם מנוי {מנוי פרטי למנוי; רשימה ציבורית consumedElements = חדש LinkedList (); @ ביטול ציבורי בטל @ מנויים (מנוי מנוי) {this.subscription = מנוי; מנוי. בקשה (1); }}

אותנו גם אתחול ריק רשימה שֶׁל נצרך אלמנטים זה ישמש במבחנים.

כעת, עלינו ליישם את השיטות הנותרות מה- מָנוּי מִמְשָׁק. השיטה העיקרית כאן היא onNext () - זה נקרא בכל פעם שה- מוֹצִיא לָאוֹר מפרסם הודעה חדשה:

@ ביטול חלל ציבורי ב- Next (פריט T) {System.out.println ("Got:" + פריט); מנוי. בקשה (1); }

שים לב שכאשר התחלנו את המנוי ב- onSubscribe () וכאשר עיבדנו הודעה עלינו להתקשר אל בַּקָשָׁה() שיטה על מִנוּי כדי לאותת על כך שהזרם מָנוּי מוכן לצרוך עוד הודעות.

לבסוף, עלינו ליישם onError () - מה שמכונה בכל פעם שיוצא חריג כלשהו בעיבוד, כמו גם onComplete () - נקרא כאשר מוֹצִיא לָאוֹר סגור:

@ ביטול ציבורי בטל onError (ניתן לזריקה) {t.printStackTrace (); } @ ביטול ציבורי בטל onComplete () {System.out.println ("בוצע"); }

בואו נכתוב מבחן לעיבוד זְרִימָה. נשתמש ב- הגשה מפרסם מחלקה - מבנה מתוך java.util.concurrent - המיישם את מוֹצִיא לָאוֹר מִמְשָׁק.

אנחנו הולכים להגיש נ אלמנטים ל מוֹצִיא לָאוֹר - אשר שלנו EndSubscriber יקבל:

@ מבחן ציבורי בטל כאשר SubscribeToIt_thenShouldConsumeAll () זורק את InterruptedException {// נתון SubmissionPublish Publisher = חדש SubmissionPublisher (); מנוי EndSubscriber = חדש EndSubscriber (); publisher.subscribe (מנוי); פריטי רשימה = List.of ("1", "x", "2", "x", "3", "x"); // כאשר assertThat (publisher.getNumberOfSubscribers ()). isEqualTo (1); items.forEach (מו"ל :: להגיש); publisher.close (); // ואז ממתינים (). atMost (1000, TimeUnit.MILLISECONDS) .until (() -> assertThat (subscriber.consumedElements) .containsExactlyElementsOf (items)); }

שים לב, שאנחנו קוראים ל- סגור() השיטה למשל EndSubscriber. זה יפעיל onComplete () החזרה למטה בכל פעם מָנוּי של הנתון מוֹצִיא לָאוֹר.

הפעלת תוכנית זו תפיק את הפלט הבא:

יש: 1 Got: x Got: 2 Got: x Got: 3 Got: x בוצע

4. טרנספורמציה של מסרים

בואו נגיד שאנחנו רוצים לבנות היגיון דומה בין a מוֹצִיא לָאוֹר ו מָנוּי, אך גם ליישם טרנספורמציה מסוימת.

אנו ניצור את TransformProcessor כיתה שמיישמת מעבד ומתארך הגשה מפרסם - כיוון שזה יהיה שניהם פמעלה ו- Sמנוי.

נעבור ב פוּנקצִיָה שיהפכו תשומות ליציאות:

מחלקה ציבורית TransformProcessor מרחיב את SubmissionPublisher מיישם את Flow.Processor {פונקציית פונקציה פרטית; מנוי פרטי של Flow. מנוי; TransformProcessor ציבורי (פונקציית פונקציה) {super (); this.function = פונקציה; } @ ביטול חלל ציבורי ב- Subscribe (מנוי Flow.Subscription) {this.subscription = מנוי; מנוי. בקשה (1); } @ ביטול חלל ציבורי ב- Next (פריט T) {שלח (function.apply (פריט)); מנוי. בקשה (1); } @Override חלל ציבורי onError (Throwable t) {t.printStackTrace (); } @ ביטול ציבורי בטל onComplete () {סגור (); }}

בואו עכשיו לכתוב מבחן מהיר עם זרימת עיבוד שבה ה- מוֹצִיא לָאוֹר מפרסם חוּט אלמנטים.

שֶׁלָנוּ TransformProcessor ינתח את חוּט כפי ש מספר שלם - כלומר המרה צריכה לקרות כאן:

@ מבחן ציבורי בטל כאשר SubscribeAndTransformElements_thenShouldConsumeAll () זורק את InterruptedException {// given SubmissionPublish publisher = New SubmissionPublisher (); TransformProcessor transformProcessor = TransformProcessor חדש (מספר שלם :: parseInt); מנוי EndSubscriber = חדש EndSubscriber (); פריטי רשימה = List.of ("1", "2", "3"); רשימה expectResult = List.of (1, 2, 3); // כאשר publisher.subscribe (transformProcessor); transformProcessor.subscribe (מנוי); items.forEach (מפרסם :: להגיש); publisher.close (); // ואז לחכות (). atMost (1000, TimeUnit.MILLISECONDS) .until (() -> assertThat (subscriber.consumedElements) .containsExactlyElementsOf (expectResult)); }

שים לב, כי קורא את סגור() שיטה על הבסיס מוֹצִיא לָאוֹר יגרום ל onComplete () שיטה על TransformProcessor שיופעל.

זכור כי כל המו"לים ברשת העיבוד צריכים להיות סגורים בדרך זו.

5. בקרת דרישה להודעות באמצעות מִנוּי

בואו נגיד שאנחנו רוצים לצרוך רק את האלמנט הראשון מהמנוי, להחיל קצת הגיון ולסיים את העיבוד. אנחנו יכולים להשתמש ב- בַּקָשָׁה() שיטה להשיג זאת.

בואו נשנה את שלנו EndSubscriber לצרוך מספר N בלבד של הודעות. אנו נעביר את המספר הזה כ- howMuchMessagesConsume טיעון קונסטרוקטור:

מחלקה ציבורית EndSubscriber מיישמת מנוי {private AtomicInteger howMuchMessagesConsume; מנוי מנויים פרטי; רשימה ציבורית consumedElements = LinkedList חדש (); Public EndSubscriber (Integer howMuchMessagesConsume) {this.howMuchMessagesConsume = AtomicInteger חדש (howMuchMessagesConsume); } @ ביטול חלל ציבורי ב- מנוי (מנוי למנוי) {this.subscription = מנוי; מנוי. בקשה (1); } @ ביטול ציבורי בטל ב- Next (פריט T) {howMuchMessagesConsume.decrementAndGet (); System.out.println ("Got:" + פריט); consumedElements.add (פריט); אם (howMuchMessagesConsume.get ()> 0) {מנוי. בקשה (1); }} // ...}

אנחנו יכולים לבקש אלמנטים כל עוד אנחנו רוצים.

בואו נכתוב מבחן בו אנו רוצים לצרוך רק אלמנט אחד מהנתון מִנוּי:

@ מבחן פומבי בטל כאשר RequestForOnlyOneElement_thenShouldConsumeOne () זורק את InterruptedException {// נתון המו"ל SubmissionPublisher = SubmissionPublisher חדש (); מנוי EndSubscriber = חדש EndSubscriber (1); publisher.subscribe (מנוי); פריטי רשימה = List.of ("1", "x", "2", "x", "3", "x"); רשימה צפויה = List.of ("1"); // כאשר assertThat (publisher.getNumberOfSubscribers ()). isEqualTo (1); items.forEach (מו"ל :: להגיש); publisher.close (); // ואז לחכות (). atMost (1000, TimeUnit.MILLISECONDS) .until (() -> assertThat (subscriber.consumedElements) .containsExactlyElementsOf (צפוי)); }

למרות ש מוֹצִיא לָאוֹר מפרסם שישה אלמנטים, שלנו EndSubscriber יצרכו רק אלמנט אחד מכיוון שהוא מאותת על דרישה לעיבוד אחד בלבד.

באמצעות בַּקָשָׁה() שיטה על מִנוּי, אנו יכולים להטמיע מנגנון לחץ-גב מתוחכם יותר כדי לשלוט על מהירות צריכת ההודעות.

6. מסקנה

במאמר זה, הסתכלנו על הזרמים המגיבים של Java 9.

ראינו כיצד ליצור עיבוד זְרִימָה המורכב מ- מוֹצִיא לָאוֹר ו מָנוּי. יצרנו זרימת עיבוד מורכבת יותר עם השינוי של אלמנטים באמצעות מעבדים.

לבסוף, השתמשנו ב- מִנוּי לשלוט בדרישה לאלמנטים על ידי מָנוּי.

ניתן למצוא את היישום של כל הדוגמאות וקטעי הקוד בפרויקט GitHub - זהו פרויקט Maven, כך שיהיה קל לייבא ולהפעיל אותו כפי שהוא.


$config[zx-auto] not found$config[zx-overlay] not found