ETL עם זרימת נתוני ענן אביב

1. סקירה כללית

זרימת הנתונים של ענן האביב היא ערכת כלים מקורית בענן לבניית צינורות נתונים בזמן אמת ותהליכי אצווה. זרימת נתונים של Spring Cloud מוכנה לשימוש למגוון מקרי שימוש בעיבוד נתונים כמו ייבוא ​​/ ייצוא פשוט, עיבוד ETL, הזרמת אירועים וניתוח ניבוי.

במדריך זה נלמד דוגמא של חילוץ טרנספורמציה וטעינה בזמן אמת (ETL) באמצעות צינור זרם המוציא נתונים ממאגר JDBC, הופך אותם ל- POJO פשוטים וטוען אותם ל- MongoDB.

2. עיבוד ETL וזרם אירועים

ETL - חילוץ, שינוי וטעינה - נקרא בדרך כלל תהליך שמטען אצווה נתונים ממספר מסדי נתונים ומערכות למחסן נתונים משותף. במחסן נתונים זה ניתן לבצע עיבוד ניתוח נתונים כבד מבלי לפגוע בביצועים הכוללים של המערכת.

עם זאת, מגמות חדשות משנות את הדרך בה נעשה זאת. ל- ETL יש עדיין תפקיד בהעברת נתונים למחסני נתונים ואגמי נתונים.

כיום ניתן לעשות זאת באמצעות זורם בארכיטקטורת זרם אירועים בעזרת זרימת נתונים של Spring Cloud.

3. זרימת נתונים בענן האביב

בעזרת Spring Cloud Data Flow (SCDF), מפתחים יכולים ליצור צינורות נתונים בשני טעמים:

  • יישומי זרם בזמן אמת לאורך זמן באמצעות Spring Cloud Stream
  • יישומי משימות קצרי מועד באורך זמן באמצעות משימת Spring Cloud

במאמר זה נסקור את אפליקציית הסטרימינג ארוכת הטווח המבוססת על Spring Cloud Stream.

3.1. יישומי זרם ענן אביב

צינורות הזרם של SCDF מורכבים משלבים, איפהכל שלב הוא יישום הבנוי בסגנון Spring Boot באמצעות מיקרו-מסגרת Spring Cloud Stream. יישומים אלה משולבים על ידי תוכנת אמצעי העברת הודעות כמו אפאצ'י קפקא או RabbitMQ.

יישומים אלה מסווגים למקורות, מעבדים וכיורים. בהשוואה לתהליך ETL, נוכל לומר שהמקור הוא ה"תמצית ", המעבד הוא ה"שנאי" והכיור הוא החלק ה"עומס ".

במקרים מסוימים, אנו יכולים להשתמש במתחיל יישום בצעד אחד או יותר של הצינור. משמעות הדבר היא שלא נצטרך ליישם יישום חדש לשלב, אלא במקום זאת, להגדיר מתנע יישומים קיים שכבר יושם.

רשימת התחלות יישומים תוכל למצוא כאן.

3.2. אביב שרת זרימת נתונים בענן

החלק האחרון בארכיטקטורה הוא שרת זרימת הנתונים של Spring Cloud. שרת SCDF מבצע את פריסת היישומים ואת זרם הצינור באמצעות מפרט Spring Cloud Deployer. מפרט זה תומך בטעם יליד הענן SCDF על ידי פריסה למגוון זמני ריצה מודרניים, כגון Kubernetes, Apache Mesos, Yarn ו- Cloud Foundry.

כמו כן, אנו יכולים להפעיל את הזרם כפריסה מקומית.

מידע נוסף על ארכיטקטורת SCDF ניתן למצוא כאן.

4. הגדרת סביבה

לפני שנתחיל, אנחנו צריכים בחר את החלקים של פריסה מורכבת זו. החלק הראשון שהגדיר הוא שרת SCDF.

לבדיקה, נשתמש ב- SCDF Server Local לפיתוח מקומי. עבור פריסת הייצור, נוכל מאוחר יותר לבחור זמן ריצה מקורי לענן, כמו SCDF Server Kubernetes. אנו יכולים למצוא את רשימת זמני ההפעלה של השרת כאן.

עכשיו, בואו נבדוק את דרישות המערכת להפעלת שרת זה.

4.1. דרישות מערכת

