מבוא למחברי קפקא

1. סקירה כללית

Apache Kafka® היא פלטפורמת סטרימינג מבוזרת. בהדרכה קודמת דנו כיצד ליישם צרכני קפקא ומפיקים באמצעות אביב.

במדריך זה נלמד כיצד להשתמש במחברי קפקא.

נסתכל על:

  • סוגים שונים של מחברי קפקא
  • תכונות ומצבים של Kafka Connect
  • תצורת מחברים באמצעות קבצי מאפיינים כמו גם REST API

2. יסודות מחברי Kafka Connect ו- Kafka

Kafka Connect היא מסגרת לחיבור קפקא למערכות חיצוניות כגון מאגרי מידע, מאגרי ערך מפתח, אינדקסי חיפוש ומערכות קבצים, באמצעות מה שמכונה מחברים.

מחברי קפקא הם רכיבים מוכנים לשימוש, שיכולים לעזור לנו לייבא נתונים ממערכות חיצוניות לנושאי קפקא ו לייצא נתונים מנושאי קפקא למערכות חיצוניות. אנו יכולים להשתמש ביישומי מחברים קיימים עבור מקורות נתונים ושקעים נפוצים או ליישם מחברים משלנו.

א מחבר מקור אוספת נתונים ממערכת. מערכות מקור יכולות להיות בסיסי נתונים שלמים, טבלאות זרמים או מתווכי הודעות. מחבר מקור יכול גם לאסוף מדדים משרתי יישומים לנושאי קפקא, ולהפוך את הנתונים לזמינים לעיבוד זרמים עם זמן אחזור נמוך.

א מחבר כיור מספק נתונים מנושאי קפקא למערכות אחרות, שעשויות להיות אינדקסים כגון Elasticsearch, מערכות אצווה כגון Hadoop או כל סוג של בסיס נתונים.

מחברים מסוימים מתוחזקים על ידי הקהילה, בעוד שאחרים נתמכים על ידי Confluent או שותפיה. באמת, אנו יכולים למצוא מחברים למערכות הפופולריות ביותר, כמו S3, JDBC וקסנדרה, רק כדי שם כמה.

3. תכונות

התכונות של Kafka Connect כוללות:

  • מסגרת לחיבור מערכות חיצוניות עם קפקא - זה מפשט את הפיתוח, הפריסה והניהול של מחברים
  • מצבים מבוזרים ועצמאים - זה עוזר לנו לפרוס אשכולות גדולים על ידי מינוף האופי המבוזר של קפקא, כמו גם הגדרות לפיתוח, בדיקות ופריסות ייצור קטנות
  • ממשק REST - אנו יכולים לנהל מחברים באמצעות REST API
  • ניהול קיזוז אוטומטי - Kafka Connect עוזר לנו להתמודד עם תהליך התחייבות קיזוז, מה שחוסך לנו את הבעיה ליישם ידנית את החלק המועד לטעויות זה בפיתוח המחברים
  • מופץ וניתן להרחבה כברירת מחדל - Kafka Connect משתמש בפרוטוקול ניהול הקבוצה הקיים; אנחנו יכולים להוסיף עובדים נוספים כדי להגדיל את מקבץ Kafka Connect
  • סטרימינג ושילוב אצווה - Kafka Connect הוא פיתרון אידיאלי לגישור בין מערכות סטרימינג ונתוני אצווה בקשר ליכולות הקפות של קפקא.
  • טרנספורמציות - אלה מאפשרות לנו לבצע שינויים פשוטים וקלילים במסרים בודדים

4. התקנה

במקום להשתמש בהפצת Kafka הרגילה, אנו נוריד את Confluent Platform, הפצת Kafka שמספקת Confluent, Inc., החברה שעומדת מאחורי קפקא. פלטפורמת Confluent מגיעה עם כמה כלים ולקוחות נוספים, בהשוואה ל- Kafka רגיל, כמו גם כמה מחברים נוספים שנבנו מראש.

