מבוא לליבת הכור

1. הקדמה

Reactor Core היא ספריית Java 8 המיישמת את מודל התכנות תגובתי. הוא בנוי על גבי מפרט זרמי התגובה, תקן לבניית יישומים ריאקטיביים.

מרקע של פיתוח ג'אווה שאינו תגובתי, מעבר תגובתי יכול להיות עקומת למידה תלולה למדי. זה הופך להיות מאתגר יותר כאשר משווים אותו ל- Java 8 זרם ממשק API, שכן ניתן לטעות בכך שהם אותם הפשטות ברמה גבוהה.

במאמר זה ננסה להסיר את הפרדיגמה הזו. אנו ננקוט בצעדים קטנים דרך הכור עד שנבנה תמונה כיצד להרכיב קוד תגובתי, ולהניח את הבסיס למאמרים מתקדמים יותר שיגיעו בסדרה מאוחרת יותר.

2. מפרט זרמי תגובתי

לפני שנסתכל על הכור, עלינו להסתכל במפרט הזרמים התגובים. זה מה שמיישם הכור, והוא מניח את היסודות לספרייה.

בעיקרו של דבר, זרמים ריאקטיביים הם מפרט לעיבוד זרם אסינכרוני.

במילים אחרות, מערכת בה הרבה אירועים מופקים ונצרכים בצורה לא סינכרונית. חשוב על זרם של אלפי עדכוני מניות בשנייה שנכנס ליישום פיננסי, ועל כך שהוא יצטרך לענות על עדכונים אלה בזמן.

אחת המטרות העיקריות לכך היא לטפל בבעיית הלחץ האחורי. אם יש לנו מפיק שמעביר אירועים לצרכן מהר יותר מכפי שהוא יכול לעבד אותם, בסופו של דבר הצרכן יהיה מוצף באירועים, ונגמר להם משאבי המערכת.

לחץ-לחץ אומר שהצרכן שלנו אמור להיות מסוגל להגיד ליצרן כמה נתונים לשלוח על מנת למנוע זאת, וזה מה שמפורט במפרט.

3. תלות Maven

לפני שנתחיל, בואו נוסיף את התלות שלנו ב- Maven:

 io.projectreactor reactor-core 3.3.9.RELEASE ch.qos.logback logback-classic 1.1.3 

אנו מוסיפים גם Logback כתלות. הסיבה לכך היא שנרשום את הפלט של הכור על מנת להבין טוב יותר את זרימת הנתונים.

4. הפקת זרם נתונים

על מנת שאפליקציה תהיה תגובתית, הדבר הראשון שהיא חייבת להיות מסוגלת לעשות הוא לייצר זרם נתונים.

זה יכול להיות משהו כמו הדוגמה לעדכון המניות שנתנו קודם. ללא נתונים אלה, לא יהיה לנו על מה להגיב, ולכן זהו צעד ראשון הגיוני.

Core Reactive נותן לנו שני סוגי נתונים המאפשרים לנו לעשות זאת.

4.1. שֶׁטֶף

הדרך הראשונה לעשות זאת היא באמצעות שֶׁטֶף. זה זרם שיכול לפלוט 0..n אלמנטים. בואו ננסה ליצור אחת פשוטה:

שטף פשוט = Flux.just (1, 2, 3, 4);

במקרה זה, יש לנו זרם סטטי של ארבעה אלמנטים.

4.2. מונו

הדרך השנייה לעשות זאת היא באמצעות מונו, שהוא זרם של 0..1 אלמנטים. בואו ננסה ליצור אחד:

מונו סתם = מונו.סתם (1);

זה נראה ומתנהג כמעט בדיוק כמו שֶׁטֶף, רק שהפעם אנחנו מוגבלים לא יותר מאלמנט אחד.

4.3. מדוע לא רק שטף?

לפני שניסה להמשיך, כדאי להדגיש מדוע יש לנו שני סוגי נתונים אלה.

ראשית, יש לציין כי שניהם א שֶׁטֶף ו מונו הם יישומים של זרמי התגובה מוֹצִיא לָאוֹר מִמְשָׁק. שני השיעורים תואמים למפרט, ואנחנו יכולים להשתמש בממשק זה במקומם:

מו"ל רק = Mono.just ("foo");

