RxJava 2 - זורם

1. הקדמה

RxJava הוא יישום הרחבות תגובתי של Java המאפשר לנו לכתוב יישומים מונעי אירועים ואסינכרוניים. מידע נוסף על אופן השימוש ב- RxJava ניתן למצוא במאמר המבוא שלנו כאן.

RxJava 2 שוכתב מאפס, מה שהביא מספר תכונות חדשות; חלקם נוצרו כתגובה לנושאים שהיו בגירסה הקודמת של המסגרת.

אחת התכונות הללו היא io.reactivex.Flowable.

2. נצפה לעומת. זורם

בגרסה הקודמת של RxJava, הייתה רק מחלקת בסיס אחת להתמודדות עם מקורות מודעים ללחץ אחורי ולא מודע ללחץ אחורי - נצפה.

RxJava 2 הציג הבחנה ברורה בין שני סוגים אלה של מקורות - מקורות מודעים ללחץ אחורי מיוצגים כעת באמצעות מחלקה ייעודית - זורם.

נצפה מקורות אינם תומכים בלחץ אחורי. בגלל זה, עלינו להשתמש בו למקורות שאנו צורכים ואינם יכולים להשפיע עליהם.

כמו כן, אם עסקינן במספר גדול של אלמנטים, שני תרחישים אפשריים הקשורים ללחץ אחורי יכולים להתרחש בהתאם לסוג ה נצפה.

במקרה של שימוש במה שנקרא קַר נצפה“, האירועים נפלטים בעצלתיים, כך שאנו בטוחים מלהציף מתבונן.

בעת שימוש ב- חַם נצפהעם זאת, זה ימשיך לפלוט אירועים, גם אם הצרכן לא יכול לעמוד בקצב.

3. יצירת א זורם

ישנן דרכים שונות ליצור זורם. באופן נוח מבחינתנו, שיטות אלה נראות דומות לשיטות שב- נצפה בגרסה הראשונה של RxJava.

3.1. פָּשׁוּט זורם

אנחנו יכולים ליצור זורם משתמש ב רַק() שיטה באופן דומה ככל שיכולנו נצפה:

Flowable integerFlowable = Flowable.just (1, 2, 3, 4);

למרות שמשתמשים ב- רַק() הוא די פשוט, זה לא מאוד נפוץ ליצור זורם מנתונים סטטיים, והוא משמש למטרות בדיקה.

3.2. זורם מ נצפה

כשיש לנו נצפה אנחנו יכולים להפוך את זה בקלות ל זורם משתמש ב toFlowable () שיטה:

מספר שלם נצפהObservable = Observable.just (1, 2, 3); Flowable integerFlowable = integerObservable .toFlowable (BackpressureStrategy.BUFFER);

שימו לב שכדי שנוכל לבצע את ההמרה, עלינו להעשיר את ה- נצפה עם לחץ אחורי סטרטגיה. נתאר אסטרטגיות זמינות בסעיף הבא.

3.3. זורם מ FlowableOnSubscribe

RxJava 2 הציג ממשק פונקציונלי FlowableOnSubscribe, המייצג א זורם שמתחיל להנפיק אירועים לאחר שהצרכן נרשם אליו.

בשל כך, כל הלקוחות יקבלו את אותה מערך אירועים, מה שעושה FlowableOnSubscribe בטוח בלחץ אחורי.

כשיש לנו את FlowableOnSubscribe אנחנו יכולים להשתמש בו כדי ליצור את זורם:

FlowableOnSubscribe flowableOnSubscribe = flowable -> flowable.onNext (1); Flowable integerFlowable = Flowable .create (flowableOnSubscribe, BackpressureStrategy.BUFFER);

התיעוד מתאר שיטות רבות נוספות ליצור זורם.

4. זורםלחץ אחורי

כמה שיטות כמו toFlowable () אוֹ לִיצוֹר() קח א לחץ אחורי כוויכוח.

ה לחץ אחורי הוא ספירה המגדירה את התנהגות הלחץ האחורית שנחיל על שלנו זורם.

זה יכול לשמור מטמון או לשחרר אירועים או לא ליישם התנהגות כלשהי, במקרה האחרון, אנחנו נהיה אחראים להגדרת זה, באמצעות מפעילי לחץ אחורי.

לחץ אחורי דומה ל BackpressureMode נוכח בגרסה הקודמת של RxJava.

קיימות חמש אסטרטגיות שונות ב- RxJava 2.

4.1. בַּלָם

אם אנו משתמשים ב- BackpressureStrategy.BUFFER, המקור יאגר את כל האירועים עד שהמנוי יוכל לצרוך אותם:

חלל ציבורי thenAllValuesAreBufferedAndReceived () {List testList = IntStream.range (0, 100000) .boxed () .collect (Collectors.toList ()); נצפה נצפה = Observable.fromIterable (testList); TestSubscriber testSubscriber = נצפה .toFlowable (BackpressureStrategy.BUFFER) .observeOn (Schedulers.computation ()). Test (); testSubscriber.awaitTerminalEvent (); רשימה receivedInts = testSubscriber.getEvents () .get (0) .stream () .mapToInt (object -> (int) object) .boxed () .collect (Collectors.toList ()); assertEquals (testList, receivedInts); }

