התמודדות עם לחץ אחורי עם RxJava

1. סקירה כללית

במאמר זה נבחן את האופן בו ספריית RxJava עוזרת לנו להתמודד עם לחץ אחורי.

במילים פשוטות - RxJava משתמש במושג של זרמים תגובתי על ידי הצגת נצפים, לאיזה אחד או רבים משקיפים יכול להירשם. ההתמודדות עם זרמים אולי אינסופיים היא מאתגרת מאוד, מכיוון שאנו צריכים להתמודד עם בעיה של לחץ אחורי.

לא קשה להגיע למצב בו נצפה פולט פריטים במהירות רבה יותר מכפי שמנוי יכול לצרוך אותם. נבחן את הפתרונות השונים לבעיית גידול המאגר של פריטים שלא נצרכים.

2. חם נצפים נגד קר נצפים

ראשית, בואו ניצור פונקציה צרכנית פשוטה שתשמש כצרכן של אלמנטים מ נצפים שנגדיר בהמשך:

מחלקה ציבורית ComputeFunction {public static void compute (Integer v) {try {System.out.println ("compute integer v:" + v); Thread.sleep (1000); } לתפוס (InterruptedException e) {e.printStackTrace (); }}}

שֶׁלָנוּ לְחַשֵׁב() הפונקציה היא פשוט הדפסת הטיעון. הדבר החשוב שיש לשים לב כאן הוא קריאה של א Thread.sleep (1000) שיטה - אנו עושים זאת כדי לחקות איזו משימה ריצה ארוכה שתגרום נצפה למלא פריטים מהר יותר מַשׁקִיף יכול לצרוך אותם.

יש לנו שני סוגים של נצפים - חם ו קַר - שהם שונים לחלוטין כשמדובר בטיפול בלחץ אחורי.

2.1. קַר נצפים

קר נצפה פולט רצף פריטים מסוים אך יכול להתחיל לפלוט רצף זה כאשר שלו מַשׁקִיף מוצא שזה נוח, ובכל קצב שהוא מַשׁקִיף רצונות, מבלי לשבש את שלמות הרצף. קַר נצפה מספק פריטים בצורה עצלה.

ה מַשׁקִיף לוקח אלמנטים רק כאשר הוא מוכן לעבד פריט זה, ופריטים אינם צריכים להיות מאוחסנים ב- נצפה כי הם מתבקשים בצורה מושכת.

לדוגמא, אם אתה יוצר נצפה בהתבסס על טווח סטטי של אלמנטים ממיליון למיליון, זה נצפה היה פולט את אותו רצף של פריטים ולא משנה באיזו תדירות נצפים פריטים אלה:

Observable.range (1, 1_000_000) .observeOn (Schedulers.computation ()). Subscribe (ComputeFunction :: compute);

כשאנחנו מתחילים את התוכנית שלנו, הפריטים יחושבו על ידי מַשׁקִיף בעצלתיים ויתבקש בצורה מושכת. ה Schedulers.computation () השיטה פירושה שאנחנו רוצים להפעיל את מַשׁקִיף בתוך מאגר חוטי חישוב ב RxJava.

הפלט של התוכנית יורכב מתוצאה של לְחַשֵׁב() שיטה המופעלת עבור פריט אחד אחד מתוך נצפה:

לחשב מספר שלם v: 1 לחשב מספר שלם v: 2 לחשב מספר שלם v: 3 לחשב מספר שלם v: 4 ...

קַר נצפים לא צריך להיות כל צורה של לחץ אחורי כי הם עובדים בצורה מושכת. דוגמאות לפריטים הנפלטים מהצטננות נצפה עשוי לכלול תוצאות של שאילתת מסד נתונים, אחזור קבצים או בקשת אינטרנט.

2.2. חַם נצפים

חם נצפה מתחיל ליצור פריטים ופולט אותם מיד עם יצירתם. זה מנוגד להצטננות נצפים מודל משיכה של עיבוד. חַם נצפה פולט פריטים בקצב שלו, וזה תלוי במשקיפים לשמור על קשר.

כאשר מַשׁקִיף אינו מסוגל לצרוך פריטים באותה מהירות שהם מיוצרים על ידי נצפה צריך לאגר אותם או לטפל בהם בדרך אחרת, מכיוון שהם ימלאו את הזיכרון ויגרמו לבסוף OutOfMemoryException.