אבל באמת, ידיעה של קרדינליות זו שימושית. הסיבה לכך היא שכמה פעולות הגיוניות רק עבור אחד משני הסוגים, ומכיוון שהיא יכולה להיות יותר אקספרסיבית (דמיין findOne () במאגר).

5. מנוי לזרם

כעת יש לנו סקירה ברמה גבוהה כיצד לייצר זרם נתונים, עלינו להירשם אליו על מנת שהוא יפלוט את האלמנטים.

5.1. איסוף אלמנטים

בואו נשתמש ב- הירשם () שיטה לאיסוף כל האלמנטים בזרם:

רכיבי רשימה = ArrayList חדש (); Flux.just (1, 2, 3, 4) .log (). Subscribe (אלמנטים :: להוסיף); assertThat (אלמנטים) .contains בדיוק (1, 2, 3, 4);

הנתונים לא יתחילו לזרום עד שנרשם כמנוי. שימו לב שהוספנו גם כמה רישומים, זה יעיל כשאנחנו מסתכלים על מה שקורה מאחורי הקלעים.

5.2. זרימת האלמנטים

עם כניסה במקום, אנו יכולים להשתמש בו כדי לדמיין כיצד הנתונים זורמים דרך הזרם שלנו:

20: 25: 19.550 [ראשי] כור INFO.Flux.Array.1 - | onSubscribe ([Synchronous Fuseable] FluxArray.ArraySubscription) 20: 25: 19.553 [main] INFO reactor.Flux.Array.1 - | בקשה (ללא גבולות) 20: 25: 19.553 [ראשי] INFO reactor.Flux.Array.1 - | onNext (1) 20: 25: 19.553 [ראשי] INFO reactor.Flux.Array.1 - | onNext (2) 20: 25: 19.553 [main] INFO reactor.Flux.Array.1 - | onNext (3) 20: 25: 19.553 [ראשי] INFO reactor.Flux.Array.1 - | onNext (4) 20: 25: 19.553 [main] INFO reactor.Flux.Array.1 - | onComplete ()

קודם כל, הכל פועל על החוט הראשי. בואו לא נפרט על כך, מכיוון שנבדוק את מקביליות בהמשך מאמר זה. זה אכן הופך את הדברים לפשוטים, שכן אנו יכולים להתמודד עם הכל לפי הסדר.

עכשיו בואו נעבור את הרצף שרשמנו אחד אחד:

  1. onSubscribe () - זה נקרא כאשר אנו מנויים לזרם שלנו
  2. בקשה (לא מוגבלת) - כשאנחנו מתקשרים הירשם כמנוימאחורי הקלעים אנו יוצרים א מִנוּי. מנוי זה מבקש אלמנטים מהזרם. במקרה זה, ברירת המחדל היא בִּלתִי מוּגבָּל, כלומר הוא מבקש כל אלמנט זמין
  3. בהבא() - זה נקרא בכל אלמנט אחד
  4. onComplete () - זה נקרא אחרון, לאחר קבלת האלמנט האחרון. יש למעשה א onError () כמו כן, אשר ייקרא אם יש חריג, אך במקרה זה, אין

זהו הזרימה המוצבת ב מָנוּי ממשק כחלק ממפרט הזרמים התגובים, ובמציאות, זה מה שהונח מאחורי הקלעים בקריאה שלנו onSubscribe (). זו שיטה שימושית, אבל כדי להבין טוב יותר מה קורה בוא נספק מָנוּי ממשק ישירות:

Flux.just (1, 2, 3, 4) .log (). מנוי (מנוי חדש () {@ ביטול ציבורי בטל ב- רישום (מנוי) {s.request (Long.MAX_VALUE);} @ ביטול בטל ציבורי על הבא ( מספר שלם שלם) {elements.add (מספר שלם);} @ עקירה ציבורית בטל onError (ניתן לזריקה) {} @ ביטול ציבורי בטל onComplete () {}});

אנו יכולים לראות כי כל שלב אפשרי במפות זורמים לעיל לשיטה ב- מָנוּי יישום. זה פשוט קורה שה- שֶׁטֶף סיפקה לנו שיטת עוזר להפחתת מילוליות זו.

5.3. השוואה ל- Java 8 זרמים

