From a71c1a09a6a4baadfa803eb16a4473b7527a5853 Mon Sep 17 00:00:00 2001 From: Lukas Heiligenbrunner Date: Fri, 24 Apr 2020 14:34:58 +0200 Subject: [PATCH] reconnection to the mqtt server if failed on startup --- src/java/com/wasteinformationserver/Main.kt | 22 +-- .../com/wasteinformationserver/db/JDBC.java | 14 +- .../mqtt/MqttService.kt | 130 ++++++++++-------- 3 files changed, 87 insertions(+), 79 deletions(-) diff --git a/src/java/com/wasteinformationserver/Main.kt b/src/java/com/wasteinformationserver/Main.kt index 012c3e4..bfac487 100644 --- a/src/java/com/wasteinformationserver/Main.kt +++ b/src/java/com/wasteinformationserver/Main.kt @@ -1,4 +1,5 @@ @file:JvmName("Main") + package com.wasteinformationserver import com.wasteinformationserver.basicutils.Info @@ -7,7 +8,6 @@ import com.wasteinformationserver.basicutils.Storage import com.wasteinformationserver.db.JDBC import com.wasteinformationserver.mqtt.MqttService import com.wasteinformationserver.website.Webserver -import java.io.IOException /** * application entry point @@ -22,14 +22,9 @@ fun main() { Log.info("startup of WasteInformationServer") Runtime.getRuntime().addShutdownHook(Thread(Runnable { - try { - Thread.sleep(200) - Log.warning("Shutting down ...") - JDBC.getInstance().disconnect(); - //shutdown routine - } catch (e: InterruptedException) { - e.printStackTrace() - } + // shutdown routine + Log.warning("Shutting down ...") + JDBC.getInstance().disconnect(); })) Log.info("Server version: " + Info.getVersion()) @@ -49,10 +44,7 @@ fun main() { //startup mqtt service Log.message("starting mqtt service") - if (JDBC.isConnected()) { - val m = MqttService(Storage.getInstance().mqttServer, Storage.getInstance().mqttPort.toString()) - m.startupService() - }else{ - Log.error("could't start mqtt service because of missing db connection!") - } + val m = MqttService.getInstance() + m.init(Storage.getInstance().mqttServer, Storage.getInstance().mqttPort.toString()) + m.startupService() } \ No newline at end of file diff --git a/src/java/com/wasteinformationserver/db/JDBC.java b/src/java/com/wasteinformationserver/db/JDBC.java index 309f6b8..a4f3f88 100644 --- a/src/java/com/wasteinformationserver/db/JDBC.java +++ b/src/java/com/wasteinformationserver/db/JDBC.java @@ -1,8 +1,8 @@ package com.wasteinformationserver.db; -import com.mysql.cj.exceptions.ConnectionIsClosedException; import com.wasteinformationserver.basicutils.Log; import com.wasteinformationserver.basicutils.Storage; +import com.wasteinformationserver.mqtt.MqttService; import java.io.IOException; import java.sql.*; @@ -116,6 +116,12 @@ public class JDBC { portc = st.getDbPort(); Log.Log.info("Retry connection"); loggedin = logintodb(usernamec, passwordc, dbnamec, ipc, portc); + if (loggedin) { + // startup mqtt service if successfully connected + MqttService srvc = MqttService.Companion.getInstance(); + srvc.init(st.getMqttServer(), String.valueOf(st.getMqttPort())); + srvc.startupService(); + } } }).start(); } @@ -141,7 +147,7 @@ public class JDBC { conn.isValid(5); PreparedStatement stmt = conn.prepareStatement(sql); return stmt.executeQuery(); - } catch (SQLNonTransientConnectionException ee){ + } catch (SQLNonTransientConnectionException ee) { if (logintodb(usernamec, passwordc, dbnamec, ipc, portc)) { return this.executeQuery(sql); } else { @@ -165,13 +171,13 @@ public class JDBC { conn.isValid(5); PreparedStatement stmt = conn.prepareStatement(sql); return stmt.executeUpdate(); - } catch (SQLNonTransientConnectionException ee){ + } catch (SQLNonTransientConnectionException ee) { if (logintodb(usernamec, passwordc, dbnamec, ipc, portc)) { return this.executeUpdate(sql); } else { throw new SQLException(); } - } catch (SQLException e){ + } catch (SQLException e) { throw new SQLException(); } } diff --git a/src/java/com/wasteinformationserver/mqtt/MqttService.kt b/src/java/com/wasteinformationserver/mqtt/MqttService.kt index f2cd90c..7c65519 100644 --- a/src/java/com/wasteinformationserver/mqtt/MqttService.kt +++ b/src/java/com/wasteinformationserver/mqtt/MqttService.kt @@ -1,5 +1,6 @@ package com.wasteinformationserver.mqtt +import com.wasteinformationserver.basicutils.Log import com.wasteinformationserver.basicutils.Log.Log.debug import com.wasteinformationserver.basicutils.Log.Log.error import com.wasteinformationserver.basicutils.Log.Log.info @@ -19,71 +20,84 @@ import java.util.* * @author Lukas Heiligenbrunner * @author Gregor Dutzler */ -class MqttService(serverurl: String, port: String) { - private val serveruri: String = "tcp://$serverurl:$port" - private var client: MqttClient = MqttClient(serveruri, "JavaSample42") +class MqttService { + private var serveruri: String = ""; + private lateinit var client: MqttClient; private var db: JDBC = JDBC.getInstance() - /** - * initial login to db - */ - init { - connectToDb() + companion object { + private val obj = MqttService() + fun getInstance(): MqttService { + return obj; + } + } + + fun init(serverurl: String, port: String){ + serveruri = "tcp://$serverurl:$port" } /** * startup of the mqtt service */ fun startupService() { - try { - client = MqttClient(serveruri, "JavaSample42") - val connOpts = MqttConnectOptions() - connOpts.isCleanSession = true - client.connect(connOpts) - client.setCallback(object : MqttCallback { - override fun connectionLost(throwable: Throwable) { - error("connection lost") - connectToDb() - } + if(JDBC.isConnected()) { + try { + client = MqttClient(serveruri, "WasteInformationServerID") + val connOpts = MqttConnectOptions() + connOpts.isCleanSession = true + client.connect(connOpts) + client.setCallback(object : MqttCallback { + override fun connectionLost(throwable: Throwable) { + error("connection lost") + Thread.sleep(500) + // restart service + startupService() + } - override fun messageArrived(s: String, mqttMessage: MqttMessage) { - val deviceid = String(mqttMessage.payload) - message("received Request from PCB") - val res = db.executeQuery("SELECT * from devices WHERE DeviceID=$deviceid") - try { - res.last() - if (res.row != 0) { //existing device - res.first() - val devicecities = db.executeQuery("SELECT * from device_city WHERE DeviceID='$deviceid'") - devicecities.last() - if (devicecities.row == 0) { //not configured - tramsmitMessage("$deviceid,-1") - } - else { - devicecities.first() - devicecities.previous() + override fun messageArrived(s: String, mqttMessage: MqttMessage) { + val deviceid = String(mqttMessage.payload) + message("received Request from PCB") + val res = db.executeQuery("SELECT * from devices WHERE DeviceID=$deviceid") + try { + res.last() + if (res.row != 0) { + // existing device + res.first() + val devicecities = db.executeQuery("SELECT * from device_city WHERE DeviceID='$deviceid'") + devicecities.last() + if (devicecities.row == 0) { + // not configured + tramsmitMessage("$deviceid,-1") + } + else { + devicecities.first() + devicecities.previous() - while (devicecities.next()) { - val cityid = devicecities.getInt("CityID") - checkDatabase(cityid, deviceid.toInt()) + while (devicecities.next()) { + val cityid = devicecities.getInt("CityID") + checkDatabase(cityid, deviceid.toInt()) + } } } + else { + // new device + db.executeUpdate("INSERT INTO devices (DeviceID) VALUES ($deviceid)") + info("new device registered to server") + tramsmitMessage("$deviceid,-1") + } + } catch (e: SQLException) { + e.printStackTrace() } - else { //new device - db.executeUpdate("INSERT INTO devices (DeviceID) VALUES ($deviceid)") - info("new device registered to server") - tramsmitMessage("$deviceid,-1") - } - } catch (e: SQLException) { - e.printStackTrace() } - } - override fun deliveryComplete(iMqttDeliveryToken: IMqttDeliveryToken) {} - }) - client.subscribe("TopicIn") - } catch (e: MqttException) { - error("Connection to the Broker failed") + override fun deliveryComplete(iMqttDeliveryToken: IMqttDeliveryToken) {} + }) + client.subscribe("TopicIn") + } catch (e: MqttException) { + error("Connection to the Broker failed") + } + }else{ + Log.error("could't start mqtt service because of missing db connection!") } } @@ -98,7 +112,8 @@ class MqttService(serverurl: String, port: String) { val set2 = db.executeQuery("SELECT * FROM cities WHERE `id`='$citywastezoneid'") try { set2.last() - if (set2.row != 1) { //error + if (set2.row != 1) { + //error warning("multiple Rows with same city id found - DB Error") } else { @@ -111,7 +126,8 @@ class MqttService(serverurl: String, port: String) { val result = db.executeQuery("SELECT pickupdates.pickupdate FROM pickupdates WHERE pickupdates.citywastezoneid=$citywastezoneid") try { result.last() - if (result.row == 0) { //if not found in db --> send zero + if (result.row == 0) { + //if not found in db --> send zero debug("not found in db") tramsmitMessage("$deviceid,$wastetype,0") } @@ -130,7 +146,8 @@ class MqttService(serverurl: String, port: String) { return } } while (result.next()) - tramsmitMessage("$deviceid,$wastetype,0") //transmit zero if not returned before + tramsmitMessage("$deviceid,$wastetype,0") + //transmit zero if not returned before } } catch (e: SQLException) { e.printStackTrace() @@ -165,11 +182,4 @@ class MqttService(serverurl: String, port: String) { else -> 0 } } - - /** - * receives connection object and initial connection to db - */ - private fun connectToDb() { - db = JDBC.getInstance() - } } \ No newline at end of file