לענייננו, מהדורת הקוד הפתוח מספיקה, אשר ניתן למצוא באתר Confluent.

5. התחל מהיר Kafka Connect

בתור התחלה, נדון בעקרון של קפקא קונקט, באמצעות המחברים הבסיסיים ביותר שלהם, שהם הקובץ מָקוֹר מחבר והקובץ כִּיוֹר מַחבֵּר.

באופן נוח, פלטפורמת Confluent מגיעה עם שני המחברים הללו, כמו גם תצורות הפניה.

5.1. תצורת מחבר המקור

עבור מחבר המקור, תצורת ההתייחסות זמינה בכתובת $ CONFLUENT_HOME / etc / kafka / connect-file-source.properties:

שם = מחבר קובץ מקומי מקומי.קלאס = משימות FileStreamSource.max = נושא אחד = חיבור קובץ בדיקה = test.txt

לתצורה זו יש כמה מאפיינים הנפוצים לכל מחברי המקור:

  • שֵׁם הוא שם שצוין על ידי המשתמש למופע המחבר
  • מחבר.קלאס מציין את מחלקת ההטמעה, בעצם סוג המחבר
  • משימות.מקסימום מציין כמה מקרים של מחבר המקור שלנו צריכים לפעול במקביל, ו-
  • נוֹשֵׂא מגדיר את הנושא אליו צריך המחבר לשלוח את הפלט

במקרה זה, יש לנו גם תכונה ספציפית למחבר:

  • קוֹבֶץ מגדיר את הקובץ ממנו צריך המחבר לקרוא את הקלט

כדי שזה יעבוד אז, בואו ניצור קובץ בסיסי עם תוכן כלשהו:

הד -e "foo \ nbar \ n"> $ CONFLUENT_HOME / test.txt

שים לב שספריית העבודה היא $ CONFLUENT_HOME.

5.2. תצורת מחבר כיור

עבור מחבר הכיור שלנו, נשתמש בתצורת ההתייחסות בכתובת $ CONFLUENT_HOME / etc / kafka / connect-file-sink.properties:

שם = מחבר קובץ-כיור מקומי.קלאס = משימות FileStreamSink.max = קובץ אחד = נושאים של test.sink.txt = connect-test

באופן הגיוני, הוא מכיל בדיוק את אותם הפרמטרים, אם כי הפעם מחבר.קלאס מציין את יישום מחבר הכיור, ו- קוֹבֶץ הוא המיקום שבו המחבר צריך לכתוב את התוכן.

5.3. עובד קונפיג

לבסוף, עלינו להגדיר את העובד של Connect, שישלב את שני המחברים שלנו ויעשה את עבודת הקריאה ממחבר המקור והכתיבה למחבר הכיור.

לשם כך, אנו יכולים להשתמש $ CONFLUENT_HOME / etc / kafka / connect-standalone.properties:

bootstrap.servers = localhost: 9092 key.converter = org.apache.kafka.connect.json.JsonConverter value.converter = org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable = false.converter. schemas.enable = offset.storage.file.filename = / false.offset offset.flush.interval.ms = 10000 plugin.path = / share / java

ציין זאת plugin.path יכול להחזיק רשימת נתיבים שבהם ניתן להשיג יישומי מחברים

כאשר נשתמש במחברים המשולבים בקפקא, נוכל להגדיר plugin.path ל $ CONFLUENT_HOME / share / java. בעבודה עם Windows, ייתכן שיהיה צורך לספק נתיב מוחלט כאן.