עדיין נראה שיש לנו משהו נרדף ל- Java 8 זרם עושה איסוף:

רשימה שנאספה = Stream.of (1, 2, 3, 4) .collect (toList ());

רק אנחנו לא.

ההבדל העיקרי הוא ש- Reactive הוא מודל דחיפה, ואילו ה- Java 8 זרמים הם מודל משיכה. בגישה תגובתית, אירועים הם נדחף למנויים כשהם נכנסים.

הדבר הבא שיש לשים לב אליו הוא זרמים מפעיל המסוף הוא בדיוק זה, מסוף, מושך את כל הנתונים ומחזיר תוצאה. עם תגובתי יכול להיות שזרם אינסופי יגיע ממשאב חיצוני, עם מספר מנויים המצורפים והוסרו על בסיס אד-הוק. אנחנו יכולים לעשות דברים כמו לשלב זרמים, זרמי מצערת ולהפעיל לחץ אחורי, אותו נעסוק בהמשך.

6. לחץ אחורי

הדבר הבא שעלינו לקחת בחשבון הוא לחץ אחורי. בדוגמה שלנו, המנוי אומר למפיק לדחוף כל אלמנט בבת אחת. זה יכול בסופו של דבר להיות מוחץ עבור המנוי, ויצרוך את כל משאביו.

לחץ אחורי הוא כאשר זרם במורד הזרם יכול להורות למעלה הזרם לשלוח לו פחות נתונים על מנת למנוע את הצפתו.

אנחנו יכולים לשנות את שלנו מָנוּי יישום להפעלת לחץ אחורי. בואו נגיד למעלה הזרם לשלוח רק שני אלמנטים בכל פעם באמצעות בַּקָשָׁה():

Flux.just (1, 2, 3, 4) .log (). מנוי (מנוי חדש () {מנוי פרטי s; int onNextAmount; @ ביטול ציבורי ריק ב- Subscribe (מנויים) {this.s = s; (2);} @ ביטול חלל ציבורי ב- Next (מספר שלם שלם) {elements.add (מספר שלם); onNextAmount ++; אם (onNextAmount% 2 == 0) {s.request (2);}} @ ביטול ריק בטלוויזיה onError (Throwable t) {} @ ביטול ריק של הציבור ב- Complete () {}});

כעת אם נפעיל את הקוד שלנו שוב, נראה את בקשה (2) נקרא, ואחריו שניים בהבא() מתקשר, אם כן בקשה (2) שוב.

23: 31: 15.395 [ראשי] כור INFO.Flux.Array.1 - | onSubscribe ([Fusionable Synchronous] FluxArray.ArraySubscription) 23: 31: 15.397 [main] INFO reactor.Flux.Array.1 - | בקשה (2) 23: 31: 15.397 [ראשי] INFO reactor.Flux.Array.1 - | onNext (1) 23: 31: 15.398 [main] INFO reactor.Flux.Array.1 - | onNext (2) 23: 31: 15.398 [main] INFO reactor.Flux.Array.1 - | בקשה (2) 23: 31: 15.398 [ראשי] INFO reactor.Flux.Array.1 - | onNext (3) 23: 31: 15.398 [main] INFO reactor.Flux.Array.1 - | onNext (4) 23: 31: 15.398 [main] INFO reactor.Flux.Array.1 - | בקשה (2) 23: 31: 15.398 [ראשי] INFO reactor.Flux.Array.1 - | onComplete ()

בעיקרו של דבר, זהו לחץ אחורי משיכה תגובתי. אנו מבקשים ממעלה הזרם לדחוף רק כמות מסוימת של אלמנטים, ורק כשאנחנו מוכנים.

אם נדמיין שמזרימים לנו ציוצים מ- Twitter, זה יהיה עליון להחליט מה לעשות. אם ציוצים נכנסו אך אין בקשות מהזרם הבא, אז הזרם עלול להפיל פריטים, לאחסן אותם במאגר או לאסטרטגיה אחרת.

7. פועל בזרם

אנו יכולים גם לבצע פעולות על הנתונים בזרם שלנו, ולהגיב לאירועים כראות עינינו.

7.1. מיפוי נתונים בזרם

