MQTT sensor interfacing

Talks about metering
Post Reply
to-scho
Posts: 2
Joined: Sat Dec 26, 2020 1:24 pm

MQTT sensor interfacing

Post by to-scho » Sat Dec 26, 2020 2:01 pm

Thank's for the great project!

I have done some MQTT sensor interfacing which I would like to share. This is not a library quality but can serve as a starting point for your integration.

My situation was as follows:
  • MQTT booker running on a raspberry pi. On same pi I have 123solar and meterN. Nevertheless, meterN does not rely on 123solar values but on two energy meters. In addition there is a RAM disk setup with 10MB in /mnt/RAMdisk used as scratch.
  • Two SML energy meters, one for import/export and the other one for solar production. Both attached via IR to a single sonoff basic device running SML enabled Tasmota FW: https://tasmota.github.io/docs/Smart-Meter-Interface/. By that the counter values are published via MQTT to my brooker. Values are in KWh with 4 digits precision contained in the MQTT tele topic as JSON
  • The gas meter has an Alegro A3213LLHLT-T HAL sensor that is aligned to the lowest counter slice. Pulses are counted by a sonoff device running tasmota standard FW build. Same as above, values are published to MQTT in the tele topic as m3*100
To integrate above to meterN for visualization I am using 2 python scripts + 1 modified house energy.php from meterN source. For the python scripts you may need to pip3 install some of the imported python packages i.e. pahoMqtt or flatten_dict.
  • mqttSub.py is registered as an system service on the raspberry. The script subscribes to the three MQTT topics of my 2x tasmota and 1x AI-on-the-edge-device. When any value is received, a temp file mqttSub.log on a RAM disk is updated holding all the readings in meterN syntax in addition to the JSON based MQTT message which includes a time stamp. One can adjust the reporting of the tele message in tasmota or use that mechanism to request recent values in case of the live meter readings.
  • mqttGet.py is used to request meter readings from mqttSub.log. In addition to the ID a max age of the reading is given. Whenever the time stamp in the mqttSub.log is too old, a new reading is requested in case of the tasmota devices. For the water meter values are just discarded when too old. Since there is no live meter reading here, the default read period of 4.xx minutes from the AI-on-the-edge-device is fine for meterN.
  • house energy.php is used to calculate power consumption of the house and solar self consumption.
meter setup in meterN:
1. Metername: Import
MeterID: ImportWh, Command: /usr/bin/python3 /var/www/html/metern/comapps/mqttGet.py ImportWh 300
MeterID: ImportExportW, Live command: /usr/bin/python3 /var/www/html/metern/comapps/mqttGet.py ImportExportW 10
2. Metername: Export
MeterID: ExportWh, Command: /usr/bin/python3 /var/www/html/metern/comapps/mqttGet.py ExportWh 300
3. Metername: Verbrauch
MeterID: Verbrauch, /var/www/html/metern/comapps/houseenergy.php -energy
MeterID: Verbrauch, Live command: /var/www/html/metern/comapps/houseenergy.php -power
4. Metername: Solar
MeterID: Solar Wh, Command: /usr/bin/python3 /var/www/html/metern/comapps/mqttGet.py SolarWh 300
MeterID: Solar W, Live command: /usr/bin/python3 /var/www/html/metern/comapps/mqttGet.py SolarW 10
5. Metername: Eigenverbrauch
MeterID: Eigenverbrauch, Command: /var/www/html/metern/comapps/houseenergy.php -self
MeterID: Eigenverbrauch, Live Command: /var/www/html/metern/comapps/houseenergy.php -powerself
6. Metername: Gas
MeterID: Gas, Command: /usr/bin/python3 /var/www/html/metern/comapps/mqttGet.py Gas 50
7. Metername: Wasser
MeterID: Wasser, Command: /usr/bin/python3 /var/www/html/metern/comapps/mqttGet.py Wasser 300
8. Metername: -Import
MeterID: -ImportWh, Command: /usr/bin/python3 /var/www/html/metern/comapps/mqttGet.py -ImportWh 300

/etc/systemd/system/mqttSub.service

Code: Select all

[Unit]
Description=MQTT Subscriber
After=network.target

[Service]
Restart=always
RestartSec=10
ExecStart=/usr/bin/python3 /var/www/html/metern/comapps/mqttSub.py