עבור הפרמטרים האחרים, אנו יכולים להשאיר את ערכי ברירת המחדל:

  • bootstrap.servers מכיל את כתובות המתווכים של קפקא
  • key.converter ו value.converter להגדיר מחלקות ממיר, המסדרות ומנתקות את הנתונים כשהן זורמות מהמקור לקפקא ואז מקפקא לכיור
  • key.converter.schemas.enable ו value.converter.schemas.enable הן הגדרות ספציפיות לממיר
  • offset.storage.file.filename היא ההגדרה החשובה ביותר בעת הפעלת Connect במצב עצמאי: היא מגדירה היכן Connect צריך לאחסן את נתוני הקיזוז שלה
  • offset.flush.interval.ms מגדיר את המרווח בו העובד מנסה לבצע קיזוזים למשימות

ורשימת הפרמטרים די בשלה, אז עיין בתיעוד הרשמי לרשימה מלאה.

5.4. Kafka Connect במצב עצמאי

ועם זה, אנו יכולים להתחיל בהגדרת המחברים הראשונה שלנו:

$ CONFLUENT_HOME / bin / connect-standalone \ $ CONFLUENT_HOME / etc / kafka / connect-standalone.properties \ $ CONFLUENT_HOME / etc / kafka / connect-file-source.properties \ $ CONFLUENT_HOME / etc / kafka / connect-file-sink. נכסים

ראשית, אנו יכולים לבדוק את תוכן הנושא באמצעות שורת הפקודה:

$ CONFLUENT_HOME / bin / kafka-console-consumer - bootstrap-server localhost: 9092 - בדיקת חיבור טופיק - מההתחלה

כפי שאנו רואים, מחבר המקור לקח את הנתונים מה- test.txt קובץ, הפך אותו ל- JSON ושלח אותו לקפקא:

{"schema": {"type": "string", "optional": false}, "payload": "foo"} {"schema": {"type": "string", "optional": false}, "מטען": "סרגל"}

ואם נסתכל על התיקיה $ CONFLUENT_HOME, אנו יכולים לראות את הקובץ test.sink.txt נוצר כאן:

חתול $ CONFLUENT_HOME / test.sink.txt בר foo

כאשר מחבר הכיור מחלץ את הערך מה- מטען תכונה וכותב אותו לקובץ היעד, הנתונים ב test.sink.txt מכיל את תוכן המקור test.txt קוֹבֶץ.

עכשיו בואו נוסיף עוד שורות ל- test.txt.

כשאנחנו עושים זאת, אנו רואים שמחבר המקור מגלה שינויים אלה באופן אוטומטי.

עלינו רק להקפיד להוסיף שורה חדשה בסוף, אחרת מחבר המקור לא יתחשב בשורה האחרונה.

בשלב זה, בואו נעצור את תהליך התחברות, כיוון שנתחיל להתחבר מצב מבוזר בכמה שורות.

6. Connect של REST API

עד עכשיו ביצענו את כל התצורות על ידי העברת קבצי נכסים דרך שורת הפקודה. עם זאת, מכיוון ש- Connect מיועד לפעול כשירות, יש גם REST API זמין.

כברירת מחדל, הוא זמין בכתובת // localhost: 8083. כמה נקודות קצה הן:

  • GET / מחברים - מחזירה רשימה עם כל המחברים בשימוש
  • GET / מחברים / {name} - מחזיר פרטים על מחבר ספציפי
  • POST / מחברים - יוצר מחבר חדש; גוף הבקשה צריך להיות אובייקט JSON המכיל שדה שם מחרוזת ושדה תצורת אובייקט עם פרמטרי תצורת המחבר
  • GET / מחברים / {name} / סטטוס - מחזיר את המצב הנוכחי של המחבר - כולל אם הוא פועל, נכשל או מושהה - לאיזה עובד הוא מוקצה, מידע על שגיאות אם הוא נכשל ומצב כל המשימות שלו
  • מחק / מחברים / {name} מוחק מחבר, מפסיק בחן את כל המשימות ומחיקת תצורתו
  • תוספי GET / מחברים - מחזירה רשימה של תוספי מחברים המותקנים באשכול Kafka Connect

התיעוד הרשמי מספק רשימה עם כל נקודות הקצה.

