מתזמנים ב- RxJava

1. סקירה כללית

במאמר זה אנו נתמקד בסוגים שונים של מתזמנים שאנו נשתמש בכתיבת תוכניות ריבוי הליכי משנה המבוססות על RxJava מנוי לצפייה ו להתבונן שיטות.

מתזמנים לתת את האפשרות לציין היכן וסביר להניח מתי לבצע משימות הקשורות להפעלת נצפה שַׁרשֶׁרֶת.

אנחנו יכולים להשיג א מתזמן משיטות המפעל המתוארות בכיתה מתזמנים.

2. התנהגות הברגה ברירת מחדל

כברירת מחדל,Rx הוא בעל הברגה אחת מה שמרמז ש- נצפה ושרשרת המפעילים שנוכל לפנות אליה תודיע למשקיפים על אותו שרשור עליו הירשם () שיטה נקראת.

ה להתבונן ו subscribeOn השיטות לוקחות כטענה א מתזמן, שזה, כפי שהשם מרמז, כלי שנוכל להשתמש בו לתזמון פעולות בודדות.

אנו ניצור את היישום שלנו של מתזמן באמצעות לִיצוֹרעוֹבֵד שיטה, המחזירה א מתזמן. עובד. א עוֹבֵד מקבל פעולות ומבצע אותן ברצף על חוט יחיד.

במובן מסוים, א עוֹבֵד הוא סהצ'דלר עצמו, אך לא נתייחס אליו כאל מתזמן כדי למנוע בלבול.

2.1. קביעת פעולה

אנחנו יכולים לתזמן עבודה בכל אחת מהן מתזמן על ידי יצירת חדש עוֹבֵד ותזמון פעולות:

מתזמן מתזמן = Schedulers.immediate (); עובד Scheduler.Worker = scheduler.createWorker (); worker.schedule (() -> תוצאה + = "פעולה"); Assert.assertTrue (result.equals ("פעולה"));

הפעולה מתייצבת בתור בשרשור שהעובד מוקצה אליו.

2.2. ביטול פעולה

מתזמן. עובד מרחיב מִנוּי. קורא ל בטל את הרישום שיטה על א עוֹבֵד יביא לריקון התור וביטול כל המשימות הממתינות. אנו יכולים לראות זאת על ידי דוגמה:

מתזמן מתזמן = מתזמן.newThread (); עובד Scheduler.Worker = scheduler.createWorker (); worker.schedule (() -> {result + = "First_Action"; worker.unsubscribe ();}); worker.schedule (() -> תוצאה + = "פעולה שניה"); Assert.assertTrue (result.equals ("First_Action"));

המשימה השנייה לא מתבצעת לעולם משום שהמשימה שלפניה ביטלה את כל הפעולה. פעולות שהיו בתהליך ביצוע יופרעו.

3. מתזמנים. חדש

מתזמן זה פשוט מתחיל שרשור חדש בכל פעם שהוא מתבקש באמצעות subscribeOn () אוֹ observerOn ().

זו כמעט לא בחירה טובה, לא רק בגלל האיחור הכרוך בהתחלת שרשור, אלא גם משום שלא נעשה שימוש חוזר בשרשור זה:

Observable.just ("שלום") .observeOn (Schedulers.newThread ()) .doOnNext (s -> result2 + = Thread.currentThread (). GetName ()) .observeOn (Schedulers.newThread ()). מנוי (s - > result1 + = Thread.currentThread (). getName ()); Thread.sleep (500); Assert.assertTrue (result1.equals ("RxNewThreadScheduler-1")); Assert.assertTrue (result2.equals ("RxNewThreadScheduler-2");

כאשר עוֹבֵד נעשה, החוט פשוט מסתיים. זֶה מתזמן ניתן להשתמש רק כאשר המשימות הן גסות: זה לוקח הרבה זמן להשלים, אבל יש מעט מאוד מהן כך שסביר להניח שהחוטים לא יעשו שימוש חוזר בכלל.

מתזמן מתזמן = מתזמן.newThread (); עובד Scheduler.Worker = scheduler.createWorker (); worker.schedule (() -> {result + = Thread.currentThread (). getName () + "_Start"; worker.schedule (() -> result + = "_worker_"); result + = "_End";} ); Thread.sleep (3000); Assert.assertTrue (result.equals ("RxNewThreadScheduler-1_Start_End_worker_"));

כאשר קבענו את עוֹבֵד על NewThreadScheduler, ראינו שהעובד קשור לחוט מסוים.

4. מתזמנים.מיד

מתזמנים.מיד הוא מתזמן מיוחד שמפעיל משימה בתוך שרשור הלקוח בצורה חוסמת, ולא באופן אסינכרוני וחוזר לאחר השלמת הפעולה:

מתזמן מתזמן = Schedulers.immediate (); עובד Scheduler.Worker = scheduler.createWorker (); worker.schedule (() -> {result + = Thread.currentThread (). getName () + "_Start"; worker.schedule (() -> result + = "_worker_"); result + = "_End";} ); Thread.sleep (500); Assert.assertTrue (result.equals ("main_Start_worker__End"));

למעשה, מנוי ל- נצפה באמצעות מתזמן מיידי בדרך כלל יש את אותו ההשפעה כמו לא להירשם כמנוי עם מישהו מסוים סצ'דלר בכלל:

Observable.just ("שלום"). SubscribeOn (Schedulers.immediate ()). Subscribe (s -> result + = Thread.currentThread (). GetName ()); Thread.sleep (500); Assert.assertTrue (result.equals ("ראשי"));

5. מתזמנים.טרמפולינה

ה טרַמפּוֹלִינָהמתזמן דומה מאוד ל מִיָדִי מכיוון שהוא גם מתזמן משימות באותו שרשור, וחוסם ביעילות.

עם זאת, המשימה הקרובה מבוצעת כאשר כל המשימות שתוכננו בעבר הושלמו:

Observable.just (2, 4, 6, 8). SubscribeOn (Schedulers.trampoline ()). Subscribe (i -> result + = "" + i); Observable.just (1, 3, 5, 7, 9). SubscribeOn (Schedulers.trampoline ()). Subscribe (i -> result + = "" + i); Thread.sleep (500); Assert.assertTrue (result.equals ("246813579"));

מִיָדִי מיד קורא למשימה נתונה ואילו טרַמפּוֹלִינָה ממתין לסיום המשימה הנוכחית.

ה טרַמפּוֹלִינָהשל עוֹבֵד מבצע כל משימה בשרשור שתזמן את המשימה הראשונה. השיחה הראשונה ל לוח זמנים חוסם עד לריקון התור:

מתזמן מתזמן = מתזמן.טרמפולינה (); עובד Scheduler.Worker = scheduler.createWorker (); worker.schedule (() -> {result + = Thread.currentThread (). getName () + "התחל"; worker.schedule (() -> {result + = "_middleStart"; worker.schedule (() -> תוצאה + = "_עובד_"); תוצאה + = "_middleEnd";}); תוצאה + = "_mainEnd";}); Thread.sleep (500); Assert.assertTrue (תוצאת .equals ("mainStart_mainEnd_middleStart_middleEnd_worker_"));

6. מתזמנים

מתזמנים הם יותר מורכבים מבפנים מוציאים לפועל מ java.util.concurrent - אז היה צורך בהפשטה נפרדת.

אבל מכיוון שהם מבחינה רעיונית די דומים, באופן לא מפתיע יש עטיפה שיכולה להסתובב מוציא להורג לְתוֹך מתזמן משתמש ב מ שיטת מפעל:

פרטי ThreadFactory threadFactory (תבנית מחרוזת) {להחזיר ThreadFactoryBuilder חדש () .setNameFormat (תבנית) .build (); } @Test הציבור בטל שניתןExecutors_whenSchedulerFrom_thenReturnElements () זורק InterruptedException {ExecutorService poolA = newFixedThreadPool (10, threadFactory ("Sched-A-% d")); מתזמן מתזמן A = Schedulers.from (poolA); ExecutorService poolB = newFixedThreadPool (10, threadFactory ("Sched-B-% d")); מתזמן מתזמן B = Schedulers.from (poolB); נצפה נצפה = Observable.create (מנוי -> {subscriber.onNext ("אלפא"); subscriber.onNext ("ביתא"); subscriber.onCompleted ();}) ;; נצפה .subscribeOn (schedulerA) .subscribeOn (schedulerB). subscribe (x -> result + = Thread.currentThread (). getName () + x + "_", Throwable :: printStackTrace, () -> result + = "_Completed "); Thread.sleep (2000); Assert.assertTrue (result.equals ("Sched-A-0Alfa_Sched-A-0Beta__Completed")); }

מתזמן B. משמש לפרק זמן קצר, אך בקושי הוא מתזמן פעולה חדשה מתזמן A., שעושה את כל העבודה. לפיכך, מרובה subscribeOn שיטות לא רק מתעלמים, אלא גם מכניסים תקורה קטנה.

7. Schedulers.io

זֶה מתזמן דומה ל- חדש Thread למעט העובדה שרשורים שהתחילו כבר ממוחזרים ויכולים לטפל בבקשות עתידיות.

יישום זה פועל באופן דומה ל ThreadPoolExecutor מ java.util.concurrent עם מאגר חוטים בלתי מוגבל. בכל פעם חדש עוֹבֵד מתבקש, או שמתחילים שרשור חדש (ומאוחר יותר שומם במצב לא פעיל במשך זמן מה) או שמשתמשים בו בסרק:

Observable.just ("io") .subscribeOn (Schedulers.io ()) .subscribe (i -> result + = Thread.currentThread (). GetName ()); Assert.assertTrue (result.equals ("RxIoScheduler-2"));

עלינו להיזהר במשאבים בלתי מוגבלים מכל סוג שהוא - במקרה של תלות חיצונית איטית או לא מגיבה כמו שירותי אינטרנט, ioמתזמן עשוי להתחיל מספר עצום של שרשורים, מה שיוביל ליישום שלנו להיות מגיב.

בפועל, בעקבות Schedulers.io היא כמעט תמיד בחירה טובה יותר.

8. מתזמנים.חישוב

חישוב סצ'דלר כברירת מחדל מגביל את מספר הנושאים הפועלים במקביל לערך מעבדים זמינים (), כפי שנמצא ב Runtime.getRuntime () מחלקת שירות.

אז עלינו להשתמש ב- מתזמן חישוב כאשר המשימות קשורות לחלוטין למעבד; כלומר, הם דורשים כוח חישובי ואין להם קוד חוסם.

הוא משתמש בתור בלתי מוגבל מול כל שרשור, כך שאם המשימה מתוזמנת, אך כל הליבות תפוסות, היא תעמוד בתור. עם זאת, התור רגע לפני כל שרשור ימשיך לגדול:

Observable.just ("חישוב"). SubscribeOn (Schedulers.computation ()). Subscribe (i -> result + = Thread.currentThread (). GetName ()); Assert.assertTrue (result.equals ("RxComputationScheduler-1"));

אם מסיבה כלשהי, אנו זקוקים למספר שרשורים שונה מברירת המחדל, תמיד נוכל להשתמש ב- rx.scheduler.max-thread-threads נכס מערכת.

על ידי לקיחת פחות שרשורים אנו יכולים להבטיח שתמיד יש ליבות מעבד אחת או יותר במצב סרק, ואפילו בעומס כבד, חישוב מאגר החוטים אינו מרווה את השרת. פשוט לא ניתן לקבל יותר חוטי חישוב מאשר ליבות.

9. Schedulers.test

זֶה מתזמן משמש רק למטרות בדיקה, ולעולם לא נראה אותו בקוד ייצור. היתרון העיקרי שלה הוא היכולת לקדם את השעון, לדמות את הזמן שעובר באופן שרירותי:

אותיות רשימה = Arrays.asList ("A", "B", "C"); מתזמן TestScheduler = Schedulers.test (); מנוי TestSubscriber = חדש TestSubscriber (); סמן נצפה = אינטרוול נצפה (1, TimeUnit.SECONDS, מתזמן); Observable.from (אותיות) .zipWith (סמן, (מחרוזת, אינדקס) -> אינדקס + "-" + מחרוזת). Subscribe subscribe (מתזמן). Subscribe (מנוי); subscriber.assertNoValues ​​(); subscriber.assertNotCompleted (); scheduler.advanceTimeBy (1, TimeUnit.SECONDS); subscriber.assertNoErrors (); subscriber.assertValueCount (1); subscriber.assertValues ​​("0-A"); scheduler.advanceTimeTo (3, TimeUnit.SECONDS); subscriber.assertCompleted (); subscriber.assertNoErrors (); subscriber.assertValueCount (3); assertThat (subscriber.getOnNextEvents (), hasItems ("0-A", "1-B", "2-C");

10. מתזמנים ברירת מחדל

כמה נצפה למפעילים ב- RxJava יש טפסים חלופיים המאפשרים לנו להגדיר אילו מתזמן המפעיל ישתמש בהפעלתו. אחרים אינם מבצעים שום פעולה מסוימת מתזמן או לפעול לפי ברירת מחדל מסוימת מתזמן.

לדוגמא, ה לְעַכֵּב המפעיל לוקח אירועים במעלה הזרם ודוחף אותם במורד הזרם לאחר זמן נתון. ברור שהוא לא יכול להחזיק את החוט המקורי באותה תקופה, ולכן עליו להשתמש באחר מתזמן:

ExecutorService poolA = newFixedThreadPool (10, threadFactory ("Sched1-")); מתזמן מתזמן A = Schedulers.from (poolA); Observable.just ('A', 'B'). Delay (1, TimeUnit.SECONDS, schedulerA). Subscribe (i -> result + = Thread.currentThread (). GetName () + i + ""); Thread.sleep (2000); Assert.assertTrue (result.equals ("Sched1-A Sched1-B"));

בלי לספק מנהג מתזמן A., כל המפעילים למטה לְעַכֵּב ישתמש ב- מתזמן חישוב.

מפעילים חשובים אחרים התומכים בהתאמה אישית מתזמנים הם בַּלָם, הַפסָקָה, טווח, שָׁעוֹן עֶצֶר, לדלג, לקחת, פסק זמן, וכמה אחרים. אם לא נספק א מתזמן למפעילים כאלה, חישוב מתזמן משמש, המהווה ברירת מחדל בטוח ברוב המקרים.

11. מסקנה

ביישומים תגובתי באמת, שכל הפעולות ארוכות הטווח הם אסינכרוניים, מעט מאוד שרשורים וכך מתזמנים דרושים.

מתזמני מאסטרים חיוניים לכתיבת קוד מדרגי ובטוח באמצעות RxJava. ההבדל בין subscribeOn ו להתבונן חשוב במיוחד בעומס גבוה כאשר כל משימה חייבת להתבצע בדיוק כשאנחנו מצפים.

אחרון חביב, עלינו להיות בטוחים בכך מתזמנים במורד הזרם המשמש יכול לעמוד בקצב המודעה שנוצרה על ידי מתזמנים upstrea m. לקבלת מידע נוסף, יש מאמר זה על לחץ אחורי.

ניתן למצוא את היישום של כל הדוגמאות וקטעי הקוד בפרויקט GitHub - זהו פרויקט Maven, כך שיהיה קל לייבא ולהפעיל אותו כפי שהוא.


$config[zx-auto] not found$config[zx-overlay] not found