בואו ניקח בחשבון דוגמה של חם נצפה, שמייצר מיליון פריטים לצרכן הסופי שעובד את הפריטים האלה. כש לְחַשֵׁב() שיטה ב מַשׁקִיף לוקח קצת זמן לעבד כל פריט, נצפה מתחיל למלא זיכרון בפריטים, מה שגורם לתוכנית להיכשל:

PublishSubject source = PublishSubject.create (); source.observeOn (Schedulers.computation ()). subscribe (ComputeFunction :: compute, Throwable :: printStackTrace); IntStream.range (1, 1_000_000) .forEach (מקור :: onNext); 

הפעלת תוכנית זו תיכשל עם חסר BackpressureException כי לא הגדרנו דרך לטפל בייצור יתר נצפה.

דוגמאות לפריטים הנפלטים ממס חם נצפה עשוי לכלול אירועי עכבר ומקלדת, אירועי מערכת או מחירי מניות.

3. חציצה ייצור יתר נצפה

הדרך הראשונה להתמודד עם ייצור יתר נצפה זה להגדיר סוג כלשהו של חיץ עבור אלמנטים שלא ניתן לעבד על ידי מַשׁקִיף.

אנו יכולים לעשות זאת על ידי קריאת א בַּלָם() שיטה:

PublishSubject source = PublishSubject.create (); source.buffer (1024) .observeOn (Schedulers.computation ()). subscribe (ComputeFunction :: compute, Throwable :: printStackTrace); 

הגדרת חיץ בגודל 1024 תתן מַשׁקִיף זמן מה להתעדכן במקור ייצור יתר. המאגר ישמור פריטים שטרם עובדו.

אנו יכולים להגדיל את גודל החיץ כדי שיהיה לנו מספיק מקום לערכים המיוצרים.

שים לב עם זאת שבדרך כלל, זה יכול להיות רק תיקון זמני מכיוון שהצפה עדיין יכולה לקרות אם המקור מייצר יתר על המידה את גודל החיץ החזוי.

4. אצווה פריטים שנפלטו

אנו יכולים לייצר פריטי ייצור יתר בחלונות של אלמנטים N.

מתי נצפה מייצר אלמנטים מהר יותר מאשר מַשׁקִיף יכולים לעבד אותם, אנו יכולים להקל על כך על ידי קיבוץ אלמנטים מיוצרים יחד ושליחת קבוצה של אלמנטים אל מַשׁקִיף המסוגל לעבד אוסף של אלמנטים במקום אלמנט אחד אחד:

PublishSubject source = PublishSubject.create (); source.window (500) .observeOn (Schedulers.computation ()). subscribe (ComputeFunction :: compute, Throwable :: printStackTrace); 

באמצעות חַלוֹן() שיטה עם ויכוח 500, יספר נצפה לקבץ אלמנטים במנות 500 בגודל. טכניקה זו יכולה להפחית את הבעיה של ייצור יתר נצפה מתי מַשׁקִיף מסוגל לעבד אצווה של אלמנטים מהר יותר בהשוואה לעיבוד אלמנטים אחד אחד.

5. דילוג על אלמנטים

אם חלק מהערכים המיוצרים על ידי נצפה ניתן להתעלם בבטחה, אנו יכולים להשתמש בדגימה בתוך זמן מסוים ומפעילי חנק.

השיטות לִטעוֹם() ו מצערת ראשונה () לוקחים משך כפרמטר:

  • הסמַסְפִּיק() שיטה בודקת מעת לעת את רצף האלמנטים ופולטת את הפריט האחרון שיוצר תוך פרק הזמן שצוין
  • ה מצערת ראשונה () השיטה פולטת את הפריט הראשון שיוצר לאחר פרק הזמן שצוין

משך הזמן הוא שאחריו נבחר אלמנט ספציפי אחד מתוך רצף האלמנטים המיוצרים. אנו יכולים לציין אסטרטגיה לטיפול בלחץ אחורי על ידי דילוג על אלמנטים:

PublishSubject source = PublishSubject.create (); source.sample (100, TimeUnit.MILLISECONDS) .observeOn (Schedulers.computation ()). subscribe (ComputeFunction :: compute, Throwable :: printStackTrace);

