Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.directenergypartners.com/llms.txt

Use this file to discover all available pages before exploring further.

The code below implements an application that manages the state of charge (SOC) of a battery system. It continuously checks the battery’s SOC and adjusts the power setpoint to charge or discharge the battery towards a target SOC, which is itself clamped to predefined minimum and maximum SOC levels. The power setpoint is limited by the maximum import/export capabilities of the converter and the charge/discharge limits of the battery. Commands are then sent to the converter to adjust its power output accordingly. The script runs in an infinite loop with a specified delay between each iteration; it logs exceptions raised inside the loop and keeps running, and it shuts down gracefully on user interruption.
target-soc.py
"""
Main entry point for the target SOC application.
"""

import logging
import os
import sys
import time
from dotenv import load_dotenv

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from pplapp import Pplapp

# -- Configuration ------------------------------------------------------------
# Device identifiers passed to the controller API.
CONVERTER_ID = "converter1"
BATTERY_ID = "battery1"

# Timing, in seconds.
CONTROL_LOOP_INTERVAL_S = 5  # pause between two control iterations
STARTUP_DELAY_S = 5          # pause after connecting, before the first iteration

# State-of-charge window and target (logged as percentages).
TARGETSOC = 75
MAXSOC = 90
MINSOC = 20

# Magnitude of the charge/discharge setpoint (logged in W).
POWER = 10000

# -- Logging ------------------------------------------------------------------
# One shared logger named "app", emitting INFO and above to both a log file
# and the console using the same line format.
log = logging.getLogger("app")
log.setLevel(logging.INFO)

formatter = logging.Formatter(
    "[%(asctime)s] %(levelname)s %(name)s %(message)s",
    "%d.%m.%Y %H:%M:%S",
)

# The log file lives one directory above the one containing this script.
_project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
fileHandler = logging.FileHandler(os.path.join(_project_root, "app.log"))
consoleHandler = logging.StreamHandler()

# Attach the file handler first, then the console handler.
for _handler in (fileHandler, consoleHandler):
    _handler.setFormatter(formatter)
    log.addHandler(_handler)
del _handler

# -- Helper Functions ----------------------------------------------------------
def limit(setpoint, minimum, maximum):
    """Clamp ``setpoint`` to the range bounded by ``minimum`` and ``maximum``.

    The upper bound is applied first and the lower bound second, so when the
    bounds are crossed (``minimum > maximum``) the result is ``minimum``.
    """
    capped = maximum if maximum < setpoint else setpoint
    return minimum if minimum > capped else capped

def setPower(app, powerSetpoint):
    """Command converter port 2 to run in constant-power mode.

    Sends the "constant-power" method together with ``powerSetpoint``
    (converted to a string) to ``CONVERTER_ID`` via ``app.setCommands``.
    """
    app.setCommands(
        CONVERTER_ID,
        {
            "control.ports.port2.method": "constant-power",
            "control.ports.port2.power": str(powerSetpoint),
        },
    )

def disableBatteryPort(app):
    """Switch converter port 2 to "disabled" with a zero power setpoint.

    The command is sent to ``CONVERTER_ID`` via ``app.setCommands``.
    """
    app.setCommands(
        CONVERTER_ID,
        {
            "control.ports.port2.method": "disabled",
            "control.ports.port2.power": "0",
        },
    )

# -- Main Application Logic ---------------------------------------------------
def _readInt(app, deviceId, key):
    """Read one measurement and return it truncated to an ``int``.

    The value is converted through ``float`` first so that decimal text such
    as "5000.0" is accepted; a bare ``int()`` raises ``ValueError`` on it.
    """
    return int(float(app.getMeasurements(deviceId, key)))


