10 Commits

21 changed files with 2052 additions and 918 deletions

View File

@@ -2,54 +2,33 @@
A collection of custom integrations for Home Assistant.
## Repository Structure
```
haos-integrations/
├── immich_album_watcher/ # Immich Album Watcher integration
│ ├── __init__.py
│ ├── binary_sensor.py
│ ├── camera.py
│ ├── config_flow.py
│ ├── const.py
│ ├── coordinator.py
│ ├── manifest.json
│ ├── sensor.py
│ ├── services.yaml
│ ├── strings.json
│ ├── icon.png
│ ├── README.md
│ └── translations/
│ ├── en.json
│ └── ru.json
├── .gitignore
├── LICENSE
└── README.md
```
## Available Integrations
| Integration | Description | Documentation |
|-------------|-------------|---------------|
| [Immich Album Watcher](immich_album_watcher/) | Monitor Immich albums for changes with sensors, events, and face recognition | [README](immich_album_watcher/README.md) |
| [Immich Album Watcher](custom_components/immich_album_watcher/) | Monitor Immich albums for changes with sensors, events, and face recognition | [README](custom_components/immich_album_watcher/README.md) |
## Installation
### HACS Installation (Recommended)
1. Open HACS in Home Assistant
2. Click on the three dots in the top right corner
3. Select **Custom repositories**
4. Add this repository URL: `https://git.dolgolyov-family.by/alexei.dolgolyov/haos-integrations`
5. Select **Integration** as the category
6. Click **Add**
7. Search for "Immich Album Watcher" in HACS and install it
8. Restart Home Assistant
9. Add the integration via **Settings****Devices & Services****Add Integration**
### Manual Installation
1. Download or clone this repository
2. Copy the desired integration folder (e.g., `immich_album_watcher`) to your Home Assistant `custom_components` directory
2. Copy the `custom_components/immich_album_watcher` folder to your Home Assistant `config/custom_components` directory
3. Restart Home Assistant
4. Add the integration via **Settings****Devices & Services****Add Integration**
### HACS Installation
1. Open HACS in Home Assistant
2. Go to **Integrations****Custom repositories**
3. Add this repository URL
4. Install the desired integration
5. Restart Home Assistant
## Contributing
Contributions are welcome! Please feel free to submit issues or pull requests.

View File

