מבוא לסערת אפאצ'י

1. סקירה כללית

מדריך זה יהיה מבוא לסערת אפאצ'י, מערכת חישוב מבוזרת בזמן אמת.

אנו נתמקד ונכסה:

  • מה זה בעצם אפאצ'י סטורם ואילו בעיות זה פותר
  • הארכיטקטורה שלה, ו
  • כיצד להשתמש בו בפרויקט

2. מה זה אפאצ'י סטורם?

אפאצ'י סטורם היא מערכת בחינם ומופצת בקוד פתוח לחישובים בזמן אמת.

הוא מספק סובלנות תקלות, יכולת הרחבה ומבטיח עיבוד נתונים, והוא טוב במיוחד בעיבוד זרמי נתונים בלתי מוגבלים.

כמה מקרי שימוש טובים ב- Storm יכולים להיות עיבוד פעולות של כרטיסי אשראי לצורך גילוי הונאה או עיבוד נתונים מבתים חכמים כדי לאתר חיישנים פגומים.

סטורם מאפשר שילוב עם מאגרי מידע ומערכות תור שונות הקיימים בשוק.

3. תלות של Maven

לפני שנשתמש ב- Apache Storm, עלינו לכלול את התלות בליבת הסערה בפרויקט שלנו:

 org.apache.storm ליבת סערה 1.2.2 מסופק 

עלינו להשתמש רק ב- היקף מסופק אם אנו מתכוונים להריץ את היישום שלנו באשכול הסערה.

כדי להריץ את היישום באופן מקומי, אנו יכולים להשתמש במצב שנקרא מקומי אשר ידמה את אשכול הסערה בתהליך מקומי, במקרה כזה עלינו להסיר את בתנאי.

4. מודל נתונים

מודל הנתונים של אפאצ'י סטורם מורכב משני אלמנטים: כפולות וזרמים.

4.1. טופל

א טופל היא רשימה מסודרת של שדות בשם עם סוגים דינמיים. פירוש הדבר שאיננו צריכים להכריז במפורש על סוגי השדות.

סטורם צריך לדעת כיצד לסדר את כל הערכים המשמשים לטפל. כברירת מחדל, זה כבר יכול לסדר סוגים פרימיטיביים, מיתרים ו בתים מערכים.

ומכיוון שסטורם משתמש בסידור קריו, עלינו לרשום את הסידור באמצעות תצורה להשתמש בסוגים המותאמים אישית. אנו יכולים לעשות זאת באחת משתי דרכים:

ראשית, אנו יכולים לרשום את הכיתה לסידור באמצעות שמו המלא:

Config config = Config new (); config.registerSerialization (User.class);

במקרה כזה, קריו יסדר את הכיתה באמצעות סדרה FieldSerializer. כברירת מחדל, זה יסדיר את כל השדות שאינם חולפים בכיתה, פרטיים וציבוריים.

או במקום זאת, אנו יכולים לספק גם את המחלקה לסידור וגם את הסידור שאנו רוצים שסטורם ישתמש בכיתה זו:

Config config = Config new (); config.registerSerialization (User.class, UserSerializer.class);

כדי ליצור את הסידור המותאם אישית, עלינו להרחיב את המחלקה הגנרית Serializer שיש לו שתי שיטות לִכתוֹב ו לקרוא.

4.2. זרם

א זרם הוא הפשטת הליבה במערכת האקולוגית סופה. ה זרם הוא רצף של גבולות ללא גבולות.

סופות מאפשר עיבוד מספר זרמים במקביל.

לכל זרם יש מזהה שמסופק ומוקצה במהלך ההצהרה.

5. טופולוגיה

ההיגיון של אפליקציית Storm בזמן אמת ארוז בטופולוגיה. הטופולוגיה מורכבת מ פיות ו ברגים.

5.1. זַרבּוּבִית

זרבובים הם מקורות הזרמים. הם פולטים טופלים לטופולוגיה.