ציינו כי אסטרטגיית דילוג על אלמנטים תהיה לִטעוֹם() שיטה. אנו רוצים מדגם של רצף באורך של 100 אלפיות השנייה. אלמנט זה ייפלט ל מַשׁקִיף.

זכור, עם זאת, שמפעילים אלה רק מפחיתים את קצב קבלת הערך במורד הזרם מַשׁקִיף וכך הם עדיין עשויים להוביל ל חסר BackpressureException.

6. טיפול במילוי נצפה בַּלָם

למקרה שהאסטרטגיות שלנו לדגימת או אלמנטים באצווה אינן עוזרות במילוי חיץ, עלינו ליישם אסטרטגיה של טיפול בתיקים כאשר חיץ מתמלא.

עלינו להשתמש ב- onBackpressureBuffer () שיטה למניעה BufferOverflowException.

ה onBackpressureBuffer () השיטה לוקחת שלושה טיעונים: יכולת של נצפה חיץ, שיטה שמופעלת כאשר חיץ מתמלא, ואסטרטגיה לטיפול באלמנטים שיש לזרוק ממאגר. אסטרטגיות להצפה הן ב לחץ אחורי זרימה מעמד.

ישנם 4 סוגים של פעולות שניתן לבצע כאשר המאגר מתמלא:

  • ON_OVERFLOW_ERROR - זוהי התנהגות ברירת המחדל המסמנת א BufferOverflowException כשהמאגר מלא
  • ON_OVERFLOW_DEFAULT - נכון לעכשיו זה אותו דבר כמו ON_OVERFLOW_ERROR
  • ON_OVERFLOW_DROP_LATEST - אם יתרחש הצפה, הערך הנוכחי פשוט יתעלם ורק הערכים הישנים יועברו לאחר ההורדה מַשׁקִיף בקשות
  • ON_OVERFLOW_DROP_OLDEST - מפיל את האלמנט העתיק ביותר במאגר ומוסיף לו את הערך הנוכחי

בואו נראה כיצד לציין את האסטרטגיה הזו:

Observable.range (1, 1_000_000) .onBackpressureBuffer (16, () -> {}, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST) .observeOn (Schedulers.computation ()). מנוי (e -> {}, ניתן לזרוק :: printStackTrace); 

כאן האסטרטגיה שלנו לטיפול במאגר הצף היא הטלת האלמנט העתיק ביותר במאגר והוספת פריט חדש ביותר המיוצר על ידי נצפה.

שים לב ששתי האסטרטגיות האחרונות גורמות להפסקת המשך בזרם כשהם מפילים אלמנטים. בנוסף, הם לא יסמנו BufferOverflowException.

7. השמטת כל האלמנטים המיוצרים יתר על המידה

בכל פעם במורד הזרם מַשׁקִיף אינו מוכן לקבל אלמנט, אנו יכולים להשתמש ב- onBackpressureDrop () שיטה להוריד את האלמנט הזה מהרצף.

אנו יכולים לחשוב על שיטה זו כעל onBackpressureBuffer () שיטה עם קיבולת של חיץ מוגדר לאפס עם אסטרטגיה ON_OVERFLOW_DROP_LATEST.

מפעיל זה שימושי כאשר אנו יכולים להתעלם בבטחה מערכים ממקור נצפה (כגון תנועות עכבר או אותות מיקום GPS נוכחיים) מאחר ובהמשך יהיו ערכים עדכניים יותר:

Observable.range (1, 1_000_000) .onBackpressureDrop () .observeOn (Schedulers.computation ()) .doOnNext (ComputeFunction :: compute). Subscribe (v -> {}, Throwable :: printStackTrace);

השיטה onBackpressureDrop () מבטלת בעיה של ייצור יתר נצפה אבל צריך להשתמש בזהירות.

8. מסקנה

במאמר זה בחנו בעיה של ייצור יתר נצפה ודרכי התמודדות עם לחץ אחורי. בדקנו אסטרטגיות של חציצה, אצווה ודילוג על אלמנטים כאשר מַשׁקִיף אינו מסוגל לצרוך אלמנטים באותה מהירות שהם מיוצרים על ידי נצפה.

ניתן למצוא את היישום של כל הדוגמאות וקטעי הקוד בפרויקט GitHub - זהו פרויקט Maven, כך שיהיה קל לייבא ולהפעיל אותו כפי שהוא.


$config[zx-auto] not found$config[zx-overlay] not found