זה דומה להפעלת onBackpressureBuffer () שיטה ב זורם, אך הוא אינו מאפשר להגדיר את גודל המאגר או את פעולת onOverflow במפורש.

4.2. יְרִידָה

אנחנו יכולים להשתמש ב- BackpressureStrategy.DROP להשליך את האירועים שלא ניתן לצרוך במקום לאגר אותם.

שוב זה דומה לשימוש onBackpressureDrop() עַל זורם:

חלל ציבורי כאשר DropStrategyUsed_thenOnBackpressureDropped () {נצפה נצפה = Observable.fromIterable (testList); TestSubscriber testSubscriber = נצפה .toFlowable (BackpressureStrategy.DROP) .observeOn (Schedulers.computation ()) .test (); testSubscriber.awaitTerminalEvent (); רשימה receivedInts = testSubscriber.getEvents () .get (0) .stream () .mapToInt (object -> (int) object) .boxed () .collect (Collectors.toList ()); assertThat (receivedInts.size () <testList.size ()); assertThat (! receivedInts.contains (100000)); }

4.3. הכי מאוחר

משתמש ב BackpressureStrategy.LATEST יאלץ את המקור לשמור רק את האירועים האחרונים, וכך להחליף את הערכים הקודמים אם הצרכן לא יכול לעמוד בקצב:

חלל ציבורי כאשרLatestStrategyUsed_thenTheLastElementReceived () {Observable observable = Observable.fromIterable (testList); TestSubscriber testSubscriber = נצפה .toFlowable (BackpressureStrategy.LATEST) .observeOn (Schedulers.computation ()) .test (); testSubscriber.awaitTerminalEvent (); רשימה receivedInts = testSubscriber.getEvents () .get (0) .stream () .mapToInt (object -> (int) object) .boxed () .collect (Collectors.toList ()); assertThat (receivedInts.size () <testList.size ()); assertThat (receivedInts.contains (100000)); }

BackpressureStrategy.LATEST ו- BackpressureStrategy.DROP נראים דומים מאוד כשאנחנו מסתכלים על הקוד.

למרות זאת, BackpressureStrategy.LATEST יחליף אלמנטים שהמנוי שלנו לא יכול לטפל בהם וישמור רק על אלה האחרונים ומכאן השם.

BackpressureStrategy.DROP, מצד שני, ישליך אלמנטים שלא ניתן יהיה לטפל בהם. פירוש הדבר שהאלמנטים החדשים ביותר לא בהכרח ייפלטו.

4.4. שְׁגִיאָה

כאשר אנו משתמשים ב- BackpressureStrategy.ERROR, אנחנו פשוט אומרים את זה אנו לא מצפים שיתרחש לחץ אחורי. כתוצאה מכך, א חסר BackpressureException צריך לזרוק אם הצרכן לא יכול לעמוד בקצב המקור:

חלל ציבורי כאשרErrorStrategyUsed_thenExceptionIsThrown () {נצפה נצפה = Observable.range (1, 100000); מנוי TestSubscriber = נצפה .toFlowable (BackpressureStrategy.ERROR) .observeOn (Schedulers.computation ()) .test (); subscriber.awaitTerminalEvent (); subscriber.assertError (MissingBackpressureException.class); }

4.5. חָסֵר

אם אנו משתמשים ב- BackpressureStrategy.MISSING, המקור ידחוף אלמנטים מבלי להשליך או לחציץ.

במורד הזרם יצטרכו להתמודד עם הצפות במקרה זה:

חלל ציבורי כאשר MissingStrategyUsed_thenException () {נצפה נצפה = Observable.range (1, 100000); מנוי TestSubscriber = נצפה .toFlowable (BackpressureStrategy.MISSING) .observeOn (Schedulers.computation ()) .test (); subscriber.awaitTerminalEvent (); subscriber.assertError (MissingBackpressureException.class); }

במבחנים שלנו, אנחנו יוצאים מהכלל חסר לחץ לחץ יוצא מן הכלל לשניהם שְׁגִיאָה ו חָסֵר אסטרטגיות. כיוון ששניהם יזרקו חריג כזה כאשר המאגר הפנימי של המקור יוצף על גדותיו.

עם זאת, כדאי לציין כי לשניהם יש מטרה שונה.

עלינו להשתמש בראשון כאשר איננו מצפים כלל ללחץ אחורי, ואנחנו רוצים שהמקור יזרוק חריג במקרה שהוא קורה.

זה האחרון יכול לשמש אם אנחנו לא רוצים לציין התנהגות ברירת מחדל ביצירת ה- זורם. ואנחנו נשתמש במפעילי לחץ אחורי כדי להגדיר את זה בהמשך.

5. סיכום

במדריך זה הצגנו את הכיתה החדשה שהוצגה ב- RxJava 2 שקוראים לו זורם.

למידע נוסף אודות זורם עצמו ו- API זה אנו יכולים להפנות לתיעוד.

כמו תמיד ניתן למצוא את כל דגימות הקוד ב- GitHub.


$config[zx-auto] not found$config[zx-overlay] not found