[Install]
WantedBy=multi-user.target
/var/www/html/metern/comapps/mqttSub.py

Code: Select all

#!/usr/bin/python3 
import paho.mqtt.client as mqtt
import time
from datetime import datetime
import fileinput
import os
import json
from flatten_dict import flatten

file = "/mnt/RAMDisk/mqttSub.log"
detailLogFile = "" #"/mnt/RAMDisk/mqttSubDetail.log"

whCounterMin = 2
whCounterIncMax = 10
whCounterInvalid = [0, 999.999, 9999.999, 99999.999, 999999.999]
wInvalid = [999, 9999]

def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe([("tele/aussenbeleuchtung-treppe-haus/SENSOR", 1)])
    client.subscribe([("tele/4ch-keller/SENSOR", 1)])
    client.subscribe([("wasserzaehler/zaehlerstand", 1)])

def updateLog(file,key,updatedLine):
    #print("Try to update file")
    try:
        writeMe = True
        if os.path.exists(file):
            for line in fileinput.input(file, inplace=True):
                if (not line.startswith(key)) and (not line == "\n"):
                    print(line, end='')
                elif writeMe:
                    print(updatedLine)
                    writeMe = False
            fileinput.close()
        if writeMe:
             with open(file, 'a+') as fh:
                fh.write(updatedLine + "\n")
    except:
        print("Cannot write to file ")

def on_message(client, userdata, message):
    print("Message received: " + message.topic + " : " + str(message.payload))
    if message.topic == 'tele/aussenbeleuchtung-treppe-haus/SENSOR':
        prevVal = None
        if os.path.exists(file):
            for line in fileinput.input(file):
                 if line.startswith(message.topic):
                     prevVal = flatten(json.loads(line.replace(message.topic + ": " ,"")), reducer='underscore')
                     break
            fileinput.close()
        updateLog(file , message.topic, message.topic + ": " + message.payload.decode("utf-8") )
        payloadDict = flatten(json.loads(message.payload), reducer='underscore')
        for d in payloadDict:
            if "SML" in d:
                thisPayload = payloadDict[d]
                thisUnit = None
                if "Total" in d:
                    incVal = 0
                    try:
                        if not prevVal is None:
                            incVal = thisPayload - prevVal[d]
                    except:
                        pass
                    if (thisPayload >= whCounterMin) and (incVal>=0) and (incVal<=whCounterIncMax) and (not thisPayload in whCounterInvalid):
                        thisUnit = "Wh"
                    thisPayload = thisPayload * 1000
                else:
                    if not thisPayload in wInvalid:
                        thisUnit = "W"
                    if "SML2" in d:
                        thisPayload = -thisPayload
                if not thisUnit is None:
                    if isinstance(thisPayload, str):
                        payloadStr = "{}({}*{})"
                    else:
                        payloadStr = "{}({:.2f}*{})"
                    updateLine = payloadStr.format(d, thisPayload, thisUnit) 
                    updateLog(file, str(d), updateLine)

    if message.topic == 'tele/4ch-keller/SENSOR':
        updateLog(file , message.topic, message.topic + ": " + message.payload.decode("utf-8") )
        payloadDict = flatten(json.loads(message.payload), reducer='underscore')
        for d in payloadDict:
            if "COUNTER" in d:
                thisPayload = payloadDict[d]
                thisUnit = None
                if "C1" in d:
                    thisUnit = "m3"
                    thisPayload = thisPayload / 100
                if not thisUnit is None:
                    if isinstance(thisPayload, str):
                        payloadStr = "{}({}*{})"
                    else:
                        payloadStr = "{}({:.2f}*{})"
                    updateLine = payloadStr.format(d, thisPayload, thisUnit) 
                    updateLog(file, str(d), updateLine)


    if message.topic == 'wasserzaehler/zaehlerstand':
        thisPayload = float(message.payload.decode("utf-8"))
        now = datetime.now()
        mydict = {}
        mydict['Time'] = now.strftime("%Y-%m-%dT%H:%M:%S")
        mydict['WasserZaehlerstand'] = thisPayload
        myjson = json.dumps(mydict)
        updateLog(file , message.topic, message.topic + ": " + myjson )
        thisUnit = "l"
        if isinstance(thisPayload, str):
            payloadStr = "{}({}*{})"
        else:
            payloadStr = "{}({:.2f}*{})"
        updateLine = payloadStr.format('WasserZaehlerstand', thisPayload*1000, thisUnit) 
        updateLog(file, 'WasserZaehlerstand', updateLine)