ניתן לקרוא צינורות ממערכות חיצוניות שונות כמו קפקא, Kestrel או ActiveMQ.

זרבובים יכולים להיות אָמִין אוֹ לֹא מְהֵימָן. אָמִין פירושו שהזרבוב יכול להשיב כי הכיסוי שלא הצליח לעבד אותו על ידי סופה. לֹא מְהֵימָן פירושו שהזרבוב אינו עונה מכיוון שהוא עומד להשתמש במנגנון אש-ושכח כדי לפלוט את הצינורות.

כדי ליצור את הזרבובית המותאמת אישית, עלינו ליישם את ה- IRichSpout ממשק או הרחב כל מחלקה שכבר מיישמת את הממשק, למשל תקציר BaseRichSpout מעמד.

בואו ניצור לֹא מְהֵימָן זַרבּוּבִית:

מחלקה ציבורית RandomIntSpout מרחיב את BaseRichSpout {אקראי פרטי אקראי; פרטי SpoutOutputCollector outputCollector; @ ביטול חלל ציבורי פתוח (מפת מפה, טופולוגיהקונטקסט טופולוגיהקונטקסט, SpoutOutputCollector spoutOutputCollector) {אקראי = אקראי חדש (); outputCollector = spoutOutputCollector; } @ ביטול ציבורי בטל nextTuple () {Utils.sleep (1000); outputCollector.emit (ערכים חדשים (random.nextInt (), System.currentTimeMillis ())); } @ ביטול חלל ציבורי להכריז על OutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (שדות חדשים ("randomInt", "חותמת זמן")); }}

המנהג שלנו אקראי יפיק מספר שני שלם אקראי וחותמת זמן בכל שנייה.

5.2. בְּרִיחַ

ברגים מעבדים צינורות בזרם. הם יכולים לבצע פעולות שונות כמו סינון, צבירות או פונקציות מותאמות אישית.

פעולות מסוימות דורשות מספר שלבים, ולכן נצטרך להשתמש במספר ברגים במקרים כאלה.

כדי ליצור את המנהג בְּרִיחַ, אנחנו צריכים ליישם IRichBolt או לפעולות פשוטות יותר IBasicBolt מִמְשָׁק.

ישנם גם שיעורי עוזר מרובים זמינים ליישום בְּרִיחַ. במקרה זה נשתמש BaseBasicBolt:

Class public PrintingBolt מרחיב את BaseBasicBolt {@Override public void execut (Tuple tuple, BasicOutputCollector basicOutputCollector) {System.out.println (tuple); } @ Override בטל פומבי להכריז על OutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {}}

מנהג זה PrintingBolt פשוט ידפיס את כל הכדורים למסוף.

6. יצירת טופולוגיה פשוטה

בואו נרכיב את הרעיונות הללו לטופולוגיה פשוטה. לטופולוגיה שלנו יש זרבובית אחת ושלושה ברגים.

6.1. RandomNumberSpout

בהתחלה ניצור זרבובית לא אמינה. זה ייצור מספרים שלמים אקראיים מהטווח (0,100) בכל שנייה:

מחלקה ציבורית RandomNumberSpout מרחיב את BaseRichSpout {אקראי אקראי פרטי; אספן פרטי של SpoutOutputCollector; @ ביטול חלל ציבורי פתוח (מפת מפה, טופולוגיהקונטקסט טופולוגיהקונטקסט, SpoutOutputCollector spoutOutputCollector) {אקראי = אקראי חדש (); אספן = spoutOutputCollector; } @ ביטול ציבורי בטל nextTuple () {Utils.sleep (1000); פעולת int = random.nextInt (101); חותמת זמן ארוכה = System.currentTimeMillis (); ערכי ערכים = ערכים חדשים (פעולה, חותמת זמן); collector.emit (ערכים); } @ ביטול חלל ציבורי להכריז על OutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (שדות חדשים ("פעולה", "חותמת זמן")); }}

