אפאצ'י RocketMQ עם מגף קפיץ

1. הקדמה

במדריך זה, ניצור מפיק הודעות וצרכן באמצעות Spring Boot ו- Apache RocketMQ, פלטפורמת נתוני העברת הודעות והזרמת קוד פתוח.

2. תלות

עבור פרויקטים של Maven, עלינו להוסיף את תלות האביב של RocketMQ Spring Boot:

 org.apache.rocketmq rocketmq-spring-boot-starter 2.0.4 

3. הפקת מסרים

לדוגמא שלנו, ניצור מפיק הודעות בסיסי שישלח אירועים בכל פעם שהמשתמש מוסיף או מסיר פריט מעגלת הקניות.

ראשית, בואו נגדיר את מיקום השרת ואת שם הקבוצה שלנו application.properties:

rocketmq.name-server = 127.0.0.1: 9876 rocketmq.producer.group = עגלת-מפיק-קבוצה

שים לב שאם היו לנו יותר משרת שמות אחד, נוכל לרשום אותם כמו host: port; host: port.

עכשיו, כדי לשמור על הפשטות, ניצור א CommandLineRunner יישום וליצור כמה אירועים במהלך אתחול היישום:

@SpringBootApplication בכיתה ציבורית CartEventProducer מיישם את CommandLineRunner {@Autowired פרטי RocketMQTemplate raketMQTemplate; ריק סטטי ציבורי ראשי (String [] args) {SpringApplication.run (CartEventProducer.class, args); } הפעלה בטלנית ציבורית (String ... args) זורקת Exception {rocketMQTemplate.convertAndSend ("cart-item-add-topic", CartItemEvent חדש ("אופניים", 1)); rocketMQTemplate.convertAndSend ("cart-item-add-topic", CartItemEvent חדש ("מחשב", 2)); rocketMQTemplate.convertAndSend ("cart-item-removed-topic", CartItemEvent חדש ("אופניים", 1)); }}

ה CartItemEvent מורכב משני מאפיינים בלבד - מזהה הפריט וכמות:

