תחילת העבודה עם עיבוד זרמים עם זרימת נתונים של ענן אביב

1. הקדמה

זרימת נתונים של ענן אביב הוא מודל תכנות ותפעול מקורי לענן למיקרו-שירותים ניתנים לחיבור.

עם זרימת נתונים של ענן אביב, מפתחים יכולים ליצור ולתזמן צינורות נתונים למקרי שימוש נפוץ כגון צריכת נתונים, ניתוח בזמן אמת וייבוא ​​/ ייצוא נתונים.

צינורות נתונים אלה מגיעים בשני טעמים, צינורות סטרימינג ונתוני אצווה.

במקרה הראשון, כמות נתונים בלתי מוגבלת נצרכת או מופקת באמצעות תוכנת תיווך להעברת הודעות. בעוד שבמקרה השני המשימה קצרת הימים מעבדת סט נתונים סופי ואז מסתיימת.

מאמר זה יתמקד בעיבוד סטרימינג.

2. סקירה אדריכלית

מרכיבי המפתח אלו סוגים של ארכיטקטורה יישומים, ה שרת זרימת נתוניםוזמן הריצה היעד.

בנוסף בנוסף למרכיבי המפתח הללו, לרוב יש לנו גם מעטפת זרימת נתונים ו מתווך הודעות בתוך האדריכלות.

בואו נראה את כל הרכיבים הללו ביתר פירוט.

2.1. יישומים

בדרך כלל, צינור נתונים זורם כולל צריכת אירועים ממערכות חיצוניות, עיבוד נתונים והתמדה של מצולעים. שלבים אלה מכונים בדרך כלל מָקוֹר, מעבד, ו כִּיוֹר ב ענן אביב מינוח:

  • מָקוֹר: הוא היישום שצורך אירועים
  • מעבד: צורכת נתונים מה- מָקוֹר, עושה בו עיבוד מסוים, ופולט את הנתונים המעובדים ליישום הבא בצינור
  • כִּיוֹר: או צורכת מ מָקוֹר אוֹ מעבד וכותב את הנתונים לשכבת ההתמדה הרצויה

ניתן לארוז יישומים אלה בשתי דרכים:

  • Spring Boot uber-jar שמתארח במאגר Maven, קובץ, http או כל יישום של משאבי Spring אחרים (שיטה זו תשמש במאמר זה)
  • דוקר

מקורות רבים, יישומי מעבד וכיור למקרי שימוש נפוצים (למשל jdbc, hdfs, http, נתב) כבר מסופקים ומוכנים לשימוש על ידי זרימת נתונים של ענן אביב קְבוּצָה.

2.2. זמן ריצה

כמו כן, יש צורך בהפעלה של יישומים אלה. זמני הריצה הנתמכים הם:

  • בית יציקה בענן
  • אפאצ'י חוט
  • קוברנטס
  • אפאצ'י מסוס
  • שרת מקומי לפיתוח (שישמש במאמר זה)

2.3. שרת זרימת נתונים

הרכיב האחראי על פריסת יישומים לזמן ריצה הוא שרת זרימת נתונים. יש שרת זרימת נתונים צנצנת הפעלה המסופקת לכל אחד משעות ההפעלה היעד.

ה שרת זרימת נתונים אחראי לפרשנות:

  • זרם DSL המתאר את הזרימה הלוגית של נתונים דרך מספר יישומים.
  • מניפסט פריסה המתאר את מיפוי היישומים לזמן הריצה.

2.4. מעטפת זרימת נתונים

פגז זרימת הנתונים הוא לקוח עבור שרת זרימת הנתונים. המעטפת מאפשרת לנו לבצע את פקודת ה- DSL הדרושה לאינטראקציה עם השרת.

כדוגמה, ה- DSL לתאר את זרימת הנתונים ממקור http לכיור jdbc ייכתב כ- "http | jdbc ”. שמות אלה ב- DSL רשומים ב- שרת זרימת נתונים ומפה על חפצי יישומים שניתן לארח במאגרי Maven או Docker.

אביב מציעים גם ממשק גרפי בשם פלו, ליצירת וניטור צינורות נתונים זורמים. עם זאת, השימוש בו מחוץ לדיון במאמר זה.