broker_address = "localhost"  # Broker address
port = 1883  # Broker port
# user = "yourUser"                    #Connection username
# password = "yourPassword"            #Connection password

client = mqtt.Client()  # create new instance
# client.username_pw_set(user, password=password)    #set username and password
client.on_connect = on_connect  # attach function to callback
client.on_message = on_message  # attach function to callback

client.connect(broker_address, port=port)  # connect to broker

client.loop_forever()
/var/www/html/metern/comapps/mqttGet.py

Code: Select all

#!/usr/bin/python3 
import fileinput
import os
import time
from datetime import datetime
import json
import sys
import paho.mqtt.client as mqtt

def getLastMqtt(dictIDs, file):
    lastMqttMessage = None
    lastMqttTopic = None
    lastMqttTime = None
    lastMqttAge = None
    try:
        for line in fileinput.input(file):
            if all(d in line for d in dictIDs + ["Time"]):
                lastMqttTopic = line.partition(":")[0]
                lastMqttMessage = line.replace(lastMqttTopic + ": ", "")
                lastMqttTime = json.loads(lastMqttMessage)["Time"]
                lastMqttAge = (datetime.now()-datetime.strptime(lastMqttTime, "%Y-%m-%dT%H:%M:%S")).total_seconds()
                break
        fileinput.close()
    except:
        fileinput.close()
    return lastMqttAge, lastMqttMessage, lastMqttTopic, lastMqttTime

if len(sys.argv) < 3:
    print("Expected at least two command line arguments")
    sys.exit(1)

dictIDs = sys.argv[1:-1]
repID = None
negSign = False
if len(dictIDs)==1:
    repID = dictIDs[0]
    if dictIDs[0].lower() == "gas":
        dictIDs = ["COUNTER", "C1"]
    elif dictIDs[0].lower() == "solarwh":
        dictIDs = ["SML2", "Total_out"]
    elif dictIDs[0].lower() == "solarw":
        dictIDs = ["SML2", "Power_curr"]
    elif dictIDs[0].lower() == "importwh":
        dictIDs = ["SML1", "Total_in"]
    elif dictIDs[0].lower() == "-importwh":
        dictIDs = ["SML1", "Total_in"]
        negSign = True
    elif dictIDs[0].lower() == "exportwh":
        dictIDs = ["SML1", "Total_out"]
    elif dictIDs[0].lower() == "importexportw":
        dictIDs = ["SML1", "Power_curr"]
    elif dictIDs[0].lower() == "wasser":
        dictIDs = ["WasserZaehlerstand"]
try:
    maxAge = int(sys.argv[-1])
except:
    print("Age argument should be integer")
    sys.exit(1)

file = "/mnt/RAMDisk/mqttSub.log"

if not os.path.exists(file):
    print("MQTT RAMDisk file not found")
    sys.exit(2)

lastMqttAge, lastMqttMessage, lastMqttTopic, lastMqttTime = getLastMqtt(dictIDs, file)
lastMqttUpdateCmnd = None

if (not lastMqttAge is None):
    if lastMqttTopic.startswith("tele"):
        lastMqttDevice = lastMqttTopic.partition("/")[2].partition("/")[0]
        lastMqttUpdateCmnd = "cmnd/" + lastMqttDevice + "/teleperiod"
    elif (lastMqttAge > maxAge):
        print("No update from MQTT received in time, was {}s ago. No Tasmota, cannot ask for update!".format(lastMqttAge))
        sys.exit(3)

if (lastMqttUpdateCmnd is not None) and (lastMqttAge > maxAge):
    #print("Last MQTT is {}s old, requesting update via MQTT: {}".format(lastMqttAge, lastMqttUpdateCmnd))
    broker_address = "localhost"  # Broker address
    port = 1883  # Broker port
    # user = "yourUser"                    #Connection username
    # password = "yourPassword"            #Connection password
    client = mqtt.Client()  # create new instance
    # client.username_pw_set(user, password=password)    #set username and password
    client.connect(broker_address, port=port)  # connect to broker
    ret = client.publish(lastMqttUpdateCmnd,"")
    for s in range(5):
        lastMqttAge,_,_,_ = getLastMqtt(dictIDs, file)
        if lastMqttAge<=5:
            break
        time.sleep(1)
    else:
        print("No update from MQTT received in time")
        sys.exit(3)

