שילוב אביב עם AWS Kinesis

1. הקדמה

Kinesis הוא כלי לאיסוף, עיבוד וניתוח זרמי נתונים בזמן אמת, שפותח באמזון. אחד היתרונות העיקריים שלה הוא בכך שהוא מסייע בפיתוח אפליקציות מונעות אירועים.

במדריך זה נחקור כמה ספריות לאפשר ליישום האביב שלנו לייצר ולצרוך רשומות מזרם Kinesis. דוגמאות הקוד יציגו את הפונקציונליות הבסיסית אך אינן מייצגות את הקוד המוכן לייצור.

2. תנאי מוקדם

לפני שנמשיך הלאה, עלינו לעשות שני דברים.

הראשון הוא ליצור פרויקט אביב, כיוון שהמטרה כאן היא ליצור אינטראקציה עם Kinesis מתוך פרויקט Spring.

השני הוא יצירת זרם נתונים של Kinesis. אנו יכולים לעשות זאת מדפדפן אינטרנט בחשבון AWS שלנו. חלופה אחת עבור אוהדי AWS CLI שבינינו היא להשתמש בשורת הפקודה. מכיוון שניצור איתו קשר מהקוד, עלינו גם להיות אישורי AWS IAM, מפתח הגישה והמפתח הסודי והאזור.

כל המפיקים שלנו ייצרו רשומות כתובת IP מדומות, ואילו הצרכנים יקראו את הערכים וירשמו אותם במסוף היישומים.

3. AWS SDK עבור Java

הספרייה הראשונה בה נשתמש היא AWS SDK עבור Java. יתרונו בכך שהוא מאפשר לנו לנהל חלקים רבים בעבודה עם Kinesis Data Streams. אנחנו יכולים לקרוא נתונים, לייצר נתונים, ליצור זרמי נתונים ולזרום נתונים מחדש. החיסרון הוא שכדי שיהיה לנו קוד מוכן לייצור, נצטרך לקודד היבטים כמו התייבשות מחדש, טיפול בשגיאות או דמון כדי לשמור על הצרכן בחיים.

3.1. תלות של Maven

תלות ה- Maven של אמזון-קינסיס-לקוח תביא את כל מה שאנחנו צריכים כדי לקבל דוגמאות עבודה. כעת נוסיף אותו לאתר שלנו pom.xml קוֹבֶץ:

 com.amazonaws amazon-kinesis-client 1.11.2 

3.2. הגדרת אביב

בואו נעשה שימוש חוזר ב AmazonKinesis אובייקט הדרוש לאינטראקציה עם זרם Kinesis שלנו. ניצור אותו כ- @אפונה בתוך שלנו @ SpringBootApplication מעמד:

@Bean הציבור של AmazonKinesis buildAmazonKinesis () {BasicAWSCredentials awsCredentials = חדש BasicAWSCredentials (accessKey, secretKey); להחזיר את AmazonKinesisClientBuilder.standard () .withCredentials (AWSStaticCredentialsProvider חדש (awsCredentials)) .withRegion (Regions.EU_CENTRAL_1) .build (); }

לאחר מכן, בואו נגדיר את aws.access.key ו aws.secret.key, הדרוש למכונה המקומית, ב application.properties:

aws.access.key = my-aws-access-key-goes-here aws.secret.key = my-aws-secret-key-goes-here

ואנחנו נקרא אותם באמצעות @ערך ביאור:

@Value ("$ {aws.access.key}") accessKey פרטי מחרוזת; @Value ("$ {aws.secret.key}") מחרוזת פרטית secretKey;

למען הפשטות, אנו נסמך על מתוזמן שיטות ליצור ולצרוך רשומות.

3.3. צרכן

ה AWS SDK Kinesis Consumer משתמש במודל משיכה, כלומר הקוד שלנו ימשוך רשומות משברי זרם הנתונים Kinesis:

