לקוח MQTT בג'אווה

1. סקירה כללית

במדריך זה נראה כיצד נוכל להוסיף הודעות MQTT בפרויקט Java באמצעות הספריות שמספק פרויקט Eclipse Paho.

2. פריימר MQTT

MQTT (MQ Telemetry Transport) הוא פרוטוקול העברת הודעות שנוצר כדי לענות על הצורך בשיטה פשוטה וקלה להעברת נתונים אל / ממכשירים בעלי עוצמה נמוכה, כמו אלה המשמשים ביישומים תעשייתיים.

עם הפופולריות המוגברת של מכשירי IoT (Internet of Things), MQTT ראתה שימוש מוגבר שהוביל לתקינה על ידי OASIS ו- ISO.

הפרוטוקול תומך בתבנית הודעות יחידה, דהיינו דפוס פרסום-הרשמה: כל הודעה שנשלחת על ידי לקוח מכילה "נושא" משויך המשמש את המתווך לניתובו ללקוחות מנויים. שמות נושאים יכולים להיות מחרוזות פשוטות כמו "oiltemp"או מחרוזת דמוית נתיב"מנוע / 1 / סל"ד“.

על מנת לקבל הודעות, לקוח מנוי על נושא אחד או יותר באמצעות שמו המדויק או מחרוזת המכילה אחד מכרטי הווי התומכים הנתמכים ("#" לנושאים מרובי רמות ו- "+" לרמה יחידה ").

3. הגדרת פרויקט

על מנת לכלול את ספריית Paho בפרויקט Maven, עלינו להוסיף את התלות הבאה:

 org.eclipse.paho org.eclipse.paho.client.mqttv3 1.2.0 

את הגרסה האחרונה של מודול ספריית Eclipse Paho Java ניתן להוריד מ Maven Central.

4. הגדרת לקוח

כאשר אנו משתמשים בספריית Paho, הדבר הראשון שעלינו לעשות על מנת לשלוח ו / או לקבל הודעות מתווך MQTT הוא להשיג יישום של IMqttClient מִמְשָׁק. ממשק זה מכיל את כל השיטות הנדרשות על ידי יישום על מנת ליצור חיבור לשרת, לשלוח ולקבל הודעות.

פאהו יוצא מהקופסה עם שתי יישומים של ממשק זה, אחד אסינכרוני (MqttAsyncClient) וסינכרוני (MqttClient).במקרה שלנו, נתמקד בגרסה הסינכרונית, שיש לה סמנטיקה פשוטה יותר.

ההתקנה עצמה היא תהליך דו-שלבי: ראשית אנו יוצרים מופע של ה- MqttClient בכיתה ואז אנו מחברים אותו לשרת שלנו. החלק התחתון הבא מפרט את השלבים האלה.

4.1. יצירת חדש IMqttClient למשל

קטע הקוד הבא מראה כיצד ליצור חדש IMqttClient מופע סינכרוני:

מחרוזת publisherId = UUID.randomUUID (). ToString (); מו"ל IMqttClient = MqttClient חדש ("tcp: //iot.eclipse.org: 1883", publisherId);

במקרה הזה, אנו משתמשים בבנאי הפשוט ביותר שיש, אשר לוקח את כתובת נקודת הקצה של המתווך שלנו ב- MQTT ומזהה לקוח, שמזהה באופן ייחודי את הלקוח שלנו.

במקרה שלנו, השתמשנו ב- UUID אקראי, כך שייווצר מזהה לקוח חדש בכל ריצה.

Paho מספק גם קונסטרוקציות נוספות בהן אנו יכולים להשתמש בכדי להתאים אישית את מנגנון ההתמדה המשמש לאחסון הודעות שלא אושרו ו / או ScheduledExecutorService משמש להפעלת משימות רקע הנדרשות על ידי יישום מנוע הפרוטוקול.

נקודת הקצה של השרת בה אנו משתמשים היא מתווך MQTT ציבורי שמתארח על ידי פרויקט Paho, המאפשר לכל מי שיש לו חיבור לאינטרנט לבדוק לקוחות ללא צורך באימות כלשהו.

4.2. מתחבר לשרת

החדש שלנו שנוצר MqttClient מופע אינו מחובר לשרת. אנו עושים זאת על ידי קריאתו לְחַבֵּר() שיטה, מעביר אופציונלי a MqttConnectOptions מופע המאפשר לנו להתאים אישית כמה היבטים של הפרוטוקול.

בפרט, אנו יכולים להשתמש באפשרויות אלה כדי להעביר מידע נוסף כגון אישורי אבטחה, מצב התאוששות הפעלה, מצב חיבור מחדש וכן הלאה.

ה MqttConnectionOptions class חושפים את האפשרויות הללו כתכונות פשוטות שנוכל להגדיר בשיטות קביעות רגילות. עלינו להגדיר רק את המאפיינים הנדרשים לתרחיש שלנו - הנותרים מניחים ערכי ברירת מחדל.

הקוד המשמש ליצירת חיבור לשרת בדרך כלל נראה כך:

אפשרויות MqttConnectOptions = MqttConnectOptions חדשים (); options.setAutomaticReconnect (נכון); options.setCleanSession (נכון); options.setConnectionTimeout (10); publisher.connect (אפשרויות);

כאן אנו מגדירים את אפשרויות החיבור שלנו כך:

  • הספרייה תנסה באופן אוטומטי להתחבר מחדש לשרת במקרה של כשל ברשת
  • זה ישליך הודעות שלא נשלחו מריצה קודמת
  • זמן קצוב לחיבור מוגדר ל- 10 שניות

5. שליחת הודעות

שליחת הודעות באמצעות מחובר כבר MqttClient מאוד פשוט. אנו משתמשים באחד מה- לְפַרְסֵם() גרסאות שיטה לשליחת המטען, שהוא תמיד מערך בתים, לנושא נתון, תוך שימוש באחת מהאפשרויות הבאות לאיכות השירות:

  • 0 - סמנטיקה "לכל היותר פעם אחת", המכונה גם "אש-ושכח". השתמש באפשרות זו כאשר אובדן הודעות מקובל, מכיוון שהיא אינה דורשת כל סוג של אישור או התמדה
  • 1 - סמנטיקה "לפחות פעם אחת". השתמש באפשרות זו כאשר אובדן הודעות אינו מקובל ו המנויים שלך יכולים להתמודד עם כפילויות
  • 2 - סמנטיקה "בדיוק פעם אחת". השתמש באפשרות זו כאשר אובדן הודעות אינו מקובל ו המנויים שלך אינם יכולים להתמודד עם כפילויות

בפרויקט המדגם שלנו, מנוע טמפרטורה חיישן בכיתה ממלא את התפקיד של חיישן מדומה המייצר קריאת טמפרטורה חדשה בכל פעם שאנו קוראים לו שִׂיחָה() שיטה.

כיתה זו מיישמת את ניתן להתקשר ממשק כדי שנוכל להשתמש בו בקלות עם אחד מה- שירות ExecutorService יישומים זמינים ב java.util.concurrent חֲבִילָה:

מחלקה ציבורית EngineTemperatureSensor מיישמת הניתנת להתקשרות {// ... חברים פרטיים השמיטו את EngineTemperatureSensor הציבורי (לקוח IMqttClient) {this.client = client; } @Override שיחת בטל ציבורית () זורקת חריג {אם (! Client.isConnected ()) {return null; } MqttMessage msg = readEngineTemp (); msg.setQos (0); msg.setRetained (נכון); client.publish (TOPIC, msg); החזר אפס; } פרטי MqttMessage readEngineTemp () {temp כפול = 80 + rnd.nextDouble () * 20.0; בתים [] מטען = String.format ("T:% 04.2f", temp) .getBytes (); להחזיר MqttMessage (מטען) חדש; }}

ה MqttMessage מכיל את המטען עצמו, את איכות השירות המבוקש וגם את שמור דגל להודעה. דגל זה מציין בפני המתווך שעליו לשמור על הודעה זו עד שנצרך על ידי מנוי.

אנו יכולים להשתמש בתכונה זו כדי ליישם התנהגות "ידוע אחרון", ולכן כאשר מנוי חדש יתחבר לשרת, הוא יקבל את ההודעה השמורה מייד.

6. קבלת הודעות

על מנת לקבל הודעות מתווך MQTT, עלינו להשתמש באחד מה- הירשם () גרסאות שיטה, המאפשרים לנו לציין:

  • מסנן נושא אחד או יותר להודעות שאנחנו רוצים לקבל
  • ה- QoS המשויך
  • המטפל להתקשרות חוזרת לעיבוד הודעות שהתקבלו

בדוגמה הבאה אנו מראים כיצד להוסיף מאזין הודעות לקיים IMqttClient למשל לקבל הודעות מהנושא הנתון. אנו משתמשים ב- CountDownLatch כמנגנון סנכרון בין ההתקשרות החוזרת שלנו לבין חוט הביצוע הראשי, שמוריד אותו בכל פעם שמגיעה הודעה חדשה.

בקוד לדוגמה השתמשנו בקובץ אחר IMqttClient למשל כדי לקבל הודעות. עשינו את זה רק כדי להבהיר יותר איזה לקוח עושה מה, אבל זו לא מגבלה של Paho - אם תרצה, תוכל להשתמש באותו לקוח לצורך פרסום וקבלת הודעות:

CountDownLatch receivedSignal = CountDownLatch חדש (10); subscriber.subscribe (EngineTemperatureSensor.TOPIC, (topic, msg) -> {byte [] payload = msg.getPayload (); // ... טיפול בעומס המטען הושמט receivedSignal.countDown ();}); receivedSignal.await (1, TimeUnit.MINUTES);

ה הירשם () הגרסה המשמשת לעיל לוקח IMqttMessageListener מופע כטיעון השני שלו.

במקרה שלנו, אנו משתמשים בפונקציית למבדה פשוטה המעבדת את המטען ומורידה מונה. אם לא מספיק הודעות מגיעות בחלון הזמן שצוין (דקה אחת), ה- לְהַמתִין() השיטה תזרוק חריג.

כאשר אנו משתמשים ב- Paho, איננו צריכים לאשר במפורש את קבלת ההודעה. אם ההתקשרות חוזרת כרגיל, Paho מניח שהיא צריכה מוצלחת ושולח אישור לשרת.

אם השיחה החוזרת זורקת יוצא מן הכללהלקוח ייסגר. שים לב שהדבר יביא לאובדן של כל ההודעות שנשלחו ברמת QoS של 0.

הודעות שנשלחות עם QoS ברמה 1 או 2 יועברו מחדש על ידי השרת לאחר חיבור הלקוח מחדש וירשום שוב לנושא.

7. מסקנה

במאמר זה הדגמנו כיצד אנו יכולים להוסיף תמיכה לפרוטוקול MQTT ביישומי Java שלנו באמצעות הספרייה שמספק פרויקט Eclipse Paho.

ספרייה זו מטפלת בכל פרטי הפרוטוקול ברמה נמוכה, ומאפשרת לנו להתמקד בהיבטים אחרים של הפתרון שלנו, תוך השארת מקום טוב להתאמה אישית של היבטים חשובים בתכונות הפנימיות שלה, כגון התמדה בהודעות.

הקוד המוצג במאמר זה זמין באתר GitHub.