נשתמש ב- REST API ליצירת מחברים חדשים בסעיף הבא.

7. Kafka Connect במצב מבוזר

המצב העצמאי עובד בצורה מושלמת לפיתוח ובדיקה, כמו גם להגדרות קטנות יותר. עם זאת, אם אנו רוצים לעשות שימוש מלא באופי המבוזר של קפקא, עלינו להפעיל את Connect במצב מבוזר.

על ידי כך, הגדרות המחברים ומטא נתונים מאוחסנים בנושאי קפקא במקום מערכת הקבצים. כתוצאה מכך, צמתים העובדים הם באמת חסרי מדינה.

7.1. מתחילים להתחבר

תצורת הפניה למצב מבוזר ניתן למצוא ב- $ CONFLUENT_HOME/etc/kafka/connect-distributed.properties.

הפרמטרים זהים לרוב למצב עצמאי. יש רק כמה הבדלים:

  • group.id מגדיר את השם של קבוצת אשכולות Connect. הערך חייב להיות שונה מכל מזהה של קבוצת צרכנים
  • offset.storage.topic, config.storage.topic ו status.storage.topic הגדר נושאים להגדרות אלה. עבור כל נושא, אנו יכולים גם להגדיר גורם שכפול

שוב, התיעוד הרשמי מספק רשימה עם כל הפרמטרים.

אנו יכולים להתחיל להתחבר במצב מבוזר באופן הבא:

$ CONFLUENT_HOME / bin / connect מופץ $ CONFLUENT_HOME / etc / kafka / connect-distributed.properties

7.2. הוספת מחברים באמצעות ממשק ה- API של REST

כעת, בהשוואה לפקודת ההפעלה העצמאית, לא העברנו תצורות מחברים כארגומנטים. במקום זאת, עלינו ליצור את המחברים באמצעות REST API.

כדי להגדיר את הדוגמה שלנו מבעבר, עלינו לשלוח שתי בקשות POST אל // localhost: 8083 / מחברים המכיל את המבנים הבאים של JSON.

ראשית, עלינו ליצור גוף עבור מחבר המקור POST כקובץ JSON. הנה, נקרא לזה connect-file-source.json:

{"name": "source-file-source", "config": {"connector.class": "FileStreamSource", "task.max": 1, "file": "test-distribution.txt", "topic ":" מופץ לחיבור "}}

שים לב איך זה נראה די דומה לקובץ תצורת ההתייחסות בו השתמשנו בפעם הראשונה.

ואז אנחנו מפרסמים את זה:

תלתל -d @ "$ CONFLUENT_HOME / connect-file-source.json" \ -H "סוג תוכן: יישום / json" \ -X POST // localhost: 8083 / connectors

לאחר מכן, נעשה את אותו הדבר עבור מחבר הכיור, ונקרא לקובץ connect-file-sink.json:

{"name": "local-file-sink", "config": {"connector.class": "FileStreamSink", "Tasks.max": 1, "file": "test-distribution.sink.txt", "topics": "מופץ באמצעות קשר"}}

ובצע את ההודעה כמו קודם:

תלתל -d @ $ CONFLUENT_HOME / connect-file-sink.json \ -H "סוג תוכן: יישום / json" \ -X POST // localhost: 8083 / connectors

במידת הצורך נוכל לוודא שההתקנה הזו פועלת כהלכה:

$ CONFLUENT_HOME / bin / kafka-console-consumer - bootstrap-server localhost: 9092 --topic connect-distribution - from-beginning {"schema": {"type": "string", "optional": false}, "payload": "foo"} {"schema": {"type": "string", "optional": false}, "payload": "bar"}

ואם נסתכל על התיקיה $ CONFLUENT_HOMEאנו יכולים לראות את הקובץ מבחן מופץ.סינק.טקסט נוצר כאן:

חתול $ CONFLUENT_HOME / test-distribution.sink.txt סרגל foo

