875 lines
27 KiB
Python
875 lines
27 KiB
Python
import os
|
|
import sys
|
|
import time
|
|
import serial
|
|
import pdb
|
|
import words
|
|
import struct
|
|
import logging
|
|
import json
|
|
import crcmod
|
|
#import usb.core
|
|
#import usb.util
|
|
#import usb.backend.libusb1
|
|
|
|
from optparse import OptionParser
|
|
import serial
|
|
import xmodem
|
|
import pyprind
|
|
from binascii import b2a_hex, a2b_hex
|
|
|
|
# Debug switch: DBGPRINT() only prints while this equals 1.
__DEBUG__ = 1


# USB Vendor ID / Product ID used by dev_init() to locate the device
VENDOR_ID = 0x0d7d
PRODUCT_ID = 0x0100
USB_MAX_BYTES = 8192    # chunk size used by usb_write_data()
DATA_BLOCK_SIZE = 8192
ACK_PACKET_SIZE = 8     # bytes read per ACK by RecDataACKFromUSB/RecDataACKFromUART
UART_MAX_BYTES = 4096   # chunk size used by uart_write_data()

# Host interface identifiers (compared against act_intf / dut_intf)
INTF_USB = 0
INTF_UART = 1
INTF_I2C = 2
INTF_SPI = 3
|
|
|
|
#################### PACKET LAYER ENABLE/DISABLE ###################
PACKET = 0 # 1-> enable packet layer, 0 -> disable packet layer
IMAGE_TRF = 0 # 0-> skip transfer, 1-> use USB, 2-> use UART
HIGH_BAUD = 1 # 1 -> HAPS or EVB, 0 -> ZC706 or Host (YG's App)

# Preferred serial device on non-Windows hosts when it exists as a symlink
# (see uart_open()).
KNERON_UART = "/dev/kneron_uart"
# Port index passed to uart_open(): becomes 'COM7' on Windows and
# '/dev/ttyUSB7' elsewhere (unless KNERON_UART is used).
UART_COM_ID = 7
UART_BLOCK = 0x1000 # maximum bytes per mem_block_read()/mem_block_write()

# Interface used by dev_init()/dev_close()
act_intf = INTF_UART

# BAUDRATE_HI is the rate xmodem_send_bin() switches to after the download.
if HIGH_BAUD == 1:
    BAUDRATE_HI = 921600
    BAUDRATE = 115200
else:
    BAUDRATE_HI = 115200
    BAUDRATE = 115200
# dut host interface (mozart)
dut_intf = INTF_I2C

# Shared serial.Serial instance; created by dev_init()
ser = None
|
|
|
|
# Basic message command codes (the `cmd` field built by pack_data())
CMD_NONE = 0
CMD_MEM_READ = 1
CMD_MEM_WRITE= 2
CMD_DATA = 3
CMD_ACK = 4
CMD_STS_CLR = 5
CMD_MEM_CLR = 6
CMD_CHK_ERR = 7
CMD_TEST_ECHO = 8
CMD_USB_WRITE = 9

# Demo command codes
CMD_DEMO_SET_MODEL = 0x10
CMD_DEMO_SET_IMAGES = 0x11
CMD_DEMO_RUN_ONCE = 0x12
CMD_DEMO_RUN = 0x13
CMD_DEMO_STOP = 0x14
CMD_DEMO_RESULT_LEN = 0x15
CMD_DEMO_RESULT = 0x16
CMD_DEMO_FD = 0x17 # for development, remove later
CMD_DEMO_LM = 0x18 # for development
CMD_DEMO_LD = 0x19 # for development
CMD_DEMO_FR = 0x1A # for development

# Demo command codes (decimal range)
CMD_DEMO_INFERENCE = 100
CMD_DEMO_REG1 = 101
CMD_DEMO_REG2 = 102
CMD_DEMO_REG3 = 103
CMD_DEMO_REG4 = 104
CMD_DEMO_REG5 = 105
CMD_DEMO_ADDUSER = 106
CMD_DEMO_DELUSER = 107

# Flash / boot command codes
CMD_FLASH_INFO = 0x1000
CMD_FLASH_CHIP_ERASE = 0x1001
CMD_FLASH_SECTOR_ERASE = 0x1002
CMD_FLASH_READ = 0x1003
CMD_FLASH_WRITE = 0x1004
CMD_SCPU_RUN = 0x1005
|
|
|
|
# Message header formats (struct format strings, little-endian)
MSG_HDR_FMT = "<HHIII" # Header(2b), checksum(2b), Cmd(4b), Addr(4b), Len(4b)
MSG_PKT_HDR_FMT = "<HHHHII" # Preamble(2b), Psize(2b), Cmd(2b), Csize(2b), Addr(4b), Len(4b)
PKTX_PAMB = 0xA583 # Packet TX Preamble
PKRX_PAMB = 0x8A35 # Packet RX Preamble
MSG_HDR_SIZE = 16 # 2+2+2+2+4+4
MSG_DAT_IDX = 16 # message body will be stored from offset 16

# Flag bits OR-ed into the packet size field by pack_data()
PKT_WR_FLAG = 0x8000
PKT_CRC_FLAG = 0x4000