2.5. מתווך הודעות

כפי שראינו בדוגמה של הסעיף הקודם, השתמשנו בסמל הצינור להגדרת זרימת הנתונים. סמל הצינור מייצג את התקשורת בין שני היישומים באמצעות תוכנת אמצעי העברת הודעות.

פירוש הדבר שאנו זקוקים למתווך הודעות שיפעל בסביבת היעד.

שני מתווכי התיווך להעברת הודעות הנתמכים הם:

  • אפאצ'י קפקא
  • RabbitMQ

וכך, עכשיו כשיש לנו סקירה כללית על המרכיבים האדריכליים - הגיע הזמן לבנות את צינור עיבוד הזרם הראשון שלנו.

3. התקן מתווך הודעות

כפי שראינו, היישומים הנמצאים בצנרת זקוקים לתוכנת אמצעי העברת הודעות כדי לתקשר. לצורך מאמר זה נלך עם RabbitMQ.

לפרטים המלאים של ההתקנה תוכלו לעקוב אחר ההוראות באתר הרשמי.

4. שרת זרימת הנתונים המקומי

כדי להאיץ את תהליך יצירת היישומים שלנו, נשתמש באביב Initializr; בעזרתה נוכל להשיג את שלנו מגף אביב יישומים תוך מספר דקות.

לאחר ההגעה לאתר פשוט בחרו ב- קְבוּצָה ו חפץ שֵׁם.

לאחר שהדבר נעשה, לחץ על הכפתור צור פרויקט כדי להתחיל בהורדת חפץ Maven.

לאחר סיום ההורדה, פתח את רוכסן הפרויקט וייבא אותו כפרויקט Maven לפי IDE שבחרת.

בואו נוסיף תלות של Maven לפרויקט. כפי שנצטרך שרת מקומי של זרימת נתונים ספריות, בואו נוסיף את תלות האביב- cloud-starter-dataflow-server-local:

 org.springframework.cloud spring-cloud-starter-dataflow-server-local 

כעת עלינו להעלות הערות על ה- מגף אביב מעמד ראשי עם @EnableDataFlowServer ביאור:

@EnableDataFlowServer @ SpringBootApplication מחלקה ציבורית SpringDataFlowServerApplication {ציבורי ריק ריק סטטי (String [] args) {SpringApplication.run (SpringDataFlowServerApplication.class, args); }} 

זה הכל. שֶׁלָנוּ שרת זרימת נתונים מקומי מוכן להורג:

קפיץ אתחול mvn: לרוץ

היישום יאותחל ביציאה 9393.

5. מעטפת זרימת הנתונים

שוב עבור אל Initializr האביב ובחר a קְבוּצָה ו חפץ שֵׁם.

לאחר שהורדנו וייבאנו את הפרויקט, בואו נוסיף תלות באביב-ענן-נתונים-זרימת מעטפת:

 org.springframework.cloud spring-cloud-dataflow-shell 

עכשיו אנחנו צריכים להוסיף את @EnableDataFlowShell ביאור ל מגף אביב מעמד ראשי:

@EnableDataFlowShell @ SpringBootApplication מחלקה ציבורית SpringDataFlowShellApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowShellApplication.class, args); }} 

כעת אנו יכולים להריץ את הקליפה:

קפיץ אתחול mvn: לרוץ

לאחר שהקליפה פועלת, אנו יכולים להקליד את עֶזרָה פקודה בהנחיה כדי לראות רשימה מלאה של פקודה שנוכל לבצע.

6. יישום המקור

באופן דומה, ב- Initializr ניצור כעת יישום פשוט ונוסיף א ארנב זרם תלות הנקראת אביב-ענן-starter-stream-rabbit:

 org.springframework.cloud spring-cloud-starter-stream-rabbit 

לאחר מכן נוסיף את ה- @EnableBinding (Source.class) ביאור ל מגף אביב מעמד ראשי:

@EnableBinding (Source.class) @SpringBootApplication public class SpringDataFlowTimeSourceApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowTimeSourceApplication.class, args); }}

