מדריך ל- CountDownLatch בג'אווה
1. הקדמה
במאמר זה אנו נותנים מדריך ל CountDownLatch השיעור והדגימו כיצד ניתן להשתמש בו בכמה דוגמאות מעשיות.
בעיקרו של דבר, באמצעות CountDownLatch אנו יכולים לגרום לחסימה של חוט עד שחוטים אחרים השלימו משימה נתונה.
2. שימוש בתכנות מקביל
במילים פשוטות, א CountDownLatch יש דֶלְפֵּק שדה, שאותו תוכלו להפחית כנדרש. לאחר מכן נוכל להשתמש בו כדי לחסום שרשור קורא עד שהוא נספר לאפס.
אם היינו מבצעים עיבוד מקביל, היינו יכולים לייצר את CountDownLatch עם אותו ערך עבור הדלפק כמספר שרשורים שאנחנו רוצים לעבוד עליהם. ואז יכולנו פשוט להתקשר ספירה לאחור() לאחר סיום כל שרשור, מבטיח כי חוט תלוי מתקשר לְהַמתִין() ייחסם עד לסיום חוטי העובד.
3. מחכה להשלמת מאגר חוטים
בואו ננסה דפוס זה על ידי יצירת a עוֹבֵד ושימוש ב- CountDownLatch שדה לאותות מתי הוא השלים:
מחלקה ציבורית עובדת מיישמת Runnable {private List outputScraper; פרטי CountDownLatch countDownLatch; עובד ציבורי (רשימת outputScraper, CountDownLatch countDownLatch) {this.outputScraper = outputScraper; this.countDownLatch = countDownLatch; } @Override הפעלה בטלנית ציבורית () {doSomeWork (); outputScraper.add ("נספר למטה"); countDownLatch.countDown (); }}
לאחר מכן, בואו ניצור מבחן על מנת להוכיח שנוכל להשיג א CountDownLatch לחכות ל עוֹבֵד מקרים להשלמה:
@ מבחן ציבורי בטל כאשרParallelProcessing_thenMainThreadWillBlockUntilCompletion () זורק InterruptedException {List outputScraper = Collections.synchronizedList (ArrayList חדש ()); CountDownLatch countDownLatch = CountDownLatch חדש (5); עובדים עובדים = זרם .generate (() -> שרשור חדש (עובד חדש (outputScraper, countDownLatch))) .limit (5) .collect (toList ()); workers.forEach (אשכול :: התחל); countDownLatch.await (); outputScraper.add ("הבריח שוחרר"); assertThat (outputScraper) .contains בדיוק ("נספר למטה", "נספר למטה", "נספר למטה", "נספר למטה", "נספר למטה", "תפס שוחרר"); }
באופן טבעי "תפס משוחרר" תמיד יהיה הפלט האחרון - מכיוון שהוא תלוי ב- CountDownLatch משחרר.
שימו לב שאם לא התקשרנו לְהַמתִין(), לא נוכל להבטיח את הזמנת ביצוע החוטים, כך שהבדיקה תיכשל באופן אקראי.
4. מאגר חוטים שמחכה להתחיל
אם לקחנו את הדוגמה הקודמת, אך הפעם התחלנו אלפי שרשורים במקום חמישה, סביר להניח שרבים מהקודמים מסיימים את העיבוד עוד לפני שהתקשרנו הַתחָלָה() על המאוחרים יותר. זה יכול להקשות על ניסיון לשחזר בעיית מקביליות, מכיוון שלא נוכל לגרום לכל הנושאים שלנו לפעול במקביל.
כדי לעקוף את זה, בואו נקבל את ספירה לאחור לעבוד אחרת מאשר בדוגמה הקודמת. במקום לחסום שרשור הורה עד לסיום כמה שרשורי ילדים, אנו יכולים לחסום כל שרשור ילד עד שכל האחרים התחילו.
בואו נשנה את שלנו לָרוּץ() שיטה כך שהיא נחסמת לפני העיבוד:
מחלקה ציבורית WaitingWorker מיישמת את Runnable {private list outputScraper; פרטי CountDownLatch readyThreadCounter; פרטי CountDownLatch callingThreadBlocker; CountDownLatch הפרטי completeThreadCounter; WaitingWorker ציבורי (רשימת outputScraper, CountDownLatch readyThreadCounter, CountDownLatch callingThreadBlocker, CountDownLatch completeThreadCounter) {this.outputScraper = outputScraper; this.readyThreadCounter = readyThreadCounter; this.callingThreadBlocker = callingThreadBlocker; this.completedThreadCounter = completeThreadCounter; } @Override הפעלה בטלנית ציבורית () {readyThreadCounter.countDown (); נסה את {callingThreadBlocker.await (); תעשה קצת עבודה(); outputScraper.add ("נספר למטה"); } לתפוס (InterruptedException e) {e.printStackTrace (); } סוף סוף {completeThreadCounter.countDown (); }}}
עכשיו, בואו לשנות את הבדיקה שלנו כך שהיא תחסום עד שכל עובדים התחילו, ביטול החסימה של עובדים, ואז חוסם עד עובדים סיים:
@Test ציבורי בטל כאשרDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime () זורק InterruptedException {רשימה outputScraper = Collections.synchronizedList (ArrayList חדש ()); CountDownLatch readyThreadCounter = CountDownLatch חדש (5); CountDownLatch callingThreadBlocker = CountDownLatch חדש (1); CountDownLatch completeThreadCounter = CountDownLatch חדש (5); עובדים עובדים = זרם .generate (() -> שרשור חדש (WaitingWorker חדש (outputScraper, readyThreadCounter, callingThreadBlocker, completeThreadCounter))) .limit (5) .collect (toList ()); workers.forEach (אשכול :: התחל); readyThreadCounter.await (); outputScraper.add ("עובדים מוכנים"); callingThreadBlocker.countDown (); completeThreadCounter.await (); outputScraper.add ("העובדים הושלמו"); assertThat (outputScraper) .contains בדיוק ("עובדים מוכנים", "נספרו למטה", "נספרו למטה", "נספרו למטה", "נספרו למטה", "נספרו למטה", "עובדים הושלמו"); }
תבנית זו שימושית באמת לניסיון לשחזר באגים מקבילים, כפי שניתן להשתמש בהם בכדי לאלץ אלפי שרשורים לנסות לבצע היגיון כלשהו במקביל.
5. סיום א ספירה לאחור מוקדם
לפעמים, אנו עלולים להיתקל במצב בו עובדים לסיים בטעות לפני שסופרים לאחור את CountDownLatch. זה עלול לגרום לכך שהוא לעולם לא יגיע לאפס ו לְהַמתִין() לעולם לא מסתיים:
@ עקוף ריצה בטלנית ציבורית () {if (true) {זרוק RuntimeException חדש ("הו יקירי, אני BrokenWorker"); } countDownLatch.countDown (); outputScraper.add ("נספר למטה"); }
בואו לשנות את הבדיקה הקודמת שלנו כדי להשתמש ב- BrokenWorker, על מנת להראות כיצד לְהַמתִין() יחסום לנצח:
@Test ציבורי בטל כאשר FailingToParallelProcess_thenMainThreadShouldGetNotGetStuck () זורק InterruptedException {List outputScraper = Collections.synchronizedList (ArrayList new ()); CountDownLatch countDownLatch = CountDownLatch חדש (5); עובדים ברשימה = זרם .generate (() -> שרשור חדש (BrokenWorker חדש (outputScraper, countDownLatch))) .limit (5) .collect (toList ()); workers.forEach (אשכול :: התחל); countDownLatch.await (); }
ברור שזו לא ההתנהגות שאנחנו רוצים - עדיף שהיישום יימשך הרבה יותר מאשר לחסום אינסוף.
כדי לעקוף את זה, בואו להוסיף טיעון פסק זמן לקריאה שלנו ל- לְהַמתִין().
בוליאני הושלם = countDownLatch.await (3L, TimeUnit.SECONDS); assertThat (הושלם) .isFalse ();
כפי שאנו רואים, בסופו של דבר המבחן יפסק זמן לְהַמתִין() יחזור שֶׁקֶר.
6. מסקנה
במדריך מהיר זה הוכחנו כיצד אנו יכולים להשתמש ב- CountDownLatch על מנת לחסום שרשור עד ששרשורים אחרים יסיימו עיבוד כלשהו.
הראינו גם כיצד ניתן להשתמש בו בכדי לסייע באיתור באגים בבעיות במקביל על ידי הקפדה על שרשורים פועלים במקביל.
ניתן למצוא את היישום של דוגמאות אלה ב- GitHub; זהו פרויקט מבוסס Maven, ולכן צריך להיות קל להפעלה כפי שהוא.