# SET_MODEL and SET_IMAGES payload formats (N little-endian 32-bit arguments)
CMD_DEMO_SET_TWELVE_FMT = "<IIIIIIIIIIII" # arg1(4b), arg2(4b), arg3(4b), arg4(4b), arg5(4b), arg6(4b), arg7(4b), arg8(4b), arg9(4b), arg10(4b), arg11(4b), arg12(4b)
CMD_DEMO_SET_EIGHT_FMT = "<IIIIIIII" # arg1(4b), arg2(4b), arg3(4b), arg4(4b), arg5(4b), arg6(4b), arg7(4b), arg8(4b)
CMD_DEMO_SET_SIX_FMT = "<IIIIII" # arg1(4b), arg2(4b), arg3(4b), arg4(4b), arg5(4b), arg6(4b)
CMD_DEMO_SET_TWO_FMT = "<II" # arg1(4b), arg2(4b)
|
|
|
|
##########################################################################
|
|
# Device control
|
|
##########################################################################
|
|
def LOG(__LEVEL__, fmt):
    """Print *fmt* to stdout.

    The ``__LEVEL__`` argument is accepted but not used.
    """
    message = fmt
    print(message)
|
|
|
|
|
|
##########################################################################
|
|
# Device control
|
|
##########################################################################
|
|
def dev_init(baudrate = 115200):
    """Open the host link(s) to the device.

    When ``act_intf`` is INTF_UART a new ``serial.Serial`` object is stored in
    the module-level ``ser``, opened through setup_intf() at *baudrate* and
    both of its buffers are flushed.  When ``IMAGE_TRF == 1`` the USB device
    identified by VENDOR_ID/PRODUCT_ID is additionally located and its OUT/IN
    endpoints are stored in the module-level ``ep_out`` / ``ep_in``.

    The process exits with status -1 if the UART cannot be opened or if the
    USB device or one of its endpoints cannot be found.
    """
    global ser
    global kneron_dev
    global ep_out
    global ep_in

    if act_intf == INTF_UART:
        # Uart interface
        ser = serial.Serial()
        try:
            setup_intf('uart', ser, UART_COM_ID, baudrate, timeout=5)
            ser.flushInput()
            ser.flushOutput()
        except Exception as err:
            # Catch Exception rather than everything so Ctrl-C and
            # SystemExit are not reported as a port failure.
            DBGPRINT('Uart port open fail: %s', err)
            sys.exit(-1)

    # USB interface
    if IMAGE_TRF == 1:
        # NOTE(review): the `usb.*` imports at the top of this file are
        # commented out; they must be enabled before this branch can run.
        # Raw string: the path contains backslashes that are not escapes.
        backend = usb.backend.libusb1.get_backend(
            find_library=lambda x: r"C:\Windows\System32\libusb0.dll")
        kneron_dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID, backend=backend)
        if kneron_dev is None:
            sys.exit(-1)
        kneron_dev.set_configuration()
        cfg = kneron_dev.get_active_configuration()
        interface_number = cfg[(0, 0)].bInterfaceNumber
        alternate_setting = usb.control.get_interface(kneron_dev, interface_number)
        usb_interface = usb.util.find_descriptor(
            cfg, bInterfaceNumber=interface_number, bAlternateSetting=alternate_setting)
        # First endpoint whose direction is OUT (host -> device)
        ep_out = usb.util.find_descriptor(
            usb_interface,
            custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT)
        if ep_out is None:
            sys.exit(-1)
        # First endpoint whose direction is IN (device -> host)
        ep_in = usb.util.find_descriptor(
            usb_interface,
            custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN)
        if ep_in is None:
            sys.exit(-1)
|
|
|
|
def dev_close():
    """Close the active host interface.

    For the UART interface this calls the ``intf_close`` handler installed by
    setup_intf(); for any other interface nothing is done yet.
    """
    if act_intf != INTF_UART:
        # USB interface
        # TODO: Wait for mozart usb
        return
    intf_close()
|
|
|
|
def dev_flush():
    """Flush the input buffer, then the output buffer, of the shared serial port."""
    port = ser
    port.flushInput()
    port.flushOutput()
|
|
|
|
##########################################################################
|
|
# Setup active interface
|
|
##########################################################################
|
|
def setup_intf(interface, dev, com = 0, baudrate = 115200, timeout=5.0):
    """Install the read/write/open/close handlers for *interface*.

    Sets the module-level ``intf_read``, ``intf_write``, ``intf_open`` and
    ``intf_close`` to the USB or UART implementations.  For ``'uart'`` the
    port *dev* is also opened via uart_open() with *com*, *baudrate* and
    *timeout*.  Any other name that passes the ``words.__WORDS__`` lookup
    prints an error message.

    Raises whatever ``words.__WORDS__[interface]`` raises for an unknown name.
    """
    global intf_read
    global intf_write
    global intf_open
    global intf_close
    # NOTE(review): this binds a function-local `act_intf`; the module-level
    # `act_intf` is not changed. Confirm whether a `global` was intended.
    act_intf = words.__WORDS__[interface]
    if interface == 'usb':
        intf_read = usb_read
        intf_write = usb_write
        intf_open = usb_open
        # Was misspelled `intf_clsoe`, which left the global handler unset.
        intf_close = usb_close
    elif interface == 'uart':
        intf_read = uart_read
        intf_write = uart_write
        intf_open = uart_open
        intf_close = uart_close
        uart_open(dev, com, baudrate, timeout)
    else:
        print("Error device interface")
|
|
|
|
##########################################################################
|
|
# Uart interface
|
|
##########################################################################
|
|
def uart_open(ser, com, baudrate, timeout):
    """Configure and open the serial port object *ser*.

    Port name: ``COM<com>`` on Windows; otherwise KNERON_UART when that path
    is a symlink, else ``/dev/ttyUSB<com>``.  *timeout* and *baudrate* are
    applied before the port is opened.
    """
    if os.name == 'nt':
        port_name = 'COM' + str(com)
    elif os.path.islink(KNERON_UART):
        port_name = KNERON_UART
    else:
        port_name = '/dev/ttyUSB' + str(com)
    ser.port = port_name
    ser.timeout = timeout
    ser.baudrate = baudrate
    ser.open()
