feat: add KNX gateway snapshot and command transaction handling

Signed-off-by: Tony <tonylu@tony-cloud.com>
This commit is contained in:
Tony
2026-06-13 01:47:36 +08:00
parent 40a0e8e303
commit be9ff9c2c9
8 changed files with 527 additions and 6 deletions
@@ -282,9 +282,14 @@ GatewayController::GatewayController(GatewayRuntime& runtime, DaliDomainService&
dali_domain_(dali_domain),
cache_(cache),
config_(config),
maintenance_lock_(xSemaphoreCreateMutex()) {}
maintenance_lock_(xSemaphoreCreateMutex()),
transaction_lock_(xSemaphoreCreateMutex()) {}
GatewayController::~GatewayController() {
if (transaction_lock_ != nullptr) {
vSemaphoreDelete(transaction_lock_);
transaction_lock_ = nullptr;
}
if (maintenance_lock_ != nullptr) {
vSemaphoreDelete(maintenance_lock_);
maintenance_lock_ = nullptr;
@@ -341,6 +346,74 @@ bool GatewayController::enqueueCommandFrame(const std::vector<uint8_t>& frame) {
return true;
}
GatewayCommandTransactionResult GatewayController::transactCommandFrame(
const std::vector<uint8_t>& frame,
uint32_t timeout_ms,
uint32_t idle_ms,
size_t max_response_bytes) {
GatewayCommandTransactionResult result;
if (!GatewayRuntime::isGatewayCommandFrame(frame) || !GatewayRuntime::hasValidChecksum(frame) ||
frame.size() < 5) {
result.status = GatewayCommandTransactionStatus::kInvalidFrame;
return result;
}
TransactionWaiter waiter;
waiter.gateway_id = frame[2];
waiter.opcode = frame[3];
waiter.max_response_bytes = max_response_bytes;
waiter.signal = xSemaphoreCreateBinary();
if (waiter.signal == nullptr) {
result.status = GatewayCommandTransactionStatus::kQueueRejected;
return result;
}
{
LockGuard guard(transaction_lock_);
transaction_waiters_.push_back(&waiter);
}
const bool enqueued = enqueueCommandFrame(frame);
if (!enqueued) {
{
LockGuard guard(transaction_lock_);
transaction_waiters_.erase(std::remove(transaction_waiters_.begin(),
transaction_waiters_.end(), &waiter),
transaction_waiters_.end());
}
vSemaphoreDelete(waiter.signal);
result.status = GatewayCommandTransactionStatus::kQueueRejected;
return result;
}
const TickType_t timeout_ticks = pdMS_TO_TICKS(timeout_ms);
const BaseType_t first = xSemaphoreTake(waiter.signal, timeout_ticks);
if (first == pdTRUE && idle_ms > 0) {
while (xSemaphoreTake(waiter.signal, pdMS_TO_TICKS(idle_ms)) == pdTRUE) {
}
}
{
LockGuard guard(transaction_lock_);
transaction_waiters_.erase(std::remove(transaction_waiters_.begin(),
transaction_waiters_.end(), &waiter),
transaction_waiters_.end());
result.frames = waiter.frames;
}
vSemaphoreDelete(waiter.signal);
if (waiter.overflow) {
result.status = GatewayCommandTransactionStatus::kQueueRejected;
} else if (!result.frames.empty()) {
result.status = GatewayCommandTransactionStatus::kOk;
} else if (first == pdTRUE) {
result.status = GatewayCommandTransactionStatus::kNoResponse;
} else {
result.status = GatewayCommandTransactionStatus::kTimeout;
}
return result;
}
void GatewayController::addNotificationSink(NotificationSink sink) {
if (sink) {
notification_sinks_.push_back(std::move(sink));
@@ -1109,11 +1182,65 @@ void GatewayController::publishBridgeTransportResponse(uint8_t gateway_id, uint8
}
void GatewayController::publishFrame(const std::vector<uint8_t>& frame) {
captureTransactionFrame(frame);
for (const auto& sink : notification_sinks_) {
sink(frame);
}
}
bool GatewayController::transactionFrameMatches(const TransactionWaiter& waiter,
const std::vector<uint8_t>& frame) const {
if (frame.size() < 3 || frame[0] != 0x22) {
return false;
}
const uint8_t response_opcode = frame[1];
const uint8_t response_gateway = frame.size() > 2 ? frame[2] : 0;
const bool gateway_matches = response_gateway == waiter.gateway_id ||
waiter.gateway_id == 0 ||
response_gateway == 0xff;
switch (waiter.opcode) {
case 0x06:
return response_opcode == 0x03 && gateway_matches;
case 0x09:
return (response_opcode == 0x09 && waiter.gateway_id == 0) ||
((response_opcode == 0x03 || response_opcode == 0x04) && gateway_matches);
case 0x14:
return (response_opcode == 0x03 || response_opcode == 0x04) && gateway_matches;
case kDali103QueryOpcode:
return (response_opcode == kDali103QueryResponseOpcode ||
response_opcode == kDali103NoResponseOpcode) &&
gateway_matches;
case 0x05:
case 0x0A:
case 0x30:
case kGatewayCacheOpcode:
case 0xA0:
case 0xA2:
return response_opcode == waiter.opcode && gateway_matches;
case kBridgeTransportRequestOpcode:
return response_opcode == kBridgeTransportResponseOpcode && gateway_matches;
default:
return response_opcode == waiter.opcode && gateway_matches;
}
}
void GatewayController::captureTransactionFrame(const std::vector<uint8_t>& frame) {
LockGuard guard(transaction_lock_);
for (auto* waiter : transaction_waiters_) {
if (waiter == nullptr || waiter->signal == nullptr ||
!transactionFrameMatches(*waiter, frame)) {
continue;
}
if (waiter->frames.size() + frame.size() > waiter->max_response_bytes) {
waiter->overflow = true;
xSemaphoreGive(waiter->signal);
continue;
}
waiter->frames.insert(waiter->frames.end(), frame.begin(), frame.end());
xSemaphoreGive(waiter->signal);
}
}
std::optional<uint8_t> GatewayController::applicationControllerResponse(
uint8_t gateway_id, uint8_t first, uint8_t instance, uint8_t opcode) const {
const uint8_t gateway_short =