כעת עלינו להגדיר את מקור הנתונים שיש לעבד. מקור זה יכול להיות כל עומס עבודה שעלול להיות אינסופי (נתוני חיישני האינטרנט של הדברים, עיבוד אירועים 24/7, בליעת נתוני עסקאות מקוונים).

ביישום לדוגמא שלנו, אנו מפיקים אירוע אחד (לפשטות חותמת זמן חדשה) כל 10 שניות עם a פולר.

ה @ InboundChannelAdapter ביאור שולח הודעה לערוץ הפלט של המקור, תוך שימוש בערך ההחזרה כעומס המטען של ההודעה:

@Bean @InboundChannelAdapter (value = Source.OUTPUT, poller = @Poller (fixedDelay = "10000", maxMessagesPerPoll = "1")) public MessageSource timeMessageSource () {return () -> MessageBuilder.withPayload (תאריך חדש (). GetTime ()).לִבנוֹת(); } 

מקור הנתונים שלנו מוכן.

7. בקשת המעבד

לאחר מכן - ניצור יישום ונוסיף א ארנב זרם תלות.

לאחר מכן נוסיף את ה- @EnableBinding (Processor.class) ביאור ל מגף אביב מעמד ראשי:

@EnableBinding (Processor.class) @SpringBootApplication public class SpringDataFlowTimeProcessorApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowTimeProcessorApplication.class, args); }}

בשלב הבא עלינו להגדיר שיטה לעיבוד הנתונים המגיעים מיישום המקור.

כדי להגדיר שנאי, עלינו להוסיף הערה לשיטה זו @שַׁנַאי ביאור:

@Transformer (inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) שינוי אובייקט ציבורי (חותמת זמן ארוכה) {DateFormat dateFormat = חדש SimpleDateFormat ("yyyy / MM / dd hh: mm: yy"); תאריך מחרוזת = dateFormat.format (חותמת זמן); תאריך חזרה; }

הוא ממיר חותמת זמן מערוץ 'קלט' לתאריך מעוצב אשר יישלח לערוץ 'פלט'.

8. יישום הכיור

היישום האחרון שנוצר הוא יישום Sink.

שוב, עבור ל- Spring Initializr ובחר a קְבוּצָה, חפץ שֵׁם. לאחר הורדת הפרויקט בואו להוסיף a ארנב זרם תלות.

ואז הוסף את @EnableBinding (Sink.class) ביאור ל מגף אביב מעמד ראשי:

@EnableBinding (Sink.class) @SpringBootApplication class class SpringDataFlowLoggingSinkApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowLoggingSinkApplication.class, args); }}

כעת אנו זקוקים לשיטה ליירט את ההודעות המגיעות מיישום המעבד.

לשם כך עלינו להוסיף את @StreamListener (Sink.INPUT) ביאור לשיטה שלנו:

@StreamListener (Sink.INPUT) logger void public Sink (תאריך מחרוזת) {logger.info ("התקבל:" + תאריך); }

השיטה פשוט מדפיסה את חותמת הזמן שהופכה בתאריך מעוצב לקובץ יומן רישום.

9. רשום אפליקציית סטרים

מעטפת זרימת הנתונים של ענן אביב מאפשרת לנו לרשום אפליקציית סטרים ברישום האפליקציות באמצעות ה- רישום אפליקציות פקודה.

עלינו לספק שם ייחודי, סוג יישום ו- URI שניתן לפתור לממצא האפליקציה. עבור הסוג, ציין “מָקוֹר“, “מעבד“, או“כִּיוֹר“.

כאשר מספקים URI עם ערכת maven, הפורמט צריך להתאים להלן:

maven: //: [: [:]]:

כדי לרשום את מָקוֹר, מעבד ו כִּיוֹר יישומים שנוצרו בעבר, עבור אל אביב מעטפת זרימת נתונים בענן והנפיק את הפקודות הבאות מההנחיה:

רישום אפליקציה - שם זמן מקור - מקור סוג --uri maven: //com.baeldung.spring.cloud: spring-data-flow-time-source: jar: 0.0.1-SNAPSHOT הרשמת אפליקציה - זמן שם -מעבד - מעבד סוג --uri maven: //com.baeldung.spring.cloud: מעבד-נתונים-זרימת זמן-מעבד: צנצנת: 0.0.1-SNAPSHOT הרשמת אפליקציה - שם רישום-כיור - סוג כיור --uri maven: //com.baeldung.spring.cloud: spring-data-flow-logging-sink: jar: 0.0.1-SNAPSHOT 