GetRecordsRequest recordsRequest = GetRecordsRequest חדש (); recordsRequest.setShardIterator (shardIterator.getShardIterator ()); recordsRequest.setLimit (25); GetRecordsResult recordsResult = kinesis.getRecords (recordsRequest); בעוד (! recordsResult.getRecords (). isEmpty ()) {recordsResult.getRecords (). stream () .map (record -> מחרוזת חדשה (record.getData (). array ())) .forEach (System.out: : println); recordsRequest.setShardIterator (recordsResult.getNextShardIterator ()); recordsResult = kinesis.getRecords (recordsRequest); }

ה GetRecordsRequest האובייקט בונה את הבקשה לנתוני זרם. בדוגמה שלנו, הגדרנו מגבלה של 25 רשומות לכל בקשה, ואנחנו ממשיכים לקרוא עד שאין יותר מה לקרוא.

אנו יכולים גם לשים לב שלגבי האיטרציה שלנו השתמשנו ב- GetShardIteratorResult לְהִתְנַגֵד. יצרנו את האובייקט הזה בתוך a @PostConstrucשיטה כדי שנתחיל לעקוב אחר רשומות מיד:

פרטי GetShardIteratorResult shardIterator; @ PostConstruct בטל פרטי buildShardIterator () {GetShardIteratorRequest readShardsRequest = חדש GetShardIteratorRequest (); readShardsRequest.setStreamName (IPS_STREAM); readShardsRequest.setShardIteratorType (ShardIteratorType.LATEST); readShardsRequest.setShardId (IPS_SHARD_ID); this.shardIterator = kinesis.getShardIterator (readShardsRequest); }

3.4. יַצרָן

בואו נראה עכשיו איך לטפל ביצירת רשומות עבור זרם הנתונים שלנו Kinesis.

אנו מכניסים נתונים באמצעות א PutRecordsRequest לְהִתְנַגֵד. עבור אובייקט חדש זה, אנו מוסיפים רשימה הכוללת מספר רב PutRecordsRequestEntry חפצים:

ערכי רשימה = IntStream.range (1, 200) .mapToObj (ipSuffix -> {PutRecordsRequestEntry entry = new PutRecordsRequestEntry (); entry.setData (ByteBuffer.wrap (("192.168.0." + IpSuffix) .getBytes ())) ; entry.setPartitionKey (IPS_PARTITION_KEY); ערך חוזר;}). collect (Collectors.toList ()); PutRecordsRequest createRecordsRequest = PutRecordsRequest חדש (); createRecordsRequest.setStreamName (IPS_STREAM); createRecordsRequest.setRecords (ערכים); kinesis.putRecords (createRecordsRequest);

יצרנו צרכן בסיסי ומפיק רשומות IP מדומות. כל שנותר לעשות עכשיו הוא להפעיל את פרויקט האביב שלנו ולראות כתובות IP המופיעות במסוף היישומים שלנו.

4. KCL ו- KPL

ספריית הלקוחות של Kinesis (KCL) היא ספרייה שמפשטת את צריכת הרשומות. זוהי גם שכבת הפשטה מעל ממשקי ה- API של Java של AWS SDK עבור Kinesis Data Streams. מאחורי הקלעים, הספרייה מטפלת באיזון עומסים במקרים רבים, מגיבה לכשלים במופע, בדיקת רשומות מעובדות ומגיבה להתייבשות מחודשת.

ספריית יצרני Kinesis (KPL) היא ספרייה שימושית לכתיבה לזרם נתונים של Kinesis. הוא גם מספק שכבת הפשטה היושבת מעל AWS SDK Java APIs עבור Kinesis Data Streams. לקבלת ביצועים טובים יותר, הספרייה מטפלת באופן אוטומטי בהיגיון אצווה, ריבוי השחלות וניסוי חוזר.

ל- KCL ו- KPL היתרון העיקרי בכך שהם קלים לשימוש, כך שנוכל להתמקד בהפקת וצריכת רשומות.

4.1. תלות Maven

את שתי הספריות ניתן להביא בנפרד בפרויקט שלנו במידת הצורך. כדי לכלול KPL ו- KCL בפרויקט Maven שלנו, עלינו לעדכן את קובץ pom.xml:

 com.amazonaws amazon-kinesis-producer 0.13.1 com.amazonaws amazon-kinesis-client 1.11.2 