class CartItemEvent {private String itemId; כמות אינטנס פרטית; // קונסטרוקטור, גטרים וקובעים}

בדוגמה שלעיל אנו משתמשים ב- convertAndSend () שיטה, שיטה כללית המוגדרת על ידי תקציר הודעה שליחת תבנית שיעור מופשט, כדי לשלוח את אירועי העגלה שלנו. נדרשים שני פרמטרים: יעד, שבמקרה שלנו הוא שם נושא, ועומס מטען להודעה.

4. צרכן הודעות

צריכת הודעות RocketMQ היא פשוטה כמו יצירת רכיב קפיצי עם הערות @RocketMQMessageListener ויישום ה- RocketMQListener מִמְשָׁק:

@SpringBootApplication מחלקה ציבורית CartEventConsumer {public static void main (String [] args) {SpringApplication.run (CartEventConsumer.class, args); } @Service @RocketMQMessageListener (topic = "cart-item-add-topic", consumerGroup = "cart-consumer_cart-item-add-topic") מחלקה ציבורית CardItemAddConsumer מיישם RocketMQListener {public void onMessage (CartItemEvent addItemEvent) {log.info ( "הוספת פריט: {}", addItemEvent); // לוגיקה נוספת}} @ Service @RocketMQMessageListener (topic = "cart-item-removed-topic", consumerGroup = "cart-consumer_cart-item-removed-topic") מחלקה ציבורית CardItemRemoveConsumer מיישם RocketMQListener {public void onMessage (CartItemEvent removeItemEvent) {log.info ("הסרת פריט: {}", removeItemEvent); // לוגיקה נוספת}}}

עלינו ליצור רכיב נפרד לכל נושא המסר אליו אנו מאזינים. בכל אחד מהמאזינים הללו אנו מגדירים את שם הנושא ואת שם קבוצת הצרכנים באמצעות ה- @RocketMQMessageListener ביאור.

5. שידור סינכרוני ואסינכרוני

בדוגמאות הקודמות השתמשנו ב- convertAndSend שיטה לשלוח את ההודעות שלנו. יש לנו כמה אפשרויות אחרות.

נוכל, למשל, להתקשר syncSend שהוא שונה מ convertAndSend כי זה חוזר SendResult לְהִתְנַגֵד.

ניתן להשתמש בו, למשל, כדי לאמת אם ההודעה שלנו נשלחה בהצלחה או לקבל את מזהה:

הפעלה בטלנית ציבורית (String ... args) זורקת חריג {SendResult addBikeResult = rocketMQTemplate.syncSend ("cart-item-add-topic", CartItemEvent חדש ("אופניים", 1)); SendResult addComputerResult = rocketMQTemplate.syncSend ("cart-item-add-topic", CartItemEvent חדש ("מחשב", 2)); SendResult removeBikeResult = rocketMQTemplate.syncSend ("cart-item-removed-topic", CartItemEvent חדש ("אופניים", 1)); }

כמו convertAndSend, שיטה זו מוחזרת רק לאחר סיום הליך השליחה.

עלינו להשתמש בשידור סינכרוני במקרים הדורשים אמינות גבוהה, כמו הודעות חשובות או הודעות SMS.

מצד שני, ייתכן שבמקום זאת נרצה לשלוח את ההודעה בצורה לא סינכרונית ולקבל הודעה על השלמת השליחה.

אנחנו יכולים לעשות זאת עם async שלח, שלוקח א SendCallback כפרמטר וחוזר מיד:

rocketMQTemplate.asyncSend ("cart-item-add-topic", CartItemEvent חדש ("bike", 1), SendCallback חדש () {@Override public void onSuccess (SendResult sendResult) {log.error ("פריט העגלה שנשלח בהצלחה") ;} @ ביטול ציבורי בטל ב-Exception (ניתן לזריק) {log.error ("חריג במהלך שליחת פריט לעגלה", זורק);}});

אנו משתמשים בשידור אסינכרוני במקרים הדורשים תפוקה גבוהה.

לבסוף, עבור תרחישים שבהם יש לנו דרישות תפוקה גבוהות מאוד, נוכל להשתמש sendOneWay במקום async שלח. sendOneWay שונה מ async שלח בכך שזה לא מבטיח שההודעה נשלחת.

שידור חד כיווני יכול לשמש גם למקרי אמינות רגילים, כגון איסוף בולי עץ.

6. שליחת הודעות בעסקה

RocketMQ מספקת לנו את היכולת לשלוח הודעות במסגרת עסקה. אנו יכולים לעשות זאת באמצעות sendInTransaction () שיטה:

MessageBuilder.withPayload (CartItemEvent חדש ("אופניים", 1)). Build (); rocketMQTemplate.sendMessageInTransaction ("עסקת בדיקה", "שם הנושא", msg, null);

כמו כן, עלינו ליישם א RocketMQLocalTransactionListener מִמְשָׁק:

@RocketMQTransactionListener (txProducerGroup = "test-transaction") class TransactionListenerImpl מיישם RocketMQLocalTransactionListener {@Override public RocketMQLocalTransactionState executeLocalTransaction (הודעה msg, אובייקט arg) {// ... תהליך עסקה מקומי, החזר ROLLBACKNQLT. } @Override ציבורי RocketMQLocalTransactionState checkLocalTransaction (הודעה msg) {// ... לבדוק את מצב העסקה ולהחזיר ROLLBACK, COMMIT או UNKNOWN להחזיר RocketMQLocalTransactionState.COMMIT; }}

ב sendMessageInTransaction (), הפרמטר הראשון הוא שם העסקה. זה חייב להיות זהה ל @RocketMQTransactionListenerתחום החברים txProducerGroup.

7. תצורת מפיק הודעות

אנו יכולים גם להגדיר היבטים של מפיק ההודעות עצמו:

  • rocketmq.producer.send-message-timeout: פסק הזמן לשליחת ההודעה באלפיות השנייה - ערך ברירת המחדל הוא 3000
  • rocketmq.producer.com סף לחץ-הודעה-גוף: סף שמעליו, RocketMQ ידחס הודעות - ערך ברירת המחדל הוא 1024.
  • rocketmq.producer.max-message-size: גודל ההודעה המרבי בתים - ערך ברירת המחדל הוא 4096.
  • rocketmq.producer.retry-times-when-send-async-failed: המספר המרבי של נסיונות לבצע באופן פנימי במצב אסינכרוני לפני שליחת כישלון - ערך ברירת המחדל הוא 2.
  • rocketmq.producer.retry-next-server: מציין אם לנסות שוב מתווך אחר בעת שליחת כשל באופן פנימי - ערך ברירת המחדל הוא שֶׁקֶר.
  • rocketmq.producer.retry-times-when-send-failed: המספר המרבי של נסיונות לבצע באופן פנימי במצב אסינכרוני לפני שליחת כישלון - ערך ברירת המחדל הוא 2.

8. מסקנה

במאמר זה למדנו כיצד לשלוח ולצרוך הודעות באמצעות Apache RocketMQ ו- Spring Boot. כמו תמיד כל קוד המקור זמין ב- GitHub.


$config[zx-auto] not found$config[zx-overlay] not found