diff options
Diffstat (limited to 'klippy/serialqueue.c')
-rw-r--r-- | klippy/serialqueue.c | 1021 |
1 files changed, 1021 insertions, 0 deletions
diff --git a/klippy/serialqueue.c b/klippy/serialqueue.c new file mode 100644 index 000000000..fa75701d1 --- /dev/null +++ b/klippy/serialqueue.c @@ -0,0 +1,1021 @@ +// Serial port command queuing +// +// Copyright (C) 2016 Kevin O'Connor <kevin@koconnor.net> +// +// This file may be distributed under the terms of the GNU GPLv3 license. +// +// This goal of this code is to handle low-level serial port +// communications with a microcontroller (mcu). This code is written +// in C (instead of python) to reduce communication latencies and to +// reduce scheduling jitter. The code queues messages to be +// transmitted, schedules transmission of commands at specified mcu +// clock times, prioritizes commands, and handles retransmissions. A +// background thread is launched to do this work and minimize latency. + +#include <errno.h> // errno +#include <math.h> // ceil +#include <poll.h> // poll +#include <pthread.h> // pthread_mutex_lock +#include <stddef.h> // offsetof +#include <stdint.h> // uint64_t +#include <stdio.h> // snprintf +#include <stdlib.h> // malloc +#include <string.h> // memset +#include <sys/time.h> // gettimeofday +#include <time.h> // struct timespec +#include <termios.h> // tcflush +#include <unistd.h> // pipe +#include "list.h" // list_add_tail +#include "serialqueue.h" // struct queue_message + + +/**************************************************************** + * Helper functions + ****************************************************************/ + +// Return the current system time as a double +static double +get_time(void) +{ + struct timeval tv; + gettimeofday(&tv, NULL); + return (double)tv.tv_sec + (double)tv.tv_usec / 1000000.; +} + +#if 0 +// Fill a 'struct timespec' with a system time stored in a double +struct timespec +fill_time(double time) +{ + time_t t = time; + return (struct timespec) {t, (time - t)*1000000000. }; +} +#endif + +// Report 'errno' in a message written to stderr +void +report_errno(char *where, int rc) +{ + int e = errno; + fprintf(stderr, "Got error %d in %s: (%d)%s\n", rc, where, e, strerror(e)); +} + + +/**************************************************************** + * Poll reactor + ****************************************************************/ + +// The 'poll reactor' code is a mechanism for dispatching timer and +// file descriptor events. + +#define PR_NOW 0. +#define PR_NEVER 9999999999999999. + +struct pollreactor_timer { + double waketime; + double (*callback)(void *data, double eventtime); +}; + +struct pollreactor { + int num_fds, num_timers, must_exit; + void *callback_data; + double next_timer; + struct pollfd *fds; + void (**fd_callbacks)(void *data, double eventtime); + struct pollreactor_timer *timers; +}; + +// Allocate a new 'struct pollreactor' object +static void +pollreactor_setup(struct pollreactor *pr, int num_fds, int num_timers + , void *callback_data) +{ + pr->num_fds = num_fds; + pr->num_timers = num_timers; + pr->must_exit = 0; + pr->callback_data = callback_data; + pr->next_timer = PR_NEVER; + pr->fds = malloc(num_fds * sizeof(*pr->fds)); + memset(pr->fds, 0, num_fds * sizeof(*pr->fds)); + pr->fd_callbacks = malloc(num_fds * sizeof(*pr->fd_callbacks)); + memset(pr->fd_callbacks, 0, num_fds * sizeof(*pr->fd_callbacks)); + pr->timers = malloc(num_timers * sizeof(*pr->timers)); + memset(pr->timers, 0, num_timers * sizeof(*pr->timers)); + int i; + for (i=0; i<num_timers; i++) + pr->timers[i].waketime = PR_NEVER; +} + +// Add a callback for when a file descriptor (fd) becomes readable +static void +pollreactor_add_fd(struct pollreactor *pr, int pos, int fd, void *callback) +{ + pr->fds[pos].fd = fd; + pr->fds[pos].events = POLLIN|POLLHUP; + pr->fds[pos].revents = 0; + pr->fd_callbacks[pos] = callback; +} + +// Add a timer callback +static void +pollreactor_add_timer(struct pollreactor *pr, int pos, void *callback) +{ + pr->timers[pos].callback = callback; + pr->timers[pos].waketime = PR_NEVER; +} + +#if 0 +// Return the last schedule wake-up time for a timer +static double +pollreactor_get_timer(struct pollreactor *pr, int pos) +{ + return pr->timers[pos].waketime; +} +#endif + +// Set the wake-up time for a given timer +static void +pollreactor_update_timer(struct pollreactor *pr, int pos, double waketime) +{ + pr->timers[pos].waketime = waketime; + if (waketime < pr->next_timer) + pr->next_timer = waketime; +} + +// Internal code to invoke timer callbacks +static int +pollreactor_check_timers(struct pollreactor *pr, double eventtime) +{ + if (eventtime >= pr->next_timer) { + pr->next_timer = PR_NEVER; + int i; + for (i=0; i<pr->num_timers; i++) { + struct pollreactor_timer *timer = &pr->timers[i]; + double t = timer->waketime; + if (eventtime >= t) { + t = timer->callback(pr->callback_data, eventtime); + timer->waketime = t; + } + if (t < pr->next_timer) + pr->next_timer = t; + } + if (eventtime >= pr->next_timer) + return 0; + } + double timeout = ceil((pr->next_timer - eventtime) * 1000.); + return timeout < 1. ? 1 : (timeout > 1000. ? 1000 : (int)timeout); +} + +// Repeatedly check for timer and fd events and invoke their callbacks +static void +pollreactor_run(struct pollreactor *pr) +{ + pr->must_exit = 0; + double eventtime = get_time(); + while (! pr->must_exit) { + int timeout = pollreactor_check_timers(pr, eventtime); + int ret = poll(pr->fds, pr->num_fds, timeout); + eventtime = get_time(); + if (ret > 0) { + int i; + for (i=0; i<pr->num_fds; i++) + if (pr->fds[i].revents) + pr->fd_callbacks[i](pr->callback_data, eventtime); + } else if (ret < 0) { + report_errno("poll", ret); + pr->must_exit = 1; + } + } +} + +// Request that a currently running pollreactor_run() loop exit +static void +pollreactor_do_exit(struct pollreactor *pr) +{ + pr->must_exit = 1; +} + +// Check if a pollreactor_run() loop has been requested to exit +static int +pollreactor_is_exit(struct pollreactor *pr) +{ + return pr->must_exit; +} + + +/**************************************************************** + * Serial protocol helpers + ****************************************************************/ + +// Implement the standard crc "ccitt" algorithm on the given buffer +static uint16_t +crc16_ccitt(uint8_t *buf, uint8_t len) +{ + uint16_t crc = 0xffff; + while (len--) { + uint8_t data = *buf++; + data ^= crc & 0xff; + data ^= data << 4; + crc = ((((uint16_t)data << 8) | (crc >> 8)) ^ (uint8_t)(data >> 4) + ^ ((uint16_t)data << 3)); + } + return crc; +} + +// Verify a buffer starts with a valid mcu message +static int +check_message(uint8_t *need_sync, uint8_t *buf, int buf_len) +{ + if (buf_len < MESSAGE_MIN) + // Need more data + return 0; + if (*need_sync) + goto error; + uint8_t msglen = buf[MESSAGE_POS_LEN]; + if (msglen < MESSAGE_MIN || msglen > MESSAGE_MAX) + goto error; + uint8_t msgseq = buf[MESSAGE_POS_SEQ]; + if ((msgseq & ~MESSAGE_SEQ_MASK) != MESSAGE_DEST) + goto error; + if (buf_len < msglen) + // Need more data + return 0; + if (buf[msglen-MESSAGE_TRAILER_SYNC] != MESSAGE_SYNC) + goto error; + uint16_t msgcrc = ((buf[msglen-MESSAGE_TRAILER_CRC] << 8) + | (uint8_t)buf[msglen-MESSAGE_TRAILER_CRC+1]); + uint16_t crc = crc16_ccitt(buf, msglen-MESSAGE_TRAILER_SIZE); + if (crc != msgcrc) + goto error; + return msglen; + +error: ; + // Discard bytes until next SYNC found + uint8_t *next_sync = memchr(buf, MESSAGE_SYNC, buf_len); + if (next_sync) { + *need_sync = 0; + return -(next_sync - buf + 1); + } + *need_sync = 1; + return -buf_len; +} + +// Encode an integer as a variable length quantity (vlq) +static uint8_t * +encode_int(uint8_t *p, uint32_t v) +{ + int32_t sv = v; + if (sv < (3L<<5) && sv >= -(1L<<5)) goto f4; + if (sv < (3L<<12) && sv >= -(1L<<12)) goto f3; + if (sv < (3L<<19) && sv >= -(1L<<19)) goto f2; + if (sv < (3L<<26) && sv >= -(1L<<26)) goto f1; + *p++ = (v>>28) | 0x80; +f1: *p++ = ((v>>21) & 0x7f) | 0x80; +f2: *p++ = ((v>>14) & 0x7f) | 0x80; +f3: *p++ = ((v>>7) & 0x7f) | 0x80; +f4: *p++ = v & 0x7f; + return p; +} + + +/**************************************************************** + * Command queues + ****************************************************************/ + +struct command_queue { + struct list_head stalled_queue, ready_queue; + struct list_node node; +}; + +// Allocate a 'struct queue_message' object +static struct queue_message * +message_alloc(void) +{ + struct queue_message *qm = malloc(sizeof(*qm)); + memset(qm, 0, sizeof(*qm)); + return qm; +} + +// Allocate a queue_message and fill it with the specified data +static struct queue_message * +message_fill(uint8_t *data, int len) +{ + struct queue_message *qm = message_alloc(); + memcpy(qm->msg, data, len); + qm->len = len; + return qm; +} + +// Allocate a queue_message and fill it with a series of encoded vlq integers +struct queue_message * +message_alloc_and_encode(uint32_t *data, int len) +{ + struct queue_message *qm = message_alloc(); + int i; + uint8_t *p = qm->msg; + for (i=0; i<len; i++) { + p = encode_int(p, data[i]); + if (p > &qm->msg[MESSAGE_PAYLOAD_MAX]) + goto fail; + } + qm->len = p - qm->msg; + return qm; + +fail: + fprintf(stderr, "Encode error\n"); + qm->len = 0; + return qm; +} + +// Free the storage from a previous message_alloc() call +static void +message_free(struct queue_message *qm) +{ + free(qm); +} + + +/**************************************************************** + * Serialqueue interface + ****************************************************************/ + +struct serialqueue { + // Input reading + struct pollreactor pr; + int serial_fd; + int pipe_fds[2]; + uint8_t input_buf[4096]; + uint8_t need_sync; + int input_pos; + // Threading + pthread_t tid; + pthread_mutex_t lock; // protects variables below + pthread_cond_t cond; + int receive_waiting; + // Baud / clock tracking + double baud_adjust, idle_time; + double est_clock, last_ack_time; + uint64_t last_ack_clock; + double last_receive_sent_time; + // Retransmit support + uint64_t send_seq, receive_seq; + uint64_t retransmit_seq, rtt_sample_seq; + struct list_head sent_queue; + double srtt, rttvar, rto; + // Pending transmission message queues + struct list_head pending_queues; + int ready_bytes, stalled_bytes; + uint64_t need_kick_clock; + int can_delay_writes; + // Received messages + struct list_head receive_queue; + // Debugging + struct list_head old_sent, old_receive; + // Stats + uint32_t bytes_write, bytes_read, bytes_retransmit, bytes_invalid; +}; + +#define SQPF_SERIAL 0 +#define SQPF_PIPE 1 +#define SQPF_NUM 2 + +#define SQPT_RETRANSMIT 0 +#define SQPT_COMMAND 1 +#define SQPT_NUM 2 + +#define MIN_RTO 0.025 +#define MAX_RTO 5.000 +#define MAX_SERIAL_BUFFER 0.050 +#define MIN_REQTIME_DELTA 0.250 +#define IDLE_QUERY_TIME 1.0 + +#define DEBUG_QUEUE_SENT 100 +#define DEBUG_QUEUE_RECEIVE 20 + +// Create a series of empty messages and add them to a list +static void +debug_queue_alloc(struct list_head *root, int count) +{ + int i; + for (i=0; i<count; i++) { + struct queue_message *qm = message_alloc(); + list_add_head(&qm->node, root); + } +} + +// Copy a message to a debug queue and free old debug messages +static void +debug_queue_add(struct list_head *root, struct queue_message *qm) +{ + list_add_tail(&qm->node, root); + struct queue_message *old = list_first_entry( + root, struct queue_message, node); + list_del(&old->node); + message_free(old); +} + +// Wake up the receiver thread if it is waiting +static void +check_wake_receive(struct serialqueue *sq) +{ + if (sq->receive_waiting) { + sq->receive_waiting = 0; + pthread_cond_signal(&sq->cond); + } +} + +// Update internal state when the receive sequence increases +static void +update_receive_seq(struct serialqueue *sq, double eventtime, uint64_t rseq) +{ + // Remove from sent queue + int ack_count = rseq - sq->receive_seq; + uint64_t sent_seq = sq->receive_seq; + while (!list_empty(&sq->sent_queue) && ack_count--) { + struct queue_message *sent = list_first_entry( + &sq->sent_queue, struct queue_message, node); + if (rseq == ++sent_seq) + sq->last_receive_sent_time = sent->receive_time; + list_del(&sent->node); + debug_queue_add(&sq->old_sent, sent); + } + sq->receive_seq = rseq; + if (rseq > sq->send_seq) + sq->send_seq = rseq; + pollreactor_update_timer(&sq->pr, SQPT_COMMAND, PR_NOW); + + // Update retransmit info + if (sq->rtt_sample_seq && rseq >= sq->rtt_sample_seq + && sq->last_receive_sent_time) { + // RFC6298 rtt calculations + double delta = eventtime - sq->last_receive_sent_time; + if (!sq->srtt) { + sq->rttvar = delta / 2.0; + sq->srtt = delta * 10.0; // use a higher start default + } else { + sq->rttvar = (3.0 * sq->rttvar + fabs(sq->srtt - delta)) / 4.0; + sq->srtt = (7.0 * sq->srtt + delta) / 8.0; + } + double rttvar4 = sq->rttvar * 4.0; + if (rttvar4 < 0.001) + rttvar4 = 0.001; + sq->rto = sq->srtt + rttvar4; + if (sq->rto < MIN_RTO) + sq->rto = MIN_RTO; + else if (sq->rto > MAX_RTO) + sq->rto = MAX_RTO; + sq->rtt_sample_seq = 0; + } + if (list_empty(&sq->sent_queue)) { + pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT, PR_NEVER); + } else { + struct queue_message *sent = list_first_entry( + &sq->sent_queue, struct queue_message, node); + double nr = eventtime + sq->rto + sent->len * sq->baud_adjust; + pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT, nr); + } +} + +// Process a well formed input message +static void +handle_message(struct serialqueue *sq, double eventtime, int len) +{ + // Calculate receive sequence number + uint64_t rseq = ((sq->receive_seq & ~MESSAGE_SEQ_MASK) + | (sq->input_buf[MESSAGE_POS_SEQ] & MESSAGE_SEQ_MASK)); + if (rseq < sq->receive_seq) + rseq += MESSAGE_SEQ_MASK+1; + + if (rseq != sq->receive_seq) + // New sequence number + update_receive_seq(sq, eventtime, rseq); + else if (len == MESSAGE_MIN && rseq > sq->retransmit_seq) + // Duplicate sequence number in an empty message is a nak + pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT, PR_NOW); + + if (len > MESSAGE_MIN) { + // Add message to receive queue + struct queue_message *qm = message_fill(sq->input_buf, len); + qm->sent_time = sq->last_receive_sent_time; + qm->receive_time = eventtime; + list_add_tail(&qm->node, &sq->receive_queue); + check_wake_receive(sq); + } +} + +// Callback for input activity on the serial fd +static void +input_event(struct serialqueue *sq, double eventtime) +{ + int ret = read(sq->serial_fd, &sq->input_buf[sq->input_pos] + , sizeof(sq->input_buf) - sq->input_pos); + if (ret <= 0) { + report_errno("read", ret); + pollreactor_do_exit(&sq->pr); + return; + } + sq->input_pos += ret; + for (;;) { + ret = check_message(&sq->need_sync, sq->input_buf, sq->input_pos); + if (!ret) + // Need more data + return; + if (ret > 0) { + // Received a valid message + pthread_mutex_lock(&sq->lock); + handle_message(sq, eventtime, ret); + sq->bytes_read += ret; + pthread_mutex_unlock(&sq->lock); + } else { + // Skip bad data at beginning of input + ret = -ret; + pthread_mutex_lock(&sq->lock); + sq->bytes_invalid += ret; + pthread_mutex_unlock(&sq->lock); + } + sq->input_pos -= ret; + if (sq->input_pos) + memmove(sq->input_buf, &sq->input_buf[ret], sq->input_pos); + } +} + +// Callback for input activity on the pipe fd (wakes command_event) +static void +kick_event(struct serialqueue *sq, double eventtime) +{ + char dummy[4096]; + int ret = read(sq->pipe_fds[0], dummy, sizeof(dummy)); + if (ret < 0) + report_errno("pipe read", ret); + pollreactor_update_timer(&sq->pr, SQPT_COMMAND, PR_NOW); +} + +// Callback timer for when a retransmit should be done +static double +retransmit_event(struct serialqueue *sq, double eventtime) +{ + int ret = tcflush(sq->serial_fd, TCOFLUSH); + if (ret < 0) + report_errno("tcflush", ret); + + pthread_mutex_lock(&sq->lock); + + // Retransmit all pending messages + uint8_t buf[MESSAGE_MAX * MESSAGE_SEQ_MASK + 1]; + int buflen = 0; + buf[buflen++] = MESSAGE_SYNC; + struct queue_message *qm; + list_for_each_entry(qm, &sq->sent_queue, node) { + memcpy(&buf[buflen], qm->msg, qm->len); + buflen += qm->len; + } + ret = write(sq->serial_fd, buf, buflen); + if (ret < 0) + report_errno("retransmit write", ret); + sq->bytes_retransmit += buflen; + + // Update rto + sq->rto *= 2.0; + if (sq->rto > MAX_RTO) + sq->rto = MAX_RTO; + sq->retransmit_seq = sq->send_seq; + sq->rtt_sample_seq = 0; + sq->idle_time = eventtime + buflen * sq->baud_adjust; + double waketime = sq->idle_time + sq->rto; + + pthread_mutex_unlock(&sq->lock); + return waketime; +} + +// Construct a block of data and send to the serial port +static void +build_and_send_command(struct serialqueue *sq, double eventtime) +{ + struct queue_message *out = message_alloc(); + out->len = MESSAGE_HEADER_SIZE; + + while (sq->ready_bytes) { + // Find highest priority message (message with lowest req_clock) + uint64_t min_clock = MAX_CLOCK; + struct command_queue *q, *cq = NULL; + struct queue_message *qm = NULL; + list_for_each_entry(q, &sq->pending_queues, node) { + if (!list_empty(&q->ready_queue)) { + struct queue_message *m = list_first_entry( + &q->ready_queue, struct queue_message, node); + if (m->req_clock < min_clock) { + min_clock = m->req_clock; + cq = q; + qm = m; + } + } + } + // Append message to outgoing command + if (out->len + qm->len > sizeof(out->msg) - MESSAGE_TRAILER_SIZE) + break; + list_del(&qm->node); + if (list_empty(&cq->ready_queue) && list_empty(&cq->stalled_queue)) + list_del(&cq->node); + memcpy(&out->msg[out->len], qm->msg, qm->len); + out->len += qm->len; + sq->ready_bytes -= qm->len; + message_free(qm); + } + + // Fill header / trailer + out->len += MESSAGE_TRAILER_SIZE; + out->msg[MESSAGE_POS_LEN] = out->len; + out->msg[MESSAGE_POS_SEQ] = MESSAGE_DEST | (sq->send_seq & MESSAGE_SEQ_MASK); + uint16_t crc = crc16_ccitt(out->msg, out->len - MESSAGE_TRAILER_SIZE); + out->msg[out->len - MESSAGE_TRAILER_CRC] = crc >> 8; + out->msg[out->len - MESSAGE_TRAILER_CRC+1] = crc & 0xff; + out->msg[out->len - MESSAGE_TRAILER_SYNC] = MESSAGE_SYNC; + + // Send message + int ret = write(sq->serial_fd, out->msg, out->len); + if (ret < 0) + report_errno("write", ret); + sq->bytes_write += out->len; + if (eventtime > sq->idle_time) + sq->idle_time = eventtime; + sq->idle_time += out->len * sq->baud_adjust; + out->sent_time = eventtime; + out->receive_time = sq->idle_time; + if (list_empty(&sq->sent_queue)) + pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT + , sq->idle_time + sq->rto); + sq->send_seq++; + if (!sq->rtt_sample_seq) + sq->rtt_sample_seq = sq->send_seq; + list_add_tail(&out->node, &sq->sent_queue); +} + +// Determine the time the next serial data should be sent +static double +check_send_command(struct serialqueue *sq, double eventtime) +{ + if (eventtime < sq->idle_time - MAX_SERIAL_BUFFER) + // Serial port already busy + return sq->idle_time - MAX_SERIAL_BUFFER; + if (sq->send_seq - sq->receive_seq >= MESSAGE_SEQ_MASK + && sq->receive_seq != (uint64_t)-1) + // Need an ack before more messages can be sent + return PR_NEVER; + + // Check for stalled messages now ready + double idletime = eventtime > sq->idle_time ? eventtime : sq->idle_time; + idletime += MESSAGE_MIN * sq->baud_adjust; + double timedelta = idletime - sq->last_ack_time; + uint64_t ack_clock = (uint64_t)(timedelta * sq->est_clock) + sq->last_ack_clock; + uint64_t min_stalled_clock = MAX_CLOCK, min_ready_clock = MAX_CLOCK; + struct command_queue *cq; + list_for_each_entry(cq, &sq->pending_queues, node) { + // Move messages from the stalled_queue to the ready_queue + while (!list_empty(&cq->stalled_queue)) { + struct queue_message *qm = list_first_entry( + &cq->stalled_queue, struct queue_message, node); + if (ack_clock < qm->min_clock) { + if (qm->min_clock < min_stalled_clock) + min_stalled_clock = qm->min_clock; + break; + } + list_del(&qm->node); + list_add_tail(&qm->node, &cq->ready_queue); + sq->stalled_bytes -= qm->len; + sq->ready_bytes += qm->len; + } + // Update min_ready_clock + if (!list_empty(&cq->ready_queue)) { + struct queue_message *qm = list_first_entry( + &cq->ready_queue, struct queue_message, node); + if (qm->req_clock < min_ready_clock) + min_ready_clock = qm->req_clock; + } + } + + // Check for messages to send + if (sq->ready_bytes >= MESSAGE_PAYLOAD_MAX) + return PR_NOW; + if (! sq->can_delay_writes) { + if (sq->ready_bytes) + return PR_NOW; + if (sq->est_clock) + sq->can_delay_writes = 1; + sq->need_kick_clock = MAX_CLOCK; + return PR_NEVER; + } + uint64_t reqclock_delta = MIN_REQTIME_DELTA * sq->est_clock; + if (min_ready_clock <= ack_clock + reqclock_delta) + return PR_NOW; + uint64_t wantclock = min_ready_clock - reqclock_delta; + if (min_stalled_clock < wantclock) + wantclock = min_stalled_clock; + sq->need_kick_clock = wantclock; + return idletime + (wantclock - ack_clock) / sq->est_clock; +} + +// Callback timer to send data to the serial port +static double +command_event(struct serialqueue *sq, double eventtime) +{ + pthread_mutex_lock(&sq->lock); + double waketime; + for (;;) { + waketime = check_send_command(sq, eventtime); + if (waketime != PR_NOW) + break; + build_and_send_command(sq, eventtime); + } + pthread_mutex_unlock(&sq->lock); + return waketime; +} + +// Main background thread for reading/writing to serial port +static void * +background_thread(void *data) +{ + struct serialqueue *sq = data; + pollreactor_run(&sq->pr); + + pthread_mutex_lock(&sq->lock); + check_wake_receive(sq); + pthread_mutex_unlock(&sq->lock); + + return NULL; +} + +// Create a new 'struct serialqueue' object +struct serialqueue * +serialqueue_alloc(int serial_fd, double baud_adjust, int write_only) +{ + struct serialqueue *sq = malloc(sizeof(*sq)); + memset(sq, 0, sizeof(*sq)); + sq->baud_adjust = baud_adjust; + + // Reactor setup + sq->serial_fd = serial_fd; + int ret = pipe(sq->pipe_fds); + if (ret) + goto fail; + pollreactor_setup(&sq->pr, SQPF_NUM, SQPT_NUM, sq); + if (!write_only) + pollreactor_add_fd(&sq->pr, SQPF_SERIAL, serial_fd, input_event); + pollreactor_add_fd(&sq->pr, SQPF_PIPE, sq->pipe_fds[0], kick_event); + pollreactor_add_timer(&sq->pr, SQPT_RETRANSMIT, retransmit_event); + pollreactor_add_timer(&sq->pr, SQPT_COMMAND, command_event); + + // Retransmit setup + sq->send_seq = 1; + if (write_only) { + sq->receive_seq = -1; + sq->rto = PR_NEVER; + } else { + sq->receive_seq = 1; + sq->rto = MIN_RTO; + } + + // Queues + sq->need_kick_clock = MAX_CLOCK; + list_init(&sq->pending_queues); + list_init(&sq->sent_queue); + list_init(&sq->receive_queue); + + // Debugging + list_init(&sq->old_sent); + list_init(&sq->old_receive); + debug_queue_alloc(&sq->old_sent, DEBUG_QUEUE_SENT); + debug_queue_alloc(&sq->old_receive, DEBUG_QUEUE_RECEIVE); + + // Thread setup + ret = pthread_mutex_init(&sq->lock, NULL); + if (ret) + goto fail; + ret = pthread_cond_init(&sq->cond, NULL); + if (ret) + goto fail; + ret = pthread_create(&sq->tid, NULL, background_thread, sq); + if (ret) + goto fail; + + return sq; + +fail: + report_errno("init", ret); + return NULL; +} + +// Request that the background thread exit +void +serialqueue_exit(struct serialqueue *sq) +{ + pollreactor_do_exit(&sq->pr); + int ret = pthread_join(sq->tid, NULL); + if (ret) + report_errno("pthread_join", ret); +} + +// Allocate a 'struct command_queue' +struct command_queue * +serialqueue_alloc_commandqueue(void) +{ + struct command_queue *cq = malloc(sizeof(*cq)); + memset(cq, 0, sizeof(*cq)); + list_init(&cq->ready_queue); + list_init(&cq->stalled_queue); + return cq; +} + +// Write to the internal pipe to wake the background thread if in poll +static void +kick_bg_thread(struct serialqueue *sq) +{ + int ret = write(sq->pipe_fds[1], ".", 1); + if (ret < 0) + report_errno("pipe write", ret); +} + +// Add a batch of messages to the given command_queue +void +serialqueue_send_batch(struct serialqueue *sq, struct command_queue *cq + , struct list_head *msgs) +{ + // Make sure min_clock is set in list and calculate total bytes + int len = 0; + struct queue_message *qm; + list_for_each_entry(qm, msgs, node) { + if (qm->min_clock + (1LL<<31) < qm->req_clock) + qm->min_clock = qm->req_clock - (1LL<<31); + len += qm->len; + } + if (! len) + return; + qm = list_first_entry(msgs, struct queue_message, node); + + // Add list to cq->stalled_queue + pthread_mutex_lock(&sq->lock); + if (list_empty(&cq->ready_queue) && list_empty(&cq->stalled_queue)) + list_add_tail(&cq->node, &sq->pending_queues); + list_join_tail(msgs, &cq->stalled_queue); + sq->stalled_bytes += len; + int mustwake = 0; + if (qm->min_clock < sq->need_kick_clock) { + sq->need_kick_clock = 0; + mustwake = 1; + } + pthread_mutex_unlock(&sq->lock); + + // Wake the background thread if necessary + if (mustwake) + kick_bg_thread(sq); +} + +// Schedule the transmission of a message on the serial port at a +// given time and priority. +void +serialqueue_send(struct serialqueue *sq, struct command_queue *cq + , uint8_t *msg, int len, uint64_t min_clock, uint64_t req_clock) +{ + struct queue_message *qm = message_fill(msg, len); + qm->min_clock = min_clock; + qm->req_clock = req_clock; + + struct list_head msgs; + list_init(&msgs); + list_add_tail(&qm->node, &msgs); + serialqueue_send_batch(sq, cq, &msgs); +} + +// Like serialqueue_send() but also builds the message to be sent +void +serialqueue_encode_and_send(struct serialqueue *sq, struct command_queue *cq + , uint32_t *data, int len + , uint64_t min_clock, uint64_t req_clock) +{ + struct queue_message *qm = message_alloc_and_encode(data, len); + qm->min_clock = min_clock; + qm->req_clock = req_clock; + + struct list_head msgs; + list_init(&msgs); + list_add_tail(&qm->node, &msgs); + serialqueue_send_batch(sq, cq, &msgs); +} + +// Return a message read from the serial port (or wait for one if none +// available) +void +serialqueue_pull(struct serialqueue *sq, struct pull_queue_message *pqm) +{ + pthread_mutex_lock(&sq->lock); + // Wait for message to be available + while (list_empty(&sq->receive_queue)) { + if (pollreactor_is_exit(&sq->pr)) + goto exit; + sq->receive_waiting = 1; + int ret = pthread_cond_wait(&sq->cond, &sq->lock); + if (ret) + report_errno("pthread_cond_wait", ret); + } + + // Remove message from queue + struct queue_message *qm = list_first_entry( + &sq->receive_queue, struct queue_message, node); + list_del(&qm->node); + + // Copy message + memcpy(pqm->msg, qm->msg, qm->len); + pqm->len = qm->len; + pqm->sent_time = qm->sent_time; + pqm->receive_time = qm->receive_time; + debug_queue_add(&sq->old_receive, qm); + + pthread_mutex_unlock(&sq->lock); + return; + +exit: + pqm->len = -1; + pthread_mutex_unlock(&sq->lock); +} + +// Set the estimated clock rate of the mcu on the other end of the +// serial port +void +serialqueue_set_clock_est(struct serialqueue *sq, double est_clock + , double last_ack_time, uint64_t last_ack_clock) +{ + pthread_mutex_lock(&sq->lock); + sq->est_clock = est_clock; + sq->last_ack_time = last_ack_time; + sq->last_ack_clock = last_ack_clock; + pthread_mutex_unlock(&sq->lock); +} + +// Flush all messages in a "ready" state +void +serialqueue_flush_ready(struct serialqueue *sq) +{ + pthread_mutex_lock(&sq->lock); + sq->can_delay_writes = 0; + pthread_mutex_unlock(&sq->lock); + kick_bg_thread(sq); +} + +// Return a string buffer containing statistics for the serial port +void +serialqueue_get_stats(struct serialqueue *sq, char *buf, int len) +{ + struct serialqueue stats; + pthread_mutex_lock(&sq->lock); + memcpy(&stats, sq, sizeof(stats)); + pthread_mutex_unlock(&sq->lock); + + snprintf(buf, len, "bytes_write=%u bytes_read=%u" + " bytes_retransmit=%u bytes_invalid=%u" + " send_seq=%u receive_seq=%u retransmit_seq=%u" + " srtt=%.3f rttvar=%.3f rto=%.3f" + " ready_bytes=%u stalled_bytes=%u" + , stats.bytes_write, stats.bytes_read + , stats.bytes_retransmit, stats.bytes_invalid + , (int)stats.send_seq, (int)stats.receive_seq + , (int)stats.retransmit_seq + , stats.srtt, stats.rttvar, stats.rto + , stats.ready_bytes, stats.stalled_bytes); +} + +// Extract old messages stored in the debug queues +int +serialqueue_extract_old(struct serialqueue *sq, int sentq + , struct pull_queue_message *q, int max) +{ + int count = sentq ? DEBUG_QUEUE_SENT : DEBUG_QUEUE_RECEIVE; + struct list_head *rootp = sentq ? &sq->old_sent : &sq->old_receive; + struct list_head replacement, current; + list_init(&replacement); + debug_queue_alloc(&replacement, count); + list_init(¤t); + + // Atomically replace existing debug list with new zero'd list + pthread_mutex_lock(&sq->lock); + list_join_tail(rootp, ¤t); + list_init(rootp); + list_join_tail(&replacement, rootp); + pthread_mutex_unlock(&sq->lock); + + // Walk the debug list + int pos = 0; + while (!list_empty(¤t) && pos < max) { + struct queue_message *qm = list_first_entry( + ¤t, struct queue_message, node); + if (qm->len) { + struct pull_queue_message *pqm = q++; + pos++; + memcpy(pqm->msg, qm->msg, qm->len); + pqm->len = qm->len; + pqm->sent_time = qm->sent_time; + pqm->receive_time = qm->receive_time; + } + list_del(&qm->node); + message_free(qm); + } + return pos; +} |