פעולה פשוטה שנוכל לבצע היא יישום טרנספורמציה. במקרה זה, בואו פשוט נכפיל את כל המספרים בזרם שלנו:

Flux.just (1, 2, 3, 4) .log () .map (i -> i * 2). מנוי (אלמנטים :: הוסף);

מַפָּה() יוחל מתי בהבא() נקרא.

7.2. שילוב של שני זרמים

לאחר מכן נוכל להפוך את הדברים למעניינים יותר על ידי שילוב של זרם אחר עם זה. בואו ננסה זאת באמצעות רוכסן() פוּנקצִיָה:

Flux.just (1, 2, 3, 4) .log () .map (i -> i * 2) .zipWith (Flux.range (0, Integer.MAX_VALUE), (one, two) -> String.format ("שטף ראשון:% d, שטף שני:% d", אחד, שניים)). מנוי (אלמנטים :: הוסף); assertThat (אלמנטים). מכיל בדיוק ("שטף ראשון: 2, שני שטף: 0", "שטף ראשון: 4, שני שטף: 1", "שטף ראשון: 6, שטף שני: 2", "שטף ראשון: 8, שני שטף: 3 ");

הנה, אנו יוצרים עוד אחד שֶׁטֶף שממשיך להתגבר באחד ולהזרים אותו יחד עם המקורי שלנו. אנו יכולים לראות כיצד אלה פועלים יחד על ידי בדיקת היומנים:

20: 04: 38.064 [ראשי] כור INFO.Flux.Array.1 - | onSubscribe ([Fuseable Synchronous] FluxArray.ArraySubscription) 20: 04: 38.065 [main] INFO reactor.Flux.Array.1 - | onNext (1) 20: 04: 38.066 [main] INFO reactor.Flux.Range.2 - | onSubscribe ([Synchronous Fuseable] FluxRange.RangeSubscription) 20: 04: 38.066 [main] INFO reactor.Flux.Range.2 - | onNext (0) 20: 04: 38.067 [main] INFO reactor.Flux.Array.1 - | onNext (2) 20: 04: 38.067 [main] INFO reactor.Flux.Range.2 - | onNext (1) 20: 04: 38.067 [main] INFO reactor.Flux.Array.1 - | onNext (3) 20: 04: 38.067 [main] INFO reactor.Flux.Range.2 - | onNext (2) 20: 04: 38.067 [ראשי] INFO reactor.Flux.Array.1 - | onNext (4) 20: 04: 38.067 [main] INFO reactor.Flux.Range.2 - | onNext (3) 20: 04: 38.067 [main] INFO reactor.Flux.Array.1 - | onComplete () 20: 04: 38.067 [ראשי] INFO reactor.Flux.Array.1 - | בטל () 20: 04: 38.067 [main] INFO reactor.Flux.Range.2 - | לְבַטֵל()

שים לב כיצד יש לנו כעת מנוי אחד לכל שֶׁטֶף. ה בהבא() שיחות מתחלפות גם כן, כך שהאינדקס של כל אלמנט בזרם יתאים כאשר אנו מיישמים את רוכסן() פוּנקצִיָה.

8. זרמים חמים

נכון לעכשיו, התמקדנו בעיקר בזרמים קרים. אלה זרמים סטטיים באורך קבוע שקל להתמודד איתם. מקרה שימוש מציאותי יותר לתגובות יכול להיות משהו שקורה לאין ערוך.

לדוגמה, יכול להיות שיש לנו זרם של תנועות עכבר שכל הזמן צריך להגיב אליו או להזין טוויטר. סוגים אלה של זרמים נקראים זרמים חמים, מכיוון שהם פועלים תמיד וניתן להירשם אליהם בכל נקודת זמן, וחסרים את תחילת הנתונים.

8.1. ליצור ConnectableFlux

אחת הדרכים ליצור זרם חם היא על ידי המרת זרם קר לאחד. בואו ניצור שֶׁטֶף שנמשך לנצח, תוך הצגת התוצאות למסוף, אשר ידמה זרם אינסופי של נתונים המגיע ממשאב חיצוני:

ConnectableFlux publish = Flux.create (fluxSink -> {while (true) {fluxSink.next (System.currentTimeMillis ());}}) .publish ();

