Linux MS/TP Fixes (#1051)

* Linux MS/TP:
 * dlmstp_init:
   * Maximize thread priority if permitted

 * MSTP_Put_Receive/dlmstp_receive:
   * Use a buffer so incoming frames are not dropped during bursts

 * dlmstp_receive:
   * Make timeout optional

 * MSTP_Get_Reply:
   * Lock the mutex before using the PDU Queue
   * Iterate over all queued replies to find a match (instead of only checking the first)
   * Sleep for a millisecond before checking again to wait for the application provide a reply

 * RS485_Check_UART_Data:
   * Ensure waiter is initilized before using with select
   * Ensure bytes are read before adding to FIFO

TSM:
 * Add log when `datalink_send_pdu` fails for consistency with all other `datalink_send_pdu` calls

dlenv_maintenance_timer:
 * Fix Tx/Rx labels in MSTP statitistics print

* don't log on queued requests
This commit is contained in:
Ryan Mulder
2025-07-22 15:51:14 -04:00
committed by GitHub
parent 1a7852f58e
commit 85ba0b2315
4 changed files with 204 additions and 136 deletions
+193 -126
View File
@@ -29,7 +29,11 @@
#include "rs485.h" #include "rs485.h"
/* packet queues */ /* packet queues */
static DLMSTP_PACKET Receive_Packet; #ifndef MSTP_RECEIVE_PACKET_COUNT
#define MSTP_RECEIVE_PACKET_COUNT 8
#endif
static DLMSTP_PACKET Receive_Buffer[MSTP_RECEIVE_PACKET_COUNT];
static RING_BUFFER Receive_Queue;
/* mechanism to wait for a packet */ /* mechanism to wait for a packet */
static pthread_cond_t Receive_Packet_Flag; static pthread_cond_t Receive_Packet_Flag;
static pthread_mutex_t Receive_Packet_Mutex; static pthread_mutex_t Receive_Packet_Mutex;
@@ -124,6 +128,9 @@ int dlmstp_send_pdu(
} }
} }
pthread_mutex_unlock(&Ring_Buffer_Mutex); pthread_mutex_unlock(&Ring_Buffer_Mutex);
if (!pkt) {
debug_printf("DLMSTP: PDU Queue Full!\n");
}
return bytes_sent; return bytes_sent;
} }
@@ -260,23 +267,17 @@ static bool dlmstp_compare_data_expecting_reply(
reply.invoke_id = reply_pdu[offset + 1]; reply.invoke_id = reply_pdu[offset + 1];
break; break;
default: default:
/* A queued request, just look for another */
return false; return false;
} }
if (request.invoke_id != reply.invoke_id) {
/* Normal to have multiple replies queued, just look for another */
return false;
}
/* these don't have service choice included */ /* these don't have service choice included */
if ((reply.pdu_type == PDU_TYPE_REJECT) || if ((request.pdu_type != PDU_TYPE_REJECT) &&
(reply.pdu_type == PDU_TYPE_ABORT) || (request.pdu_type != PDU_TYPE_ABORT) &&
(reply.pdu_type == PDU_TYPE_SEGMENT_ACK)) { (request.pdu_type != PDU_TYPE_SEGMENT_ACK)) {
if (request.invoke_id != reply.invoke_id) {
debug_printf("DLMSTP: DER Compare failed: "
"Invoke ID mismatch.\n");
return false;
}
} else {
if (request.invoke_id != reply.invoke_id) {
debug_printf("DLMSTP: DER Compare failed: "
"Invoke ID mismatch.\n");
return false;
}
if (request.service_choice != reply.service_choice) { if (request.service_choice != reply.service_choice) {
debug_printf("DLMSTP: DER Compare failed: " debug_printf("DLMSTP: DER Compare failed: "
"Service choice mismatch.\n"); "Service choice mismatch.\n");
@@ -307,98 +308,6 @@ static bool dlmstp_compare_data_expecting_reply(
return true; return true;
} }
/**
* @brief The MS/TP state machine uses this function for getting data to send
* as the reply to a DATA_EXPECTING_REPLY frame, or nothing
* @param mstp_port MSTP port structure for this port
* @param timeout number of milliseconds to wait for a packet
* @return number of bytes, or 0 if no reply is available
*/
uint16_t MSTP_Get_Reply(struct mstp_port_struct_t *mstp_port, unsigned timeout)
{
uint16_t pdu_len = 0;
bool matched = false;
uint8_t frame_type = 0;
struct mstp_pdu_packet *pkt;
(void)timeout;
if (Ringbuf_Empty(&PDU_Queue)) {
return 0;
}
pkt = (struct mstp_pdu_packet *)Ringbuf_Peek(&PDU_Queue);
/* is this the reply to the DER? */
matched = dlmstp_compare_data_expecting_reply(
&mstp_port->InputBuffer[0], mstp_port->DataLength,
mstp_port->SourceAddress, (uint8_t *)&pkt->buffer[0], pkt->length,
pkt->destination_mac);
if (!matched) {
return 0;
}
if (pkt->data_expecting_reply) {
frame_type = FRAME_TYPE_BACNET_DATA_EXPECTING_REPLY;
} else {
frame_type = FRAME_TYPE_BACNET_DATA_NOT_EXPECTING_REPLY;
}
/* convert the PDU into the MSTP Frame */
pdu_len = MSTP_Create_Frame(
&mstp_port->OutputBuffer[0], /* <-- loading this */
mstp_port->OutputBufferSize, frame_type, pkt->destination_mac,
mstp_port->This_Station, (uint8_t *)&pkt->buffer[0], pkt->length);
DLMSTP_Statistics.transmit_pdu_counter++;
(void)Ringbuf_Pop(&PDU_Queue, NULL);
return pdu_len;
}
/**
* @brief Send an MSTP frame
* @param mstp_port - port specific data
* @param buffer - data to send
* @param nbytes - number of bytes of data to send
*/
void MSTP_Send_Frame(
struct mstp_port_struct_t *mstp_port,
const uint8_t *buffer,
uint16_t nbytes)
{
RS485_Send_Frame(mstp_port, buffer, nbytes);
DLMSTP_Statistics.transmit_frame_counter++;
}
/**
* @brief MS/TP state machine received a frame
* @return number of bytes queued, or 0 if unable to be queued
*/
uint16_t MSTP_Put_Receive(struct mstp_port_struct_t *mstp_port)
{
uint16_t pdu_len = 0;
pthread_mutex_lock(&Receive_Packet_Mutex);
if (Receive_Packet.ready) {
debug_printf("MS/TP: Dropped! Not Ready.\n");
} else {
/* bounds check - maybe this should send an abort? */
pdu_len = mstp_port->DataLength;
if (pdu_len > sizeof(Receive_Packet.pdu)) {
pdu_len = sizeof(Receive_Packet.pdu);
}
if (pdu_len == 0) {
debug_printf("MS/TP: PDU Length is 0!\n");
}
memmove(
(void *)&Receive_Packet.pdu[0], (void *)&mstp_port->InputBuffer[0],
pdu_len);
dlmstp_fill_bacnet_address(
&Receive_Packet.address, mstp_port->SourceAddress);
Receive_Packet.pdu_len = mstp_port->DataLength;
Receive_Packet.ready = true;
pthread_cond_signal(&Receive_Packet_Flag);
}
pthread_mutex_unlock(&Receive_Packet_Mutex);
return pdu_len;
}
/** /**
* Add a certain number of nanoseconds to the specified time. * Add a certain number of nanoseconds to the specified time.
* *
@@ -438,13 +347,121 @@ static void get_abstime(struct timespec *abstime, unsigned long milliseconds)
timespec_add_ns(abstime, 1000000 * milliseconds); timespec_add_ns(abstime, 1000000 * milliseconds);
} }
static void millisleep(const unsigned long milliseconds)
{
struct timespec abstime;
get_abstime(&abstime, milliseconds);
while (EINTR ==
clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &abstime, NULL)) { }
}
/**
* @brief The MS/TP state machine uses this function for getting data to send
* as the reply to a DATA_EXPECTING_REPLY frame, or nothing
* @param mstp_port MSTP port structure for this port
* @param timeout number of milliseconds to wait for a packet
* @return number of bytes, or 0 if no reply is available
*/
uint16_t MSTP_Get_Reply(struct mstp_port_struct_t *mstp_port, unsigned timeout)
{
uint16_t pdu_len = 0;
bool matched = false;
uint8_t frame_type = 0;
struct mstp_pdu_packet *pkt;
(void)timeout;
pthread_mutex_lock(&Ring_Buffer_Mutex);
for (pkt = (struct mstp_pdu_packet *)Ringbuf_Peek(&PDU_Queue); pkt;
pkt = (struct mstp_pdu_packet *)Ringbuf_Peek_Next(
&PDU_Queue, (uint8_t *)pkt)) {
/* is this the reply to the DER? */
matched = dlmstp_compare_data_expecting_reply(
&mstp_port->InputBuffer[0], mstp_port->DataLength,
mstp_port->SourceAddress, (uint8_t *)&pkt->buffer[0], pkt->length,
pkt->destination_mac);
if (matched) {
break;
}
}
if (matched) {
if (pkt->data_expecting_reply) {
frame_type = FRAME_TYPE_BACNET_DATA_EXPECTING_REPLY;
} else {
frame_type = FRAME_TYPE_BACNET_DATA_NOT_EXPECTING_REPLY;
}
/* convert the PDU into the MSTP Frame */
pdu_len = MSTP_Create_Frame(
&mstp_port->OutputBuffer[0], /* <-- loading this */
mstp_port->OutputBufferSize, frame_type, pkt->destination_mac,
mstp_port->This_Station, (uint8_t *)&pkt->buffer[0], pkt->length);
DLMSTP_Statistics.transmit_pdu_counter++;
/* This will pop the element no matter where we found it */
(void)Ringbuf_Pop_Element(&PDU_Queue, (uint8_t *)pkt, NULL);
}
pthread_mutex_unlock(&Ring_Buffer_Mutex);
if (pdu_len <= 0) {
/* Didn't find a match so wait for application layer to provide one */
millisleep(1);
}
return pdu_len;
}
/**
* @brief Send an MSTP frame
* @param mstp_port - port specific data
* @param buffer - data to send
* @param nbytes - number of bytes of data to send
*/
void MSTP_Send_Frame(
struct mstp_port_struct_t *mstp_port,
const uint8_t *buffer,
uint16_t nbytes)
{
RS485_Send_Frame(mstp_port, buffer, nbytes);
DLMSTP_Statistics.transmit_frame_counter++;
}
/**
* @brief MS/TP state machine received a frame
* @return number of bytes queued, or 0 if unable to be queued
*/
uint16_t MSTP_Put_Receive(struct mstp_port_struct_t *mstp_port)
{
uint16_t pdu_len = 0;
pthread_mutex_lock(&Receive_Packet_Mutex);
DLMSTP_PACKET *pkt = (DLMSTP_PACKET *)Ringbuf_Data_Peek(&Receive_Queue);
if (!pkt) {
debug_printf("MS/TP: Dropped! Not Ready.\n");
} else {
/* bounds check - maybe this should send an abort? */
pdu_len = mstp_port->DataLength;
if (pdu_len > sizeof(pkt->pdu)) {
pdu_len = sizeof(pkt->pdu);
}
if (pdu_len == 0) {
debug_printf("MS/TP: PDU Length is 0!\n");
}
memmove(
(void *)&pkt->pdu[0], (void *)&mstp_port->InputBuffer[0], pdu_len);
dlmstp_fill_bacnet_address(&pkt->address, mstp_port->SourceAddress);
pkt->pdu_len = mstp_port->DataLength;
pkt->ready = true;
if (Ringbuf_Data_Put(&Receive_Queue, (uint8_t *)pkt)) {
pthread_cond_signal(&Receive_Packet_Flag);
}
}
pthread_mutex_unlock(&Receive_Packet_Mutex);
return pdu_len;
}
/** /**
* @brief Run the MS/TP state machines, and get packet if available * @brief Run the MS/TP state machines, and get packet if available
* @param pdu - place to put PDU data for the caller * @param pdu - place to put PDU data for the caller
* @param max_pdu - number of bytes of PDU data that caller can receive * @param max_pdu - number of bytes of PDU data that caller can receive
* @return number of bytes in received packet, or 0 if no packet was received * @return number of bytes in received packet, or 0 if no packet was received
* @note Must be called at least once every 1 milliseconds, with no more than
* 5 milliseconds jitter.
*/ */
uint16_t dlmstp_receive( uint16_t dlmstp_receive(
BACNET_ADDRESS *src, /* source address */ BACNET_ADDRESS *src, /* source address */
@@ -454,28 +471,32 @@ uint16_t dlmstp_receive(
{ /* milliseconds to wait for a packet */ { /* milliseconds to wait for a packet */
uint16_t pdu_len = 0; uint16_t pdu_len = 0;
struct timespec abstime; struct timespec abstime;
DLMSTP_PACKET *pkt;
(void)max_pdu; (void)max_pdu;
pthread_mutex_lock(&Receive_Packet_Mutex);
if (timeout > 0) {
get_abstime(&abstime, timeout);
pthread_cond_timedwait(
&Receive_Packet_Flag, &Receive_Packet_Mutex, &abstime);
}
/* see if there is a packet available, and a place /* see if there is a packet available, and a place
to put the reply (if necessary) and process it */ to put the reply (if necessary) and process it */
pthread_mutex_lock(&Receive_Packet_Mutex); pkt = (DLMSTP_PACKET *)Ringbuf_Peek(&Receive_Queue);
get_abstime(&abstime, timeout); if (pkt) {
pthread_cond_timedwait( if (pkt->pdu_len) {
&Receive_Packet_Flag, &Receive_Packet_Mutex, &abstime);
if (Receive_Packet.ready) {
if (Receive_Packet.pdu_len) {
DLMSTP_Statistics.receive_pdu_counter++; DLMSTP_Statistics.receive_pdu_counter++;
if (src) { if (src) {
memmove( memmove(src, &pkt->address, sizeof(pkt->address));
src, &Receive_Packet.address,
sizeof(Receive_Packet.address));
} }
if (pdu) { if (pdu) {
memmove(pdu, &Receive_Packet.pdu, sizeof(Receive_Packet.pdu)); memmove(pdu, &pkt->pdu, sizeof(pkt->pdu));
} }
pdu_len = Receive_Packet.pdu_len; pdu_len = pkt->pdu_len;
} }
Receive_Packet.ready = false; pkt->ready = false;
(void)Ringbuf_Pop(&Receive_Queue, NULL);
} }
pthread_mutex_unlock(&Receive_Packet_Mutex); pthread_mutex_unlock(&Receive_Packet_Mutex);
@@ -979,6 +1000,8 @@ void dlmstp_silence_reset(void *arg)
*/ */
bool dlmstp_init(char *ifname) bool dlmstp_init(char *ifname)
{ {
pthread_attr_t thread_attr;
struct sched_param sch_param;
pthread_condattr_t attr; pthread_condattr_t attr;
int rv = 0; int rv = 0;
@@ -1002,8 +1025,9 @@ bool dlmstp_init(char *ifname)
&PDU_Queue, (uint8_t *)&PDU_Buffer, sizeof(struct mstp_pdu_packet), &PDU_Queue, (uint8_t *)&PDU_Buffer, sizeof(struct mstp_pdu_packet),
MSTP_PDU_PACKET_COUNT); MSTP_PDU_PACKET_COUNT);
/* initialize packet queue */ /* initialize packet queue */
Receive_Packet.ready = false; Ringbuf_Init(
Receive_Packet.pdu_len = 0; &Receive_Queue, (uint8_t *)&Receive_Buffer, sizeof(DLMSTP_PACKET),
MSTP_RECEIVE_PACKET_COUNT);
rv = pthread_cond_init(&Receive_Packet_Flag, &attr); rv = pthread_cond_init(&Receive_Packet_Flag, &attr);
if (rv != 0) { if (rv != 0) {
fprintf( fprintf(
@@ -1065,11 +1089,54 @@ bool dlmstp_init(char *ifname)
(MSTP_Port.CheckAutoBaud ? "true" : "false")); (MSTP_Port.CheckAutoBaud ? "true" : "false"));
fflush(stderr); fflush(stderr);
#endif #endif
pthread_attr_init(&thread_attr);
// Set scheduling policy to SCHED_FIFO and priority
rv = pthread_attr_setinheritsched(&thread_attr, PTHREAD_EXPLICIT_SCHED);
if (rv != 0) {
fprintf(
stderr,
"MS/TP Interface: %s\n cannot setup thread schedule to "
"explicit.\n",
ifname);
exit(1);
}
rv = pthread_attr_setschedpolicy(&thread_attr, SCHED_FIFO);
if (rv != 0) {
fprintf(
stderr,
"MS/TP Interface: %s\n cannot setup thread schedule policy to "
"FIFO.\n",
ifname);
exit(1);
}
sch_param.sched_priority = 99;
rv = pthread_attr_setschedparam(&thread_attr, &sch_param);
if (rv != 0) {
fprintf(
stderr, "MS/TP Interface: %s\n cannot setup thread priority.\n",
ifname);
exit(1);
}
/* start one thread */ /* start one thread */
Thread_Run = true; Thread_Run = true;
rv = pthread_create(&hThread, NULL, dlmstp_thread, NULL); rv = pthread_create(&hThread, &thread_attr, dlmstp_thread, NULL);
if (rv == EPERM) {
fprintf(
stdout,
"MS/TP Interface: %s\n"
" Insufficient permissions to create thread with priority.\n"
" A thread without priority will be created.\n"
" Run this executable as a user with thread priority permission\n"
" or grant capability with \"setcap 'cap_sys_nice=eip'\"",
ifname);
rv = pthread_create(&hThread, NULL, dlmstp_thread, NULL);
}
if (rv != 0) { if (rv != 0) {
fprintf(stderr, "Failed to start MS/TP thread\n"); fprintf(
stderr, "MS/TP Interface: %s\n Failed to start MS/TP thread.\n",
ifname);
exit(1);
} }
return true; return true;
+5 -8
View File
@@ -249,6 +249,8 @@ void RS485_Check_UART_Data(struct mstp_port_struct_t *mstp_port)
ssize_t n; ssize_t n;
int handle = RS485_Handle; int handle = RS485_Handle;
FIFO_BUFFER *fifo = &Rx_FIFO; FIFO_BUFFER *fifo = &Rx_FIFO;
waiter.tv_sec = 0;
waiter.tv_usec = 5000;
SHARED_MSTP_DATA *poSharedData = (SHARED_MSTP_DATA *)mstp_port->UserData; SHARED_MSTP_DATA *poSharedData = (SHARED_MSTP_DATA *)mstp_port->UserData;
if (poSharedData) { if (poSharedData) {
@@ -258,9 +260,6 @@ void RS485_Check_UART_Data(struct mstp_port_struct_t *mstp_port)
if (mstp_port->ReceiveError == true) { if (mstp_port->ReceiveError == true) {
/* do nothing but wait for state machine to clear the error */ /* do nothing but wait for state machine to clear the error */
/* burning time, so wait a longer time */
waiter.tv_sec = 0;
waiter.tv_usec = 5000;
} else if (mstp_port->DataAvailable == false) { } else if (mstp_port->DataAvailable == false) {
/* wait for state machine to read from the DataRegister */ /* wait for state machine to read from the DataRegister */
if (FIFO_Count(fifo) > 0) { if (FIFO_Count(fifo) > 0) {
@@ -270,10 +269,6 @@ void RS485_Check_UART_Data(struct mstp_port_struct_t *mstp_port)
/* FIFO is giving data - just poll */ /* FIFO is giving data - just poll */
waiter.tv_sec = 0; waiter.tv_sec = 0;
waiter.tv_usec = 0; waiter.tv_usec = 0;
} else {
/* FIFO is empty - wait a longer time */
waiter.tv_sec = 0;
waiter.tv_usec = 5000;
} }
} }
/* grab bytes and stuff them into the FIFO every time */ /* grab bytes and stuff them into the FIFO every time */
@@ -285,7 +280,9 @@ void RS485_Check_UART_Data(struct mstp_port_struct_t *mstp_port)
} }
if (FD_ISSET(handle, &input)) { if (FD_ISSET(handle, &input)) {
n = read(handle, buf, sizeof(buf)); n = read(handle, buf, sizeof(buf));
FIFO_Add(fifo, &buf[0], n); if (n > 0) {
FIFO_Add(fifo, &buf[0], n);
}
} }
} }
+5 -1
View File
@@ -299,6 +299,7 @@ bool tsm_get_transaction_pdu(
void tsm_timer_milliseconds(uint16_t milliseconds) void tsm_timer_milliseconds(uint16_t milliseconds)
{ {
unsigned i = 0; /* counter */ unsigned i = 0; /* counter */
int bytes_sent = 0;
BACNET_TSM_DATA *plist = &TSM_List[0]; BACNET_TSM_DATA *plist = &TSM_List[0];
@@ -314,13 +315,16 @@ void tsm_timer_milliseconds(uint16_t milliseconds)
if (plist->RetryCount < apdu_retries()) { if (plist->RetryCount < apdu_retries()) {
plist->RequestTimer = apdu_timeout(); plist->RequestTimer = apdu_timeout();
plist->RetryCount++; plist->RetryCount++;
datalink_send_pdu( bytes_sent = datalink_send_pdu(
&plist->dest, &plist->npdu_data, &plist->apdu[0], &plist->dest, &plist->npdu_data, &plist->apdu[0],
plist->apdu_len); plist->apdu_len);
DEBUG_PRINTF( DEBUG_PRINTF(
"invoke-id[%u] Retry %u of %u after %ums\n", "invoke-id[%u] Retry %u of %u after %ums\n",
plist->InvokeID, plist->RetryCount, apdu_retries(), plist->InvokeID, plist->RetryCount, apdu_retries(),
plist->RequestTimer); plist->RequestTimer);
if (bytes_sent <= 0) {
debug_perror("invoke-id[%u] Failed to Send Retry");
}
} else { } else {
/* note: the invoke id has not been cleared yet /* note: the invoke id has not been cleared yet
and this indicates a failed message: and this indicates a failed message:
+1 -1
View File
@@ -806,8 +806,8 @@ void dlenv_maintenance_timer(uint16_t elapsed_seconds)
statistics.receive_valid_frame_counter, statistics.receive_valid_frame_counter,
statistics.receive_invalid_frame_counter, statistics.receive_invalid_frame_counter,
statistics.transmit_frame_counter, statistics.transmit_frame_counter,
statistics.transmit_pdu_counter,
statistics.receive_pdu_counter, statistics.receive_pdu_counter,
statistics.transmit_pdu_counter,
statistics.lost_token_counter); statistics.lost_token_counter);
fflush(stderr); fflush(stderr);
#endif #endif