From 85ba0b23159d0b070ef2a4f8cbd1cca4f5b5bda8 Mon Sep 17 00:00:00 2001 From: Ryan Mulder Date: Tue, 22 Jul 2025 15:51:14 -0400 Subject: [PATCH] Linux MS/TP Fixes (#1051) * Linux MS/TP: * dlmstp_init: * Maximize thread priority if permitted * MSTP_Put_Receive/dlmstp_receive: * Use a buffer so incoming frames are not dropped during bursts * dlmstp_receive: * Make timeout optional * MSTP_Get_Reply: * Lock the mutex before using the PDU Queue * Iterate over all queued replies to find a match (instead of only checking the first) * Sleep for a millisecond before checking again to wait for the application provide a reply * RS485_Check_UART_Data: * Ensure waiter is initilized before using with select * Ensure bytes are read before adding to FIFO TSM: * Add log when `datalink_send_pdu` fails for consistency with all other `datalink_send_pdu` calls dlenv_maintenance_timer: * Fix Tx/Rx labels in MSTP statitistics print * don't log on queued requests --- ports/linux/dlmstp.c | 319 ++++++++++++++++++++++-------------- ports/linux/rs485.c | 13 +- src/bacnet/basic/tsm/tsm.c | 6 +- src/bacnet/datalink/dlenv.c | 2 +- 4 files changed, 204 insertions(+), 136 deletions(-) diff --git a/ports/linux/dlmstp.c b/ports/linux/dlmstp.c index 6be0eacb..00a1b4e3 100644 --- a/ports/linux/dlmstp.c +++ b/ports/linux/dlmstp.c @@ -29,7 +29,11 @@ #include "rs485.h" /* packet queues */ -static DLMSTP_PACKET Receive_Packet; +#ifndef MSTP_RECEIVE_PACKET_COUNT +#define MSTP_RECEIVE_PACKET_COUNT 8 +#endif +static DLMSTP_PACKET Receive_Buffer[MSTP_RECEIVE_PACKET_COUNT]; +static RING_BUFFER Receive_Queue; /* mechanism to wait for a packet */ static pthread_cond_t Receive_Packet_Flag; static pthread_mutex_t Receive_Packet_Mutex; @@ -124,6 +128,9 @@ int dlmstp_send_pdu( } } pthread_mutex_unlock(&Ring_Buffer_Mutex); + if (!pkt) { + debug_printf("DLMSTP: PDU Queue Full!\n"); + } return bytes_sent; } @@ -260,23 +267,17 @@ static bool dlmstp_compare_data_expecting_reply( reply.invoke_id = reply_pdu[offset + 1]; break; default: + /* A queued request, just look for another */ return false; } + if (request.invoke_id != reply.invoke_id) { + /* Normal to have multiple replies queued, just look for another */ + return false; + } /* these don't have service choice included */ - if ((reply.pdu_type == PDU_TYPE_REJECT) || - (reply.pdu_type == PDU_TYPE_ABORT) || - (reply.pdu_type == PDU_TYPE_SEGMENT_ACK)) { - if (request.invoke_id != reply.invoke_id) { - debug_printf("DLMSTP: DER Compare failed: " - "Invoke ID mismatch.\n"); - return false; - } - } else { - if (request.invoke_id != reply.invoke_id) { - debug_printf("DLMSTP: DER Compare failed: " - "Invoke ID mismatch.\n"); - return false; - } + if ((request.pdu_type != PDU_TYPE_REJECT) && + (request.pdu_type != PDU_TYPE_ABORT) && + (request.pdu_type != PDU_TYPE_SEGMENT_ACK)) { if (request.service_choice != reply.service_choice) { debug_printf("DLMSTP: DER Compare failed: " "Service choice mismatch.\n"); @@ -307,98 +308,6 @@ static bool dlmstp_compare_data_expecting_reply( return true; } -/** - * @brief The MS/TP state machine uses this function for getting data to send - * as the reply to a DATA_EXPECTING_REPLY frame, or nothing - * @param mstp_port MSTP port structure for this port - * @param timeout number of milliseconds to wait for a packet - * @return number of bytes, or 0 if no reply is available - */ -uint16_t MSTP_Get_Reply(struct mstp_port_struct_t *mstp_port, unsigned timeout) -{ - uint16_t pdu_len = 0; - bool matched = false; - uint8_t frame_type = 0; - struct mstp_pdu_packet *pkt; - - (void)timeout; - if (Ringbuf_Empty(&PDU_Queue)) { - return 0; - } - pkt = (struct mstp_pdu_packet *)Ringbuf_Peek(&PDU_Queue); - /* is this the reply to the DER? */ - matched = dlmstp_compare_data_expecting_reply( - &mstp_port->InputBuffer[0], mstp_port->DataLength, - mstp_port->SourceAddress, (uint8_t *)&pkt->buffer[0], pkt->length, - pkt->destination_mac); - if (!matched) { - return 0; - } - if (pkt->data_expecting_reply) { - frame_type = FRAME_TYPE_BACNET_DATA_EXPECTING_REPLY; - } else { - frame_type = FRAME_TYPE_BACNET_DATA_NOT_EXPECTING_REPLY; - } - /* convert the PDU into the MSTP Frame */ - pdu_len = MSTP_Create_Frame( - &mstp_port->OutputBuffer[0], /* <-- loading this */ - mstp_port->OutputBufferSize, frame_type, pkt->destination_mac, - mstp_port->This_Station, (uint8_t *)&pkt->buffer[0], pkt->length); - DLMSTP_Statistics.transmit_pdu_counter++; - (void)Ringbuf_Pop(&PDU_Queue, NULL); - - return pdu_len; -} - -/** - * @brief Send an MSTP frame - * @param mstp_port - port specific data - * @param buffer - data to send - * @param nbytes - number of bytes of data to send - */ -void MSTP_Send_Frame( - struct mstp_port_struct_t *mstp_port, - const uint8_t *buffer, - uint16_t nbytes) -{ - RS485_Send_Frame(mstp_port, buffer, nbytes); - DLMSTP_Statistics.transmit_frame_counter++; -} - -/** - * @brief MS/TP state machine received a frame - * @return number of bytes queued, or 0 if unable to be queued - */ -uint16_t MSTP_Put_Receive(struct mstp_port_struct_t *mstp_port) -{ - uint16_t pdu_len = 0; - - pthread_mutex_lock(&Receive_Packet_Mutex); - if (Receive_Packet.ready) { - debug_printf("MS/TP: Dropped! Not Ready.\n"); - } else { - /* bounds check - maybe this should send an abort? */ - pdu_len = mstp_port->DataLength; - if (pdu_len > sizeof(Receive_Packet.pdu)) { - pdu_len = sizeof(Receive_Packet.pdu); - } - if (pdu_len == 0) { - debug_printf("MS/TP: PDU Length is 0!\n"); - } - memmove( - (void *)&Receive_Packet.pdu[0], (void *)&mstp_port->InputBuffer[0], - pdu_len); - dlmstp_fill_bacnet_address( - &Receive_Packet.address, mstp_port->SourceAddress); - Receive_Packet.pdu_len = mstp_port->DataLength; - Receive_Packet.ready = true; - pthread_cond_signal(&Receive_Packet_Flag); - } - pthread_mutex_unlock(&Receive_Packet_Mutex); - - return pdu_len; -} - /** * Add a certain number of nanoseconds to the specified time. * @@ -438,13 +347,121 @@ static void get_abstime(struct timespec *abstime, unsigned long milliseconds) timespec_add_ns(abstime, 1000000 * milliseconds); } +static void millisleep(const unsigned long milliseconds) +{ + struct timespec abstime; + get_abstime(&abstime, milliseconds); + while (EINTR == + clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &abstime, NULL)) { } +} + +/** + * @brief The MS/TP state machine uses this function for getting data to send + * as the reply to a DATA_EXPECTING_REPLY frame, or nothing + * @param mstp_port MSTP port structure for this port + * @param timeout number of milliseconds to wait for a packet + * @return number of bytes, or 0 if no reply is available + */ +uint16_t MSTP_Get_Reply(struct mstp_port_struct_t *mstp_port, unsigned timeout) +{ + uint16_t pdu_len = 0; + bool matched = false; + uint8_t frame_type = 0; + struct mstp_pdu_packet *pkt; + (void)timeout; + + pthread_mutex_lock(&Ring_Buffer_Mutex); + for (pkt = (struct mstp_pdu_packet *)Ringbuf_Peek(&PDU_Queue); pkt; + pkt = (struct mstp_pdu_packet *)Ringbuf_Peek_Next( + &PDU_Queue, (uint8_t *)pkt)) { + /* is this the reply to the DER? */ + matched = dlmstp_compare_data_expecting_reply( + &mstp_port->InputBuffer[0], mstp_port->DataLength, + mstp_port->SourceAddress, (uint8_t *)&pkt->buffer[0], pkt->length, + pkt->destination_mac); + if (matched) { + break; + } + } + if (matched) { + if (pkt->data_expecting_reply) { + frame_type = FRAME_TYPE_BACNET_DATA_EXPECTING_REPLY; + } else { + frame_type = FRAME_TYPE_BACNET_DATA_NOT_EXPECTING_REPLY; + } + /* convert the PDU into the MSTP Frame */ + pdu_len = MSTP_Create_Frame( + &mstp_port->OutputBuffer[0], /* <-- loading this */ + mstp_port->OutputBufferSize, frame_type, pkt->destination_mac, + mstp_port->This_Station, (uint8_t *)&pkt->buffer[0], pkt->length); + DLMSTP_Statistics.transmit_pdu_counter++; + /* This will pop the element no matter where we found it */ + (void)Ringbuf_Pop_Element(&PDU_Queue, (uint8_t *)pkt, NULL); + } + pthread_mutex_unlock(&Ring_Buffer_Mutex); + if (pdu_len <= 0) { + /* Didn't find a match so wait for application layer to provide one */ + millisleep(1); + } + + return pdu_len; +} + +/** + * @brief Send an MSTP frame + * @param mstp_port - port specific data + * @param buffer - data to send + * @param nbytes - number of bytes of data to send + */ +void MSTP_Send_Frame( + struct mstp_port_struct_t *mstp_port, + const uint8_t *buffer, + uint16_t nbytes) +{ + RS485_Send_Frame(mstp_port, buffer, nbytes); + DLMSTP_Statistics.transmit_frame_counter++; +} + +/** + * @brief MS/TP state machine received a frame + * @return number of bytes queued, or 0 if unable to be queued + */ +uint16_t MSTP_Put_Receive(struct mstp_port_struct_t *mstp_port) +{ + uint16_t pdu_len = 0; + + pthread_mutex_lock(&Receive_Packet_Mutex); + DLMSTP_PACKET *pkt = (DLMSTP_PACKET *)Ringbuf_Data_Peek(&Receive_Queue); + if (!pkt) { + debug_printf("MS/TP: Dropped! Not Ready.\n"); + } else { + /* bounds check - maybe this should send an abort? */ + pdu_len = mstp_port->DataLength; + if (pdu_len > sizeof(pkt->pdu)) { + pdu_len = sizeof(pkt->pdu); + } + if (pdu_len == 0) { + debug_printf("MS/TP: PDU Length is 0!\n"); + } + memmove( + (void *)&pkt->pdu[0], (void *)&mstp_port->InputBuffer[0], pdu_len); + dlmstp_fill_bacnet_address(&pkt->address, mstp_port->SourceAddress); + pkt->pdu_len = mstp_port->DataLength; + pkt->ready = true; + if (Ringbuf_Data_Put(&Receive_Queue, (uint8_t *)pkt)) { + pthread_cond_signal(&Receive_Packet_Flag); + } + } + pthread_mutex_unlock(&Receive_Packet_Mutex); + + return pdu_len; +} + /** * @brief Run the MS/TP state machines, and get packet if available * @param pdu - place to put PDU data for the caller * @param max_pdu - number of bytes of PDU data that caller can receive * @return number of bytes in received packet, or 0 if no packet was received - * @note Must be called at least once every 1 milliseconds, with no more than - * 5 milliseconds jitter. */ uint16_t dlmstp_receive( BACNET_ADDRESS *src, /* source address */ @@ -454,28 +471,32 @@ uint16_t dlmstp_receive( { /* milliseconds to wait for a packet */ uint16_t pdu_len = 0; struct timespec abstime; - + DLMSTP_PACKET *pkt; (void)max_pdu; + + pthread_mutex_lock(&Receive_Packet_Mutex); + if (timeout > 0) { + get_abstime(&abstime, timeout); + pthread_cond_timedwait( + &Receive_Packet_Flag, &Receive_Packet_Mutex, &abstime); + } + /* see if there is a packet available, and a place to put the reply (if necessary) and process it */ - pthread_mutex_lock(&Receive_Packet_Mutex); - get_abstime(&abstime, timeout); - pthread_cond_timedwait( - &Receive_Packet_Flag, &Receive_Packet_Mutex, &abstime); - if (Receive_Packet.ready) { - if (Receive_Packet.pdu_len) { + pkt = (DLMSTP_PACKET *)Ringbuf_Peek(&Receive_Queue); + if (pkt) { + if (pkt->pdu_len) { DLMSTP_Statistics.receive_pdu_counter++; if (src) { - memmove( - src, &Receive_Packet.address, - sizeof(Receive_Packet.address)); + memmove(src, &pkt->address, sizeof(pkt->address)); } if (pdu) { - memmove(pdu, &Receive_Packet.pdu, sizeof(Receive_Packet.pdu)); + memmove(pdu, &pkt->pdu, sizeof(pkt->pdu)); } - pdu_len = Receive_Packet.pdu_len; + pdu_len = pkt->pdu_len; } - Receive_Packet.ready = false; + pkt->ready = false; + (void)Ringbuf_Pop(&Receive_Queue, NULL); } pthread_mutex_unlock(&Receive_Packet_Mutex); @@ -979,6 +1000,8 @@ void dlmstp_silence_reset(void *arg) */ bool dlmstp_init(char *ifname) { + pthread_attr_t thread_attr; + struct sched_param sch_param; pthread_condattr_t attr; int rv = 0; @@ -1002,8 +1025,9 @@ bool dlmstp_init(char *ifname) &PDU_Queue, (uint8_t *)&PDU_Buffer, sizeof(struct mstp_pdu_packet), MSTP_PDU_PACKET_COUNT); /* initialize packet queue */ - Receive_Packet.ready = false; - Receive_Packet.pdu_len = 0; + Ringbuf_Init( + &Receive_Queue, (uint8_t *)&Receive_Buffer, sizeof(DLMSTP_PACKET), + MSTP_RECEIVE_PACKET_COUNT); rv = pthread_cond_init(&Receive_Packet_Flag, &attr); if (rv != 0) { fprintf( @@ -1065,11 +1089,54 @@ bool dlmstp_init(char *ifname) (MSTP_Port.CheckAutoBaud ? "true" : "false")); fflush(stderr); #endif + pthread_attr_init(&thread_attr); + + // Set scheduling policy to SCHED_FIFO and priority + rv = pthread_attr_setinheritsched(&thread_attr, PTHREAD_EXPLICIT_SCHED); + if (rv != 0) { + fprintf( + stderr, + "MS/TP Interface: %s\n cannot setup thread schedule to " + "explicit.\n", + ifname); + exit(1); + } + rv = pthread_attr_setschedpolicy(&thread_attr, SCHED_FIFO); + if (rv != 0) { + fprintf( + stderr, + "MS/TP Interface: %s\n cannot setup thread schedule policy to " + "FIFO.\n", + ifname); + exit(1); + } + sch_param.sched_priority = 99; + rv = pthread_attr_setschedparam(&thread_attr, &sch_param); + if (rv != 0) { + fprintf( + stderr, "MS/TP Interface: %s\n cannot setup thread priority.\n", + ifname); + exit(1); + } /* start one thread */ Thread_Run = true; - rv = pthread_create(&hThread, NULL, dlmstp_thread, NULL); + rv = pthread_create(&hThread, &thread_attr, dlmstp_thread, NULL); + if (rv == EPERM) { + fprintf( + stdout, + "MS/TP Interface: %s\n" + " Insufficient permissions to create thread with priority.\n" + " A thread without priority will be created.\n" + " Run this executable as a user with thread priority permission\n" + " or grant capability with \"setcap 'cap_sys_nice=eip'\"", + ifname); + rv = pthread_create(&hThread, NULL, dlmstp_thread, NULL); + } if (rv != 0) { - fprintf(stderr, "Failed to start MS/TP thread\n"); + fprintf( + stderr, "MS/TP Interface: %s\n Failed to start MS/TP thread.\n", + ifname); + exit(1); } return true; diff --git a/ports/linux/rs485.c b/ports/linux/rs485.c index 4d9caafd..dacf6562 100644 --- a/ports/linux/rs485.c +++ b/ports/linux/rs485.c @@ -249,6 +249,8 @@ void RS485_Check_UART_Data(struct mstp_port_struct_t *mstp_port) ssize_t n; int handle = RS485_Handle; FIFO_BUFFER *fifo = &Rx_FIFO; + waiter.tv_sec = 0; + waiter.tv_usec = 5000; SHARED_MSTP_DATA *poSharedData = (SHARED_MSTP_DATA *)mstp_port->UserData; if (poSharedData) { @@ -258,9 +260,6 @@ void RS485_Check_UART_Data(struct mstp_port_struct_t *mstp_port) if (mstp_port->ReceiveError == true) { /* do nothing but wait for state machine to clear the error */ - /* burning time, so wait a longer time */ - waiter.tv_sec = 0; - waiter.tv_usec = 5000; } else if (mstp_port->DataAvailable == false) { /* wait for state machine to read from the DataRegister */ if (FIFO_Count(fifo) > 0) { @@ -270,10 +269,6 @@ void RS485_Check_UART_Data(struct mstp_port_struct_t *mstp_port) /* FIFO is giving data - just poll */ waiter.tv_sec = 0; waiter.tv_usec = 0; - } else { - /* FIFO is empty - wait a longer time */ - waiter.tv_sec = 0; - waiter.tv_usec = 5000; } } /* grab bytes and stuff them into the FIFO every time */ @@ -285,7 +280,9 @@ void RS485_Check_UART_Data(struct mstp_port_struct_t *mstp_port) } if (FD_ISSET(handle, &input)) { n = read(handle, buf, sizeof(buf)); - FIFO_Add(fifo, &buf[0], n); + if (n > 0) { + FIFO_Add(fifo, &buf[0], n); + } } } diff --git a/src/bacnet/basic/tsm/tsm.c b/src/bacnet/basic/tsm/tsm.c index 707ec53a..43b3b85c 100644 --- a/src/bacnet/basic/tsm/tsm.c +++ b/src/bacnet/basic/tsm/tsm.c @@ -299,6 +299,7 @@ bool tsm_get_transaction_pdu( void tsm_timer_milliseconds(uint16_t milliseconds) { unsigned i = 0; /* counter */ + int bytes_sent = 0; BACNET_TSM_DATA *plist = &TSM_List[0]; @@ -314,13 +315,16 @@ void tsm_timer_milliseconds(uint16_t milliseconds) if (plist->RetryCount < apdu_retries()) { plist->RequestTimer = apdu_timeout(); plist->RetryCount++; - datalink_send_pdu( + bytes_sent = datalink_send_pdu( &plist->dest, &plist->npdu_data, &plist->apdu[0], plist->apdu_len); DEBUG_PRINTF( "invoke-id[%u] Retry %u of %u after %ums\n", plist->InvokeID, plist->RetryCount, apdu_retries(), plist->RequestTimer); + if (bytes_sent <= 0) { + debug_perror("invoke-id[%u] Failed to Send Retry"); + } } else { /* note: the invoke id has not been cleared yet and this indicates a failed message: diff --git a/src/bacnet/datalink/dlenv.c b/src/bacnet/datalink/dlenv.c index 9bf014ed..70e4b213 100644 --- a/src/bacnet/datalink/dlenv.c +++ b/src/bacnet/datalink/dlenv.c @@ -806,8 +806,8 @@ void dlenv_maintenance_timer(uint16_t elapsed_seconds) statistics.receive_valid_frame_counter, statistics.receive_invalid_frame_counter, statistics.transmit_frame_counter, - statistics.transmit_pdu_counter, statistics.receive_pdu_counter, + statistics.transmit_pdu_counter, statistics.lost_token_counter); fflush(stderr); #endif