10. צור ופריס את הזרם

כדי ליצור הגדרת זרם חדשה עבור אל אביב מעטפת זרימת נתונים בענן ולבצע את פקודת הפגז הבאה:

זרם צור - שם זמן להיכנס - הגדרה 'מקור הזמן | מעבד זמן | רישום-כיור '

זה מגדיר זרם בשם זמן להיכנס מבוסס על ביטוי ה- DSL 'זמן-זמן | מעבד זמן | רישום-כיור '.

ואז כדי לפרוס את הזרם, בצע את פקודת הפגז הבאה:

זרם פריסה - שם זמן לרישום

ה שרת זרימת נתונים פותר מקור זמן, מעבד זמן, ו כריתת עצים כדי לתאם קואורדינטות ומשתמש בהן להפעלת ה- מקור זמן, מעבד זמן ו כריתת עצים יישומים של הזרם.

אם הזרם נפרס כראוי תראה ב שרת זרימת נתונים יומני שהמודולים התחילו ונקשרו יחד:

2016-08-24 12: 29: 10.516 INFO 8096 --- [io-9393-exec-10] oscdspi.local.LocalAppDeployer: פריסת מופע זמן עד לוג. Logging-sink 0 יומנים יהיו ב- PATH_TO_LOG / אביב-ענן-זרם נתונים-1276836171391672089 / time-to-log-1472034549734 / time-to-log.logging-sink 2016-08-24 12: 29: 17.600 INFO 8096 --- [io-9393-exec-10] oscd spi.local.LocalAppDeployer: פריסת מופע זמן עד לרישום זמן מעבד 0 יומנים יהיו ב- PATH_TO_LOG / spring-cloud-dataflow-1276836171391672089 / time-to-log-1472034556862 / time-to-log. time-processor 2016-08-24 12: 29: 23.280 INFO 8096 --- [io-9393-exec-10] oscdspi.local.LocalAppDeployer: פריסת מופע זמן לביצוע. זמן מקור 0 יומנים יהיו ב- PATH_TO_LOG / spring-cloud-dataflow-1276836171391672089 / time-to-log-1472034562861 / time-to-log.time-source

11. סקירת התוצאה

בדוגמה זו, המקור פשוט שולח את חותמת הזמן הנוכחית כהודעה בכל שנייה, המעבד מעצב אותה וכיור היומן מוציא את חותמת הזמן המעוצבת באמצעות מסגרת הרישום.

קבצי היומן ממוקמים בספריה המוצגת ב- שרת זרימת נתוניםפלט היומן, כפי שמוצג לעיל. כדי לראות את התוצאה, אנו יכולים להתאים את היומן:

זנב -f PATH_TO_LOG / spring-cloud-dataflow-1276836171391672089 / time-to-log-1472034549734 / time-to-log.logging-sink / stdout_0.log 2016-08-24 12: 40: 42.029 INFO 9488 --- [ r.time-to-log-1] scSpringDataFlowLoggingSink יישום: התקבל: 2016/08/24 11:40:01 2016-08-24 12: 40: 52.035 INFO 9488 --- [r.time-to-log-1 ] scSpringDataFlowLoggingSink יישום: התקבל: 2016/08/24 11:40:11 2016-08-24 12: 41: 02.030 INFO 9488 --- [r.time-to-log-1] scSpringDataFlowLoggingSink יישום: התקבל: 2016/08 / 24 11:40:21

12. מסקנה

במאמר זה ראינו כיצד לבנות צינור נתונים לעיבוד זרמים באמצעות זרימת נתונים של ענן אביב.

כמו כן, ראינו את התפקיד של מָקוֹר, מעבד ו כִּיוֹר יישומים בתוך הזרם וכיצד לחבר ולקשור מודול זה בתוך שרת זרימת נתונים באמצעות מעטפת זרימת נתונים.

את קוד הדוגמה ניתן למצוא בפרויקט GitHub.


$config[zx-auto] not found$config[zx-overlay] not found