מבוא ל- KafkaStreams בג'אווה

1. סקירה כללית

במאמר זה נבחן את ה- KafkaStreams סִפְרִיָה.

KafkaStreams מתוכנן על ידי יוצרי אפאצ'י קפקא. המטרה העיקרית של תוכנה זו היא לאפשר למתכנתים ליצור יישומי סטרימינג יעילים בזמן אמת, שיכולים לעבוד כמיקרו-שירותים.

KafkaStreams מאפשר לנו לצרוך מנושאי קפקא, לנתח או לשנות נתונים, ואולי לשלוח אותם לנושא קפקא אחר.

להדגים KafkaStreams, ניצור יישום פשוט הקורא משפטים מתוך נושא, סופר מופעים של מילים ומדפיס את הספירה למילה.

חשוב לציין כי ה- KafkaStreams הספרייה אינה תגובתית ואין לה תמיכה בפעולות אסינכרון וטיפול בלחץ אחורי.

2. תלות של Maven

כדי להתחיל לכתוב לוגיקה בעיבוד זרם באמצעות KafkaStreams, עלינו להוסיף תלות ב קפקה-נחלים ו לקוחות לקפה:

 org.apache.kafka kafka-streams 1.0.0 org.apache.kafka kafka-clients 1.0.0 

עלינו להתקין ולהתחיל את Apache Kafka מכיוון שנשתמש בנושא קפקא. נושא זה יהווה את מקור הנתונים עבור עבודת הסטרימינג שלנו.

אנו יכולים להוריד קפקא ותלות נדרשת אחרת מהאתר הרשמי.

3. קביעת תצורה של קלט KafkaStreams

הדבר הראשון שנעשה הוא הגדרת נושא קפקא.

אנחנו יכולים להשתמש ב- מחוברים כלי שהורדנו - הוא מכיל שרת קפקא. הוא מכיל גם את קפקא-קונסולה-מפיק שנוכל להשתמש בהם כדי לפרסם הודעות לקפקא.

כדי להתחיל בוא נפעיל את אשכול קפקא שלנו:

./ התחלה מקיפה

לאחר הפעלת קפקא, אנו יכולים להגדיר באמצעותם את מקור הנתונים ואת שם היישום שלנו APPLICATION_ID_CONFIG:

מחרוזת inputTopic = "inputTopic";
מאפיינים streamsConfiguration = מאפיינים חדשים (); streamsConfiguration.put (StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");

פרמטר תצורה מכריע הוא BOOTSTRAP_SERVER_CONFIG. זוהי כתובת האתר למופע קפקא המקומי שרק התחלנו:

פרטי מחרוזת bootstrapServers = "localhost: 9092"; streamsConfiguration.put (StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

בשלב הבא עלינו להעביר את סוג המפתח והערך של ההודעות שיצרכו מהם inputTopic:

streamsConfiguration.put (StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String (). getClass (). getName ()); streamsConfiguration.put (StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String (). getClass (). getName ());

לעיתים קרובות עיבוד זרמים הוא סטטיסטי. כאשר אנו רוצים לשמור תוצאות ביניים, עלינו לציין את ה- STATE_DIR_CONFIG פָּרָמֶטֶר.

בבדיקה שלנו אנו משתמשים במערכת קבצים מקומית:

streamsConfiguration.put (StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory (). getAbsolutePath ()); 

4. בניית טופולוגיית סטרימינג

לאחר שהגדרנו את נושא הקלט שלנו, נוכל ליצור טופולוגיית סטרימינג - כלומר הגדרה כיצד יש לטפל ולהפוך אירועים.

בדוגמה שלנו נרצה ליישם מונה מילים. על כל משפט שנשלח אל inputTopic, אנו רוצים לפצל אותה למילים ולחשב את המופע של כל מילה.

אנו יכולים להשתמש במופע של ה- KStreamsBuilder בכיתה להתחיל בבניית הטופולוגיה שלנו:

בונה KStreamBuilder = KStreamBuilder חדש (); KStream textLines = builder.stream (inputTopic); תבנית תבנית = תבנית.קומפילציה ("\ W +", תבנית.UNICODE_CHARACTER_CLASS); KTable wordCounts = textLines .flatMapValues ​​(value -> Arrays.asList (pattern.split (value.toLowerCase ()))) .groupBy ((key, word) -> word) .count ();

כדי ליישם ספירת מילים, ראשית, עלינו לפצל את הערכים באמצעות הביטוי הרגולרי.

שיטת הפיצול מחזירה מערך. אנו משתמשים ב- flatMapValues ​​() לשטח אותו. אחרת, בסופו של דבר תהיה לנו רשימת מערכים, וזה לא יהיה נוח לכתוב קוד באמצעות מבנה כזה.

לבסוף, אנו צוברים את הערכים עבור כל מילה וקוראים ל לספור() שיחשב הופעות של מילה ספציפית.

5. טיפול בתוצאות

כבר חישבנו את ספירת המילים של הודעות הקלט שלנו. עכשיו בואו נדפיס את התוצאות על הפלט הסטנדרטי באמצעות ה- לכל אחד() שיטה:

wordCounts .foreach ((w, c) -> System.out.println ("word:" + w + "->" + c));

בייצור, לעתים קרובות עבודת סטרימינג כזו עשויה לפרסם את הפלט לנושא אחר של קפקא.

נוכל לעשות זאת באמצעות ה- לשיטה ():

מחרוזת outputTopic = "outputTopic"; Serde stringSerde = Serdes.String (); Serde longSerde = Serdes.Long (); wordCounts.to (stringSerde, longSerde, outputTopic);

ה סרדה מחלקה נותנת לנו סידורי סדר מוגדרים מראש לסוגי Java שישמשו לסידור אובייקטים למערך בתים. מערך הבתים יישלח לנושא קפקא.

אנחנו משתמשים חוּט כמפתח לנושא שלנו ארוך כערך לספירה בפועל. ה ל() השיטה תשמור את הנתונים שהתקבלו ל- outputTopic.

6. התחלת ג'ק קפקא סטרים

עד לנקודה זו בנינו טופולוגיה שניתן לבצע. עם זאת, העבודה טרם התחילה.

עלינו להתחיל את עבודתנו במפורש על ידי התקשרות ל הַתחָלָה() שיטה על KafkaStreams למשל:

זרמי KafkaStreams = KafkaStreams חדשים (builder, streamsConfiguration); streams.start (); Thread.sleep (30000); streams.close ();

שים לב שאנחנו מחכים 30 שניות לסיום העבודה. בתרחיש של העולם האמיתי, העבודה הזו תפעל כל הזמן ותעבד אירועים מקפקא בהגיעם.

אנו יכולים לבדוק את עבודתנו על ידי פרסום כמה אירועים לנושא קפקא.

נתחיל א קפקא-קונסולה-מפיק ולשלוח ידנית כמה אירועים לאתר שלנו inputTopic:

./kafka-console-producer --topic inputTopic - broker-list localhost: 9092> "זה פוני"> "זה סוס ופוני" 

בדרך זו פרסמנו לקפקא שני אירועים. האפליקציה שלנו תצרוך אירועים אלה ותדפיס את הפלט הבא:

מילה: -> מילה אחת: זו -> מילה אחת: היא -> מילה אחת: a -> מילה אחת: פוני -> מילה אחת: -> 2 מילה: זו -> 2 מילה: היא -> 2 מילה: a - > 2 מילה: סוס -> מילה אחת: ו -> מילה אחת: פוני -> 2

אנו יכולים לראות שכשההודעה הראשונה הגיעה, המילה פוני התרחש רק פעם אחת. אבל כששלחנו את ההודעה השנייה, את המילה פוני קרה בפעם השנייה בהדפסה: "word: pony -> 2 ″.

6. מסקנה

מאמר זה דן כיצד ליצור יישום עיבוד זרם ראשי באמצעות Apache Kafka כמקור נתונים ו- KafkaStreams הספרייה כספריית עיבוד הזרמים.

ניתן למצוא את כל הדוגמאות וקטעי הקוד בפרויקט GitHub - זהו פרויקט Maven, כך שיהיה קל לייבא ולהפעיל אותו כפי שהוא.


$config[zx-auto] not found$config[zx-overlay] not found