מבוא לאפאצ'י קפקא עם אביב

עליון התמדה

רק הכרזתי על החדש למד אביב קורס, המתמקד ביסודות האביב 5 ומגף האביב 2:

>> בדוק את הקורס

1. סקירה כללית

אפאצ'י קפקא היא מערכת לעיבוד זרמים מבוזרת וסובלנית מתקלות.

במאמר זה נסקור את תמיכת האביב ב- Kafka ואת רמת ההפשטות שהיא מספקת על ממשקי API של לקוח Java של Kafka.

אביב קפקא מביא את מודל התכנות הפשוט והטיפוסי של תבנית אביב עם KafkaTemplate ו- POJOs מונע הודעות דרך @KafkaListener ביאור.

2. התקנה והתקנה

כדי להוריד ולהתקין את קפקא, עיין במדריך הרשמי כאן.

אנחנו גם צריכים להוסיף את אביב-קפקא תלות שלנו pom.xml:

 org.springframework.kafka spring-kafka 2.3.7.RELEASE 

הגרסה האחרונה של חפץ זה נמצאת כאן.

היישום לדוגמה שלנו יהיה יישום Spring Boot.

מאמר זה מניח כי השרת מופעל באמצעות תצורת ברירת המחדל ושום יציאת שרת אינה משתנה.

3. קביעת תצורה של נושאים

בעבר נהגנו להפעיל כלי שורת פקודה ליצירת נושאים בקפקא כגון:

$ bin / kafka-topics.sh --create \ --zookeeper localhost: 2181 \ - גורם שכפול 1 - מחיצות 1 \ - נושא mytopic

אבל עם ההקדמה של AdminClient בקפקא אנו יכולים כעת ליצור נושאים באופן פרוגרמטי.

עלינו להוסיף את KafkaAdmin שעועית אביבית, שתוסיף אוטומטית נושאים לכל השעועית מהסוג נושא חדש:

@Configuration מחלקה ציבורית KafkaTopicConfig {@Value (value = "$ {kafka.bootstrapAddress}") פרטי מחרוזת bootstrapAddress; @Bean הציבור KafkaAdmin kafkaAdmin () {Map configs = HashMap חדש (); configs.put (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); להחזיר KafkaAdmin חדש (configs); } @Bean נושא ציבורי NewTopic1 () {להחזיר NewTopic חדש ("baeldung", 1, (קצר) 1); }}

4. הפקת מסרים

כדי ליצור הודעות, ראשית עלינו להגדיר א מפיק מפעל שקובעת את האסטרטגיה ליצירת קפקא יַצרָן מקרים.

ואז אנחנו צריכים a KafkaTemplate שעוטף א יַצרָן למשל ומספק שיטות נוחות לשליחת הודעות לנושאי קפקא.

יַצרָן המקרים הם בטוחים בחוטים ולכן השימוש במופע יחיד בכל הקשר היישום ייתן ביצועים גבוהים יותר. כתוצאה מכך, KakfaTemplate מקרים הם גם בטוחים בשרשור ומומלץ להשתמש במופע אחד.

4.1. תצורת מפיק

@Configuration public class KafkaProducerConfig {@Bean Public ProducerFactory producerFactory () {Map configProps = HashMap חדש (); configProps.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); להחזיר DefaultKafkaProducerFactory חדש (configProps); } @Bean הציבור KafkaTemplate kafkaTemplate () {להחזיר KafkaTemplate חדש (producerFactory ()); }}

4.2. פרסום הודעות

אנו יכולים לשלוח הודעות באמצעות KafkaTemplate מעמד:

@ KafkaTemplate פרטית אוטומטית kafkaTemplate; sendMessage public void public (מחרוזת msg) {kafkaTemplate.send (topicName, msg); }

ה לִשְׁלוֹחַ API מחזיר a האזנה לעתיד לְהִתְנַגֵד. אם אנו רוצים לחסום את שרשור השולח ולקבל את התוצאה לגבי ההודעה שנשלחה, אנו יכולים להתקשר אל לקבל ממשק API של האזנה לעתיד לְהִתְנַגֵד. החוט ימתין לתוצאה, אך הוא יאט את המפיק.

קפקא היא פלטפורמה לעיבוד זרמים מהיר. לכן מומלץ לטפל בתוצאות בצורה אסינכרונית כדי שההודעות הבאות לא יחכו לתוצאה של ההודעה הקודמת. אנו יכולים לעשות זאת באמצעות התקשרות חוזרת:

public void sendMessage (הודעת מחרוזת) {ListenableFuture עתיד = kafkaTemplate.send (נושא שם, הודעה); future.addCallback (חדש. ניתן להאזין() {@ ביטול חלל ציבורי onSuccess (תוצאת SendResult) {System.out.println ("הודעה שנשלחה = [" + הודעה + "] עם קיזוז = [" + result.getRecordMetadata (). קיזוז () + "]") ; } @ ביטול חלל ציבורי onFailure (לשעבר ניתן לזריקה) {System.out.println ("לא ניתן לשלוח הודעה = [" + הודעה + "] עקב:" + ex.getMessage ()); }}); }

5. צריכת הודעות

5.1. תצורת צרכנים

לצורך צריכת הודעות, עלינו להגדיר א ConsumerFactory ו KafkaListenerContainerFactory. לאחר שעועית זו תהיה זמינה במפעל שעועית האביב, ניתן להגדיר צרכנים מבוססי POJO באמצעות @KafkaListener ביאור.

@ EnableKafka הערה נדרשת בכיתת התצורה כדי לאפשר זיהוי של @KafkaListener ביאור על שעועית מנוהלת באביב:

@EnableKafka @Configuration מחלקה ציבורית KafkaConsumerConfig {@Bean public ConsumerFactory consumerFactory () {אביזרי מפה = HashMap חדש (); props.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put (ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); להחזיר DefaultKafkaConsumerFactory חדש (אביזרים); } @ שעועית ציבורית ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory () {מפעל ConcurrentKafkaListenerContainerFactory = ConcurrentKafkaListenerContainerFactory חדש) (); factory.setConsumerFactory (consumerFactory ()); מפעל החזרה; }}

5.2. צורכת הודעות

@KafkaListener (topics = "topicName", groupId = "foo") חלל ציבורי listenGroupFoo (הודעת מחרוזת) {System.out.println ("הודעה שהתקבלה בקבוצת foo:" + הודעה); }

ניתן ליישם מספר מאזינים לנושא, לכל אחד מזהה קבוצתי אחר. יתר על כן, צרכן אחד יכול להאזין להודעות מנושאים שונים:

@KafkaListener (topics = "topic1, topic2", groupId = "foo")

אביב תומך גם באחזור של כותרת הודעה אחת או יותר באמצעות ה- @כּוֹתֶרֶת ביאור במאזין:

@KafkaListener (topics = "topicName") ציבור ריק ריק listenWithHeaders (@Payload מחרוזת הודעה, @Header (KafkaHeaders.RECEIVED_PARTITION_ID) מחיצה int) {System.out.println ("הודעה שהתקבלה:" + הודעה "+" מהמחיצה: "+ מחיצה);}

5.3. צריכת הודעות ממחיצה ספציפית

כפי ששמתם לב, יצרנו את הנושא ביילדונג עם מחיצה אחת בלבד. עם זאת, לנושא עם מספר מחיצות, א @KafkaListener יכול להירשם במפורש למחיצה מסוימת של נושא עם קיזוז ראשוני:

@KafkaListener (topicPartitions = @TopicPartition (topic = "topicName", partitionOffsets = {@PartitionOffset (partition = "0", initialOffset = "0"), @PartitionOffset (partition = "3", initialOffset = "0")}) , containerFactory = "partitionsKafkaListenerContainerFactory") ציבור ריק ריק listenToPartition (הודעת מחרוזת @Payload, @Header (KafkaHeaders.RECEIVED_PARTITION_ID) מחיצה int) {System.out.println ("הודעה שהתקבלה:" + הודעה "+" מהמחיצה: "+ מחיצה) ;}

מאז initialOffset נשלח ל- 0 במאזין זה, כל ההודעות שנצרכו בעבר מהמחיצות 0 ושלוש ייצרכו מחדש בכל פעם שמאזין זה מאותחל. אם אין צורך בהגדרת הקיזוז, אנו יכולים להשתמש ב- מחיצות רכוש של @TopicPartition ביאור כדי להגדיר רק את המחיצות ללא הקיזוז:

@KafkaListener (topicPartitions = @TopicPartition (topic = "topicName", מחיצות = {"0", "1"}))

5.4. הוספת מסנן הודעות למאזינים

ניתן להגדיר את המאזינים לצרוך סוגים ספציפיים של הודעות על ידי הוספת מסנן מותאם אישית. ניתן לעשות זאת על ידי הגדרת a RecordFilterStrategy אל ה KafkaListenerContainerFactory:

@ שעועית ציבורית ConcurrentKafkaListenerContainerFactory filterKafkaListenerContainerFactory () {מפעל ConcurrentKafkaListenerContainerFactory = חדש ConcurrentKafkaListenerContainerFactory (); factory.setConsumerFactory (consumerFactory ()); factory.setRecordFilterStrategy (שיא -> record.value (). מכיל ("עולם")); מפעל החזרה; }

לאחר מכן ניתן להגדיר מאזין להשתמש במפעל המכולות הזה:

@KafkaListener (topics = "topicName", containerFactory = "filterKafkaListenerContainerFactory") חלל ציבורי listenWithFilter (הודעת מחרוזת) {System.out.println ("הודעה שהתקבלה במאזין המסונן:" + הודעה); }

במאזין הזה, כל הודעות התואמות את המסנן יימחקו.

6. ממירי הודעות בהתאמה אישית

עד כה כיסינו רק שליחה וקבלת מיתרים כהודעות. עם זאת, אנו יכולים גם לשלוח ולקבל אובייקטים מותאמים אישית של Java. זה דורש הגדרת תצורה של סדרת המתאימים מפיק מפעל ו deserializer ב ConsumerFactory.

בואו נסתכל על שיעור שעועית פשוט, שאנו נשלח כהודעות:

ברכה בכיתה ציבורית {מחרוזת פרטית msg; שם מחרוזת פרטי; // סטנדרטים, סטרים ובונים סטנדרטיים}

6.1. הפקת הודעות בהתאמה אישית

בדוגמה זו נשתמש JsonSerializer. בואו נסתכל על הקוד עבור מפיק מפעל ו KafkaTemplate:

@Pean Public ProducerFactory greetingProducerFactory () {// ... configProps.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); להחזיר DefaultKafkaProducerFactory חדש (configProps); } @Bean ברכה KafkaTemplate ברכה KafkaTemplate () {להחזיר KafkaTemplate חדש (greetingProducerFactory ()); }

זה חדש KafkaTemplate ניתן להשתמש כדי לשלוח את בְּרָכָה הוֹדָעָה:

kafkaTemplate.send (topicName, ברכה חדשה ("שלום", "עולם"));

6.2. צורכת הודעות מותאמות אישית

באופן דומה, בואו ונשנה את ה- ConsumerFactory ו KafkaListenerContainerFactory לנטרל נכון את הודעת הברכה:

@Bean Public ConsumerFactory greetingConsumerFactory () {// ... להחזיר DefaultKafkaConsumerFactory חדש (אביזרים, StringDeserializer חדש (), JsonDeserializer חדש (Greeting.class)); } @ שעועית ברכה ConcurrentKafkaListenerContainerFactory ברכה KafkaListenerContainerFactory () {ConcurrentKafkaListenerContainerFactory מפעל = ConcurrentKafkaListenerContainerFactory חדש) (); factory.setConsumerFactory (greetingConsumerFactory ()); מפעל החזרה; }

האביב- kafka JSON הסידור וההתפשטות משתמש בספריית ג'קסון שהיא גם תלות אופציונלית של maven לפרויקט spring-kafka. אז בואו נוסיף את זה ל pom.xml:

 com.fasterxml.jackson.core jackson-databind 2.9.7 

במקום להשתמש בגרסה האחרונה של ג'קסון, מומלץ להשתמש בגרסה שנוספה ל- pom.xml של אביב-קפקא.

לבסוף, עלינו לכתוב מאזין שיצרוך בְּרָכָה הודעות:

@KafkaListener (topics = "topicName", containerFactory = "greetingKafkaListenerContainerFactory") ברכת חלל ציבורית ברכהListener (ברכת ברכה) {// תהליך הודעת ברכה}

7. מסקנה

במאמר זה סקרנו את יסודות התמיכה באביב באפאצ'י קפקא. התבוננו בקצרה בשיעורים המשמשים למשלוח וקבלה של הודעות.

קוד המקור השלם למאמר זה ניתן למצוא באתר GitHub. לפני ביצוע הקוד, אנא ודא ששרת Kafka פועל והנושאים נוצרים באופן ידני.

תחתית התמדה

רק הכרזתי על החדש למד אביב קורס, המתמקד ביסודות האביב 5 ומגף האביב 2:

>> בדוק את הקורס