ההבדל בין RxJava API ו- Java 9 Flow API

1. הקדמה

API Flow Java הוצג בג'אווה 9 כהטמעה של מפרט זרם תגובתי.

במדריך זה נחקור תחילה זרמים ריאקטיביים. לאחר מכן, נלמד על הקשר שלה ל- RxJava ו- Flow API.

2. מהם זרמים ריאקטיביים?

המניפסט התגובתי הציג זרמים ריאקטיביים כדי לציין תקן לעיבוד זרם אסינכרוני עם לחץ אחורי לא חוסם.

היקף המפרט זרם תגובתי הוא להגדיר מערך ממשקים מינימלי כדי להשיג מטרות אלה:

  • org.reactivestreams.Publisher הינה ספקית נתונים המפרסמת נתונים למנויים על פי דרישתם

  • org.reactivestreams. מנוי הוא הצרכן של נתונים - הוא יכול לקבל נתונים לאחר הרשמה למו"ל

  • org.reactivestreams. מנוי נוצר כאשר מפרסם מקבל מנוי

  • org.reactivestreams.Processor הוא גם מנוי וגם מפרסם - הוא מנוי למפרסם, מעבד את הנתונים ואז מעביר את הנתונים המעובדים למנוי

מקור ה- API של Flow הוא מהמפרט. RxJava קודמת לו, אך מאז 2.0, RxJava תומכת גם במפרט.

ניכנס עמוק לשניהם, אך ראשית, בואו נראה מקרה שימוש מעשי.

3. שימוש במקרה

לצורך הדרכה זו, נשתמש בשירות וידיאו סטרימינג בשידור חי כמקרה השימוש שלנו.

סרטון סטרימינג בשידור חי, בניגוד להזרמת וידאו לפי דרישה, אינו תלוי בצרכן. לכן השרת מפרסם את הזרם בקצב שלו, ובאחריות הצרכן להסתגל אליו.

בצורה הפשוטה ביותר, המודל שלנו מורכב ממפרסם של זרם וידאו ומנגן וידיאו כמנוי.

בואו ניישם VideoFrame כפריט הנתונים שלנו:

