2025-12-17 15:55:25 +08:00

875 lines
27 KiB
Python

import os
import sys
import time
import serial
import pdb
import words
import struct
import logging
import json
import crcmod
#import usb.core
#import usb.util
#import usb.backend.libusb1
from optparse import OptionParser
import serial
import xmodem
import pyprind
from binascii import b2a_hex, a2b_hex
__DEBUG__ = 1
# USB Vendor ID / Product ID
VENDOR_ID = 0x0d7d
PRODUCT_ID = 0x0100
USB_MAX_BYTES = 8192
DATA_BLOCK_SIZE = 8192
ACK_PACKET_SIZE = 8
UART_MAX_BYTES = 4096
INTF_USB = 0
INTF_UART = 1
INTF_I2C = 2
INTF_SPI = 3
#################### PACKET LAYER ENABLE/DISABLE ###################
PACKET = 0 # 1-> enable packet layer, 0 -> disable packet layer
IMAGE_TRF = 0 # 0-> skip transfer, 1-> use USB, 2-> use UART
HIGH_BAUD = 1 # 1 -> HAPS or EVB, 0 -> ZC706 or Host (YG's App)
KNERON_UART = "/dev/kneron_uart"
UART_COM_ID = 7 # COM5
UART_BLOCK = 0x1000
act_intf = INTF_UART
if HIGH_BAUD == 1:
BAUDRATE_HI = 921600
BAUDRATE = 115200
else:
BAUDRATE_HI = 115200
BAUDRATE = 115200
# dut host interface (mozart)
dut_intf = INTF_I2C
ser = None
CMD_NONE = 0
CMD_MEM_READ = 1
CMD_MEM_WRITE= 2
CMD_DATA = 3
CMD_ACK = 4
CMD_STS_CLR = 5
CMD_MEM_CLR = 6
CMD_CHK_ERR = 7
CMD_TEST_ECHO = 8
CMD_USB_WRITE = 9
CMD_DEMO_SET_MODEL = 0x10
CMD_DEMO_SET_IMAGES = 0x11
CMD_DEMO_RUN_ONCE = 0x12
CMD_DEMO_RUN = 0x13
CMD_DEMO_STOP = 0x14
CMD_DEMO_RESULT_LEN = 0x15
CMD_DEMO_RESULT = 0x16
CMD_DEMO_FD = 0x17 # for development, remove later
CMD_DEMO_LM = 0x18 # for development
CMD_DEMO_LD = 0x19 # for development
CMD_DEMO_FR = 0x1A # for development
CMD_DEMO_INFERENCE = 100
CMD_DEMO_REG1 = 101
CMD_DEMO_REG2 = 102
CMD_DEMO_REG3 = 103
CMD_DEMO_REG4 = 104
CMD_DEMO_REG5 = 105
CMD_DEMO_ADDUSER = 106
CMD_DEMO_DELUSER = 107
CMD_FLASH_INFO = 0x1000
CMD_FLASH_CHIP_ERASE = 0x1001
CMD_FLASH_SECTOR_ERASE = 0x1002
CMD_FLASH_READ = 0x1003
CMD_FLASH_WRITE = 0x1004
CMD_SCPU_RUN = 0x1005
# Messge HDR formart
MSG_HDR_FMT = "<HHIII" # Header(2b), checksum(2b), Cmd(4b), Addr(4b), Len(4b)
MSG_PKT_HDR_FMT = "<HHHHII" # Preamble(2b), Psize(2b), Cmd(2b), Csize(2b), Addr(4b), Len(4b)
PKTX_PAMB = 0xA583 # Packet TX Preamble
PKRX_PAMB = 0x8A35 # Packet RX Preamble
MSG_HDR_SIZE = 16 # 2+2+2+2+4+4
MSG_DAT_IDX = 16 # message body will be stored from offset 16
PKT_WR_FLAG = 0x8000
PKT_CRC_FLAG = 0x4000
# SET_MODEL and SET_IMAGES format
CMD_DEMO_SET_TWELVE_FMT = "<IIIIIIIIIIII" # arg1(4b), arg2(4b), arg3(4b), arg4(4b), arg5(4b), arg6(4b), arg7(4b), arg8(4b), arg9(4b), arg10(4b), arg11(4b), arg12(4b)
CMD_DEMO_SET_EIGHT_FMT = "<IIIIIIII" # arg1(4b), arg2(4b), arg3(4b), arg4(4b), arg5(4b), arg6(4b), arg7(4b), arg8(4b)
CMD_DEMO_SET_SIX_FMT = "<IIIIII" # arg1(4b), arg2(4b), arg3(4b), arg4(4b), arg5(4b), arg6(4b)
CMD_DEMO_SET_TWO_FMT = "<II" # arg1(4b), arg2(4b)
##########################################################################
# Device control
##########################################################################
def LOG(__LEVEL__, fmt):
print(fmt)
##########################################################################
# Device control
##########################################################################
def dev_init(baudrate = 115200):
if act_intf == INTF_UART:
''' Uart interface '''
global ser
ser = serial.Serial()
try:
setup_intf('uart', ser, UART_COM_ID, baudrate, timeout=5)
ser.flushInput()
ser.flushOutput()
except:
DBGPRINT('Uart port open fail')
sys.exit(-1)
''' USB interface '''
global kneron_dev
global ep_out
global ep_in
if(IMAGE_TRF == 1):
backend = usb.backend.libusb1.get_backend(find_library=lambda x: "C:\Windows\System32\libusb0.dll")
kneron_dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID, backend=backend)
if kneron_dev is None:
sys.exit(-1)
#print('kneron usb is open...')
kneron_dev.set_configuration()
cfg = kneron_dev.get_active_configuration()
#print(cfg)
interface = cfg[(0,0)]
#print(interface)
interface_number = cfg[(0, 0)].bInterfaceNumber
#print(interface_number)
alternate_setting = usb.control.get_interface(kneron_dev, interface_number)
#print(alternate_setting)
usb_interface = usb.util.find_descriptor(cfg, bInterfaceNumber=interface_number, bAlternateSetting=alternate_setting)
#print(usb_interface)
ep_out = usb.util.find_descriptor(usb_interface, custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT)
if ep_out is None:
sys.exit(-1)
#print(ep_out)
ep_in = usb.util.find_descriptor(usb_interface, custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN)
if ep_in is None:
sys.exit(-1)
def dev_close():
if act_intf == INTF_UART:
global ser
intf_close()
else:
''' USB interface '''
#TODO: Wait for mozart usb
def dev_flush():
global ser
ser.flushInput()
ser.flushOutput()
##########################################################################
# Setpu active interface
##########################################################################
def setup_intf(interface, dev, com = 0, baudrate = 115200, timeout=5.0):
global intf_read
global intf_write
global intf_open
global intf_close
act_intf = words.__WORDS__[interface]
if interface == 'usb':
intf_read = usb_read
intf_write = usb_write
intf_open = usb_open
intf_clsoe = usb_close
elif interface == 'uart':
intf_read = uart_read
intf_write = uart_write
intf_open = uart_open
intf_close = uart_close
uart_open(dev, com, baudrate, timeout)
else:
print("Error device interface")
##########################################################################
# Uart interface
##########################################################################
def uart_open(ser, com, baudrate, timeout):
if os.name == 'nt':
ser.port = 'COM' + str(com)
else:
if os.path.islink(KNERON_UART):
ser.port = KNERON_UART
else:
ser.port = '/dev/ttyUSB' + str(com)
ser.timeout = timeout
ser.baudrate = baudrate
ser.open()
def uart_write(buf, size):
# print('->uart write: %d' %size)
# mem_dump(buf)
global ser
ser.write(buf)
def uart_read(size):
global ser
# print('->uart read: %d' %size)
try:
buf = bytearray(ser.read(size))
except:
print('Uart read fail')
# mem_dump(buf)
if (len(buf) != size):
print('Uart read fail, expected=%x, read=%x' %(size, len(buf)))
return buf
def uart_close():
global ser
ser.close()
##########################################################################
# USB interface
##########################################################################
def check_generic_ack(buf):
preamble = buf[0] + buf[1] << 8
pkt_size = buf[2] + buf[3] << 8
cmd = buf[4] + buf[5] >> 8
size = buf[6] + buf[7] >> 8
if (preamble != PKRX_PAMB):
return False
if (pkt_size & 0x0FFF != 0):
return False
if (size != 0): # must be zero msg payload
return False
if (cmd == CMD_ACK):
return True
else:
return False
def RecDataACKFromUSB():
# print("RecDataACKFromUSB")
ackusb = bytes(b"none")
if (ep_in is None):
print("ep_in is not open")
return
temp = ep_in.read(ACK_PACKET_SIZE, timeout=1000)
ackusb = bytes(temp)
# print(ackusb)
while ((ackusb[0] != 0x35) or (ackusb[1] != 0x8A)): # look for preamble
temp = ep_in.read(ACK_PACKET_SIZE, timeout=1000)
print("ACK Retry", temp)
ackusb = bytes(temp)
# print(ackusb[0:4])
# print(len(ackusb))
# print(".......")
return 0
def usb_write_data(buf, length, offset):
if(ep_out is None):
print("ep_out is not open")
return
num = (length - offset) // USB_MAX_BYTES
leftover = (length - offset) % USB_MAX_BYTES
for x in range(num):
# print(buf[offset:offset+USB_MAX_BYTES])
RecDataACKFromUSB() # wait for receiver ready
ep_out.write(buf[offset:offset+USB_MAX_BYTES])
offset += USB_MAX_BYTES
# print("+", end='', flush=True)
if (leftover != 0):
# print(buf[offset:])
RecDataACKFromUSB() # wait for acknowledge first
ep_out.write(buf[offset:])
return
##########################################################################
# UART interface Raw Data Transfer
##########################################################################
def uart_write_data(buf, length, offset):
num = (length - offset) // UART_MAX_BYTES
leftover = (length - offset) % UART_MAX_BYTES
for x in range(num):
RecDataACKFromUART() # wait for receiver ready
bdata = bytearray(buf[offset:offset+UART_MAX_BYTES])
intf_write(bdata, len(bdata))
offset += UART_MAX_BYTES
if (leftover != 0):
RecDataACKFromUART() # wait for acknowledge first
bdata = bytearray(buf[offset:offset+leftover])
intf_write(bdata, len(bdata))
return
def RecDataACKFromUART():
ackuart = bytes(b"none")
temp = intf_read(ACK_PACKET_SIZE)
ackuart = bytes(temp)
# print(ackuart)
# look for the Data ACK
while ((ackuart[0] != 0x35) or (ackuart[1] != 0x8A) or (ackuart[2] != 4)):
intf_read(ACK_PACKET_SIZE)
print("ACK Retry", temp)
ackuart = bytes(temp)
return
##########################################################################
# AND / OR
##########################################################################
def bit_set():
''' '''
def bit_clear():
''' '''
##########################################################################
# Register read / write
##########################################################################
def reg_read(addr):
buf = mem_block_read(addr, 4)
val = (buf[3] << 24) + (buf[2] << 16) + (buf[1] << 8) + buf[0]
return val
def reg_write(addr, value):
buf = word2blist(value)
mem_block_write(addr, buf, len(buf))
def word2blist(value):
buf = []
buf.append((value >> 0) & 0xFF)
buf.append((value >> 8) & 0xFF)
buf.append((value >> 16) & 0xFF)
buf.append((value >> 24) & 0xFF)
return buf
def wlist2blist(wlist):
buf = []
for value in wlist:
buf.append((value >> 0) & 0xFF)
buf.append((value >> 8) & 0xFF)
buf.append((value >> 16) & 0xFF)
buf.append((value >> 24) & 0xFF)
return buf
def blist2wlist(blist):
buf = []
for i in range(len(blist) // 4):
#value = (blist[i * 4 + 0] << 24) + (blist[i * 4 + 1] << 16) + (blist[i * 4 + 2] << 8) + (blist[i * 4 + 3])
value = (blist[i * 4 + 0] ) + (blist[i * 4 + 1] << 8) + (blist[i * 4 + 2] << 16) + (blist[i * 4 + 3] << 24)
buf.append(value)
return buf
##########################################################################
# Flash READ/WRITE/COMPARE
##########################################################################
def send_scpu_run(addr):
print('Jump to SCPU and Run!!')
data = pack_data(CMD_SCPU_RUN, addr, 0, None)
intf_write(data, MSG_HDR_SIZE)
time.sleep(0.1)
def flash_read(addr, size):
return(mem_read(addr, size, CMD_FLASH_READ, True))
def flash_write(addr , buf, size, buf_max = UART_BLOCK):
mem_write(addr, buf, size, CMD_FLASH_WRITE, buf_max, True)
def flash_chip_erase():
msg_cmd_general(CMD_FLASH_CHIP_ERASE, 0.2)
def flash_sector_erase(addr):
data = pack_data(CMD_FLASH_SECTOR_ERASE, addr, 1, None)
intf_write(data, MSG_HDR_SIZE)
time.sleep(0.1)
if not msg_wait_ack():
print('Flase erase: wait ack fail')
def showdebuginfo():
msg_cmd_general(CMD_FLASH_INFO, 0.2)
def flash_sector_erase_64K(addr):
data = pack_data(CMD_FLASH_SECTOR_ERASE_64K, addr, 1, None)
intf_write(data, MSG_HDR_SIZE)
time.sleep(0.1)
if not msg_wait_ack():
print('Flase erase: wait ack fail')
##########################################################################
# USB MEMORY WRITE
##########################################################################
def usb_mem_write(addr, buf, size):
print('---------------- USB -----------------')
print('Start ADDR=0x%X usb memory Write' %addr)
data = pack_data(CMD_USB_WRITE, addr, size, None, 1) #use crc
intf_write(data, len(data))
#### start usb upload #####
usb_write_data(buf, size, 0) # start at the beginning of file
# now get the response
if (msg_wait_ack(CMD_USB_WRITE) != True):
print('USB Write, wait ack fail')
return ''
##########################################################################
# MEMORY READ/WRITE/COMPARE
##########################################################################
def mem_write(addr , buf, size, cmd = CMD_MEM_WRITE, buf_max = UART_BLOCK, dbg = False):
print('--------------------------------------')
print('Start ADDR=0x%X memory R/W test' %addr)
wloop = 0
percent = 0
unit = 0
total_range = 100
#print("\n")
if (size % buf_max == 0):
wloop = (size // buf_max)
else:
wloop = (size // buf_max) + 1
if(wloop < total_range):
total_range = wloop - 1
if(total_range == 0):
total_range = 1
#print('%d = (%d // %d)' %((wloop // total_range), wloop, total_range))
if (wloop % total_range == 0):
unit = (wloop // total_range) - 1
else:
unit = (wloop // total_range)
if(unit == 0):
unit = 1
for i in range(wloop):
if i == wloop - 1:
wbuf = buf[i * buf_max : size]
else:
wbuf = buf[i * buf_max : (i + 1) * buf_max]
mem_block_write(addr + buf_max * i, wbuf, len(wbuf), cmd)
#if dbg:
#print('Write status [%d/%d]'%(i, wloop))
if( percent != (i // unit) ):
percent = i // unit
if( percent > total_range):
percent = total_range
#print("\r" + "In progress",percent,"%",end = "",flush=True)
if(total_range == 100):
print("\r" + '[%s%s] %d%%' % ('*'*percent,'-'*(total_range-percent), percent),end = "",flush=True)
else:
print("\r" + '[%s%s] %d' % ('*'*percent,'-'*(total_range-percent), percent) + "/" + '%d' %total_range, end = "",flush=True)
print("")
def mem_read(addr, size, cmd = CMD_MEM_READ, dbg = False):
buf_max = UART_BLOCK
rloop = 0
rlen = 0
percent = 0
unit = 0
total_range = 100
buf = []
#print("\n")
if (size % buf_max == 0):
rloop = (size // buf_max)
else:
rloop = (size // buf_max) + 1
if(rloop < total_range):
total_range = rloop - 1
if(total_range == 0):
total_range = 1
#print('%d = (%d // %d)' %((rloop // total_range), rloop, total_range))
if (rloop % total_range == 0):
unit = (rloop // total_range) - 1
else:
unit = (rloop // total_range)
if(unit == 0):
unit = 1
for i in range(rloop):
if i == rloop - 1:
rlen = size - i * buf_max
else:
rlen = buf_max
sbuf = mem_block_read(addr + buf_max * i, rlen, cmd)
buf.extend(sbuf)
#if dbg:
# print('Read status [%d/%d]'%(i, rloop))
if( percent != (i // unit) ):
percent = i // unit
if( percent > total_range):
percent = total_range
#print("\r" + "In progress",percent,"%",end = "",flush=True)
if(total_range == 100):
print("\r" + '[%s%s] %d%%' % ('*'*percent,'-'*(total_range-percent), percent),end = "",flush=True)
else:
print("\r" + '[%s%s] %d' % ('*'*percent,'-'*(total_range-percent), percent) + "/" + '%d' %total_range, end = "",flush=True)
print("")
return buf
def mem_dump(buf, displayLen = 0):
''' Memory dump '''
if buf == None or len(buf) == 0:
return
pLen = len(buf)
if(pLen > 1):
print('Lenght=%d' %pLen)
#Display index address
aStr = "====="
for i in range(16):
if displayLen == 0:
aStr += ' ' +('%02X' %i)
else:
aStr += ' ' +('%03X' %i)
print(aStr)
#Display data
aStr = ""
if(pLen % 16):
loop = (pLen // 16)+1
else:
loop = pLen // 16;
for i in range(loop):
aStr = ('%04X:' %(i * 16))
for j in range(16):
if((i*16+j) < pLen):
if displayLen==0:
aStr += ' ' +('%02X' %buf[i*16+j])
else:
aStr += ' ' +('%03X' %buf[i*16+j])
print(aStr)
def mem_block_read(addr, size, cmd = CMD_MEM_READ):
assert size <= UART_BLOCK, 'block buffer size must less than UART_BLOCK(default:0x100)'
data = pack_data(cmd, addr, size, None)
intf_write(data, MSG_HDR_SIZE)
# time.sleep(0.1)
buf = intf_read(size + MSG_HDR_SIZE)
if (len(buf) != (size + MSG_HDR_SIZE)):
print("****** Not receiving enough bytes ******")
else:
hdr, chk, cal_chk, cmd, addr, size, data = unpack_data (buf)
if (chk != cal_chk):
print('Error: intf data read fail. CRC error!')
return list(data)
return ''
def mem_block_write(addr, buf, size, cmd = CMD_MEM_WRITE):
assert size <= UART_BLOCK, 'block buffer size must less than UART_BLOCK(def:0x100)'
cnt = 1
RETRY_MAX = 3
while(cnt <= RETRY_MAX):
bdata = bytearray(buf)
data = pack_data(cmd, addr, size, bytearray(buf))
intf_write(data, len(data))
# time.sleep(0.1)
if msg_wait_ack():
break
else:
print('Write, wait ack fail, retry %d/%d' % (cnt, RETRY_MAX))
cnt = cnt + 1
time.sleep(0.5)
def mem_clr(addr = 0x60000000, len = 0):
''' DDR/SRAM/NPUSRAM memory clear '''
data = pack_data(CMD_MEM_CLR, addr, len, None)
intf_write(data, MSG_HDR_SIZE)
time.sleep(1)
cnt = 1
RETRY_MAX = 3
while(cnt <= RETRY_MAX):
if msg_wait_ack():
return
else:
print('ERR: Memory clear ack fail, retry %d/%d' % (cnt, RETRY_MAX))
cnt = cnt + 1
time.sleep(0.5)
def set_model(input_addr, input_len, output_addr, output_len, buf_addr, buf_len, cmd_addr, cmd_len, weight_addr, weight_len, setup_addr, setup_len):
buf = bytearray(struct.pack(CMD_DEMO_SET_TWELVE_FMT, input_addr, input_len, output_addr, output_len, buf_addr, buf_len, cmd_addr, cmd_len, weight_addr, weight_len, setup_addr, setup_len))
data = pack_data(CMD_DEMO_SET_MODEL, setup_addr, 48, bytearray(buf))
intf_write(data, len(data))
time.sleep(0.1)
if not msg_wait_ack():
print('set_model: wait ack fail')
def set_image(image_addr, image_len, col, row, ch, format):
buf = bytearray(struct.pack(CMD_DEMO_SET_SIX_FMT, image_addr, image_len, col, row, ch, format))
data = pack_data(CMD_DEMO_SET_IMAGES, 0, 24, bytearray(buf))
intf_write(data, len(data))
time.sleep(0.1)
if not msg_wait_ack():
print('set_images: wait ack fail')
def demo_run_once():
data = pack_data(CMD_DEMO_RUN_ONCE, 0, 0, None)
intf_write(data, len(data))
time.sleep(0.1)
if not msg_wait_ack():
print('set_images: wait ack fail')
def mem_wrc():
''' Memory write, read and compare '''
def pack_data(cmd, addr, size, buf):
crc_flag = 1 # always use crc
if (buf):
mSize = len(buf) + 8 # there is always address & size
else:
mSize = 8
# print("packet_data: address is %x" %(addr))
pSize = mSize + 4 # add MSG header itself
pSize = pSize | PKT_WR_FLAG # we are the host
# if CRC is used, append it at the end, so increase pSize by 4
if (crc_flag):
pSize = pSize + 4
pSize = pSize | PKT_CRC_FLAG
if (PACKET):
nbuf = bytearray(struct.pack(MSG_PKT_HDR_FMT, PKTX_PAMB, pSize, cmd, mSize, addr, size))
else:
nbuf = bytearray(struct.pack(MSG_HDR_FMT, PKTX_PAMB, 0x00, cmd, addr, size))
if (buf != None):
nbuf = nbuf + buf
# print("address sent -> %02x %02x %02x %02x" %(nbuf[11], nbuf[10], nbuf[9], nbuf[8]))
if (PACKET):
if (crc_flag):
crc16 = gen_crc16(nbuf[0:len(nbuf)], len(nbuf))
# print('crc is %x' %(crc16))
# print(bytes([crc16 & 0xFF]), bytes([crc16 >> 8]))
nbuf = nbuf + bytes([crc16 & 0xFF]) + bytes([crc16 >> 8]) + bytes([0]) + bytes([0])
else:
crc16 = gen_crc16(nbuf[4:len(nbuf)], len(nbuf) - 4)
#print('get CRC16 %X' %(crc16))
nbuf[2] = crc16 & 0xFF
nbuf[3] = crc16 >> 8
return nbuf
def unpack_data(buf):
#assert len(buf) < 16, "Packet header error"
chk = 0
cal_chk = 0
hdrmsg = buf[0:16]
if (PACKET):
preamble, pSize, cmd, mSize, addr, size = struct.unpack(MSG_PKT_HDR_FMT, hdrmsg)
if (pSize & PKT_CRC_FLAG):
crc_bytes = intf_read(4)
chk = int.from_bytes(crc_bytes, byteorder='little', signed=False)
cal_chk = gen_crc16(buf[0:len(buf)], len(buf))
if (mSize < 8):
return (preamble, chk, cal_chk, cmd, addr, size, None)
else:
return (preamble, chk, cal_chk, cmd, addr, size, buf[16 : len(buf)])
else:
hdr, chk, cmd, addr, size = struct.unpack(MSG_HDR_FMT, hdrmsg)
cal_chk = gen_crc16(buf[4:len(buf)], len(buf) - 4)
if (len(buf) < 16):
return (hdr, chk, cal_chk, cmd, addr, size, None)
else:
return (hdr, chk, cal_chk, cmd, addr, size, buf[16 : len(buf)])
##########################################################################
# Msg Command
##########################################################################
def msg_wait_ack():
buf = intf_read(MSG_HDR_SIZE)
hdr, chk, cal_chk, cmd, addr, size, data = unpack_data (buf)
if (PACKET == 0):
if cmd == CMD_ACK:
return True
else:
return False
# else we are using packet layer
if (cmd & 0x8000): # check is it a proper response code?
rsp = cmd & 0x0FFF # get the original command
if (rsp == CMD_ACK):
return True
elif (rsp == CMD_MEM_READ):
return True
elif (rsp == CMD_MEM_WRITE):
return True
elif (rsp == CMD_MEM_CLR):
return True
elif (rsp == CMD_TEST_ECHO):
return True
else:
return False
else:
return False
def msg_wait_result(resp, len):
buf = intf_read(MSG_HDR_SIZE + len)
hdr, chk, cal_chk, cmd, addr, size, data = unpack_data (buf)
if (PACKET):
expected_response = resp | 0x8000
else:
expected_response = resp
if cmd == expected_response:
print('*** result rcvd: %x [%d]' %(cmd, len))
else:
print('*** result rcvd: %x NOT %x' %(cmd, resp))
return list(data)
def msg_cmd_general(cmd, wait_time):
''' send run_once command '''
data = pack_data(cmd, 0, 0, None)
intf_write(data, MSG_HDR_SIZE)
time.sleep(wait_time)
cnt = 1
RETRY_MAX = 3
while(cnt <= RETRY_MAX):
if msg_wait_ack():
return
else:
print('ERR: CMD ACK FAIL, retry %d/%d' % (cnt, RETRY_MAX))
cnt = cnt + 1
time.sleep(0.5)
##########################################################################
# Xmodem Application
##########################################################################
def send_at_command(command):
ser.write(bytes(command+"\r", encoding='ascii'))
def getc(size, timeout=1):
return ser.read(size)
def putc_user(data, timeout=1):
return ser.write(data)
def erase_readline():
response = ser.readline().strip()
#print(response)
#if(response == b'ERASE'):
if b'ERA' in response:
return True
else:
return False
def xmodem_send_bin():
count = 0
timeout = 10
print("Please press reset button!!")
while(timeout):
timeout -= 1
response = ser.readline().strip()
if(response == b''):
print("Please press reset button!!")
if(response == b'\x00'):
print("Reset done!!")
elif(response != b''):
count += 1
print(response)
if(count >= 3):
break
if(timeout == 0):
return False
cmd="2"
print("xmodem_send %s start" %(cmd), end="")
send_at_command(cmd)
print("\rxmodem_send %s done " %(cmd))
time.sleep(1)
ser.flushInput()
image_file = './fw_minions_1218.bin'
print("xmodem_send bin file start")
stream = open(image_file, 'rb')
m = xmodem.XMODEM(getc, putc_user)
#print("xmodem_sending ... ")
ret = m.send(stream)
if(ret):
print("xmodem_send bin file done!!")
else:
print("xmodem_send bin file FAIL!!!! Please reset the target and start over!!!!")
return ret
time.sleep(1)
ser.flush()
ser.flushInput()
ser.close()
time.sleep(0.2)
print("change baudrate to 921600... ", end="")
setup_intf('uart', ser, UART_COM_ID, BAUDRATE_HI, timeout=5)
ser.flushInput()
ser.flushOutput()
print("\rchange baudrate to 921600 done")
time.sleep(0.2)
return ret
##########################################################################
# NPU control
##########################################################################
def npu_reset():
''' 0xC238_004C[2], NPU reset control register. set 0 : reset, 1: release '''
val = reg_read(0xC238004C)
reg_write(0xC238004C, val & 0xFFFFFFFB)
reg_write(0xC238004C, val | 0x04)
##########################################################################
# Utils
##########################################################################
def gen_crc16(buf, size):
crc16_func = crcmod.predefined.mkPredefinedCrcFun('crc-16')
# the size is implied by buf
if (len(buf) != size):
print("crc error, buffer is", len(buf), "size is", size)
out = crc16_func(buf)
return out
##########################################################################
# File I/O
##########################################################################
def file_read_binary(src):
''' Load depth file '''
with open(src, "rb") as f:
data = f.read()
return data
def file_cmp(file1, file2):
l1 = l2 = True
with open(file1, 'r') as f1, open(file2, 'r') as f2:
while l1 and l2:
l1 = f1.readline()
l2 = f2.readline()
if l1 != l2:
return False
return True
def file_cmp_binary(file1, file2):
data1 = file_read_binary(file1)
data2 = file_read_binary(file2)
return (data1 == data2)
##########################################################################
# Debug
##########################################################################
def DBGPRINT(format, *args):
""" print debug message """
if __DEBUG__ == 1:
print (format % args)