4.2. הגדרת אביב

הכנת האביב היחידה שאנחנו צריכים היא לוודא שיש לנו את תעודות ה- IAM. הערכים עבור aws.access.key ו aws.secret.key מוגדרים אצלנו application.properties כדי שנוכל לקרוא אותם באמצעות @ערך במקרה הצורך.

4.3. צרכן

ראשית, אנחנו ליצור מחלקה שמיישמת את IRecordProcessor ממשק ומגדיר את ההיגיון שלנו כיצד לטפל ברשומות זרם הנתונים של Kinesis, כלומר להדפיס אותם במסוף:

מחלקה ציבורית IpProcessor מיישמת IRecordProcessor {@Override public void initialize (InitializationInput initializationInput) {} @Override public void processRecords (ProcessRecordsInput processRecordsInput) {processRecordsInput.getRecords () .forEach (record -> System.out.println (new record). ().מַעֲרָך()))); } @ עקירה על כיבוי חלל ציבורי (ShutdownInput shutdownInput) {}}

השלב הבא הוא ל להגדיר מחלקת מפעל המיישמת את IRecordProcessorFactory מִמְשָׁק ומחזיר קוד שנוצר בעבר IpProcessor לְהִתְנַגֵד:

מחלקה ציבורית IpProcessorFactory מיישמת IRecordProcessorFactory {@Override ציבורית IRecordProcessor createProcessor () {להחזיר IpProcessor חדש (); }}

ועכשיו לשלב האחרון, נשתמש ב- עוֹבֵד להתנגד להגדרת הצינור הצרכני שלנו. אנחנו צריכים KinesisClientLibConfiguration אובייקט שיגדיר, במידת הצורך, את אישורי IAM ואת אזור AWS.

נעביר את KinesisClientLibConfiguration, ושלנו IpProcessorFactory חפץ, לשלנו עוֹבֵד ואז להתחיל אותו בשרשור נפרד. אנו שומרים על ההיגיון הזה של צריכת רשומות תמיד בחיים עם השימוש ב- עוֹבֵד בכיתה, כך שאנו קוראים ברציפות רשומות חדשות כעת:

BasicAWSCredentials awsCredentials = אישורי BasicAWSCredentials (accessKey, secretKey); KinesisClientLibConfiguration consumerConfig = KinesisClientLibConfiguration חדש (APP_NAME, IPS_STREAM, AWSStaticCredentialsProvider חדש (awsCredentials), IPS_WORKER) .withRegionName (Regions.EU_CENTRAL_1.getName ()); עובד עובד סופי = Worker.Builder חדש () .recordProcessorFactory (IpProcessorFactory חדש ()) .config (consumerConfig) .build (); CompletableFuture.runAsync (worker.run ());

4.4. יַצרָן

בואו נגדיר כעת את KinesisProducerConfiguration אובייקט, הוספת אישורי IAM ואזור AWS:

BasicAWSCredentials awsCredentials = אישורי BasicAWSCredentials חדשים (accessKey, secretKey); KinesisProducerConfiguration producerConfig = KinesisProducerConfiguration חדש (). SetCredentialsProvider (AWSStaticCredentialsProvider חדש (awsCredentials)) .setVerifyCertificate (false) .setRegion (Regions.EU_CENTRAL_1.getName ()); this.kinesisProducer = KinesisProducer חדש (producerConfig);

אנו נכלול את kinesisProducer אובייקט שנוצר בעבר ב- מתוזמן לעבוד ולהפיק רשומות עבור זרם הנתונים שלנו Kinesis ברציפות:

IntStream.range (1, 200) .mapToObj (ipSuffix -> ByteBuffer.wrap (("192.168.0." + IpSuffix) .getBytes ())). עבור כל (ערך -> kinesisProducer.addUserRecord (IPS_STREAM, IPS_PARTITION_KEY, );

5. אביב ענן זרם קלסר קינסי

ראינו כבר שתי ספריות, שתיהן נוצרו מחוץ למערכת האקולוגית של אביב. אנחנו נעשה עכשיו ראה כיצד ענן קלסר זרם המעיין באביב יכול לפשט את חיינו עוד יותר תוך כדי בנייה על גבי זרם ענן האביב.

5.1. תלות של Maven

התלות של Maven שאנו צריכים להגדיר ביישום שלנו עבור Kinesis Stream Cloud Binder Kinesis היא:

 org.springframework.cloud spring-cloud-stream-binder-kinesis 1.2.1.RELEASE 

5.2. הגדרת אביב

בעת הפעלה ב- EC2, מאפייני AWS הנדרשים מתגלים באופן אוטומטי, כך שאין צורך להגדיר אותם. מכיוון שאנחנו מריצים את הדוגמאות שלנו על מכונה מקומית, עלינו להגדיר את מפתח הגישה IAM, המפתח הסודי והאזור שלנו עבור חשבון AWS שלנו. השבתנו גם את האיתור אוטומטי של שם ערימת CloudFormation ליישום:

cloud.aws.credentials.access-key = my-aws-key-key cloud.aws.credentials.secret-key = my-aws-secret-key cloud.aws.region.static = eu-central-1 cloud.aws .stack.auto = שקר

זרם ענן האביב משולב בשלושה ממשקים בהם נוכל להשתמש בכריכת הזרם שלנו:

  • ה כִּיוֹר מיועד לבליעת נתונים
  • ה מָקוֹר משמש לפרסום רשומות
  • ה מעבד הוא שילוב של שניהם

אנו יכולים גם להגדיר ממשקים משלנו אם נצטרך.

5.3. צרכן

הגדרת צרכן היא עבודה בשני חלקים. ראשית, נגדיר, ב application.properties, זרם הנתונים ממנו נצרוך:

spring.cloud.stream.bindings.input.destination = live-ips spring.cloud.stream.bindings.input.group = live-ips-group spring.cloud.stream.bindings.input.content-type = text / plain

והבא, בואו נגדיר מעיין @רְכִיב מעמד. ההערה @EnableBinding (Sink.class) יאפשר לנו לקרוא מזרם Kinesis בשיטה המסומנת עם @StreamListener (Sink.INPUT):

@EnableBinding (Sink.class) IpConsumer בכיתה ציבורית {@StreamListener (Sink.INPUT) לצרוך חלל ציבורי (מחרוזת IP) {System.out.println (ip); }}

5.4. יַצרָן

ניתן לחלק את המפיק לשניים. ראשית, עלינו להגדיר את מאפייני הזרם שלנו בפנים application.properties:

spring.cloud.stream.bindings.output.destination = live-ips spring.cloud.stream.bindings.output.content-type = text / plain

ואז אנחנו מוסיפים @EnableBinding (Source.class) על מעיין @רְכִיב וליצור הודעות בדיקה חדשות כל כמה שניות:

@Component @EnableBinding (Source.class) IpProducer בכיתה ציבורית {@ מקור מקור פרטי מאושר; @Schedched (fixedDelay = 3000L) תוצרת חלל פרטית () {IntStream.range (1, 200) .mapToObj (ipSuffix -> "192.168.0." + IpSuffix) .forEach (ערך -> מקור פלט (). MessageBuilder.withPayload (ערך) .build ())); }}

זה כל מה שאנחנו צריכים כדי ש- Spring Cloud Stream Binder Kinesis יעבוד. אנחנו יכולים פשוט להתחיל את היישום עכשיו.

6. מסקנה

במאמר זה ראינו כיצד ניתן לשלב את פרויקט האביב שלנו עם שתי ספריות AWS לאינטראקציה עם זרם נתונים של Kinesis. ראינו גם כיצד להשתמש בספריית Spring Cloud Stream Binder Kinesis כדי להקל עוד יותר על היישום.

קוד המקור של מאמר זה ניתן למצוא באתר Github.


$config[zx-auto] not found$config[zx-overlay] not found