Welcome to mirror list, hosted at ThFree Co, Russian Federation.

__init__.py « skykettle « custom_components - github.com/ClusterM/skykettle-ha.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 0506862275b4af820856ed5bb5f0225aa90129eb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
"""Support for SkyKettle."""
import logging
from .const import *
from homeassistant.core import HomeAssistant
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import *
import homeassistant.helpers.event as ev
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.entity import DeviceInfo
from datetime import timedelta
from .kettle_connection import KettleConnection
from .skykettle import SkyKettle

# Module-level logger, named after this package.
_LOGGER = logging.getLogger(__name__)

# Entity platforms that each config entry is forwarded to on setup and
# unloaded from on teardown.
PLATFORMS = [
    Platform.WATER_HEATER,
    Platform.SWITCH,
    Platform.LIGHT,
    Platform.SENSOR,
    Platform.NUMBER,
]


async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Set up Sky Kettle integration from a config entry.

    Creates the kettle connection for the entry, stores it in ``hass.data``,
    forwards the entry to every platform in ``PLATFORMS`` and schedules the
    first poll three seconds later.

    Args:
        hass: The Home Assistant instance.
        entry: The config entry being set up.

    Returns:
        Always ``True``.
    """
    entry.async_on_unload(entry.add_update_listener(entry_update_listener))

    # Create the storage only where it is missing. The entry id must be looked
    # up inside the domain dict, not in ``hass.data`` itself.
    domain_data = hass.data.setdefault(DOMAIN, {})
    entry_data = domain_data.setdefault(entry.entry_id, {})

    # Backward compatibility: if the stored model name is unknown but the same
    # name with a trailing 'S' is known, rewrite the entry with the fixed name.
    if CONF_FRIENDLY_NAME in entry.data:
        old_name = entry.data[CONF_FRIENDLY_NAME]
        if (not SkyKettle.get_model_code(old_name)
                and SkyKettle.get_model_code(old_name + 'S')):
            config = dict(entry.data)
            config[CONF_FRIENDLY_NAME] = old_name + 'S'
            hass.config_entries.async_update_entry(entry, data=config)
            _LOGGER.info(
                "Fixed invalid model name: %s -> %s",
                old_name,
                config[CONF_FRIENDLY_NAME],
            )

    kettle = KettleConnection(
        mac=entry.data[CONF_MAC],
        key=entry.data[CONF_PASSWORD],
        persistent=entry.data[CONF_PERSISTENT_CONNECTION],
        adapter=entry.data.get(CONF_DEVICE, None),
        hass=hass,
        model=entry.data.get(CONF_FRIENDLY_NAME, None)
    )
    entry_data[DATA_CONNECTION] = kettle

    async def poll(now, **kwargs) -> None:
        """Refresh the kettle state, notify listeners and re-arm the timer."""
        await kettle.update()
        # async_dispatcher_send is an event-loop callback and is not
        # thread-safe. This coroutine already runs in the loop, so call it
        # directly instead of handing it to an executor thread.
        async_dispatcher_send(hass, DISPATCHER_UPDATE)
        if hass.data[DOMAIN][DATA_WORKING]:
            schedule_poll(timedelta(seconds=entry.data[CONF_SCAN_INTERVAL]))
        else:
            _LOGGER.info("Not working anymore, stop")

    def schedule_poll(td):
        """Arm a one-shot timer for the next poll and keep its cancel handle."""
        hass.data[DOMAIN][DATA_CANCEL] = ev.async_call_later(hass, td, poll)

    # NOTE(review): these three values live at domain level rather than per
    # entry, so a second config entry overwrites them. Confirm whether several
    # kettles are expected to work side by side.
    hass.data[DOMAIN][DATA_WORKING] = True
    hass.data[DOMAIN][DATA_DEVICE_INFO] = lambda: device_info(entry)

    # NOTE(review): newer Home Assistant releases replace the per-platform
    # async_forward_entry_setup with async_forward_entry_setups; confirm the
    # minimum supported version before switching.
    for component in PLATFORMS:
        hass.async_create_task(
            hass.config_entries.async_forward_entry_setup(entry, component)
        )

    schedule_poll(timedelta(seconds=3))

    return True

def device_info(entry):
    """Build the ``DeviceInfo`` describing the kettle of a config entry.

    The display name is the integration's friendly name followed by the
    stored model name (when present); the MAC address from the entry is used
    both as the identifier and as the connection.
    """
    data = entry.data
    display_name = " ".join(
        (FRIENDLY_NAME, data.get(CONF_FRIENDLY_NAME, ""))
    ).strip()
    model_name = data.get(CONF_FRIENDLY_NAME, None)
    firmware = data.get(ATTR_SW_VERSION, None)
    mac_address = data[CONF_MAC]

    return DeviceInfo(
        name=display_name,
        manufacturer=MANUFACTORER,
        model=model_name,
        sw_version=firmware,
        identifiers={(DOMAIN, mac_address)},
        connections={("mac", mac_address)},
    )

async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Unload a config entry.

    Stops the polling loop, unloads every platform in ``PLATFORMS``, stops the
    kettle connection and clears its reference in ``hass.data``.

    Args:
        hass: The Home Assistant instance.
        entry: The config entry being unloaded.

    Returns:
        ``True`` when all platforms were unloaded successfully, otherwise
        ``False``.
    """
    _LOGGER.debug("Unloading")

    # Tell a poll that may currently be running not to re-arm itself, and
    # cancel the timer that is already pending.
    hass.data[DOMAIN][DATA_WORKING] = False
    hass.data[DOMAIN][DATA_CANCEL]()

    # Await the platform unloads so that their outcome is reported to the
    # caller instead of being dropped in fire-and-forget tasks.
    unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)

    # NOTE(review): assumes KettleConnection.stop is a blocking, synchronous
    # method (it is run in the executor) - confirm against kettle_connection.
    await hass.async_add_executor_job(hass.data[DOMAIN][entry.entry_id][DATA_CONNECTION].stop)
    hass.data[DOMAIN][entry.entry_id][DATA_CONNECTION] = None
    _LOGGER.debug("Entry unloaded")
    return unload_ok

async def entry_update_listener(hass, entry):
    """Push the entry's persistent-connection setting to the live connection."""
    entry_store = hass.data[DOMAIN][entry.entry_id]
    connection = entry_store[DATA_CONNECTION]
    persistent_flag = entry.data.get(CONF_PERSISTENT_CONNECTION)
    connection.persistent = persistent_flag
    _LOGGER.debug("Options updated")