עיבוד אצווה של Java EE 7

1. הקדמה

דמיין שעלינו לבצע משימות ידנית כמו עיבוד תלושי משכורת, חישוב ריביות והפקת חשבונות. זה יהפוך משעמם למדי, מועד לטעויות ורשימה בלתי נגמרת של משימות ידניות!

במדריך זה נסתכל על עיבוד Java Batch (JSR 352), חלק מפלטפורמת EE של ג'קרטה, ומפרט נהדר לאוטומציה של משימות כמו אלה. הוא מציע למפתחי אפליקציות מודל לפיתוח מערכות עיבוד אצווה חזקות כדי שיוכלו להתמקד בהיגיון העסקי.

2. תלות Maven

מכיוון ש- JSR 352 הוא רק מפרט, נצטרך לכלול את ה- API והיישום שלו כמו ג'ברט:

 javax.batch javax.batch-api 1.0.1 org.jberet jberet-core 1.0.2.Final org.jberet jberet-support 1.0.2.Final org.jberet jberet-se 1.0.2.Final 

נוסיף גם מסד נתונים בזיכרון כדי שנוכל להסתכל על תרחישים מציאותיים יותר.

3. מושגי מפתח

JSR 352 מציג כמה מושגים, אותם נוכל להסתכל כך:

בואו נגדיר תחילה כל חלק:

  • החל משמאל, יש לנו את JobOperator. זה מנהל את כל ההיבטים של עיבוד התפקידים כגון התחלה, עצירה והפעלה מחדש
  • לאחר מכן, יש לנו את עבודה. עבודה היא אוסף הגיוני של צעדים; הוא מכיל תהליך אצווה שלם
  • עבודה תכלול בין 1 ל- n שלבס. כל שלב הוא יחידת עבודה עצמאית ורצופה. שלב מורכב מ קריאה קֶלֶט, מעבד הקלט הזה, ו כְּתִיבָה תְפוּקָה
  • ואחרון חביב, יש לנו את JobRepository המאחסן את המידע הפועל של העבודות. זה עוזר לעקוב אחר משרות, מצבם ותוצאות השלמתם

לשלבים יש קצת יותר פרטים מזה, אז בואו נסתכל על זה בהמשך. ראשית, נסתכל על חתיכה צעדים ואז ב ביגלטס.

4. יצירת נתח

כאמור, נתח הוא סוג של צעד. לעתים קרובות נשתמש בגוש כדי לבטא פעולה שמבוצעת שוב ושוב, למשל על קבוצה של פריטים. זה בערך כמו פעולות ביניים מזרמי ג'אווה.

כאשר אנו מתארים נתח, עלינו להביע מאיפה לקחת פריטים, כיצד לעבד אותם ולאן לשלוח אותם לאחר מכן.

4.1. פריטי קריאה

כדי לקרוא פריטים, נצטרך ליישם ItemReader.

במקרה זה, ניצור קורא שפשוט יפלוט את המספרים 1 עד 10:

מחלקה ציבורית @Named SimpleChunkItemReader מרחיב את AbstractItemReader {אסימונים שלמים שלמים []; ספירת מספרים שלמים פרטיים; @Inject JobContext jobContext; @Override ציבור שלם readItem () זורק חריג {אם (ספירה> = אסימונים.אורך) {להחזיר null; } jobContext.setTransientUserData (ספירה); החזר אסימונים [ספירה ++]; } @Override חלל ציבורי פתוח (מחסום ניתן לסידור) זורק חריג {אסימונים = מספר שלם חדש [] {1,2,3,4,5,6,7,8,9,10}; ספירה = 0; }}

עכשיו, אנחנו רק קוראים מהמצב הפנימי של הכיתה כאן. אבל כמובן ש, readItem יכול לשלוף ממסד נתונים, ממערכת הקבצים, או ממקור חיצוני אחר.

שים לב שאנו שומרים חלק ממצב פנימי זה באמצעות JobContext # setTransientUserData () אשר יועיל בהמשך.

כמו כן, שים לב ל מחסום פָּרָמֶטֶר. אנחנו גם נאסוף את זה.

4.2. עיבוד פריטים

כמובן, הסיבה שאנחנו משתבבים היא שאנחנו רוצים לבצע פעולה כלשהי על הפריטים שלנו!