לאחר שבדקנו את ההתקנה המבוזרת, בואו ננקה על ידי הסרת שני המחברים:

curl -X DELETE // localhost: 8083 / connectors / local-file-source curl -X DELETE // localhost: 8083 / connectors / local-file-sink

8. שינוי נתונים

8.1. תמורות נתמכות

טרנספורמציות מאפשרות לנו לבצע שינויים פשוטים וקלילים במסרים בודדים.

Kafka Connect תומך בתמורות המובנות הבאות:

  • InsertField - הוסף שדה באמצעות נתונים סטטיים או מטה-נתונים רשומות
  • ReplaceField - סנן או שנה שם של שדות
  • מסקפילד - החלף שדה בערך null חוקי עבור הסוג (אפס או מחרוזת ריקה, למשל)
  • HoistField - עוטפים את האירוע כולו כשדה יחיד בתוך מבנה או מפה
  • ExtractField - חילץ שדה ספציפי ממבנה ומפה וכלל רק שדה זה בתוצאות
  • SetSchemaMetadata - שנה את שם או גרסת הסכימה
  • חותמת זמן - שנה את נושא הרשומה על סמך הנושא המקורי וחותמת הזמן
  • RegexRouter - שנה את נושא הרשומה על סמך הנושא המקורי, מחרוזת חלופית וביטוי רגולרי

שינוי מוגדר באמצעות הפרמטרים הבאים:

  • הופכת - רשימת כינויים מופרדים בפסיקים לתמורות
  • הופך. $ alias.type - שם כיתה לשינוי
  • טרנספורמציה. $ כינוי. $ transformationSpecificConfig - תצורה לשינוי המתאים

8.2. יישום שנאי

כדי לבדוק כמה תכונות טרנספורמציה, נקבע את שתי התמורות הבאות:

  • ראשית, בואו נעטוף את כל ההודעה כמבנה JSON
  • לאחר מכן, בואו להוסיף שדה למבנה זה

לפני החלת הטרנספורמציות שלנו, עלינו להגדיר את Connect לשימוש ב- JSON חסרת סכמות, על ידי שינוי ה- connect-distributed.properties:

key.converter.schemas.enable = ערך כוזב.converter.schemas.enable = false

לאחר מכן, עלינו להפעיל מחדש את Connect, שוב במצב מבוזר:

$ CONFLUENT_HOME / bin / connect מופץ $ CONFLUENT_HOME / etc / kafka / connect-distributed.properties

שוב, עלינו ליצור גוף עבור מחבר המקור POST כקובץ JSON. הנה, נקרא לזה connect-file-source-transform.json.

מלבד הפרמטרים הידועים כבר, אנו מוסיפים כמה שורות לשתי הטרנספורמציות הנדרשות:

{"name": "source-file-source", "config": {"connector.class": "FileStreamSource", "task.max": 1, "file": "test-transformation.txt", "topic ":" connect-transformation "," transforms ":" MakeMap, InsertSource "," transforms.MakeMap.type ":" org.apache.kafka.connect.transforms.HoistField $ Value "," transforms.MakeMap.field ": "קו", "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField $ Value", "transforms.InsertSource.static.field": "data_source", "transforms.InsertSource.static.value ":" test-file-source "}}

לאחר מכן, בואו לבצע את ההודעה:

סלסול -d @ $ CONFLUENT_HOME / connect-file-source-transform.json \ -H "סוג תוכן: יישום / json" \ -X POST // localhost: 8083 / connectors

בואו נכתוב כמה שורות שלנו test-transformation.txt:

פו בר

אם נבדוק כעת את חיבור-טרנספורמציה הנושא, עלינו לקבל את השורות הבאות:

{"line": "Foo", "data_source": "test-file-source"} {"line": "Bar", "data_source": "test-file-source"}

9. שימוש במחברים מוכנים

