מדריך לפיצוח אפאצ'י

1. הקדמה

במדריך זה נדגים את Apache Crunch עם יישום לדוגמא לעיבוד נתונים. נפעיל יישום זה באמצעות מסגרת MapReduce.

נתחיל בכיסוי קצר של כמה מושגי אפאצ'י קראנץ '. ואז נקפוץ לאפליקציה לדוגמה. באפליקציה זו נעשה עיבוד טקסט:

  • קודם כל נקרא את השורות מקובץ טקסט
  • בהמשך, נחלק אותם למילים ונסיר כמה מילים נפוצות
  • לאחר מכן נקבץ את המילים הנותרות כדי לקבל רשימה של מילים ייחודיות וספירותיהן
  • לבסוף, נכתוב רשימה זו לקובץ טקסט

2. מה זה קראנץ '?

MapReduce היא מסגרת תכנות מבוזרת ומקבילה לעיבוד כמויות גדולות של נתונים באשכול שרתים. מסגרות תוכנה כגון Hadoop ו- Spark מיישמות את MapReduce.

Crunch מספק מסגרת לכתיבה, בדיקה והפעלת צינורות MapReduce בג'אווה. כאן איננו כותבים את עבודות MapReduce ישירות. במקום זאת, אנו מגדירים צינור נתונים (כלומר פעולות לביצוע שלבי קלט, עיבוד ופלט) באמצעות ממשקי ה- API של Crunch. Crunch Planner ממפה אותם למשרות MapReduce ומבצע אותם במידת הצורך.

לכן, כל צינור נתוני Crunch מתואם על ידי מופע של ה- צנרת מִמְשָׁק. ממשק זה מגדיר גם שיטות לקריאת נתונים לצינור דרך מָקוֹר מקרים וכתיבת נתונים מהצינור אל יַעַד מקרים.

יש לנו 3 ממשקים לייצוג נתונים:

  1. PCollection - אוסף אלמנטים מבוזר ובלתי משתנה
  2. PTable<>, ו> - מפת רב-מפתח של מקשים וערכים ללא שינוי, מבוזרת ולא מסודרת
  3. PGroupedTable<>, ו> - מפה מפוזרת וממוינת של מקשים מסוג K ל- ניתן לנידון וי שייתכן שיחזור פעם אחת בדיוק

DoFn הוא מחלקת הבסיס לכל פונקציות עיבוד הנתונים. זה תואם ל ממפה, מפחית ו קומבינר שיעורים ב- MapReduce. אנו משקיעים את רוב זמן הפיתוח בכתיבה ובדיקה של חישובים לוגיים באמצעותה.

עכשיו, כשאנחנו מכירים יותר את Crunch, בואו נשתמש בו לבניית היישום לדוגמא.

3. הקמת פרויקט קראנץ '

קודם כל, בואו נקים פרויקט קראנץ 'עם Maven. אנו יכולים לעשות זאת בשתי דרכים:

  1. הוסף את התלות הנדרשת ב pom.xml קובץ של פרויקט קיים
  2. השתמש בארכיטיפ כדי ליצור פרויקט התחלה

בואו נסתכל במהירות על שתי הגישות.

3.1. תלות Maven

על מנת להוסיף קראנץ 'לפרויקט קיים, בואו להוסיף את התלות הנדרשת ב- pom.xml קוֹבֶץ.

ראשית, בואו נוסיף את קראנץ-ליבה סִפְרִיָה:

 org.apache.crunch crunch-core 0.15.0 

לאחר מכן, בואו נוסיף את לקוח hadoop ספריה לתקשר עם Hadoop. אנו משתמשים בגרסה התואמת להתקנת Hadoop:

 org.apache.hadoop hadoop-client 2.2.0 סופק 

אנו יכולים לבדוק ב- Maven Central את הגרסאות העדכניות ביותר של ספריות crunch-core ו- hadoop-client.

3.2. ארב-טיפוס של מייבן

גישה אחרת היא ליצור במהירות פרויקט התחלה באמצעות ארכיטיפ Maven שמספק Crunch:

ארכיטיפ mvn: ליצור -Dfilter = org.apache.crunch: crunch-archetype 

כאשר תתבקש על ידי הפקודה הנ"ל, אנו מספקים את גרסת ה- Crunch ואת פרטי החפץ של הפרויקט.

4. התקנת צינור קראנץ '

לאחר הגדרת הפרויקט, עלינו ליצור צנרת לְהִתְנַגֵד. קראנץ 'כולל 3 צנרת יישומים:

  • MRPipeline - מבצעת בתוך Hadoop MapReduce
  • SparkPipeline - מבוצע כסדרה של צינורות Spark
  • MemPipeline - מבצע זיכרון בלקוח ושימושי לבדיקת יחידות