VideoFrame בכיתה ציבורית {מספר פרטי ארוך; // שדות נתונים נוספים // קונסטרוקטור, גטררים, סטרים}

אז בוא נעבור את היישומים Flow API וה- RxJava אחד-אחד.

4. יישום בעזרת Flow API

ממשקי ה- API של Flow ב- JDK 9 תואמים למפרט זרמי התגובה. בעזרת ה- Flow API, אם היישום מבקש תחילה פריטי N, המפרסם דוחף לכל היותר N פריטים למנוי.

ממשקי ה- API של Flow נמצאים כולם ב- java.util.concurrent.Flow מִמְשָׁק. הם שווים סמנטית לעמיתיהם בזרם התגובה בהתאמה.

בואו ניישם VideoStreamServer כמפרסם של VideoFrame.

בכיתה ציבורית VideoStreamServer מרחיב SubmissionPublisher {public VideoStreamServer () {super (Executors.newSingleThreadExecutor (), 5); }}

הרחבנו את שלנו VideoStreamServer מ הגשה מפרסם במקום ליישם ישירות זרימה :: מפרסם. הגשה מפרסם הוא יישום JDK של זרימה :: מפרסם לתקשורת אסינכרונית עם מנויים, כך שהיא מאפשרת לנו VideoStreamServer לפלוט בקצב שלו.

כמו כן, זה מועיל לטיפול בלחץ אחורי ובמאגר, כי מתי SubmissionPublisher :: הירשם כמנוי נקרא, זה יוצר מופע של מנוי Bufferedואז מוסיף את המנוי החדש לשרשרת המנויים שלו. מנוי Buffered יכול לאגר פריטים שהונפקו עד SubmissionPublisher # maxBufferCapacity.

עכשיו בואו נגדיר נגן וידאו, אשר צורכת זרם של VideoFrame. מכאן שהוא חייב ליישם זרימה :: מנוי.

מחלקה ציבורית VideoPlayer מיישמת את Flow.Subscriber {מנוי Flow.Subscription = null; @ ביטול ציבורי בטל ב- OnSubscribe (מנוי Flow.Subscription) {this.subscription = מנוי; מנוי. בקשה (1); } @ ביטול חלל ציבורי ב- Next (פריט VideoFrame) {log.info ("play # {}", item.getNumber ()); מנוי. בקשה (1); } @ ביטול ריק בטלפון onError (Throwable throwable) {log.error ("יש שגיאה בהזרמת וידאו: {}", throwable.getMessage ()); } @ ביטול ציבורי בטל onComplete () {log.error ("הסרטון הסתיים"); }}

נגן וידאו מנוי ל- VideoStreamServer, ואז אחרי מנוי מוצלח נגן וידאו::ב- מנוי השיטה נקראת, והיא מבקשת מסגרת אחת. נגן וידאו:: onNext קבל את המסגרת ובקשות למסגרת חדשה. מספר המסגרות המבוקשות תלוי במקרה השימוש ו מָנוּי יישומים.

לסיום, בואו נרכיב את הדברים:

VideoStreamServer streamServer = VideoStreamServer חדש (); streamServer.subscribe (VideoPlayer חדש ()); // הגש מסגרות וידיאו ScheduledExecutorService executor = Executors.newScheduledThreadPool (1); AtomicLong frameNumber = AtomicLong חדש (); executor.scheduleWithFixedDelay (() -> {streamServer.offer (VideoFrame new (frameNumber.getAndIncrement ()), (מנוי, videoFrame) -> {subscriber.onError (RuntimeException חדש ("מסגרת #" + videoFrame.getNumber () + " ירד בגלל לחץ אחורי ")); להחזיר נכון;});}, 0, 1, TimeUnit.MILLISECONDS); שינה (1000);

5. יישום עם RxJava

RxJava הוא יישום Java של ReactiveX. פרויקט ReactiveX (או Reactive Extensions) נועד לספק מושג תכנות תגובתי. זהו שילוב של דפוס Observer, דפוס Iterator ותכנות פונקציונלי.

הגרסה העיקרית האחרונה של RxJava היא 3.x. RxJava תומך בזרמים ריאקטיביים מאז גרסה 2.x עם זורם בכיתת בסיס, אבל זו מערך משמעותי יותר מזרמים ריאקטיביים עם מספר מחלקות בסיס כמו זורם, נצפה, יחיד, הושלם.

זורם כרכיב תאימות לזרם תגובתי הוא זרימה של 0 עד N פריטים עם טיפול בלחץ אחורי. זורם מרחיב מוֹצִיא לָאוֹר מזרמים ריאקטיביים. לכן מפעילי RxJava רבים מקבלים מוֹצִיא לָאוֹר באופן ישיר ולאפשר אינטראקציה ישירה עם יישומי ריאקטיביים אחרים.

עכשיו, בואו נעשה את מחולל זרמי הווידיאו שלנו שהוא זרם עצלן אינסופי:

הזרם videoStream = Stream.iterate (VideoFrame חדש (0), videoFrame -> {// שינה למשך 1ms; להחזיר VideoFrame חדש (videoFrame.getNumber () + 1);});

ואז אנו מגדירים א זורם מופע ליצירת מסגרות בשרשור נפרד:

Flowable .fromStream (videoStream). SubscribeOn (Schedulers.from (Executors.newSingleThreadExecutor ()))

חשוב לציין שזרם אינסופי מספיק לנו, אך אם אנו זקוקים לדרך גמישה יותר להפיק את הזרם שלנו, אז זורם.יצירה זו בחירה טובה.

Flowable .create (new FlowableOnSubscribe () {AtomicLong frame = new AtomicLong (); @Override public void subscribe (@NonNull FlowableEmitter emitter) {while (true) {emitter.onNext (VideoFrame new (frame.incrementAndGet ())); / / לישון למשך 1 אלפיות שנייה כדי לדחות עיכוב}}}, / * הגדר כאן אסטרטגיית לחץ אחורי * /)

ואז, בשלב הבא, VideoPlayer מנוי ל- Flowable זה ומתבונן בפריטים בשרשור נפרד.

videoFlowable .observeOn (Schedulers.from (Executors.newSingleThreadExecutor ())). מנוי (פריט -> {log.info ("play #" + item.getNumber ()); // שינה למשך 30 ms כדי לדמות תצוגת מסגרת}) ;

ולבסוף, נגדיר את האסטרטגיה ללחץ אחורי. אם אנו רוצים לעצור את הסרטון במקרה של אובדן פריימים, מכאן עלינו להשתמש BackpressureOverflowStrategy :: ERROR כשהמאגר מלא.

Flowable .fromStream (videoStream) .subscribeOn (Schedulers.from (Executors.newSingleThreadExecutor ())). OnBackpressureBuffer (5, null, BackpressureOverflowStrategy.ERROR) .observeOn (Schedulers.from (Executors.newS). > {log.info ("play #" + item.getNumber ()); // שינה למשך 30 ms כדי לדמות את תצוגת המסגרות});

6. השוואה בין RxJava ו- Flow API

גם בשתי היישומים הפשוטים הללו אנו יכולים לראות כיצד ה- API של RxJava עשיר, במיוחד לניהול חוצץ, טיפול בשגיאות ואסטרטגיית לחץ אחורי. זה נותן לנו יותר אפשרויות ופחות שורות קוד עם ה- API הרהוט שלו. בואו נבחן מקרים מסובכים יותר.

נניח שהנגן שלנו אינו יכול להציג מסגרות וידאו ללא קודק. מכאן ש- Flow API, עלינו ליישם מעבד כדי לדמות את ה- codec ולשבת בין השרת לנגן. עם RxJava, אנחנו יכולים לעשות את זה עם Flowable :: flatMap אוֹ זורם :: מפה.

או בואו נדמיין שהנגן שלנו גם ישדר אודיו תרגום חי, לכן עלינו לשלב זרמי וידאו ושמע ממוציאים לאור נפרדים. בעזרת RxJava נוכל להשתמש זורם :: combineLatest, אבל עם Flow API, זו לא משימה קלה.

אמנם, אפשר לכתוב מנהג מעבד שמנוי לשני הזרמים ושולח את הנתונים המשולבים אל שלנו נגן וידאו. היישום הוא כאב ראש, עם זאת.

7. מדוע Flow API?

בשלב זה עשויה להיות לנו שאלה, מהי הפילוסופיה העומדת מאחורי ה- Flow API?

אם אנו מחפשים שימושים Flow API ב- JDK, אנו יכולים למצוא משהו ב java.net.http ו jdk.internal.net.http.

יתר על כן, אנו יכולים למצוא מתאמים בפרויקט הכור או בחבילת הזרם התגובתי. לדוגמה, org.reactivestreams.FlowAdapters יש שיטות להמרת ממשקי API של Flow למערכות זרם ריאקטיבי ולהיפך. לכן זה עוזר לפעולה הדדית בין Flow API לספריות עם תמיכה בזרם תגובתי.

כל העובדות הללו עוזרות לנו להבין את המטרה של Flow API: היא נוצרה כדי להיות קבוצה של ממשקי מפרט תגובתי ב- JDK ללא ממסר על צדדים שלישיים. יתר על כן, ג'אווה מצפה ש- API של Flow יתקבל כממשקים סטנדרטיים למפרט תגובתי וישמש ב- JDK או בספריות אחרות מבוססות Java המיישמות את המפרט המגיב לתוכנות ותווך.

8. מסקנות

במדריך זה, יש לנו מבוא למפרט Reactive Stream Specification, Flow API ו- RxJava.

יתר על כן, ראינו דוגמה מעשית של יישומי Flow API ו- RxJava עבור זרם וידאו חי.

אבל כל ההיבטים של Flow API ו- RxJava כמו זרימה :: מעבד, זורם :: מפה ו Flowable :: flatMap או אסטרטגיות לחץ אחורי אינן מכוסות כאן.

כמו תמיד, אתה מוצא את הקוד המלא של ההדרכה ב- GitHub.


$config[zx-auto] not found$config[zx-overlay] not found