@@ -26,6 +26,7 @@ A Home Assistant custom integration that monitors [Immich](https://immich.app/)
- Filename
- Creation date
- Asset owner (who uploaded the asset)
- Asset description/caption
- Public URL (if album has a shared link)
- Detected people in the asset
- **Services** - Custom service calls:
@@ -43,9 +44,12 @@ A Home Assistant custom integration that monitors [Immich](https://immich.app/)
| Sensor | People Count | Number of unique people detected |
| Sensor | Last Updated | When the album was last modified |
| Sensor | Created | When the album was created |
| Sensor | Public URL | Public share link URL (if album is shared) |
| Sensor | Public URL | Public share link URL (accessible links without password) |
| Sensor | Protected URL | Password-protected share link URL (if any exist) |
| Sensor | Protected Password | Password for the protected share link (read-only) |
| Binary Sensor | New Assets | On when new assets were recently added |
| Camera | Thumbnail | Album cover image |
| Text | Share Password | Editable password for the protected share link |
## Installation
@@ -129,6 +133,7 @@ Each item in the `added_assets` list contains the following fields:
| `asset_created` | Date/time when the asset was originally created |
| `asset_owner` | Display name of the user who owns the asset |
| `asset_owner_id` | Unique ID of the user who owns the asset |
| `asset_description` | Description/caption of the asset (from EXIF data) |
| `asset_url` | Public URL to view the asset (only present if album has a shared link) |
| `people` | List of people detected in this specific asset |
@@ -153,7 +158,20 @@ automation:
- Home Assistant 2024.1.0 or newer
- Immich server with API access
- Valid Immich API key with `album.read` and `asset.read` permissions
- Valid Immich API key with the following permissions:
### Required API Permissions
| Permission | Required | Description |
| ---------- | -------- | ----------- |
| `album.read` | Yes | Read album data and asset lists |
| `asset.read` | Yes | Read asset details (type, filename, creation date) |
| `user.read` | Yes | Resolve asset owner names |
| `person.read` | Yes | Read face recognition / people data |
| `sharedLink.read` | Yes | Read shared links for public/protected URL sensors |
| `sharedLink.edit` | Optional | Edit shared link passwords via the Text entity |
> **Note:** If you don't grant `sharedLink.edit` permission, the "Share Password" text entity will not be able to update passwords but will still display the current password.
## License

View File

@@ -0,0 +1,170 @@
"""Immich Album Watcher integration for Home Assistant."""
from __future__ import annotations
import logging
from dataclasses import dataclass
from homeassistant.config_entries import ConfigEntry, ConfigSubentry
from homeassistant.core import HomeAssistant
from .const import (
CONF_ALBUM_ID,
CONF_ALBUM_NAME,
CONF_API_KEY,
CONF_HUB_NAME,
CONF_IMMICH_URL,
CONF_SCAN_INTERVAL,
DEFAULT_SCAN_INTERVAL,
DOMAIN,
PLATFORMS,
)
from .coordinator import ImmichAlbumWatcherCoordinator
_LOGGER = logging.getLogger(__name__)
@dataclass
class ImmichHubData:
"""Data for the Immich hub."""
name: str
url: str
api_key: str
scan_interval: int
@dataclass
class ImmichAlbumRuntimeData:
"""Runtime data for an album subentry."""
coordinator: ImmichAlbumWatcherCoordinator
album_id: str
album_name: str
type ImmichConfigEntry = ConfigEntry[ImmichHubData]
async def async_setup_entry(hass: HomeAssistant, entry: ImmichConfigEntry) -> bool:
"""Set up Immich Album Watcher hub from a config entry."""
hass.data.setdefault(DOMAIN, {})
hub_name = entry.data.get(CONF_HUB_NAME, "Immich")
url = entry.data[CONF_IMMICH_URL]
api_key = entry.data[CONF_API_KEY]
scan_interval = entry.options.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
# Store hub data
entry.runtime_data = ImmichHubData(
name=hub_name,
url=url,
api_key=api_key,
scan_interval=scan_interval,
)
# Store hub reference
hass.data[DOMAIN][entry.entry_id] = {
"hub": entry.runtime_data,
"subentries": {},
}
# Track loaded subentries to detect changes
hass.data[DOMAIN][entry.entry_id]["loaded_subentries"] = set(entry.subentries.keys())
# Set up coordinators for all subentries (albums)
for subentry_id, subentry in entry.subentries.items():
await _async_setup_subentry_coordinator(hass, entry, subentry)
# Forward platform setup once - platforms will iterate through subentries
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
# Register update listener for options and subentry changes
entry.async_on_unload(entry.add_update_listener(_async_update_listener))
_LOGGER.info(
"Immich Album Watcher hub set up successfully with %d albums",
len(entry.subentries),
)
return True
async def _async_setup_subentry_coordinator(
hass: HomeAssistant, entry: ImmichConfigEntry, subentry: ConfigSubentry
) -> None:
"""Set up coordinator for an album subentry."""
hub_data: ImmichHubData = entry.runtime_data
album_id = subentry.data[CONF_ALBUM_ID]
album_name = subentry.data.get(CONF_ALBUM_NAME, "Unknown Album")
_LOGGER.debug("Setting up coordinator for album: %s (%s)", album_name, album_id)
# Create coordinator for this album
coordinator = ImmichAlbumWatcherCoordinator(
hass,
url=hub_data.url,
api_key=hub_data.api_key,
album_id=album_id,
album_name=album_name,
scan_interval=hub_data.scan_interval,
hub_name=hub_data.name,
)
# Fetch initial data
await coordinator.async_config_entry_first_refresh()
# Store subentry runtime data
subentry_data = ImmichAlbumRuntimeData(
coordinator=coordinator,
album_id=album_id,
album_name=album_name,
)
hass.data[DOMAIN][entry.entry_id]["subentries"][subentry.subentry_id] = subentry_data
_LOGGER.info("Coordinator for album '%s' set up successfully", album_name)
async def _async_update_listener(
hass: HomeAssistant, entry: ImmichConfigEntry
) -> None:
"""Handle config entry updates (options or subentry changes)."""
entry_data = hass.data[DOMAIN][entry.entry_id]
loaded_subentries = entry_data.get("loaded_subentries", set())
current_subentries = set(entry.subentries.keys())
# Check if subentries changed
if loaded_subentries != current_subentries:
_LOGGER.info(
"Subentries changed (loaded: %d, current: %d), reloading entry",
len(loaded_subentries),
len(current_subentries),
)
await hass.config_entries.async_reload(entry.entry_id)
return
# Handle options-only update (scan interval change)
new_interval = entry.options.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
# Update hub data
entry.runtime_data.scan_interval = new_interval
# Update all subentry coordinators
subentries_data = entry_data["subentries"]
for subentry_data in subentries_data.values():
subentry_data.coordinator.update_scan_interval(new_interval)
_LOGGER.info("Updated scan interval to %d seconds", new_interval)
async def async_unload_entry(hass: HomeAssistant, entry: ImmichConfigEntry) -> bool:
"""Unload a config entry."""
# Unload all platforms
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
# Clean up hub data
hass.data[DOMAIN].pop(entry.entry_id, None)
_LOGGER.info("Immich Album Watcher hub unloaded")
return unload_ok

View File

@@ -9,18 +9,20 @@ from homeassistant.components.binary_sensor import (
BinarySensorDeviceClass,
BinarySensorEntity,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.config_entries import ConfigEntry, ConfigSubentry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.device_registry import DeviceEntryType
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.util import slugify
from .const import (
ATTR_ADDED_COUNT,
ATTR_ALBUM_ID,
ATTR_ALBUM_NAME,
CONF_ALBUMS,
CONF_ALBUM_ID,
CONF_ALBUM_NAME,
CONF_HUB_NAME,
DOMAIN,
NEW_ASSETS_RESET_DELAY,
)
@@ -35,15 +37,19 @@ async def async_setup_entry(
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up Immich Album Watcher binary sensors from a config entry."""
coordinator: ImmichAlbumWatcherCoordinator = hass.data[DOMAIN][entry.entry_id]
album_ids = entry.options.get(CONF_ALBUMS, [])
# Iterate through all album subentries
for subentry_id, subentry in entry.subentries.items():
subentry_data = hass.data[DOMAIN][entry.entry_id]["subentries"].get(subentry_id)
if not subentry_data:
_LOGGER.error("Subentry data not found for %s", subentry_id)
continue
entities = [
ImmichAlbumNewAssetsSensor(coordinator, entry, album_id)
for album_id in album_ids
]
coordinator = subentry_data.coordinator
async_add_entities(entities)
async_add_entities(
[ImmichAlbumNewAssetsSensor(coordinator, entry, subentry)],
config_subentry_id=subentry_id,
)
class ImmichAlbumNewAssetsSensor(
@@ -59,28 +65,29 @@ class ImmichAlbumNewAssetsSensor(
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
album_id: str,
subentry: ConfigSubentry,
) -> None:
"""Initialize the binary sensor."""
super().__init__(coordinator)
self._album_id = album_id
self._entry = entry
self._attr_unique_id = f"{entry.entry_id}_{album_id}_new_assets"
self._reset_unsub = None
self._subentry = subentry
self._album_id = subentry.data[CONF_ALBUM_ID]
self._album_name = subentry.data.get(CONF_ALBUM_NAME, "Unknown Album")
self._hub_name = entry.data.get(CONF_HUB_NAME, "Immich")
unique_id_prefix = slugify(f"{self._hub_name}_album_{self._album_name}")
self._attr_unique_id = f"{unique_id_prefix}_new_assets"
@property
def _album_data(self) -> AlbumData | None:
"""Get the album data from coordinator."""
if self.coordinator.data is None:
return None
return self.coordinator.data.get(self._album_id)
return self.coordinator.data
@property
def translation_placeholders(self) -> dict[str, str]:
"""Return translation placeholders."""
if self._album_data:
return {"album_name": self._album_data.name}
return {"album_name": f"Album {self._album_id[:8]}"}
return {"album_name": self._album_name}
@property
def is_on(self) -> bool | None:
@@ -96,7 +103,7 @@ class ImmichAlbumNewAssetsSensor(
elapsed = datetime.now() - self._album_data.last_change_time
if elapsed > timedelta(seconds=NEW_ASSETS_RESET_DELAY):
# Auto-reset the flag
self.coordinator.clear_new_assets_flag(self._album_id)
self.coordinator.clear_new_assets_flag()
return False
return True
@@ -124,12 +131,13 @@ class ImmichAlbumNewAssetsSensor(
@property
def device_info(self) -> DeviceInfo:
"""Return device info."""
"""Return device info - one device per album."""
return DeviceInfo(
identifiers={(DOMAIN, self._entry.entry_id)},
name="Immich Album Watcher",
identifiers={(DOMAIN, self._subentry.subentry_id)},
name=self._album_name,
manufacturer="Immich",
entry_type="service",
entry_type=DeviceEntryType.SERVICE,
via_device=(DOMAIN, self._entry.entry_id),
)
@callback
@@ -139,5 +147,5 @@ class ImmichAlbumNewAssetsSensor(
async def async_turn_off(self, **kwargs) -> None:
"""Turn off the sensor (clear new assets flag)."""
self.coordinator.clear_new_assets_flag(self._album_id)
self.coordinator.clear_new_assets_flag()
self.async_write_ha_state()

View File

@@ -0,0 +1,437 @@
"""Button platform for Immich Album Watcher."""
from __future__ import annotations
import logging
from homeassistant.components.button import ButtonEntity
from homeassistant.config_entries import ConfigEntry, ConfigSubentry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.device_registry import DeviceEntryType
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.util import slugify
from .const import (
ATTR_ALBUM_ID,
ATTR_ALBUM_PROTECTED_URL,
CONF_ALBUM_ID,
CONF_ALBUM_NAME,
CONF_HUB_NAME,
DEFAULT_SHARE_PASSWORD,
DOMAIN,
)
from .coordinator import AlbumData, ImmichAlbumWatcherCoordinator
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up Immich Album Watcher button entities from a config entry."""
# Iterate through all album subentries
for subentry_id, subentry in entry.subentries.items():
subentry_data = hass.data[DOMAIN][entry.entry_id]["subentries"].get(subentry_id)
if not subentry_data:
_LOGGER.error("Subentry data not found for %s", subentry_id)
continue
coordinator = subentry_data.coordinator
async_add_entities(
[
ImmichCreateShareLinkButton(coordinator, entry, subentry),
ImmichDeleteShareLinkButton(coordinator, entry, subentry),
ImmichCreateProtectedLinkButton(coordinator, entry, subentry),
ImmichDeleteProtectedLinkButton(coordinator, entry, subentry),
],
config_subentry_id=subentry_id,
)
class ImmichCreateShareLinkButton(
CoordinatorEntity[ImmichAlbumWatcherCoordinator], ButtonEntity
):
"""Button entity for creating an unprotected share link."""
_attr_has_entity_name = True
_attr_icon = "mdi:link-plus"
_attr_translation_key = "create_share_link"
def __init__(
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
subentry: ConfigSubentry,
) -> None:
"""Initialize the button entity."""
super().__init__(coordinator)
self._entry = entry
self._subentry = subentry
self._album_id = subentry.data[CONF_ALBUM_ID]
self._album_name = subentry.data.get(CONF_ALBUM_NAME, "Unknown Album")
self._hub_name = entry.data.get(CONF_HUB_NAME, "Immich")
unique_id_prefix = slugify(f"{self._hub_name}_album_{self._album_name}")
self._attr_unique_id = f"{unique_id_prefix}_create_share_link"
@property
def _album_data(self) -> AlbumData | None:
"""Get the album data from coordinator."""
return self.coordinator.data
@property
def translation_placeholders(self) -> dict[str, str]:
"""Return translation placeholders."""
if self._album_data:
return {"album_name": self._album_data.name}
return {"album_name": self._album_name}
@property
def available(self) -> bool:
"""Return if entity is available.
Only available when there is no unprotected link.
"""
if not self.coordinator.last_update_success or self._album_data is None:
return False
# Only available if there's no unprotected link yet
return not self.coordinator.has_unprotected_link()
@property
def device_info(self) -> DeviceInfo:
"""Return device info - one device per album."""
return DeviceInfo(
identifiers={(DOMAIN, self._subentry.subentry_id)},
name=self._album_name,
manufacturer="Immich",
entry_type=DeviceEntryType.SERVICE,
via_device=(DOMAIN, self._entry.entry_id),
)
@property
def extra_state_attributes(self) -> dict[str, str]:
"""Return extra state attributes."""
if not self._album_data:
return {}
return {
ATTR_ALBUM_ID: self._album_data.id,
}
async def async_press(self) -> None:
"""Handle button press - create share link."""
if self.coordinator.has_unprotected_link():
_LOGGER.warning(
"Album %s already has an unprotected share link",
self._album_name,
)
return
success = await self.coordinator.async_create_shared_link()
if success:
await self.coordinator.async_refresh()
else:
_LOGGER.error("Failed to create share link for album %s", self._album_id)
@callback
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
self.async_write_ha_state()
class ImmichDeleteShareLinkButton(
CoordinatorEntity[ImmichAlbumWatcherCoordinator], ButtonEntity
):
"""Button entity for deleting an unprotected share link."""
_attr_has_entity_name = True
_attr_icon = "mdi:link-off"
_attr_translation_key = "delete_share_link"
def __init__(
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
subentry: ConfigSubentry,
) -> None:
"""Initialize the button entity."""
super().__init__(coordinator)
self._entry = entry
self._subentry = subentry
self._album_id = subentry.data[CONF_ALBUM_ID]
self._album_name = subentry.data.get(CONF_ALBUM_NAME, "Unknown Album")
self._hub_name = entry.data.get(CONF_HUB_NAME, "Immich")
unique_id_prefix = slugify(f"{self._hub_name}_album_{self._album_name}")
self._attr_unique_id = f"{unique_id_prefix}_delete_share_link"
@property
def _album_data(self) -> AlbumData | None:
"""Get the album data from coordinator."""
return self.coordinator.data
@property
def translation_placeholders(self) -> dict[str, str]:
"""Return translation placeholders."""
if self._album_data:
return {"album_name": self._album_data.name}
return {"album_name": self._album_name}
@property
def available(self) -> bool:
"""Return if entity is available.
Only available when there is an unprotected link.
"""
if not self.coordinator.last_update_success or self._album_data is None:
return False
# Only available if there's an unprotected link to delete
return self.coordinator.has_unprotected_link()
@property
def device_info(self) -> DeviceInfo:
"""Return device info - one device per album."""
return DeviceInfo(
identifiers={(DOMAIN, self._subentry.subentry_id)},
name=self._album_name,
manufacturer="Immich",
entry_type=DeviceEntryType.SERVICE,
via_device=(DOMAIN, self._entry.entry_id),
)
@property
def extra_state_attributes(self) -> dict[str, str]:
"""Return extra state attributes."""
if not self._album_data:
return {}
attrs = {
ATTR_ALBUM_ID: self._album_data.id,
}
public_url = self.coordinator.get_public_url()
if public_url:
attrs[ATTR_ALBUM_PROTECTED_URL] = public_url
return attrs
async def async_press(self) -> None:
"""Handle button press - delete share link."""
link_id = self.coordinator.get_unprotected_link_id()
if not link_id:
_LOGGER.warning(
"No unprotected share link found for album %s",
self._album_name,
)
return
success = await self.coordinator.async_delete_shared_link(link_id)
if success:
await self.coordinator.async_refresh()
else:
_LOGGER.error("Failed to delete share link for album %s", self._album_id)
@callback
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
self.async_write_ha_state()
class ImmichCreateProtectedLinkButton(
CoordinatorEntity[ImmichAlbumWatcherCoordinator], ButtonEntity
):
"""Button entity for creating a protected (password) share link."""
_attr_has_entity_name = True
_attr_icon = "mdi:link-lock"
_attr_translation_key = "create_protected_link"
def __init__(
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
subentry: ConfigSubentry,
) -> None:
"""Initialize the button entity."""
super().__init__(coordinator)
self._entry = entry
self._subentry = subentry
self._album_id = subentry.data[CONF_ALBUM_ID]
self._album_name = subentry.data.get(CONF_ALBUM_NAME, "Unknown Album")
self._hub_name = entry.data.get(CONF_HUB_NAME, "Immich")
unique_id_prefix = slugify(f"{self._hub_name}_album_{self._album_name}")
self._attr_unique_id = f"{unique_id_prefix}_create_protected_link"
@property
def _album_data(self) -> AlbumData | None:
"""Get the album data from coordinator."""
return self.coordinator.data
@property
def translation_placeholders(self) -> dict[str, str]:
"""Return translation placeholders."""
if self._album_data:
return {"album_name": self._album_data.name}
return {"album_name": self._album_name}
@property
def available(self) -> bool:
"""Return if entity is available.
Only available when there is no protected link.
"""
if not self.coordinator.last_update_success or self._album_data is None:
return False
# Only available if there's no protected link yet
return not self.coordinator.has_protected_link()
@property
def device_info(self) -> DeviceInfo:
"""Return device info - one device per album."""
return DeviceInfo(
identifiers={(DOMAIN, self._subentry.subentry_id)},
name=self._album_name,
manufacturer="Immich",
entry_type=DeviceEntryType.SERVICE,
via_device=(DOMAIN, self._entry.entry_id),
)
@property
def extra_state_attributes(self) -> dict[str, str]:
"""Return extra state attributes."""
if not self._album_data:
return {}
return {
ATTR_ALBUM_ID: self._album_data.id,
}
async def async_press(self) -> None:
"""Handle button press - create protected share link."""
if self.coordinator.has_protected_link():
_LOGGER.warning(
"Album %s already has a protected share link",
self._album_name,
)
return
success = await self.coordinator.async_create_shared_link(
password=DEFAULT_SHARE_PASSWORD
)
if success:
await self.coordinator.async_refresh()
else:
_LOGGER.error(
"Failed to create protected share link for album %s", self._album_id
)
@callback
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
self.async_write_ha_state()
class ImmichDeleteProtectedLinkButton(
CoordinatorEntity[ImmichAlbumWatcherCoordinator], ButtonEntity
):
"""Button entity for deleting a protected (password) share link."""
_attr_has_entity_name = True
_attr_icon = "mdi:link-off"
_attr_translation_key = "delete_protected_link"
def __init__(
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
subentry: ConfigSubentry,
) -> None:
"""Initialize the button entity."""
super().__init__(coordinator)
self._entry = entry
self._subentry = subentry
self._album_id = subentry.data[CONF_ALBUM_ID]
self._album_name = subentry.data.get(CONF_ALBUM_NAME, "Unknown Album")
self._hub_name = entry.data.get(CONF_HUB_NAME, "Immich")
unique_id_prefix = slugify(f"{self._hub_name}_album_{self._album_name}")
self._attr_unique_id = f"{unique_id_prefix}_delete_protected_link"
@property
def _album_data(self) -> AlbumData | None:
"""Get the album data from coordinator."""
return self.coordinator.data
@property
def translation_placeholders(self) -> dict[str, str]:
"""Return translation placeholders."""
if self._album_data:
return {"album_name": self._album_data.name}
return {"album_name": self._album_name}
@property
def available(self) -> bool:
"""Return if entity is available.
Only available when there is a protected link.
"""
if not self.coordinator.last_update_success or self._album_data is None:
return False
# Only available if there's a protected link to delete
return self.coordinator.has_protected_link()
@property
def device_info(self) -> DeviceInfo:
"""Return device info - one device per album."""
return DeviceInfo(
identifiers={(DOMAIN, self._subentry.subentry_id)},
name=self._album_name,
manufacturer="Immich",
entry_type=DeviceEntryType.SERVICE,
via_device=(DOMAIN, self._entry.entry_id),
)
@property
def extra_state_attributes(self) -> dict[str, str]:
"""Return extra state attributes."""
if not self._album_data:
return {}
attrs = {
ATTR_ALBUM_ID: self._album_data.id,
}
protected_url = self.coordinator.get_protected_url()
if protected_url:
attrs[ATTR_ALBUM_PROTECTED_URL] = protected_url
return attrs
async def async_press(self) -> None:
"""Handle button press - delete protected share link."""
link_id = self.coordinator.get_protected_link_id()
if not link_id:
_LOGGER.warning(
"No protected share link found for album %s",
self._album_name,
)
return
success = await self.coordinator.async_delete_shared_link(link_id)
if success:
await self.coordinator.async_refresh()
else:
_LOGGER.error(
"Failed to delete protected share link for album %s", self._album_id
)
@callback
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
self.async_write_ha_state()

View File

@@ -8,14 +8,16 @@ from datetime import timedelta
import aiohttp
from homeassistant.components.camera import Camera
from homeassistant.config_entries import ConfigEntry
from homeassistant.config_entries import ConfigEntry, ConfigSubentry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.device_registry import DeviceEntryType
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.util import slugify
from .const import CONF_ALBUMS, DOMAIN
from .const import CONF_ALBUM_ID, CONF_ALBUM_NAME, CONF_HUB_NAME, DOMAIN
from .coordinator import AlbumData, ImmichAlbumWatcherCoordinator
_LOGGER = logging.getLogger(__name__)
@@ -29,15 +31,19 @@ async def async_setup_entry(
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up Immich Album Watcher cameras from a config entry."""
coordinator: ImmichAlbumWatcherCoordinator = hass.data[DOMAIN][entry.entry_id]
album_ids = entry.options.get(CONF_ALBUMS, [])
# Iterate through all album subentries
for subentry_id, subentry in entry.subentries.items():
subentry_data = hass.data[DOMAIN][entry.entry_id]["subentries"].get(subentry_id)
if not subentry_data:
_LOGGER.error("Subentry data not found for %s", subentry_id)
continue
entities = [
ImmichAlbumThumbnailCamera(coordinator, entry, album_id)
for album_id in album_ids
]
coordinator = subentry_data.coordinator
async_add_entities(entities)
async_add_entities(
[ImmichAlbumThumbnailCamera(coordinator, entry, subentry)],
config_subentry_id=subentry_id,
)
class ImmichAlbumThumbnailCamera(
@@ -52,30 +58,32 @@ class ImmichAlbumThumbnailCamera(
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
album_id: str,
subentry: ConfigSubentry,
) -> None:
"""Initialize the camera."""
CoordinatorEntity.__init__(self, coordinator)
Camera.__init__(self)
self._album_id = album_id
self._entry = entry
self._attr_unique_id = f"{entry.entry_id}_{album_id}_thumbnail"
self._subentry = subentry
self._album_id = subentry.data[CONF_ALBUM_ID]
self._album_name = subentry.data.get(CONF_ALBUM_NAME, "Unknown Album")
self._hub_name = entry.data.get(CONF_HUB_NAME, "Immich")
unique_id_prefix = slugify(f"{self._hub_name}_album_{self._album_name}")
self._attr_unique_id = f"{unique_id_prefix}_thumbnail"
self._cached_image: bytes | None = None
self._last_thumbnail_id: str | None = None
@property
def _album_data(self) -> AlbumData | None:
"""Get the album data from coordinator."""
if self.coordinator.data is None:
return None
return self.coordinator.data.get(self._album_id)
return self.coordinator.data
@property
def translation_placeholders(self) -> dict[str, str]:
"""Return translation placeholders."""
if self._album_data:
return {"album_name": self._album_data.name}
return {"album_name": f"Album {self._album_id[:8]}"}
return {"album_name": self._album_name}
@property
def available(self) -> bool:
@@ -88,12 +96,13 @@ class ImmichAlbumThumbnailCamera(
@property
def device_info(self) -> DeviceInfo:
"""Return device info."""
"""Return device info - one device per album."""
return DeviceInfo(
identifiers={(DOMAIN, self._entry.entry_id)},
name="Immich Album Watcher",
identifiers={(DOMAIN, self._subentry.subentry_id)},
name=self._album_name,
manufacturer="Immich",
entry_type="service",
entry_type=DeviceEntryType.SERVICE,
via_device=(DOMAIN, self._entry.entry_id),
)
@property

View File

@@ -8,19 +8,27 @@ from typing import Any
import aiohttp
import voluptuous as vol
from homeassistant.config_entries import ConfigEntry, ConfigFlow, OptionsFlow
from homeassistant.config_entries import (
ConfigEntry,
ConfigFlow,
ConfigFlowResult,
ConfigSubentryFlow,
OptionsFlow,
SubentryFlowResult,
)
from homeassistant.core import callback
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers.aiohttp_client import async_get_clientsession
import homeassistant.helpers.config_validation as cv
from .const import (
CONF_ALBUMS,
CONF_ALBUM_ID,
CONF_ALBUM_NAME,
CONF_API_KEY,
CONF_HUB_NAME,
CONF_IMMICH_URL,
CONF_SCAN_INTERVAL,
DEFAULT_SCAN_INTERVAL,
DOMAIN,
SUBENTRY_TYPE_ALBUM,
)
_LOGGER = logging.getLogger(__name__)
@@ -57,13 +65,12 @@ async def fetch_albums(
class ImmichAlbumWatcherConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Immich Album Watcher."""
VERSION = 1
VERSION = 2
def __init__(self) -> None:
"""Initialize the config flow."""
self._url: str | None = None
self._api_key: str | None = None
self._albums: list[dict[str, Any]] = []
@staticmethod
@callback
@@ -71,13 +78,22 @@ class ImmichAlbumWatcherConfigFlow(ConfigFlow, domain=DOMAIN):
"""Get the options flow for this handler."""
return ImmichAlbumWatcherOptionsFlow(config_entry)
@classmethod
@callback
def async_get_supported_subentry_types(
cls, config_entry: ConfigEntry
) -> dict[str, type[ConfigSubentryFlow]]:
"""Return supported subentry types."""
return {SUBENTRY_TYPE_ALBUM: ImmichAlbumSubentryFlowHandler}
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
) -> ConfigFlowResult:
"""Handle the initial step - connection details."""
errors: dict[str, str] = {}
if user_input is not None:
hub_name = user_input[CONF_HUB_NAME].strip()
self._url = user_input[CONF_IMMICH_URL].rstrip("/")
self._api_key = user_input[CONF_API_KEY]
@@ -85,12 +101,22 @@ class ImmichAlbumWatcherConfigFlow(ConfigFlow, domain=DOMAIN):
try:
await validate_connection(session, self._url, self._api_key)
self._albums = await fetch_albums(session, self._url, self._api_key)
if not self._albums:
errors["base"] = "no_albums"
else:
return await self.async_step_albums()
# Set unique ID based on URL
await self.async_set_unique_id(self._url)
self._abort_if_unique_id_configured()
return self.async_create_entry(
title=hub_name,
data={
CONF_HUB_NAME: hub_name,
CONF_IMMICH_URL: self._url,
CONF_API_KEY: self._api_key,
},
options={
CONF_SCAN_INTERVAL: DEFAULT_SCAN_INTERVAL,
},
)
except InvalidAuth:
errors["base"] = "invalid_auth"
@@ -106,6 +132,7 @@ class ImmichAlbumWatcherConfigFlow(ConfigFlow, domain=DOMAIN):
step_id="user",
data_schema=vol.Schema(
{
vol.Required(CONF_HUB_NAME, default="Immich"): str,
vol.Required(CONF_IMMICH_URL): str,
vol.Required(CONF_API_KEY): str,
}
@@ -116,45 +143,87 @@ class ImmichAlbumWatcherConfigFlow(ConfigFlow, domain=DOMAIN):
},
)
async def async_step_albums(
class ImmichAlbumSubentryFlowHandler(ConfigSubentryFlow):
"""Handle subentry flow for adding albums."""
def __init__(self) -> None:
"""Initialize the subentry flow."""
super().__init__()
self._albums: list[dict[str, Any]] = []
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle album selection step."""
) -> SubentryFlowResult:
"""Handle album selection."""
errors: dict[str, str] = {}
# Get parent config entry data
config_entry = self._get_entry()
url = config_entry.data[CONF_IMMICH_URL]
api_key = config_entry.data[CONF_API_KEY]
# Fetch available albums
session = async_get_clientsession(self.hass)
try:
self._albums = await fetch_albums(session, url, api_key)
except Exception:
_LOGGER.exception("Failed to fetch albums")
errors["base"] = "cannot_connect"
return self.async_show_form(
step_id="user",
data_schema=vol.Schema({}),
errors=errors,
)
if not self._albums:
return self.async_abort(reason="no_albums")
if user_input is not None:
selected_albums = user_input.get(CONF_ALBUMS, [])
album_id = user_input[CONF_ALBUM_ID]
if not selected_albums:
errors["base"] = "no_albums_selected"
else:
# Create unique ID based on URL
await self.async_set_unique_id(self._url)
self._abort_if_unique_id_configured()
# Check if album is already configured
for subentry in config_entry.subentries.values():
if subentry.data.get(CONF_ALBUM_ID) == album_id:
return self.async_abort(reason="album_already_configured")
return self.async_create_entry(
title="Immich Album Watcher",
data={
CONF_IMMICH_URL: self._url,
CONF_API_KEY: self._api_key,
},
options={
CONF_ALBUMS: selected_albums,
CONF_SCAN_INTERVAL: DEFAULT_SCAN_INTERVAL,
},
)
# Find album name
album_name = "Unknown Album"
for album in self._albums:
if album["id"] == album_id:
album_name = album.get("albumName", "Unnamed")
break
# Build album selection list
return self.async_create_entry(
title=album_name,
data={
CONF_ALBUM_ID: album_id,
CONF_ALBUM_NAME: album_name,
},
)
# Get already configured album IDs
configured_albums = set()
for subentry in config_entry.subentries.values():
if aid := subentry.data.get(CONF_ALBUM_ID):
configured_albums.add(aid)
# Build album selection list (excluding already configured)
album_options = {
album["id"]: f"{album.get('albumName', 'Unnamed')} ({album.get('assetCount', 0)} assets)"
for album in self._albums
if album["id"] not in configured_albums
}
if not album_options:
return self.async_abort(reason="all_albums_configured")
return self.async_show_form(
step_id="albums",
step_id="user",
data_schema=vol.Schema(
{
vol.Required(CONF_ALBUMS): cv.multi_select(album_options),
vol.Required(CONF_ALBUM_ID): vol.In(album_options),
}
),
errors=errors,
@@ -167,43 +236,21 @@ class ImmichAlbumWatcherOptionsFlow(OptionsFlow):
def __init__(self, config_entry: ConfigEntry) -> None:
"""Initialize options flow."""
self._config_entry = config_entry
self._albums: list[dict[str, Any]] = []
async def async_step_init(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
) -> ConfigFlowResult:
"""Manage the options."""
errors: dict[str, str] = {}
# Fetch current albums from Immich
session = async_get_clientsession(self.hass)
url = self._config_entry.data[CONF_IMMICH_URL]
api_key = self._config_entry.data[CONF_API_KEY]
try:
self._albums = await fetch_albums(session, url, api_key)
except Exception:
_LOGGER.exception("Failed to fetch albums")
errors["base"] = "cannot_connect"
if user_input is not None and not errors:
if user_input is not None:
return self.async_create_entry(
title="",
data={
CONF_ALBUMS: user_input.get(CONF_ALBUMS, []),
CONF_SCAN_INTERVAL: user_input.get(
CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL
),
},
)
# Build album selection list
album_options = {
album["id"]: f"{album.get('albumName', 'Unnamed')} ({album.get('assetCount', 0)} assets)"
for album in self._albums
}
current_albums = self._config_entry.options.get(CONF_ALBUMS, [])
current_interval = self._config_entry.options.get(
CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL
)
@@ -212,15 +259,11 @@ class ImmichAlbumWatcherOptionsFlow(OptionsFlow):
step_id="init",
data_schema=vol.Schema(
{
vol.Required(CONF_ALBUMS, default=current_albums): cv.multi_select(
album_options
),
vol.Required(
CONF_SCAN_INTERVAL, default=current_interval
): vol.All(vol.Coerce(int), vol.Range(min=10, max=3600)),
}
),
errors=errors,
)

View File

@@ -6,14 +6,21 @@ from typing import Final
DOMAIN: Final = "immich_album_watcher"
# Configuration keys
CONF_HUB_NAME: Final = "hub_name"
CONF_IMMICH_URL: Final = "immich_url"
CONF_API_KEY: Final = "api_key"
CONF_ALBUMS: Final = "albums"
CONF_ALBUM_ID: Final = "album_id"
CONF_ALBUM_NAME: Final = "album_name"
CONF_SCAN_INTERVAL: Final = "scan_interval"
# Subentry type
SUBENTRY_TYPE_ALBUM: Final = "album"
# Defaults
DEFAULT_SCAN_INTERVAL: Final = 60 # seconds
NEW_ASSETS_RESET_DELAY: Final = 300 # 5 minutes
DEFAULT_SHARE_PASSWORD: Final = "immich123"
# Events
EVENT_ALBUM_CHANGED: Final = f"{DOMAIN}_album_changed"
@@ -21,9 +28,13 @@ EVENT_ASSETS_ADDED: Final = f"{DOMAIN}_assets_added"
EVENT_ASSETS_REMOVED: Final = f"{DOMAIN}_assets_removed"
# Attributes
ATTR_HUB_NAME: Final = "hub_name"
ATTR_ALBUM_ID: Final = "album_id"
ATTR_ALBUM_NAME: Final = "album_name"
ATTR_ALBUM_URL: Final = "album_url"
ATTR_ALBUM_URLS: Final = "album_urls"
ATTR_ALBUM_PROTECTED_URL: Final = "album_protected_url"
ATTR_ALBUM_PROTECTED_PASSWORD: Final = "album_protected_password"
ATTR_ASSET_COUNT: Final = "asset_count"
ATTR_PHOTO_COUNT: Final = "photo_count"
ATTR_VIDEO_COUNT: Final = "video_count"
@@ -44,13 +55,16 @@ ATTR_ASSET_CREATED: Final = "asset_created"
ATTR_ASSET_OWNER: Final = "asset_owner"
ATTR_ASSET_OWNER_ID: Final = "asset_owner_id"
ATTR_ASSET_URL: Final = "asset_url"
ATTR_ASSET_DOWNLOAD_URL: Final = "asset_download_url"
ATTR_ASSET_PLAYBACK_URL: Final = "asset_playback_url"
ATTR_ASSET_DESCRIPTION: Final = "asset_description"
# Asset types
ASSET_TYPE_IMAGE: Final = "IMAGE"
ASSET_TYPE_VIDEO: Final = "VIDEO"
# Platforms
PLATFORMS: Final = ["sensor", "binary_sensor", "camera"]
PLATFORMS: Final = ["sensor", "binary_sensor", "camera", "text", "button"]
# Services
SERVICE_REFRESH: Final = "refresh"

View File

@@ -0,0 +1,746 @@
"""Data coordinator for Immich Album Watcher."""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any
import aiohttp
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import (
ASSET_TYPE_IMAGE,
ASSET_TYPE_VIDEO,
ATTR_ADDED_ASSETS,
ATTR_ADDED_COUNT,
ATTR_ALBUM_ID,
ATTR_ALBUM_NAME,
ATTR_ALBUM_URL,
ATTR_ASSET_CREATED,
ATTR_ASSET_DESCRIPTION,
ATTR_ASSET_DOWNLOAD_URL,
ATTR_ASSET_FILENAME,
ATTR_ASSET_OWNER,
ATTR_ASSET_OWNER_ID,
ATTR_ASSET_TYPE,
ATTR_ASSET_URL,
ATTR_ASSET_PLAYBACK_URL,
ATTR_CHANGE_TYPE,
ATTR_HUB_NAME,
ATTR_PEOPLE,
ATTR_REMOVED_ASSETS,
ATTR_REMOVED_COUNT,
DOMAIN,
EVENT_ALBUM_CHANGED,
EVENT_ASSETS_ADDED,
EVENT_ASSETS_REMOVED,
)
_LOGGER = logging.getLogger(__name__)
@dataclass
class SharedLinkInfo:
"""Data class for shared link information."""
id: str
key: str
has_password: bool = False
password: str | None = None
expires_at: datetime | None = None
allow_download: bool = True
show_metadata: bool = True
@property
def is_expired(self) -> bool:
"""Check if the link has expired."""
if self.expires_at is None:
return False
return datetime.now(self.expires_at.tzinfo) > self.expires_at
@property
def is_accessible(self) -> bool:
"""Check if the link is accessible without password and not expired."""
return not self.has_password and not self.is_expired
@classmethod
def from_api_response(cls, data: dict[str, Any]) -> SharedLinkInfo:
"""Create SharedLinkInfo from API response."""
expires_at = None
if data.get("expiresAt"):
try:
expires_at = datetime.fromisoformat(
data["expiresAt"].replace("Z", "+00:00")
)
except ValueError:
pass
password = data.get("password")
return cls(
id=data["id"],
key=data["key"],
has_password=bool(password),
password=password if password else None,
expires_at=expires_at,
allow_download=data.get("allowDownload", True),
show_metadata=data.get("showMetadata", True),
)
@dataclass
class AssetInfo:
"""Data class for asset information."""
id: str
type: str # IMAGE or VIDEO
filename: str
created_at: str
owner_id: str = ""
owner_name: str = ""
description: str = ""
people: list[str] = field(default_factory=list)
@classmethod
def from_api_response(
cls, data: dict[str, Any], users_cache: dict[str, str] | None = None
) -> AssetInfo:
"""Create AssetInfo from API response."""
people = []
if "people" in data:
people = [p.get("name", "") for p in data["people"] if p.get("name")]
owner_id = data.get("ownerId", "")
owner_name = ""
if users_cache and owner_id:
owner_name = users_cache.get(owner_id, "")
# Get description from exifInfo if available
description = ""
exif_info = data.get("exifInfo")
if exif_info:
description = exif_info.get("description", "") or ""
return cls(
id=data["id"],
type=data.get("type", ASSET_TYPE_IMAGE),
filename=data.get("originalFileName", ""),
created_at=data.get("fileCreatedAt", ""),
owner_id=owner_id,
owner_name=owner_name,
description=description,
people=people,
)
@dataclass
class AlbumData:
"""Data class for album information."""
id: str
name: str
asset_count: int
photo_count: int
video_count: int
created_at: str
updated_at: str
shared: bool
owner: str
thumbnail_asset_id: str | None
asset_ids: set[str] = field(default_factory=set)
assets: dict[str, AssetInfo] = field(default_factory=dict)
people: set[str] = field(default_factory=set)
has_new_assets: bool = False
last_change_time: datetime | None = None
@classmethod
def from_api_response(
cls, data: dict[str, Any], users_cache: dict[str, str] | None = None
) -> AlbumData:
"""Create AlbumData from API response."""
assets_data = data.get("assets", [])
asset_ids = set()
assets = {}
people = set()
photo_count = 0
video_count = 0
for asset_data in assets_data:
asset = AssetInfo.from_api_response(asset_data, users_cache)
asset_ids.add(asset.id)
assets[asset.id] = asset
people.update(asset.people)
if asset.type == ASSET_TYPE_IMAGE:
photo_count += 1
elif asset.type == ASSET_TYPE_VIDEO:
video_count += 1
return cls(
id=data["id"],
name=data.get("albumName", "Unnamed"),
asset_count=data.get("assetCount", len(asset_ids)),
photo_count=photo_count,
video_count=video_count,
created_at=data.get("createdAt", ""),
updated_at=data.get("updatedAt", ""),
shared=data.get("shared", False),
owner=data.get("owner", {}).get("name", "Unknown"),
thumbnail_asset_id=data.get("albumThumbnailAssetId"),
asset_ids=asset_ids,
assets=assets,
people=people,
)
@dataclass
class AlbumChange:
"""Data class for album changes."""
album_id: str
album_name: str
change_type: str
added_count: int = 0
removed_count: int = 0
added_assets: list[AssetInfo] = field(default_factory=list)
removed_asset_ids: list[str] = field(default_factory=list)
class ImmichAlbumWatcherCoordinator(DataUpdateCoordinator[AlbumData | None]):
"""Coordinator for fetching Immich album data."""
def __init__(
self,
hass: HomeAssistant,
url: str,
api_key: str,
album_id: str,
album_name: str,
scan_interval: int,
hub_name: str = "Immich",
) -> None:
"""Initialize the coordinator."""
super().__init__(
hass,
_LOGGER,
name=f"{DOMAIN}_{album_id}",
update_interval=timedelta(seconds=scan_interval),
)
self._url = url.rstrip("/")
self._api_key = api_key
self._album_id = album_id
self._album_name = album_name
self._hub_name = hub_name
self._previous_state: AlbumData | None = None
self._session: aiohttp.ClientSession | None = None
self._people_cache: dict[str, str] = {} # person_id -> name
self._users_cache: dict[str, str] = {} # user_id -> name
self._shared_links: list[SharedLinkInfo] = []
@property
def immich_url(self) -> str:
"""Return the Immich URL."""
return self._url
@property
def api_key(self) -> str:
"""Return the API key."""
return self._api_key
@property
def album_id(self) -> str:
"""Return the album ID."""
return self._album_id
@property
def album_name(self) -> str:
"""Return the album name."""
return self._album_name
def update_scan_interval(self, scan_interval: int) -> None:
"""Update the scan interval."""
self.update_interval = timedelta(seconds=scan_interval)
async def async_refresh_now(self) -> None:
"""Force an immediate refresh."""
await self.async_request_refresh()
async def async_get_recent_assets(self, count: int = 10) -> list[dict[str, Any]]:
"""Get recent assets from the album."""
if self.data is None:
return []
# Sort assets by created_at descending
sorted_assets = sorted(
self.data.assets.values(),
key=lambda a: a.created_at,
reverse=True,
)[:count]
result = []
for asset in sorted_assets:
asset_data = {
"id": asset.id,
"type": asset.type,
"filename": asset.filename,
"created_at": asset.created_at,
"description": asset.description,
"people": asset.people,
"thumbnail_url": f"{self._url}/api/assets/{asset.id}/thumbnail",
}
if asset.type == ASSET_TYPE_VIDEO:
video_url = self._get_asset_video_url(asset.id)
if video_url:
asset_data["video_url"] = video_url
result.append(asset_data)
return result
async def async_fetch_people(self) -> dict[str, str]:
"""Fetch all people from Immich."""
if self._session is None:
self._session = async_get_clientsession(self.hass)
headers = {"x-api-key": self._api_key}
try:
async with self._session.get(
f"{self._url}/api/people",
headers=headers,
) as response:
if response.status == 200:
data = await response.json()
people_list = data.get("people", data) if isinstance(data, dict) else data
self._people_cache = {
p["id"]: p.get("name", "")
for p in people_list
if p.get("name")
}
except aiohttp.ClientError as err:
_LOGGER.warning("Failed to fetch people: %s", err)
return self._people_cache
async def _async_fetch_users(self) -> dict[str, str]:
"""Fetch all users from Immich and cache them."""
if self._session is None:
self._session = async_get_clientsession(self.hass)
headers = {"x-api-key": self._api_key}
try:
async with self._session.get(
f"{self._url}/api/users",
headers=headers,
) as response:
if response.status == 200:
data = await response.json()
self._users_cache = {
u["id"]: u.get("name", u.get("email", "Unknown"))
for u in data
if u.get("id")
}
except aiohttp.ClientError as err:
_LOGGER.warning("Failed to fetch users: %s", err)
return self._users_cache
async def _async_fetch_shared_links(self) -> list[SharedLinkInfo]:
"""Fetch shared links for this album from Immich."""
if self._session is None:
self._session = async_get_clientsession(self.hass)
headers = {"x-api-key": self._api_key}
self._shared_links = []
try:
async with self._session.get(
f"{self._url}/api/shared-links",
headers=headers,
) as response:
if response.status == 200:
data = await response.json()
for link in data:
album = link.get("album")
key = link.get("key")
if album and key and album.get("id") == self._album_id:
link_info = SharedLinkInfo.from_api_response(link)
self._shared_links.append(link_info)
_LOGGER.debug(
"Found shared link for album: key=%s, has_password=%s",
key[:8],
link_info.has_password,
)
except aiohttp.ClientError as err:
_LOGGER.warning("Failed to fetch shared links: %s", err)
return self._shared_links
def _get_accessible_links(self) -> list[SharedLinkInfo]:
"""Get all accessible (no password, not expired) shared links."""
return [link for link in self._shared_links if link.is_accessible]
def _get_protected_links(self) -> list[SharedLinkInfo]:
"""Get password-protected but not expired shared links."""
return [link for link in self._shared_links if link.has_password and not link.is_expired]
def get_public_url(self) -> str | None:
"""Get the public URL if album has an accessible shared link."""
accessible_links = self._get_accessible_links()
if accessible_links:
return f"{self._url}/share/{accessible_links[0].key}"
return None
def get_any_url(self) -> str | None:
"""Get any non-expired URL (prefers accessible, falls back to protected)."""
accessible_links = self._get_accessible_links()
if accessible_links:
return f"{self._url}/share/{accessible_links[0].key}"
non_expired = [link for link in self._shared_links if not link.is_expired]
if non_expired:
return f"{self._url}/share/{non_expired[0].key}"
return None
def get_protected_url(self) -> str | None:
"""Get a protected URL if any password-protected link exists."""
protected_links = self._get_protected_links()
if protected_links:
return f"{self._url}/share/{protected_links[0].key}"
return None
def get_protected_urls(self) -> list[str]:
"""Get all password-protected URLs."""
return [f"{self._url}/share/{link.key}" for link in self._get_protected_links()]
def get_protected_password(self) -> str | None:
"""Get the password for the first protected link."""
protected_links = self._get_protected_links()
if protected_links and protected_links[0].password:
return protected_links[0].password
return None
def get_public_urls(self) -> list[str]:
"""Get all accessible public URLs."""
return [f"{self._url}/share/{link.key}" for link in self._get_accessible_links()]
def get_shared_links_info(self) -> list[dict[str, Any]]:
"""Get detailed info about all shared links."""
return [
{
"url": f"{self._url}/share/{link.key}",
"has_password": link.has_password,
"is_expired": link.is_expired,
"expires_at": link.expires_at.isoformat() if link.expires_at else None,
"is_accessible": link.is_accessible,
}
for link in self._shared_links
]
def _get_asset_public_url(self, asset_id: str) -> str | None:
"""Get the public viewer URL for an asset (web page)."""
accessible_links = self._get_accessible_links()
if accessible_links:
return f"{self._url}/share/{accessible_links[0].key}/photos/{asset_id}"
non_expired = [link for link in self._shared_links if not link.is_expired]
if non_expired:
return f"{self._url}/share/{non_expired[0].key}/photos/{asset_id}"
return None
def _get_asset_download_url(self, asset_id: str) -> str | None:
"""Get the direct download URL for an asset (media file)."""
accessible_links = self._get_accessible_links()
if accessible_links:
return f"{self._url}/api/assets/{asset_id}/original?key={accessible_links[0].key}"
non_expired = [link for link in self._shared_links if not link.is_expired]
if non_expired:
return f"{self._url}/api/assets/{asset_id}/original?key={non_expired[0].key}"
return None
def _get_asset_video_url(self, asset_id: str) -> str | None:
"""Get the transcoded video playback URL for a video asset."""
accessible_links = self._get_accessible_links()
if accessible_links:
return f"{self._url}/api/assets/{asset_id}/video/playback?key={accessible_links[0].key}"
non_expired = [link for link in self._shared_links if not link.is_expired]
if non_expired:
return f"{self._url}/api/assets/{asset_id}/video/playback?key={non_expired[0].key}"
return None
async def _async_update_data(self) -> AlbumData | None:
"""Fetch data from Immich API."""
if self._session is None:
self._session = async_get_clientsession(self.hass)
# Fetch users to resolve owner names
if not self._users_cache:
await self._async_fetch_users()
# Fetch shared links (refresh each time as links can change)
await self._async_fetch_shared_links()
headers = {"x-api-key": self._api_key}
try:
async with self._session.get(
f"{self._url}/api/albums/{self._album_id}",
headers=headers,
) as response:
if response.status == 404:
_LOGGER.warning("Album %s not found", self._album_id)
return None
if response.status != 200:
raise UpdateFailed(
f"Error fetching album {self._album_id}: HTTP {response.status}"
)
data = await response.json()
album = AlbumData.from_api_response(data, self._users_cache)
# Detect changes
if self._previous_state:
change = self._detect_change(self._previous_state, album)
if change:
album.has_new_assets = change.added_count > 0
album.last_change_time = datetime.now()
self._fire_events(change, album)
else:
album.has_new_assets = False
# Preserve has_new_assets from previous state if still within window
if self._previous_state:
prev = self._previous_state
if prev.has_new_assets and prev.last_change_time:
album.last_change_time = prev.last_change_time
if not album.has_new_assets:
album.has_new_assets = prev.has_new_assets
# Update previous state
self._previous_state = album
return album
except aiohttp.ClientError as err:
raise UpdateFailed(f"Error communicating with Immich: {err}") from err
def _detect_change(
self, old_state: AlbumData, new_state: AlbumData
) -> AlbumChange | None:
"""Detect changes between two album states."""
added_ids = new_state.asset_ids - old_state.asset_ids
removed_ids = old_state.asset_ids - new_state.asset_ids
if not added_ids and not removed_ids:
return None
change_type = "changed"
if added_ids and not removed_ids:
change_type = "assets_added"
elif removed_ids and not added_ids:
change_type = "assets_removed"
added_assets = [
new_state.assets[aid] for aid in added_ids if aid in new_state.assets
]
return AlbumChange(
album_id=new_state.id,
album_name=new_state.name,
change_type=change_type,
added_count=len(added_ids),
removed_count=len(removed_ids),
added_assets=added_assets,
removed_asset_ids=list(removed_ids),
)
def _fire_events(self, change: AlbumChange, album: AlbumData) -> None:
"""Fire Home Assistant events for album changes."""
added_assets_detail = []
for asset in change.added_assets:
asset_detail = {
"id": asset.id,
ATTR_ASSET_TYPE: asset.type,
ATTR_ASSET_FILENAME: asset.filename,
ATTR_ASSET_CREATED: asset.created_at,
ATTR_ASSET_OWNER: asset.owner_name,
ATTR_ASSET_OWNER_ID: asset.owner_id,
ATTR_ASSET_DESCRIPTION: asset.description,
ATTR_PEOPLE: asset.people,
}
asset_url = self._get_asset_public_url(asset.id)
if asset_url:
asset_detail[ATTR_ASSET_URL] = asset_url
asset_download_url = self._get_asset_download_url(asset.id)
if asset_download_url:
asset_detail[ATTR_ASSET_DOWNLOAD_URL] = asset_download_url
if asset.type == ASSET_TYPE_VIDEO:
asset_video_url = self._get_asset_video_url(asset.id)
if asset_video_url:
asset_detail[ATTR_ASSET_PLAYBACK_URL] = asset_video_url
added_assets_detail.append(asset_detail)
event_data = {
ATTR_HUB_NAME: self._hub_name,
ATTR_ALBUM_ID: change.album_id,
ATTR_ALBUM_NAME: change.album_name,
ATTR_CHANGE_TYPE: change.change_type,
ATTR_ADDED_COUNT: change.added_count,
ATTR_REMOVED_COUNT: change.removed_count,
ATTR_ADDED_ASSETS: added_assets_detail,
ATTR_REMOVED_ASSETS: change.removed_asset_ids,
ATTR_PEOPLE: list(album.people),
}
album_url = self.get_any_url()
if album_url:
event_data[ATTR_ALBUM_URL] = album_url
self.hass.bus.async_fire(EVENT_ALBUM_CHANGED, event_data)
_LOGGER.info(
"Album '%s' changed: +%d -%d assets",
change.album_name,
change.added_count,
change.removed_count,
)
if change.added_count > 0:
self.hass.bus.async_fire(EVENT_ASSETS_ADDED, event_data)
if change.removed_count > 0:
self.hass.bus.async_fire(EVENT_ASSETS_REMOVED, event_data)
def get_protected_link_id(self) -> str | None:
"""Get the ID of the first protected link."""
protected_links = self._get_protected_links()
if protected_links:
return protected_links[0].id
return None
async def async_set_shared_link_password(
self, link_id: str, password: str | None
) -> bool:
"""Update the password for a shared link via Immich API."""
if self._session is None:
self._session = async_get_clientsession(self.hass)
headers = {
"x-api-key": self._api_key,
"Content-Type": "application/json",
}
payload = {"password": password if password else None}
try:
async with self._session.patch(
f"{self._url}/api/shared-links/{link_id}",
headers=headers,
json=payload,
) as response:
if response.status == 200:
_LOGGER.info("Successfully updated shared link password")
await self._async_fetch_shared_links()
return True
else:
_LOGGER.error(
"Failed to update shared link password: HTTP %s",
response.status,
)
return False
except aiohttp.ClientError as err:
_LOGGER.error("Error updating shared link password: %s", err)
return False
def clear_new_assets_flag(self) -> None:
"""Clear the new assets flag."""
if self.data:
self.data.has_new_assets = False
self.data.last_change_time = None
def has_unprotected_link(self) -> bool:
"""Check if album has an unprotected (accessible) shared link."""
return len(self._get_accessible_links()) > 0
def has_protected_link(self) -> bool:
"""Check if album has a protected (password) shared link."""
return len(self._get_protected_links()) > 0
def get_unprotected_link_id(self) -> str | None:
"""Get the ID of the first unprotected link."""
accessible_links = self._get_accessible_links()
if accessible_links:
return accessible_links[0].id
return None
async def async_create_shared_link(self, password: str | None = None) -> bool:
"""Create a new shared link for the album via Immich API."""
if self._session is None:
self._session = async_get_clientsession(self.hass)
headers = {
"x-api-key": self._api_key,
"Content-Type": "application/json",
}
payload: dict[str, Any] = {
"albumId": self._album_id,
"type": "ALBUM",
"allowDownload": True,
"allowUpload": False,
"showMetadata": True,
}
if password:
payload["password"] = password
try:
async with self._session.post(
f"{self._url}/api/shared-links",
headers=headers,
json=payload,
) as response:
if response.status == 201:
_LOGGER.info(
"Successfully created shared link for album %s",
self._album_name,
)
await self._async_fetch_shared_links()
return True
else:
error_text = await response.text()
_LOGGER.error(
"Failed to create shared link: HTTP %s - %s",
response.status,
error_text,
)
return False
except aiohttp.ClientError as err:
_LOGGER.error("Error creating shared link: %s", err)
return False
async def async_delete_shared_link(self, link_id: str) -> bool:
"""Delete a shared link via Immich API."""
if self._session is None:
self._session = async_get_clientsession(self.hass)
headers = {"x-api-key": self._api_key}
try:
async with self._session.delete(
f"{self._url}/api/shared-links/{link_id}",
headers=headers,
) as response:
if response.status == 200:
_LOGGER.info("Successfully deleted shared link")
await self._async_fetch_shared_links()
return True
else:
error_text = await response.text()
_LOGGER.error(
"Failed to delete shared link: HTTP %s - %s",
response.status,
error_text,
)
return False
except aiohttp.ClientError as err:
_LOGGER.error("Error deleting shared link: %s", err)
return False

View File

Before

Width:  |  Height:  |  Size: 6.5 KiB

After

Width:  |  Height:  |  Size: 6.5 KiB

View File

@@ -0,0 +1,12 @@
{
"domain": "immich_album_watcher",
"name": "Immich Album Watcher",
"codeowners": ["@alexei.dolgolyov"],
"config_flow": true,
"dependencies": [],
"documentation": "https://git.dolgolyov-family.by/alexei.dolgolyov/haos-integrations",
"iot_class": "cloud_polling",
"issue_tracker": "https://git.dolgolyov-family.by/alexei.dolgolyov/haos-integrations/issues",
"requirements": [],
"version": "1.2.0"
}

View File

@@ -6,20 +6,26 @@ import logging
from datetime import datetime
from typing import Any
import voluptuous as vol
from homeassistant.components.sensor import (
SensorDeviceClass,
SensorEntity,
SensorStateClass,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.config_entries import ConfigEntry, ConfigSubentry
from homeassistant.core import HomeAssistant, ServiceResponse, SupportsResponse, callback
from homeassistant.helpers import entity_platform
from homeassistant.helpers.device_registry import DeviceEntryType
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.util import slugify
from .const import (
ATTR_ALBUM_ID,
ATTR_ALBUM_URL,
ATTR_ALBUM_PROTECTED_URL,
ATTR_ALBUM_URLS,
ATTR_ASSET_COUNT,
ATTR_CREATED_AT,
ATTR_LAST_UPDATED,
@@ -29,8 +35,12 @@ from .const import (
ATTR_SHARED,
ATTR_THUMBNAIL_URL,
ATTR_VIDEO_COUNT,
CONF_ALBUMS,
CONF_ALBUM_ID,
CONF_ALBUM_NAME,
CONF_HUB_NAME,
DOMAIN,
SERVICE_GET_RECENT_ASSETS,
SERVICE_REFRESH,
)
from .coordinator import AlbumData, ImmichAlbumWatcherCoordinator
@@ -43,20 +53,47 @@ async def async_setup_entry(
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up Immich Album Watcher sensors from a config entry."""
coordinator: ImmichAlbumWatcherCoordinator = hass.data[DOMAIN][entry.entry_id]
album_ids = entry.options.get(CONF_ALBUMS, [])
# Iterate through all album subentries
for subentry_id, subentry in entry.subentries.items():
subentry_data = hass.data[DOMAIN][entry.entry_id]["subentries"].get(subentry_id)
if not subentry_data:
_LOGGER.error("Subentry data not found for %s", subentry_id)
continue
entities: list[SensorEntity] = []
for album_id in album_ids:
entities.append(ImmichAlbumAssetCountSensor(coordinator, entry, album_id))
entities.append(ImmichAlbumPhotoCountSensor(coordinator, entry, album_id))
entities.append(ImmichAlbumVideoCountSensor(coordinator, entry, album_id))
entities.append(ImmichAlbumLastUpdatedSensor(coordinator, entry, album_id))
entities.append(ImmichAlbumCreatedSensor(coordinator, entry, album_id))
entities.append(ImmichAlbumPeopleSensor(coordinator, entry, album_id))
entities.append(ImmichAlbumPublicUrlSensor(coordinator, entry, album_id))
coordinator = subentry_data.coordinator
async_add_entities(entities)
entities: list[SensorEntity] = [
ImmichAlbumAssetCountSensor(coordinator, entry, subentry),
ImmichAlbumPhotoCountSensor(coordinator, entry, subentry),
ImmichAlbumVideoCountSensor(coordinator, entry, subentry),
ImmichAlbumLastUpdatedSensor(coordinator, entry, subentry),
ImmichAlbumCreatedSensor(coordinator, entry, subentry),
ImmichAlbumPublicUrlSensor(coordinator, entry, subentry),
ImmichAlbumProtectedUrlSensor(coordinator, entry, subentry),
ImmichAlbumProtectedPasswordSensor(coordinator, entry, subentry),
]
async_add_entities(entities, config_subentry_id=subentry_id)
# Register entity services
platform = entity_platform.async_get_current_platform()
platform.async_register_entity_service(
SERVICE_REFRESH,
{},
"async_refresh_album",
)
platform.async_register_entity_service(
SERVICE_GET_RECENT_ASSETS,
{
vol.Optional("count", default=10): vol.All(
vol.Coerce(int), vol.Range(min=1, max=100)
),
},
"async_get_recent_assets",
supports_response=SupportsResponse.ONLY,
)
class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], SensorEntity):
@@ -68,26 +105,29 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
album_id: str,
subentry: ConfigSubentry,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator)
self._album_id = album_id
self._entry = entry
self._subentry = subentry
self._album_id = subentry.data[CONF_ALBUM_ID]
self._album_name = subentry.data.get(CONF_ALBUM_NAME, "Unknown Album")
self._hub_name = entry.data.get(CONF_HUB_NAME, "Immich")
# Generate unique_id prefix: {hub_name}_album_{album_name}
self._unique_id_prefix = slugify(f"{self._hub_name}_album_{self._album_name}")
@property
def _album_data(self) -> AlbumData | None:
"""Get the album data from coordinator."""
if self.coordinator.data is None:
return None
return self.coordinator.data.get(self._album_id)
return self.coordinator.data
@property
def translation_placeholders(self) -> dict[str, str]:
"""Return translation placeholders."""
if self._album_data:
return {"album_name": self._album_data.name}
return {"album_name": f"Album {self._album_id[:8]}"}
return {"album_name": self._album_name}
@property
def available(self) -> bool:
@@ -96,12 +136,13 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se
@property
def device_info(self) -> DeviceInfo:
"""Return device info."""
"""Return device info - one device per album."""
return DeviceInfo(
identifiers={(DOMAIN, self._entry.entry_id)},
name="Immich Album Watcher",
identifiers={(DOMAIN, self._subentry.subentry_id)},
name=self._album_name,
manufacturer="Immich",
entry_type="service",
entry_type=DeviceEntryType.SERVICE,
via_device=(DOMAIN, self._entry.entry_id),
)
@callback
@@ -109,6 +150,15 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se
"""Handle updated data from the coordinator."""
self.async_write_ha_state()
async def async_refresh_album(self) -> None:
"""Refresh data for this album."""
await self.coordinator.async_refresh_now()
async def async_get_recent_assets(self, count: int = 10) -> ServiceResponse:
"""Get recent assets for this album."""
assets = await self.coordinator.async_get_recent_assets(count)
return {"assets": assets}
class ImmichAlbumAssetCountSensor(ImmichAlbumBaseSensor):
"""Sensor representing an Immich album asset count."""
@@ -121,11 +171,11 @@ class ImmichAlbumAssetCountSensor(ImmichAlbumBaseSensor):
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
album_id: str,
subentry: ConfigSubentry,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, entry, album_id)
self._attr_unique_id = f"{entry.entry_id}_{album_id}_asset_count"
super().__init__(coordinator, entry, subentry)
self._attr_unique_id = f"{self._unique_id_prefix}_asset_count"
@property
def native_value(self) -> int | None:
@@ -152,7 +202,6 @@ class ImmichAlbumAssetCountSensor(ImmichAlbumBaseSensor):
ATTR_PEOPLE: list(self._album_data.people),
}
# Add thumbnail URL if available
if self._album_data.thumbnail_asset_id:
attrs[ATTR_THUMBNAIL_URL] = (
f"{self.coordinator.immich_url}/api/assets/"
@@ -173,11 +222,11 @@ class ImmichAlbumPhotoCountSensor(ImmichAlbumBaseSensor):
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
album_id: str,
subentry: ConfigSubentry,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, entry, album_id)
self._attr_unique_id = f"{entry.entry_id}_{album_id}_photo_count"
super().__init__(coordinator, entry, subentry)
self._attr_unique_id = f"{self._unique_id_prefix}_photo_count"
@property
def native_value(self) -> int | None:
@@ -198,11 +247,11 @@ class ImmichAlbumVideoCountSensor(ImmichAlbumBaseSensor):
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
album_id: str,
subentry: ConfigSubentry,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, entry, album_id)
self._attr_unique_id = f"{entry.entry_id}_{album_id}_video_count"
super().__init__(coordinator, entry, subentry)
self._attr_unique_id = f"{self._unique_id_prefix}_video_count"
@property
def native_value(self) -> int | None:
@@ -223,11 +272,11 @@ class ImmichAlbumLastUpdatedSensor(ImmichAlbumBaseSensor):
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
album_id: str,
subentry: ConfigSubentry,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, entry, album_id)
self._attr_unique_id = f"{entry.entry_id}_{album_id}_last_updated"
super().__init__(coordinator, entry, subentry)
self._attr_unique_id = f"{self._unique_id_prefix}_last_updated"
@property
def native_value(self) -> datetime | None:
@@ -253,11 +302,11 @@ class ImmichAlbumCreatedSensor(ImmichAlbumBaseSensor):
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
album_id: str,
subentry: ConfigSubentry,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, entry, album_id)
self._attr_unique_id = f"{entry.entry_id}_{album_id}_created"
super().__init__(coordinator, entry, subentry)
self._attr_unique_id = f"{self._unique_id_prefix}_created"
@property
def native_value(self) -> datetime | None:
@@ -272,41 +321,6 @@ class ImmichAlbumCreatedSensor(ImmichAlbumBaseSensor):
return None
class ImmichAlbumPeopleSensor(ImmichAlbumBaseSensor):
"""Sensor representing people detected in an Immich album."""
_attr_state_class = SensorStateClass.MEASUREMENT
_attr_icon = "mdi:account-group"
_attr_translation_key = "album_people_count"
def __init__(
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
album_id: str,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, entry, album_id)
self._attr_unique_id = f"{entry.entry_id}_{album_id}_people_count"
@property
def native_value(self) -> int | None:
"""Return the state of the sensor (number of unique people)."""
if self._album_data:
return len(self._album_data.people)
return None
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return extra state attributes."""
if not self._album_data:
return {}
return {
ATTR_PEOPLE: list(self._album_data.people),
}
class ImmichAlbumPublicUrlSensor(ImmichAlbumBaseSensor):
"""Sensor representing an Immich album public URL."""
@@ -317,17 +331,102 @@ class ImmichAlbumPublicUrlSensor(ImmichAlbumBaseSensor):
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
album_id: str,
subentry: ConfigSubentry,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, entry, album_id)
self._attr_unique_id = f"{entry.entry_id}_{album_id}_public_url"
super().__init__(coordinator, entry, subentry)
self._attr_unique_id = f"{self._unique_id_prefix}_public_url"
@property
def native_value(self) -> str | None:
"""Return the state of the sensor (public URL)."""
if self._album_data:
return self.coordinator.get_album_public_url(self._album_id)
return self.coordinator.get_public_url()
return None
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return extra state attributes."""
if not self._album_data:
return {}
attrs = {
ATTR_ALBUM_ID: self._album_data.id,
ATTR_SHARED: self._album_data.shared,
}
all_urls = self.coordinator.get_public_urls()
if len(all_urls) > 1:
attrs[ATTR_ALBUM_URLS] = all_urls
links_info = self.coordinator.get_shared_links_info()
if links_info:
attrs["shared_links"] = links_info
return attrs
class ImmichAlbumProtectedUrlSensor(ImmichAlbumBaseSensor):
"""Sensor representing an Immich album password-protected URL."""
_attr_icon = "mdi:link-lock"
_attr_translation_key = "album_protected_url"
def __init__(
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
subentry: ConfigSubentry,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, entry, subentry)
self._attr_unique_id = f"{self._unique_id_prefix}_protected_url"
@property
def native_value(self) -> str | None:
"""Return the state of the sensor (protected URL)."""
if self._album_data:
return self.coordinator.get_protected_url()
return None
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return extra state attributes."""
if not self._album_data:
return {}
attrs = {
ATTR_ALBUM_ID: self._album_data.id,
}
all_urls = self.coordinator.get_protected_urls()
if len(all_urls) > 1:
attrs["protected_urls"] = all_urls
return attrs
class ImmichAlbumProtectedPasswordSensor(ImmichAlbumBaseSensor):
"""Sensor representing an Immich album protected link password."""
_attr_icon = "mdi:key"
_attr_translation_key = "album_protected_password"
def __init__(
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
subentry: ConfigSubentry,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, entry, subentry)
self._attr_unique_id = f"{self._unique_id_prefix}_protected_password"
@property
def native_value(self) -> str | None:
"""Return the state of the sensor (protected link password)."""
if self._album_data:
return self.coordinator.get_protected_password()
return None
@property
@@ -338,5 +437,5 @@ class ImmichAlbumPublicUrlSensor(ImmichAlbumBaseSensor):
return {
ATTR_ALBUM_ID: self._album_data.id,
ATTR_SHARED: self._album_data.shared,
ATTR_ALBUM_PROTECTED_URL: self.coordinator.get_protected_url(),
}

View File

@@ -1,17 +1,19 @@
refresh:
name: Refresh
description: Force an immediate refresh of all album data from Immich.
description: Force an immediate refresh of album data from Immich.
target:
entity:
integration: immich_album_watcher
domain: sensor
get_recent_assets:
name: Get Recent Assets
description: Get the most recent assets from a specific album.
description: Get the most recent assets from the targeted album.
target:
entity:
integration: immich_album_watcher
domain: sensor
fields:
album_id:
name: Album ID
description: The ID of the album to get recent assets from.
required: true
selector:
text:
count:
name: Count
description: Number of recent assets to return (1-100).

View File

@@ -0,0 +1,160 @@
"""Text platform for Immich Album Watcher."""
from __future__ import annotations
import logging
from homeassistant.components.text import TextEntity, TextMode
from homeassistant.config_entries import ConfigEntry, ConfigSubentry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.device_registry import DeviceEntryType
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.util import slugify
from .const import (
ATTR_ALBUM_ID,
ATTR_ALBUM_PROTECTED_URL,
CONF_ALBUM_ID,
CONF_ALBUM_NAME,
CONF_HUB_NAME,
DOMAIN,
)
from .coordinator import AlbumData, ImmichAlbumWatcherCoordinator
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up Immich Album Watcher text entities from a config entry."""
# Iterate through all album subentries
for subentry_id, subentry in entry.subentries.items():
subentry_data = hass.data[DOMAIN][entry.entry_id]["subentries"].get(subentry_id)
if not subentry_data:
_LOGGER.error("Subentry data not found for %s", subentry_id)
continue
coordinator = subentry_data.coordinator
async_add_entities(
[ImmichAlbumProtectedPasswordText(coordinator, entry, subentry)],
config_subentry_id=subentry_id,
)
class ImmichAlbumProtectedPasswordText(
CoordinatorEntity[ImmichAlbumWatcherCoordinator], TextEntity
):
"""Text entity for editing an Immich album's protected link password."""
_attr_has_entity_name = True
_attr_icon = "mdi:key-variant"
_attr_translation_key = "album_protected_password_edit"
_attr_mode = TextMode.PASSWORD
_attr_native_max = 100
def __init__(
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
subentry: ConfigSubentry,
) -> None:
"""Initialize the text entity."""
super().__init__(coordinator)
self._entry = entry
self._subentry = subentry
self._album_id = subentry.data[CONF_ALBUM_ID]
self._album_name = subentry.data.get(CONF_ALBUM_NAME, "Unknown Album")
self._hub_name = entry.data.get(CONF_HUB_NAME, "Immich")
unique_id_prefix = slugify(f"{self._hub_name}_album_{self._album_name}")
self._attr_unique_id = f"{unique_id_prefix}_protected_password_edit"
@property
def _album_data(self) -> AlbumData | None:
"""Get the album data from coordinator."""
return self.coordinator.data
@property
def translation_placeholders(self) -> dict[str, str]:
"""Return translation placeholders."""
if self._album_data:
return {"album_name": self._album_data.name}
return {"album_name": self._album_name}
@property
def available(self) -> bool:
"""Return if entity is available.
Only available when the album has a protected shared link.
"""
if not self.coordinator.last_update_success or self._album_data is None:
return False
# Only available if there's a protected link to edit
return self.coordinator.get_protected_link_id() is not None
@property
def device_info(self) -> DeviceInfo:
"""Return device info - one device per album."""
return DeviceInfo(
identifiers={(DOMAIN, self._subentry.subentry_id)},
name=self._album_name,
manufacturer="Immich",
entry_type=DeviceEntryType.SERVICE,
via_device=(DOMAIN, self._entry.entry_id),
)
@property
def native_value(self) -> str | None:
"""Return the current password value."""
if self._album_data:
return self.coordinator.get_protected_password()
return None
@property
def extra_state_attributes(self) -> dict[str, str]:
"""Return extra state attributes."""
if not self._album_data:
return {}
attrs = {
ATTR_ALBUM_ID: self._album_data.id,
}
protected_url = self.coordinator.get_protected_url()
if protected_url:
attrs[ATTR_ALBUM_PROTECTED_URL] = protected_url
return attrs
async def async_set_value(self, value: str) -> None:
"""Set the password for the protected shared link."""
link_id = self.coordinator.get_protected_link_id()
if not link_id:
_LOGGER.error(
"Cannot set password: no protected link found for album %s",
self._album_id,
)
return
# Empty string means remove password
password = value if value else None
success = await self.coordinator.async_set_shared_link_password(
link_id, password
)
if success:
# Trigger a coordinator update to refresh the state
await self.coordinator.async_request_refresh()
else:
_LOGGER.error("Failed to update password for album %s", self._album_id)
@callback
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
self.async_write_ha_state()

