מבוא לזרם ענן האביב

1. סקירה כללית

Spring Cloud Stream הוא מסגרת הבנויה על גבי Spring Boot ו- Spring Integration ש עוזר ביצירת מיקרו-שירותים מונחי אירועים או הודעות.

במאמר זה נציג מושגים ומבנים של Spring Cloud Stream עם כמה דוגמאות פשוטות.

2. תלות Maven

כדי להתחיל, עלינו להוסיף את זרם האביב של ענן האביב עם התווך RabbitMQ Maven כתלות להעברת הודעות. pom.xml:

 org.springframework.cloud spring-cloud-starter-stream-rabbit 1.3.0.RELEASE 

ונוסיף את תלות המודול ממייבען סנטרל כדי לאפשר גם תמיכה ב- JUnit:

 org.springframework.cloud spring-cloud-stream-test-support-1.3.0.RELEASE test 

3. מושגים עיקריים

ארכיטקטורת המיקרו-שירותים עוקבת אחר עקרון "נקודות קצה חכמות וצינורות מטומטמים". התקשורת בין נקודות הקצה מונעת על ידי צדדים להעברת הודעות כמו RabbitMQ או Apache Kafka. שירותים מתקשרים על ידי פרסום אירועי דומיין דרך נקודות קצה או ערוצים אלה.

בואו נעבור בין המושגים המרכיבים את מסגרת Spring Cloud Stream, יחד עם הפרדיגמות החיוניות שעלינו להיות מודעים אליהן לבניית שירותים מונעי מסרים.

3.1. בונה

בואו נסתכל על שירות פשוט ב- Spring Cloud Stream שמאזין לו קֶלֶט מחייב ושולח תגובה ל תְפוּקָה כריכה:

@SpringBootApplication @EnableBinding (Processor.class) class class MyLoggerServiceApplication {public static void main (String [] args) {SpringApplication.run (MyLoggerServiceApplication.class, args); } @StreamListener (Processor.INPUT) @SendTo (Processor.OUTPUT) Public LogMessage enrichLogMessage (LogMessage log) {החזר LogMessage חדש (String.format ("[1]:% s", log.getMessage ())); }}

ההערה @EnableBinding מגדיר את התצורה של האפליקציה לאיחוד הערוצים קֶלֶט ו תְפוּקָה מוגדר בתוך הממשק מעבד. שני הערוצים הם איגודים שניתן להגדיר אותם לשימוש בתוכנת העברת הודעות בטון או קלסר.

בואו נסתכל על ההגדרה של כל המושגים הללו:

  • כריכות - אוסף ממשקים המזהים את ערוצי הקלט והפלט באופן הצהרתי
  • כּוֹרֵך - יישום הודעות-אמצעי תוכנה כגון קפקא או RabbitMQ
  • עָרוּץ - מייצג את צינור התקשורת בין הודעות-אמצעי תוכנה ליישום
  • StreamListeners - שיטות טיפול בהודעות בשעועית שיופעלו אוטומטית על הודעה מהערוץ לאחר ה- MessageConverter עושה סידור / דה-סידור בין אירועים ספציפיים לתווך לבין סוגי אובייקטים / POJOs תחום
  • Mesמרווה סכמות - המשמשים לסידור והערה של הודעות, ניתן לקרוא סכמות אלה באופן סטטי ממיקום או לטעון אותן באופן דינמי, ותומכות בהתפתחות סוגי אובייקטים של תחומים.

3.2. דפוסי תקשורת

ההודעות המיועדות ליעדים מועברות על ידי פרסם-הירשם דפוס הודעות. מפרסמים מסווגים הודעות לנושאים, שכל אחד מהם מזוהה על ידי שם. המנויים מביעים עניין בנושא אחד או יותר. תוכנת הביניים מסננת את ההודעות ומעבירה את הנושאים המעניינים למנויים.

כעת, ניתן לקבץ את המנויים. א קבוצת צרכנים היא קבוצת מנויים או צרכנים, שזוהתה על ידי מזהה קבוצתי, שבתוכם מועברים מסרים מהנושא או מחיצת הנושא בצורה מאוזנת בין עומסים.

4. מודל תכנות

סעיף זה מתאר את היסודות בבניית יישומי Spring Cloud Stream.

4.1. בדיקות פונקציונליות

תמיכת הבדיקה היא יישום קלסר המאפשר אינטראקציה עם הערוצים ובדיקת מסרים.

בואו נשלח הודעה לעיל להעשיר LogMessage שירות ובדוק אם התגובה מכילה את הטקסט “[1]: “ בתחילת ההודעה:

@RunWith (SpringJUnit4ClassRunner.class) @ContextConfiguration (classes = MyLoggerServiceApplication.class) @DirtiesContext class class MyLoggerApplicationTests {@ צינור מעבד פרטי מאושר; @Autowired פרטי MessageCollector messageCollector; @ מבחן ציבורי בטל כאשר SendMessage_thenResponseShouldUpdateText () {pipe.input () .send (MessageBuilder.withPayload (LogMessage חדש ("זו ההודעה שלי")) .build ()); מטען אובייקט = messageCollector.forChannel (pipe.output ()) .poll () .getPayload (); assertEquals ("[1]: זו ההודעה שלי", payload.toString ()); }}

