מבוא למנטיס נטפליקס

1. סקירה כללית

במאמר זה נסתכל על פלטפורמת הגמל שלמה שפותחה על ידי נטפליקס.

נחקור את המושגים העיקריים של גמל שלמה על ידי יצירה, הפעלה וחקירה של עבודת עיבוד זרם.

2. מה זה גמל שלמה?

גמל שלמה היא פלטפורמה לבניית יישומי עיבוד זרם (מקומות תעסוקה). זה מספק דרך קלה לנהל את הפריסה ומעגל החיים של המשרות. יתר על כן, זה מאפשר הקצאת משאבים, גילוי ותקשורת בין עבודות אלה.

לכן, מפתחים יכולים להתמקד בהיגיון העסקי בפועל, תוך כדי תמיכה של פלטפורמה חזקה וניתנת להרחבה להפעלת היישומים הגבוהים שלהם, השהיה נמוכה, ולא חוסמים.

משרת גמל שלמה מורכבת משלושה חלקים נפרדים:

  • ה מָקוֹר, אחראי לאחזור הנתונים ממקור חיצוני
  • אחד או יותר שלבים, אחראי על עיבוד זרמי האירועים הנכנסים
  • ו כִּיוֹר שאוסף את הנתונים המעובדים

בואו נחקור כל אחד מהם.

3. התקנה ותלות

נתחיל בהוספת ה- זמן גמל שלמה ו jackson-databind תלות:

 io.mantisrx mantis-runtime com.fasterxml.jackson.core jackson-databind 

עכשיו, להגדרת מקור הנתונים של העבודה שלנו, בואו נבצע את הגמל שלמה מָקוֹר מִמְשָׁק:

class public RandomLogSource מיישם את המקור {@Override public Observable call (הקשר הקשר, אינדקס אינדקס) {return Observable.just (Observable .interval (250, TimeUnit.MILLISECONDS) .map (this :: createRandomLogEvent)); } מחרוזת פרטית createRandomLogEvent (סמן ארוך) {// צור מחרוזת רישום יומן אקראית ...}}

כפי שאנו רואים, הוא פשוט מייצר ערכי יומן אקראיים מספר פעמים בשנייה.

4. העבודה הראשונה שלנו

בואו ניצור כעת משרת גמל שלמה שפשוט אוספת אירועי יומן מהמקום שלנו RandomLogSource. בהמשך נוסיף טרנספורמציות קבוצתיות וצבירות לקבלת תוצאה מורכבת ומעניינת יותר.

ראשית, בואו ניצור a LogEvent יֵשׁוּת:

המחלקה הציבורית LogEvent מיישמת את JsonType {אינדקס פרטי ארוך; רמת מחרוזת פרטית; הודעת מחרוזת פרטית; // ...}

ואז, בואו נוסיף את שלנו TransformLogStage.

זהו שלב פשוט המיישם את ממשק ScalarComputation ומפצל ערך יומן לבניית LogEvent. כמו כן, הוא מסנן כל מחרוזות שגויות:

מחלקה ציבורית TransformLogStage מיישם ScalarComputation {@Override public Observable call (Context context, Observable logEntry) {return logEntry .map (log -> log.split ("#")) .filter (parts -> parts.length == 3). מפה (LogEvent :: new); }}

4.1. ניהול העבודה

בשלב זה, יש לנו מספיק אבני בניין להרכבת עבודת הגמל שלמה:

מחלקה ציבורית LogCollectingJob מרחיב את MantisJobProvider {@Override Job job getJobInstance () {return MantisJob .source (RandomLogSource new ()) .stage (new TransformLogStage (), ScalarToScalar.Config new ()). sink (Sinks) Sinks. Sinks. Sinks. LogEvent :: toJsonString))) .metadata (Metadata.Builder חדש (). Build ()) .create (); }}

בואו נסתכל מקרוב על העבודה שלנו.

כפי שאנו רואים, זה מתרחב MantisJobProvider. בהתחלה, זה מביא נתונים משלנו RandomLogSource ומחיל את TransformLogStage לנתונים שנאספו. לבסוף, היא שולחת את הנתונים המעובדים לכיור המובנה שמנוי בשקיקה ומספק נתונים באמצעות SSE.

עכשיו, בואו להגדיר את התפקיד שלנו לביצוע מקומי בעת ההפעלה:

@SpringBootApplication בכיתה ציבורית MantisApplication מיישם את CommandLineRunner {// ... @Override public void run (String ... args) {LocalJobExecutorNetworked.execute (LogCollectingJob new (). GetJobInstance ()); }}

בואו נפעיל את היישום. נראה הודעת יומן כמו:

... מגיש שרת HTTP SSE מודרני ביציאה: 86XX

בואו כעת נתחבר לכיור באמצעות סִלְסוּל:

$ curl localhost: נתונים 86XX: {"index": 86, "level": "WARN", "message": "ניסיון כניסה"} נתונים: {"index": 87, "level": "ERROR", "message ":" המשתמש יצר "} נתונים: {" אינדקס ": 88," level ":" INFO "," message ":" המשתמש יצר "} נתונים: {" index ": 89," level ":" INFO ", "message": "ניסיון כניסה"} נתונים: {"index": 90, "level": "INFO", "message": "user created"} data: {"index": 91, "level": "ERROR "," message ":" המשתמש יצר "} נתונים: {" index ": 92," level ":" WARN "," message ":" ניסיון כניסה "} נתונים: {" index ": 93," level ": "INFO", "message": "המשתמש נוצר"} ...

4.2. קביעת תצורה של הכיור

עד כה השתמשנו בכיור המובנה לאיסוף הנתונים המעובדים שלנו. בוא נראה אם אנו יכולים להוסיף גמישות רבה יותר לתרחיש שלנו על ידי מתן כיור מותאם אישית.

מה אם, למשל, נרצה לסנן יומנים לפי הוֹדָעָה?

בואו ניצור LogSink שמיישם את כִּיוֹר מִמְשָׁק:

LogSink בכיתה ציבורית מיישם כיור {@Override שיחה בטלה ציבורית (Context context, PortRequest portRequest, Observable logEventObservable) {SelfDocumentingSink sink = new ServerSentEventsSink.Builder () .withEncoder (LogEvent :: toJsonString) .withPredicateM (filterByLage). ; logEventObservable.subscribe (); sink.call (הקשר, portRequest, logEventObservable); } Predicate filter privateByLogMessage () {להחזיר Predicate חדש ("לסנן לפי הודעה", פרמטרים -> {if (parameters! = null && parameters.containsKey ("filter")) {return logEvent -> logEvent.getMessage (). מכיל ( parameters.get ("filter"). get (0));} return logEvent -> true;}); }}

ביישום כיור זה, הגדרנו פרדיקט המשתמש ב- לְסַנֵן פרמטר כדי לאחזר רק יומנים המכילים את ערכת הטקסט ב- לְסַנֵן פָּרָמֶטֶר:

$ curl localhost: 8874? filter = נתוני כניסה: {"index": 93, "level": "ERROR", "message": "ניסיון כניסה"} נתונים: {"index": 95, "level": "INFO "," message ":" ניסיון כניסה "} נתונים: {" index ": 97," level ":" ERROR "," message ":" ניסיון כניסה "} ...

הערה גמל שלמה מציעה גם שפת שאילתות חזקה, MQL, שניתן להשתמש בה לצורך שאילתות, טרנספורמציה וניתוח נתוני זרם בצורה SQL.

5. שרשור במה

נניח כעת שאנחנו מעוניינים לדעת כמה שְׁגִיאָה, לְהַזהִיר, אוֹ מידע רשומות יומן שיש לנו במרווח זמן נתון. לשם כך נוסיף שני שלבים נוספים לתפקידנו ונשלב אותם יחד.

5.1. הַקבָּצָה

ראשית, בואו ליצור GroupLogStage.

שלב זה הוא ToGroupComputation יישום שמקבל א LogEvent להזרים נתונים מהקיים TransformLogStage. לאחר מכן, היא מקבצת ערכים לפי רמת רישום ושולחת אותם לשלב הבא:

מחלקה ציבורית GroupLogStage מיישמת את ToGroupComputation {@Override public נצפה call (הקשר הקשר, logEvent נצפה) {return logEvent.map (log -> MantisGroup new (log.getLevel (), log)); } תצורה סטטית ציבורית ScalarToGroup.Config () {החזר ScalarToGroup.Config (). תיאור ("נתוני אירוע קבוצתי לפי רמה"). codec (JacksonCodecs.pojo (LogEvent.class)) .concurrentInput (); }}

יצרנו גם תצורת שלב מותאמת אישית על ידי מתן תיאור, ה- codec לשימוש לסידור הפלט, ואפשרנו לשיטת קריאה של שלב זה לפעול במקביל באמצעות concurrentInput ().

דבר אחד שיש לציין הוא כי שלב זה ניתן להרחבה אופקית. כלומר נוכל להריץ כמה שיותר מקרים בשלב זה לפי הצורך. ראוי להזכיר גם כשנפרסים באשכול גמל שלמה, שלב זה שולח נתונים לשלב הבא, כך שכל האירועים השייכים לקבוצה מסוימת ינחתו על אותו עובד בשלב הבא.

5.2. צבירה

לפני שנתקדם וליצור את השלב הבא, ראשית נוסיף LogAggregate יֵשׁוּת:

מחלקה ציבורית LogAggregate מיישמת את JsonType {ספירת מספר שלם סופי פרטי; גמר פרטי מחרוזת; }

עכשיו, בואו ניצור את השלב האחרון בשרשרת.

שלב זה מיושם GroupToScalarComputation והופך זרם של קבוצות יומן לסקלר LogAggregate. היא עושה זאת על ידי ספירה כמה פעמים כל סוג יומן מופיע בזרם. בנוסף, יש בו גם LogAggregationDuration פרמטר, שבו ניתן להשתמש כדי לשלוט בגודל חלון הצבירה:

מחלקה ציבורית CountLogStage מיישמת את GroupToScalarComputation {משך זמן פרטי; @ עקוב על init בטל ציבורי (הקשר הקשר) {duration = (int) context.getParameters (). Get ("LogAggregationDuration", 1000); } @ עקוף שיחה נצפית ציבורית (הקשר הקשר, נצפה mantisGroup) {return mantisGroup .window (duration, TimeUnit.MILLISECONDS) .flatMap (o -> o.groupBy (MantisGroup :: getKeyValue) .flatMap (group -> group.reduce (0, (count, value) -> count = count + 1) .map ((count) -> LogAggregate new (count, group.getKey ())))); } GroupToScalar.Config config confic () {להחזיר GroupToScalar.Config (). תיאור ("סכום אירועים לרמת יומן"). codec (JacksonCodecs.pojo (LogAggregate.class)). withParameters (getParameters ()); } רשימה סטטית ציבורית getParameters () {רשימה params = ArrayList חדש (); params.add (חדש IntParameter () .name ("LogAggregationDuration"). תיאור ("גודל חלון לצבירה באלפיות השנייה)). Validator (Validators.range (100, 10000)). defaultValue (5000) .build ()); מסדרים חזרה; }}

5.3. הגדר והפעל את העבודה

הדבר היחיד שנותר לעשות עכשיו הוא להגדיר את העבודה שלנו:

מחלקה ציבורית LogAggregationJob מרחיב את MantisJobProvider {@Override Job job getJobInstance () {return MantisJob .source (new RandomLogSource ()) .stage (new TransformLogStage (), TransformLogStage.stageConfig ()) .stage (new GroupLogStage (), Group. Stage (CountLogStage new (), CountLogStage.config ()) .sink (Sinks.eagerSubscribe (Sinks.sse (LogAggregate :: toJsonString))) .metadata (Metadata.Builder חדש (). build ()) .create (); }}

ברגע שאנחנו מריצים את היישום ומבצעים את העבודה החדשה שלנו, אנו יכולים לראות את ספירת היומנים נשלפת אחת לכמה שניות:

$ curl localhost: 8133 data: {"count": 3, "level": "ERROR"} data: {"count": 13, "level": "INFO"} data: {"count": 4, "level ":" WARN "} נתונים: {" count ": 8," level ":" ERROR "} נתונים: {" count ": 5," level ":" INFO "} נתונים: {" count ": 7," level ":" WARN "} ...

6. מסקנה

לסיכום, במאמר זה ראינו מה זה גמל שלמה ולמה הוא יכול לשמש. יתר על כן, בדקנו את המושגים העיקריים, השתמשנו בהם לבניית עבודות וחקרנו תצורות מותאמות אישית לתרחישים שונים.

כמו תמיד, הקוד השלם זמין ב- GitHub.


$config[zx-auto] not found$config[zx-overlay] not found