6.2. FilteringBolt

לאחר מכן, ניצור בריח שיסנן את כל האלמנטים באמצעותו מבצע שווה ל -0:

מחלקה ציבורית FilteringBolt מרחיב את BaseBasicBolt {@Override public void execut (Tuple tuple, BasicOutputCollector basicOutputCollector) {int operation = tuple.getIntegerByField ("operation"); אם (פעולה> 0) {basicOutputCollector.emit (tuple.getValues ​​()); }} @ ביטול חלל ציבורי להכריז על OutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (שדות חדשים ("פעולה", "חותמת זמן")); }}

6.3. צבירה בריח

לאחר מכן, בואו ניצור מסובך יותר בְּרִיחַ שיצבור את כל הפעולות החיוביות מכל יום.

לצורך כך נשתמש בכיתה ספציפית שנוצרה במיוחד ליישום ברגים הפועלים על חלונות במקום לפעול על צמרות יחיד: BaseWindowedBolt.

חלונות הם מושג חיוני בעיבוד זרמים, המפצלים את הזרמים האינסופיים לנתחים סופיים. לאחר מכן נוכל להחיל חישובים על כל נתח. בדרך כלל ישנם שני סוגים של חלונות:

חלונות זמן משמשים לקיבוץ אלמנטים מתקופת זמן נתונה באמצעות חותמות זמן. חלונות זמן עשויים להכיל מספר שונה של אלמנטים.

ספירת חלונות משמשת ליצירת חלונות בגודל מוגדר. במקרה כזה, כל החלונות יהיו באותו הגודל והחלון יהיה לא להיפלט אם יש פחות אלמנטים מהגודל שהוגדר.

שֶׁלָנוּ צבירה בריח יפיק את סכום כל הפעולות החיוביות מ- חלון זמן יחד עם חותמות הזמן של ההתחלה והסוף שלה:

מחלקה ציבורית AggregatingBolt מרחיב את BaseWindowedBolt {private OutputCollector outputCollector; @ להכין חלל ציבורי (Map stormConf, הקשר TopologyContext, אספן OutputCollector) {this.outputCollector = אספן; } @ ביטול חלל ציבורי declareOutputFields (הכרזת OutputFieldsDeclarer) {declarer.declare (שדות חדשים ("sumOfOperations", "beginningTimestamp", "endTimestamp")); } @Override לבצע חלל ציבורי (TupleWindow tupleWindow) {List tuples = tupleWindow.get (); tuples.sort (Comparator.comparing (זה :: getTimestamp)); int sumOfOperations = tuples.stream () .mapToInt (tuple -> tuple.getIntegerByField ("פעולה")). sum (); התחלה ארוכה Timestamp = getTimestamp (tuples.get (0)); Long endTimestamp = getTimestamp (tuples.get (tuples.size () - 1)); ערכי ערכים = ערכים חדשים (sumOfOperations, beginTimestamp, endTimestamp); outputCollector.emit (ערכים); } פרטי GetTimestamp ארוך (Tuple tuple) {return tuple.getLongByField ("חותמת זמן"); }}

שים לב, במקרה זה, קבלת האלמנט הראשון ברשימה היא בטוחה. הסיבה לכך היא שכל חלון מחושב באמצעות ה- חותמת זמן שדה של Tuple, כך חייב להיות לפחות אלמנט אחד בכל חלון.

6.4. FileWritingBolt

לבסוף, ניצור בריח שייקח את כל האלמנטים איתו sumOfOperations יותר מ -2000, סדר אותם וכתוב אותם לקובץ:

מחלקה ציבורית FileWritingBolt מרחיב את BaseRichBolt {לוגר לוגר סטטי ציבורי = LoggerFactory.getLogger (FileWritingBolt.class); כותב פרטי BufferedWriter; פרטי מחרוזת filePath; ObjectMapper פרטי ObjectMapper; @ עקוף ניקוי חלל ציבורי () {נסה {כותב.סגור (); } לתפוס (IOException e) {logger.error ("נכשלה סגירת הכותב!"); }} @ להכין חלל ציבורי ריק (מפת מפה, טופולוגיה טופולוגיה טקסטולוגיה, OutputCollector outputCollector) {objectMapper = ObjectMapper חדש (); objectMapper.setVisibility (PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); נסה {כותב = חדש BufferedWriter (FileWriter חדש (filePath)); } לתפוס (IOException e) {logger.error ("נכשל פתיחת קובץ לכתיבה.", e); }} @Override לבצע חלל ציבורי (Tuple tuple) {int sumOfOperations = tuple.getIntegerByField ("sumOfOperations"); התחלת זמן ארוכה = tuple.getLongByField ("התחלת הזמן"); long endTimestamp = tuple.getLongByField ("endTimestamp"); אם (sumOfOperations> 2000) {AggregatedWindow aggregatedWindow = חדש AggregatedWindow (sumOfOperations, beginningTimestamp, endTimestamp); נסה את {writer.write (objectMapper.writeValueAsString (aggregatedWindow)); סופרת.newLine (); writer.flush (); } לתפוס (IOException e) {logger.error ("נכשל כתיבת הנתונים לקובץ.", e); }}} // קונסטרוקטור ציבורי ושיטות אחרות}

שים לב שאיננו צריכים להכריז על הפלט מכיוון שזה יהיה הבריח האחרון בטופולוגיה שלנו

6.5. הפעלת הטופולוגיה

לבסוף, אנו יכולים לשלב הכל ולהפעיל את הטופולוגיה שלנו:

ריק סטטי ציבורי ריק RunTopology () {בונה טופולוגיה = טופולוגיה חדש (); זרבובית אקראית = RandomNumberSpout חדש (); builder.setSpout ("randomNumberSpout"); סינון בורג = FilteringBolt חדש (); builder.setBolt ("filteringBolt", סינון). shuffleGrouping ("randomNumberSpout"); צבירת בורג = AggregatingBolt חדש () .withTimestampField ("חותמת זמן") .withLag (BaseWindowedBolt.Duration.seconds (1)) .withWindow (BaseWindowedBolt.Duration.seconds (5)); builder.setBolt ("aggregatingBolt", aggregating) .shuffleGrouping ("filteringBolt"); מחרוזת filePath = "./src/main/resources/data.txt"; קובץ בורג = FileWritingBolt חדש (filePath); builder.setBolt ("fileBolt", file) .shuffleGrouping ("aggregatingBolt"); Config config = Config new (); config.setDebug (שקר); אשכול LocalCluster = LocalCluster חדש (); cluster.submitTopology ("Test", config, builder.createTopology ()); }

כדי לגרום לנתונים לזרום דרך כל חלק בטופולוגיה, עלינו לציין כיצד לחבר ביניהם. shuffleGroup מאפשר לנו לציין את הנתונים עבור סינון בריח יגיע מ randomNumberSpout.

לכל אחד בְּרִיחַ, אנחנו צריכים להוסיף shuffleGroup המגדיר את מקור האלמנטים לבורג זה. מקור האלמנטים עשוי להיות א זַרבּוּבִית או אחר בְּרִיחַ. ואם נקבע את אותו מקור ליותר מבריח אחד, המקור יפלט את כל האלמנטים לכל אחד מהם.

במקרה זה, הטופולוגיה שלנו תשתמש ב- LocalCluster לנהל את העבודה באופן מקומי.

7. מסקנה

במדריך זה הצגנו את Apache Storm, מערכת חישוב מבוזרת בזמן אמת. יצרנו זרבובית, כמה ברגים, ושילבנו אותם יחד לטופולוגיה שלמה.

וכמו תמיד, ניתן למצוא את כל דגימות הקוד ב- GitHub.