forked from PAWPAW-Mirror/lib_xua
Initial passing tx test using pyxsim
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -48,3 +48,4 @@ host_usb_mixer_control/xmos_mixer
|
||||
**/.vscode/**
|
||||
**.egg-info
|
||||
*tests/logs/*
|
||||
midi_tx_cmds.txt
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
#include <platform.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdint.h>
|
||||
#include <xclib.h>
|
||||
|
||||
#include "xua.h"
|
||||
@@ -31,59 +32,92 @@ on tile[MIDI_TILE] : buffered in port:1 p_midi_rx = XS1_PORT_1F;
|
||||
#define CLKBLK_MIDI XS1_CLKBLK_2
|
||||
on tile[MIDI_TILE] : clock clk_midi = CLKBLK_MIDI;
|
||||
|
||||
|
||||
/* Port declarations for I2C to config ADC's */
|
||||
on tile[0]: port p_scl = XS1_PORT_1L;
|
||||
on tile[0]: port p_sda = XS1_PORT_1M;
|
||||
#define MAX_TEST_COMMANDS 10
|
||||
#define TEST_COMMAND_FILE "midi_tx_cmds.txt"
|
||||
|
||||
/* See hwsupport.xc */
|
||||
void ctrlPort();
|
||||
|
||||
#define CABLE_NUM 0
|
||||
|
||||
#define NOTE_ON 0x90
|
||||
#define PITCH 60
|
||||
#define VELOCITY 80
|
||||
|
||||
void test(chanend c_midi){
|
||||
struct midi_in_parse_state mips;
|
||||
reset_midi_state(mips);
|
||||
unsigned mini_in_parse_helper(unsigned midi[3]){
|
||||
// printf("Composing data: 0x%x 0x%x 0x%x\n", midi[0], midi[1], midi[2]);
|
||||
|
||||
struct midi_in_parse_state m_state;
|
||||
reset_midi_state(m_state);
|
||||
|
||||
unsigned valid = 0;
|
||||
unsigned tx_data = 0;
|
||||
{valid, tx_data} = midi_in_parse(mips,CABLE_NUM, NOTE_ON);
|
||||
printf("Valid: %d data: %u\n", valid, tx_data);
|
||||
{valid, tx_data} = midi_in_parse(mips, CABLE_NUM, PITCH);
|
||||
printf("Valid: %d data: %u\n", valid, tx_data);
|
||||
{valid, tx_data} = midi_in_parse(mips, CABLE_NUM, VELOCITY);
|
||||
printf("Valid: %d data: %u\n", valid, tx_data);
|
||||
unsigned packed = 0;
|
||||
|
||||
for(int i = 0; i < 3; i++){
|
||||
{valid, packed} = midi_in_parse(m_state, CABLE_NUM, midi[i]);
|
||||
if(valid){
|
||||
return packed;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
char midi[3];
|
||||
unsigned size = 0;
|
||||
{midi[0], midi[1], midi[2], size} = midi_out_parse(tx_data);
|
||||
printf("size: %d data: 0x%x 0x%x 0x%x\n", size, midi[0], midi[1], midi[2]);
|
||||
unsigned parse_cmd_line(uint8_t commands[MAX_TEST_COMMANDS][3])
|
||||
{
|
||||
FILE * movable fptr_tx = fopen(TEST_COMMAND_FILE,"rt");
|
||||
if (fptr_tx == NULL) {
|
||||
printf("ERROR: TX command file %s not found or unable to open.\n", TEST_COMMAND_FILE);
|
||||
fclose(move(fptr_tx));
|
||||
return 0;
|
||||
}
|
||||
|
||||
unsigned line = 0;
|
||||
|
||||
unsigned a,b,c;
|
||||
while (fscanf(fptr_tx, "%u %u %u\n", &a, &b, &c) == 3) {
|
||||
commands[line][0] = a;
|
||||
commands[line][1] = b;
|
||||
commands[line][2] = c;
|
||||
// printf("Line %u params: 0x%x 0x%x 0x%x\n", line, commands[line][0], commands[line][1], commands[line][2]);
|
||||
line++;
|
||||
if(line > MAX_TEST_COMMANDS){
|
||||
printf("ERROR: Too many lines in TX command file\n");
|
||||
|
||||
fclose(move(fptr_tx));
|
||||
return MAX_TEST_COMMANDS;
|
||||
}
|
||||
}
|
||||
|
||||
fclose(move(fptr_tx));
|
||||
return line;
|
||||
}
|
||||
|
||||
void test(chanend c_midi){
|
||||
uint8_t commands[MAX_TEST_COMMANDS][3] = {{0}};
|
||||
unsigned num_tx = parse_cmd_line(commands);
|
||||
|
||||
int is_ack;
|
||||
unsigned int datum;
|
||||
unsigned datum;
|
||||
|
||||
unsigned count = 0;
|
||||
unsigned line = 0;
|
||||
while(1){
|
||||
select{
|
||||
case midi_get_ack_or_data(c_midi, is_ack, datum):
|
||||
printf("ACK: %d Datum: 0x%x\n", is_ack, datum);
|
||||
count++;
|
||||
if(count == 3){
|
||||
// printf("ACK: %d Datum: 0x%x\n", is_ack, datum);
|
||||
line++;
|
||||
if(line == num_tx){
|
||||
delay_microseconds(200); // Allow frame to complete
|
||||
exit(0);
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
outuint(c_midi, byterev(tx_data));
|
||||
printf("SEND TO MIDI\n");
|
||||
delay_milliseconds(2); // 30 bits at 31.25 kbps is 0.96ms
|
||||
if(num_tx){
|
||||
unsigned midi[] = {commands[line][0], commands[line][1], commands[line][2]};
|
||||
unsigned tx_packet = mini_in_parse_helper(midi);
|
||||
outuint(c_midi, byterev(tx_packet));
|
||||
// printf("SEND TO MIDI: 0x%x\n", tx_packet);
|
||||
delay_milliseconds(2); // 30 bits at 31.25 kbps is 0.96ms
|
||||
} else {
|
||||
exit(0);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -91,11 +125,10 @@ void test(chanend c_midi){
|
||||
}
|
||||
|
||||
|
||||
int main()
|
||||
int main(void)
|
||||
{
|
||||
chan c_midi;
|
||||
|
||||
|
||||
par
|
||||
{
|
||||
on tile[0]: test(c_midi);
|
||||
|
||||
@@ -23,11 +23,23 @@ class Midi_expect:
|
||||
def __init(self):
|
||||
pass
|
||||
|
||||
def expect(self):
|
||||
expected = "Hello"
|
||||
def expect(self, commands):
|
||||
expected = ""
|
||||
for command in commands:
|
||||
while len(command) < 3:
|
||||
command.append(0)
|
||||
expected += "uart_tx_checker: " + " ".join([f"0x{byte:02x}" for byte in command]) + "\n"
|
||||
|
||||
return expected
|
||||
|
||||
|
||||
def create_midi_tx_file(commands):
|
||||
with open("midi_tx_cmds.txt", "wt") as mt:
|
||||
for command in commands:
|
||||
while len(command) < 3:
|
||||
command.append(0)
|
||||
text = " ".join([str(byte) for byte in command]) + "\n"
|
||||
mt.write(text)
|
||||
|
||||
#####
|
||||
# This test builds the spdif transmitter app with a verity of presets and tests that the output matches those presets
|
||||
@@ -36,17 +48,16 @@ class Midi_expect:
|
||||
def test_tx(capfd, config):
|
||||
xe = str(Path(__file__).parent / f"test_midi/bin/{config}/test_midi_{config}.xe")
|
||||
p_midi_out = "tile[1]:XS1_PORT_4C"
|
||||
|
||||
|
||||
# tester = testers.ComparisonTester(
|
||||
# Frames(channels=audio, no_of_blocks=no_of_blocks, sam_freq=sam_freq).expect()[
|
||||
# : no_of_samples * len(audio)
|
||||
# ]
|
||||
# )
|
||||
tester = testers.ComparisonTester(Midi_expect().expect())
|
||||
midi_commands = [[0x90, 60, 81]]
|
||||
create_midi_tx_file(midi_commands)
|
||||
|
||||
tester = testers.ComparisonTester(Midi_expect().expect(midi_commands),
|
||||
regexp = "uart_tx_checker:.+",
|
||||
ordered = True)
|
||||
|
||||
|
||||
tx_port = "tile[1]:XS1_PORT_4C"
|
||||
rx_port = None
|
||||
baud = MIDI_RATE
|
||||
bpb = 8
|
||||
parity = 0
|
||||
@@ -54,11 +65,11 @@ def test_tx(capfd, config):
|
||||
length_of_test = 3 # characters
|
||||
|
||||
simthreads = [
|
||||
# UARTTxChecker(rx_port, tx_port, parity, baud, length_of_test, stop, bpb)
|
||||
UARTTxChecker(tx_port, parity, baud, length_of_test, stop, bpb, debug=False)
|
||||
]
|
||||
|
||||
simargs = ["--max-cycles", str(MAX_CYCLES)]
|
||||
simargs.extend(["--trace-to", "trace.txt", "--vcd-tracing", "-tile tile[1] -ports -o trace.vcd"]) #This is just for local debug so we can capture the run, pass as kwarg to run_with_pyxsim
|
||||
# simargs.extend(["--trace-to", "trace.txt", "--vcd-tracing", "-tile tile[1] -ports -o trace.vcd"]) #This is just for local debug so we can capture the run, pass as kwarg to run_with_pyxsim
|
||||
|
||||
# result = Pyxsim.run_on_simulator(
|
||||
result = Pyxsim.run_on_simulator(
|
||||
@@ -68,9 +79,9 @@ def test_tx(capfd, config):
|
||||
# clean_before_build=True,
|
||||
clean_before_build=False,
|
||||
tester=tester,
|
||||
# capfd=capfd,
|
||||
capfd=None,
|
||||
timeout=1500,
|
||||
capfd=capfd,
|
||||
# capfd=None,
|
||||
timeout=120,
|
||||
simargs=simargs,
|
||||
build_options=[
|
||||
"-j",
|
||||
@@ -80,4 +91,5 @@ def test_tx(capfd, config):
|
||||
,
|
||||
],
|
||||
)
|
||||
|
||||
assert result
|
||||
@@ -9,6 +9,8 @@ from functools import partial
|
||||
# same argument everywhere.
|
||||
print = partial(print, flush=True)
|
||||
|
||||
# From tools 15.2.1 we need to add an extra factor to go from ps to fs
|
||||
time_scaling_factor = 1000
|
||||
|
||||
class UARTTxChecker(px.SimThread):
|
||||
"""
|
||||
@@ -16,7 +18,7 @@ class UARTTxChecker(px.SimThread):
|
||||
transations caused by the device, by looking at the tx pins.
|
||||
"""
|
||||
|
||||
def __init__(self, rx_port, tx_port, parity, baud, length, stop_bits, bpb):
|
||||
def __init__(self, tx_port, parity, baud, length, stop_bits, bpb, debug=False):
|
||||
"""
|
||||
Create a UARTTxChecker instance.
|
||||
|
||||
@@ -34,6 +36,7 @@ class UARTTxChecker(px.SimThread):
|
||||
self._stop_bits = stop_bits
|
||||
self._bits_per_byte = bpb
|
||||
# Hex value of stop bits, as MSB 1st char, e.g. 0b11 : 0xC0
|
||||
self.debug = debug
|
||||
|
||||
def get_port_val(self, xsi, port):
|
||||
"""
|
||||
@@ -57,7 +60,7 @@ class UARTTxChecker(px.SimThread):
|
||||
:rtype: float
|
||||
"""
|
||||
# Return float value in ps
|
||||
return (1.0/self._baud) * 1e12
|
||||
return (1.0/self._baud) * 1e12 * time_scaling_factor
|
||||
|
||||
def wait_baud_time(self, xsi):
|
||||
"""
|
||||
@@ -88,7 +91,7 @@ class UARTTxChecker(px.SimThread):
|
||||
got_start_bit = False
|
||||
|
||||
initial_port_val = self.get_port_val(xsi, self._tx_port)
|
||||
print("tx starts high: %s" % ("True" if initial_port_val else "False"))
|
||||
if self.debug: print("tx starts high: %s" % ("True" if initial_port_val else "False"))
|
||||
|
||||
for x in range(length):
|
||||
packet.append(chr(self.read_byte(xsi, parity)))
|
||||
@@ -113,11 +116,10 @@ class UARTTxChecker(px.SimThread):
|
||||
if initial_port_val == 1:
|
||||
self.wait_for_port_pins_change([self._tx_port])
|
||||
#else go for it as assume tx has just fallen with no interframe gap
|
||||
# print("Byte start time: ", xsi.get_time())
|
||||
|
||||
# The tx line should go low for 1 bit time
|
||||
if self.get_val_timeout(xsi, self._tx_port) == 0:
|
||||
print("Start bit recv'd")
|
||||
if self.debug: print("Start bit recv'd")
|
||||
else:
|
||||
print("Start bit issue")
|
||||
return False
|
||||
@@ -129,7 +131,7 @@ class UARTTxChecker(px.SimThread):
|
||||
byte += (val << j)
|
||||
crc_sum += val
|
||||
|
||||
print(f"Sampled {self._bits_per_byte} data bits")
|
||||
if self.debug: print(f"Sampled {self._bits_per_byte} data bits: 0x{hex(byte)}")
|
||||
|
||||
# Check the parity if needs be
|
||||
self.check_parity(xsi, crc_sum, parity)
|
||||
@@ -138,7 +140,7 @@ class UARTTxChecker(px.SimThread):
|
||||
self.check_stopbit(xsi)
|
||||
|
||||
# Print a new line to split bytes in output
|
||||
print()
|
||||
if self.debug: print()
|
||||
|
||||
return byte
|
||||
|
||||
@@ -158,7 +160,7 @@ class UARTTxChecker(px.SimThread):
|
||||
else:
|
||||
print("Parity bit incorrect. Got %d, expected %d" % (read, (crc_sum + parity_val) % 2))
|
||||
else:
|
||||
print("Parity bit correct")
|
||||
if self.debug: print("Parity bit correct")
|
||||
|
||||
def check_stopbit(self, xsi):
|
||||
"""
|
||||
@@ -171,7 +173,7 @@ class UARTTxChecker(px.SimThread):
|
||||
# The stop bits should stay high for this time
|
||||
if self.get_val_timeout(xsi, self._tx_port) == 0:
|
||||
stop_bits_correct = False
|
||||
print("Stop bit correct: %s" % ("True" if stop_bits_correct else "False"))
|
||||
if self.debug: print("Stop bit correct: %s" % ("True" if stop_bits_correct else "False"))
|
||||
|
||||
def get_val_timeout(self, xsi, port):
|
||||
"""
|
||||
@@ -240,5 +242,5 @@ class UARTTxChecker(px.SimThread):
|
||||
|
||||
# Print each member of K as a hex byte
|
||||
# inline lambda function mapped over a list? awh yiss.
|
||||
print(", ".join(map((lambda x: "0x%02x" % ord(x)), K)))
|
||||
print("uart_tx_checker:", " ".join(map((lambda x: "0x%02x" % ord(x)), K)))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user