Documentation Index
Fetch the complete documentation index at: https://docs.directenergypartners.com/llms.txt
Use this file to discover all available pages before exploring further.
The code below describes an application to manage the state of charge of a battery system. It continuously checks the
battery’s SOC and adjusts the power setpoint to charge or discharge the battery towards a target SOC, constrained by
predefined minimum and maximum SOC levels. The power setpoint is limited by the maximum import/export capabilities of
the converter and the charge/discharge limits of the battery. Commands are then sent to the converter to adjust its
power output accordingly. The script runs in an infinite loop with a specified delay between each iteration, and it
handles exceptions and graceful shutdown on user interruption.
"""
Main entry point for the target SOC application.
"""
import logging
import os
import sys
import time
from dotenv import load_dotenv
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from pplapp import Pplapp
# -- Configuration ------------------------------------------------------------
STARTUP_DELAY_S = 5
CONTROL_LOOP_INTERVAL_S = 5
POWER = 10000
MINSOC = 20
MAXSOC = 90
TARGETSOC = 75
BATTERY_ID = "battery1"
CONVERTER_ID = "converter1"
# -- Logging ------------------------------------------------------------------
log = logging.getLogger("app")
log.setLevel(logging.INFO)
formatter = logging.Formatter(
fmt="[%(asctime)s] %(levelname)s %(name)s %(message)s",
datefmt="%d.%m.%Y %H:%M:%S",
)
_project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
fileHandler = logging.FileHandler(os.path.join(_project_root, "app.log"))
fileHandler.setFormatter(formatter)
log.addHandler(fileHandler)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(formatter)
log.addHandler(consoleHandler)
# -- Helper Functions ----------------------------------------------------------
def limit(setpoint, minimum, maximum):
return max(min(setpoint, maximum), minimum)
def setPower(app, powerSetpoint):
commands = {
"control.ports.port2.method": "constant-power",
"control.ports.port2.power": str(powerSetpoint)
}
app.setCommands(CONVERTER_ID, commands)
def disableBatteryPort(app):
commands = {
"control.ports.port2.method": "disabled",
"control.ports.port2.power": str(0)
}
app.setCommands(CONVERTER_ID, commands)
# -- Main Application Logic ---------------------------------------------------
def ems(app):
# Check if target SOC is within limits
targetSoc = limit(TARGETSOC, MINSOC, MAXSOC)
# Battery reported State of Charge
soc = int(float(app.getMeasurements(BATTERY_ID, "measure.ports.port1.soc")))
if soc < targetSoc:
powerSetpoint = -POWER # Charge the battery, need to set negative power setpoint to converter port
elif soc > targetSoc:
powerSetpoint = POWER # Discharge the battery, need to set positive power setpoint to converter port
else:
powerSetpoint = 0
# Check if power setpoint is within limits of the converter
converterImportPowerMax = int(app.getMeasurements(CONVERTER_ID, "measure.ports.port2.power.import.max"))
converterExportPowerMax = int(app.getMeasurements(CONVERTER_ID, "measure.ports.port2.power.export.max"))
powerSetpoint = limit(powerSetpoint, converterExportPowerMax, converterImportPowerMax)
# Check if power setpoint is within limits of the battery
batteryChargePowerMax = int(app.getMeasurements(BATTERY_ID, "measure.ports.port1.power.charge.max"))
batteryDischargePowerMax = int(app.getMeasurements(BATTERY_ID, "measure.ports.port1.power.discharge.max"))
powerSetpoint = limit(powerSetpoint, -batteryChargePowerMax, batteryDischargePowerMax)
setPower(app, powerSetpoint)
log.info("Battery SOC: %s%% - Target SOC: %s%%", soc, targetSoc)
log.info("Power Setpoint: %sW", powerSetpoint)
def main() -> None:
load_dotenv()
ipAddress = os.getenv("IP_ADDRESS")
username = os.getenv("NATS_USERNAME")
password = os.getenv("NATS_PASSWORD")
if not ipAddress or not username or not password:
log.error("IP_ADDRESS, NATS_USERNAME, and NATS_PASSWORD must be set in .env")
sys.exit(1)
log.info("Connecting to PPL controller at %s", ipAddress)
app = Pplapp(ipAddress, username, password)
time.sleep(STARTUP_DELAY_S)
try:
while True:
try:
ems(app)
except Exception as e:
log.exception("Error in control loop: %s", e)
time.sleep(CONTROL_LOOP_INTERVAL_S)
except KeyboardInterrupt:
log.info("Shutdown requested")
disableBatteryPort(app)
time.sleep(CONTROL_LOOP_INTERVAL_S)
app.stop()
log.info("Clean shutdown complete")
if __name__ == "__main__":
main()