על ידי התקשרות לְפַרְסֵם() ניתן לנו ConnectableFlux. המשמעות היא שיחה הירשם () לא יגרום לכך שהוא יתחיל להנפיק, ויאפשר לנו להוסיף מספר מנויים:

publish.subscribe (System.out :: println); publish.subscribe (System.out :: println);

אם ננסה להריץ את הקוד הזה, שום דבר לא יקרה. זה לא עד שאנחנו מתקשרים לְחַבֵּר(), ש שֶׁטֶף יתחיל לפלוט:

publish.connect ();

8.2. חנק

אם נפעיל את הקוד שלנו, הקונסולה שלנו תהיה מוצפת בכניסה. זה מדמה מצב שבו יותר מדי נתונים מועברים לצרכנים שלנו. בואו ננסה לעקוף את זה בחנק:

ConnectableFlux publish = Flux.create (fluxSink -> {while (true) {fluxSink.next (System.currentTimeMillis ());}}). Sample (ofSeconds (2)) .publish ();

הנה, הצגנו א לִטעוֹם() שיטה עם מרווח של שתי שניות. כעת הערכים יידחקו למנוי שלנו רק בשתי שניות, כלומר הקונסולה תהיה הרבה פחות קדחתנית.

כמובן שישנן מספר אסטרטגיות להפחתת כמות הנתונים הנשלחים במורד הזרם, כגון חלונות וחציצה, אך הם יישארו מחוץ לתחום למאמר זה.

9. מקביליות

כל הדוגמאות שלנו לעיל פועלות כרגע על החוט הראשי. עם זאת, אנו יכולים לשלוט באיזה שרשור הקוד שלנו פועל אם אנו רוצים. ה מתזמן הממשק מספק הפשטה סביב קוד אסינכרוני, אשר עבורו מיועדים לנו יישומים רבים. בואו ננסה להירשם לשרשור אחר ל- main:

Flux.just (1, 2, 3, 4) .log () .map (i -> i * 2) .subscribeOn (Schedulers.parallel ()). Subscribe (אלמנטים :: הוסף);

ה מַקְבִּיל מתזמן יגרום להפעלת המנוי שלנו על שרשור אחר, אותו נוכל להוכיח על ידי התבוננות ביומנים. אנו רואים שהערך הראשון מגיע מה- רָאשִׁי החוט והשטף פועל בחוט אחר שנקרא מקביל -1.

20:03:27.505 [רָאשִׁי] DEBUG reactor.util.Loggers $ LoggerFactory - באמצעות מסגרת רישום של Slf4j 20: 03: 27.529 [מקביל -1] INFO reactor.Flux.Array.1 - | onSubscribe ([Fusionable Synchronous] FluxArray.ArraySubscription) 20: 03: 27.531 [מקביל -1] כור INFO.Flux.Array.1 - | בקשה (ללא גבולות) 20: 03: 27.531 [מקביל -1] INFO reactor.Flux.Array.1 - | onNext (1) 20: 03: 27.531 [מקביל -1] כור INFO.Flux.Array.1 - | onNext (2) 20: 03: 27.531 [מקביל -1] INFO reactor.Flux.Array.1 - | onNext (3) 20: 03: 27.531 [מקביל -1] כור INFO.Flux.Array.1 - | onNext (4) 20: 03: 27.531 [מקביל -1] כור INFO.Flux.Array.1 - | onComplete ()

המקביליות מעניינת יותר מכך, וכדאי שנבדוק זאת במאמר אחר.

10. מסקנה

במאמר זה, אנו נותנים סקירה ברמה גבוהה מקצה לקצה של Core Reactive. הסברנו כיצד נוכל לפרסם ולהירשם כמנוי לזרמים, להפעיל לחץ אחורי, לפעול בזרמים וגם לטפל בנתונים באופן אסינכרוני. זה מקווה להניח בסיס עבורנו לכתוב יישומים תגובתי.

מאמרים מאוחרים יותר בסדרה זו יסקרו מקבילות מתקדמות יותר ומושגים תגובתיים אחרים. יש גם מאמר נוסף שמסקר את הכור עם האביב.

קוד המקור ליישום שלנו זמין באתר GitHub; זהו פרויקט Maven שאמור להיות מסוגל לרוץ כמו שהוא.


$config[zx-auto] not found$config[zx-overlay] not found