175 lines
6.3 KiB
Kotlin
Raw Normal View History

2020-01-31 09:37:06 +01:00
package com.wasteinformationserver.mqtt
import com.wasteinformationserver.basicutils.Log.Log.debug
import com.wasteinformationserver.basicutils.Log.Log.error
import com.wasteinformationserver.basicutils.Log.Log.info
import com.wasteinformationserver.basicutils.Log.Log.message
2020-04-03 13:26:44 +02:00
import com.wasteinformationserver.basicutils.Log.Log.warning
2020-01-31 09:37:06 +01:00
import com.wasteinformationserver.db.JDBC
import org.eclipse.paho.client.mqttv3.*
import java.sql.SQLException
import java.text.ParseException
import java.text.SimpleDateFormat
import java.util.*
/**
* Mqtt Service to receive and send back messages to the Hardware
* check values from db send right feedback to hardware.
2020-01-31 09:37:06 +01:00
*
* @author Lukas Heiligenbrunner
* @author Gregor Dutzler
*/
class MqttService(serverurl: String, port: String) {
private val serveruri: String = "tcp://$serverurl:$port"
private var client: MqttClient = MqttClient(serveruri, "JavaSample42")
private var db: JDBC = JDBC.getInstance()
2020-01-31 09:37:06 +01:00
/**
2020-04-03 13:26:44 +02:00
* initial login to db
2020-01-31 09:37:06 +01:00
*/
init {
connectToDb()
2020-01-31 09:37:06 +01:00
}
/**
* startup of the mqtt service
*/
fun startupService() {
try {
client = MqttClient(serveruri, "JavaSample42")
val connOpts = MqttConnectOptions()
connOpts.isCleanSession = true
client.connect(connOpts)
client.setCallback(object : MqttCallback {
2020-01-31 09:37:06 +01:00
override fun connectionLost(throwable: Throwable) {
error("connection lost")
connectToDb()
2020-01-31 09:37:06 +01:00
}
override fun messageArrived(s: String, mqttMessage: MqttMessage) {
val deviceid = String(mqttMessage.payload)
message("received Request from PCB")
val res = db.executeQuery("SELECT * from devices WHERE DeviceID=$deviceid")
2020-01-31 09:37:06 +01:00
try {
res.last()
if (res.row != 0) { //existing device
res.first()
val devicecities = db.executeQuery("SELECT * from device_city WHERE DeviceID='$deviceid'")
2020-01-31 09:37:06 +01:00
devicecities.last()
if (devicecities.row == 0) { //not configured
tramsmitMessage("$deviceid,-1")
}
else {
2020-01-31 09:37:06 +01:00
devicecities.first()
devicecities.previous()
while (devicecities.next()) {
val cityid = devicecities.getInt("CityID")
checkDatabase(cityid, deviceid.toInt())
}
}
}
else { //new device
db.executeUpdate("INSERT INTO devices (DeviceID) VALUES ($deviceid)")
2020-01-31 09:37:06 +01:00
info("new device registered to server")
tramsmitMessage("$deviceid,-1")
}
} catch (e: SQLException) {
e.printStackTrace()
}
}
override fun deliveryComplete(iMqttDeliveryToken: IMqttDeliveryToken) {}
})
client.subscribe("TopicIn")
2020-01-31 09:37:06 +01:00
} catch (e: MqttException) {
error("Connection to the Broker failed")
}
}
2020-04-03 13:26:44 +02:00
/**
* Check if device is configured and zone infos are stored in db
*
* @param citywastezoneid zone/city id
* @param deviceid device id
*/
2020-01-31 09:37:06 +01:00
private fun checkDatabase(citywastezoneid: Int, deviceid: Int) {
var wastetype = -1
val set2 = db.executeQuery("SELECT * FROM cities WHERE `id`='$citywastezoneid'")
2020-01-31 09:37:06 +01:00
try {
set2.last()
if (set2.row != 1) { //error
2020-04-03 13:26:44 +02:00
warning("multiple Rows with same city id found - DB Error")
}
else {
2020-01-31 09:37:06 +01:00
val typ = set2.getString("wastetype")
wastetype = getIntTyp(typ)
}
} catch (e: SQLException) {
e.printStackTrace()
}
val result = db.executeQuery("SELECT pickupdates.pickupdate FROM pickupdates WHERE pickupdates.citywastezoneid=$citywastezoneid")
2020-01-31 09:37:06 +01:00
try {
result.last()
if (result.row == 0) { //if not found in db --> send zero
debug("not found in db")
tramsmitMessage("$deviceid,$wastetype,0")
}
else {
2020-01-31 09:37:06 +01:00
debug(result.getString("pickupdate"))
result.first()
do {
val formatter = SimpleDateFormat("yyyy-MM-dd")
val timestamp = formatter.parse(result.getString("pickupdate")).time
val timestampnow = formatter.parse(formatter.format(Date())).time
debug("timestamp is :$timestamp")
if (timestamp == timestampnow || timestamp == timestampnow + 86400000) { // 86400000 == one day
// valid time
tramsmitMessage("$deviceid,$wastetype,1")
debug("valid time")
return
}
} while (result.next())
tramsmitMessage("$deviceid,$wastetype,0") //transmit zero if not returned before
}
} catch (e: SQLException) {
e.printStackTrace()
} catch (e: ParseException) {
e.printStackTrace()
}
}
2020-04-03 13:26:44 +02:00
/**
* send a mqtt message to predefined topic
*/
2020-01-31 09:37:06 +01:00
private fun tramsmitMessage(temp: String) {
2020-04-03 13:26:44 +02:00
message("reply back to PCB: $temp")
2020-01-31 09:37:06 +01:00
val message = MqttMessage(temp.toByteArray())
message.qos = 2
try {
client.publish("TopicOut", message)
2020-01-31 09:37:06 +01:00
} catch (e: MqttException) {
e.printStackTrace()
}
}
2020-04-03 13:26:44 +02:00
/**
* parse Type name to representing integer value
*/
2020-01-31 09:37:06 +01:00
private fun getIntTyp(temp: String): Int {
return when (temp) {
"Plastic" -> 1
"Metal" -> 2
"Residual waste" -> 3
"Biowaste" -> 4
else -> 0
2020-01-31 09:37:06 +01:00
}
}
2020-04-03 13:26:44 +02:00
/**
* receives connection object and initial connection to db
*/
private fun connectToDb() {
db = JDBC.getInstance()
2020-01-31 09:37:06 +01:00
}
}