forked from PAWPAW-Mirror/lib_xua
Enhanced app_midi_simple to support file based commands and readiness for Rx testing
This commit is contained in:
24
tests/midi_test_helpers.py
Normal file
24
tests/midi_test_helpers.py
Normal file
@@ -0,0 +1,24 @@
|
||||
# Copyright 2024 XMOS LIMITED.
|
||||
# This Software is subject to the terms of the XMOS Public Licence: Version 1.
|
||||
|
||||
class Midi_expect:
|
||||
def __init(self):
|
||||
pass
|
||||
|
||||
def expect(self, commands):
|
||||
expected = ""
|
||||
for command in commands:
|
||||
while len(command) < 3:
|
||||
command.append(0)
|
||||
expected += "uart_tx_checker: " + " ".join([f"0x{byte:02x}" for byte in command]) + "\n"
|
||||
|
||||
return expected
|
||||
|
||||
|
||||
def create_midi_tx_file(commands):
|
||||
with open("midi_tx_cmds.txt", "wt") as mt:
|
||||
for command in commands:
|
||||
while len(command) < 3:
|
||||
command.append(0)
|
||||
text = " ".join([str(byte) for byte in command]) + "\n"
|
||||
mt.write(text)
|
||||
@@ -32,8 +32,15 @@ on tile[MIDI_TILE] : buffered in port:1 p_midi_rx = XS1_PORT_1F;
|
||||
#define CLKBLK_MIDI XS1_CLKBLK_2
|
||||
on tile[MIDI_TILE] : clock clk_midi = CLKBLK_MIDI;
|
||||
|
||||
#define MAX_TEST_COMMANDS 10
|
||||
#define TEST_COMMAND_FILE "midi_tx_cmds.txt"
|
||||
#define MAX_TEST_COMMANDS 100
|
||||
#define TEST_COMMAND_FILE_TX "midi_tx_cmds.txt"
|
||||
#define TEST_COMMAND_FILE_RX "midi_rx_cmds.txt"
|
||||
|
||||
#ifndef DEBUG
|
||||
#define dprintf(...)
|
||||
#else
|
||||
#define dprintf(...) printf(__VA_ARGS__)
|
||||
#endif
|
||||
|
||||
/* See hwsupport.xc */
|
||||
void ctrlPort();
|
||||
@@ -42,8 +49,6 @@ void ctrlPort();
|
||||
|
||||
|
||||
unsigned mini_in_parse_helper(unsigned midi[3]){
|
||||
// printf("Composing data: 0x%x 0x%x 0x%x\n", midi[0], midi[1], midi[2]);
|
||||
|
||||
struct midi_in_parse_state m_state;
|
||||
reset_midi_state(m_state);
|
||||
|
||||
@@ -59,69 +64,97 @@ unsigned mini_in_parse_helper(unsigned midi[3]){
|
||||
return 0;
|
||||
}
|
||||
|
||||
unsigned parse_cmd_line(uint8_t commands[MAX_TEST_COMMANDS][3])
|
||||
{unsigned, unsigned} read_config_file(uint8_t commands[MAX_TEST_COMMANDS][3])
|
||||
{
|
||||
FILE * movable fptr_tx = fopen(TEST_COMMAND_FILE,"rt");
|
||||
unsigned tx_line_count = 0;
|
||||
|
||||
FILE * movable fptr_tx = fopen(TEST_COMMAND_FILE_TX,"rt");
|
||||
if (fptr_tx == NULL) {
|
||||
printf("ERROR: TX command file %s not found or unable to open.\n", TEST_COMMAND_FILE);
|
||||
fclose(move(fptr_tx));
|
||||
return 0;
|
||||
}
|
||||
|
||||
unsigned line = 0;
|
||||
|
||||
unsigned a,b,c;
|
||||
while (fscanf(fptr_tx, "%u %u %u\n", &a, &b, &c) == 3) {
|
||||
commands[line][0] = a;
|
||||
commands[line][1] = b;
|
||||
commands[line][2] = c;
|
||||
// printf("Line %u params: 0x%x 0x%x 0x%x\n", line, commands[line][0], commands[line][1], commands[line][2]);
|
||||
line++;
|
||||
if(line > MAX_TEST_COMMANDS){
|
||||
printf("ERROR: Too many lines in TX command file\n");
|
||||
|
||||
fclose(move(fptr_tx));
|
||||
return MAX_TEST_COMMANDS;
|
||||
dprintf("WARNING: TX command file %s not found or unable to open.\n", TEST_COMMAND_FILE_TX);
|
||||
} else {
|
||||
unsigned a,b,c;
|
||||
while (fscanf(fptr_tx, "%u %u %u\n", &a, &b, &c) == 3) {
|
||||
commands[tx_line_count][0] = a;
|
||||
commands[tx_line_count][1] = b;
|
||||
commands[tx_line_count][2] = c;
|
||||
//printf("Line %u params: 0x%x 0x%x 0x%x\n", tx_line_count, commands[tx_line_count][0], commands[tx_line_count][1], commands[tx_line_count][2]);
|
||||
tx_line_count++;
|
||||
if(tx_line_count > MAX_TEST_COMMANDS){
|
||||
printf("ERROR: Too many lines in TX command file\n");
|
||||
tx_line_count = MAX_TEST_COMMANDS;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fclose(move(fptr_tx));
|
||||
return line;
|
||||
|
||||
|
||||
unsigned rx_cmd_count = 0;
|
||||
|
||||
FILE * movable fptr_rx = fopen(TEST_COMMAND_FILE_RX,"rt");
|
||||
if (fptr_rx == NULL) {
|
||||
dprintf("WARNING: RX command file %s not found or unable to open.\n", TEST_COMMAND_FILE_RX);
|
||||
} else {
|
||||
if(fscanf(fptr_rx, "%u\n", &rx_cmd_count) != 1){
|
||||
printf("ERROR: Not enough or too many items in RX command file line\n");
|
||||
}
|
||||
}
|
||||
fclose(move(fptr_rx));
|
||||
|
||||
return {tx_line_count, rx_cmd_count};
|
||||
}
|
||||
|
||||
void test(chanend c_midi){
|
||||
uint8_t commands[MAX_TEST_COMMANDS][3] = {{0}};
|
||||
unsigned num_tx = parse_cmd_line(commands);
|
||||
unsigned num_to_tx = 0;
|
||||
unsigned num_to_rx = 0;
|
||||
{num_to_tx, num_to_rx} = read_config_file(commands);
|
||||
dprintf("Sending %u MIDI command line(s) and receiving %u MIDI command(s)\n", num_to_tx, num_to_rx);
|
||||
|
||||
// For MIDI Rx
|
||||
int is_ack;
|
||||
unsigned datum;
|
||||
unsigned rx_packet;
|
||||
|
||||
unsigned line = 0;
|
||||
while(1){
|
||||
// Counters for Rx and Tx
|
||||
unsigned tx_cmd_count = 0;
|
||||
unsigned rx_cmd_count = 0;
|
||||
|
||||
timer tmr;
|
||||
|
||||
int t_tx;
|
||||
tmr :> t_tx;
|
||||
|
||||
const int max_tx_time = XS1_TIMER_HZ / 31250 * 3 * (8 + 1 + 1); // 30 bits at 31.25 kbps is 0.96ms
|
||||
|
||||
while(tx_cmd_count < num_to_tx || rx_cmd_count < num_to_rx ){
|
||||
select{
|
||||
case midi_get_ack_or_data(c_midi, is_ack, datum):
|
||||
// printf("ACK: %d Datum: 0x%x\n", is_ack, datum);
|
||||
line++;
|
||||
if(line == num_tx){
|
||||
delay_microseconds(200); // Allow frame to complete
|
||||
exit(0);
|
||||
case midi_get_ack_or_data(c_midi, is_ack, rx_packet):
|
||||
if(is_ack){
|
||||
dprintf("ACK from Tx\n");
|
||||
tx_cmd_count++;
|
||||
} else {
|
||||
unsigned midi_data[3] = {0};
|
||||
unsigned byte_count = 0;
|
||||
{midi_data[0], midi_data[1], midi_data[2], byte_count} = midi_out_parse(rx_packet);
|
||||
dprintf("dut_midi_rx: %u %u %u\n", midi_data[0], midi_data[1], midi_data[2]);
|
||||
rx_cmd_count++;
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
if(num_tx){
|
||||
unsigned midi[] = {commands[line][0], commands[line][1], commands[line][2]};
|
||||
unsigned tx_packet = mini_in_parse_helper(midi);
|
||||
outuint(c_midi, byterev(tx_packet));
|
||||
// printf("SEND TO MIDI: 0x%x\n", tx_packet);
|
||||
delay_milliseconds(2); // 30 bits at 31.25 kbps is 0.96ms
|
||||
} else {
|
||||
exit(0);
|
||||
}
|
||||
case tx_cmd_count < num_to_tx => tmr when timerafter(t_tx) :> int _:
|
||||
unsigned midi[] = {commands[tx_cmd_count][0], commands[tx_cmd_count][1], commands[tx_cmd_count][2]};
|
||||
unsigned tx_packet = mini_in_parse_helper(midi);
|
||||
outuint(c_midi, byterev(tx_packet));
|
||||
dprintf("Sent packet to midi: %u %u %u\n", commands[tx_cmd_count][0], commands[tx_cmd_count][1], commands[tx_cmd_count][2]);
|
||||
t_tx += max_tx_time;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
dprintf("Tx and Rx count met - exiting after last tx complete.\n");
|
||||
tmr when timerafter(t_tx) :> int _; // wait until packet definitely departed
|
||||
|
||||
delay_ticks(max_tx_time / 4); // Allow a few more bit times about to allow TXChecker to do it's thing
|
||||
exit(0);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -6,12 +6,7 @@ import Pyxsim
|
||||
from Pyxsim import testers
|
||||
from pathlib import Path
|
||||
from uart_tx_checker import UARTTxChecker
|
||||
# from spdif_test_utils import (
|
||||
# Clock,
|
||||
# Spdif_rx,
|
||||
# Frames,
|
||||
# freq_for_sample_rate,
|
||||
# )
|
||||
|
||||
|
||||
MAX_CYCLES = 15000000
|
||||
MIDI_RATE = 31250
|
||||
@@ -47,7 +42,6 @@ def create_midi_tx_file(commands):
|
||||
@pytest.mark.parametrize("config", CONFIGS)
|
||||
def test_tx(capfd, config):
|
||||
xe = str(Path(__file__).parent / f"test_midi/bin/{config}/test_midi_{config}.xe")
|
||||
p_midi_out = "tile[1]:XS1_PORT_4C"
|
||||
|
||||
midi_commands = [[0x90, 60, 81]]
|
||||
create_midi_tx_file(midi_commands)
|
||||
@@ -62,7 +56,7 @@ def test_tx(capfd, config):
|
||||
bpb = 8
|
||||
parity = 0
|
||||
stop = 1
|
||||
length_of_test = 3 # characters
|
||||
length_of_test = sum(len(cmd) for cmd in midi_commands)
|
||||
|
||||
simthreads = [
|
||||
UARTTxChecker(tx_port, parity, baud, length_of_test, stop, bpb, debug=False)
|
||||
|
||||
@@ -16,6 +16,8 @@ Parity = dict(
|
||||
UART_PARITY_BAD=3
|
||||
)
|
||||
|
||||
# From tools 15.2.1 we need to add an extra factor to go from ps to fs
|
||||
time_scaling_factor = 1000
|
||||
|
||||
class DriveHigh(px.SimThread):
|
||||
def __init__(self, p):
|
||||
@@ -28,13 +30,12 @@ class DriveHigh(px.SimThread):
|
||||
|
||||
|
||||
class UARTRxChecker(px.SimThread):
|
||||
def __init__(self, rx_port, tx_port, parity, baud, stop_bits, bpb, data=[0x7f, 0x00, 0x2f, 0xff],
|
||||
intermittent=False):
|
||||
def __init__(self, rx_port, parity, baud, stop_bits, bpb, data=[0x7f, 0x00, 0x2f, 0xff],
|
||||
intermittent=False, debug=False):
|
||||
"""
|
||||
Create a UARTRxChecker instance.
|
||||
|
||||
:param rx_port: Receive port of the UART device under test.
|
||||
:param tx_port: Transmit port of the UART device under test.
|
||||
:param parity: Parity of the UART connection.
|
||||
:param baud: BAUD rate of the UART connection.
|
||||
:param stop_bits: Number of stop_bits for each UART byte.
|
||||
@@ -43,7 +44,6 @@ class UARTRxChecker(px.SimThread):
|
||||
:param intermittent: Add a random delay between sent bytes.
|
||||
"""
|
||||
self._rx_port = rx_port
|
||||
self._tx_port = tx_port
|
||||
self._parity = parity
|
||||
self._baud = baud
|
||||
self._stop_bits = stop_bits
|
||||
@@ -142,7 +142,7 @@ class UARTRxChecker(px.SimThread):
|
||||
Returns float value in nanoseconds.
|
||||
"""
|
||||
# Return float value in ps
|
||||
return (1.0 / self._baud) * 1e12
|
||||
return (1.0 / self._baud) * 1e12 * time_scaling_factor
|
||||
|
||||
def wait_baud_time(self, xsi):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user