View File

@@ -16,11 +16,14 @@
"album_created": {
"name": "{album_name}: Created"
},
"album_people_count": {
"name": "{album_name}: People Count"
},
"album_public_url": {
"name": "{album_name}: Public URL"
},
"album_protected_url": {
"name": "{album_name}: Protected URL"
},
"album_protected_password": {
"name": "{album_name}: Protected Password"
}
},
"binary_sensor": {
@@ -32,6 +35,25 @@
"album_thumbnail": {
"name": "{album_name}: Thumbnail"
}
},
"text": {
"album_protected_password_edit": {
"name": "{album_name}: Share Password"
}
},
"button": {
"create_share_link": {
"name": "{album_name}: Create Share Link"
},
"delete_share_link": {
"name": "{album_name}: Delete Share Link"
},
"create_protected_link": {
"name": "{album_name}: Create Protected Link"
},
"delete_protected_link": {
"name": "{album_name}: Delete Protected Link"
}
}
},
"config": {
@@ -40,64 +62,76 @@
"title": "Connect to Immich",
"description": "Enter your Immich server details. You can get an API key from Immich → User Settings → API Keys.",
"data": {
"hub_name": "Hub Name",
"immich_url": "Immich URL",
"api_key": "API Key"
},
"data_description": {
"hub_name": "A name for this Immich server (used in entity IDs)",
"immich_url": "The URL of your Immich server (e.g., http://192.168.1.100:2283)",
"api_key": "Your Immich API key"
}
},
"albums": {
"title": "Select Albums",
"description": "Choose which albums to monitor for changes.",
"data": {
"albums": "Albums to watch"
}
}
},
"error": {
"cannot_connect": "Failed to connect to Immich server",
"invalid_auth": "Invalid API key",
"no_albums": "No albums found on the server",
"no_albums_selected": "Please select at least one album",
"unknown": "Unexpected error occurred"
},
"abort": {
"already_configured": "This Immich server is already configured"
}
},
"config_subentries": {
"album": {
"initiate_flow": {
"user": "Add Album"
},
"entry_type": "Album",
"step": {
"user": {
"title": "Add Album to Watch",
"description": "Select an album from your Immich server to monitor for changes.",
"data": {
"album_id": "Album"
}
}
},
"error": {
"cannot_connect": "Failed to connect to Immich server"
},
"abort": {
"parent_not_found": "Hub configuration not found",
"no_albums": "No albums found on the server",
"all_albums_configured": "All albums are already configured",
"album_already_configured": "This album is already being watched"
}
}
},
"options": {
"step": {
"init": {
"title": "Immich Album Watcher Options",
"description": "Configure which albums to monitor and how often to check for changes.",
"description": "Configure the polling interval for all albums.",
"data": {
"albums": "Albums to watch",
"scan_interval": "Scan interval (seconds)"
},
"data_description": {
"scan_interval": "How often to check for album changes (10-3600 seconds)"
}
}
},
"error": {
"cannot_connect": "Failed to connect to Immich server"
}
},
"services": {
"refresh": {
"name": "Refresh",
"description": "Force an immediate refresh of all album data from Immich."
"description": "Force an immediate refresh of album data from Immich."
},
"get_recent_assets": {
"name": "Get Recent Assets",
"description": "Get the most recent assets from a specific album.",
"description": "Get the most recent assets from the targeted album.",
"fields": {
"album_id": {
"name": "Album ID",
"description": "The ID of the album to get recent assets from."
},
"count": {
"name": "Count",
"description": "Number of recent assets to return (1-100)."

View File

@@ -16,11 +16,14 @@
"album_created": {
"name": "{album_name}: Дата создания"
},
"album_people_count": {
"name": "{album_name}: Число людей"
},
"album_public_url": {
"name": "{album_name}: Публичная ссылка"
},
"album_protected_url": {
"name": "{album_name}: Защищённая ссылка"
},
"album_protected_password": {
"name": "{album_name}: Пароль ссылки"
}
},
"binary_sensor": {
@@ -32,6 +35,25 @@
"album_thumbnail": {
"name": "{album_name}: Превью"
}
},
"text": {
"album_protected_password_edit": {
"name": "{album_name}: Пароль ссылки"
}
},
"button": {
"create_share_link": {
"name": "{album_name}: Создать ссылку"
},
"delete_share_link": {
"name": "{album_name}: Удалить ссылку"
},
"create_protected_link": {
"name": "{album_name}: Создать защищённую ссылку"
},
"delete_protected_link": {
"name": "{album_name}: Удалить защищённую ссылку"
}
}
},
"config": {
@@ -40,64 +62,76 @@
"title": "Подключение к Immich",
"description": "Введите данные вашего сервера Immich. API-ключ можно получить в Immich → Настройки пользователя → API-ключи.",
"data": {
"hub_name": "Название хаба",
"immich_url": "URL Immich",
"api_key": "API-ключ"
},
"data_description": {
"hub_name": "Название для этого сервера Immich (используется в ID сущностей)",
"immich_url": "URL вашего сервера Immich (например, http://192.168.1.100:2283)",
"api_key": "Ваш API-ключ Immich"
}
},
"albums": {
"title": "Выбор альбомов",
"description": "Выберите альбомы для отслеживания изменений.",
"data": {
"albums": "Альбомы для отслеживания"
}
}
},
"error": {
"cannot_connect": "Не удалось подключиться к серверу Immich",
"invalid_auth": "Неверный API-ключ",
"no_albums": "На сервере не найдено альбомов",
"no_albums_selected": "Пожалуйста, выберите хотя бы один альбом",
"unknown": "Произошла непредвиденная ошибка"
},
"abort": {
"already_configured": "Этот сервер Immich уже настроен"
}
},
"config_subentries": {
"album": {
"initiate_flow": {
"user": "Добавить альбом"
},
"entry_type": "Альбом",
"step": {
"user": {
"title": "Добавить альбом для отслеживания",
"description": "Выберите альбом с вашего сервера Immich для отслеживания изменений.",
"data": {
"album_id": "Альбом"
}
}
},
"error": {
"cannot_connect": "Не удалось подключиться к серверу Immich"
},
"abort": {
"parent_not_found": "Конфигурация хаба не найдена",
"no_albums": "На сервере не найдено альбомов",
"all_albums_configured": "Все альбомы уже настроены",
"album_already_configured": "Этот альбом уже отслеживается"
}
}
},
"options": {
"step": {
"init": {
"title": "Настройки Immich Album Watcher",
"description": "Настройте отслеживаемые альбомы и частоту проверки изменений.",
"description": "Настройте интервал опроса для всех альбомов.",
"data": {
"albums": "Альбомы для отслеживания",
"scan_interval": "Интервал сканирования (секунды)"
},
"data_description": {
"scan_interval": "Как часто проверять изменения в альбомах (10-3600 секунд)"
}
}
},
"error": {
"cannot_connect": "Не удалось подключиться к серверу Immich"
}
},
"services": {
"refresh": {
"name": "Обновить",
"description": "Принудительно обновить данные всех альбомов из Immich."
"description": "Принудительно обновить данные альбома из Immich."
},
"get_recent_assets": {
"name": "Получить последние файлы",
"description": "Получить последние файлы из указанного альбома.",
"description": "Получить последние файлы из выбранного альбома.",
"fields": {
"album_id": {
"name": "ID альбома",
"description": "ID альбома для получения последних файлов."
},
"count": {
"name": "Количество",
"description": "Количество возвращаемых файлов (1-100)."

6
hacs.json Normal file
View File

@@ -0,0 +1,6 @@
{
"name": "Immich Album Watcher",
"homeassistant": "2024.1.0",
"render_readme": true,
"content_in_root": false
}

View File

@@ -1,145 +0,0 @@
"""Immich Album Watcher integration for Home Assistant."""
from __future__ import annotations
import logging
import voluptuous as vol
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, ServiceCall, ServiceResponse, SupportsResponse
from homeassistant.helpers import config_validation as cv
from .const import (
ATTR_ALBUM_ID,
CONF_ALBUMS,
CONF_API_KEY,
CONF_IMMICH_URL,
CONF_SCAN_INTERVAL,
DEFAULT_SCAN_INTERVAL,
DOMAIN,
PLATFORMS,
SERVICE_GET_RECENT_ASSETS,
SERVICE_REFRESH,
)
from .coordinator import ImmichAlbumWatcherCoordinator
_LOGGER = logging.getLogger(__name__)
# Service schemas
SERVICE_REFRESH_SCHEMA = vol.Schema({})
SERVICE_GET_RECENT_ASSETS_SCHEMA = vol.Schema(
{
vol.Required(ATTR_ALBUM_ID): cv.string,
vol.Optional("count", default=10): vol.All(
vol.Coerce(int), vol.Range(min=1, max=100)
),
}
)
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Immich Album Watcher from a config entry."""
hass.data.setdefault(DOMAIN, {})
url = entry.data[CONF_IMMICH_URL]
api_key = entry.data[CONF_API_KEY]
album_ids = entry.options.get(CONF_ALBUMS, [])
scan_interval = entry.options.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
coordinator = ImmichAlbumWatcherCoordinator(
hass,
url=url,
api_key=api_key,
album_ids=album_ids,
scan_interval=scan_interval,
)
# Fetch initial data
await coordinator.async_config_entry_first_refresh()
hass.data[DOMAIN][entry.entry_id] = coordinator
# Set up platforms
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
# Register update listener for options changes
entry.async_on_unload(entry.add_update_listener(async_update_options))
# Register services (only once)
await async_setup_services(hass)
_LOGGER.info(
"Immich Album Watcher set up successfully, watching %d albums",
len(album_ids),
)
return True
async def async_setup_services(hass: HomeAssistant) -> None:
"""Set up services for Immich Album Watcher."""
if hass.services.has_service(DOMAIN, SERVICE_REFRESH):
return # Services already registered
async def handle_refresh(call: ServiceCall) -> None:
"""Handle the refresh service call."""
for coordinator in hass.data[DOMAIN].values():
if isinstance(coordinator, ImmichAlbumWatcherCoordinator):
await coordinator.async_refresh_now()
async def handle_get_recent_assets(call: ServiceCall) -> ServiceResponse:
"""Handle the get_recent_assets service call."""
album_id = call.data[ATTR_ALBUM_ID]
count = call.data.get("count", 10)
for coordinator in hass.data[DOMAIN].values():
if isinstance(coordinator, ImmichAlbumWatcherCoordinator):
if coordinator.data and album_id in coordinator.data:
assets = await coordinator.async_get_recent_assets(album_id, count)
return {"assets": assets}
return {"assets": [], "error": f"Album {album_id} not found"}
hass.services.async_register(
DOMAIN,
SERVICE_REFRESH,
handle_refresh,
schema=SERVICE_REFRESH_SCHEMA,
)
hass.services.async_register(
DOMAIN,
SERVICE_GET_RECENT_ASSETS,
handle_get_recent_assets,
schema=SERVICE_GET_RECENT_ASSETS_SCHEMA,
supports_response=SupportsResponse.ONLY,
)
async def async_update_options(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Handle options update."""
coordinator: ImmichAlbumWatcherCoordinator = hass.data[DOMAIN][entry.entry_id]
album_ids = entry.options.get(CONF_ALBUMS, [])
scan_interval = entry.options.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
coordinator.update_config(album_ids, scan_interval)
# Reload the entry to update sensors
await hass.config_entries.async_reload(entry.entry_id)
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
hass.data[DOMAIN].pop(entry.entry_id)
# Unregister services if no more entries
if not hass.data[DOMAIN]:
hass.services.async_remove(DOMAIN, SERVICE_REFRESH)
hass.services.async_remove(DOMAIN, SERVICE_GET_RECENT_ASSETS)
return unload_ok

View File

@@ -1,480 +0,0 @@
"""Data coordinator for Immich Album Watcher."""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any
import aiohttp
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import (
ASSET_TYPE_IMAGE,
ASSET_TYPE_VIDEO,
ATTR_ADDED_ASSETS,
ATTR_ADDED_COUNT,
ATTR_ALBUM_ID,
ATTR_ALBUM_NAME,
ATTR_ALBUM_URL,
ATTR_ASSET_CREATED,
ATTR_ASSET_FILENAME,
ATTR_ASSET_OWNER,
ATTR_ASSET_OWNER_ID,
ATTR_ASSET_TYPE,
ATTR_ASSET_URL,
ATTR_CHANGE_TYPE,
ATTR_PEOPLE,
ATTR_REMOVED_ASSETS,
ATTR_REMOVED_COUNT,
DOMAIN,
EVENT_ALBUM_CHANGED,
EVENT_ASSETS_ADDED,
EVENT_ASSETS_REMOVED,
)
_LOGGER = logging.getLogger(__name__)
@dataclass
class AssetInfo:
"""Data class for asset information."""
id: str
type: str # IMAGE or VIDEO
filename: str
created_at: str
owner_id: str = ""
owner_name: str = ""
people: list[str] = field(default_factory=list)
@classmethod
def from_api_response(
cls, data: dict[str, Any], users_cache: dict[str, str] | None = None
) -> AssetInfo:
"""Create AssetInfo from API response."""
people = []
if "people" in data:
people = [p.get("name", "") for p in data["people"] if p.get("name")]
owner_id = data.get("ownerId", "")
owner_name = ""
if users_cache and owner_id:
owner_name = users_cache.get(owner_id, "")
return cls(
id=data["id"],
type=data.get("type", ASSET_TYPE_IMAGE),
filename=data.get("originalFileName", ""),
created_at=data.get("fileCreatedAt", ""),
owner_id=owner_id,
owner_name=owner_name,
people=people,
)
@dataclass
class AlbumData:
"""Data class for album information."""
id: str
name: str
asset_count: int
photo_count: int
video_count: int
created_at: str
updated_at: str
shared: bool
owner: str
thumbnail_asset_id: str | None
asset_ids: set[str] = field(default_factory=set)
assets: dict[str, AssetInfo] = field(default_factory=dict)
people: set[str] = field(default_factory=set)
has_new_assets: bool = False
last_change_time: datetime | None = None
@classmethod
def from_api_response(
cls, data: dict[str, Any], users_cache: dict[str, str] | None = None
) -> AlbumData:
"""Create AlbumData from API response."""
assets_data = data.get("assets", [])
asset_ids = set()
assets = {}
people = set()
photo_count = 0
video_count = 0
for asset_data in assets_data:
asset = AssetInfo.from_api_response(asset_data, users_cache)
asset_ids.add(asset.id)
assets[asset.id] = asset
people.update(asset.people)
if asset.type == ASSET_TYPE_IMAGE:
photo_count += 1
elif asset.type == ASSET_TYPE_VIDEO:
video_count += 1
return cls(
id=data["id"],
name=data.get("albumName", "Unnamed"),
asset_count=data.get("assetCount", len(asset_ids)),
photo_count=photo_count,
video_count=video_count,
created_at=data.get("createdAt", ""),
updated_at=data.get("updatedAt", ""),
shared=data.get("shared", False),
owner=data.get("owner", {}).get("name", "Unknown"),
thumbnail_asset_id=data.get("albumThumbnailAssetId"),
asset_ids=asset_ids,
assets=assets,
people=people,
)
@dataclass
class AlbumChange:
"""Data class for album changes."""
album_id: str
album_name: str
change_type: str
added_count: int = 0
removed_count: int = 0
added_assets: list[AssetInfo] = field(default_factory=list)
removed_asset_ids: list[str] = field(default_factory=list)
class ImmichAlbumWatcherCoordinator(DataUpdateCoordinator[dict[str, AlbumData]]):
"""Coordinator for fetching Immich album data."""
def __init__(
self,
hass: HomeAssistant,
url: str,
api_key: str,
album_ids: list[str],
scan_interval: int,
) -> None:
"""Initialize the coordinator."""
super().__init__(
hass,
_LOGGER,
name=DOMAIN,
update_interval=timedelta(seconds=scan_interval),
)
self._url = url.rstrip("/")
self._api_key = api_key
self._album_ids = album_ids
self._previous_states: dict[str, AlbumData] = {}
self._session: aiohttp.ClientSession | None = None
self._people_cache: dict[str, str] = {} # person_id -> name
self._users_cache: dict[str, str] = {} # user_id -> name
self._shared_links_cache: dict[str, str] = {} # album_id -> share_key
@property
def immich_url(self) -> str:
"""Return the Immich URL."""
return self._url
@property
def api_key(self) -> str:
"""Return the API key."""
return self._api_key
def update_config(self, album_ids: list[str], scan_interval: int) -> None:
"""Update configuration."""
self._album_ids = album_ids
self.update_interval = timedelta(seconds=scan_interval)
async def async_refresh_now(self) -> None:
"""Force an immediate refresh."""
await self.async_request_refresh()
async def async_get_recent_assets(
self, album_id: str, count: int = 10
) -> list[dict[str, Any]]:
"""Get recent assets from an album."""
if self.data is None or album_id not in self.data:
return []
album = self.data[album_id]
# Sort assets by created_at descending
sorted_assets = sorted(
album.assets.values(),
key=lambda a: a.created_at,
reverse=True,
)[:count]
return [
{
"id": asset.id,
"type": asset.type,
"filename": asset.filename,
"created_at": asset.created_at,
"people": asset.people,
"thumbnail_url": f"{self._url}/api/assets/{asset.id}/thumbnail",
}
for asset in sorted_assets
]
async def async_fetch_people(self) -> dict[str, str]:
"""Fetch all people from Immich."""
if self._session is None:
self._session = async_get_clientsession(self.hass)
headers = {"x-api-key": self._api_key}
try:
async with self._session.get(
f"{self._url}/api/people",
headers=headers,
) as response:
if response.status == 200:
data = await response.json()
people_list = data.get("people", data) if isinstance(data, dict) else data
self._people_cache = {
p["id"]: p.get("name", "")
for p in people_list
if p.get("name")
}
except aiohttp.ClientError as err:
_LOGGER.warning("Failed to fetch people: %s", err)
return self._people_cache
async def _async_fetch_users(self) -> dict[str, str]:
"""Fetch all users from Immich and cache them."""
if self._session is None:
self._session = async_get_clientsession(self.hass)
headers = {"x-api-key": self._api_key}
try:
async with self._session.get(
f"{self._url}/api/users",
headers=headers,
) as response:
if response.status == 200:
data = await response.json()
self._users_cache = {
u["id"]: u.get("name", u.get("email", "Unknown"))
for u in data
if u.get("id")
}
except aiohttp.ClientError as err:
_LOGGER.warning("Failed to fetch users: %s", err)
return self._users_cache
async def _async_fetch_shared_links(self) -> dict[str, str]:
"""Fetch shared links from Immich and cache album_id -> share_key mapping."""
if self._session is None:
self._session = async_get_clientsession(self.hass)
headers = {"x-api-key": self._api_key}
try:
async with self._session.get(
f"{self._url}/api/shared-links",
headers=headers,
) as response:
if response.status == 200:
data = await response.json()
_LOGGER.debug("Fetched %d shared links from Immich", len(data))
self._shared_links_cache.clear()
for link in data:
# Only process album-type shared links
link_type = link.get("type", "")
album = link.get("album")
key = link.get("key")
_LOGGER.debug(
"Shared link: type=%s, key=%s, album_id=%s",
link_type,
key[:8] if key else None,
album.get("id") if album else None,
)
if album and key:
album_id = album.get("id")
if album_id:
self._shared_links_cache[album_id] = key
_LOGGER.debug(
"Cached %d album shared links", len(self._shared_links_cache)
)
else:
_LOGGER.warning(
"Failed to fetch shared links: HTTP %s", response.status
)
except aiohttp.ClientError as err:
_LOGGER.warning("Failed to fetch shared links: %s", err)
return self._shared_links_cache
def get_album_public_url(self, album_id: str) -> str | None:
"""Get the public URL for an album if it has a shared link."""
share_key = self._shared_links_cache.get(album_id)
if share_key:
return f"{self._url}/share/{share_key}"
return None
def _get_asset_public_url(self, album_id: str, asset_id: str) -> str | None:
"""Get the public URL for an asset if the album has a shared link."""
share_key = self._shared_links_cache.get(album_id)
if share_key:
return f"{self._url}/share/{share_key}/photos/{asset_id}"
return None
async def _async_update_data(self) -> dict[str, AlbumData]:
"""Fetch data from Immich API."""
if self._session is None:
self._session = async_get_clientsession(self.hass)
# Fetch users to resolve owner names
if not self._users_cache:
await self._async_fetch_users()
# Fetch shared links to resolve public URLs (refresh each time as links can change)
await self._async_fetch_shared_links()
headers = {"x-api-key": self._api_key}
albums_data: dict[str, AlbumData] = {}
for album_id in self._album_ids:
try:
async with self._session.get(
f"{self._url}/api/albums/{album_id}",
headers=headers,
) as response:
if response.status == 404:
_LOGGER.warning("Album %s not found, skipping", album_id)
continue
if response.status != 200:
raise UpdateFailed(
f"Error fetching album {album_id}: HTTP {response.status}"
)
data = await response.json()
album = AlbumData.from_api_response(data, self._users_cache)
# Detect changes and update flags
if album_id in self._previous_states:
change = self._detect_change(
self._previous_states[album_id], album
)
if change:
album.has_new_assets = change.added_count > 0
album.last_change_time = datetime.now()
self._fire_events(change, album)
else:
# First run, no changes
album.has_new_assets = False
# Preserve has_new_assets from previous state if still within window
if album_id in self._previous_states:
prev = self._previous_states[album_id]
if prev.has_new_assets and prev.last_change_time:
# Keep the flag if change was recent
album.last_change_time = prev.last_change_time
if not album.has_new_assets:
album.has_new_assets = prev.has_new_assets
albums_data[album_id] = album
except aiohttp.ClientError as err:
raise UpdateFailed(f"Error communicating with Immich: {err}") from err
# Update previous states
self._previous_states = albums_data.copy()
return albums_data
def _detect_change(
self, old_state: AlbumData, new_state: AlbumData
) -> AlbumChange | None:
"""Detect changes between two album states."""
added_ids = new_state.asset_ids - old_state.asset_ids
removed_ids = old_state.asset_ids - new_state.asset_ids
if not added_ids and not removed_ids:
return None
change_type = "changed"
if added_ids and not removed_ids:
change_type = "assets_added"
elif removed_ids and not added_ids:
change_type = "assets_removed"
# Get full asset info for added assets
added_assets = [
new_state.assets[aid] for aid in added_ids if aid in new_state.assets
]
return AlbumChange(
album_id=new_state.id,
album_name=new_state.name,
change_type=change_type,
added_count=len(added_ids),
removed_count=len(removed_ids),
added_assets=added_assets,
removed_asset_ids=list(removed_ids),
)
def _fire_events(self, change: AlbumChange, album: AlbumData) -> None:
"""Fire Home Assistant events for album changes."""
# Build detailed asset info for events
added_assets_detail = []
for asset in change.added_assets:
asset_detail = {
"id": asset.id,
ATTR_ASSET_TYPE: asset.type,
ATTR_ASSET_FILENAME: asset.filename,
ATTR_ASSET_CREATED: asset.created_at,
ATTR_ASSET_OWNER: asset.owner_name,
ATTR_ASSET_OWNER_ID: asset.owner_id,
ATTR_PEOPLE: asset.people,
}
# Add public URL if album has a shared link
asset_url = self._get_asset_public_url(change.album_id, asset.id)
if asset_url:
asset_detail[ATTR_ASSET_URL] = asset_url
added_assets_detail.append(asset_detail)
event_data = {
ATTR_ALBUM_ID: change.album_id,
ATTR_ALBUM_NAME: change.album_name,
ATTR_CHANGE_TYPE: change.change_type,
ATTR_ADDED_COUNT: change.added_count,
ATTR_REMOVED_COUNT: change.removed_count,
ATTR_ADDED_ASSETS: added_assets_detail,
ATTR_REMOVED_ASSETS: change.removed_asset_ids,
ATTR_PEOPLE: list(album.people),
}
# Add album public URL if it has a shared link
album_url = self.get_album_public_url(change.album_id)
if album_url:
event_data[ATTR_ALBUM_URL] = album_url
# Fire general change event
self.hass.bus.async_fire(EVENT_ALBUM_CHANGED, event_data)
_LOGGER.info(
"Album '%s' changed: +%d -%d assets",
change.album_name,
change.added_count,
change.removed_count,
)
# Fire specific events
if change.added_count > 0:
self.hass.bus.async_fire(EVENT_ASSETS_ADDED, event_data)
if change.removed_count > 0:
self.hass.bus.async_fire(EVENT_ASSETS_REMOVED, event_data)
def clear_new_assets_flag(self, album_id: str) -> None:
"""Clear the new assets flag for an album."""
if self.data and album_id in self.data:
self.data[album_id].has_new_assets = False
self.data[album_id].last_change_time = None

View File

@@ -1,12 +0,0 @@
{
"domain": "immich_album_watcher",
"name": "Immich Album Watcher",
"codeowners": [],
"config_flow": true,
"dependencies": [],
"documentation": "https://github.com/your-repo/immich-album-watcher",
"iot_class": "cloud_polling",
"issue_tracker": "https://github.com/your-repo/immich-album-watcher/issues",
"requirements": [],
"version": "1.0.0"
}