בכל פעם שנחזור ריק ממעבד פריטים, אנו מורידים פריט זה מהאצווה.

אז בואו נגיד כאן שאנחנו רוצים לשמור רק על המספרים הזוגיים. אנחנו יכולים להשתמש ב- פריט מעבד הדוחה את המוזרים בכך שהוא חוזר ריק:

המחלקה הציבורית @Named SimpleChunkItemProcessor מיישמת את ItemProcessor {@Override Public Integer processItem (Object t) {Integer item = (Integer) t; פריט החזר% 2 == 0? פריט: null; }}

תהליך פריט יתקשר פעם אחת לכל פריט שלנו ItemReader פולט.

4.3. פריטי כתיבה

לבסוף, התפקיד יפעיל את ItemWriter כדי שנוכל לכתוב את הפריטים שהשתנו:

המחלקה הציבורית @Named SimpleChunkWriter מרחיבה את AbstractItemWriter {רשימה מעובדת = ArrayList חדש (); @Override public void writeItems (פריטי רשימה) זורק חריג {items.stream (). מפה (Integer.class :: cast) .forEach (מעובד :: add); }} 

כמה זמן הוא פריטים? תוך רגע נגדיר את גודל הנתח, שיקבע את גודל הרשימה אליה נשלח כתיבת פריטים.

4.4. הגדרת נתח בעבודה

כעת שמנו את כל זה בקובץ XML באמצעות JSL או שפת מפרט העבודה. שים לב שנפרט את הקורא, המעבד, הצ'ונקר וגם גודל הנתח:

גודל הנתח הוא התדירות שבה ההתקדמות בגוש מחויבת למאגר העבודה, שחשוב להבטיח השלמה, במידה וחלק מהמערכת תיכשל.

נצטרך למקם את הקובץ הזה מטא-INF / עבודות אצווה ל .קַנקַן קבצים וב WEB-INF / שיעורים / META-INF / עבודות אצווה ל .מִלחָמָה קבצים.

נתנו את העבודה שלנו לזהות "SimpleChunk", אז בואו ננסה את זה במבחן יחידה.

כעת, עבודות מבוצעות בצורה אסינכרונית, מה שהופך אותן למורכבות לבדיקה. במדגם, הקפד לבדוק את שלנו BatchTestHelper איזה סקרים מחכה עד לסיום העבודה:

@Test הציבור בטל givenChunk_thenBatch_completesWithSuccess () זורק חריג {JobOperator jobOperator = BatchRuntime.getJobOperator (); Expression LongId = jobOperator.start ("simpleChunk", מאפיינים חדשים ()); JobExecution jobExecution = jobOperator.getJobExecution (executId); jobExecution = BatchTestHelper.keepTestAlive (jobExecution); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); } 

אז זה מה שיש נתחים. עכשיו, בואו נסתכל על חבילות.

5. יצירת ביגלט

לא הכל משתלב בצורה מסודרת במודל איטרטיבי. לדוגמא, ייתכן שיש לנו משימה שאנחנו פשוט צריכים להפעיל פעם אחת, לרוץ להשלמה ולהחזיר סטטוס יציאה.

החוזה לחבילה הוא די פשוט:

המחלקה הציבורית @Named SimpleBatchLet מרחיבה את AbstractBatchlet {@Override Public String Process () זורק Exception {return BatchStatus.COMPLETED.toString (); }}

כמו גם ה- JSL:

ונוכל לבדוק זאת באותה גישה כמו בעבר:

@Test הציבור בטל givenBatchlet_thenBatch_completeWithSuccess () זורק חריג {JobOperator jobOperator = BatchRuntime.getJobOperator (); Exhibition LongId = jobOperator.start ("simpleBatchLet", מאפיינים חדשים ()); JobExecution jobExecution = jobOperator.getJobExecution (executId); jobExecution = BatchTestHelper.keepTestAlive (jobExecution); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

אז בדקנו כמה דרכים שונות ליישום שלבים.

עכשיו בואו נסתכל על מנגנונים עבור סימון והבטחת התקדמות.

6. מחסום מותאם אישית

כישלונות חייבים לקרות באמצע עבודה. האם עלינו פשוט להתחיל מחדש את כל העניין, או שנוכל איכשהו להתחיל מאיפה שהפסקנו?

כמו שהשם מרמז, מחסומים עזור לנו להגדיר מעת לעת סימניה במקרה של כישלון.

כברירת מחדל, סוף עיבוד הנתחים הוא נקודת ביקורת טבעית.

עם זאת, אנו יכולים להתאים את זה לבד מחסום אלגוריתם:

המחלקה הציבורית @Named CustomCheckPoint מרחיבה את AbstractCheckpointAlgorithm {@Inject JobContext jobContext; @Override בוליאני ציבורי isReadyToCheckpoint () זורק חריג {int counterRead = (Integer) jobContext.getTransientUserData (); מונה החזרה קרא% 5 == 0; }}

זוכר את הספירה ששמנו בנתונים חולפים קודם? פה, אנחנו יכולים לשלוף את זה איתו JobContext # getTransientUserDataלציין שאנחנו רוצים להתחייב על כל מספר 5 שעובד.

בלי זה, התחייבות תתרחש בסוף כל נתח, או במקרה שלנו, כל מספר שלישי.

ואז, אנו מתאימים זאת עם ה- אלגוריתם קופה הנחיה ב- XML ​​שלנו מתחת לגוש שלנו:

בואו נבדוק את הקוד, ושוב נציין שחלק מהצעדים של צלחת הדוד מוסתרים BatchTestHelper:

@Test הציבור בטל givenChunk_whenCustomCheckPoint_thenCommitCountIsThree () זורק חריג {// ... התחל עבודה וחכה לסיום jobOperator.getStepExecutions (executId) .stream () .map (BatchTestHelper :: getCommitCount) .forEach (count -> assertEquals .longValue ())); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

לכן, אנו מצפים לספירת התחייבות של 2 מכיוון שיש לנו עשרה פריטים והגדרנו את ההתחייבות להיות כל פריט חמישי. אבל, המסגרת מבצעת עוד קריאה אחרונה בסוף כדי להבטיח שהכל עבר עיבוד, וזה מה שמביא אותנו ל -3.

לאחר מכן, בואו נסתכל כיצד לטפל בשגיאות.

7. טיפול בחריגים

כברירת מחדל, מפעיל העבודה יסמן את תפקידנו כ- נִכשָׁל במקרה של חריג.

בואו נחליף את קורא הפריטים שלנו כדי לוודא שהוא נכשל:

@Override ציבור שלם readItem () זורק חריג {אם (tokens.hasMoreTokens ()) {String tempTokenize = tokens.nextToken (); לזרוק RuntimeException חדש (); } להחזיר אפס; }

ואז לבדוק:

@Test ציבורי בטל כאשרChunkError_thenBatch_CompletesWithFailed () זורק Exception {// ... התחל עבודה וחכה לסיום assertEquals (jobExecution.getBatchStatus (), BatchStatus.FAILED); }

אך אנו יכולים לבטל התנהגות ברירת מחדל זו במספר דרכים:

  • דלג על הגבלה מציין את מספר החריגים שצעד זה יתעלם ממנו לפני שנכשל
  • נסה להגביל מחדש מציין את מספר הפעמים שמפעיל העבודה צריך לנסות שוב את הצעד לפני שהוא נכשל
  • ניתן לדלג על מעמד חריג מציין מערך חריגים שעיבוד הנתחים יתעלם ממנו

אז נוכל לערוך את העבודה שלנו כך שהיא תתעלם חריגת זמן ריצה, כמו גם כמה אחרים, רק להמחשה:

ועכשיו הקוד שלנו יעבור:

@Test הציבור בטל givenChunkError_thenErrorSkipped_CompletesWithSuccess () זורק חריג {// ... התחל עבודה וחכה לסיום jobOperator.getStepExecutions (executId) .stream () .map (BatchTestHelper :: getProcessSkipCount). ForEachert (skipCount). .longValue ())); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

8. ביצוע שלבים מרובים

הזכרנו קודם כי עבודה יכולה לכלול מספר שלבים כלשהו, ​​אז בוא נראה את זה עכשיו.

8.1. ירי השלב הבא

כברירת מחדל, כל שלב הוא השלב האחרון בתפקיד.

על מנת לבצע את השלב הבא במסגרת עבודת אצווה, עלינו לציין במפורש באמצעות ה- הַבָּא תכונה בהגדרת הצעד:

אם נשכח תכונה זו, השלב הבא ברצף לא יבוצע.

ואנחנו יכולים לראות איך זה נראה בממשק ה- API:

@Test הציבור בטל givenTwoSteps_thenBatch_CompleteWithSuccess () זורק חריג {// ... התחל עבודה וחכה לסיום assertEquals (2, jobOperator.getStepExecutions (executId) .size ()); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

8.2. זורמים

ניתן לצמצם רצף של צעדים גם ל- a זְרִימָה. לאחר סיום הזרימה, הזרימה כולה עוברת לאלמנט הביצוע. כמו כן, אלמנטים בתוך הזרימה אינם יכולים לעבור לאלמנטים מחוץ לזרימה.

אנו יכולים, למשל, לבצע שני צעדים בתוך זרימה, ואז לעבור את הזרימה לשלב מבודד:

ואנחנו עדיין יכולים לראות כל ביצוע שלב באופן עצמאי:

@Test ציבורי בטל givenFlow_thenBatch_CompleteWithSuccess () זורק חריג {// ... התחל עבודה וחכה לסיום assertEquals (3, jobOperator.getStepExecutions (executId) .size ()); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

8.3. החלטות

יש לנו גם תמיכה אם / אחרת בצורה של החלטות. ההחלטות מספקות דרך מותאמת אישית לקביעת רצף בין צעדים, זרימות ופיצולים.

כמו צעדים, זה עובד על אלמנטים מעבר כגון הַבָּא שיכולים לכוון או להפסיק את ביצוע העבודה.

בואו נראה כיצד ניתן להגדיר את העבודה:

כל הַחְלָטָה אלמנט צריך להיות מוגדר עם מחלקה שמיישמת מחליט. תפקידו להחזיר החלטה כ- חוּט.

כל אחד הַבָּא בְּתוֹך הַחְלָטָה הוא כמו א מקרה ב החלף הַצהָרָה.

8.4. פיצולים

פיצולים שימושיים מכיוון שהם מאפשרים לנו לבצע זרימות במקביל:

כמובן, פירוש הדבר שההזמנה אינה מובטחת.

בואו נאשר שכולם עדיין רצים. שלבי הזרימה יבוצעו בסדר שרירותי, אך הצעד המבודד תמיד יהיה האחרון:

@Test ציבורי בטל givenSplit_thenBatch_CompletesWithSuccess () זורק חריג {// ... התחל עבודה והמתין לסיום רשימת stepExecutions = jobOperator.getStepExecutions (executId); assertEquals (3, stepExecutions.size ()); assertEquals ("splitJobSequenceStep3", stepExecutions.get (2) .getStepName ()); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

9. חלוקת עבודה

אנו יכולים גם לצרוך את מאפייני האצווה בתוך קוד ה- Java שלנו שהוגדרו בתפקידנו.

ניתן לטפל בהם בשלוש רמות - העבודה, הצעד, וחפץ האצווה.

בואו נראה כמה דוגמאות לאיך הם צרכו.

כשאנחנו רוצים לצרוך את הנכסים ברמת העבודה:

@Inject JobContext jobContext; ... jobProperties = jobContext.getProperties (); ...

ניתן לצרוך זאת גם ברמת שלב:

@Inject StepContext stepContext; ... stepProperties = stepContext.getProperties (); ...

כשאנחנו רוצים לצרוך את המאפיינים ברמת חפץ אצווה:

@Inject @BatchProperty (name = "name") פרטי מחרוזת nameString;

זה שימושי עם מחיצות.

ראה, עם פיצולים אנו יכולים להריץ זרמים במקביל. אבל אנחנו יכולים גם חֲלוּקָה צעד לתוכו נ קבוצות של פריטים או קביעת תשומות נפרדות, מה שמאפשר לנו דרך אחרת לפצל את העבודה על פני מספר שרשורים.

כדי להבין את קטע העבודה שכל מחיצה צריכה לעשות, נוכל לשלב מאפיינים עם מחיצות:

10. עצור והפעל מחדש

עכשיו, זה להגדרת משרות. עכשיו בואו נדבר רגע על ניהול אותם.

כבר ראינו בבדיקות היחידות שלנו שאנחנו יכולים לקבל מופע JobOperator מ BatchRuntime:

JobOperator jobOperator = BatchRuntime.getJobOperator ();

ואז נוכל להתחיל בעבודה:

Exhibition LongId = jobOperator.start ("simpleBatchlet", מאפיינים חדשים ());

עם זאת, אנו יכולים גם להפסיק את העבודה:

jobOperator.stop (executId);

ולבסוף, אנו יכולים להפעיל מחדש את העבודה:

executId = jobOperator.restart (executId, מאפיינים חדשים ());

בואו נראה איך נוכל להפסיק עבודה ריצה:

@Test הציבור בטל givenBatchLetStarted_whenStopped_thenBatchStopped () זורק חריג {JobOperator jobOperator = BatchRuntime.getJobOperator (); Exhibition LongId = jobOperator.start ("simpleBatchLet", מאפיינים חדשים ()); JobExecution jobExecution = jobOperator.getJobExecution (executId); jobOperator.stop (executId); jobExecution = BatchTestHelper.keepTestStopped (jobExecution); assertEquals (jobExecution.getBatchStatus (), BatchStatus.STOPPED); }

ואם אצווה היא עצרואז נוכל להפעיל אותו מחדש:

@Test ציבורי בטל givenBatchLetStopped_whenRestarted_thenBatchCompletesSuccess () {// ... התחל והפסיק את העבודה assertEquals (jobExecution.getBatchStatus (), BatchStatus.STOPPED); executId = jobOperator.restart (jobExecution.getExecutionId (), מאפיינים חדשים ()); jobExecution = BatchTestHelper.keepTestAlive (jobOperator.getJobExecution (executId)); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

11. גיוס משרות

כאשר מוגשת עבודת אצווה אז זמן הריצה של האצווה יוצר מופע של ביצוע עבודה כדי לעקוב אחרי זה.

להשיג את ביצוע עבודה עבור מזהה ביצוע, אנו יכולים להשתמש ב- JobOperator # getJobExecution (executionId) שיטה.

וגם ביצוע שלב מספק מידע מועיל למעקב אחר ביצוע שלב.

להשיג את ביצוע שלב עבור מזהה ביצוע, אנו יכולים להשתמש ב- JobOperator # getStepExecutions (executId) שיטה.

ומתוך כך אנו יכולים לקבל מספר מדדים על הצעד שלב ביצוע # getMetrics:

@Test הציבור בטל givenChunk_whenJobStarts_thenStepsHaveMetrics () זורק חריג {// ... התחל עבודה וחכה לסיום assertTrue (jobOperator.getJobNames (). מכיל ("simpleChunk")); assertTrue (jobOperator.getParameters (executionId) .isEmpty ()); StepExecution stepExecution = jobOperator.getStepExecutions (executId) .get (0); מפה metricTest = BatchTestHelper.getMetricsMap (stepExecution.getMetrics ()); assertEquals (10L, metricTest.get (Metric.MetricType.READ_COUNT) .longValue ()); assertEquals (5L, metricTest.get (Metric.MetricType.FILTER_COUNT) .longValue ()); assertEquals (4L, metricTest.get (Metric.MetricType.COMMIT_COUNT) .longValue ()); assertEquals (5L, metricTest.get (Metric.MetricType.WRITE_COUNT) .longValue ()); // ... ועוד רבים! }

12. חסרונות

JSR 352 הוא חזק, אם כי הוא חסר במספר תחומים:

  • נראה כי חסרים קוראים וסופרים שיכולים לעבד פורמטים אחרים כגון JSON
  • אין תמיכה בגנריות
  • חלוקה תומכת רק בצעד אחד
  • ה- API אינו מציע שום דבר לתמיכה בתזמון (אם כי ל- J2EE יש מודול תזמון נפרד)
  • בשל אופיו האסינכרוני, בדיקות יכולות להוות אתגר
  • ה- API די מילולי

13. מסקנה

במאמר זה בדקנו את JSR 352 ולמדנו על נתחים, אצבעות, פיצולים, זרימות ועוד. עם זאת, בקושי גירדנו את פני השטח.

כמו תמיד ניתן למצוא את קוד ההדגמה ב- GitHub.