מדריך לנחלי עכה

1. סקירה כללית

במאמר זה נבחן את ה- akka-streams ספרייה שנבנית על גבי מסגרת שחקני עכו, הדבוקה במניפסט הזרמים התגובתיים. ממשק ה- Akka Streams מאפשר לנו לחבר בקלות זרמי טרנספורמציה של נתונים משלבים עצמאיים.

יתר על כן, כל העיבודים נעשים באופן תגובתי, לא חוסם ואסינכרוני.

2. תלות Maven

כדי להתחיל, עלינו להוסיף את ה- akka-stream ו akka-stream-testkit ספריות לתוך שלנו pom.xml:

 com.typesafe.akka akka-stream_2.11 2.5.2 com.typesafe.akka akka-stream-testkit_2.11 2.5.2 

3. ממשק API של Akka Streams

כדי לעבוד עם זרמי Akka, עלינו להיות מודעים למושגי הליבה של API:

  • מקור - נקודת הכניסה לעיבוד ב akka-stream סִפְרִיָה - אנו יכולים ליצור מופע של מחלקה זו ממספר מקורות; למשל, אנו יכולים להשתמש ב- יחיד() שיטה אם אנו רוצים ליצור מָקוֹר מסינגל חוּט, או שנוכל ליצור מָקוֹר מ- ניתן לנידון של אלמנטים
  • זְרִימָה - אבן הבניין העיקרית לעיבוד - כל אחד זְרִימָה למשל יש קלט אחד וערך יציאה אחד
  • חומר מיצוב - wאנחנו יכולים להשתמש באחד אם אנחנו רוצים את שלנו זְרִימָה לקבל כמה תופעות לוואי כמו רישום או שמירת תוצאות; לרוב, אנו נעביר את לא בשימוש כינוי כ מטריאליזציה לציין את זה שלנו זְרִימָה לא אמורות להיות לתופעות לוואי
  • כִּיוֹר פעולה - כשאנחנו בונים א זְרִימָה, זה לא יבוצע עד שנרשום א כִּיוֹר מבצע עליו - מדובר בפעולה סופנית המפעילה את כל החישובים בסך הכל זְרִימָה

4. יצירה זורמים בזרמי עכו

נתחיל בבניית דוגמה פשוטה, בה נראה כיצד ליצור ולשלב מספר רב זְרִימָהס - לעבד זרם של מספרים שלמים ולחשב את החלון הממוצע הנע של זוגות שלמים מהזרם.

ננתח תיחום של נקודה-פסיק חוּט מספרים שלמים כקלט ליצירת שלנו akka-stream מקור לדוגמא.

4.1. באמצעות א זְרִימָה לנתח קלט

ראשית, בואו ניצור a DataImporter בכיתה שתיקח מופע של מערכת שחקנים שנשתמש בהמשך ליצירת שלנו זְרִימָה:

