בדיוק פעם אחת בעיבוד בקפה עם Java

1. סקירה כללית

במדריך זה נבדוק כיצד Kafka מבטיח משלוח חד פעמי בדיוק בין יישומי יצרן לצרכן באמצעות ממשק ה- API של Transactional שהושק לאחרונה.

בנוסף, נשתמש בממשק ה- API הזה כדי ליישם יצרני עסקאות וצרכנים להשגת קצה לקצה בדיוק במשלוח אחד בדוגמה של WordCount.

2. מסירת הודעות בקפקא

בשל כשלים שונים, מערכות העברת הודעות אינן יכולות להבטיח מסירת הודעות בין יישומי יצרן לצרכן. תלוי באופן שבו יישומי הלקוח מתקשרים עם מערכות כאלה, סמנטיקה להודעות אפשרית:

  • אם מערכת העברת הודעות לעולם לא תשכפל הודעה אך עשויה להחמיץ את ההודעה המזדמנת, אנו קוראים לזה לכל היותר פעם אחת
  • לחלופין, אם היא לעולם לא תחמיץ הודעה אך עשויה לשכפל את ההודעה המזדמנת, אנו קוראים לה לפחות פעם אחת
  • אבל אם זה תמיד מספק את כל ההודעות ללא כפילות, כלומר בדיוק-פעם אחת

בתחילה, קפקא תמך במסירת הודעות לכל היותר ולפחות פעם אחת.

למרות זאת, הכנסת עסקאות בין מתווכי קפקא ליישומי לקוח מבטיחה אספקה ​​אחת בפעם אחת בדיוק בקפה. כדי להבין זאת טוב יותר, בואו נבדוק במהירות את ה- API של הלקוח העסקי.

3. תלות Maven

כדי לעבוד עם ממשק ה- API לעסקה, נצטרך את לקוח Java של קפקא בפום שלנו:

 org.apache.kafka kafka-clients 2.0.0 

4. עסקה צורכים-הופכים-מייצרים לוּלָאָה

לדוגמא שלנו אנו נצרוך הודעות מנושא קלט, משפטים.

ואז עבור כל משפט, נספור כל מילה ונשלח את ספירת המילים הבודדת לנושא פלט, סופרת.

בדוגמה, נניח שיש כבר נתונים עסקיים ב- משפטים נוֹשֵׂא.

4.1. מפיק מודע לעסקאות

אז בואו קודם להוסיף מפיק קפקא טיפוסי.

נכסים producerProps = נכסים חדשים (); producerProps.put ("bootstrap.servers", "localhost: 9092");

בנוסף, עם זאת, עלינו לציין א עסקאות.יד ולאפשר חוסר יכולת:

producerProps.put ("enable.idempotence", "true"); producerProps.put ("transactional.id", "prod-1"); מפיק KafkaProducer = KafkaProducer חדש (producerProps);

מכיוון שהפעלנו idempotence, Kafka ישתמש בזהות העסקה הזו כחלק מהאלגוריתם שלה ל- תשכפל כל הודעה שמפיק זהשולח, להבטיח חוסר יכולת.

במילים פשוטות, אם המפיק בטעות שולח את אותה הודעה לקפקא יותר מפעם אחת, הגדרות אלה מאפשרות לה להבחין בה.

כל מה שאנחנו צריכים לעשות זה ודא שמזהה העסקה נבדל עבור כל מפיק, אם כי עקבי בין הפעלות מחדש.

4.2. הפעלת המפיק לעסקאות

ברגע שאנחנו מוכנים, אז אנחנו גם צריכים להתקשר initTransaction להכין את המפיק לשימוש בעסקאות:

producer.initTransactions ();

זה רושם את היצרן אצל המתווך כמי שיכול להשתמש בעסקאות, מזהה אותו לפי שלו עסקאות.יד ומספר רצף, או עידן. בתורו, המתווך ישתמש באלה כדי להקדים כל פעולה ביומן העסקה.

וכתוצאה מכך, המתווך יסיר כל פעולות מאותו יומן השייכות ליצרן עם אותו מזהה עסקה וקודם לכןתְקוּפָה, בהנחה שהם מעסקאות שהושחתו.

4.3. צרכן מודע לעסקאות

כאשר אנו צורכים, אנו יכולים לקרוא את כל ההודעות על מחיצת הנושא לפי הסדר. אם כי, אנחנו יכולים לציין עם בידוד.רמה שעלינו להמתין לקרוא הודעות עסקה עד לביצוע העסקה המשויכת:

נכסים consumerProps = מאפיינים חדשים (); consumerProps.put ("bootstrap.servers", "localhost: 9092"); consumerProps.put ("group.id", "my-group-id"); consumerProps.put ("enable.auto.commit", "false"); consumerProps.put ("isolation.level", "read_committed"); KafkaConsumer צרכן = KafkaConsumer חדש (consumerProps); הצרכן. מנוי (סינגלטון ("משפטים"));

שימוש בערך של התחייב לקרוא מבטיח שלא נקרא הודעות עסקאות לפני השלמת העסקה.

ערך ברירת המחדל של בידוד.רמה הוא התחייבו לקריאה.

4.4. צורכים ומשתנים על ידי עסקה

כעת, לאחר שהמפיק והצרכן מוגדרים לכתוב ולקרוא באופן עסקי, אנו יכולים לצרוך רשומות מתוך נושא הקלט ולספור כל מילה בכל רשומה:

רשומות ConsumerRecords = consumer.poll (ofSeconds (60)); מפה wordCountMap = records.records (TopicPartition חדש ("קלט", 0)) .stream () .flatMap (record -> Stream.of (record.value (). Split (""))) .map (word -> Tuple.of (word, 1)) .collect (Collectors.toMap (tuple -> tuple.getKey (), t1 -> t1.getValue (), (v1, v2) -> v1 + v2));

שים לב, אין שום דבר עסקי בקוד הנ"ל. אבל, מאז שהשתמשנו מחויב לקריאה, פירוש הדבר כי שום הודעה שלא נכתבה לנושא הקלט באותה עסקה לא תקרא על ידי צרכן זה עד שכולם ייכתבו.

כעת נוכל לשלוח את ספירת המילים המחושבת לנושא הפלט.

בואו נראה איך נוכל לייצר את התוצאות שלנו, גם באופן עסקי.

4.5. שלח API

כדי לשלוח את הספירות שלנו כהודעות חדשות, אך באותה עסקה, אנו מתקשרים beginTransaction:

producer.beginTransaction ();

לאחר מכן נוכל לכתוב כל אחד מהם לנושא ה"ספירות "שלנו כשהמפתח הוא המילה והספירה היא הערך:

wordCountMap.forEach ((key, value) -> producer.send (ProducerRecord חדש ("ספירות", מפתח, value.toString ())));

שים לב כי מכיוון שהיצרן יכול לחלק את הנתונים באמצעות המפתח, פירוש הדבר הוא הודעות עסקאות יכולות להיקף על מספר מחיצות, כאשר כל אחת מהן נקראת על ידי צרכנים נפרדים. לכן, המתווך של קפקא יאחסן רשימה של כל המחיצות המעודכנות לעסקה.

שים לב גם כי, בתוך עסקה, מפיק יכול להשתמש במספר שרשורים כדי לשלוח רשומות במקביל.

4.6. ביצוע קיזוזים

ולבסוף, עלינו לבצע את הקיזוזים שזה עתה סיימנו לצרוך. בעסקאות אנו מחזיקים את הקיזוזים לנושא הקלט ממנו קראנו אותם, כרגיל. גם אם כי, אָנוּ שלח אותם לעסקת המפיק.

אנחנו יכולים לעשות את כל זה בשיחה אחת, אך ראשית עלינו לחשב את הקיזוזים עבור כל מחיצת נושא:

מפה offsetsToCommit = חדש HashMap (); עבור (TopicPartition מחיצה: records.partitions ()) {List partitionedRecords = records.records (מחיצה); קיזוז ארוך = partitionedRecords.get (partitionedRecords.size () - 1) .offset (); offsetsToCommit.put (מחיצה, OffsetAndMetadata חדש (offset + 1)); }

שים לב שמה שאנחנו מתחייבים לעסקה הוא הקיזוז הקרוב, כלומר עלינו להוסיף 1.

אז נוכל לשלוח את הקיזוזים המחושבים שלנו לעסקה:

producer.sendOffsetsToTransaction (offsetsToCommit, "מזהה הקבוצה שלי");

4.7. ביצוע או ביטול העסקה

ולבסוף, אנו יכולים לבצע את העסקה, שתכתוב באופן אטומי את הקיזוזים ל- קיזוז הצרכן הנושא וכן לעסקה עצמה:

producer.commitTransaction ();

זה שוטף כל הודעה שנאגרה למחיצות המתאימות. בנוסף, מתווך קפקא מעמיד לרשות הצרכנים את כל ההודעות באותה עסקה.

כמובן, אם משהו משתבש בזמן שאנחנו מעבדים, למשל, אם נתפוס חריג, אנחנו יכולים להתקשר abortTransaction:

נסה {// ... לקרוא מנושא הקלט // ... הפוך // ... כתוב לנושא הפלט producer.commitTransaction (); } לתפוס (חריג e) {producer.abortTransaction (); }

ושחרר כל הודעות שנאגרו והסר את העסקה מהמתווך.

אם אנחנו לא מתחייבים ולא מפילים לפני שהוגדר המתווך max.transaction.timeout.ms, המתווך של קפקא יבטל את העסקה עצמה. ערך ברירת המחדל של נכס זה הוא 900,000 אלפיות השנייה או 15 דקות.

5. אחר צורכים-הופכים-מייצרים לולאות

מה שראינו זה עתה בסיסי צורכים-הופכים-מייצרים לולאה שקוראת וכותבת לאותו אשכול קפקא.

לעומת זאת, יישומים שחייבים לקרוא ולכתוב לאשכולות קפקא שונים חייבים להשתמש בתוכנות הישנות יותר commitSync ו commitAsync ממשק API. בדרך כלל, יישומים יאחסנו קיזוזים של צרכנים באחסון המצב החיצוני שלהם כדי לשמור על עסקאות.

6. מסקנה

עבור יישומים קריטיים לנתונים, עיבוד מקצה לקצה בדיוק פעם אחת הוא הכרחי.

במדריך זה, ראינו כיצד אנו משתמשים בקפקא לעשות זאת בדיוק תוך שימוש בעסקאות, ויישמנו דוגמה לספירת מילים מבוססת עסקאות כדי להמחיש את העיקרון.

אל תהסס לבדוק את כל דגימות הקוד ב- GitHub.


$config[zx-auto] not found$config[zx-overlay] not found