Example using the SmartDeviceWorker class
Example code using the SmartDeviceWorker class, which implements a worker thread that manages the interface to an SCSmartDevice() instance.
Module to initialise the SCSmartDevice instance
See the example config page for the YAML configuration used by this example.
"""Initialisation helper that creates the SCConfigManager, SCLogger and SCSmartDevice instances used by the examples."""
import threading
from mergedeep import merge
from sc_foundation import (
SCCommon,
SCConfigManager,
SCLogger,
)
from validation_extras import smart_switch_extra_validation
from sc_smart_device import SCSmartDevice, smart_devices_validator
# Relative path of the YAML configuration file that switch_init() passes to SCConfigManager.
CONFIG_FILE = "examples/switch_config.yaml"
def switch_init(wake_event: threading.Event | None = None, extra_validation: dict | None = None) -> tuple[SCConfigManager, SCLogger, SCSmartDevice]:
    """Build the configuration manager, logger and smart-device controller used by the examples.

    Args:
        wake_event (threading.Event | None): Optional threading event passed through to SCSmartDevice to signal webhook events.
        extra_validation (dict | None): Optional extra validation schema merged in after the default schemas.

    Returns:
        tuple[SCConfigManager, SCLogger, SCSmartDevice]: The initialised configuration manager, logger and smart-device controller, in that order.

    Raises:
        RuntimeError: If loading the configuration, creating the logger or creating the SCSmartDevice fails,
            or if the configuration has no "SCSmartDevices" section.
    """
    # Combine the library schema, the example's extra schema and any caller-supplied schema into one dict.
    caller_schema = extra_validation or {}
    schema = merge({}, smart_devices_validator, smart_switch_extra_validation, caller_schema)
    assert isinstance(schema, dict), "Merged schema should be type dict"

    # Load the configuration file against the combined schema.
    try:
        config_manager = SCConfigManager(config_file=CONFIG_FILE, validation_schema=schema)
    except RuntimeError as exc:
        raise RuntimeError(f"Configuration file error: {exc}") from exc

    # Create the logger from the settings held in the configuration.
    try:
        log = SCLogger(config_manager.get_logger_settings())
    except RuntimeError as exc:
        raise RuntimeError(f"Logger initialisation error: {exc}") from exc

    # A failed connectivity check is only logged; initialisation carries on regardless.
    if not SCCommon.check_internet_connection():
        log.log_message("No internet connection detected.", "error")

    device_settings = config_manager.get("SCSmartDevices")
    if device_settings is None:
        raise RuntimeError("No SmartDevices settings found in the configuration file.")

    # Create the device controller, handing over the optional wake event.
    try:
        controller = SCSmartDevice(log, device_settings, wake_event)
    except RuntimeError as exc:
        raise RuntimeError(f"SCSmartDevice initialization error: {exc}") from exc

    log.log_message(f"SCSmartDevice initialized successfully with {len(controller.devices)} devices.", "summary")
    return config_manager, log, controller
Example application
"""Example demonstrating SmartDeviceWorker for sequenced device control.
SmartDeviceWorker runs a background thread that processes
DeviceSequenceRequest jobs. Each job is an ordered list of DeviceStep
steps (REFRESH_STATUS, CHANGE_OUTPUT, SLEEP, GET_LOCATION). The main
thread submits a request, optionally waits for it to finish, then reads
the latest device state from a SmartDeviceView snapshot.
This example submits a single sequence that:
1. Refreshes status for all devices
2. Turns Sydney Dev A O1 ON
3. Waits 5 seconds
4. Turns Sydney Dev B O1 ON
5. Waits 5 seconds
6. Turns Sydney Dev A O1 OFF
7. Waits 5 seconds
8. Turns Sydney Dev B O1 OFF
9. Refreshes status for all devices again
"""
import platform
import sys
import threading
from sc_foundation import SCLogger
from switch_init import switch_init
from sc_smart_device import (
DeviceSequenceRequest,
DeviceStep,
SCSmartDevice,
SmartDeviceView,
SmartDeviceWorker,
StepKind,
)
# Device and output names as defined in switch_config.yaml.
# DEVICE_NAME is a device-name prefix: query_devices() appends " A" to it to form the device name it looks up.
DEVICE_NAME = "Sydney Dev"
OUTPUT_A = "Sydney Dev A O1"
OUTPUT_B = "Sydney Dev B O1"
def print_view_summary(logger: SCLogger, view: SmartDeviceView) -> None:
    """Write one summary log line per device showing its online flag and output states.

    Args:
        logger: SCLogger instance that receives the summary lines.
        view: SmartDeviceView snapshot supplying the device and output data.
    """
    data = view.get_json_snapshot()

    for dev in data["devices"]:
        dev_id = dev["ID"]
        label = dev.get("Name", f"Device {dev_id}")
        is_online = view.get_device_online(dev_id)

        # Render "<output name>=ON|OFF" for every output whose DeviceID matches this device.
        rendered = [
            f"{out['Name']}={'ON' if out.get('State') else 'OFF'}"
            for out in data["outputs"]
            if out.get("DeviceID") == dev_id
        ]
        # Every rendered entry is non-empty, so an empty join means the device has no outputs.
        outputs_text = ", ".join(rendered) or "no outputs"

        logger.log_message(f" {label}: online={is_online} | {outputs_text}", "summary")
