שילוב תצפיות ב- RxJava

1. הקדמה

במדריך מהיר זה נדון בדרכי שילוב שונות נצפים ב- RxJava.

אם אתה חדש ב- RxJava, בהחלט בדוק קודם את מדריך המבוא הזה.

עכשיו, בואו נקפוץ פנימה.

2. נצפים

נצפה רצפים, או פשוט נצפים, הם ייצוגים של זרמי נתונים אסינכרוניים.

אלה מבוססים על דפוס ה- Observer שבו אובייקט שנקרא מַשׁקִיף, מנוי על פריטים הנפלטים על ידי נצפה.

המנוי אינו חסום כ- מַשׁקִיף עומד להגיב לכל מה ש- נצפה עתיד לפלוט בעתיד. זה, בתורו, מקל על המקבילות.

הנה הדגמה פשוטה ב- RxJava:

נצפה מ- (מחרוזת חדשה [] {"ג'ון", "איילה"}). מנוי (שם -> System.out.println ("שלום" + שם))

3. שילוב נצפים

בעת תכנות באמצעות מסגרת תגובית, זה מקרה נפוץ לשלב בין שונים נצפים.

ביישום אינטרנט, למשל, נצטרך לקבל שתי קבוצות של זרמי נתונים אסינכרוניים שאינם תלויים זה בזה.

במקום להמתין להשלמת הזרם הקודם לפני שנבקש את הזרם הבא, אנו יכולים להתקשר לשניהם במקביל ולהירשם לזרמים המשולבים.

בחלק זה נדון בכמה מהדרכים השונות בהן אנו יכולים לשלב מספר רב נצפים ב- RxJava ובמקרי השימוש השונים עליהם כל שיטה חלה.

3.1. לְמַזֵג

אנחנו יכולים להשתמש ב- לְמַזֵג מפעיל לשלב את הפלט של מספר רב נצפים כדי שהם יתנהגו כמו אחד:

@Test הציבור בטל givenTwoObservables_whenMerged_shouldEmitCombinedResults () {TestSubscriber testSubscriber = חדש TestSubscriber (); Observable.merge (Observable.from (מחרוזת חדשה [] {"שלום", "עולם"}), Observable.from (מחרוזת חדשה [] {"אני אוהב", "RxJava"})). Subscribe (testSubscriber); testSubscriber.assertValues ​​("שלום", "עולם", "אני אוהב", "RxJava"); }

3.2. MergeDelayError

ה mergeDelayError שיטה זהה ל- לְמַזֵג בכך שהוא משלב מספר רב נצפים לאחד, אבל אם מתרחשות שגיאות במהלך המיזוג, היא מאפשרת להמשיך לפריטים נטולי שגיאות לפני הפצת השגיאות:

@Test הציבור בטל שניתןMutipleObservablesOneThrows_whenMerged_thenCombineBeforePropagatingError () {TestSubscriber testSubscriber = new TestSubscriber (); Observable.mergeDelayError (Observable.from (מחרוזת חדשה [] {"שלום", "עולם"}), Observable.error (RuntimeException חדש ("יוצא מן הכלל")), Observable.from (מחרוזת חדשה [] {"rxjava"} )) .subscribe (testSubscriber); testSubscriber.assertValues ​​("שלום", "עולם", "rxjava"); testSubscriber.assertError (RuntimeException.class); }

הדוגמא לעיל פולט את כל הערכים ללא שגיאות:

שלום עולם rxjava

שים לב שאם אנו משתמשים לְמַזֵג במקום mergeDelayError, ה חוּטrxjava ” לא נפלט בגלל לְמַזֵג מפסיק מיד את זרימת הנתונים מ- נצפים כאשר מתרחשת שגיאה.

3.3. רוכסן

ה רוכסן שיטת הארכה מפגיש שני רצפי ערכים כזוגות:

@Test הציבור בטל givenTwoObservables_whenZipped_thenReturnCombinedResults () {רשימה zippedStrings = ArrayList חדש (); Observable.zip (Observable.from (מחרוזת חדשה [] {"פשוט", "מתון", "מורכב"}), Observable.from (מחרוזת חדשה [] {"פתרונות", "הצלחה", "היררכיה"}), (str1, str2) -> str1 + "" + str2). subscribe (zippedStrings :: add); assertThat (zippedStrings) .isNotEmpty (); assertThat (zippedStrings.size ()). isEqualTo (3); assertThat (zippedStrings) .contains ("פתרונות פשוטים", "הצלחה מתונה", "היררכיה מורכבת"); }

3.4. רוכסן עם מרווח

בדוגמה זו, אנו נקשר זרם עם הַפסָקָה שלמעשה יעכב את פליטת אלמנטים מהזרם הראשון:

@Test הציבור בטל שניתן AStream_whenZippedWithInterval_shouldDelayStreamEmmission () {TestSubscriber testSubscriber = new TestSubscriber (); נתונים נצפים = Observable.just ("אחד", "שניים", "שלושה", "ארבעה", "חמש"); מרווח נצפה = Observable.interval (1L, TimeUnit.SECONDS); .Zip נצפה (נתונים, מרווח, (strData, tick) -> String.format ("[% d] =% s", tick, strData)) .toBlocking (). Subscribe (testSubscriber); testSubscriber.assertCompleted (); testSubscriber.assertValueCount (5); testSubscriber.assertValues ​​("[0] = אחד", "[1] = שניים", "[2] = שלוש", "[3] = ארבע", "[4] = חמש"); }

4. סיכום

במאמר זה ראינו כמה משיטות השילוב נצפים עם RxJava. אתה יכול ללמוד על שיטות אחרות כמו combineLatest, לְהִצְטַרֵף, קבוצה הצטרף, switchOnNext, בתיעוד הרשמי של RxJava.

כמו תמיד, קוד המקור של מאמר זה זמין ברפיונו של GitHub.


$config[zx-auto] not found$config[zx-overlay] not found