מדריך לספריית אספני מקבלי Java

1. הקדמה

Parallel-Collectors היא ספרייה קטנה המספקת קבוצה של אספני Java Stream API המאפשרים עיבוד מקביל - ובו בזמן לעקוף ליקויים עיקריים בזרמים מקבילים סטנדרטיים.

2. תלות Maven

אם אנו רוצים להתחיל להשתמש בספרייה, עלינו להוסיף ערך יחיד ב- Maven's pom.xml קוֹבֶץ:

 com.pivovarit מקבלי אספנים 1.1.0 

או שורה אחת בקובץ ה- build של Gradle:

הידור 'com.pivovarit: אספנים מקבילים: 1.1.0'

את הגרסה החדשה ביותר תוכלו למצוא ב- Maven Central.

3. אזהרות מקבילות

זרמים מקבילים היו אחד מנקודות השיא של Java 8, אך התברר שהם ישימים לעיבוד כבד של המעבד באופן בלעדי.

הסיבה לכך הייתה העובדה ש זרמים מקבילים גובו באופן פנימי על ידי שיתוף JVM רחב ForkJoinPool, אשר סיפקה הקבלה מוגבלת ושימש את כל הזרמים המקבילים הפועלים על מופע JVM יחיד.

לדוגמא, דמיין שיש לנו רשימת מזהים ואנחנו רוצים להשתמש בהם כדי להביא רשימת משתמשים ושהפעולה הזו יקרה.

נוכל להשתמש בזרמים מקבילים לשם כך:

מזהי רשימה = Arrays.asList (1, 2, 3); תוצאות רשימה = ids.parallelStream () .map (i -> fetchById (i)) // כל פעולה אורכת שנייה אחת .collect (Collectors.toList ()); System.out.println (תוצאות); // [user-1, user-2, user-3]

ואכן, אנו יכולים לראות שיש מהירות מורגשת. אבל זה הופך להיות בעייתי אם נתחיל להפעיל מספר פעולות חסימה מקבילות ... במקביל. זה עשוי להרוות את הבריכה במהירות ולהביא לחביונים עצומים שעלולים להיות. לכן חשוב לבנות מחיצות על ידי יצירת מאגרי חוטים נפרדים - כדי למנוע ממשימות שאינן קשורות להשפיע על הביצוע של השנייה.

על מנת לספק מנהג ForkJoinPool למשל, נוכל למנף את הטריק המתואר כאן, אך גישה זו הסתמכה על פריצה ללא תיעוד והייתה פגומה עד ל- JDK10. אנו יכולים לקרוא עוד בגיליון עצמו - [JDK8190974].

4. אספנים מקבילים בפעולה

אספנים מקבילים, כפי שהשם מרמז, הם רק אספני סטרימינג API סטרימיים המאפשרים ביצוע פעולות נוספות במקביל בשעה לאסוף() שלב.

ParallelCollectors (שמשקף אספנים class) class הוא חזית המספקת גישה לכל הפונקציונליות של הספרייה.

אם היינו רוצים לעשות את הדוגמה לעיל, פשוט נוכל לכתוב:

ExecutorService executor = Executors.newFixedThreadPool (10); מזהי רשימה = Arrays.asList (1, 2, 3); העתיד תוצאות = ids.stream () .collect (ParallelCollectors.parallelToList (i -> fetchById (i), מבצע, 4)); System.out.println (results.join ()); // [user-1, user-2, user-3]

התוצאה היא זהה, עם זאת, הצלחנו לספק את מאגר החוטים המותאם אישית שלנו, לציין את רמת ההקבלה המותאמת אישית והתוצאה הגיעה עטופה ב- העתיד מופע מבלי לחסום את השרשור הנוכחי.

זרמים מקבילים סטנדרטיים, לעומת זאת, לא הצליחו להשיג אף אחד מאלה.

4.1. ParallelCollectors.parallelToList / ToSet ()

עד כמה שזה נעשה אינטואיטיבי, אם אנחנו רוצים לעבד א זרם במקביל ולאסוף תוצאות ל- a רשימה אוֹ מַעֲרֶכֶתאנחנו יכולים פשוט להשתמש ParallelCollectors.parallelToList אוֹ parallelToSet:

