Commit 55826db

Merge pull request #586 from smartenergycontrol-be/feature/influxdb-integration
Add InfluxDB integration for enhanced historical data retrieval
2 parents acacf34 + 3313363 commit 55826db

8 files changed: +460 -0 lines changed


docs/config.md

Lines changed: 8 additions & 0 deletions
@@ -24,6 +24,14 @@ We will need to define these parameters to retrieve data from Home Assistant. Th
 - 'sensor.power_load_no_var_loads'
 - `method_ts_round`: Set the method for timestamp rounding, options are: first, last and nearest.
 - `continual_publish`: set to True to save entities to .json after an optimization run. Then automatically republish the saved entities *(with updated current state value)* every `optimization_time_step` minutes. *entity data saved to data_path/entities.*
+- `use_influxdb`: Enable InfluxDB as a data source instead of the Home Assistant API. This allows for longer historical data retention and better performance for machine learning models.
+- `influxdb_host`: The IP address or hostname of your InfluxDB instance. Defaults to `localhost`.
+- `influxdb_port`: The port number for your InfluxDB instance. Defaults to 8086.
+- `influxdb_username`: Username for authenticating with InfluxDB. Leave empty if no authentication is required.
+- `influxdb_password`: Password for authenticating with InfluxDB. Leave empty if no authentication is required.
+- `influxdb_database`: The name of the InfluxDB database containing your Home Assistant data. Defaults to `homeassistant`.
+- `influxdb_measurement`: The measurement name where your sensor data is stored. Defaults to `W` for the Home Assistant integration.
+- `influxdb_retention_policy`: The retention policy to use for InfluxDB queries. Defaults to `autogen`.

 A second part of this section is given by some privacy-sensitive parameters that should be included in a `secrets_emhass.yaml` file alongside the `config_emhass.yaml` file.
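
For illustration only (not part of this diff): assuming the defaults added in `config_defaults.json` below, the new keys land in the `retrieve_hass_conf` section of the EMHASS params, which is what `RetrieveHass` reads in `retrieve_hass.py`. The host value here is a placeholder and the surrounding dict is abridged.

# Hedged sketch of a retrieve_hass_conf fragment with the new InfluxDB keys.
# Values mirror the defaults added in config_defaults.json; other required keys are omitted.
retrieve_hass_conf = {
    "use_influxdb": True,              # switch data retrieval from the HA API to InfluxDB
    "influxdb_host": "192.168.1.10",   # placeholder host, defaults to "localhost"
    "influxdb_port": 8086,
    "influxdb_username": "",           # empty when authentication is disabled
    "influxdb_password": "",
    "influxdb_database": "homeassistant",
    "influxdb_measurement": "W",       # fallback measurement used by the auto-discovery logic
    "influxdb_retention_policy": "autogen",
}
params = {"retrieve_hass_conf": retrieve_hass_conf}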

docs/differences.md

Lines changed: 14 additions & 0 deletions
@@ -24,6 +24,14 @@ See below for a list of associations between the parameters from `config_emhass.
 | retrieve_hass_conf | set_zero_min | set_zero_min | |
 | retrieve_hass_conf | method_ts_round | method_ts_round | |
 | retrieve_hass_conf | continual_publish | continual_publish | |
+| retrieve_hass_conf | | use_influxdb | |
+| retrieve_hass_conf | | influxdb_host | |
+| retrieve_hass_conf | | influxdb_port | |
+| retrieve_hass_conf | | influxdb_username | |
+| retrieve_hass_conf | | influxdb_password | |
+| retrieve_hass_conf | | influxdb_database | |
+| retrieve_hass_conf | | influxdb_measurement | |
+| retrieve_hass_conf | | influxdb_retention_policy | |
 | params_secrets | solcast_api_key | optional_solcast_api_key | |
 | params_secrets | solcast_rooftop_id | optional_solcast_rooftop_id | |
 | params_secrets | solar_forecast_kwp | optional_solar_forecast_kwp | |
@@ -79,6 +87,12 @@ Descriptions of each parameter can be found at:
 - [`Configuration Documentation`](https://emhass.readthedocs.io/en/latest/config.html)
 - Configuration page on EMHASS web server (E.g. http://localhost:5000/configuration)

+## InfluxDB as a data source
+
+A new feature allows using **InfluxDB** as an alternative data source to the Home Assistant recorder database. This is beneficial for users who want longer data retention periods for training machine learning models, or who want to reduce the query load on their main Home Assistant instance.
+
+When `use_influxdb: true` is set, EMHASS will fetch sensor data directly from your InfluxDB instance using the provided connection parameters. The `influxdb_username` and `influxdb_password` are treated as secrets.
+
 ## Passing in secret parameters
 Secret parameters are passed differently, depending on which method you choose. Alternative options are also present for passing secrets, if you are running EMHASS separately from Home Assistant. _(I.e. not via EMHASS-Add-on)_
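
As a rough usage sketch (not part of this diff), the call path for consumers does not change when InfluxDB is enabled: `get_data` simply delegates to the new `get_data_influxdb` method added in `retrieve_hass.py` below. Here `rh` stands for an already-constructed `RetrieveHass` instance with `use_influxdb` enabled; its constructor arguments are omitted and the sensor names are examples.

import pandas as pd

# `rh` is assumed to be an already-configured RetrieveHass instance (construction omitted).
# Request the last 9 days of history; with use_influxdb enabled, get_data()
# routes the request to get_data_influxdb() instead of the Home Assistant REST API.
days_list = pd.date_range(end=pd.Timestamp.now(tz="UTC").normalize(), periods=9, freq="D")
var_list = ["sensor.power_photovoltaics", "sensor.power_load_no_var_loads"]

if rh.get_data(days_list, var_list):
    print(rh.df_final.head())  # resampled sensor columns, one per entry in var_list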

pyproject.toml

Lines changed: 1 addition & 0 deletions
@@ -38,6 +38,7 @@ dependencies = [
     "waitress>=3.0.2",
     "plotly>=6.0.0",
     "gunicorn>=23.0.0",
+    "influxdb>=5.3.1",
 ]
 [build-system]
 requires = ["hatchling"]

src/emhass/data/associations.csv

Lines changed: 8 additions & 0 deletions
@@ -8,6 +8,14 @@ retrieve_hass_conf,load_negative,load_negative
 retrieve_hass_conf,set_zero_min,set_zero_min
 retrieve_hass_conf,var_replace_zero,sensor_replace_zero,list_sensor_replace_zero
 retrieve_hass_conf,var_interp,sensor_linear_interp,list_sensor_linear_interp
+retrieve_hass_conf,use_influxdb,use_influxdb
+retrieve_hass_conf,influxdb_host,influxdb_host
+retrieve_hass_conf,influxdb_port,influxdb_port
+retrieve_hass_conf,influxdb_username,influxdb_username
+retrieve_hass_conf,influxdb_password,influxdb_password
+retrieve_hass_conf,influxdb_database,influxdb_database
+retrieve_hass_conf,influxdb_measurement,influxdb_measurement
+retrieve_hass_conf,influxdb_retention_policy,influxdb_retention_policy
 retrieve_hass_conf,method_ts_round,method_ts_round
 retrieve_hass_conf,continual_publish,continual_publish
 params_secrets,time_zone,time_zone

src/emhass/data/config_defaults.json

Lines changed: 8 additions & 0 deletions
@@ -29,6 +29,14 @@
         "sensor.power_photovoltaics",
         "sensor.power_load_no_var_loads"
     ],
+    "use_influxdb": false,
+    "influxdb_host": "localhost",
+    "influxdb_port": 8086,
+    "influxdb_username": "",
+    "influxdb_password": "",
+    "influxdb_database": "homeassistant",
+    "influxdb_measurement": "W",
+    "influxdb_retention_policy": "autogen",
     "load_negative": false,
     "set_zero_min": true,
     "number_of_deferrable_loads": 2,

src/emhass/retrieve_hass.py

Lines changed: 273 additions & 0 deletions
@@ -78,6 +78,21 @@ def __init__(
         self.get_data_from_file = get_data_from_file
         self.var_list = []

+        # Initialize InfluxDB configuration
+        self.use_influxdb = self.params.get("retrieve_hass_conf", {}).get("use_influxdb", False)
+        if self.use_influxdb:
+            influx_conf = self.params.get("retrieve_hass_conf", {})
+            self.influxdb_host = influx_conf.get("influxdb_host", "localhost")
+            self.influxdb_port = influx_conf.get("influxdb_port", 8086)
+            self.influxdb_username = influx_conf.get("influxdb_username", "")
+            self.influxdb_password = influx_conf.get("influxdb_password", "")
+            self.influxdb_database = influx_conf.get("influxdb_database", "homeassistant")
+            self.influxdb_measurement = influx_conf.get("influxdb_measurement", "W")
+            self.influxdb_retention_policy = influx_conf.get("influxdb_retention_policy", "autogen")
+            self.logger.info(f"InfluxDB integration enabled: {self.influxdb_host}:{self.influxdb_port}/{self.influxdb_database}")
+        else:
+            self.logger.debug("InfluxDB integration disabled, using Home Assistant API")
+
     def get_ha_config(self):
         """
         Extract some configuration data from HA.
@@ -142,6 +157,10 @@ def get_data(
         .. warning:: The minimal_response and significant_changes_only options \
             are experimental
         """
+        # Use InfluxDB if configured, otherwise use Home Assistant API
+        if self.use_influxdb:
+            return self.get_data_influxdb(days_list, var_list)
+
         self.logger.info("Retrieve hass get data method initiated...")
         headers = {
             "Authorization": "Bearer " + self.long_lived_token,
@@ -301,6 +320,260 @@ def get_data(
         self.var_list = var_list
         return True

+    def get_data_influxdb(
+        self,
+        days_list: pd.date_range,
+        var_list: list,
+    ) -> bool:
+        """
+        Retrieve data from InfluxDB database.
+
+        This method provides an alternative data source to Home Assistant API,
+        enabling longer historical data retention for better machine learning model training.
+
+        :param days_list: A list of days to retrieve data for
+        :type days_list: pandas.date_range
+        :param var_list: List of sensor entity IDs to retrieve
+        :type var_list: list
+        :return: Success status of data retrieval
+        :rtype: bool
+        """
+        self.logger.info("Retrieve InfluxDB get data method initiated...")
+
+        # Check for empty inputs
+        if not days_list.size:
+            self.logger.error("Empty days_list provided")
+            return False
+
+        client = self._init_influx_client()
+        if not client:
+            return False
+
+        start_time = days_list[0]
+        end_time = days_list[-1] + pd.Timedelta(days=1)
+        total_days = (end_time - start_time).days
+
+        self.logger.info(f"Retrieving {len(var_list)} sensors over {total_days} days from InfluxDB")
+        self.logger.debug(f"Time range: {start_time} to {end_time}")
+
+        # Collect sensor dataframes
+        sensor_dfs = []
+        global_min_time = None
+        global_max_time = None
+
+        for sensor in filter(None, var_list):
+            df_sensor = self._fetch_sensor_data(client, sensor, start_time, end_time)
+            if df_sensor is not None:
+                sensor_dfs.append(df_sensor)
+                # Track global time range
+                sensor_min = df_sensor.index.min()
+                sensor_max = df_sensor.index.max()
+                global_min_time = min(global_min_time or sensor_min, sensor_min)
+                global_max_time = max(global_max_time or sensor_max, sensor_max)
+
+        client.close()
+
+        if not sensor_dfs:
+            self.logger.error("No data retrieved from InfluxDB")
+            return False
+
+        # Create complete time index covering all sensors
+        if global_min_time is not None and global_max_time is not None:
+            complete_index = pd.date_range(
+                start=global_min_time,
+                end=global_max_time,
+                freq=self.freq
+            )
+            self.df_final = pd.DataFrame(index=complete_index)
+
+        # Merge all sensor dataframes
+        for df_sensor in sensor_dfs:
+            self.df_final = pd.concat([self.df_final, df_sensor], axis=1)
+
+        # Set frequency and validate with error handling
+        try:
+            self.df_final = set_df_index_freq(self.df_final)
+        except Exception as e:
+            self.logger.error(f"Exception occurred while setting DataFrame index frequency: {e}")
+            return False
+
+        if self.df_final.index.freq != self.freq:
+            self.logger.warning(
+                f"InfluxDB data frequency ({self.df_final.index.freq}) differs from expected ({self.freq})"
+            )
+
+        self.var_list = var_list
+        self.logger.info(f"InfluxDB data retrieval completed: {self.df_final.shape}")
+        return True
+
+    def _init_influx_client(self):
+        """Initialize InfluxDB client connection."""
+        try:
+            from influxdb import InfluxDBClient
+        except ImportError:
+            self.logger.error("InfluxDB client not installed. Install with: pip install influxdb")
+            return None
+
+        try:
+            client = InfluxDBClient(
+                host=self.influxdb_host,
+                port=self.influxdb_port,
+                username=self.influxdb_username or None,
+                password=self.influxdb_password or None,
+                database=self.influxdb_database
+            )
+            # Test connection
+            client.ping()
+            self.logger.debug(f"Successfully connected to InfluxDB at {self.influxdb_host}:{self.influxdb_port}")
+
+            # Initialize measurement cache
+            if not hasattr(self, '_measurement_cache'):
+                self._measurement_cache = {}
+
+            return client
+        except Exception as e:
+            self.logger.error(f"Failed to connect to InfluxDB: {e}")
+            return None
+
+    def _discover_entity_measurement(self, client, entity_id: str) -> str:
+        """Auto-discover which measurement contains the given entity."""
+        # Check cache first
+        if entity_id in self._measurement_cache:
+            return self._measurement_cache[entity_id]
+
+        try:
+            # Get all available measurements
+            measurements_query = "SHOW MEASUREMENTS"
+            measurements_result = client.query(measurements_query)
+            measurements = [m['name'] for m in measurements_result.get_points()]
+
+            # Priority order: check common sensor types first
+            priority_measurements = ['EUR/kWh', '€/kWh', 'W', 'EUR', '€', '%', 'A', 'V']
+            all_measurements = priority_measurements + [m for m in measurements if m not in priority_measurements]
+
+            self.logger.debug(f"Searching for entity '{entity_id}' across {len(measurements)} measurements")
+
+            # Search for entity in each measurement
+            for measurement in all_measurements:
+                if measurement not in measurements:
+                    continue  # Skip if measurement doesn't exist
+
+                try:
+                    # Use SHOW TAG VALUES to get all entity_ids in this measurement
+                    tag_query = f'SHOW TAG VALUES FROM "{measurement}" WITH KEY = "entity_id"'
+                    self.logger.debug(f"Checking measurement '{measurement}' with tag query: {tag_query}")
+                    result = client.query(tag_query)
+                    points = list(result.get_points())
+
+                    # Check if our target entity_id is in the tag values
+                    for point in points:
+                        if point.get('value') == entity_id:
+                            self.logger.debug(f"Found entity '{entity_id}' in measurement '{measurement}'")
+                            # Cache the result
+                            self._measurement_cache[entity_id] = measurement
+                            return measurement
+
+                except Exception as query_error:
+                    self.logger.debug(f"Tag query failed for measurement '{measurement}': {query_error}")
+                    continue
+
+        except Exception as e:
+            self.logger.error(f"Error discovering measurement for entity {entity_id}: {e}")
+
+        # Fallback to default measurement if not found
+        self.logger.warning(f"Entity '{entity_id}' not found in any measurement, using default: {self.influxdb_measurement}")
+        return self.influxdb_measurement
+
+    def _build_influx_query_for_measurement(self, entity_id: str, measurement: str, start_time, end_time) -> str:
+        """Build InfluxQL query for specific measurement and entity."""
+        # Convert frequency to InfluxDB interval
+        freq_minutes = int(self.freq.total_seconds() / 60)
+        interval = f"{freq_minutes}m"
+
+        # Format times properly for InfluxDB
+        start_time_str = start_time.strftime('%Y-%m-%dT%H:%M:%SZ')
+        end_time_str = end_time.strftime('%Y-%m-%dT%H:%M:%SZ')
+
+        # Use FILL(previous) instead of FILL(linear) for compatibility with open-source InfluxDB
+        query = f'''
+            SELECT mean("value") AS "mean_value"
+            FROM "{self.influxdb_database}"."{self.influxdb_retention_policy}"."{measurement}"
+            WHERE time >= '{start_time_str}'
+                AND time < '{end_time_str}'
+                AND "entity_id"='{entity_id}'
+            GROUP BY time({interval}) FILL(previous)
+        '''
+        return query
+
+    def _build_influx_query(self, sensor: str, start_time, end_time) -> str:
+        """Build InfluxQL query for sensor data retrieval (legacy method)."""
+        # Convert sensor name: sensor.sec_pac_solar -> sec_pac_solar
+        entity_id = sensor.replace('sensor.', '') if sensor.startswith('sensor.') else sensor
+
+        # Use default measurement (for backward compatibility)
+        return self._build_influx_query_for_measurement(entity_id, self.influxdb_measurement, start_time, end_time)
+
+    def _fetch_sensor_data(self, client, sensor: str, start_time, end_time):
+        """Fetch and process data for a single sensor with auto-discovery."""
+        self.logger.debug(f"Retrieving sensor: {sensor}")
+
+        # Clean sensor name (remove sensor. prefix if present)
+        entity_id = sensor.replace('sensor.', '') if sensor.startswith('sensor.') else sensor
+
+        # Auto-discover which measurement contains this entity
+        measurement = self._discover_entity_measurement(client, entity_id)
+        if not measurement:
+            self.logger.warning(f"Entity '{entity_id}' not found in any InfluxDB measurement")
+            return None
+
+        try:
+            query = self._build_influx_query_for_measurement(entity_id, measurement, start_time, end_time)
+            self.logger.debug(f"InfluxDB query: {query}")
+
+            # Execute query
+            result = client.query(query)
+            points = list(result.get_points())
+
+            if not points:
+                self.logger.warning(f"No data found for entity: {entity_id} in measurement: {measurement}")
+                return None
+
+            self.logger.info(f"Retrieved {len(points)} data points for {sensor}")
+
+            # Create DataFrame from points
+            df_sensor = pd.DataFrame(points)
+
+            # Convert time column and set as index with timezone awareness
+            df_sensor['time'] = pd.to_datetime(df_sensor['time'], utc=True)
+            df_sensor.set_index('time', inplace=True)
+
+            # Rename value column to original sensor name
+            if 'mean_value' in df_sensor.columns:
+                df_sensor = df_sensor[['mean_value']].rename(columns={'mean_value': sensor})
+            else:
+                self.logger.error(f"Expected 'mean_value' column not found for {sensor} in measurement {measurement}")
+                return None
+
+            # Handle non-numeric data with NaN ratio warning
+            df_sensor[sensor] = pd.to_numeric(df_sensor[sensor], errors='coerce')
+
+            # Check proportion of NaNs and log warning if high
+            nan_count = df_sensor[sensor].isna().sum()
+            total_count = len(df_sensor[sensor])
+            if total_count > 0:
+                nan_ratio = nan_count / total_count
+                if nan_ratio > 0.2:
+                    self.logger.warning(
+                        f"Entity '{entity_id}' has {nan_count}/{total_count} ({nan_ratio:.1%}) non-numeric values coerced to NaN."
+                    )
+
+            self.logger.debug(f"Successfully retrieved {len(df_sensor)} data points for '{entity_id}' from measurement '{measurement}'")
+            return df_sensor
+
+        except Exception as e:
+            self.logger.error(f"Failed to query entity {entity_id} from measurement {measurement}: {e}")
+            return None
+
     def prepare_data(
         self,
         var_load: str,
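
To make the query-building step concrete, here is a small standalone sketch (not part of this diff) that reproduces the InfluxQL produced by `_build_influx_query_for_measurement` for a hypothetical entity, assuming a 30-minute `freq` and the default database, retention policy and measurement:

import pandas as pd

# Hypothetical inputs; the names mirror the attributes used in _build_influx_query_for_measurement.
freq = pd.Timedelta(minutes=30)
database = "homeassistant"
retention_policy = "autogen"
measurement = "W"
entity_id = "power_load_no_var_loads"  # "sensor." prefix already stripped by _fetch_sensor_data
start_time = pd.Timestamp("2024-01-01T00:00:00Z")
end_time = pd.Timestamp("2024-01-10T00:00:00Z")

interval = f"{int(freq.total_seconds() / 60)}m"
start_str = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
end_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")

query = (
    f'SELECT mean("value") AS "mean_value" '
    f'FROM "{database}"."{retention_policy}"."{measurement}" '
    f"WHERE time >= '{start_str}' AND time < '{end_str}' "
    f"AND \"entity_id\"='{entity_id}' "
    f"GROUP BY time({interval}) FILL(previous)"
)
print(query)
# SELECT mean("value") AS "mean_value" FROM "homeassistant"."autogen"."W"
# WHERE time >= '2024-01-01T00:00:00Z' AND time < '2024-01-10T00:00:00Z'
# AND "entity_id"='power_load_no_var_loads' GROUP BY time(30m) FILL(previous)

Note that `FILL(previous)` is used rather than `FILL(linear)` for compatibility with open-source InfluxDB, as stated in the code comment above.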