|
|
|
|
def uart_write(buf, size):
    """Write *buf* to the shared serial port; *size* is accepted but not used."""
    port = ser
    port.write(buf)
|
|
|
|
def uart_read(size):
    """Read up to *size* bytes from the shared serial port.

    Returns a bytearray with the bytes received.  A short read is reported on
    stdout and the partial data is still returned.  If the read itself raises,
    the error is reported and an empty bytearray is returned (previously the
    function crashed on an unbound ``buf``).
    """
    try:
        buf = bytearray(ser.read(size))
    except Exception as err:
        print('Uart read fail:', err)
        buf = bytearray()
    if len(buf) != size:
        print('Uart read fail, expected=%x, read=%x' % (size, len(buf)))
    return buf
|
|
|
|
def uart_close():
    """Close the shared serial port."""
    port = ser
    port.close()
|
|
|
|
|
|
##########################################################################
|
|
# USB interface
|
|
##########################################################################
|
|
def check_generic_ack(buf):
    """Return True when the first 8 bytes of *buf* form a generic ACK packet.

    The bytes are read as four little-endian 16-bit fields: preamble, packet
    size, command and message size.  The packet is accepted when the preamble
    equals PKRX_PAMB, the low 12 bits of the packet size are zero, the message
    size is zero and the command is CMD_ACK.

    The original expressions were written as ``a + b << 8`` / ``a + b >> 8``,
    which Python parses as ``(a + b) << 8`` / ``(a + b) >> 8``; the command
    field could therefore never equal CMD_ACK.
    """
    preamble = buf[0] | (buf[1] << 8)
    pkt_size = buf[2] | (buf[3] << 8)
    cmd = buf[4] | (buf[5] << 8)
    size = buf[6] | (buf[7] << 8)
    if preamble != PKRX_PAMB:
        return False
    # NOTE(review): the size check is kept exactly as written; confirm against
    # the device protocol that an ACK carries a zero size in these bits.
    if (pkt_size & 0x0FFF) != 0:
        return False
    if size != 0: # must be zero msg payload
        return False
    return cmd == CMD_ACK
|
|
|
|
def RecDataACKFromUSB():
    """Block until a packet starting with bytes 0x35, 0x8A arrives on ``ep_in``.

    Reads ACK_PACKET_SIZE bytes at a time (1000 ms timeout per read) and
    prints "ACK Retry" with the raw data for every extra read.  Returns 0 once
    the preamble is seen, or None (after printing a message) when ``ep_in`` is
    None.
    """
    if ep_in is None:
        print("ep_in is not open")
        return
    raw = ep_in.read(ACK_PACKET_SIZE, timeout=1000)
    while True:
        packet = bytes(raw)
        # look for preamble
        if packet[0] == 0x35 and packet[1] == 0x8A:
            break
        raw = ep_in.read(ACK_PACKET_SIZE, timeout=1000)
        print("ACK Retry", raw)
    return 0
|
|
|
|
def usb_write_data(buf, length, offset):
    """Send ``buf[offset:]`` over ``ep_out`` in USB_MAX_BYTES chunks.

    ``length - offset`` determines how many full chunks are sent; before each
    chunk, and before the final remainder, RecDataACKFromUSB() is called to
    wait for the receiver.  Prints a message and returns when ``ep_out`` is
    None.  Always returns None.
    """
    if ep_out is None:
        print("ep_out is not open")
        return
    full_chunks, leftover = divmod(length - offset, USB_MAX_BYTES)
    for _ in range(full_chunks):
        RecDataACKFromUSB() # wait for receiver ready
        chunk_end = offset + USB_MAX_BYTES
        ep_out.write(buf[offset:chunk_end])
        offset = chunk_end
    if leftover != 0:
        RecDataACKFromUSB() # wait for acknowledge first
        ep_out.write(buf[offset:])
    return
|
|
|
|
##########################################################################
|
|
# UART interface Raw Data Transfer
|
|
##########################################################################
|
|
def uart_write_data(buf, length, offset):
    """Send ``length - offset`` bytes of *buf*, starting at *offset*, via ``intf_write``.

    Data goes out in UART_MAX_BYTES chunks followed by one shorter remainder;
    RecDataACKFromUART() is called before every chunk.  Returns None.
    """
    full_chunks, leftover = divmod(length - offset, UART_MAX_BYTES)
    for _ in range(full_chunks):
        RecDataACKFromUART() # wait for receiver ready
        chunk = bytearray(buf[offset:offset + UART_MAX_BYTES])
        intf_write(chunk, len(chunk))
        offset += UART_MAX_BYTES
    if leftover != 0:
        RecDataACKFromUART() # wait for acknowledge first
        tail = bytearray(buf[offset:offset + leftover])
        intf_write(tail, len(tail))
    return
|
|
|
|
def RecDataACKFromUART():
    """Block until a data ACK packet is read through ``intf_read``.

    A packet is accepted when its first three bytes are 0x35, 0x8A and 4.
    Every rejected packet triggers another ACK_PACKET_SIZE read and an
    "ACK Retry" message.  Returns None.

    The retry read used to discard its result, so the loop re-tested the
    first packet forever; the fresh data is now the one that gets checked.
    """
    ackuart = bytes(intf_read(ACK_PACKET_SIZE))
    # look for the Data ACK
    while (ackuart[0] != 0x35) or (ackuart[1] != 0x8A) or (ackuart[2] != 4):
        temp = intf_read(ACK_PACKET_SIZE)
        print("ACK Retry", temp)
        ackuart = bytes(temp)
    return