4.2. ערוצים מותאמים אישית

בדוגמה שלעיל השתמשנו ב- מעבד ממשק המסופק על ידי Spring Cloud, שיש בו רק קלט אחד וערוץ פלט אחד.

אם אנו זקוקים למשהו אחר, כמו קלט אחד ושני ערוצי פלט, אנו יכולים ליצור מעבד מותאם אישית:

ממשק ציבורי MyProcessor {String INPUT = "myInput"; @Input SubscribableChannel myInput (); @Output ("myOutput") MessageChannel anOutput (); @ פלט הודעה ערוץ אחר פלט (); }

אביב יספק עבורנו יישום נכון של ממשק זה. ניתן להגדיר את שמות הערוצים באמצעות הערות כמו ב- @Output ("myOutput").

אחרת, Spring ישתמש בשמות השיטות כשמות הערוצים. לכן, יש לנו שלושה ערוצים שנקראים הקלט שלי, myOutput, ו פלט אחר.

עכשיו, בואו נדמיין שאנחנו רוצים לנתב את ההודעות לפלט אחד אם הערך קטן מ- 10 ולפלט אחר הערך גדול או שווה ל- 10:

@ מעבד MyProcessor פרטי פרטי; @StreamListener (MyProcessor.INPUT) routeValues ​​בטל ציבורי (Valve Integer) {if (val <10) {processor.anOutput (). Send (message (val)); } אחר {processor.anotherOutput (). send (message (val)); }} הודעה סטטית פרטית סופית (T val) {return MessageBuilder.withPayload (val) .build (); }

4.3. שיגור מותנה

משתמש ב @StreamListener ביאור, אנחנו גם יכולים לסנן את ההודעות שאנו מצפים מהצרכן באמצעות כל תנאי שאנחנו מגדירים עם ביטויי SpEL.

כדוגמה, נוכל להשתמש בשיגור מותנה כגישה נוספת לניתוב הודעות לפלטים שונים:

@ מעבד MyProcessor פרטי פרטי; @StreamListener (target = MyProcessor.INPUT, condition = "loadload = 10") נתיב חלל ציבורי ValueValuesToAnotherOutput (Valve Integer) {processor.anotherOutput (). Send (message (val)); }

היחיד המגבלה של גישה זו היא שאסור שיטות אלה יחזירו ערך.

5. התקנה

בואו נגדיר את היישום שיעבד את ההודעה מהמתווך RabbitMQ.

5.1. תצורת קלסר

אנו יכולים להגדיר את היישום שלנו לשימוש ביישום קלסר ברירת המחדל באמצעות META-INF / spring.binders:

ארנב: \ org.springframework.cloud.stream.binder.rabbit.config.RabbitMessageChannelBinderConfiguration

לחלופין, נוכל להוסיף את ספריית הקלסרים של RabbitMQ לשביל הכיתה על ידי הכללה התלות הזו:

 org.springframework.cloud אביב-ענן-זרם-קלסר-ארנב 1.3.0.RELEASE 

אם לא יינתן יישום קלסר, אביב ישתמש בתקשורת הודעה ישירה בין הערוצים.

5.2. תצורת RabbitMQ

כדי להגדיר את הדוגמה בסעיף 3.1 לשימוש בקלסר RabbitMQ, עלינו לעדכן את ה- application.yml ממוקם ב src / main / resources:

קפיץ: ענן: זרם: כריכות: קלט: יעד: queue.log.messages קלסר: local_rabbit פלט: יעד: queue.pretty.log.messages קלסר: קלסרים local_rabbit: local_rabbit: סוג: סביבת ארנב: קפיץ: rabbitmq: host: port : 5672 שם משתמש: סיסמה: מארח וירטואלי: /

ה קֶלֶט מחייב ישתמש במרכזיה שנקראה queue.log.messages, וה תְפוּקָה הכריכה תשתמש בבורסה queue.pretty.log.messages. שתי הכריכות ישתמשו בקלסר שנקרא מקומי_רביט.

שים לב שאיננו צריכים ליצור את חילופי התורים של RabbitMQ או את התורים מראש. בעת הפעלת היישום, שתי המרכזיות נוצרות אוטומטית.

כדי לבדוק את היישום, אנו יכולים להשתמש באתר הניהול של RabbitMQ כדי לפרסם הודעה. בתוך ה פרסם הודעה פאנל הבורסה queue.log.messages, עלינו להזין את הבקשה בפורמט JSON.

5.3. התאמה אישית של המרת הודעות

Spring Cloud Stream מאפשר לנו להחיל המרת הודעות עבור סוגי תוכן ספציפיים. בדוגמה שלעיל, במקום להשתמש בפורמט JSON, אנו רוצים לספק טקסט רגיל.

לשם כך נצטרך להחיל שינוי מותאם אישית על LogMessage באמצעות א MessageConverter:

@SpringBootApplication @EnableBinding (Processor.class) מחלקה ציבורית MyLoggerServiceApplication {// ... @Bean MessageConverter ציבורי מספק TextPlainMessageConverter () {להחזיר TextPlainMessageConverter חדש (); } // ...}
מחלקה ציבורית TextPlainMessageConverter מרחיב את AbstractMessageConverter {public TextPlainMessageConverter () {super (MimeType חדש ("טקסט", "רגיל")); } תמיכה בוליאנית מוגנת @Override (קלאז קלאס) {return (LogMessage.class == clazz); } @Override מוגן אובייקט convertFromInternal (הודעת הודעה, Class targetClass, Object conversionHint) {מטען אובייקט = message.getPayload (); טקסט מחרוזת = מופע מטען מחרוזת? מטען (מחרוזת): מטען חדש מחרוזת ((בתים [])); להחזיר LogMessage חדש (טקסט); }}

לאחר החלת השינויים הללו, חוזר אל פרסם הודעה אם אנו מגדירים את הכותרת "contentTypes" ל "טקסט / רגיל"והמטען ל"שלום עולם", זה אמור לעבוד כמו פעם.

5.4. קבוצות צרכנות

כאשר מריצים מספר מופעים של היישום שלנו, בכל פעם שיש הודעה חדשה בערוץ קלט, כל המנויים יקבלו הודעה.

לרוב אנו זקוקים לעיבוד ההודעה פעם אחת בלבד. Spring Cloud Stream מיישם התנהגות זו באמצעות קבוצות צרכנים.

כדי לאפשר התנהגות זו, כל קשירת צרכנים יכולה להשתמש ב- spring.cloud.stream.bindings..group מאפיין לציין שם קבוצה:

קפיץ: ענן: זרם: כריכות: קלט: יעד: תור.לוג.הודעות קלסר: מקומי_ריביט קבוצה: logMessageConsumers ...

6. שירותי מיקרו-שירותים מונעים הודעות

בחלק זה אנו מציגים את כל התכונות הנדרשות להפעלת יישומי Spring Cloud Stream שלנו בהקשר של מיקרו-שירותים.

6.1. הגדלה

כאשר מספר יישומים פועלים, חשוב לוודא שהנתונים מפוצלים כראוי בין הצרכנים. לשם כך, Spring Cloud Stream מספק שני מאפיינים:

  • spring.cloud.stream.instanceCount - מספר היישומים הפועלים
  • spring.cloud.stream.instanceIndex - אינדקס היישום הנוכחי

לדוגמה, אם פרסנו שני מופעים של האמור לעיל MyLoggerServiceApplication יישום, הנכס spring.cloud.stream.instanceCount צריך להיות 2 עבור שני היישומים, וגם עבור הנכס spring.cloud.stream.instanceIndex צריך להיות 0 ו- 1 בהתאמה.

מאפיינים אלה מוגדרים באופן אוטומטי אם נפרוס את יישומי Spring Cloud Stream באמצעות זרימת נתונים של Spring כמתואר במאמר זה.

6.2. מחיצה

אירועי התחום יכולים להיות מחולק הודעות. זה עוזר כשאנחנו הגדלת האחסון ושיפור ביצועי היישומים.

לאירוע התחום יש בדרך כלל מפתח מחיצה כך שהוא יסתיים באותה מחיצה עם הודעות קשורות.

בואו נגיד שאנחנו רוצים שמחלקים את הודעות היומן לפי האות הראשונה בהודעה, שתהיה מפתח המחיצה, ומקובצות לשתי מחיצות.

תהיה מחיצה אחת להודעות היומן שמתחילות איתן א-מ ומחיצה נוספת עבור נ-ז. ניתן להגדיר זאת באמצעות שני מאפיינים:

  • spring.cloud.stream.bindings.output.producer.partitionKeyExpression - הביטוי לחלוקת המטענים
  • spring.cloud.stream.bindings.output.producer.partitionCount - מספר הקבוצות

לפעמים הביטוי למחיצה מורכב מכדי לכתוב אותו בשורה אחת בלבד. במקרים אלה, אנו יכולים לכתוב את אסטרטגיית המחיצה המותאמת אישית שלנו באמצעות המאפיין spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass.

6.3. מחוון בריאות

בהקשר של שירותי מיקרוסופט, אנחנו צריכים גם לזהות מתי שירות מושבת או מתחיל להיכשל. Spring Cloud Stream מספק את הנכס management.health.binders.enabled כדי לאפשר את מדדי הבריאות לקלסרים.

בעת הפעלת היישום, אנו יכולים לשאול על מצב הבריאות בכתובת //:/בְּרִיאוּת.

7. מסקנה

במדריך זה הצגנו את המושגים העיקריים של Spring Cloud Stream והראינו כיצד להשתמש בו באמצעות כמה דוגמאות פשוטות על גבי RabbitMQ. מידע נוסף על זרם ענן האביב ניתן למצוא כאן.

קוד המקור של מאמר זה ניתן למצוא באתר GitHub.


$config[zx-auto] not found$config[zx-overlay] not found