Apache Spark: הבדלים בין מסגרות נתונים, מערכי נתונים ו- RDD

1. סקירה כללית

Apache Spark היא מערכת לעיבוד נתונים מהירה ומופצת. הוא מבצע עיבוד נתונים בזיכרון ומשתמש במטמון בזיכרון ובביצוע אופטימלי וכתוצאה מכך ביצועים מהירים. הוא מספק ממשקי API ברמה גבוהה לשפות תכנות פופולריות כמו Scala, Python, Java ו- R.

במדריך מהיר זה, נעבור על שלושה מהמושגים הבסיסיים של Spark: מסגרות נתונים, מערכי נתונים ו- RDD.

2. DataFrame

Spark SQL הציג הפשטת נתונים בטבלאות הנקראת DataFrame מאז Spark 1.3. מאז, זה הפך לאחד המאפיינים החשובים ביותר בספארק. ממשק API זה שימושי כאשר אנו רוצים לטפל בנתונים מבוזרים, מובנים למחצה, ומופצים.

בחלק 3 נדון במערכי נתונים מבוזרים גמישים (RDD). DataFrames מאחסנים נתונים בצורה יעילה יותר מאשר RDD, זאת מכיוון שהם משתמשים ביכולות הבלתי ניתנות לשינוי, בזיכרון, בגמישות, בהפצה ובמקביל של RDD, אך הם גם מיישמים סכמה על הנתונים. DataFrames מתרגם גם קוד SQL לפעולות RDD ברמה נמוכה ממוטבת.

אנו יכולים ליצור DataFrames בשלוש דרכים:

  • המרת RDD קיימים
  • הפעלת שאילתות SQL
  • טוען נתונים חיצוניים

צוות ניצוצות הציג SparkSession בגירסה 2.0, זה מאחד את כל ההקשרים השונים ומבטיח למפתחים שלא יהיה צורך לדאוג ליצירת הקשרים שונים:

מפגש SparkSession = SparkSession.builder () .appName ("TouristDataFrameExample") .master ("local [*]") .getOrCreate (); DataFrameReader dataFrameReader = session.read ();

ננתח את Tourist.csv קוֹבֶץ:

נתוני מערך נתונים = dataFrameReader.option ("כותרת", "נכון") .csv ("data / Tourist.csv");

מאז ש- Spark 2.0 DataFrame הפך ל- מערך נתונים מהסוג שׁוּרָהכדי שנוכל להשתמש ב- DataFrame ככינוי ל- a מערך נתונים.

אנו יכולים לבחור עמודות ספציפיות בהן אנו מעוניינים. נוכל גם לסנן ולקבץ לפי עמודה נתונה:

data.select (col ("country"), col ("year"), col ("value")) .show (); data.filter (col ("country"). equalTo ("Mexico")) .show (); data.groupBy (col ("country")) .count () .show ();

3. מערכי נתונים

מערך נתונים הוא קבוצה של נתונים מובנים עם הקלדה חזקה. הם מספקים את סגנון התכנות מוכר האובייקטים בתוספת היתרונות של בטיחות סוג מכיוון שמערכי נתונים יכולים לבדוק תחביר ולתפוס שגיאות בזמן הידור.

מערך נתונים הוא הרחבה של DataFrame, ולכן אנו יכולים לראות ב- DataFrame תצוגה לא מוקפדת של מערך נתונים.

צוות ניצוץ שחרר את מערך נתונים API ב- Spark 1.6 וכפי שהזכירו: "המטרה של Spark Datasets היא לספק API המאפשר למשתמשים לבטא בקלות טרנספורמציות בתחומי אובייקט, תוך מתן יתרונות ביצועים וחוסן של מנוע ביצוע Spark SQL".

ראשית, נצטרך ליצור מחלקה מסוג תיירות נתונים:

מעמד ציבורי TouristData {אזור מחרוזות פרטי; מדינה מחרוזת פרטית; שנת מחרוזת פרטית; סדרת מיתרים פרטית; ערך כפול פרטי; הערות שוליים פרטיות של מחרוזת; מקור מחרוזת פרטי; // ... גטרים וקובעים}

כדי למפות כל אחת מהרשומות שלנו לסוג שצוין נצטרך להשתמש במקודד. קודנים מתורגמים בין אובייקטים של Java לפורמט הבינארי הפנימי של Spark:

// אתחול SparkSession וטעינת נתונים responseWithSelectedColumn = data.select (col ("region"), col ("country"), col ("year"), col ("series"), col ("value"). ("כפול"), col ("הערות שוליים"), col ("מקור")); מערך נתונים typedDataset = responseWithSelectedColumns .as (Encoders.bean (TouristData.class));

כמו ב- DataFrame, אנו יכולים לסנן ולקבץ לפי עמודות ספציפיות:

typedDataset.filter ((FilterFunction) record -> record.getCountry () .equals ("Norway")) .show (); typedDataset.groupBy (typedDataset.col ("מדינה")) .count () .show ();

אנו יכולים גם לבצע פעולות כמו לסנן לפי עמודה שתואמת טווח מסוים או מחשבים את סכום העמודה הספציפית, כדי לקבל את הערך הכולל שלה:

typedDataset.filter ((FilterFunction) record -> record.getYear ()! = null && (Long.valueOf (record.getYear ())> 2010 && Long.valueOf (record.getYear ()) record.getValue ()! = null && record.getSeries () .contains ("הוצאה")) .groupBy ("country") .agg (sum ("value")) .show ();

4. RDD

מערך הנתונים המבוסס החוסן או RDD הוא הפשטת התכנות העיקרית של Spark. הוא מייצג אוסף של אלמנטים כלומר: בלתי ניתנים לשינוי, גמישים ומופצים.

RDD מקפל מערך נתונים גדול, Spark יפיץ אוטומטית את הנתונים הכלולים ב- RDD באשכול שלנו ויקביל את הפעולות שאנו מבצעים עליהם..

אנו יכולים ליצור RDDs רק באמצעות פעולות של נתונים באחסון יציב או פעולות על RDDs אחרים.

סובלנות תקלות חיונית כאשר אנו מתמודדים עם קבוצות נתונים גדולות והנתונים מופצים במכונות אשכול. RDDs הם גמישים בגלל מכניקת התאוששות התקלות המובנית של Spark. Spark מסתמך על העובדה ש- RDD משנן כיצד הם נוצרו כך שנוכל להתחקות אחר השושלת כדי לשחזר את המחיצה.

ישנם שני סוגים של פעולות שאנו יכולים לבצע על RDD: טרנספורמציות ופעולות.

4.1. טרנספורמציות

אנו יכולים להחיל טרנספורמציות על RDD כדי לתפעל את הנתונים שלו. לאחר ביצוע מניפולציה זו, נקבל RDD חדש לגמרי, מכיוון ש RDD הם אובייקטים בלתי ניתנים לשינוי.

נבדוק כיצד ליישם את Map and Filter, שניים מהתמורות הנפוצות ביותר.

ראשית, עלינו ליצור JavaSparkContext וטען את הנתונים כ- RDD מה- Tourist.csv קוֹבֶץ:

SparkConf conf = חדש SparkConf (). SetAppName ("uppercaseCountries") .setMaster ("local [*]"); JavaSparkContext sc = JavaSparkContext חדש (conf); תיירי JavaRDD = sc.textFile ("data / Tourist.csv");

לאחר מכן, בוא נשתמש בפונקציית המפה כדי לקבל את שם המדינה מכל רשומה ולהמיר את השם באותיות רישיות. אנו יכולים לשמור את מערך הנתונים החדש שנוצר כקובץ טקסט בדיסק:

JavaRDD upperCaseCountries = tourist.map (שורה -> {מחרוזת [] עמודות = line.split (COMMA_DELIMITER); העמודות החוזרות [1] .toUpperCase ();}). נבדל (); upperCaseCountries.saveAsTextFile ("data / output / uppercase.txt");

אם אנו רוצים לבחור רק מדינה ספציפית, אנו יכולים להחיל את פונקציית המסנן על RDD המקורי שלנו:

JavaRDD touristInMexico = תיירים. פילטר (קו -> line.split (COMMA_DELIMITER) [1]. שווה ("מקסיקו")); touristInMexico.saveAsTextFile ("נתונים / פלט / touristInMexico.txt");

4.2. פעולות

פעולות יחזירו ערך סופי או ישמרו את התוצאות בדיסק לאחר ביצוע חישוב מסוים על הנתונים.

שתיים מהפעולות שנמצאות בשימוש חוזר ב- Spark הן ספירה וצמצום.

בואו ונמנה את סך כל המדינות בקובץ ה- CSV שלנו:

// אתחול ההקשר של ניצוצות ועומס הנתונים על מדינות JavaRDD = toerists.map (שורה -> {מחרוזת [] עמודות = line.split (COMMA_DELIMITER); עמודות החזרה [1];}). נבדל (); NumberOfCountries ארוך = country.count ();

כעת נחשב את ההוצאה הכוללת לפי מדינה. נצטרך לסנן את הרשומות המכילות הוצאות בתיאור שלהן.

במקום להשתמש ב- JavaRDD, נשתמש ב- JavaPairRDD. זוג RDD הוא סוג של RDD שיכול לאחסן זוגות עם ערך מפתח. בואו נבדוק זאת בהמשך:

JavaRDD touristExpenditure = תיירים. פילטר (קו -> line.split (COMMA_DELIMITER) [3]. מכיל ("הוצאה")); JavaPairRDD spendPairRdd = touristExpenditure .mapToPair (שורה -> {מחרוזת [] עמודות = line.split (COMMA_DELIMITER); להחזיר Tuple2 חדש (עמודות [1], Double.valueOf (עמודות [6]));}); רשימה totalByCountry = spendPairRdd .reduceByKey ((x, y) -> x + y) .collect ();

5. מסקנה

לסיכום, עלינו להשתמש ב- DataFrames או במערכי נתונים כאשר אנו זקוקים לממשקי API ספציפיים לתחום, אנו זקוקים לביטויים ברמה גבוהה כגון שאילתות צבירה, סכום או SQL. או כשאנחנו רוצים בטיחות סוג בזמן הקומפילציה.

מצד שני, עלינו להשתמש ב- RDD כאשר הנתונים אינם מובנים ואיננו צריכים ליישם סכמה ספציפית או כאשר אנו זקוקים לשינויים ופעולות ברמה נמוכה.

כמו תמיד, כל דוגמאות הקוד זמינות ב- GitHub.


$config[zx-auto] not found$config[zx-overlay] not found