reconnection to the mqtt server if failed on startup

This commit is contained in:
Lukas Heiligenbrunner 2020-04-24 14:34:58 +02:00
parent 05cc2f83d6
commit a71c1a09a6
3 changed files with 87 additions and 79 deletions

View File

@ -1,4 +1,5 @@
@file:JvmName("Main") @file:JvmName("Main")
package com.wasteinformationserver package com.wasteinformationserver
import com.wasteinformationserver.basicutils.Info import com.wasteinformationserver.basicutils.Info
@ -7,7 +8,6 @@ import com.wasteinformationserver.basicutils.Storage
import com.wasteinformationserver.db.JDBC import com.wasteinformationserver.db.JDBC
import com.wasteinformationserver.mqtt.MqttService import com.wasteinformationserver.mqtt.MqttService
import com.wasteinformationserver.website.Webserver import com.wasteinformationserver.website.Webserver
import java.io.IOException
/** /**
* application entry point * application entry point
@ -22,14 +22,9 @@ fun main() {
Log.info("startup of WasteInformationServer") Log.info("startup of WasteInformationServer")
Runtime.getRuntime().addShutdownHook(Thread(Runnable { Runtime.getRuntime().addShutdownHook(Thread(Runnable {
try { // shutdown routine
Thread.sleep(200) Log.warning("Shutting down ...")
Log.warning("Shutting down ...") JDBC.getInstance().disconnect();
JDBC.getInstance().disconnect();
//shutdown routine
} catch (e: InterruptedException) {
e.printStackTrace()
}
})) }))
Log.info("Server version: " + Info.getVersion()) Log.info("Server version: " + Info.getVersion())
@ -49,10 +44,7 @@ fun main() {
//startup mqtt service //startup mqtt service
Log.message("starting mqtt service") Log.message("starting mqtt service")
if (JDBC.isConnected()) { val m = MqttService.getInstance()
val m = MqttService(Storage.getInstance().mqttServer, Storage.getInstance().mqttPort.toString()) m.init(Storage.getInstance().mqttServer, Storage.getInstance().mqttPort.toString())
m.startupService() m.startupService()
}else{
Log.error("could't start mqtt service because of missing db connection!")
}
} }

View File

@ -1,8 +1,8 @@
package com.wasteinformationserver.db; package com.wasteinformationserver.db;
import com.mysql.cj.exceptions.ConnectionIsClosedException;
import com.wasteinformationserver.basicutils.Log; import com.wasteinformationserver.basicutils.Log;
import com.wasteinformationserver.basicutils.Storage; import com.wasteinformationserver.basicutils.Storage;
import com.wasteinformationserver.mqtt.MqttService;
import java.io.IOException; import java.io.IOException;
import java.sql.*; import java.sql.*;
@ -116,6 +116,12 @@ public class JDBC {
portc = st.getDbPort(); portc = st.getDbPort();
Log.Log.info("Retry connection"); Log.Log.info("Retry connection");
loggedin = logintodb(usernamec, passwordc, dbnamec, ipc, portc); loggedin = logintodb(usernamec, passwordc, dbnamec, ipc, portc);
if (loggedin) {
// startup mqtt service if successfully connected
MqttService srvc = MqttService.Companion.getInstance();
srvc.init(st.getMqttServer(), String.valueOf(st.getMqttPort()));
srvc.startupService();
}
} }
}).start(); }).start();
} }
@ -141,7 +147,7 @@ public class JDBC {
conn.isValid(5); conn.isValid(5);
PreparedStatement stmt = conn.prepareStatement(sql); PreparedStatement stmt = conn.prepareStatement(sql);
return stmt.executeQuery(); return stmt.executeQuery();
} catch (SQLNonTransientConnectionException ee){ } catch (SQLNonTransientConnectionException ee) {
if (logintodb(usernamec, passwordc, dbnamec, ipc, portc)) { if (logintodb(usernamec, passwordc, dbnamec, ipc, portc)) {
return this.executeQuery(sql); return this.executeQuery(sql);
} else { } else {
@ -165,13 +171,13 @@ public class JDBC {
conn.isValid(5); conn.isValid(5);
PreparedStatement stmt = conn.prepareStatement(sql); PreparedStatement stmt = conn.prepareStatement(sql);
return stmt.executeUpdate(); return stmt.executeUpdate();
} catch (SQLNonTransientConnectionException ee){ } catch (SQLNonTransientConnectionException ee) {
if (logintodb(usernamec, passwordc, dbnamec, ipc, portc)) { if (logintodb(usernamec, passwordc, dbnamec, ipc, portc)) {
return this.executeUpdate(sql); return this.executeUpdate(sql);
} else { } else {
throw new SQLException(); throw new SQLException();
} }
} catch (SQLException e){ } catch (SQLException e) {
throw new SQLException(); throw new SQLException();
} }
} }

View File

@ -1,5 +1,6 @@
package com.wasteinformationserver.mqtt package com.wasteinformationserver.mqtt
import com.wasteinformationserver.basicutils.Log
import com.wasteinformationserver.basicutils.Log.Log.debug import com.wasteinformationserver.basicutils.Log.Log.debug
import com.wasteinformationserver.basicutils.Log.Log.error import com.wasteinformationserver.basicutils.Log.Log.error
import com.wasteinformationserver.basicutils.Log.Log.info import com.wasteinformationserver.basicutils.Log.Log.info
@ -19,71 +20,84 @@ import java.util.*
* @author Lukas Heiligenbrunner * @author Lukas Heiligenbrunner
* @author Gregor Dutzler * @author Gregor Dutzler
*/ */
class MqttService(serverurl: String, port: String) { class MqttService {
private val serveruri: String = "tcp://$serverurl:$port" private var serveruri: String = "";
private var client: MqttClient = MqttClient(serveruri, "JavaSample42") private lateinit var client: MqttClient;
private var db: JDBC = JDBC.getInstance() private var db: JDBC = JDBC.getInstance()
/** companion object {
* initial login to db private val obj = MqttService()
*/ fun getInstance(): MqttService {
init { return obj;
connectToDb() }
}
fun init(serverurl: String, port: String){
serveruri = "tcp://$serverurl:$port"
} }
/** /**
* startup of the mqtt service * startup of the mqtt service
*/ */
fun startupService() { fun startupService() {
try { if(JDBC.isConnected()) {
client = MqttClient(serveruri, "JavaSample42") try {
val connOpts = MqttConnectOptions() client = MqttClient(serveruri, "WasteInformationServerID")
connOpts.isCleanSession = true val connOpts = MqttConnectOptions()
client.connect(connOpts) connOpts.isCleanSession = true
client.setCallback(object : MqttCallback { client.connect(connOpts)
override fun connectionLost(throwable: Throwable) { client.setCallback(object : MqttCallback {
error("connection lost") override fun connectionLost(throwable: Throwable) {
connectToDb() error("connection lost")
} Thread.sleep(500)
// restart service
startupService()
}
override fun messageArrived(s: String, mqttMessage: MqttMessage) { override fun messageArrived(s: String, mqttMessage: MqttMessage) {
val deviceid = String(mqttMessage.payload) val deviceid = String(mqttMessage.payload)
message("received Request from PCB") message("received Request from PCB")
val res = db.executeQuery("SELECT * from devices WHERE DeviceID=$deviceid") val res = db.executeQuery("SELECT * from devices WHERE DeviceID=$deviceid")
try { try {
res.last() res.last()
if (res.row != 0) { //existing device if (res.row != 0) {
res.first() // existing device
val devicecities = db.executeQuery("SELECT * from device_city WHERE DeviceID='$deviceid'") res.first()
devicecities.last() val devicecities = db.executeQuery("SELECT * from device_city WHERE DeviceID='$deviceid'")
if (devicecities.row == 0) { //not configured devicecities.last()
tramsmitMessage("$deviceid,-1") if (devicecities.row == 0) {
} // not configured
else { tramsmitMessage("$deviceid,-1")
devicecities.first() }
devicecities.previous() else {
devicecities.first()
devicecities.previous()
while (devicecities.next()) { while (devicecities.next()) {
val cityid = devicecities.getInt("CityID") val cityid = devicecities.getInt("CityID")
checkDatabase(cityid, deviceid.toInt()) checkDatabase(cityid, deviceid.toInt())
}
} }
} }
else {
// new device
db.executeUpdate("INSERT INTO devices (DeviceID) VALUES ($deviceid)")
info("new device registered to server")
tramsmitMessage("$deviceid,-1")
}
} catch (e: SQLException) {
e.printStackTrace()
} }
else { //new device
db.executeUpdate("INSERT INTO devices (DeviceID) VALUES ($deviceid)")
info("new device registered to server")
tramsmitMessage("$deviceid,-1")
}
} catch (e: SQLException) {
e.printStackTrace()
} }
}
override fun deliveryComplete(iMqttDeliveryToken: IMqttDeliveryToken) {} override fun deliveryComplete(iMqttDeliveryToken: IMqttDeliveryToken) {}
}) })
client.subscribe("TopicIn") client.subscribe("TopicIn")
} catch (e: MqttException) { } catch (e: MqttException) {
error("Connection to the Broker failed") error("Connection to the Broker failed")
}
}else{
Log.error("could't start mqtt service because of missing db connection!")
} }
} }
@ -98,7 +112,8 @@ class MqttService(serverurl: String, port: String) {
val set2 = db.executeQuery("SELECT * FROM cities WHERE `id`='$citywastezoneid'") val set2 = db.executeQuery("SELECT * FROM cities WHERE `id`='$citywastezoneid'")
try { try {
set2.last() set2.last()
if (set2.row != 1) { //error if (set2.row != 1) {
//error
warning("multiple Rows with same city id found - DB Error") warning("multiple Rows with same city id found - DB Error")
} }
else { else {
@ -111,7 +126,8 @@ class MqttService(serverurl: String, port: String) {
val result = db.executeQuery("SELECT pickupdates.pickupdate FROM pickupdates WHERE pickupdates.citywastezoneid=$citywastezoneid") val result = db.executeQuery("SELECT pickupdates.pickupdate FROM pickupdates WHERE pickupdates.citywastezoneid=$citywastezoneid")
try { try {
result.last() result.last()
if (result.row == 0) { //if not found in db --> send zero if (result.row == 0) {
//if not found in db --> send zero
debug("not found in db") debug("not found in db")
tramsmitMessage("$deviceid,$wastetype,0") tramsmitMessage("$deviceid,$wastetype,0")
} }
@ -130,7 +146,8 @@ class MqttService(serverurl: String, port: String) {
return return
} }
} while (result.next()) } while (result.next())
tramsmitMessage("$deviceid,$wastetype,0") //transmit zero if not returned before tramsmitMessage("$deviceid,$wastetype,0")
//transmit zero if not returned before
} }
} catch (e: SQLException) { } catch (e: SQLException) {
e.printStackTrace() e.printStackTrace()
@ -165,11 +182,4 @@ class MqttService(serverurl: String, port: String) {
else -> 0 else -> 0
} }
} }
/**
* receives connection object and initial connection to db
*/
private fun connectToDb() {
db = JDBC.getInstance()
}
} }