|
|
|
|
##########################################################################
|
|
# AND / OR
|
|
##########################################################################
|
|
def bit_set():
    """Placeholder: not implemented, does nothing and returns None."""
    return None
|
|
|
|
def bit_clear():
    """Placeholder: not implemented, does nothing and returns None."""
    return None
|
|
|
|
|
|
##########################################################################
|
|
# Register read / write
|
|
##########################################################################
|
|
def reg_read(addr):
    """Read 4 bytes at *addr* with mem_block_read() and return them as one integer.

    The first byte returned is the least significant one.
    """
    raw = mem_block_read(addr, 4)
    return sum(raw[idx] << (8 * idx) for idx in (3, 2, 1, 0))
|
|
|
|
def reg_write(addr, value):
    """Write *value* to *addr* as 4 bytes, least significant byte first."""
    payload = word2blist(value)
    mem_block_write(addr, payload, len(payload))
|
|
|
|
def word2blist(value):
    """Return the low 32 bits of *value* as a list of 4 bytes, least significant first."""
    return [(value >> shift) & 0xFF for shift in (0, 8, 16, 24)]
|
|
|
|
|
|
def wlist2blist(wlist):
    """Flatten a sequence of 32-bit words into a byte list.

    Each word contributes 4 bytes, least significant byte first.
    """
    return [
        (word >> shift) & 0xFF
        for word in wlist
        for shift in (0, 8, 16, 24)
    ]
