מבוא ל- Apache Flink עם Java

1. סקירה כללית

אפאצ'י פלינק היא מסגרת לעיבוד Big Data המאפשרת למתכנתים לעבד את כמות הנתונים העצומה בצורה יעילה ומדרגית מאוד.

במאמר זה נציג כמה מה- מושגי API מרכזיים ושינוי נתונים סטנדרטי הזמינים ב- אפאצ'י פלינק ממשק API של Java. הסגנון הרהוט של ה- API הזה מקל על העבודה עם המבנה המרכזי של פלינק - האוסף המבוזר.

ראשית, נסתכל על זה של פלינק DataSet טרנספורמציות API ושימוש בהן ליישום תוכנית לספירת מילים. ואז נסתכל בקצרה על זה של פלינק זרם נתונים ממשק API המאפשר לעבד זרמי אירועים בצורה אמתית.

2. תלות של Maven

כדי להתחיל נצטרך להוסיף תלות ב- Maven ל flink-java ו כלים לבדיקת הבהוב ספריות:

 org.apache.flink flink-java 1.2.0 org.apache.flink flink-test-utils_2.10 1.2.0 test 

3. מושגי ליבה API

כשעובדים עם Flink, עלינו לדעת כמה דברים שקשורים ל- API שלו:

  • כל תוכנית Flink מבצעת שינויים באוספי נתונים מבוזרים. ניתנים פונקציות מגוונות לשינוי נתונים, כולל סינון, מיפוי, הצטרפות, קיבוץ וצבירה
  • א כִּיוֹר פעולה ב Flink מפעילה ביצוע זרם כדי לייצר את התוצאה הרצויה של התוכנית, כגון שמירת התוצאה במערכת הקבצים או הדפסתה בפלט הסטנדרטי
  • טרנספורמציות פלינק עצלות, כלומר אינן מבוצעות עד א כִּיוֹר הפעולה מופעלת
  • ה- Apache Flink API תומך בשני מצבי פעולה - אצווה ובזמן אמת. אם אתה מתמודד עם מקור נתונים מוגבל שניתן לעבד במצב אצווה, תשתמש ב- DataSet ממשק API. אם תרצה לעבד זרמי נתונים בלתי מוגבלים בזמן אמת, תצטרך להשתמש ב- זרם נתונים ממשק API

4. טרנספורמציות API של DataSet

נקודת הכניסה לתוכנית Flink היא מופע של ה- ביצוע סביבה class - זה מגדיר את ההקשר שבו מתבצעת תוכנית.

בואו ניצור ביצוע סביבה כדי להתחיל בעיבוד שלנו:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment ();

שים לב שכאשר אתה מפעיל את היישום במחשב המקומי, הוא יבצע עיבוד ב- JVM המקומי. אם תרצה להתחיל לעבד באשכול של מכונות, יהיה עליך להתקין את Apache Flink במכונות אלה ולהגדיר את תצורת ביצוע סביבה בהתאם לכך.

4.1. יצירת מערך נתונים

כדי להתחיל לבצע טרנספורמציות נתונים, עלינו לספק לתוכנית שלנו את הנתונים.

בוא ניצור מופע של ה- DataSet בשיעור באמצעות שלנו ביצוע סביבה:

כמויות DataSet = env.fromElements (1, 29, 40, 50);

אתה יכול ליצור DataSet ממקורות רבים, כגון אפאצ'י קפקא, קובץ CSV, קובץ או כמעט כל מקור נתונים אחר.

4.2. מסננים ומצמצמים

ברגע שאתה יוצר מופע של ה- DataSet בכיתה, אתה יכול להחיל עליה שינויים.

נניח שברצונך לסנן מספרים הנמצאים מעל סף מסוים ולסכם את כולם בהמשך. אתה יכול להשתמש ב- לְסַנֵן() ו לְהַפחִית() טרנספורמציות כדי להשיג זאת:

סף int = 30; רשימה collect = כמויות .filter (a -> a> סף) .פחת ((מספר שלם, t1) -> מספר שלם + t1) .collect (); assertThat (collect.get (0)). isEqualTo (90); 

שים לב שה- לאסוף() השיטה היא א כִּיוֹר פעולה שמפעילה את תמורות הנתונים בפועל.

4.3. מַפָּה

בוא נגיד שיש לך DataSet שֶׁל אדם חפצים:

כיתה סטטית פרטית אדם {גיל פרטי; שם מחרוזת פרטי; // בונים סטנדרטיים / גטרים / סטרים}

לאחר מכן, בואו ליצור a DataSet של האובייקטים האלה:

DataSet personDataSource = env.fromCollection (Arrays.asList (אדם חדש (23, "טום"), אדם חדש (75, "מייקל"));

נניח שאתה רוצה לחלץ רק את גיל שדה מכל אובייקט באוסף. אתה יכול להשתמש ב- מַפָּה() טרנספורמציה כדי להשיג רק שדה ספציפי של אדם מעמד:

רשימת גילאים = personDataSource .map (p -> p.age) .collect (); assertThat (גילאים) .hasSize (2); טוען כי (גילאים) מכיל (23, 75);

4.4. לְהִצְטַרֵף

כשיש לך שני מערכי נתונים, ייתכן שתרצה להצטרף אליהם בכמה מהם תְעוּדַת זֶהוּת שדה. לשם כך, אתה יכול להשתמש ב- לְהִצְטַרֵף() טרנספורמציה.

בואו ליצור אוספים של עסקאות וכתובות של משתמש:

כתובת Tuple3 = Tuple3 חדש (1, "השדרה החמישית", "לונדון"); DataSet כתובות = env.fromElements (כתובת); Tuple2 firstTransaction = חדש Tuple2 (1, "Transaction_1"); DataSet עסקאות = env.fromElements (FirstTransaction, Tuple2 חדש (12, "Transaction_2")); 

התחום הראשון בשני צמרות הכיסים הוא של מספר שלם סוג, וזהו תְעוּדַת זֶהוּת שדה עליו אנו רוצים להצטרף לשני מערכי הנתונים.

כדי לבצע את הגיון ההצטרפות בפועל, עלינו ליישם a KeySelector ממשק לכתובת ועסקה:

מחלקה סטטית פרטית IdKeySelectorTransaction מיישמת את KeySelector {@ ביטול שלם ציבורי שלם getKey (ערך Tuple2) {return value.f0; }} מחלקה סטטית פרטית IdKeySelectorAddress מיישמת את KeySelector {@ ביטול שלם ציבורי שלם getKey (ערך Tuple3) {return value.f0; }}

כל בורר מחזיר רק את השדה בו יש לבצע את ההצטרפות.

למרבה הצער, לא ניתן להשתמש בביטויי למבדה כאן מכיוון ש Flink זקוק למידע מסוג כללי.

לאחר מכן, בואו נשתמש בהיגיון מיזוג באמצעות אותם בוחרים:

רשימה<>> הצטרף = עסקאות.צטרף (כתובות). איפה (IdKeySelectorTransaction חדש ()) .equalTo (IdKeySelectorAddress חדש ()) .collect (); assertThat (הצטרף) .hasSize (1); assertThat (הצטרף). מכיל (Tuple2 חדש (FirstTransaction, כתובת)); 

4.5. סוג

בואו נגיד שיש לכם את האוסף הבא של Tuple2:

Tuple2 secondPerson = Tuple2 חדש (4, "טום"); Tuple2 thirdPerson = Tuple2 חדש (5, "סקוט"); Tuple2 quarterPerson = Tuple2 חדש (200, "מייקל"); Tuple2 firstPerson = Tuple2 חדש (1, "ג'ק"); DataSet עסקאות = env.fromElements (4thPerson, secondPerson, thirdPerson, firstPerson); 

אם ברצונך למיין אוסף זה לפי השדה הראשון של הטופל, תוכל להשתמש ב sortPartitions () טרנספורמציה:

רשימה מיון = עסקאות .sortPartition (IdKeySelectorTransaction חדש (), Order.ASCENDING) .collect (); assertThat (מיון). contains בדיוק (firstPerson, secondPerson, thirdPerson, 4thPerson);

5. ספירת מילים

בעיית ספירת המילים היא בעיה המשמשת בדרך כלל כדי להציג את היכולות של מסגרות עיבוד Big Data. הפתרון הבסיסי כולל ספירת התרחשויות מילים בקלט טקסט. בואו נשתמש ב- Flink כדי ליישם פתרון לבעיה זו.

כצעד הראשון בפתרון שלנו, אנו יוצרים a LineSplitter מחלקה המחלקת את הקלט שלנו לאסימונים (מילים), אוספת עבור כל אסימון a Tuple2 של זוגות ערכי מפתח. בכל אחד מכפילות אלה, המפתח הוא מילה המצויה בטקסט, והערך הוא המספר השלם (1).

כיתה זו מיישמת את FlatMapFunction ממשק שלוקח חוּט כקלט ומייצר a Tuple2:

LineSplitter בכיתה ציבורית מיישם FlatMapFunction {@ עקוף על FlatMap של הריק הציבורי (ערך מחרוזת, אספן out) {Stream.of (value.toLowerCase (). split ("\ W +")) .filter (t -> t.length ()> 0) .forEach (token -> out.collect (Tuple2 new (token) , 1))); }}

אנו קוראים לאסוף() שיטה על אַסְפָן בכיתה לדחוף נתונים קדימה בצינור העיבוד.

הצעד הבא והאחרון שלנו הוא לקבץ את הכדולים לפי האלמנטים הראשונים שלהם (מילים) ואז לבצע a סְכוּם צברו את האלמנטים השניים כדי לייצר ספירה של המופעים במילה:

סטטי נתונים סטטי ציבורי startWordCount (ExecutionEnvironment env, שורות רשימה) זורק Exception {DataSet text = env.fromCollection (שורות); return text.flatMap (LineSplitter new ()) .groupBy (0) .aggregate (Aggregations.SUM, 1); }

אנו משתמשים בשלושה סוגים של טרנספורמציות Flink: flatMap (), groupBy (), ו לְקַבֵּץ().

בואו נכתוב מבחן כדי לקבוע כי יישום ספירת המילים עובד כצפוי:

רשימת שורות = Arrays.asList ("זהו משפט ראשון", "זהו משפט שני עם מילה אחת"); DataSet תוצאה = WordCount.startWordCount (env, שורות); רשימה collect = result.collect (); assertThat (collect) .containsExactlyInAnyOrder (Tuple2 חדש ("a", 3), Tuple2 חדש ("משפט", 2), Tuple2 חדש ("מילה", 1), Tuple2 חדש ("הוא", 2), Tuple2 חדש ( "זה", 2), חדש Tuple2 ("שני", 1), חדש Tuple2 ("ראשון", 1), חדש Tuple2 ("עם", 1), חדש Tuple2 ("אחד", 1));

6. ממשק API של DataStream

6.1. יצירת DataStream

Apache Flink תומך גם בעיבוד זרמי אירועים באמצעות ה- API של DataStream. אם אנו רוצים להתחיל לצרוך אירועים, ראשית עלינו להשתמש ב- StreamExecutionEnvironment מעמד:

StreamExecutionEnvironment expressionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment ();

בשלב הבא נוכל ליצור זרם אירועים באמצעות ה- הסביבה ממגוון מקורות. זה יכול להיות איזה אוטובוס הודעות כמו אפאצ'י קפקא, אך בדוגמה זו, ניצור מקור מכמה אלמנטים מחרוזתיים:

DataStream dataStream = executEnvironment.fromElements ("זהו משפט ראשון", "זהו משפט שני עם מילה אחת");

אנו יכולים להחיל טרנספורמציות על כל אלמנט זרם נתונים כמו בנורמלי DataSet מעמד:

SingleOutputStreamOperator upperCase = text.map (מחרוזת :: toUpperCase);

כדי להפעיל את הביצוע, עלינו להפעיל פעולת כיור כגון הדפס() שרק ידפיס את התוצאה של טרנספורמציות לפלט הסטנדרטי, לאחר מכן עם לבצע() שיטה על StreamExecutionEnvironment מעמד:

upperCase.print (); env.execute ();

זה יפיק את התפוקה הבאה:

1> זהו משפט ראשון 2> זהו משפט שני עם מילה אחת

6.2. חלילה של אירועים

בעת עיבוד זרם אירועים בזמן אמת, ייתכן שתצטרך לקבץ אירועים יחד ולהחיל חישוב כלשהו על חלון של אירועים אלה.

נניח שיש לנו זרם אירועים, כאשר כל אירוע הוא זוג המורכב ממספר האירוע וחותמת הזמן בה נשלח האירוע למערכת שלנו, ושאנחנו יכולים לסבול אירועים שאינם בסדר, אך רק אם הם לא באיחור של יותר מעשרים שניות.

לדוגמא זו, בואו ניצור תחילה זרם המדמה שני אירועים הנמצאים בהפרש של מספר דקות ונגדיר חולץ חותמת זמן המציין את סף האיחור שלנו:

SingleOutputStreamOperator חלון = env.fromElements (חדש Tuple2 (16, ZonedDateTime.now (). plusMinutes (25) .toInstant (). getEpochSecond ()), חדש Tuple2 (15, ZonedDateTime.now (). plusMinutes (2). toInstant () .getEpochSecond ())). להקצות חותמות זמן וימני מים (BoundedOutOfOrdernessTimestampExtractor חדש (Time.seconds (20)) {@Override public long extractTimestamp (Tuple2 element) {return element.f1 * 1000; }});

לאחר מכן, נגדיר פעולת חלון כדי לקבץ את האירועים שלנו לחלונות של חמש שניות ולהחיל טרנספורמציה על אותם אירועים:

SingleOutputStreamOperator מופחת = חלון. windowsAll (TumblingEventTimeWindows.of (זמן.שניות (5))) .maxBy (0, נכון); מופחת.דפוס ();

זה יקבל את האלמנט האחרון של כל חלון של חמש שניות, כך שהוא מודפס:

1> (15,1491221519)

שים לב שאיננו רואים את האירוע השני מכיוון שהוא הגיע מאוחר יותר מסף האיחור שצוין.

7. מסקנה

במאמר זה הצגנו את מסגרת ה- Apache Flink ובחנו כמה מהטרנספורמציות שסופקו עם ה- API שלה.

יישמנו תוכנית לספירת מילים באמצעות ה- DataSet API השוטף והפונקציונלי של Flink. ואז הסתכלנו על ה- API של DataStream והטמענו מהפך פשוט בזמן אמת בזרם אירועים.

ניתן למצוא את היישום של כל הדוגמאות וקטעי הקוד ב- GitHub - זהו פרויקט של Maven, כך שיהיה קל לייבא ולהפעיל אותו כפי שהוא.


$config[zx-auto] not found$config[zx-overlay] not found