payload = None
joinStr = "_"
key = joinStr.join(dictIDs)

for line in fileinput.input(file):
    if (line.startswith(key)):
        payload = line
        break
fileinput.close()

if payload is None:
    print("Payload {} not found in file {}".format(key, file))
    sys.exit(4)

if not repID is None:
    payload = payload.replace(key, repID)

if negSign:
    val = payload.partition("(")[2].partition("*")[0]
    try:
        valNeg = str(round(1e9-float(val),1))
    except:
        valNeg = ""
    payload = payload.replace(val, valNeg)

print(payload, end='')
/var/www/html/metern/comapps/houseenergy.php

Code: Select all

#!/usr/bin/php
<?php
// A virtual meters example for meterN. This script will simulate a house and self consumption meters (*). You must own a total import/export and a production meter. 
//                         _____  
//                        /     \
//      +----------+     /       \     +-Total meter-+     - ^ -
//      |Production| --> | House | <-- |   import    |___   /X\ Grid
//      +----------+     |       | --> |   export    |     /V V\
//                                     +-------------+
//                consumption/selfconsumuption (*)
//
// ln -s /srv/http/comapps/houseenergy.php /usr/bin/houseenergy and chmod +x houseenergy.php
//
// The house virtual meter should be configured in mN as 'Elect House consumption' type with a passover value like 100000.
// The house self consumuption meter as 'Elect Other' also with a passover value like 100000.

if (isset($_SERVER['REMOTE_ADDR'])) {
    die('Direct access not permitted');
}
//if (!file_exists('/dev/shm/sdm_log.txt')) {
//    usleep(500);
//    die('sdm_log.txt does not exist yet');
//}

//// Set the 2 virtual meters ////
// Consumption
$HOUSEID     = 'Verbrauch'; // ID
$HOUSEmetnum = 3; // meter number
// Selfconsumption
$SELFID      = 'Eigenverbrauch';
$SELFCmetnum = 5;

//// Set up real meters ////
// Production
$PRODID       = 'SolarWh'; // ID
$PRODcmd      = '/usr/bin/python3 /var/www/html/metern/comapps/mqttGet.py SolarWh 300'; // Energy command
$POWERPRODID  = 'SolarW'; // ID
$POWERPRODcmd = '/usr/bin/python3 /var/www/html/metern/comapps/mqttGet.py SolarW 10'; // Power
$PRODmetnum   = 4; // meter number

// TOT return the total power (eg if import = 45W, export -55W)
$TPID        = 'ImportExportW';
$TOTPOWERcmd = '/usr/bin/python3 /var/www/html/metern/comapps/mqttGet.py ImportExportW 10';

// Energy Imported
$IMPID     = 'ImportWh';
$IMPcmd    = '/usr/bin/python3 /var/www/html/metern/comapps/mqttGet.py ImportWh 300';
$IMPmetnum = 1;

// Energy Exported
$EXPID     = 'ExportWh';
$EXPcmd    = '/usr/bin/python3 /var/www/html/metern/comapps/mqttGet.py ExportWh 300';
$EXPmetnum = 2;

//// Indicators ////
//$VOLTID  = '1_V';
//$VOLTcmd = 'cat /dev/shm/sdm_log.txt | egrep "^1_V\(" | grep "*V)"';
//$FRQID   = '1_F';
//$FRQcmd  = 'cat /dev/shm/sdm_log.txt | egrep "^1_F\(" | grep "*Hz)"';
//$COSID   = '1_PF';
//$COScmd  = 'cat /dev/shm/sdm_log.txt | egrep "^1_PF\(" | grep "*F)"';

// Path to metern
$MNDIR    = '/var/www/html/metern';
// Save previous values in a file, make sure the http user can write there
$prevfile = '/mnt/RAMDisk/houseenergy_prev.json';

// No edit should be needed bellow