מזהי רשימה = Arrays.asList (1, 2, 3); תוצאות תוצאות = ids.stream () .collect (parallelToList (i -> fetchById (i), מבצע, 4)) .join ();

4.2. ParallelCollectors.parallelToMap ()

אם אנחנו רוצים לאסוף זרם אלמנטים לתוך a מַפָּה למשל, בדיוק כמו עם ממשק API של Stream, עלינו לספק שני ממפים:

מזהי רשימה = Arrays.asList (1, 2, 3); תוצאות מפה = ids.stream () .collect (parallelToMap (i -> i, i -> fetchById (i), executor, 4)) .join (); // {1 = משתמש -1, 2 = משתמש -2, 3 = משתמש -3}

אנו יכולים גם לספק מותאם אישית מַפָּה למשל ספק:

תוצאות מפה = ids.stream () .collect (parallelToMap (i -> i, i -> fetchById (i), TreeMap :: new, executor, 4)) .join (); 

ואסטרטגיה מותאמת אישית לפתרון סכסוכים:

מזהי רשימה = Arrays.asList (1, 2, 3); תוצאות תוצאות = ids.stream () .collect (parallelToMap (i -> i, i -> fetchById (i), TreeMap :: new, (s1, s2) -> s1, executor, 4)) .join ();

4.3. ParallelCollectors.parallelToCollection ()

בדומה לאמור לעיל, אנו יכולים להעביר את המנהג שלנו ספק אוספים אם אנו רוצים להשיג תוצאות ארוזות במיכל המותאם אישית שלנו:

תוצאות תוצאות = ids.stream () .collect (parallelToCollection (i -> fetchById (i), LinkedList :: new, executor, 4)) .join ();

4.4. ParallelCollectors.parallelToStream ()

אם לא די באמור לעיל, נוכל להשיג א זרם למשל ולהמשיך שם בעיבוד מותאם אישית:

מַפָּה results = ids.stream () .collect (parallelToStream (i -> fetchById (i), executor, 4)) .thenApply (stream -> stream.collect (Collectors.groupingBy (i -> i.length ()))) .לְהִצְטַרֵף();

4.5. ParallelCollectors.parallel ()

זה מאפשר לנו להזרים תוצאות בסדר השלמה:

ids.stream () .collect (מקביל (i -> fetchByIdWithRandomDelay (i), מבצע, 4)) .forEach (System.out :: println); // user-1 // user-3 // user-2 

במקרה זה, אנו יכולים לצפות מהאספן שיחזיר תוצאות שונות בכל פעם מאז הכנסנו עיכוב עיבוד אקראי.

4.6. ParallelCollectors.parallelOrdered ()

מתקן זה מאפשר תוצאות סטרימינג בדיוק כמו האמור לעיל, אך שומר על הסדר המקורי:

ids.stream () .collect (parallelOrdered (i -> fetchByIdWithRandomDelay (i), executor, 4)) .forEach (System.out :: println); // user-1 // user-2 // user-3 

במקרה זה, האספן תמיד ישמור על הסדר אך עשוי להיות איטי יותר מהאמור לעיל.

5. מגבלות

בשלב הכתיבה, אספנים מקבילים אינם עובדים עם זרמים אינסופיים גם אם משתמשים בפעולות קיצור - זו מגבלת תכנון שמטילה הפנימיים של Stream API. פשוט שים, זרםs מתייחסים לאספנים כאל פעולות לא קצרות, כך שהזרם צריך לעבד את כל האלמנטים במעלה הזרם לפני שיסופק.

המגבלה האחרת היא ש פעולות קיצור דרך אינן קוטעות את המשימות הנותרות לאחר קצר חשמלי.

6. מסקנה

ראינו כיצד ספריית האספנים המקבילים מאפשרת לנו לבצע עיבוד מקביל באמצעות שימוש ב- Java Stream API מותאם אישית אספנים ו CompleteFutures להשתמש בבריכות חוטים מותאמות אישית, מקבילות וסגנון שאינו חוסם CompleteFutures.

כמו תמיד, קטעי קוד זמינים ב- GitHub.

לקריאה נוספת, עיין בספריית האספנים המקבילים ב- GitHub, בבלוג המחבר ובחשבון הטוויטר של המחבר.