def ems(app):
    """Run one control iteration: pick a power setpoint and send it.

    The battery is charged (negative setpoint) when its SOC is below the
    target, discharged (positive setpoint) when above, and idled at the
    target. The setpoint is then clamped to the converter limits and to the
    battery limits before being sent to the converter.

    Args:
        app: Controller client exposing ``getMeasurements`` and
            ``setCommands``.

    Raises:
        ValueError: If a measurement cannot be parsed as a number. Other
            exceptions raised by ``app`` propagate unchanged.
    """
    # Keep the configured target inside the allowed SOC window.
    targetSoc = limit(TARGETSOC, MINSOC, MAXSOC)

    # Battery reported State of Charge (fraction truncated).
    soc = _readInt(app, BATTERY_ID, "measure.ports.port1.soc")

    if soc < targetSoc:
        # Charge the battery: the converter port expects a negative setpoint.
        powerSetpoint = -POWER
    elif soc > targetSoc:
        # Discharge the battery: the converter port expects a positive setpoint.
        powerSetpoint = POWER
    else:
        powerSetpoint = 0

    # Clamp to the converter limits.
    # NOTE(review): export.max is used as the lower bound without negation,
    # so this presumably expects export.max <= 0 <= import.max - confirm
    # against the controller's sign convention.
    converterImportPowerMax = _readInt(
        app, CONVERTER_ID, "measure.ports.port2.power.import.max"
    )
    converterExportPowerMax = _readInt(
        app, CONVERTER_ID, "measure.ports.port2.power.export.max"
    )
    powerSetpoint = limit(
        powerSetpoint, converterExportPowerMax, converterImportPowerMax
    )

    # Clamp to the battery limits; the charge limit is negated to match the
    # negative-means-charge convention of the setpoint.
    batteryChargePowerMax = _readInt(
        app, BATTERY_ID, "measure.ports.port1.power.charge.max"
    )
    batteryDischargePowerMax = _readInt(
        app, BATTERY_ID, "measure.ports.port1.power.discharge.max"
    )
    powerSetpoint = limit(
        powerSetpoint, -batteryChargePowerMax, batteryDischargePowerMax
    )

    setPower(app, powerSetpoint)

    log.info("Battery SOC: %s%% - Target SOC: %s%%", soc, targetSoc)
    log.info("Power Setpoint: %sW", powerSetpoint)

def main() -> None:
    """Connect to the controller and run the control loop until interrupted.

    Connection settings are read from the environment (after loading a
    ``.env`` file): ``IP_ADDRESS``, ``NATS_USERNAME`` and ``NATS_PASSWORD``.
    If any of them is missing or empty, an error is logged and the process
    exits with status 1.

    Errors raised by a control iteration are logged and the loop continues.
    On ``KeyboardInterrupt`` the battery port is disabled and the client is
    stopped; the client is stopped even if disabling the port fails.
    """
    load_dotenv()

    ipAddress = os.getenv("IP_ADDRESS")
    username = os.getenv("NATS_USERNAME")
    password = os.getenv("NATS_PASSWORD")

    if not ipAddress or not username or not password:
        log.error("IP_ADDRESS, NATS_USERNAME, and NATS_PASSWORD must be set in .env")
        sys.exit(1)

    log.info("Connecting to PPL controller at %s", ipAddress)
    app = Pplapp(ipAddress, username, password)

    # Wait before the first iteration (presumably so the client can finish
    # connecting and receive initial measurements - not visible here).
    time.sleep(STARTUP_DELAY_S)

    try:
        while True:
            try:
                ems(app)
            except Exception as e:
                # A failed iteration must not end the control loop.
                log.exception("Error in control loop: %s", e)
            time.sleep(CONTROL_LOOP_INTERVAL_S)

    except KeyboardInterrupt:
        log.info("Shutdown requested")
        portDisabled = False
        try:
            disableBatteryPort(app)
            portDisabled = True
            # Wait one interval before stopping the client (presumably to
            # let the disable command be delivered).
            time.sleep(CONTROL_LOOP_INTERVAL_S)
        except Exception:
            log.exception("Failed to disable battery port during shutdown")
        finally:
            # Always release the client, even if the disable command failed
            # or a second interrupt arrives during the wait.
            app.stop()
        if portDisabled:
            log.info("Clean shutdown complete")


if __name__ == "__main__":
    main()