|
|
|
|
def blist2wlist(blist):
    """Combine a byte sequence into a list of words.

    Every complete group of 4 bytes becomes one integer with the first byte
    least significant; trailing bytes that do not fill a group are ignored.
    """
    words_out = []
    whole = (len(blist) // 4) * 4
    for base in range(0, whole, 4):
        word = 0
        for pos in range(4):
            word += blist[base + pos] << (8 * pos)
        words_out.append(word)
    return words_out
|
|
|
|
|
|
##########################################################################
|
|
# Flash READ/WRITE/COMPARE
|
|
##########################################################################
|
|
def send_scpu_run(addr):
    """Send a CMD_SCPU_RUN message for *addr*, then pause 0.1 s.

    No acknowledge is awaited.
    """
    print('Jump to SCPU and Run!!')
    intf_write(pack_data(CMD_SCPU_RUN, addr, 0, None), MSG_HDR_SIZE)
    time.sleep(0.1)
|
|
|
|
def flash_read(addr, size):
    """Read *size* bytes at *addr* using mem_read() with the CMD_FLASH_READ command."""
    result = mem_read(addr, size, cmd=CMD_FLASH_READ, dbg=True)
    return result
|
|
|
|
def flash_write(addr , buf, size, buf_max = UART_BLOCK):
    """Write *size* bytes of *buf* at *addr* using mem_write() with CMD_FLASH_WRITE.

    *buf_max* is the block size forwarded to mem_write().
    """
    mem_write(addr, buf, size, cmd=CMD_FLASH_WRITE, buf_max=buf_max, dbg=True)
|
|
|
|
def flash_chip_erase():
    """Send CMD_FLASH_CHIP_ERASE through msg_cmd_general() with a 0.2 s wait."""
    wait_seconds = 0.2
    msg_cmd_general(CMD_FLASH_CHIP_ERASE, wait_seconds)
|
|
|
|
def flash_sector_erase(addr):
    """Send CMD_FLASH_SECTOR_ERASE for *addr* and wait for the acknowledge.

    The message carries a length field of 1.  A missing acknowledge is only
    reported on stdout.
    """
    request = pack_data(CMD_FLASH_SECTOR_ERASE, addr, 1, None)
    intf_write(request, MSG_HDR_SIZE)
    time.sleep(0.1)
    if msg_wait_ack():
        return
    print('Flase erase: wait ack fail')
|
|
|
|
def showdebuginfo():
    """Send CMD_FLASH_INFO through msg_cmd_general() with a 0.2 s wait."""
    wait_seconds = 0.2
    msg_cmd_general(CMD_FLASH_INFO, wait_seconds)
|
|
|
|
def flash_sector_erase_64K(addr):
    """Send CMD_FLASH_SECTOR_ERASE_64K for *addr* and wait for the acknowledge.

    The message carries a length field of 1.  A missing acknowledge is only
    reported on stdout.
    """
    # NOTE(review): CMD_FLASH_SECTOR_ERASE_64K is not among the command codes
    # defined near the top of this file - confirm where it is defined.
    request = pack_data(CMD_FLASH_SECTOR_ERASE_64K, addr, 1, None)
    intf_write(request, MSG_HDR_SIZE)
    time.sleep(0.1)
    if msg_wait_ack():
        return
    print('Flase erase: wait ack fail')
|
|
|
|
##########################################################################
|
|
# USB MEMORY WRITE
|
|
##########################################################################
|
|
def usb_mem_write(addr, buf, size):
    """Announce a USB upload for *addr*, stream *buf* over USB and wait for the ack.

    A CMD_USB_WRITE message with address *addr* and length *size* is sent
    through ``intf_write``; the data then goes out via usb_write_data()
    starting at offset 0.  A missing acknowledge is only reported on stdout.
    Always returns an empty string.

    pack_data() takes four arguments (it always adds the CRC itself) and
    msg_wait_ack() takes none; the extra arguments previously passed here
    raised TypeError.
    """
    print('---------------- USB -----------------')
    print('Start ADDR=0x%X usb memory Write' % addr)
    data = pack_data(CMD_USB_WRITE, addr, size, None)
    intf_write(data, len(data))
    #### start usb upload #####
    usb_write_data(buf, size, 0) # start at the beginning of file
    # now get the response
    if not msg_wait_ack():
        print('USB Write, wait ack fail')

    return ''
|
|
|
|
##########################################################################
|
|
# MEMORY READ/WRITE/COMPARE
|
|
##########################################################################
|
|
def mem_write(addr , buf, size, cmd = CMD_MEM_WRITE, buf_max = UART_BLOCK, dbg = False):
    """Write ``size`` bytes of ``buf`` to the device starting at ``addr``.

    The data is split into blocks of at most ``buf_max`` bytes; each block is
    sent with mem_block_write() using command code ``cmd`` while a text
    progress bar is drawn on stdout.  ``dbg`` is accepted but not used.
    Returns None.
    """
    print('--------------------------------------')
    print('Start ADDR=0x%X memory R/W test' %addr)
    wloop = 0
    percent = 0
    unit = 0
    total_range = 100
    #print("\n")
    # Number of blocks, rounding up for a partial last block.
    if (size % buf_max == 0):
        wloop = (size // buf_max)
    else:
        wloop = (size // buf_max) + 1

    # With fewer than 100 blocks the bar is scaled to (blocks - 1) steps,
    # but never fewer than one step.
    if(wloop < total_range):
        total_range = wloop - 1
    if(total_range == 0):
        total_range = 1
    #print('%d = (%d // %d)' %((wloop // total_range), wloop, total_range))
    # `unit` is the number of blocks per progress step (at least 1 below).
    if (wloop % total_range == 0):
        unit = (wloop // total_range) - 1
    else:
        unit = (wloop // total_range)
    if(unit == 0):
        unit = 1
    for i in range(wloop):
        # The last block stops at `size`, so it may be shorter than buf_max.
        if i == wloop - 1:
            wbuf = buf[i * buf_max : size]
        else:
            wbuf = buf[i * buf_max : (i + 1) * buf_max]
        mem_block_write(addr + buf_max * i, wbuf, len(wbuf), cmd)
        #if dbg:
        #print('Write status [%d/%d]'%(i, wloop))
        # Redraw the bar only when the step counter changes.
        if( percent != (i // unit) ):
            percent = i // unit
            if( percent > total_range):
                percent = total_range
            #print("\r" + "In progress",percent,"%",end = "",flush=True)
            if(total_range == 100):
                print("\r" + '[%s%s] %d%%' % ('*'*percent,'-'*(total_range-percent), percent),end = "",flush=True)
            else:
                print("\r" + '[%s%s] %d' % ('*'*percent,'-'*(total_range-percent), percent) + "/" + '%d' %total_range, end = "",flush=True)
    print("")
|
|
|
|
|
|
def mem_read(addr, size, cmd = CMD_MEM_READ, dbg = False):
    """Read ``size`` bytes from the device starting at ``addr``.

    The range is fetched in blocks of at most UART_BLOCK bytes with
    mem_block_read() using command code ``cmd`` while a text progress bar is
    drawn on stdout.  ``dbg`` is accepted but not used.

    Returns a list built by concatenating what each mem_block_read() call
    returned.
    """
    buf_max = UART_BLOCK
    rloop = 0
    rlen = 0
    percent = 0
    unit = 0
    total_range = 100
    buf = []
    #print("\n")
    # Number of blocks, rounding up for a partial last block.
    if (size % buf_max == 0):
        rloop = (size // buf_max)
    else:
        rloop = (size // buf_max) + 1

    # With fewer than 100 blocks the bar is scaled to (blocks - 1) steps,
    # but never fewer than one step.
    if(rloop < total_range):
        total_range = rloop - 1
    if(total_range == 0):
        total_range = 1
    #print('%d = (%d // %d)' %((rloop // total_range), rloop, total_range))
    # `unit` is the number of blocks per progress step (at least 1 below).
    if (rloop % total_range == 0):
        unit = (rloop // total_range) - 1
    else:
        unit = (rloop // total_range)
    if(unit == 0):
        unit = 1
    for i in range(rloop):
        # The last block covers only the bytes that remain.
        if i == rloop - 1:
            rlen = size - i * buf_max
        else:
            rlen = buf_max
        sbuf = mem_block_read(addr + buf_max * i, rlen, cmd)
        buf.extend(sbuf)
        #if dbg:
        # print('Read status [%d/%d]'%(i, rloop))
        # Redraw the bar only when the step counter changes.
        if( percent != (i // unit) ):
            percent = i // unit
            if( percent > total_range):
                percent = total_range
            #print("\r" + "In progress",percent,"%",end = "",flush=True)
            if(total_range == 100):
                print("\r" + '[%s%s] %d%%' % ('*'*percent,'-'*(total_range-percent), percent),end = "",flush=True)
            else:
                print("\r" + '[%s%s] %d' % ('*'*percent,'-'*(total_range-percent), percent) + "/" + '%d' %total_range, end = "",flush=True)
    print("")
    return buf
|
|
|
|
|
|
|
|
def mem_dump(buf, displayLen = 0):
    """Print a hex dump of *buf* to stdout, 16 values per row.

    Nothing is printed when *buf* is None or empty.  A length line is printed
    when *buf* holds more than one value, followed by a column-index header
    and one row per 16 values prefixed with the row's starting offset.
    Values are shown with 2 hex digits when *displayLen* is 0 and with 3
    otherwise.  Returns None.
    """
    # `is None` instead of `== None`: array-like buffers may overload `==`.
    if buf is None or len(buf) == 0:
        return
    pLen = len(buf)
    if pLen > 1:
        print('Lenght=%d' % pLen)
    cell = ' %02X' if displayLen == 0 else ' %03X'
    # Display index address
    print("=====" + ''.join(cell % col for col in range(16)))
    # Display data
    for row_start in range(0, pLen, 16):
        row_end = min(row_start + 16, pLen)
        cells = ''.join(cell % buf[idx] for idx in range(row_start, row_end))
        print(('%04X:' % row_start) + cells)
|
|
|
|
|
|
def mem_block_read(addr, size, cmd = CMD_MEM_READ):
    """Request *size* bytes at *addr* and return the payload of the reply.

    A header-only request built by pack_data() is written, then
    ``size + MSG_HDR_SIZE`` bytes are read back.  Returns the payload as a
    list (also after a checksum mismatch, which is only reported), or an
    empty string when fewer bytes than expected arrived.
    """
    assert size <= UART_BLOCK, 'block buffer size must less than UART_BLOCK(default:0x100)'
    request = pack_data(cmd, addr, size, None)
    intf_write(request, MSG_HDR_SIZE)
    # time.sleep(0.1)
    expected = size + MSG_HDR_SIZE
    reply = intf_read(expected)
    if len(reply) != expected:
        print("****** Not receiving enough bytes ******")
        return ''
    _hdr, chk, cal_chk, _cmd, _addr, _size, payload = unpack_data(reply)
    if chk != cal_chk:
        print('Error: intf data read fail. CRC error!')
    return list(payload)
|
|
|
|
|
|
def mem_block_write(addr, buf, size, cmd = CMD_MEM_WRITE):
    """Send one block of data to *addr*, retrying up to 3 times until acknowledged.

    Each attempt builds a message with pack_data(), writes it and waits for
    the acknowledge; a failed attempt is reported and followed by a 0.5 s
    pause.  Returns None whether or not an acknowledge was received.
    """
    assert size <= UART_BLOCK, 'block buffer size must less than UART_BLOCK(def:0x100)'
    RETRY_MAX = 3
    for attempt in range(1, RETRY_MAX + 1):
        packet = pack_data(cmd, addr, size, bytearray(buf))
        intf_write(packet, len(packet))
        # time.sleep(0.1)
        if msg_wait_ack():
            return
        print('Write, wait ack fail, retry %d/%d' % (attempt, RETRY_MAX))
        time.sleep(0.5)
|
|
|
|
def mem_clr(addr = 0x60000000, len = 0):
    """Send a CMD_MEM_CLR message for *addr* / *len* and wait for the acknowledge.

    The command is sent once, followed by a 1 s pause; the acknowledge is
    then awaited up to 3 times, with a message and a 0.5 s pause after each
    miss.  Returns None.
    """
    request = pack_data(CMD_MEM_CLR, addr, len, None)
    intf_write(request, MSG_HDR_SIZE)
    time.sleep(1)
    RETRY_MAX = 3
    for attempt in range(1, RETRY_MAX + 1):
        if msg_wait_ack():
            return
        print('ERR: Memory clear ack fail, retry %d/%d' % (attempt, RETRY_MAX))
        time.sleep(0.5)
|
|
|
|
def set_model(input_addr, input_len, output_addr, output_len, buf_addr, buf_len, cmd_addr, cmd_len, weight_addr, weight_len, setup_addr, setup_len):
    """Send a CMD_DEMO_SET_MODEL message carrying the twelve arguments.

    The arguments are packed as twelve little-endian 32-bit values; the
    message address field is *setup_addr* and its length field is 48.  A
    missing acknowledge is only reported on stdout.
    """
    fields = (
        input_addr, input_len, output_addr, output_len,
        buf_addr, buf_len, cmd_addr, cmd_len,
        weight_addr, weight_len, setup_addr, setup_len,
    )
    payload = bytearray(struct.pack(CMD_DEMO_SET_TWELVE_FMT, *fields))
    packet = pack_data(CMD_DEMO_SET_MODEL, setup_addr, 48, bytearray(payload))
    intf_write(packet, len(packet))
    time.sleep(0.1)
    if msg_wait_ack():
        return
    print('set_model: wait ack fail')
|
|
|
|
def set_image(image_addr, image_len, col, row, ch, format):
    """Send a CMD_DEMO_SET_IMAGES message carrying the six arguments.

    The arguments are packed as six little-endian 32-bit values; the message
    address field is 0 and its length field is 24.  A missing acknowledge is
    only reported on stdout.
    """
    fields = (image_addr, image_len, col, row, ch, format)
    payload = bytearray(struct.pack(CMD_DEMO_SET_SIX_FMT, *fields))
    packet = pack_data(CMD_DEMO_SET_IMAGES, 0, 24, bytearray(payload))
    intf_write(packet, len(packet))
    time.sleep(0.1)
    if msg_wait_ack():
        return
    print('set_images: wait ack fail')
|
|
|
|
def demo_run_once():
    """Send a CMD_DEMO_RUN_ONCE message and wait for the acknowledge.

    A missing acknowledge is only reported on stdout.  The failure message
    used to read 'set_images', copied from set_image(); it now names this
    command.
    """
    data = pack_data(CMD_DEMO_RUN_ONCE, 0, 0, None)
    intf_write(data, len(data))
    time.sleep(0.1)
    if not msg_wait_ack():
        print('demo_run_once: wait ack fail')
|
|
|
|
def mem_wrc():
    """Memory write, read and compare (placeholder: not implemented, returns None)."""
    return None
|
|
|
|
|
|
def pack_data(cmd, addr, size, buf):
    """Build an outgoing message as a bytearray.

    With the packet layer enabled (PACKET true) the frame is a
    MSG_PKT_HDR_FMT header, the optional payload *buf*, then the CRC-16 of
    everything so far as two bytes (low byte first) followed by two zero
    bytes.  With it disabled the frame is a MSG_HDR_FMT header plus payload,
    and the CRC-16 of everything after the first 4 bytes is stored in header
    bytes 2 and 3.
    """
    use_crc = True # always use crc
    payload_len = len(buf) if buf else 0
    msg_size = payload_len + 8 # there is always address & size
    # add MSG header itself, and mark the packet as coming from the host
    pkt_size = (msg_size + 4) | PKT_WR_FLAG
    if use_crc:
        # the CRC is appended at the end, so the packet grows by 4
        pkt_size = (pkt_size + 4) | PKT_CRC_FLAG

    if PACKET:
        header = struct.pack(MSG_PKT_HDR_FMT, PKTX_PAMB, pkt_size, cmd, msg_size, addr, size)
    else:
        header = struct.pack(MSG_HDR_FMT, PKTX_PAMB, 0x00, cmd, addr, size)
    frame = bytearray(header)
    if buf is not None:
        frame = frame + buf

    if PACKET:
        if use_crc:
            crc16 = gen_crc16(frame[0:len(frame)], len(frame))
            frame = frame + bytes([crc16 & 0xFF]) + bytes([crc16 >> 8]) + bytes([0, 0])
    else:
        crc16 = gen_crc16(frame[4:len(frame)], len(frame) - 4)
        frame[2] = crc16 & 0xFF
        frame[3] = crc16 >> 8
    return frame
|
|
|
|
def unpack_data(buf):
    """Split a received message into its header fields and payload.

    Returns a 7-tuple ``(preamble_or_hdr, chk, cal_chk, cmd, addr, size,
    payload)``.  ``chk`` is the checksum received and ``cal_chk`` the one
    computed locally.  With the packet layer enabled and the CRC flag set,
    4 further bytes are read through ``intf_read`` to obtain ``chk``.
    ``payload`` is everything after the first 16 bytes, or None when the
    packet-layer message size is below 8.
    """
    #assert len(buf) < 16, "Packet header error"
    header = buf[0:16]

    if not PACKET:
        hdr, chk, cmd, addr, size = struct.unpack(MSG_HDR_FMT, header)
        cal_chk = gen_crc16(buf[4:len(buf)], len(buf) - 4)
        payload = None if len(buf) < 16 else buf[16 : len(buf)]
        return (hdr, chk, cal_chk, cmd, addr, size, payload)

    chk = 0
    cal_chk = 0
    preamble, pSize, cmd, mSize, addr, size = struct.unpack(MSG_PKT_HDR_FMT, header)
    if pSize & PKT_CRC_FLAG:
        crc_bytes = intf_read(4)
        chk = int.from_bytes(crc_bytes, byteorder='little', signed=False)
        cal_chk = gen_crc16(buf[0:len(buf)], len(buf))
    payload = None if mSize < 8 else buf[16 : len(buf)]
    return (preamble, chk, cal_chk, cmd, addr, size, payload)
|
|
|
|
|
|
##########################################################################
|
|
# Msg Command
|
|
##########################################################################
|
|
def msg_wait_ack():
    """Read one header-sized message and report whether it is an acknowledge.

    Without the packet layer the command must equal CMD_ACK.  With it, bit
    0x8000 of the command must be set and the low 12 bits must be one of
    CMD_ACK, CMD_MEM_READ, CMD_MEM_WRITE, CMD_MEM_CLR or CMD_TEST_ECHO.
    Returns True or False.
    """
    reply = intf_read(MSG_HDR_SIZE)
    _hdr, _chk, _cal_chk, cmd, _addr, _size, _data = unpack_data(reply)
    if PACKET == 0:
        return cmd == CMD_ACK
    # else we are using packet layer
    if not (cmd & 0x8000): # not a proper response code
        return False
    original_cmd = cmd & 0x0FFF # get the original command
    accepted = (CMD_ACK, CMD_MEM_READ, CMD_MEM_WRITE, CMD_MEM_CLR, CMD_TEST_ECHO)
    return original_cmd in accepted
|
|
|
|
def msg_wait_result(resp, len):
    """Read a result message with a *len*-byte payload and return the payload as a list.

    The received command is compared with *resp* (OR-ed with 0x8000 when the
    packet layer is enabled); the outcome is only printed, and the payload is
    returned either way.
    """
    reply = intf_read(MSG_HDR_SIZE + len)
    _hdr, _chk, _cal_chk, cmd, _addr, _size, payload = unpack_data(reply)
    expected_response = (resp | 0x8000) if PACKET else resp
    if cmd != expected_response:
        print('*** result rcvd: %x NOT %x' % (cmd, resp))
    else:
        print('*** result rcvd: %x [%d]' % (cmd, len))
    return list(payload)
|
|
|
|
|
|
def msg_cmd_general(cmd, wait_time):
    """Send a header-only message with command *cmd* and wait for the acknowledge.

    The message (address 0, length 0) is sent once, followed by a pause of
    *wait_time* seconds; the acknowledge is then awaited up to 3 times, with
    a message and a 0.5 s pause after each miss.  Returns None.
    """
    request = pack_data(cmd, 0, 0, None)
    intf_write(request, MSG_HDR_SIZE)
    time.sleep(wait_time)
    RETRY_MAX = 3
    for attempt in range(1, RETRY_MAX + 1):
        if msg_wait_ack():
            return
        print('ERR: CMD ACK FAIL, retry %d/%d' % (attempt, RETRY_MAX))
        time.sleep(0.5)
|
|
|
|
##########################################################################
|
|
# Xmodem Application
|
|
##########################################################################
|
|
def send_at_command(command):
    """Write *command* followed by a carriage return, ASCII-encoded, to the serial port."""
    line = bytes(command + "\r", encoding='ascii')
    ser.write(line)
|
|
|
|
def getc(size, timeout=1):
    """Read and return *size* bytes from the serial port; *timeout* is not used.

    Passed to ``xmodem.XMODEM`` as its read callback.
    """
    received = ser.read(size)
    return received
|
|
|
|
def putc_user(data, timeout=1):
    """Write *data* to the serial port and return the result of the write; *timeout* is not used.

    Passed to ``xmodem.XMODEM`` as its write callback.
    """
    written = ser.write(data)
    return written
|
|
|
|
def erase_readline():
    """Read one line from the serial port and return True when it contains b'ERA'."""
    line = ser.readline().strip()
    #print(line)
    return b'ERA' in line
|
|
def xmodem_send_bin():
    """Download './fw_minions_1218.bin' to the target over XMODEM.

    Steps: wait (up to 10 line reads) for three non-empty responses after a
    reset, send the command "2", transfer the file with XMODEM, then reopen
    the serial port at BAUDRATE_HI.

    Returns False when the target did not respond, otherwise the result of
    the XMODEM transfer (the port is left untouched when the transfer fails).

    Changes from the earlier version: the firmware file is closed after the
    transfer; success is judged by the number of responses seen, not by the
    attempt counter, so a third response on the last attempt is no longer
    reported as a failure; the baud-rate messages show BAUDRATE_HI.
    """
    count = 0
    timeout = 10
    print("Please press reset button!!")
    while timeout:
        timeout -= 1
        response = ser.readline().strip()
        if response == b'':
            print("Please press reset button!!")
        if response == b'\x00':
            print("Reset done!!")
        elif response != b'':
            count += 1
            print(response)
        if count >= 3:
            break
    if count < 3:
        return False

    cmd = "2"
    print("xmodem_send %s start" % (cmd), end="")
    send_at_command(cmd)
    print("\rxmodem_send %s done " % (cmd))
    time.sleep(1)

    ser.flushInput()
    image_file = './fw_minions_1218.bin'
    print("xmodem_send bin file start")
    m = xmodem.XMODEM(getc, putc_user)
    with open(image_file, 'rb') as stream:
        ret = m.send(stream)

    if ret:
        print("xmodem_send bin file done!!")
    else:
        print("xmodem_send bin file FAIL!!!! Please reset the target and start over!!!!")
        return ret

    time.sleep(1)

    # Reopen the port at the high baud rate.
    ser.flush()
    ser.flushInput()
    ser.close()
    time.sleep(0.2)
    print("change baudrate to %d... " % BAUDRATE_HI, end="")
    setup_intf('uart', ser, UART_COM_ID, BAUDRATE_HI, timeout=5)
    ser.flushInput()
    ser.flushOutput()
    print("\rchange baudrate to %d done" % BAUDRATE_HI)
    time.sleep(0.2)
    return ret
|
|
|
|
##########################################################################
|
|
# NPU control
|
|
##########################################################################
|
|
def npu_reset():
    """Pulse bit 2 of register 0xC238004C: write it as 0, then as 1.

    0xC238_004C[2] is the NPU reset control bit (0: reset, 1: release).  The
    register is read once; both writes are derived from that value.
    """
    reset_reg = 0xC238004C
    current = reg_read(reset_reg)
    reg_write(reset_reg, current & 0xFFFFFFFB)
    reg_write(reset_reg, current | 0x04)
|
|
|
|
|
|
|
|
|
|
##########################################################################
|
|
# Utils
|
|
##########################################################################
|
|
def gen_crc16(buf, size):
    """Return the crcmod predefined 'crc-16' checksum of *buf*.

    *size* is only compared with ``len(buf)``; a mismatch is reported on
    stdout and the checksum of the whole of *buf* is still returned.
    """
    compute = crcmod.predefined.mkPredefinedCrcFun('crc-16')
    # the size is implied by buf
    actual = len(buf)
    if actual != size:
        print("crc error, buffer is", actual, "size is", size)
    return compute(buf)
|
|
|
|
|
|
|
|
##########################################################################
|
|
# File I/O
|
|
##########################################################################
|
|
def file_read_binary(src):
    """Return the whole content of the file *src* as bytes."""
    with open(src, mode="rb") as handle:
        return handle.read()
|
|
|
|
def file_cmp(file1, file2):
    """Compare two text files line by line.

    Returns True when both files yield identical lines up to and including
    end of file, False at the first difference (including one file ending
    before the other).
    """
    with open(file1, 'r') as left, open(file2, 'r') as right:
        while True:
            line_a = left.readline()
            line_b = right.readline()
            if line_a != line_b:
                return False
            if not line_a:
                # both files reached end of file together
                return True
|
|
|
|
def file_cmp_binary(file1, file2):
    """Return True when the two files have identical byte content."""
    with open(file1, "rb") as first:
        blob1 = first.read()
    with open(file2, "rb") as second:
        blob2 = second.read()
    return blob1 == blob2
|
|
|
|
##########################################################################
|
|
# Debug
|
|
##########################################################################
|
|
def DBGPRINT(format, *args):
    """Print ``format % args`` when the module-level ``__DEBUG__`` flag equals 1."""
    if __DEBUG__ != 1:
        return
    print(format % args)
|