כדי להפעיל את שרת SCDF, נצטרך להגדיר ולהגדיר שתי תלות:

  • תוכנת התיווך להעברת הודעות, ו
  • ה- RDBMS.

לתוכנת התיווך להעברת הודעות, נעבוד עם RabbitMQ, ואנו בוחרים ב- PostgreSQL כ- RDBMS לאחסון הגדרות זרם הצינור שלנו.

להפעלת RabbitMQ, הורד כאן את הגרסה האחרונה והתחל מופע של RabbitMQ באמצעות תצורת ברירת המחדל או הפעל את פקודת Docker הבאה:

docker run - name dataflow-rabbit -p 15672: 15672 -p 5672: 5672 -d rabbitmq: 3-management

כשלב ההתקנה האחרון, התקן והפעל את PostgreSQL RDBMS ביציאת ברירת המחדל 5432. לאחר מכן, צור מסד נתונים שבו SCDF יכולה לאחסן את הגדרות הזרם שלו באמצעות התסריט הבא:

צור זרימת נתונים של נתונים;

4.2. אביב ענן זרימת נתונים מקומי

להפעלת שרת SCDF Local, אנו יכולים לבחור להתחיל את השרת באמצעות docker-compose, או שנוכל להפעיל אותו כיישום Java.

כאן נפעיל את שרת ה- SCDF Local כיישום Java. להגדרת התצורה של היישום, עלינו להגדיר את התצורה כפרמטרים של יישום Java. נצטרך Java 8 בנתיב המערכת.

כדי לארח את הצנצנות והתלות, עלינו ליצור תיקיית בית עבור שרת ה- SCDF שלנו ולהוריד את ההפצה המקומית של שרת SCDF לתיקיה זו. תוכל להוריד את ההפצה האחרונה של SCDF Server Local כאן.

כמו כן, עלינו ליצור תיקיית lib ולשים שם מנהל התקן של JDBC. הגרסה האחרונה של מנהל ההתקן PostgreSQL זמינה כאן.

לבסוף, בוא נפעיל את השרת המקומי SCDF:

$ java -Dloader.path = lib -jar spring-cloud-dataflow-server-local-1.6.3.RELEASE.jar \ --spring.datasource.url = jdbc: postgresql: //127.0.0.1: 5432 / dataflow \ --spring.datasource.username = postgres_username \ --spring.datasource.password = postgres_password \ --spring.datasource.driver-class-name = org.postgresql.Driver \ --spring.rabbitmq.host = 127.0.0.1 \ --spring.rabbitmq.port = 5672 \ --spring.rabbitmq.username = אורח \ - spring.rabbitmq.password = אורח

אנו יכולים לבדוק אם הוא פועל על ידי עיון בכתובת אתר זו:

// localhost: 9393 / לוח מחוונים

4.3. אביב מעטפת זרימת נתונים בענן

מעטפת SCDF היא כלי שורת פקודה שמקל על ההרכבה והפריסה של היישומים והצינורות שלנו. פקודות מעטפת אלה פועלות באמצעות ה- API של REST API של שרת זרימת הנתונים בענן.

הורד את הגרסה האחרונה של הצנצנת לתיקיית הבית של SCDF, הזמינה כאן. לאחר שתסיים, הפעל את הפקודה הבאה (עדכן את הגרסה לפי הצורך):

$ java -jar spring-cloud-dataflow-shell-1.6.3.RELEASE.jar ____ ____ _ __ / ___ | _ __ _ __ (_) _ __ __ _ / ___ | | ___ _ _ __ | | \ ___ \ | '_ \ | '__ | | '_ \ / _` | | | | | / _ \ | | | | / _ `| ___) | | _) | | | | | | | (_ | | | | ___ | | (_) | | _ | | (_ | | | ____ / | .__ / | _ | _ | _ | | _ | \ __, | \ ____ | _ | \ ___ / \ __, _ | \ __, _ | ____ | _ | _ __ | ___ / __________ | _ \ __ _ | | _ __ _ | ___ | | _____ __ \ \ \ \ \ \ | | | | / _ ` | __ / _` | | | _ | | / _ \ \ / / / \ \ \ \ \ | | _ | | (_ | | || (_ | | | _ | | | (_) \ VV / / / / / / / | ____ / \ __, _ | \ __ \ __, _ | | _ | | _ | \ ___ / \ _ / \ _ / / _ / _ / _ / _ / _ / ברוכים הבאים ל פגז זרימת הנתונים של ענן האביב. לקבלת סיוע, הקש על TAB או הקלד "עזרה"

