ECTIVE AccuBox 300S

Hello,

I have an Ective AccuBox installed in my van.

I can read it out over Bluetooth with the accompanying app, but I would like to integrate it into VanPI.

Who can help?

Thanks

Hello Klaus, I came over from the other topic to yours because it is more specific to the Ective AccuBox. In the other topic you asked how to get the script running. First, briefly, the state of the script: it is currently running here and I am actually getting good data out of the AccuBox. There are occasional single outliers in the output, but so far the values have always settled again. I thought I would make it available, since a few people here in the forum were interested in it. As I am of course very interested in improvements, perhaps we can test it together here and see whether and how well it works.

A bit about me: I have several Peakaway products (Fussy and the Hochstrombüchse :wink: ) but no Core; Christmas is still a while off :smiley: . I do, however, also have a Raspberry Pi 4B running Raspbian Lite (“Debian GNU/Linux 12 (bookworm)”) in the camper, which serves a dashboard and a cockpit over the local WLAN (the Pi is connected by LAN to a GL.iNet router).

The script: for testing and development I simply used the laptop at first, and at some point moved the development, which was getting really slow, over to the Pi. What matters is that you have a Bluetooth adapter on board that supports BLE; that should have been standard since around 2015. In your place I would also first check on the laptop whether the script works for your AccuBox's model year.

Finding the MAC address: the MAC should be printed on the AccuBox itself. You will need it later for the script, because mine is currently hard-coded there:

# Default MAC address (can be overridden with --mac)
DEFAULT_MAC = "58:b6:4f:49:9c:f1"

So you either pass the --mac [your MAC address] flag on the command line, which overrides the hard-coded MAC, or you enter your own MAC directly in the script at the place shown above.
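If the MAC is not printed on the box, a short BLE scan with bleak may reveal it. The following is only a sketch: `pick_candidates` and the `name_hint` parameter are made up for illustration, and I do not know what name the AccuBox advertises, so treat any filter text as a guess. On macOS bleak reports a system-specific UUID instead of a MAC.

```python
import asyncio


def pick_candidates(devices, name_hint=""):
    """Return (address, name) pairs whose name contains name_hint (case-insensitive)."""
    hint = name_hint.lower()
    found = []
    for address, name in devices:
        label = name or "(no name)"  # many BLE devices advertise no name
        if hint in label.lower():
            found.append((address, label))
    return found


async def scan(timeout: float = 8.0, name_hint: str = ""):
    """Scan for nearby BLE devices and return the filtered (address, name) pairs."""
    # Imported here so that pick_candidates() can be used without bleak installed.
    from bleak import BleakScanner

    devices = await BleakScanner.discover(timeout=timeout)
    return pick_candidates([(d.address, d.name) for d in devices], name_hint)
```

Calling `asyncio.run(scan())` from a Python prompt returns everything in range; switching the AccuBox off and scanning again is one way to spot which entry disappears.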

You need bleak, a small Python library that handles the Bluetooth Low Energy connection to your AccuBox (scanning, connecting, reading data). You can install it via pip, for example like this:

pip install --user bleak

bleak is the only dependency. Afterwards log out and back in once (so that the Bluetooth permission takes effect).
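To check quickly whether the installation landed in the Python interpreter you will actually use for the script, something like this small sketch can help. The helper name `module_available` is invented for this example; it only relies on the standard library.

```python
from importlib.util import find_spec


def module_available(name: str) -> bool:
    """Return True if the top-level module `name` can be imported by this interpreter."""
    return find_spec(name) is not None


if __name__ == "__main__":
    if module_available("bleak"):
        print("bleak is importable")
    else:
        print("bleak not found; try: pip install --user bleak")
```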

As an aside: I don't know exactly how the data is then processed further in VanPi (as already mentioned, I unfortunately don't have a VanPi system here), but according to my research it probably goes via MQTT. As a bit of preparation (if it turns out to be useful at all), I have already extended the script with an MQTT publish function. For a first test, though, it is entirely sufficient to just see whether any data comes from your battery at all.
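To give an idea of what would end up on the broker with --fanout, here is a small sketch that mirrors the topic layout used in the script (full JSON on the base topic, then one subtopic per field). `fanout_messages` is a hypothetical helper written for this illustration, and whether VanPi expects topics of this shape is an open question.

```python
import json

# Field names as they appear in the script's fanout section.
FANOUT_FIELDS = ("voltage_v", "current_a", "power_w", "soc_percent",
                 "capacity_ah", "mode", "rest_time")


def fanout_messages(base_topic: str, status: dict) -> list[tuple[str, str]]:
    """Return (topic, payload) pairs: the full JSON first, then one pair per field."""
    base = base_topic.rstrip("/")
    messages = [(base, json.dumps(status))]
    for name in FANOUT_FIELDS:
        value = status.get(name)
        if value is not None:  # fields without a value are skipped
            messages.append((f"{base}/{name}", str(value)))
    return messages


if __name__ == "__main__":
    sample = {"voltage_v": 13.3, "current_a": -0.29, "soc_percent": 91, "mode": "discharging"}
    for topic, payload in fanout_messages("battery/accubox", sample):
        print(topic, payload)
```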

You can give this script a try:

#!/usr/bin/env python3
"""
AccuBox BLE Reader for Ective AccuBox (S200)

This script connects to an Ective AccuBox battery via Bluetooth Low Energy,
sends a status request, decodes the 21‑byte notification and outputs the
battery parameters as JSON. It supports both one‑time and cyclic polling,
as well as optional MQTT publishing using `mosquitto_pub` (no Python MQTT
library required).

Features:
	- One‑time or cyclic query mode
	- Automatic charge/discharge mode detection
	- MQTT publishing with configurable broker, topic, QoS, retain, fanout
	- Raw hex output for debugging
	- Clean shutdown on Ctrl+C (SIGINT); SIGTERM is not handled separately

Requirements:
	- Python 3.10+
	- bleak library (install with `pip install --user bleak`)
	- mosquitto-clients (only for MQTT, `sudo apt install mosquitto-clients`)

Usage examples:
	# One‑time read (using default MAC or custom)
	python3 script.py
	python3 script.py --mac 58:b6:4f:49:9c:f1

	# Cyclic every 10 seconds
	python3 script.py --mode cyclic --interval 10

	# Cyclic + MQTT (custom broker and topic)
	python3 script.py --mode cyclic --interval 10 --publish mosquitto --topic my/battery --mqtt-host 192.168.1.100

	# With fanout (individual subtopics for each field)
	python3 script.py --mode cyclic --publish mosquitto --fanout

Author: ummeegge / Erik
License: MIT
"""

from __future__ import annotations

import argparse
import asyncio
import json
import logging
import subprocess
import sys
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from shutil import which
from typing import Optional

from bleak import BleakClient, BleakScanner

# ---------- BLE protocol constants ----------
# Default MAC address (can be overridden with --mac)
DEFAULT_MAC = "58:b6:4f:49:9c:f1"

CHAR_UUID = "0000ffe1-0000-1000-8000-00805f9b34fb"
CMD_REQUEST_STATUS = bytes.fromhex("1c030400000846b1")

# Empirical scaling factors (from reverse engineering)
VOLT_SCALING_CHARGE = 47.3      # voltage raw > 200 → charging
VOLT_SCALING_DISCHARGE = 10.0   # voltage raw <= 200 → discharging
CURRENT_SCALING_CHARGE = 94.0   # current from bytes 12‑13 (charging)
CURRENT_SCALING_DISCHARGE = 7965.0  # current from bytes 18‑19 (discharging)

LOGGER = logging.getLogger("accubox")


# ---------- Data model ----------
@dataclass
class Status:
	"""Decoded battery status data."""

	status: str                     # "ok" or "error"
	timestamp: str                  # ISO format with timezone
	capacity_ah: Optional[int] = None
	soc_percent: Optional[int] = None
	rest_time: Optional[str] = None  # "hh:mm"
	voltage_v: Optional[float] = None
	current_a: Optional[float] = None
	power_w: Optional[float] = None
	mode: Optional[str] = None       # "charging" or "discharging"
	raw_hex: Optional[str] = None    # full 21‑byte hex dump (if --raw)
	message: Optional[str] = None    # error message


# ---------- Helper functions ----------
def utc_now_iso() -> str:
	"""Return current local time in ISO format with timezone."""
	return datetime.now(timezone.utc).astimezone().isoformat()


def status_to_payload(status: Status) -> str:
	"""Serialize a Status object to a JSON string."""
	return json.dumps(asdict(status), ensure_ascii=False)


def decode_status(data: bytes, include_raw: bool = True) -> Status:
	"""
	Decode a 21‑byte BLE notification into a Status object.

	Args:
		data: Raw bytes received from the characteristic.
		include_raw: If True, add the hexadecimal representation to the status.

	Returns:
		Status object with decoded fields.
	"""
	if len(data) != 21:
		return Status(
			status="error",
			timestamp=utc_now_iso(),
			message=f"Unexpected length: {len(data)}",
			raw_hex=data.hex() if include_raw else None,
		)

	# Extract little‑endian fields
	capacity_ah = int.from_bytes(data[4:6], "little")
	soc_percent = int.from_bytes(data[6:8], "little")
	rest_hours = int.from_bytes(data[8:10], "little")
	rest_minutes = int.from_bytes(data[10:12], "little")
	current_charge_raw = int.from_bytes(data[12:14], "little", signed=True)
	# bytes 14-15 (data[14:16]) are temperature (unused, not displayed in app)
	voltage_raw = int.from_bytes(data[16:18], "little")
	current_discharge_raw = int.from_bytes(data[18:20], "little", signed=True)

	# Mode detection based on voltage raw value
	if voltage_raw > 200:
		mode = "charging"
		voltage_v = round(voltage_raw / VOLT_SCALING_CHARGE, 2)
		current_a = round(abs(current_charge_raw) / CURRENT_SCALING_CHARGE, 2)
	else:
		mode = "discharging"
		voltage_v = round(voltage_raw / VOLT_SCALING_DISCHARGE, 2)
		current_a = -round(abs(current_discharge_raw) / CURRENT_SCALING_DISCHARGE, 2)

	power_w = round(abs(current_a) * voltage_v, 1)
	if current_a < 0:
		power_w = -power_w

	return Status(
		status="ok",
		timestamp=utc_now_iso(),
		capacity_ah=capacity_ah,
		soc_percent=soc_percent,
		rest_time=f"{rest_hours:02d}:{rest_minutes:02d}",
		voltage_v=voltage_v,
		current_a=current_a,
		power_w=power_w,
		mode=mode,
		raw_hex=data.hex() if include_raw else None,
	)


# ---------- Command line argument parser ----------
def build_parser() -> argparse.ArgumentParser:
	"""Build and return the argument parser with all options."""
	parser = argparse.ArgumentParser(
		description="Ective AccuBox BLE to JSON/MQTT reader",
		formatter_class=argparse.ArgumentDefaultsHelpFormatter,
	)
	parser.add_argument(
		"--mac",
		default=DEFAULT_MAC,
		help="BLE MAC address (default: %(default)s)",
	)
	parser.add_argument(
		"--mode",
		choices=["once", "cyclic"],
		default="once",
		help="Query mode: once or cyclic",
	)
	parser.add_argument(
		"--interval",
		type=float,
		default=5.0,
		help="Interval in seconds for cyclic mode",
	)
	parser.add_argument(
		"--timeout",
		type=float,
		default=3.0,
		help="Seconds to wait for a BLE notification",
	)
	parser.add_argument(
		"--verbose", action="store_true", help="Enable debug logging"
	)
	parser.add_argument(
		"--raw", action="store_true", help="Include raw_hex field in output"
	)
	parser.add_argument(
		"--publish",
		choices=["none", "stdout", "mosquitto"],
		default="none",
		help="Where to publish the status",
	)
	parser.add_argument(
		"--topic",
		default="battery/accubox",
		help="MQTT topic (default: %(default)s)",
	)
	parser.add_argument(
		"--state-topic",
		default="battery/accubox/state",
		help="MQTT online/offline topic (default: %(default)s)",
	)
	parser.add_argument(
		"--qos", type=int, choices=[0, 1, 2], default=1, help="MQTT QoS level"
	)
	parser.add_argument(
		"--retain", action="store_true", help="Retain MQTT messages"
	)
	parser.add_argument(
		"--no-retain", action="store_true", help="Disable retain"
	)
	parser.add_argument(
		"--fanout",
		action="store_true",
		help="Publish individual fields as subtopics",
	)
	parser.add_argument("--mqtt-host", default="localhost", help="MQTT broker host")
	parser.add_argument("--mqtt-port", type=int, default=1883, help="MQTT broker port")
	parser.add_argument("--mqtt-username", default=None, help="MQTT username (if needed)")
	parser.add_argument("--mqtt-password", default=None, help="MQTT password (if needed)")
	parser.add_argument(
		"--once",
		action="store_true",
		help="Run exactly one cycle even in cyclic mode",
	)
	return parser


# ---------- Logging ----------
def setup_logging(verbose: bool) -> None:
	"""Configure logging level and format."""
	level = logging.DEBUG if verbose else logging.INFO
	logging.basicConfig(
		level=level, format="%(asctime)s %(levelname)s %(message)s"
	)


# ---------- MQTT helpers (using mosquitto_pub) ----------
def effective_retain(args) -> bool:
	"""Determine whether to set the MQTT retain flag."""
	if args.no_retain:
		return False
	if args.retain:
		return True
	return args.publish == "mosquitto"


def publish_stdout(status: Status) -> None:
	"""Print the status as JSON to standard output."""
	print(status_to_payload(status), flush=True)


def publish_mosquitto(status: Status, args) -> None:
	"""
	Publish the status to an MQTT broker using mosquitto_pub.

	Args:
		status: Status object to publish.
		args: Parsed command line arguments with MQTT settings.

	Raises:
		RuntimeError: If mosquitto_pub is not found in PATH.
		subprocess.CalledProcessError: If mosquitto_pub fails.
	"""
	if which("mosquitto_pub") is None:
		raise RuntimeError("mosquitto_pub not found – install mosquitto-clients")

	payload = status_to_payload(status)
	base = [
		"mosquitto_pub",
		"-h", args.mqtt_host,
		"-p", str(args.mqtt_port),
		"-t", args.topic,
		"-m", payload,
		"-q", str(args.qos),
	]
	if effective_retain(args):
		base.append("-r")
	if args.mqtt_username:
		base.extend(["-u", args.mqtt_username])
	if args.mqtt_password:
		base.extend(["-P", args.mqtt_password])

	subprocess.run(base, check=True)

	# Fanout: one subtopic per field
	if args.fanout and status.status == "ok":
		fanout_fields = {
			"voltage_v": status.voltage_v,
			"current_a": status.current_a,
			"power_w": status.power_w,
			"soc_percent": status.soc_percent,
			"capacity_ah": status.capacity_ah,
			"mode": status.mode,
			"rest_time": status.rest_time,
		}
		for name, value in fanout_fields.items():
			if value is None:
				continue
			cmd = base.copy()
			# replace topic and message
			topic_idx = cmd.index("-t") + 1
			cmd[topic_idx] = f"{args.topic.rstrip('/')}/{name}"
			msg_idx = cmd.index("-m") + 1
			cmd[msg_idx] = str(value)
			subprocess.run(cmd, check=True)

	# State online (always retained)
	state_cmd = base.copy()
	state_topic_idx = state_cmd.index("-t") + 1
	state_cmd[state_topic_idx] = args.state_topic
	state_msg_idx = state_cmd.index("-m") + 1
	state_cmd[state_msg_idx] = "online"
	if "-r" not in state_cmd:
		state_cmd.append("-r")
	subprocess.run(state_cmd, check=True)


def publish_offline(args) -> None:
	"""Publish offline state to the MQTT state topic."""
	if which("mosquitto_pub") is None:
		return
	cmd = [
		"mosquitto_pub",
		"-h", args.mqtt_host,
		"-p", str(args.mqtt_port),
		"-t", args.state_topic,
		"-m", "offline",
		"-q", str(args.qos),
		"-r",
	]
	if args.mqtt_username:
		cmd.extend(["-u", args.mqtt_username])
	if args.mqtt_password:
		cmd.extend(["-P", args.mqtt_password])
	try:
		subprocess.run(cmd, check=False)
	except Exception:
		pass


# ---------- BLE communication ----------
async def find_characteristic(client: BleakClient, uuid: str):
	"""Return the first GATT characteristic matching the UUID."""
	services = client.services if getattr(client, "services", None) else await client.get_services()
	for service in services:
		for char in service.characteristics:
			if char.uuid.lower() == uuid.lower():
				return char
	return None


async def read_once(mac: str, timeout: float, include_raw: bool) -> Status:
	"""
	Connect to the battery, request one status update, and return it.

	Args:
		mac: BLE MAC address.
		timeout: Seconds to wait for the notification.
		include_raw: Whether to include raw_hex in the output.

	Returns:
		Status object (may be error status).
	"""
	device = await BleakScanner.find_device_by_address(mac, timeout=8.0)
	if not device:
		return Status(status="error", timestamp=utc_now_iso(), message=f"Device {mac} not found")

	async with BleakClient(device) as client:
		char = await find_characteristic(client, CHAR_UUID)
		if not char:
			return Status(status="error", timestamp=utc_now_iso(), message="Characteristic not found")

		result: Optional[Status] = None

		def handler(sender, data):
			nonlocal result
			result = decode_status(data, include_raw=include_raw)

		await client.start_notify(char, handler)
		await asyncio.sleep(0.2)
		await client.write_gatt_char(char, CMD_REQUEST_STATUS, response=False)

		# Simple polling loop (works reliably)
		for _ in range(int(timeout * 10)):
			if result is not None:
				break
			await asyncio.sleep(0.1)

		await client.stop_notify(char)

		return result or Status(status="error", timestamp=utc_now_iso(), message="Timeout")


async def cyclic_query(mac: str, interval: float, timeout: float, include_raw: bool, args) -> int:
	"""
	Continuously query the battery and publish results.

	Args:
		mac: BLE MAC address.
		interval: Seconds between queries.
		timeout: Seconds to wait for each notification.
		include_raw: Whether to include raw_hex.
		args: Parsed command line arguments (for MQTT settings).

	Returns:
		Exit code (0 on success, >0 on error).
	"""
	device = await BleakScanner.find_device_by_address(mac, timeout=8.0)
	if not device:
		status = Status(status="error", timestamp=utc_now_iso(), message=f"Device {mac} not found")
		publish_stdout(status)
		if args.publish == "mosquitto":
			publish_offline(args)
		return 2

	async with BleakClient(device) as client:
		char = await find_characteristic(client, CHAR_UUID)
		if not char:
			status = Status(status="error", timestamp=utc_now_iso(), message="Characteristic not found")
			publish_stdout(status)
			if args.publish == "mosquitto":
				publish_offline(args)
			return 3

		result: Optional[Status] = None

		def handler(sender, data):
			nonlocal result
			result = decode_status(data, include_raw=include_raw)

		await client.start_notify(char, handler)

		# Warm-up: first query with longer timeout, result is discarded
		result = None
		await client.write_gatt_char(char, CMD_REQUEST_STATUS, response=False)
		for _ in range(50):  # 5 seconds wait (50 * 0.1s)
			if result is not None:
				break
			await asyncio.sleep(0.1)

		try:
			while True:
				result = None
				await client.write_gatt_char(char, CMD_REQUEST_STATUS, response=False)

				# Simple polling loop
				for _ in range(int(timeout * 10)):
					if result is not None:
						break
					await asyncio.sleep(0.1)

				if result:
					status = result
				else:
					status = Status(status="error", timestamp=utc_now_iso(), message="Timeout")

				publish_stdout(status)
				if args.publish == "mosquitto":
					publish_mosquitto(status, args)

				if args.once:
					break

				await asyncio.sleep(interval)
		finally:
			await client.stop_notify(char)
			if args.publish == "mosquitto":
				publish_offline(args)

	return 0


# ---------- Main entry point ----------
async def async_main() -> int:
	"""Async main routine."""
	parser = build_parser()
	args = parser.parse_args()
	setup_logging(args.verbose)

	if args.publish == "mosquitto" and which("mosquitto_pub") is None:
		LOGGER.error("mosquitto_pub not found – install mosquitto-clients")
		return 4

	if args.mode == "once":
		status = await read_once(args.mac, args.timeout, args.raw)
		publish_stdout(status)
		if args.publish == "mosquitto":
			publish_mosquitto(status, args)
			publish_offline(args)
		return 0 if status.status == "ok" else 1

	return await cyclic_query(args.mac, args.interval, args.timeout, args.raw, args)


def main() -> int:
	"""Synchronous entry point."""
	try:
		return asyncio.run(async_main())
	except KeyboardInterrupt:
		print("\nAborted by user", file=sys.stderr)
		return 0


if __name__ == "__main__":
	raise SystemExit(main())

Save it, e.g. as “accubox-ble.py”, and then make it executable with

chmod +x accubox-ble.py

and test it with

./accubox-ble.py --mac 58:b6:4f:49:9c:f1

Replace the MAC with your own (quick prayer :smiley: ) and press ENTER; you should see something like this:

{"status": "ok", "timestamp": "2026-06-01T17:27:52.856708+02:00", "capacity_ah": 182, "soc_percent": 91, "rest_time": "143:32", "voltage_v": 13.3, "current_a": -0.29, "power_w": -3.9, "mode": "discharging", "raw_hex": null, "message": null}
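To make the numbers in that output easier to check, here is a small worked example that packs a synthetic 21-byte frame and decodes it with the discharge scaling factors from the script. The raw values 133 and 2310 were back-calculated so that they reproduce the sample output; they were not captured from a device, and the header and trailing byte are placeholder zeros because their meaning is not known. `build_frame` and `decode_discharge` are names invented for this sketch.

```python
import struct

# Layout taken from decode_status(): 4 header bytes, eight 16-bit little-endian
# fields, 1 trailing byte. Header/trailer contents are unknown placeholders here.
FRAME_FORMAT = "<4sHHHHhHHhB"  # 4 + 8 * 2 + 1 = 21 bytes


def build_frame(capacity_ah, soc, hours, minutes, voltage_raw, discharge_raw):
    """Pack a synthetic 21-byte frame (charge current and temperature set to 0)."""
    return struct.pack(
        FRAME_FORMAT, b"\x00" * 4,
        capacity_ah, soc, hours, minutes,
        0, 0, voltage_raw, discharge_raw, 0,
    )


def decode_discharge(frame: bytes) -> dict:
    """Decode a frame with the discharge scaling factors (case voltage_raw <= 200)."""
    (_header, capacity_ah, soc, hours, minutes, _charge_raw, _temp,
     voltage_raw, discharge_raw, _trailer) = struct.unpack(FRAME_FORMAT, frame)
    voltage_v = round(voltage_raw / 10.0, 2)            # VOLT_SCALING_DISCHARGE
    current_a = -round(abs(discharge_raw) / 7965.0, 2)  # CURRENT_SCALING_DISCHARGE
    power_w = -round(abs(current_a) * voltage_v, 1)     # negative while discharging
    return {
        "capacity_ah": capacity_ah,
        "soc_percent": soc,
        "rest_time": f"{hours:02d}:{minutes:02d}",
        "voltage_v": voltage_v,
        "current_a": current_a,
        "power_w": power_w,
    }


if __name__ == "__main__":
    frame = build_frame(182, 91, 143, 32, voltage_raw=133, discharge_raw=2310)
    print(len(frame), decode_discharge(frame))
```

If your box delivers a frame of a different length, or the decoded voltage is far from what the app shows, that would be a first hint that the byte layout differs.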

My AccuBox is an S200 from 2023. I don't know whether the protocol on your S300 is identical, but it would be worth a test…

That's it for now. Maybe it works out; I would be glad if it did.

Best regards,

Erik

P.S.: Quite a bit of code was added because of MQTT. I think that could also be handled with Node-RED, but perhaps it is handy this way too. The lighter version without MQTT in the other topic should work just as well in any case.