בניית צינור נתונים עם פלינק וקפקא

1. סקירה כללית

Apache Flink היא מסגרת לעיבוד זרמים שניתן להשתמש בה בקלות עם Java. אפאצ'י קפקא היא מערכת לעיבוד זרמים מבוזרים התומכת בסובלנות תקלות גבוהה.

במדריך זה אנו נבחן כיצד לבנות צינור נתונים באמצעות שתי הטכנולוגיות הללו.

2. התקנה

להתקנה ולהגדרת התצורה של Apache Kafka, עיין במדריך הרשמי. לאחר ההתקנה נוכל להשתמש בפקודות הבאות כדי ליצור את הנושאים החדשים שנקראים flink_input ו flink_output:

 bin / kafka-topics.sh --create \ --zookeeper localhost: 2181 \ - גורם שכפול 1 - מחיצות 1 \ - נושא flink_output bin / kafka-topics.sh --create \ --zookeeper localhost: 2181 \ - שכפול גורם 1 - מחיצות 1 \ - נושא flink_input

לצורך הדרכה זו, נשתמש בתצורת ברירת מחדל וביציאות ברירת מחדל עבור Apache Kafka.

3. שימוש בצליבה

אפאצ'י פלינק מאפשרת טכנולוגיית עיבוד זרם בזמן אמת. המסגרת מאפשרת שימוש במספר מערכות של צד שלישי כמקורות זרם או כיורים.

ב- Flink - ישנם מחברים שונים:

  • אפאצ'י קפקא (מקור / כיור)
  • אפאצ'י קסנדרה (כיור)
  • זרמי קינזיס של אמזון (מקור / כיור)
  • חיפוש אלסטי (כיור)
  • Hadoop FileSystem (כיור)
  • RabbitMQ (מקור / כיור)
  • Apache NiFi (מקור / כיור)
  • ממשק API של הזרמת טוויטר (מקור)

כדי להוסיף את Flink לפרויקט שלנו, עלינו לכלול את התלות הבאה של Maven:

 org.apache.flink flink-core 1.5.0 org.apache.flink flink-connector-kafka-0.11_2.11 1.5.0 

הוספת תלות אלה תאפשר לנו לצרוך ולייצר לנושאי קפקא וממנה. תוכל למצוא את הגרסה הנוכחית של Flink ב- Maven Central.

4. צרכן מחרוזת קפקא

כדי לצרוך נתונים מקפה עם Flink עלינו לספק נושא וכתובת קפקא. עלינו לספק מזהה קבוצתי שישמש לקיזוז קיזוז כך שלא תמיד נקרא את כל הנתונים מההתחלה.

בואו ניצור שיטה סטטית שתעשה את היצירה של FlinkKafka צרכנים קל יותר:

סטטי ציבורי FlinkKafkaConsumer011 createStringConsumerForTopic (נושא מחרוזת, מחרוזת kafkaAddress, מחרוזת kafkaGroup) {מאפיינים אביזרים = מאפיינים חדשים (); props.setProperty ("bootstrap.servers", kafkaAddress); props.setProperty ("group.id", kafkaGroup); FlinkKafkaConsumer011 צרכן = FlinkKafkaConsumer011 חדש (נושא, SimpleStringSchema חדש (), אביזרים); צרכן חוזר; }

שיטה זו אורכת א נושא, kafka כתובת, ו קבקהקבוצה ויוצר את FlinkKafka צרכנים שיצרוך נתונים מהנושא הנתון כ- חוּט מאז השתמשנו SimpleStringSchema לפענוח נתונים.

המספר 011 בשם הכיתה מתייחס לגרסת קפקא.

5. מפיק מיתרי קפקא

כדי לייצר נתונים לקפקא, עלינו לספק את כתובת הקפה והנושא בו אנו רוצים להשתמש. שוב, אנו יכולים ליצור שיטה סטטית שתעזור לנו ליצור מפיקים לנושאים שונים:

ציבורי סטטי ציבורי FlinkKafkaProducer011 createStringProducer (נושא מחרוזת, מחרוזת kafkaAddress) {להחזיר FlinkKafkaProducer011 חדש (kafkaAddress, נושא, SimpleStringSchema חדש ()); }

שיטה זו נדרשת בלבד נוֹשֵׂא ו קפה כתובת כטיעונים מכיוון שאין צורך לספק מזהה קבוצתי כאשר אנו מייצרים את נושא קפקא.

6. עיבוד זרם מחרוזות

כשיש לנו צרכן ומפיק עובד לחלוטין, אנחנו יכולים לנסות לעבד נתונים מקפקא ואז לשמור את התוצאות בחזרה לקפקא. את רשימת הפונקציות המלאה בהן ניתן להשתמש לעיבוד זרמים, תוכלו למצוא כאן.

בדוגמה זו, אנו הולכים לנצל מילים בכל ערך קפקא ואז להחזיר אותו לקפקא.

למטרה זו עלינו ליצור מנהג MapFunction:

WordsCapitalizer בכיתה ציבורית מיישמת את MapFunction {@Override public String map (String s) {return s.toUpperCase (); }}