def create_worker(logger: SCLogger, smart_switch_control: SCSmartDevice) -> tuple[SmartDeviceWorker, threading.Thread]:
    """Create a SmartDeviceWorker and start it on a daemon background thread.

    A new threading.Event is given to the worker as its wake_event. The worker sets
    wake_event after every completed request, which allows a main-loop thread to
    react without polling.

    Args:
        logger: SCLogger instance.
        smart_switch_control: SCSmartDevice instance the worker will control.

    Returns:
        A tuple of (SmartDeviceWorker instance, started worker thread) for the caller to manage.
    """
    device_worker = SmartDeviceWorker(
        smart_device=smart_switch_control,
        logger=logger,
        wake_event=threading.Event(),
    )

    # Daemon thread: it will not keep the interpreter alive on its own.
    runner = threading.Thread(name="device-worker", target=device_worker.run, daemon=True)
    runner.start()

    logger.log_message("Worker thread started.", "detailed")
    return device_worker, runner
def build_sequence() -> DeviceSequenceRequest:
    """Assemble the demonstration DeviceSequenceRequest.

    The request refreshes status, switches OUTPUT_A and OUTPUT_B on and then off
    with a 5 second pause after each of the first three changes, and finishes
    with another status refresh. Steps execute in order within a single request.
    The worker guarantees each step finishes (or exhausts its retries) before
    the next begins.

    Returns:
        A DeviceSequenceRequest labelled "switch_worker_demo" with a 60 second timeout.
    """

    def switch(output_name: str, *, on: bool) -> DeviceStep:
        # CHANGE_OUTPUT step with one retry and a 2 second retry back-off.
        return DeviceStep(
            StepKind.CHANGE_OUTPUT,
            {"output_identity": output_name, "state": on},
            retries=1,
            retry_backoff_s=2.0,
        )

    def pause() -> DeviceStep:
        # SLEEP step of 5 seconds; a new step object is built for every pause.
        return DeviceStep(StepKind.SLEEP, {"seconds": 5})

    return DeviceSequenceRequest(
        steps=[
            DeviceStep(StepKind.REFRESH_STATUS),  # 1: bring the initial snapshot up to date
            switch(OUTPUT_A, on=True),  # 2
            pause(),  # 3
            switch(OUTPUT_B, on=True),  # 4
            pause(),  # 5
            switch(OUTPUT_A, on=False),  # 6
            pause(),  # 7
            switch(OUTPUT_B, on=False),  # 8
            DeviceStep(StepKind.REFRESH_STATUS),  # 9: bring the final snapshot up to date
        ],
        label="switch_worker_demo",
        timeout_s=60.0,
    )