לאחר השימוש במחברים הפשוטים הללו, בואו נסתכל על מחברים מוכנים יותר לשימוש, וכיצד להתקין אותם.

9.1. היכן למצוא מחברים

מחברים שנבנו מראש זמינים ממקורות שונים:

  • כמה מחברים מחוברים עם אפאצ'י קפקא רגיל (מקור וכיור לקבצים ולמסוף)
  • כמה מחברים נוספים משולבים עם פלטפורמת Confluent (ElasticSearch, HDFS, JDBC ו- AWS S3)
  • בדוק גם את Confluent Hub, שהוא סוג של חנות אפליקציות עבור מחברי קפקא. מספר המחברים המוצעים גדל ברציפות:
    • מחברים מחוברים (שפותחו, נבדקו, תועדו ונתמכים באופן מלא על ידי Confluent)
    • מחברים מוסמכים (מיושמים על ידי צד שלישי ומאושרים על ידי Confluent)
    • מחברים מפותחים ונתמכים על ידי הקהילה
  • מעבר לכך, Confluent מספק גם דף מחברים, עם כמה מחברים שניתן להשיג גם ב- Hub Confluent, אך גם עם כמה מחברים נוספים לקהילה.
  • ולבסוף, ישנם גם ספקים, המספקים מחברים כחלק מהמוצר שלהם. לדוגמה, Landoop מספקת ספריית סטרימינג בשם עדשות, המכילה גם סט של ~ 25 מחברי קוד פתוח (רבים מהם מופיעים ברשימה במקומות אחרים)

9.2. התקנת מחברים מ- Confluent Hub

הגרסה הארגונית של Confluent מספקת סקריפט להתקנת מחברים ורכיבים אחרים מ- Confluent Hub (הסקריפט אינו כלול בגרסת הקוד הפתוח). אם אנו משתמשים בגרסת הארגון, אנו יכולים להתקין מחבר באמצעות הפקודה הבאה:

$ CONFLUENT_HOME / bin / confluent-hub להתקין confluentinc / kafka-connect-mqtt: 1.0.0-preview

9.3. התקנת מחברים באופן ידני

אם אנו זקוקים למחבר שאינו זמין ב- Confluent Hub או אם יש לנו את גרסת הקוד הפתוח של Confluent, אנו יכולים להתקין את המחברים הנדרשים באופן ידני. לשם כך, עלינו להוריד ולפתוח את המחבר ולהעביר את הספריות הכלולות לתיקיה שצוינה כ- plugin.path.

עבור כל מחבר, הארכיון צריך להכיל שתי תיקיות מעניינות עבורנו:

  • ה lib התיקייה מכילה את צנצנת המחברים, למשל, kafka-connect-mqtt-1.0.0-preview.jarוכן כמה צנצנות נוספות הנדרשות על ידי המחבר
  • ה וכו התיקייה מחזיקה קובץ תצורת התייחסות אחד או יותר

עלינו להזיז את lib תיקיה אל $ CONFLUENT_HOME / share / java, או איזה נתיב שציינו כ plugin.path ב connect- standalone.properties ו connect-distributed.properties. בכך, זה עשוי גם להיות הגיוני לשנות את שם התיקיה למשהו בעל משמעות.

אנו יכולים להשתמש בקבצי התצורה מ- וכו או על ידי הפניה אליהם בזמן שמתחילים במצב עצמאי, או שאנחנו יכולים פשוט לתפוס את המאפיינים וליצור מהם קובץ JSON.

10. מסקנה

במדריך זה בדקנו כיצד להתקין ולהשתמש ב- Kafka Connect.

בדקנו סוגי מחברים, הן מקור והן כיור. בדקנו גם כמה תכונות ומצבים שבהם Connect יכול להפעיל. לאחר מכן סקרנו שנאים. ולבסוף, למדנו היכן להשיג וכיצד להתקין מחברים מותאמים אישית.

כמו תמיד, ניתן למצוא את קבצי התצורה ב- GitHub.