function getvalue($id, $cmd) //  Get data and validate with IEC 62056 data set structure
{
    $datareturn = null;
    $giveup     = 0;
    $regexp     = "/^$id\(-?[0-9\.]+\*[A-z0-9≥≤%∞]+\)$/i"; //ID(VALUE*UNIT)
    
    while (!isset($datareturn) && $giveup < 3) { // Try 3 times
        exec($cmd, $datareturn);
        $datareturn = trim(implode($datareturn));
        
        if (preg_match($regexp, $datareturn)) {
            $datareturn = preg_replace("/^$id\(/i", '', $datareturn, 1); // VALUE*UNIT)
            $datareturn = preg_replace("/\*[A-z0-9≥≤%∞]+\)$/i", '', $datareturn, 1); // VALUE
            settype($datareturn, 'float');
        } else {
            $datareturn = null;
        }
        $giveup++;
    }
    return $datareturn;
}

function retrievecsv($meternum, $csvarray, $passo) // Retrieve last know value in latest csv
{
    $datareturn = null;
    $contalines = count($csvarray);
    $j          = 0;
    while (!isset($datareturn)) {
        $j++;
        $array      = preg_split('/,/', $csvarray[$contalines - $j]);
        $datareturn = (int) trim($array[$meternum]);
        if ($datareturn == '') {
            $datareturn = null;
        }
        if ($j == $contalines) {
            $datareturn = 0;
        }
    }
    if ($datareturn > $passo) {
        $datareturn -= $passo;
    }
    return $datareturn;
}