def run_sequence(logger: SCLogger, worker: SmartDeviceWorker, sequence_req: DeviceSequenceRequest) -> None:
    """Submit a prepared sequence to the worker, wait for it to finish and log the outcome.

    Args:
        logger: SCLogger instance.
        worker: SmartDeviceWorker instance to submit the sequence to.
        sequence_req: DeviceSequenceRequest defining the steps to execute.
    """
    logger.log_message("\n\n\nStarting SmartDeviceWorker sequence example", "summary")

    # Report the state held by the worker before anything is changed.
    logger.log_message("Initial device state (from worker snapshot):", "summary")
    print_view_summary(logger, worker.get_latest_status())

    # Hand the request to the worker.
    logger.log_message(f"Submitting sequence '{sequence_req.label}' ({len(sequence_req.steps)} steps)...", "summary")
    request_id = worker.submit(sequence_req)

    # Wait slightly longer than the request's own timeout (60 s is assumed when it has none).
    budget = (sequence_req.timeout_s or 60.0) + 5.0
    finished = worker.wait_for_result(request_id, timeout=budget)
    outcome = worker.get_result(request_id)

    if not finished or outcome is None:
        logger.log_message("Sequence timed out waiting for a result.", "error")
        return

    if not outcome.ok:
        logger.log_message(f"Sequence '{sequence_req.label}' failed: {outcome.error}", "error")
        return

    duration = outcome.finished_ts - outcome.started_ts
    logger.log_message(f"Sequence '{sequence_req.label}' completed successfully in {duration:.1f}s.", "summary")
def query_devices(logger: SCLogger, worker: SmartDeviceWorker) -> None:
    """Demonstrate reading device and output attributes from the worker's latest SmartDeviceView.

    Logs the per-device summary, then looks up one device and one output by
    name and prints a selection of their attributes to stdout.

    Args:
        logger: SCLogger instance.
        worker: SmartDeviceWorker instance to query the view from.
    """
    view: SmartDeviceView = worker.get_latest_status()
    logger.log_message("Device state:", "summary")
    print_view_summary(logger, view)

    # Look a device up by name, then read its attributes (temperature, hostname) by device ID.
    device_name = f"{DEVICE_NAME} A"
    device_id = view.get_device_id(device_name)
    temperature = view.get_device_temperature(device_id)
    hostname = view.get_device_value(device_id, "Hostname", default="Unknown")  # custom attribute defined in switch_config.yaml
    # Print the name that was actually looked up so the label matches the reported ID.
    print(f"{device_name} (ID = {device_id}). Temperature: {temperature}°C, Hostname: {hostname}")

    # Get standard, extended and custom output attributes by output name
    component_id = view.get_output_id(OUTPUT_A)
    output_state = view.get_output_state(component_id)
    object_type = view.get_output_value(component_id, "ObjectType", default="Unknown")  # extended attribute defined by SCSmartDevice
    group_name = view.get_output_value(component_id, "Group", default="Ungrouped")  # extended attribute defined by SCSmartDevice
    color = view.get_output_value(component_id, "Colour", default="None")  # custom attribute defined in switch_config.yaml
    print(f"{OUTPUT_A} (ID = {component_id}) state: {'ON' if output_state else 'OFF'}, ObjectType: {object_type}, Group: {group_name}, Colour: {color}")
def shutdown_worker(logger: SCLogger, worker: SmartDeviceWorker, worker_thread: threading.Thread) -> None:
    """Ask the worker to stop and wait up to 5 seconds for its thread to exit.

    Args:
        logger: SCLogger instance used to report the outcome.
        worker: SmartDeviceWorker instance to stop.
        worker_thread: Thread that is running the worker.
    """
    worker.stop()
    worker_thread.join(timeout=5)

    # join() returns None whether or not the timeout expired, so check liveness
    # explicitly instead of always reporting a successful stop.
    if worker_thread.is_alive():
        logger.log_message("Worker thread did not stop within 5 seconds.", "error")
    else:
        logger.log_message("Worker thread stopped.", "detailed")
def main() -> None:
    """Run the switch_worker example: initialise, start the worker, query devices, shut down.

    Exits the process with status 1 if switch_init() raises RuntimeError.
    """
    print(f"Hello from switch_worker running on {platform.system()}")

    try:
        _config, logger, smart_switch_control = switch_init()
    except RuntimeError as e:
        print(f"Initialization error: {e}", file=sys.stderr)
        sys.exit(1)

    worker, worker_thread = create_worker(logger, smart_switch_control)
    try:
        sequence_req = build_sequence()  # noqa: F841
        # The output-switching sequence is disabled here; uncomment the next line to run it.
        # run_sequence(logger, worker=worker, sequence_req=sequence_req)
        query_devices(logger, worker)
    finally:
        # Stop the worker even if a step above raised, so its thread is not left running.
        shutdown_worker(logger, worker, worker_thread)
# Run the example only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()