אם במקום "זרימת נתונים:> ” אתה מקבל "שרת לא ידוע:> ” בשורה האחרונה, אתה לא מפעיל את שרת SCDF ב- localhost. במקרה זה, הפעל את הפקודה הבאה כדי להתחבר למארח אחר:

שרת לא ידוע:> שרת תצורת נתונים זרימת נתונים // {מארח}

כעת, מעטפת מחוברת לשרת SCDF ונוכל להריץ את הפקודות שלנו.

הדבר הראשון שעלינו לעשות ב- Shell הוא לייבא את התחלות היישום. מצא כאן את הגרסה האחרונה עבור RabbitMQ + Maven ב- Spring Boot 2.0.x, והפעל את הפקודה הבאה (שוב, עדכן את הגרסה, כאן "דרווין-SR1", לפי צורך):

$ dataflow:> ייבוא ​​אפליקציה --uri //bit.ly/Darwin-SR1-stream-applications-rabbit-maven

לבדיקת היישומים המותקנים, הפעל את פקודת Shell הבאה:

$ dataflow:> רשימת אפליקציות

כתוצאה מכך, עלינו לראות טבלה המכילה את כל היישומים המותקנים.

כמו כן, SCDF מציע ממשק גרפי בשם פלו, שאנו יכולים לגשת אליו באמצעות כתובת זו: // localhost: 9393 / לוח מחוונים. עם זאת, השימוש בו אינו במסגרת מאמר זה.

5. חיבור צינור ETL

בואו ניצור כעת את צינור הזרם שלנו. לשם כך נשתמש במתנע היישום JDBC Source לחילוץ מידע ממאגר המידע היחסי שלנו.

כמו כן, ניצור מעבד מותאם אישית לשינוי מבנה המידע וכיור מותאם אישית לטעינת הנתונים שלנו ל- MongoDB.

5.1. תמצית - הכנת מאגר יחסי למיצוי

בוא ניצור מסד נתונים עם השם crm ושולחן עם שם צרכן:

צור מידע CRM;
צור לקוח טבלה (id bigint לא NULL, ערך בוליאני מיובא כוזב, תו שם לקוח משתנה (50), מפתח ראשוני (id))

שים לב שאנחנו משתמשים בדגל מְיוֹבָּא, שיאחסן איזה רשומה כבר יובאה. אנו יכולים גם לאחסן מידע זה בטבלה אחרת, במידת הצורך.

עכשיו, בואו נכניס כמה נתונים:

הכנס ללקוח (מזהה, שם לקוח, מיובא) VALUES (1, 'John Doe', false);

5.2. טרנספורמציה - מיפוי JDBC שדות ל MongoDB מבנה שדות

לשלב השינוי נעשה תרגום פשוט של השדה שם לקוח מטבלת המקור, לשדה חדש שֵׁם. ניתן לעשות כאן טרנספורמציות אחרות, אך בואו נקצור את הדוגמה קצרה.

לשם כך, ניצור פרויקט חדש עם השם טרנספורמציה של לקוחות. הדרך הקלה ביותר לעשות זאת היא באמצעות אתר Spring Initializr ליצירת הפרויקט. לאחר ההגעה לאתר בחרו שם קבוצה ושם חפץ. נשתמש com.customer ו טרנספורמציה של לקוחות, בהתאמה.

ברגע שזה נעשה, לחץ על הכפתור "צור פרויקט" כדי להוריד את הפרויקט. לאחר מכן, פתח את הפרויקט וייבא אותו ל- IDE המועדף עליך, והוסף את התלות הבאה ל- pom.xml:

 org.springframework.cloud אביב-ענן-זרם-קלסר-ארנב 

כעת אנו אמורים להתחיל בקידוד המרת שם השדה. לשם כך, ניצור את צרכן בכיתה לשמש כמתאם. שיעור זה יקבל את שם לקוח דרך ה- setName () השיטה ותפיק את הערך שלה באמצעות getName שיטה.

ה @JsonProperty הערות יעשו את השינוי תוך ביטול עריכה מחדש מ- JSON ל- Java:

לקוח ממעמד ציבורי {פרטי מזהה ארוך; שם מחרוזת פרטי; @JsonProperty ("customer_name") set public name ריק (שם מחרוזת) {this.name = שם; } @JsonProperty ("שם") ציבורי מחרוזת getName () {שם שם; } // גטרס וסטרים}

המעבד צריך לקבל נתונים מקלט, לבצע את השינוי ולקשור את התוצאה לערוץ פלט. בואו ניצור כיתה לשם כך:

יבוא org.springframework.cloud.stream.annotation.EnableBinding; ייבא org.springframework.cloud.stream.messaging.Processor; יבוא org.springframework.integration.annotation.Transformer; @EnableBinding (Processor.class) מחלקה ציבורית CustomerProcessorConfiguration {@Transformer (inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) לקוח ציבורי convertToPojo (מטען לקוחות) {מטען החזר; }}

בקוד הנ"ל אנו יכולים לראות שהטרנספורמציה מתרחשת באופן אוטומטי. הקלט מקבל את הנתונים כאשר JSON וג'קסון מורידים אותם לערך a צרכן אובייקט באמצעות מַעֲרֶכֶת שיטות.

ההפך הוא לגבי הפלט, הנתונים מסודרים ל- JSON באמצעות ה- לקבל שיטות.

5.3. עומס - כיור ב- MongoDB

בדומה לשלב הטרנספורמציה, ניצור פרויקט אחר של maven, עכשיו עם השם צרכן-מונגודב-כִּיוֹר. שוב, גש ל- Spring Initializr, עבור הקבוצה שתבחר com.customerובשביל החפץ בחר לקוח- mongodb-sink. לאחר מכן, הקלד MongoDB בתיבת החיפוש תלות והורד את הפרויקט.

לאחר מכן, פתח את הרוכסן וייבא אותו ל- IDE המועדף עליך.

לאחר מכן, הוסף את אותה תלות נוספת כמו ב- טרנספורמציה של לקוחות פּרוֹיֶקט.

עכשיו ניצור עוד אחד צרכן בכיתה, לקבלת קלט בשלב זה:

ייבא org.springframework.data.mongodb.core.mapping.Document; @Document (collection = "לקוח") לקוח ציבורי {פרטי מזהה ארוך; שם מחרוזת פרטי; // גטרס וסטרים}

לשקיעת צרכן, ניצור מחלקת מאזין שתשמור על ישות הלקוחות באמצעות ה- מאגר לקוחות:

@EnableBinding (Sink.class) מחלקה ציבורית CustomerListener {@ מאגר פרטי CustomerRepository פרטי; @StreamListener (Sink.INPUT) שמירת חלל ריק (לקוח לקוח) {repository.save (לקוח); }}

וה מאגר לקוחות, במקרה זה, הוא א מאגר מאגר מנתוני האביב:

יבוא org.springframework.data.mongodb.repository.MongoRepository; יבוא org.springframework.stereotype.Repository; ממשק ציבורי @ מאגר @ CustomerRepository מרחיב את MongoRepository {} 

5.4. הגדרת זרם

עַכשָׁיו, שני היישומים המותאמים אישית מוכנים להירשם בשרת SCDF. כדי להשיג זאת, הידור את שני הפרויקטים באמצעות פקודת Maven להתקין mvn.

לאחר מכן אנו רושמים אותם באמצעות מעטפת זרימת הנתונים של Spring Cloud:

יישום רישום - שם לקוח-טרנספורמציה - מעבד סוג - אורי מייבן: //com.customer: customer-transform: 0.0.1-SNAPSHOT
רישום אפליקציות - שם לקוח- mongodb-sink - סוג כיור --uri maven: //com.customer: customer-mongodb-sink: jar: 0.0.1-SNAPSHOT

לבסוף, בואו לבדוק אם היישומים מאוחסנים ב- SCDF, הפעלו את פקודת רשימת היישומים במעטפת:

רשימת אפליקציות

כתוצאה מכך, עלינו לראות את שתי היישומים בטבלה המתקבלת.

5.4.1. שפה ספציפית לתחום צינור זרם - DSL

DSL מגדיר את התצורה ואת זרימת הנתונים בין היישומים. ה- DSL של SCDF פשוט. במילה הראשונה אנו מגדירים את שם היישום, ואחריו התצורות.

כמו כן, התחביר הוא תחביר צינור בהשראת Unix, המשתמש בפסים אנכיים, המכונים גם "צינורות", כדי לחבר יישומים מרובים:

http - פורט = 8181 | עֵץ

זה יוצר יישום HTTP המוגש ביציאה 8181 ושולח כל מטען גוף שהתקבל ליומן.

כעת, בואו נראה כיצד ליצור את הגדרת הזרם DSL של מקור JDBC.

5.4.2. הגדרת זרם מקור JDBC

תצורות המפתח עבור מקור JDBC הן שאילתא ו עדכון.שאילתא יבחר רשומות שלא נקראו בזמן עדכון ישנה דגל כדי למנוע קריאה מחדש של הרשומות הנוכחיות.

כמו כן, נגדיר את מקור JDBC לסקר בעיכוב קבוע של 30 שניות ולסקירה מקסימאלית של 1000 שורות. לבסוף נגדיר את תצורות החיבור, כמו מנהל התקן, שם משתמש, סיסמה וכתובת אתר לחיבור:

jdbc --query = 'בחר מזהה, שם_לקוח מ- public.customer WHERE מיובא = false' --update = 'UPDATE public.customer SET Import = true WHERE id in (: id)' --max-rows-per-poll = 1000 - עיכוב קבוע = 30 - time-unit = SECONDS - driver-class-name = org.postgresql.Driver --url = jdbc: postgresql: // localhost: 5432 / crm --username = postgres - סיסמא = פוסטגרס

ניתן למצוא מאפייני תצורה נוספים של מקור JDBC כאן.

5.4.3. הגדרת זרם כיור של MongoDB

כפי שלא הגדרנו את תצורות החיבור ב- application.properties שֶׁל לקוח- mongodb-sink, נגדיר באמצעות פרמטרים של DSL.

היישום שלנו מבוסס באופן מלא על MongoDataAutoConfiguration. אתה יכול לבדוק את התצורות האחרות האפשריות כאן. בעיקרון, נגדיר את אביב.data.mongodb.uri:

customer-mongodb-sink --spring.data.mongodb.uri = mongodb: // localhost / main

5.4.4. צור ופריס את הזרם

ראשית, כדי ליצור את הגדרת הזרם הסופית, חזור למעטפת וביצע את הפקודה הבאה (ללא מעברי שורות, הם הוכנסו זה עתה לקריאות):

stream create --name jdbc-to-mongodb --definition "jdbc --query = 'SELECT id, customer_name FROM public.customer WHERE Imported = false' - fixed-delay = 30 --max-rows-per-poll = 1000 - עדכון = 'עדכן את ערכת הלקוח המיובאת = נכון WHERE id ב- (: id)' - time-unit = SECONDS - password = postgres - driver-class-name = org.postgresql.Driver - שם משתמש = postgres --url = jdbc: postgresql: // localhost: 5432 / crm | customer-transform | customer-mongodb-sink --spring.data.mongodb.uri = mongodb: // localhost / main " 

זרם זה DSL מגדיר זרם בשם jdbc-ל-מונגודב. הַבָּא, נפרוס את הזרם בשמו:

זרם פריסה - שם jdbc-to-mongodb 

לבסוף, עלינו לראות את המיקומים של כל היומנים הזמינים בפלט היומן:

יומנים יהיו ב- {PATH_TO_LOG} /spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-mongodb-sink יומנים יהיו ב- {PATH_TO_LOG} / spring-cloud-deployer / jdbc-to-mongodb /jdbc-to-mongodb.customer-transform יומני יהיה ב- {PATH_TO_LOG} /spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.jdbc

6. מסקנה

במאמר זה ראינו דוגמה מלאה לצינור נתונים של ETL באמצעות זרימת נתוני Spring Cloud.

ראוי לציון ביותר, ראינו את התצורות של מתנע יישומים, יצרנו צינור זרם ETL באמצעות Spring Cloud Data Flow Shell והטמענו יישומים מותאמים אישית לקריאה, שינוי וכתיבת נתונים.

כמו תמיד, ניתן למצוא את קוד הדוגמה בפרויקט GitHub.


$config[zx-auto] not found$config[zx-overlay] not found