if (isset($argv[2])) {
    die("Abording: Too many arguments\n");
}
if (isset($argv[1])) {
    if ($argv[1] == '-power' || $argv[1] == '-powerimp' || $argv[1] == '-powerexp' || $argv[1] == '-powerself') { // Power
        $totpower  = getvalue($TPID, $TOTPOWERcmd);
        $prodpower = getvalue($POWERPRODID, $POWERPRODcmd);
        $power     = round(($prodpower + $totpower), 1);
        
        if ($argv[1] == '-power') {
            $outstr = utf8_decode("$HOUSEID($power*W)\n");
        } else {
            if ($totpower > 0) { // Import
                $imppower = $power - $prodpower;
                $exppower = 0;
                $slfpower = $prodpower;
            } else { // Export
                $imppower = 0;
                $exppower = $prodpower - $power;
                $slfpower = $power;
            }
        }
        if ($argv[1] == '-powerimp') {
            $imppower = round($imppower, 1);
            $outstr   = utf8_decode("$IMPID($imppower*W)\n");
        } elseif ($argv[1] == '-powerexp') {
            $exppower = round($exppower, 1);
            $outstr   = utf8_decode("$EXPID($exppower*W)\n");
        } elseif ($argv[1] == '-powerself') {
            $slfpower = round($slfpower, 1);
            $outstr   = utf8_decode("$SELFID($slfpower*W)\n");
        }
        echo "$outstr";
//    } elseif ($argv[1] == '-volt' || $argv[1] == '-frq' || $argv[1] == '-cos') { // Indicators
//        if ($argv[1] == '-volt') {
//            $outstr = getvalue($VOLTID, $VOLTcmd);
//            $outstr = round($outstr, 1);
//            $outstr = utf8_decode("$VOLTID($outstr*V)\n");
//        } elseif ($argv[1] == '-frq') {
//            $outstr = getvalue($FRQID, $FRQcmd);
//            $outstr = round($outstr, 1);
//            $outstr = utf8_decode("$FRQID($outstr*Hz)\n");
//        } else {
//            $outstr = getvalue($COSID, $COScmd);
//            $outstr = round($outstr, 1);
//            $outstr = utf8_decode("$COSID($outstr*phi)\n");
//        }
//        echo "$outstr";
    } elseif ($argv[1] == '-eimp' || $argv[1] == '-eimpn' || $argv[1] == '-eexp' || $argv[1] == '-eexpn') { // Imported / Exported meters
        if ($argv[1] == '-eimp') {
            $outstr = getvalue($IMPID, $IMPcmd);
            $outstr = utf8_decode("$IMPID($outstr*Wh)\n");
        } elseif ($argv[1] == '-eimpn') {
            $outstr = 1e9-getvalue($IMPID, $IMPcmd);
            $outstr = utf8_decode("$IMPID($outstr*Wh)\n");
        } elseif ($argv[1] == '-eexp') {
            $outstr = getvalue($EXPID, $EXPcmd);
            $outstr = utf8_decode("$EXPID($outstr*Wh)\n");
        } else {
            $outstr = 1e9-getvalue($EXPID, $EXPcmd);
            $outstr = utf8_decode("$EXPID($outstr*Wh)\n");
        } 
        echo "$outstr";
    } elseif ($argv[1] == '-energy' || $argv[1] == '-self') { // Virtual energy and selfc meters
        define('checkaccess', TRUE);
        include("$MNDIR/config/config_main.php");
        include("$MNDIR/config/config_met$HOUSEmetnum.php");
        include("$MNDIR/config/config_met$SELFCmetnum.php");
        include("$MNDIR/config/config_met$IMPmetnum.php");
        include("$MNDIR/config/config_met$EXPmetnum.php");
        include("$MNDIR/config/config_met$PRODmetnum.php");
        
        // Retrieve previous virtuals meters values
        if (file_exists($prevfile)) {
            $data     = file_get_contents($prevfile);
            $previous = json_decode($data, true);
        } else {
            // Retrieve last know value in last csv
            $dir    = $MNDIR . '/data/csv/';
            $output = array();
            $output = glob($dir . '*.csv');
            rsort($output);
            
            if (file_exists($output[0])) {
                $lines                     = file($output[0]);
                $contalines                = count($lines);
                $previous['prevIMPhouse']  = retrievecsv($IMPmetnum, $lines, ${'PASSO' . $IMPmetnum});
                $previous['prevEXPhouse']  = retrievecsv($EXPmetnum, $lines, ${'PASSO' . $EXPmetnum});
                $previous['prevEXPself']   = retrievecsv($EXPmetnum, $lines, ${'PASSO' . $EXPmetnum});
                $previous['prevHOUSE']     = retrievecsv($HOUSEmetnum, $lines, ${'PASSO' . $HOUSEmetnum});
                $previous['prevSELF']      = retrievecsv($SELFCmetnum, $lines, ${'PASSO' . $SELFCmetnum});
                $previous['prevPRODhouse'] = retrievecsv($PRODmetnum, $lines, ${'PASSO' . $PRODmetnum});
                $previous['prevPRODself']  = $previous['prevPRODhouse'];
            } else { // restarting from scratch !
                $import     = null;
                $export     = null;
                $production = null;
                // latest import
                $import     = getvalue($IMPID, $IMPcmd);
                // latest export
                $export     = getvalue($EXPID, $EXPcmd);
                // latest production
                $production = getvalue($PRODID, $PRODcmd);
                
                $previous['prevIMPhouse']  = $import;
                $previous['prevEXPhouse']  = $export;
                $previous['prevEXPself']   = $export;
                $previous['prevHOUSE']     = 0;
                $previous['prevSELF']      = 0;
                $previous['prevPRODhouse'] = $production;
                $previous['prevPRODself']  = $production;
            }
        }
        
        // Now retrieve latest values
        $import     = null;
        $export     = null;
        $production = null;
        $outstr     = null;
        
        if ($argv[1] == '-energy') { // latest import
            $import = getvalue($IMPID, $IMPcmd);
        }
        $export     = getvalue($EXPID, $EXPcmd); // latest export
        $production = getvalue($PRODID, $PRODcmd); // latest production
        
        // Household consumption
        if ($argv[1] == '-energy' && isset($import) && isset($export)) {
            if ($export >= $previous['prevEXPhouse']) { // Some passover checks
                $diffEXP = $export - $previous['prevEXPhouse'];
            } else {
                $diffEXP = $export + ${'PASSO' . $EXPmetnum} - $previous['prevEXPhouse'];
            }
            if (isset($production)) {
                if ($production >= $previous['prevPRODhouse']) {
                    $diffPROD = $production - $previous['prevPRODhouse'];
                } else {
                    $diffPROD = $production + ${'PASSO' . $PRODmetnum} - $previous['prevPRODhouse'];
                }
                //settype($previous['prevPRODhouse'], 'int');
                $previous['prevPRODhouse'] = $production;
            } else { // no production case
                $diffPROD = 0;
                $diffEXP  = 0;
            }
            if ($import >= $previous['prevIMPhouse']) {
                $diffIMP = $import - $previous['prevIMPhouse'];
            } else {
                $diffIMP = $import + ${'PASSO' . $IMPmetnum} - $previous['prevIMPhouse'];
            }
            $difference = $diffIMP + $diffPROD - $diffEXP;
            if ($difference < 0) { // Might happen if the production return no difference while the export did !
                $difference = 0;
            }
            $previous['prevHOUSE'] += $difference;
            
            if ($previous['prevHOUSE'] >= ${'PASSO' . $HOUSEmetnum}) { // passed over
                $previous['prevHOUSE'] -= ${'PASSO' . $HOUSEmetnum};
            }
            $val    = $previous['prevHOUSE'];
            $outstr = utf8_decode("$HOUSEID($val*Wh)\n");
            
            //settype($previous['prevIMPhouse'], 'int');
            $previous['prevIMPhouse'] = round($import,1);
            //settype($previous['prevEXPhouse'], 'int');
            $previous['prevEXPhouse'] = round($export,1);
            //settype($previous['prevHOUSE'], 'int');
            $previous['prevHOUSE'] = round($previous['prevHOUSE'],1);
        } elseif ($argv[1] == '-self' && isset($export)) { // Self 
            // Some passover checks
            if ($export >= $previous['prevEXPself']) {
                $diffEXP = $export - $previous['prevEXPself'];
            } else {
                $diffEXP = $export + ${'PASSO' . $EXPmetnum} - $previous['prevEXPself'];
            }
            if (isset($production)) {
                if ($production >= $previous['prevPRODself']) {
                    $diffPROD = $production - $previous['prevPRODself'];
                } else {
                    $diffPROD = $production + ${'PASSO' . $PRODmetnum} - $previous['prevPRODself'];
                }
                //settype($previous['prevPRODself'], 'int');
                $previous['prevPRODself'] = round($production,1);
            } else { // no production case
                $diffPROD = 0;
                $diffEXP  = 0;
            }
            
            $difference = $diffPROD - $diffEXP;
            if ($difference < 0) {
                $difference = 0;
            }
            
            $previous['prevSELF'] += $difference;
            if ($previous['prevSELF'] >= ${'PASSO' . $SELFCmetnum}) {
                $previous['prevSELF'] -= ${'PASSO' . $SELFCmetnum};
            }
            $previous['prevSELF'] = round($previous['prevSELF'],1);
            $val    = $previous['prevSELF'];
            $outstr = utf8_decode("$SELFID($val*Wh)\n");
            
            //settype($previous['prevEXPself'], 'int');
            $previous['prevEXPself'] = round($export,1);
            //settype($previous['prevSELF'], 'int');
        }
        
        // Saving previous values
        $data = json_encode($previous);
        file_put_contents($prevfile, $data);
        
        echo "$outstr";
    } elseif ($argv[1] == '-prev') {
        if (file_exists($prevfile)) {
            echo "\n$prevfile :\n\n";
            $data     = file_get_contents($prevfile);
            $previous = json_decode($data, true);
            print_r($previous);
        }
    } else {
        die("Abording: no valid argument given or missing value(s).\n");
    }
} else {
//    echo "Usage: houseenergy { power | powerimp | powerexp | powerself | volt | freq | cos | eimp | eexp | energy | self }\n
//        -power :\t Total power
//        -powerimp :\t Power imported
//        -powerexp :\t Power exported
//        -powerself :\t Power self consumed
//      -volt :\t\t Grid voltage
//      -frq :\t\t Grid frequency
//      -cos :\t\t Power factor
//        -eimp :\t\t Energy imported
//        -eexp :\t\t Energy exported
//        -energy :\t Household virtual consumption meter
//        -self :\t\t Household virtual  self consumption meter
//        -prev :\t\t Show previous stored values
//        \n";
    echo "Usage: houseenergy { power | powerimp | powerexp | powerself | eimp | eexp | energy | self }\n
	-power :\t Total power
	-powerimp :\t Power imported
	-powerexp :\t Power exported
	-powerself :\t Power self consumed
	-eimp :\t\t Energy imported
        -eimpn :\t\t Energy imported negated
        -eexp :\t\t Energy exported
        -eexpn :\t\t Energy exported negated
	-energy :\t Household virtual consumption meter
	-self :\t\t Household virtual  self consumption meter
	-prev :\t\t Show previous stored values
	\n";

}
?>
Finally, if you want to have consistent meter diff values in your home atomization, you can use the script below to publish data from meterN csv to MQTT. I am calling this script from modified meterN file metern.php, maybe some daemon like design would be the better choice for future updates.