לאחר יצירת הפונקציה נוכל להשתמש בה בעיבוד זרם:

חלל סטטי ציבורי באותיות רישיות () {String inputTopic = "flink_input"; מחרוזת outputTopic = "flink_output"; מחרוזת consumerGroup = "baeldung"; כתובת מחרוזת = "localhost: 9092"; StreamExecutionEnvironment סביבה = StreamExecutionEnvironment .getExecutionEnvironment (); FlinkKafkaConsumer011 flinkKafkaConsumer = createStringConsumerForTopic (inputTopic, address, consumerGroup); DataStream stringInputStream = סביבה .addSource (flinkKafkaConsumer); FlinkKafkaProducer011 flinkKafkaProducer = createStringProducer (outputTopic, address); stringInputStream .map (WordsCapitalizer חדש ()) .addSink (flinkKafkaProducer); }

היישום יקרא נתונים מה- flink_input הנושא, בצע פעולות בזרם ואז שמור את התוצאות ל- flink_output נושא בקפקא.

ראינו כיצד להתמודד עם מיתרים באמצעות Flink ו- Kafka. אך לעיתים קרובות נדרש לבצע פעולות על אובייקטים מותאמים אישית. נראה כיצד לעשות זאת בפרקים הבאים.

7. עריקת אובייקט בהתאמה אישית

המחלקה הבאה מייצגת מסר פשוט עם מידע אודות השולח והנמען:

@JsonSerialize InputMessage בכיתה ציבורית {שולח מחרוזת; מקבל מחרוזת; LocalDateTime sentAt; הודעת מחרוזת; }

בעבר השתמשנו SimpleStringSchema לבטל את הערכת המסרים מקפקא, אבל עכשיו ברצוננו לנתק מחדש נתונים ישירות לאובייקטים מותאמים אישית.

לשם כך אנו זקוקים למנהג עריקת ערכים סכמה:

מחלקה ציבורית InputMessageDeserializationSchema מיישמת DeserializationSchema {static ObjectMapper objectMapper = ObjectMapper חדש () .registerModule (JavaTimeModule חדש ()); @Override InputMessage הציבור deserialize (בתים [] בתים) זורק IOException {return objectMapper.readValue (בתים, InputMessage.class); } @Override בוליאני ציבורי isEndOfStream (InputMessage inputMessage) {return false; } @Override ציבורי TypeInformation getProducedType () {החזר TypeInformation.of (InputMessage.class); }}

אנו מניחים כאן שההודעות נשמרות כ- JSON בקפקא.

מכיוון שיש לנו שדה מסוג LocalDateTime, עלינו לציין את JavaTimeModule, אשר דואג למיפוי LocalDateTime מתנגד ל- JSON.

לסכמות של פלינק לא יכולות להיות שדות שאינם ניתנים לסידור מכיוון שכל המפעילים (כמו סכימות או פונקציות) מסודרים בתחילת העבודה.

יש בעיות דומות ב- Apache Spark. אחד התיקונים הידועים לבעיה זו הוא אתחול שדות כ- סטָטִי, כמו שעשינו עם ObjectMapper מֵעַל. זה לא הפיתרון הכי יפה, אבל זה יחסית פשוט ועושה את העבודה.

השיטה isEndOfStream יכול לשמש למקרה המיוחד כאשר יש לעבד את הזרם רק עד לקבלת נתונים ספציפיים. אבל זה לא נחוץ במקרה שלנו.

8. סדרת אובייקטים בהתאמה אישית

עכשיו, נניח שאנחנו רוצים שהמערכת שלנו תהיה אפשרות ליצור גיבוי של הודעות. אנו רוצים שהתהליך יהיה אוטומטי, וכל גיבוי צריך להיות מורכב מהודעות שנשלחו במשך יום שלם אחד.

כמו כן, על הודעת גיבוי להיות מוקצה מזהה ייחודי.

לצורך כך אנו יכולים ליצור את הכיתה הבאה:

גיבוי מחלקה ציבורית {@JsonProperty ("inputMessages") רשימת inputMessages; @JsonProperty ("BackupTimestamp") LocalDateTime backupTimestamp; @JsonProperty ("uuid") UUID uuid; גיבוי ציבורי (רשימת inputMessages, LocalDateTime backupTimestamp) {this.inputMessages = inputMessages; this.backupTimestamp = backupTimestamp; this.uuid = UUID.randomUUID (); }}

שימו לב שמנגנון יצירת ה- UUID אינו מושלם מכיוון שהוא מאפשר כפילויות. עם זאת, זה מספיק להיקף של דוגמה זו.

אנחנו רוצים להציל את שלנו גיבוי להתנגד כ- JSON לקפקא, ולכן עלינו ליצור את שלנו סידור סכימה:

מחלקה ציבורית BackupSerializationSchema מיישמת SerializationSchema {ObjectMapper objectMapper; לוגר לוגר = LoggerFactory.getLogger (BackupSerializationSchema.class); @ ביטול ציבורי ביתי [] סידורי (Backup backupMessage) {if (objectMapper == null) {objectMapper = ObjectMapper חדש () .registerModule (JavaTimeModule חדש ()); } נסה {return objectMapper.writeValueAsString (backupMessage) .getBytes (); } לתפוס (com.fasterxml.jackson.core.JsonProcessingException e) {logger.error ("נכשל הניסיון לנתח את JSON", e); } להחזיר בתים חדשים [0]; }}

9. הודעות חותמת זמן

מכיוון שאנחנו רוצים ליצור גיבוי לכל ההודעות בכל יום, ההודעות צריכות חותמת זמן.

פלינק מספק את שלושת מאפייני הזמן השונים EventTime, ProcessingTime, ו IngestionTime.

במקרה שלנו, עלינו להשתמש בזמן בו נשלחה ההודעה, לכן נשתמש זמן אירוע.

להשתמש זמן אירועאנחנו צריכים חותמת זמן מקצה שיוציאו חותמות זמן מנתוני הקלט שלנו:

class class InputMessageTimestampAssigner מיישם AssignerWithPunctuatedWatermarks {@Override public long extractTimestamp (InputMessage element, long previousElementTimestamp) {ZoneId zoneId = ZoneId.systemDefault (); return element.getSentAt (). atZone (zoneId) .toEpochSecond () * 1000; } @ Nullable @ Override check Watermark checkAndGetNextWatermark (InputMessage lastElement, long extractedTimestamp) {להחזיר סימן מים חדש (extractedTimestamp - 1500); }}

עלינו לשנות את שלנו LocalDateTime ל EpochSecond מכיוון שזה הפורמט שצפויה פלינק. לאחר הקצאת חותמות זמן, כל הפעולות המבוססות על זמן ישתמשו זמן מ- sentAt שדה להפעלה.

מאז Flink מצפה חותמות זמן להיות באלפיות השנייה toEpochSecond () מחזיר זמן בשניות שהיינו צריכים להכפיל אותו ב- 1000, כך Flink ייצור חלונות כהלכה.

פלינק מגדיר את המושג א סימן מים. סימני מים שימושיים במקרה של נתונים שלא מגיעים לפי סדר שליחתם. סימן מים מגדיר את האיחור המרבי שמותר לעבד אלמנטים.

אלמנטים שעליהם חותמות זמן נמוכות יותר מסימן המים לא יעובדו כלל.

10. יצירת זמן חלונות

כדי להבטיח שהגיבוי שלנו אוסף רק הודעות שנשלחו במהלך יום אחד, אנו יכולים להשתמש ב- timeWindowAll שיטה בזרם, שתפצל הודעות לחלונות.

עם זאת, עדיין נצטרך לצבור הודעות מכל חלון ולהחזיר אותן כ- גיבוי.

לשם כך נצטרך מותאם אישית AggregateFunction:

מחלקה ציבורית BackupAggregator מיישמת את AggregateFunction {@ הרסה ציבורית ציבורי createAccumulator () {להחזיר ArrayList חדש (); } @ הוספה על רשימה ציבורית ציבורית (InputMessage inputMessage, List inputMessages) {inputMessages.add (inputMessage); להחזיר inputMessages; } @ Override גיבוי ציבורי getResult (רשימת inputMessages) {החזר גיבוי חדש (inputMessages, LocalDateTime.now ()); } @Override רשימת מיזוגים ציבורית (רשימה inputMessages, רשימה acc1) {inputMessages.addAll (acc1); return inputMessages; }}

11. גיבויים מצטברים

לאחר הקצאת חותמות זמן מתאימות ויישום שלנו AggregateFunctionנוכל סוף סוף לקחת את קלט הקפקא שלנו ולעבד אותו:

ריק סטטי ציבורי createBackup () זורק חריג {String inputTopic = "flink_input"; מחרוזת outputTopic = "flink_output"; מחרוזת consumerGroup = "baeldung"; מחרוזת kafkaAddress = "192.168.99.100:9092"; StreamExecutionEnvironment הסביבה = StreamExecutionEnvironment.getExecutionEnvironment (); environment.setStreamTimeCharacteristic (TimeCharacteristic.EventTime); FlinkKafkaConsumer011 flinkKafkaConsumer = createInputMessageConsumer (inputTopic, kafkaAddress, consumerGroup); flinkKafkaConsumer.setStartFromEarliest (); flinkKafkaConsumer.assignTimestampsAndWatermarks (InputMessageTimestampAssigner חדש ()); FlinkKafkaProducer011 flinkKafkaProducer = createBackupProducer (outputTopic, kafkaAddress); DataStream inputMessagesStream = environment.addSource (flinkKafkaConsumer); inputMessagesStream .timeWindowAll (Time.hours (24)) .aggregate (BackupAggregator חדש ()) .addSink (flinkKafkaProducer); environment.execute (); }

12. מסקנה

במאמר זה הצגנו כיצד ליצור צינור נתונים פשוט עם Apache Flink ו- Apache Kafka.

כמו תמיד, ניתן למצוא את הקוד ב- Github.


$config[zx-auto] not found$config[zx-overlay] not found