בדרך כלל אנו מפתחים ובודקים באמצעות מופע של MemPipeline. בהמשך אנו משתמשים במופע של MRPipeline אוֹ SparkPipeline לביצוע בפועל.

אם היינו זקוקים לצינור בזיכרון, נוכל להשתמש בשיטה הסטטית getInstance להשיג את MemPipeline למשל:

צינור צינור = MemPipeline.getInstance ();

אבל לעת עתה, בוא ניצור מופע של MRPipeline לביצוע היישום עם Hadoop:

צינור צינור = MRPipeline חדש (WordCount.class, getConf ());

5. קרא נתוני קלט

לאחר יצירת אובייקט הצינור, אנו רוצים לקרוא נתוני קלט. ה צנרת הממשק מספק שיטת נוחות לקריאת קלט מקובץ טקסט, readTextFile (pathName).

בואו נקרא לשיטה זו לקריאת קובץ הטקסט הקלט:

שורות PCollection = pipeline.readTextFile (inputPath);

הקוד שלעיל קורא את קובץ הטקסט כאוסף של חוּט.

כשלב הבא, בוא נכתוב מקרה מבחן לקריאת קלט:

@Test הציבור בטל givenPipeLine_whenTextFileRead_thenExpectedNumberOfRecordsRead () {צינור צינור = MemPipeline.getInstance (); שורות PCollection = pipeline.readTextFile (INPUT_FILE_PATH); assertEquals (21, lines.asCollection () .getValue () .size ()); }

במבחן זה אנו מוודאים כי אנו מקבלים את מספר השורות הצפוי בעת קריאת קובץ טקסט.

6. שלבי עיבוד נתונים

לאחר קריאת נתוני הקלט, עלינו לעבד אותם. ממשק ה- API של Crunch מכיל מספר מחלקות משנה של DoFn לטיפול בתרחישים נפוצים של עיבוד נתונים:

  • FilterFn - מסנן חברים באוסף המבוסס על מצב בוליאני
  • MapFn - ממפה כל רשומת קלט לרשומת פלט אחת בדיוק
  • CombineFn - משלב מספר ערכים לערך יחיד
  • הצטרף ל- FN - מבצע הצטרפות כגון הצטרפות פנימית, צירוף חיצוני שמאלי, צירוף חיצוני ימני והצטרפות חיצונית מלאה

בוא נבצע את ההיגיון הבא של עיבוד הנתונים באמצעות מחלקות אלה:

  1. פצל כל שורה בקובץ הקלט למילים
  2. הסר את מילות העצירה
  3. ספרו את המילים הייחודיות

6.1. פצל שורה של טקסט למילים

קודם כל, בואו ניצור את טוקניזר בכיתה לפצל שורה למילים.

נאריך את DoFn מעמד. בכיתה זו יש שיטה מופשטת הנקראת תהליך. שיטה זו מעבדת את רשומות הקלט מא PCollection ושולח את הפלט ל- פולט.

עלינו ליישם את הלוגיקה המפצלת בשיטה זו:

מחלקה ציבורית Tokenizer מרחיב את DoFn {פרטית סופית ספליטר SPLITTER = ספליטר. OnPattern ("\ s +") .omitEmptyStrings (); @ ביטול תהליך הריק הציבורי (קו מחרוזת, פולט פולט) {עבור (מילה מחרוזת: SPLITTER.split (שורה)) {emitter.emit (מילה); }}} 

ביישום לעיל השתמשנו ב- ספליטר בכיתה מספריית גויאבה כדי לחלץ מילים משורה.

לאחר מכן, בוא נכתוב מבחן יחידה עבור ה- טוקניזר מעמד:

@RunWith (MockitoJUnitRunner.class) TokenizerUnitTest בכיתה ציבורית {@Mock emitter emitter emitter; @Test ציבורי בטל givenTokenizer_whenLineProcessed_thenOnlyExpectedWordsEitted () {ספליטר Tokenizer = Tokenizer חדש (); splitter.process ("שלום עולם", פולט); אמת (פולט) .emit ("שלום"); אמת (פולט) .emit ("עולם"); verifyNoMoreInteractions (פולט); }}

המבחן שלעיל מאמת שהמילים הנכונות מוחזרות.

לבסוף, בואו נחלק את השורות שנקראו מקובץ הטקסט הקלט באמצעות מחלקה זו.

ה parallelDo שיטה של PCollection הממשק מחיל את הנתון DoFn לכל האלמנטים ומחזיר חדש PCollection.