/var/www/html/metern/scripts/metern.php

Code: Select all

<?php
/**
 * /srv/http/metern/scripts/metern.php
 *
 * @package default
 */

$mqttPubCmd = "/var/www/html/metern/comapps/mqttPub.py";

include 'loadcfg.php';
...
...
                        } // Consumption/prod check
                        include '../config/config_trigger.php';
                        $mqttPubCmdReturn = null;
                        exec($mqttPubCmd, $mqttPubCmdReturn);
                } // 5 min
...
...
/var/www/html/metern/comapps/mqttPub.py

Code: Select all

#!/usr/bin/python3 
import time
import datetime
import csv
import sys
import paho.mqtt.client as mqtt
import glob
import os
import json

csvDataFiles = "/var/www/html/metern/data/csv/*.csv"
mqttTopic = "metern/data"

list_of_files = glob.glob(csvDataFiles)
if len(list_of_files)==0:
    print("Nothing found here: {}".format(csvDataFiles))
    sys.exit(1)

latest_file = max(list_of_files, key=os.path.getmtime)
fileAge = time.time() - os.path.getmtime(latest_file)

if fileAge>300:
    print("csv {} too old: {}s".format(latest_file, fileAge))
    sys.exit(2)

lastRow = None
last2ndRow = None
with open(latest_file) as csvfile:
    csvReader = csv.reader(csvfile, delimiter=',')
    csvTitle = None
    for row in csvReader:
        if csvTitle is None:
            csvTitle = row
        last2ndRow = lastRow
        lastRow = row

