feat(gateway): implement reconciliation mechanism and command prioritization

- Introduced a reconciliation job structure to manage the reconciliation process for gateway channels.
- Added methods to schedule and run reconciliation steps, including group, scene, and settings reconciliation.
- Implemented a locking mechanism to ensure thread safety during reconciliation operations.
- Enhanced command handling in GatewayRuntime to classify commands by priority (control, normal, maintenance).
- Updated command enqueueing and processing to respect command priorities, ensuring maintenance commands are handled appropriately.
- Added configuration options for enabling/disabling cache functionality in GatewayRuntime.
- Improved logging to include cache status during runtime initialization.

Co-authored-by: Copilot <copilot@github.com>
This commit is contained in:
Tony
2026-05-02 03:04:06 +08:00
parent 70c39ea1e1
commit 639fdd860e
11 changed files with 1209 additions and 34 deletions
@@ -16,6 +16,31 @@ namespace {
constexpr const char* kTag = "gateway_controller";
constexpr size_t kMaxNameBytes = 32;
constexpr uint8_t kDaliShortAddressCount = 64;
constexpr uint8_t kDaliSceneCount = 16;
constexpr TickType_t kMaintenancePollTicks = pdMS_TO_TICKS(20);
class LockGuard {
public:
explicit LockGuard(SemaphoreHandle_t lock) : lock_(lock) {
if (lock_ != nullptr) {
xSemaphoreTake(lock_, portMAX_DELAY);
}
}
~LockGuard() {
if (lock_ != nullptr) {
xSemaphoreGive(lock_);
}
}
private:
SemaphoreHandle_t lock_;
};
bool AnyFlagSet(const GatewayCacheChannelFlags& flags) {
return flags.need_update_group || flags.need_update_scene || flags.need_update_settings;
}
std::string NormalizeName(std::string_view name) {
std::string normalized(name);
@@ -71,9 +96,18 @@ const char* PhyKindToString(DaliPhyKind phy_kind) {
GatewayController::GatewayController(GatewayRuntime& runtime, DaliDomainService& dali_domain,
GatewayCache& cache, GatewayControllerConfig config)
: runtime_(runtime), dali_domain_(dali_domain), cache_(cache), config_(config) {}
: runtime_(runtime),
dali_domain_(dali_domain),
cache_(cache),
config_(config),
maintenance_lock_(xSemaphoreCreateMutex()) {}
GatewayController::~GatewayController() = default;
GatewayController::~GatewayController() {
if (maintenance_lock_ != nullptr) {
vSemaphoreDelete(maintenance_lock_);
maintenance_lock_ = nullptr;
}
}
esp_err_t GatewayController::start() {
const auto device_info = runtime_.deviceInfo();
@@ -112,7 +146,7 @@ bool GatewayController::enqueueCommandFrame(const std::vector<uint8_t>& frame) {
ESP_LOGW(kTag, "dropped invalid command frame len=%u", static_cast<unsigned>(frame.size()));
return false;
}
if (!runtime_.enqueueCommand(frame)) {
if (!runtime_.enqueueCommand(frame, GatewayRuntime::classifyCommandPriority(frame))) {
if (runtime_.lastEnqueueDropReason() != GatewayRuntime::CommandDropReason::kDuplicate) {
ESP_LOGW(kTag, "dropped command frame reason=%d",
static_cast<int>(runtime_.lastEnqueueDropReason()));
@@ -210,15 +244,223 @@ void GatewayController::TaskEntry(void* arg) {
void GatewayController::taskLoop() {
while (true) {
bool drained = false;
while (auto command = runtime_.popNextCommand()) {
drained = true;
bool worked = false;
if (auto command = runtime_.popNextCommand()) {
worked = true;
dispatchCommand(*command);
runtime_.completeCurrentCommand();
}
if (!drained) {
ulTaskNotifyTake(pdTRUE, portMAX_DELAY);
if (!worked) {
worked = runMaintenanceStep();
}
if (!worked) {
ulTaskNotifyTake(pdTRUE, hasPendingReconciliation() ? kMaintenancePollTicks : portMAX_DELAY);
}
}
}
void GatewayController::scheduleReconciliation(uint8_t gateway_id) {
if (!cache_.reconciliationEnabled()) {
return;
}
auto flags = cache_.pendingChannelFlags(gateway_id);
if (cache_.fullStateMirrorEnabled() && AnyFlagSet(flags)) {
flags = {true, true, true};
}
if (!AnyFlagSet(flags)) {
return;
}
{
LockGuard guard(maintenance_lock_);
reconciliation_jobs_.try_emplace(gateway_id);
}
if (task_handle_ != nullptr) {
xTaskNotifyGive(task_handle_);
}
}
bool GatewayController::hasPendingReconciliation() const {
LockGuard guard(maintenance_lock_);
return !reconciliation_jobs_.empty();
}
bool GatewayController::runMaintenanceStep() {
if (!cache_.reconciliationEnabled()) {
return false;
}
uint8_t gateway_id = 0;
ReconciliationJob job;
{
LockGuard guard(maintenance_lock_);
if (reconciliation_jobs_.empty()) {
return false;
}
const auto it = reconciliation_jobs_.begin();
gateway_id = it->first;
job = it->second;
}
if (runtime_.shouldYieldMaintenance(gateway_id)) {
return false;
}
const bool keep_job = runReconciliationStep(gateway_id, job);
{
LockGuard guard(maintenance_lock_);
auto it = reconciliation_jobs_.find(gateway_id);
if (it == reconciliation_jobs_.end()) {
return true;
}
if (keep_job) {
it->second = job;
} else {
reconciliation_jobs_.erase(it);
}
}
return true;
}
bool GatewayController::runReconciliationStep(uint8_t gateway_id, ReconciliationJob& job) {
if (job.phase == ReconciliationJob::Phase::kReloadFlags) {
job.flags = cache_.pendingChannelFlags(gateway_id);
if (cache_.fullStateMirrorEnabled() && AnyFlagSet(job.flags)) {
job.flags = {true, true, true};
}
if (!AnyFlagSet(job.flags)) {
return false;
}
job.short_address = 0;
job.scene_id = 0;
if (job.flags.need_update_group) {
job.phase = ReconciliationJob::Phase::kGroups;
} else if (job.flags.need_update_scene) {
job.phase = ReconciliationJob::Phase::kScenes;
} else {
job.phase = ReconciliationJob::Phase::kSettings;
}
}
switch (job.phase) {
case ReconciliationJob::Phase::kGroups:
reconcileGroupStep(gateway_id, job.short_address++);
if (job.short_address >= kDaliShortAddressCount) {
job.short_address = 0;
if (job.flags.need_update_scene) {
job.phase = ReconciliationJob::Phase::kScenes;
} else if (job.flags.need_update_settings) {
job.phase = ReconciliationJob::Phase::kSettings;
} else if (!cache_.clearChannelFlagsIfMatched(gateway_id, job.flags)) {
job.phase = ReconciliationJob::Phase::kReloadFlags;
} else {
return false;
}
}
return true;
case ReconciliationJob::Phase::kScenes:
reconcileSceneStep(gateway_id, job.short_address, job.scene_id);
++job.scene_id;
if (job.scene_id >= kDaliSceneCount) {
job.scene_id = 0;
++job.short_address;
}
if (job.short_address >= kDaliShortAddressCount) {
job.short_address = 0;
if (job.flags.need_update_settings) {
job.phase = ReconciliationJob::Phase::kSettings;
} else if (!cache_.clearChannelFlagsIfMatched(gateway_id, job.flags)) {
job.phase = ReconciliationJob::Phase::kReloadFlags;
} else {
return false;
}
}
return true;
case ReconciliationJob::Phase::kSettings:
reconcileSettingsStep(gateway_id, job.short_address++);
if (job.short_address >= kDaliShortAddressCount) {
job.short_address = 0;
if (!cache_.clearChannelFlagsIfMatched(gateway_id, job.flags)) {
job.phase = ReconciliationJob::Phase::kReloadFlags;
} else {
return false;
}
}
return true;
case ReconciliationJob::Phase::kReloadFlags:
default:
return true;
}
}
void GatewayController::reconcileGroupStep(uint8_t gateway_id, uint8_t short_address) {
const auto policy = cache_.priorityMode();
const auto state = cache_.daliAddressState(gateway_id, short_address);
if (policy == GatewayCachePriorityMode::kLocalGatewayFirst && state.group_mask_known) {
maintenance_activity_gateway_.store(gateway_id);
const bool applied = dali_domain_.applyGroupMask(gateway_id, short_address, state.group_mask);
maintenance_activity_gateway_.store(-1);
const auto verified_mask = dali_domain_.queryGroupMask(gateway_id, short_address);
cache_.setDaliGroupMask(gateway_id, short_address, verified_mask);
if (!applied && verified_mask.has_value()) {
ESP_LOGW(kTag, "group reconcile fallback gateway=%u short=%u", gateway_id, short_address);
}
return;
}
cache_.setDaliGroupMask(gateway_id, short_address,
dali_domain_.queryGroupMask(gateway_id, short_address));
}
void GatewayController::reconcileSceneStep(uint8_t gateway_id, uint8_t short_address,
uint8_t scene_id) {
const auto policy = cache_.priorityMode();
const auto state = cache_.daliAddressState(gateway_id, short_address);
if (policy == GatewayCachePriorityMode::kLocalGatewayFirst &&
state.scene_levels[scene_id].has_value()) {
maintenance_activity_gateway_.store(gateway_id);
dali_domain_.applySceneLevel(gateway_id, short_address, scene_id, state.scene_levels[scene_id]);
maintenance_activity_gateway_.store(-1);
}
cache_.setDaliSceneLevel(gateway_id, short_address, scene_id,
dali_domain_.querySceneLevel(gateway_id, short_address, scene_id));
}
void GatewayController::reconcileSettingsStep(uint8_t gateway_id, uint8_t short_address) {
const auto policy = cache_.priorityMode();
const auto state = cache_.daliAddressState(gateway_id, short_address);
if (policy == GatewayCachePriorityMode::kLocalGatewayFirst && state.settings.anyKnown()) {
maintenance_activity_gateway_.store(gateway_id);
dali_domain_.applyAddressSettings(gateway_id, short_address, {
state.settings.power_on_level,
state.settings.system_failure_level,
state.settings.min_level,
state.settings.max_level,
state.settings.fade_time,
state.settings.fade_rate,
});
maintenance_activity_gateway_.store(-1);
}
const auto settings = dali_domain_.queryAddressSettings(gateway_id, short_address);
if (settings.has_value()) {
cache_.setDaliSettings(gateway_id, short_address,
GatewayCacheDaliSettingsSnapshot{settings->power_on_level,
settings->system_failure_level,
settings->min_level,
settings->max_level,
settings->fade_time,
settings->fade_rate});
} else {
cache_.setDaliSettings(gateway_id, short_address, std::nullopt);
}
}
@@ -480,10 +722,6 @@ void GatewayController::handleDaliRawFrame(const DaliRawFrame& frame) {
if (frame.data.size() != 2 && frame.data.size() != 3) {
return;
}
if (setup_mode_ || dali_domain_.isAllocAddr(frame.gateway_id) ||
runtime_.hasActiveQueryCommand(frame.gateway_id)) {
return;
}
uint8_t addr = 0;
uint8_t data = 0;
@@ -498,6 +736,22 @@ void GatewayController::handleDaliRawFrame(const DaliRawFrame& frame) {
data = frame.data[2];
}
const bool maintenance_activity = maintenance_activity_gateway_.load() == frame.gateway_id;
const bool local_activity = maintenance_activity || runtime_.hasActiveCommand(frame.gateway_id) ||
dali_domain_.isAllocAddr(frame.gateway_id);
const bool flagged = cache_.observeDaliCommand(frame.gateway_id, addr, data,
local_activity
? GatewayCacheRawFrameOrigin::kLocalGateway
: GatewayCacheRawFrameOrigin::kOutsideBus);
if (flagged) {
scheduleReconciliation(frame.gateway_id);
}
if (setup_mode_ || dali_domain_.isAllocAddr(frame.gateway_id) || maintenance_activity ||
runtime_.hasActiveQueryCommand(frame.gateway_id)) {
return;
}
publishPayload(frame.gateway_id, {0x01, frame.gateway_id, addr, data});
}