מחלקה ציבורית DataImporter {ActorSystem private actorSystem; // בונים סטנדרטיים, גטרים ...}

לאחר מכן, בואו ליצור a parseLine שיטה שתייצר a רשימה שֶׁל מספר שלם מהקלט התוחם שלנו חוּט. זכור כי אנו משתמשים ב- Java Stream API כאן רק לצורך ניתוח:

רשימה פרטית parseLine (קו מחרוזת) {מחרוזת [] שדות = line.split (";"); החזר Arrays.stream (שדות) .map (Integer :: parseInt) .collect (Collectors.toList ()); }

הראשונית שלנו זְרִימָה יחול parseLine לקלט שלנו ליצור זְרִימָה עם סוג קלט חוּט וסוג הפלט מספר שלם:

פרטי Flow parseContent () {return Flow.of (String.class) .mapConcat (this :: parseLine); }

כשאנחנו קוראים parseLine () שיטה, המהדר יודע שהטיעון לאותה פונקציה למבדה יהיה a חוּט - זהה לסוג הקלט שלנו זְרִימָה.

שים לב שאנחנו משתמשים ב- mapConcat () שיטה - שווה ערך ל- Java 8 flatMap () שיטה - כי אנחנו רוצים לשטח את רשימה שֶׁל מספר שלם הוחזר על ידי parseLine () לתוך זְרִימָה שֶׁל מספר שלם כך שהשלבים הבאים בעיבוד שלנו לא יצטרכו להתמודד עם רשימה.

4.2. באמצעות א זְרִימָה לבצע חישובים

בשלב זה, יש לנו את שלנו זְרִימָה של מספרים שלמים מעובדים. עכשיו, אנחנו צריכים ליישם לוגיקה שתקבץ את כל אלמנטים הקלט לזוגות ותחשב ממוצע של זוגות אלה.

עכשיו, אנחנו ליצור זְרִימָה שֶׁל מספר שלםוקבץ אותם באמצעות מקובץ () שיטה.

לאחר מכן, אנו רוצים לחשב ממוצע.

מכיוון שאיננו מעוניינים בסדר העיבוד של הממוצעים הללו, נוכל לעשות זאת יש ממוצעים המחושבים במקביל תוך שימוש במספר נושאים באמצעות mapAsyncUnordered () שיטה, העברת מספר האשכולות כטיעון לשיטה זו.

הפעולה שתועבר כלמבה ל זְרִימָה צריך להחזיר א העתיד מכיוון שפעולה זו תחושב בצורה אסינכרונית בשרשור הנפרד:

Flow computeAverage () {return Flow.of (Integer.class) .grouped (2) .mapAsyncUnordered (8, מספרים שלמים -> CompletableFuture.supplyAsync (() -> integers.stream () .mapToDouble (v -> v). ממוצע () .orElse (-1.0))); }

אנו מחשבים ממוצעים בשמונה שרשורים מקבילים. שים לב שאנחנו משתמשים ב- Java 8 Stream API לחישוב ממוצע.

4.3. הלחנה מרובה זורמים לסינגל זְרִימָה

ה זְרִימָה API הוא הפשטה שוטפת המאפשרת לנו להלחין מרובה זְרִימָה מקרים להשגת מטרת העיבוד הסופית שלנו. אנו יכולים לקבל זרמים גרגירים שבהם אחד, למשל, מנתח ג'סון, אחר עושה טרנספורמציה, ואחר אוסף כמה נתונים סטטיסטיים.

פירוט כזה יעזור לנו ליצור קוד לבדיקה יותר מכיוון שנוכל לבדוק כל שלב עיבוד באופן עצמאי.

יצרנו שני זרמים מעל שיכולים לעבוד באופן עצמאי זה מזה. עכשיו, אנחנו רוצים לחבר אותם יחד.

ראשית, אנו רוצים לנתח את התשומות שלנו חוּטוהבא, אנו רוצים לחשב ממוצע על זרם של אלמנטים.

אנו יכולים לחבר את הזרמים שלנו באמצעות ה- באמצעות() שיטה:

זרימת חישוב ממוצע () {החזר Flow.of (String.class) .via (parseContent ()) .via (computeAverage ()); }

יצרנו א זְרִימָה בעל סוג קלט חוּט ועוד שניים זורמים אחריו. ה parseContent ()זְרִימָה לוקח חוּט קלט ומחזיר מספר שלם כפלט. ה זרימת computeAverage () לוקח את זה מספר שלם ומחשב החזר ממוצע לְהַכפִּיל כסוג הפלט.

5. הוספה כִּיוֹר אל ה זְרִימָה

כפי שהזכרנו, עד כאן הכלל זְרִימָה עדיין לא מוצא להורג כי זה עצלן. כדי להתחיל בביצוע זְרִימָה עלינו להגדיר א כִּיוֹר. ה כִּיוֹר פעולה יכולה, למשל, לשמור נתונים במסד נתונים, או לשלוח תוצאות לשירות אינטרנט חיצוני כלשהו.

נניח שיש לנו מאגר ממוצע בכיתה עם הבאים לשמור() שיטה שכותבת תוצאות למאגר שלנו:

שמירת CompletionStage (ממוצע כפול) {להחזיר CompletableFuture.supplyAsync (() -> {// לכתוב לממוצע החזרת בסיס נתונים;}); }

כעת, אנו רוצים ליצור כִּיוֹר פעולה המשתמשת בשיטה זו כדי לשמור את התוצאות של זְרִימָה מעבד. ליצור את שלנו כִּיוֹר, אנחנו צריכים קודם ליצור זְרִימָה זה לוקח תוצאה של העיבוד שלנו כסוג הקלט. לאחר מכן, אנו רוצים לשמור את כל התוצאות שלנו במסד הנתונים.

שוב, לא אכפת לנו מהזמנת האלמנטים, כך שנוכל לבצע את לשמור() פעולות במקביל משתמש ב mapAsyncUnordered () שיטה.

ליצור כִּיוֹר מ ה זְרִימָה אנחנו צריכים להתקשר ל toMat () עם Sink.ignore () כטיעון ראשון ו Keep.right () בתור השני כי אנחנו רוצים להחזיר סטטוס של עיבוד:

כיור פרטי storeAverages () {return Flow.of (Double.class) .mapAsyncUnordered (4, averageRepository :: save) .toMat (Sink.ignore (), Keep.right ()); }

6. הגדרת מקור עבור זְרִימָה

הדבר האחרון שעלינו לעשות הוא לעשות זאת ליצור מָקוֹר מהקלט חוּט. אנחנו יכולים להחיל א חישוב ממוצע ()זְרִימָה למקור זה באמצעות באמצעות() שיטה.

ואז להוסיף את ה- כִּיוֹר לעיבוד, עלינו להתקשר ל- לרוץ עם() שיטה ולהעביר את storeAverages () כיור שזה עתה יצרנו:

CompletionStage calcAAverForContent (תוכן מחרוזת) {return Source.single (content) .via (calcinAverage ()) .runWith (storeAverages (), ActorMaterializer.create (actorSystem)) .whenComplete ((d, e) -> {if (d! = null) {System.out.println ("הייבוא ​​הסתיים");} אחר {e.printStackTrace ();}}); }

שים לב שכאשר העיבוד הסתיים אנו מוסיפים את ה- מתי השלמה () התקשרות חוזרת, בה אנו יכולים לבצע פעולה כלשהי בהתאם לתוצאות העיבוד.

7. בדיקות זרמי עכה

אנו יכולים לבדוק את העיבוד שלנו באמצעות ה- akka-stream-testkit.

הדרך הטובה ביותר לבדוק את ההיגיון בפועל של העיבוד היא לבדוק הכל זְרִימָה הגיון ושימוש TestSink להפעיל את החישוב ולהעמיד על התוצאות.

במבחן שלנו אנו יוצרים את זְרִימָה שאנחנו רוצים לבדוק, והבא, אנחנו יוצרים a מָקוֹר מתוכן קלט המבחן:

@Test ציבורי בטל givenStreamOfIntegers_whenCalculateAverageOfPairs_thenShouldReturnProperResults () {// נתון Flow שנבדק = DataImporter חדש (actorSystem) .calculateAverage (); קלט מחרוזת = "1; 9; 11; 0"; // כאשר מקור הזרימה = Source.single (קלט) .via (נבדק); // ואז לזרום .runWith (TestSink.probe (actorSystem), ActorMaterializer.create (actorSystem)) .quest (4) .expectNextUnordered (5d, 5.5); }

אנו בודקים כי אנו מצפים לארבעה טיעוני קלט, ושתי תוצאות שהן ממוצעות יכולות להגיע בכל סדר כיוון שהעיבוד שלנו נעשה בצורה אסינכרונית ומקבילה.

8. מסקנה

במאמר זה, הסתכלנו על akka-stream סִפְרִיָה.

הגדרנו תהליך המשלב מרובה זורמים לחישוב ממוצע נע של אלמנטים. לאחר מכן, הגדרנו א מָקוֹר זוהי נקודת כניסה לעיבוד הזרם ו- כִּיוֹר שמפעיל את העיבוד בפועל.

לבסוף, כתבנו מבחן לעיבוד שלנו באמצעות ה- akka-stream-testkit.

ניתן למצוא את היישום של כל הדוגמאות וקטעי הקוד בפרויקט GitHub - זהו פרויקט Maven, כך שיהיה קל לייבא ולהפעיל אותו כפי שהוא.


$config[zx-auto] not found$config[zx-overlay] not found