בואו נקרא לשיטה זו באוסף השורות ונעביר מופע של טוקניזר:

מילות PCollection = lines.parallelDo (Tokenizer חדש (), Writables.strings ()); 

כתוצאה מכך, אנו מקבלים את רשימת המילים בקובץ הטקסט הקלט. נסיר את מילות העצירה בשלב הבא.

6.2. הסר מילות עצירה

בדומה לשלב הקודם, בואו ליצור a StopWordFilter בכיתה לסינון מילות עצירה.

עם זאת, נאריך FilterFn במקום DoFn. FilterFn יש שיטה מופשטת הנקראת לְקַבֵּל. עלינו ליישם את לוגיקת הסינון בשיטה זו:

הכיתה הציבורית StopWordFilter מרחיבה את FilterFn {// מילות עצירה באנגלית, מושאלות מלוסין. סופי סטטי פרטי סט STOP_WORDS = ImmutableSet .copyOf (מחרוזת חדשה [] {"a", "ו-", "are", "as", "at", "be", "but", "by", "for" , "אם", "ב", "לתוך", "הוא", "זה", "לא", "לא", "של", "על", "או", "s", "כזה", " t "," that "," the "," their "," then "," there "," these "," they "," this "," to "," was "," will "," with " }); @Override קבל בוליאני ציבורי (מילה מחרוזת) {להחזיר! STOP_WORDS.contains (מילה); }}

לאחר מכן, בוא נכתוב את מבחן היחידה עבור StopWordFilter מעמד:

class class StopWordFilterUnitTest {@Test public void givenFilter_whenStopWordPassed_thenFalseReturned () {FilterFn filter = new StopWordFilter (); assertFalse (filter.accept ("the")); assertFalse (filter.accept ("a")); } @Test הציבור בטל givenFilter_whenNonStopWordPassed_thenTrueReturned () {FilterFn filter = חדש StopWordFilter (); assertTrue (filter.accept ("שלום")); assertTrue (filter.accept ("עולם"); } @ מבט בטל ציבורי givenWordCollection_whenFiltered_thenStopWordsRemoved () {PCollection words = MemPipeline .collectionOf ("This", "is", "a", "test", "משפט"); PCollection noStopWords = words.filter (StopWordFilter חדש ()); assertEquals (ImmutableList.of ("זה", "מבחן", "משפט"), Lists.newArrayList (noStopWords.materialize ())); }}

בדיקה זו מאמתת כי לוגיקת הסינון מתבצעת כהלכה.

לבסוף, בואו נשתמש StopWordFilter לסינון רשימת המילים שנוצרו בשלב הקודם. ה לְסַנֵן שיטה של PCollection הממשק מחיל את הנתון FilterFn לכל האלמנטים ומחזיר חדש PCollection.

בואו נקרא לשיטה זו באוסף המילים ונעביר מופע של StopWordFilter:

PCollection noStopWords = words.filter (חדש StopWordFilter ());

כתוצאה מכך, אנו מקבלים את אוסף המילים המסונן.

6.3. ספרו מילים ייחודיות

לאחר קבלת אוסף המילים המסונן, אנו רוצים לספור באיזו תדירות כל מילה מתרחשת. PCollection לממשק יש מספר שיטות לביצוע צבירות נפוצות:

  • דקה - מחזיר את האלמנט המינימלי באוסף
  • מקסימום - מחזיר את האלמנט המרבי באוסף
  • אורך - מחזיר את מספר האלמנטים באוסף
  • לספור - מחזירה א PTable המכיל את ספירת כל אלמנט ייחודי באוסף

בואו נשתמש ב- לספור שיטה להשיג את המילים הייחודיות יחד עם ספירתן:

// שיטת הספירה מפעילה סדרה של פרימיטיביים של קראנץ ומחזירה // מפה של המילים הייחודיות בקלט ה- PC אוסף לספירות שלהם. ספירת PTable = noStopWords.count ();

7. ציין פלט

כתוצאה מהשלבים הקודמים, יש לנו טבלת מילים וספירתם. אנו רוצים לכתוב את התוצאה לקובץ טקסט. ה צנרת הממשק מספק שיטות נוחות לכתיבת פלט:

ריק לכתוב (אוסף PCollection, יעד יעד); בטל כתיבה (אוסף PCollection, יעד יעד, Target.WriteMode writeMode); בטל writeTextFile (אוסף PCollection, שם מחרוזת);

לכן, בואו נקרא writeTextFile שיטה:

pipeline.writeTextFile (ספירות, outputPath); 

8. נהל את ביצוע הצינור

כל השלבים עד כה הגדירו את צינור הנתונים. שום קלט לא נקרא או מעובד. זה בגלל ש קראנץ משתמש במודל ביצוע עצלן.

הוא לא מפעיל את עבודות MapReduce עד שמופעלת על ממשק הצינור שיטה השולטת בתכנון וביצוע העבודה.

  • לָרוּץ - מכין תוכנית ביצוע ליצירת התפוקות הנדרשות ואז מבצע אותה באופן סינכרוני
  • בוצע - מריץ את כל העבודות שנותרו לצורך יצירת תפוקות ואז מנקה את כל קבצי הנתונים הבינוניים שנוצרו
  • runAsync - דומה לשיטת הריצה, אך מבוצע באופן שאינו חוסם

לכן, בואו נקרא בוצע שיטה לביצוע הצינור כעבודות MapReduce:

תוצאת PipelineResult = pipeline.done (); 

ההצהרה לעיל מריצה את עבודות MapReduce לקריאת קלט, עיבודן וכתיבת התוצאה לספריית הפלט.

9. הרכבת הצינור יחד

עד כה פיתחנו ובדיקנו יחידה את ההיגיון לקריאת נתוני קלט, עיבודם וכתיבה לקובץ הפלט.

לאחר מכן, בואו נרכיב אותם לבניית צינור הנתונים כולו:

ריצה אינטנסיבית ציבורית (String [] args) זורקת Exception {String inputPath = args [0]; מחרוזת outputPath = args [1]; // צור אובייקט לתיאום יצירת וביצוע צינורות. צינור צינור = MRPipeline חדש (WordCount.class, getConf ()); // התייחס לקובץ טקסט נתון כאוסף של מיתרים. שורות PCollection = pipeline.readTextFile (inputPath); // הגדר פונקציה המפצלת כל שורה באוסף PC של מחרוזות ל- // אוסף PC המורכב מהמילים הבודדות בקובץ. // הטיעון השני קובע את פורמט הסידור. מילות PCollection = lines.parallelDo (Tokenizer חדש (), Writables.strings ()); // קח את אוסף המילים והסר מילות עצירה ידועות. PCollection noStopWords = words.filter (StopWordFilter חדש ()); // שיטת הספירה מפעילה סדרה של פרימיטיביים של קראנץ ומחזירה // מפה של המילים הייחודיות בקלט ה- PC אוסף לספירות שלהם. ספירת PTable = noStopWords.count (); // הנח את הצינור לכתוב את ספירות שהתקבלו לקובץ טקסט. pipeline.writeTextFile (ספירות, outputPath); // בצע את הצינור כ- MapReduce. תוצאת PipelineResult = pipeline.done (); תוצאת החזרה. הצליח ()? 0: 1; }

10. תצורת ההשקה של Hadoop

צינור הנתונים מוכן.

עם זאת, אנו זקוקים לקוד כדי להפעיל אותו. לכן, בואו נכתוב את רָאשִׁי שיטה להפעלת היישום:

מעמד ציבורי WordCount מרחיב כלי יישום מוגדר {ציבורי ריק ריק סטטי (מחרוזת [] טענות) זורק חריג {ToolRunner.run (תצורה חדשה (), WordCount חדש (), טענות); }

ToolRunner.run מנתח את תצורת Hadoop משורת הפקודה ומבצע את עבודת MapReduce.

11. הפעל את היישום

היישום המלא מוכן כעת. בוא נפעיל את הפקודה הבאה כדי לבנות אותה:

חבילת mvn 

כתוצאה מהפקודה לעיל, אנו מקבלים את היישום הארוז וצנצנת עבודה מיוחדת בספריית היעד.

בואו נשתמש בצנצנת העבודה הזו לביצוע היישום ב- Hadoop:

מטרת צנצנת hadoop / crunch-1.0-SNAPSHOT-job.jar 

היישום קורא את קובץ הקלט וכותב את התוצאה לקובץ הפלט. קובץ הפלט מכיל מילים ייחודיות יחד עם ספירתם בדומה לבאות:

[הוסף, 1] [נוסף, 1] [הערצה, 1] [מודה, 1] [קצבה, 1]

בנוסף ל- Hadoop, אנו יכולים להריץ את היישום בתוך IDE, כיישום עצמאי או כמבחני יחידות.

12. מסקנה

במדריך זה, יצרנו יישום לעיבוד נתונים הפועל ב- MapReduce. Apache Crunch מקל על כתיבה, בדיקה וביצוע של צינורות MapReduce ב- Java.

כרגיל, ניתן למצוא את קוד המקור המלא ב- Github.