meterNDict = {}
for i in range(len(csvTitle)):
    if csvTitle[i] == "Time":
        try:
            d1 = datetime.datetime.strptime(last2ndRow[i], '%H:%M')
            d2 = datetime.datetime.strptime(lastRow[i], '%H:%M')
            meterNDict[csvTitle[i]] = lastRow[i]
            meterNDict['TimeDiff'] = (d2 - d1).total_seconds()
        except:
            meterNDict['TimeDiff'] = None
            meterNDict[csvTitle[i]] = None
    else:
        try:
            meterNDict[csvTitle[i]] = round(float(lastRow[i]) - float(last2ndRow[i]) , 1)
        except:
            meterNDict[csvTitle[i]] = None

broker_address = "localhost"  # Broker address
port = 1883  # Broker port
# user = "yourUser"                    #Connection username
# password = "yourPassword"            #Connection password
client = mqtt.Client()  # create new instance
# client.username_pw_set(user, password=password)    #set username and password
client.connect(broker_address, port=port)  # connect to broker
ret = client.publish(mqttTopic, json.dumps(meterNDict))
Last edited by to-scho on Sun Dec 27, 2020 9:13 am, edited 5 times in total.

jeanmarc
Posts: 2100
Joined: Thu Aug 29, 2013 7:16 am

Re: MQTT sensor interfacing

Post by jeanmarc » Sat Dec 26, 2020 3:03 pm

Thanks for sharing :thumbup:

flane
Posts: 92
Joined: Mon Aug 11, 2014 7:22 am

Re: MQTT sensor interfacing

Post by flane » Mon Jan 04, 2021 4:32 pm

Hi, great work!

How can I read the live values of 123Solar and Metern files (mN_LIVEMEMORY.json and 123s_LIVEMEMORY.json) and publish them on the MQTT broker?

to-scho
Posts: 2
Joined: Sat Dec 26, 2020 1:24 pm

Re: MQTT sensor interfacing

Post by to-scho » Sat Jan 09, 2021 9:38 am

flane wrote: Mon Jan 04, 2021 4:32 pm Hi, great work!

How can I read the live values of 123Solar and Metern files (mN_LIVEMEMORY.json and 123s_LIVEMEMORY.json) and publish them on the MQTT broker?
There is mqtt_energy.php in the comapps misc examples which may fit your needs. For sure you may also do some python version similar to my stuff.

Post Reply

Who is online

Users browsing this forum: No registered users and 1 guest