import os import sys import time import serial import pdb import words import struct import logging import json import crcmod #import usb.core #import usb.util #import usb.backend.libusb1 from optparse import OptionParser import serial import xmodem import pyprind from binascii import b2a_hex, a2b_hex __DEBUG__ = 1 # USB Vendor ID / Product ID VENDOR_ID = 0x0d7d PRODUCT_ID = 0x0100 USB_MAX_BYTES = 8192 DATA_BLOCK_SIZE = 8192 ACK_PACKET_SIZE = 8 UART_MAX_BYTES = 4096 INTF_USB = 0 INTF_UART = 1 INTF_I2C = 2 INTF_SPI = 3 #################### PACKET LAYER ENABLE/DISABLE ################### PACKET = 0 # 1-> enable packet layer, 0 -> disable packet layer IMAGE_TRF = 0 # 0-> skip transfer, 1-> use USB, 2-> use UART HIGH_BAUD = 1 # 1 -> HAPS or EVB, 0 -> ZC706 or Host (YG's App) KNERON_UART = "/dev/kneron_uart" UART_COM_ID = 7 # COM5 UART_BLOCK = 0x1000 act_intf = INTF_UART if HIGH_BAUD == 1: BAUDRATE_HI = 921600 BAUDRATE = 115200 else: BAUDRATE_HI = 115200 BAUDRATE = 115200 # dut host interface (mozart) dut_intf = INTF_I2C ser = None CMD_NONE = 0 CMD_MEM_READ = 1 CMD_MEM_WRITE= 2 CMD_DATA = 3 CMD_ACK = 4 CMD_STS_CLR = 5 CMD_MEM_CLR = 6 CMD_CHK_ERR = 7 CMD_TEST_ECHO = 8 CMD_USB_WRITE = 9 CMD_DEMO_SET_MODEL = 0x10 CMD_DEMO_SET_IMAGES = 0x11 CMD_DEMO_RUN_ONCE = 0x12 CMD_DEMO_RUN = 0x13 CMD_DEMO_STOP = 0x14 CMD_DEMO_RESULT_LEN = 0x15 CMD_DEMO_RESULT = 0x16 CMD_DEMO_FD = 0x17 # for development, remove later CMD_DEMO_LM = 0x18 # for development CMD_DEMO_LD = 0x19 # for development CMD_DEMO_FR = 0x1A # for development CMD_DEMO_INFERENCE = 100 CMD_DEMO_REG1 = 101 CMD_DEMO_REG2 = 102 CMD_DEMO_REG3 = 103 CMD_DEMO_REG4 = 104 CMD_DEMO_REG5 = 105 CMD_DEMO_ADDUSER = 106 CMD_DEMO_DELUSER = 107 CMD_FLASH_INFO = 0x1000 CMD_FLASH_CHIP_ERASE = 0x1001 CMD_FLASH_SECTOR_ERASE = 0x1002 CMD_FLASH_READ = 0x1003 CMD_FLASH_WRITE = 0x1004 CMD_SCPU_RUN = 0x1005 # Messge HDR formart MSG_HDR_FMT = "uart write: %d' %size) # mem_dump(buf) global ser ser.write(buf) def uart_read(size): global ser # print('->uart read: %d' %size) try: buf = bytearray(ser.read(size)) except: print('Uart read fail') # mem_dump(buf) if (len(buf) != size): print('Uart read fail, expected=%x, read=%x' %(size, len(buf))) return buf def uart_close(): global ser ser.close() ########################################################################## # USB interface ########################################################################## def check_generic_ack(buf): preamble = buf[0] + buf[1] << 8 pkt_size = buf[2] + buf[3] << 8 cmd = buf[4] + buf[5] >> 8 size = buf[6] + buf[7] >> 8 if (preamble != PKRX_PAMB): return False if (pkt_size & 0x0FFF != 0): return False if (size != 0): # must be zero msg payload return False if (cmd == CMD_ACK): return True else: return False def RecDataACKFromUSB(): # print("RecDataACKFromUSB") ackusb = bytes(b"none") if (ep_in is None): print("ep_in is not open") return temp = ep_in.read(ACK_PACKET_SIZE, timeout=1000) ackusb = bytes(temp) # print(ackusb) while ((ackusb[0] != 0x35) or (ackusb[1] != 0x8A)): # look for preamble temp = ep_in.read(ACK_PACKET_SIZE, timeout=1000) print("ACK Retry", temp) ackusb = bytes(temp) # print(ackusb[0:4]) # print(len(ackusb)) # print(".......") return 0 def usb_write_data(buf, length, offset): if(ep_out is None): print("ep_out is not open") return num = (length - offset) // USB_MAX_BYTES leftover = (length - offset) % USB_MAX_BYTES for x in range(num): # print(buf[offset:offset+USB_MAX_BYTES]) RecDataACKFromUSB() # wait for receiver ready ep_out.write(buf[offset:offset+USB_MAX_BYTES]) offset += USB_MAX_BYTES # print("+", end='', flush=True) if (leftover != 0): # print(buf[offset:]) RecDataACKFromUSB() # wait for acknowledge first ep_out.write(buf[offset:]) return ########################################################################## # UART interface Raw Data Transfer ########################################################################## def uart_write_data(buf, length, offset): num = (length - offset) // UART_MAX_BYTES leftover = (length - offset) % UART_MAX_BYTES for x in range(num): RecDataACKFromUART() # wait for receiver ready bdata = bytearray(buf[offset:offset+UART_MAX_BYTES]) intf_write(bdata, len(bdata)) offset += UART_MAX_BYTES if (leftover != 0): RecDataACKFromUART() # wait for acknowledge first bdata = bytearray(buf[offset:offset+leftover]) intf_write(bdata, len(bdata)) return def RecDataACKFromUART(): ackuart = bytes(b"none") temp = intf_read(ACK_PACKET_SIZE) ackuart = bytes(temp) # print(ackuart) # look for the Data ACK while ((ackuart[0] != 0x35) or (ackuart[1] != 0x8A) or (ackuart[2] != 4)): intf_read(ACK_PACKET_SIZE) print("ACK Retry", temp) ackuart = bytes(temp) return ########################################################################## # AND / OR ########################################################################## def bit_set(): ''' ''' def bit_clear(): ''' ''' ########################################################################## # Register read / write ########################################################################## def reg_read(addr): buf = mem_block_read(addr, 4) val = (buf[3] << 24) + (buf[2] << 16) + (buf[1] << 8) + buf[0] return val def reg_write(addr, value): buf = word2blist(value) mem_block_write(addr, buf, len(buf)) def word2blist(value): buf = [] buf.append((value >> 0) & 0xFF) buf.append((value >> 8) & 0xFF) buf.append((value >> 16) & 0xFF) buf.append((value >> 24) & 0xFF) return buf def wlist2blist(wlist): buf = [] for value in wlist: buf.append((value >> 0) & 0xFF) buf.append((value >> 8) & 0xFF) buf.append((value >> 16) & 0xFF) buf.append((value >> 24) & 0xFF) return buf def blist2wlist(blist): buf = [] for i in range(len(blist) // 4): #value = (blist[i * 4 + 0] << 24) + (blist[i * 4 + 1] << 16) + (blist[i * 4 + 2] << 8) + (blist[i * 4 + 3]) value = (blist[i * 4 + 0] ) + (blist[i * 4 + 1] << 8) + (blist[i * 4 + 2] << 16) + (blist[i * 4 + 3] << 24) buf.append(value) return buf ########################################################################## # Flash READ/WRITE/COMPARE ########################################################################## def send_scpu_run(addr): print('Jump to SCPU and Run!!') data = pack_data(CMD_SCPU_RUN, addr, 0, None) intf_write(data, MSG_HDR_SIZE) time.sleep(0.1) def flash_read(addr, size): return(mem_read(addr, size, CMD_FLASH_READ, True)) def flash_write(addr , buf, size, buf_max = UART_BLOCK): mem_write(addr, buf, size, CMD_FLASH_WRITE, buf_max, True) def flash_chip_erase(): msg_cmd_general(CMD_FLASH_CHIP_ERASE, 0.2) def flash_sector_erase(addr): data = pack_data(CMD_FLASH_SECTOR_ERASE, addr, 1, None) intf_write(data, MSG_HDR_SIZE) time.sleep(0.1) if not msg_wait_ack(): print('Flase erase: wait ack fail') def showdebuginfo(): msg_cmd_general(CMD_FLASH_INFO, 0.2) def flash_sector_erase_64K(addr): data = pack_data(CMD_FLASH_SECTOR_ERASE_64K, addr, 1, None) intf_write(data, MSG_HDR_SIZE) time.sleep(0.1) if not msg_wait_ack(): print('Flase erase: wait ack fail') ########################################################################## # USB MEMORY WRITE ########################################################################## def usb_mem_write(addr, buf, size): print('---------------- USB -----------------') print('Start ADDR=0x%X usb memory Write' %addr) data = pack_data(CMD_USB_WRITE, addr, size, None, 1) #use crc intf_write(data, len(data)) #### start usb upload ##### usb_write_data(buf, size, 0) # start at the beginning of file # now get the response if (msg_wait_ack(CMD_USB_WRITE) != True): print('USB Write, wait ack fail') return '' ########################################################################## # MEMORY READ/WRITE/COMPARE ########################################################################## def mem_write(addr , buf, size, cmd = CMD_MEM_WRITE, buf_max = UART_BLOCK, dbg = False): print('--------------------------------------') print('Start ADDR=0x%X memory R/W test' %addr) wloop = 0 percent = 0 unit = 0 total_range = 100 #print("\n") if (size % buf_max == 0): wloop = (size // buf_max) else: wloop = (size // buf_max) + 1 if(wloop < total_range): total_range = wloop - 1 if(total_range == 0): total_range = 1 #print('%d = (%d // %d)' %((wloop // total_range), wloop, total_range)) if (wloop % total_range == 0): unit = (wloop // total_range) - 1 else: unit = (wloop // total_range) if(unit == 0): unit = 1 for i in range(wloop): if i == wloop - 1: wbuf = buf[i * buf_max : size] else: wbuf = buf[i * buf_max : (i + 1) * buf_max] mem_block_write(addr + buf_max * i, wbuf, len(wbuf), cmd) #if dbg: #print('Write status [%d/%d]'%(i, wloop)) if( percent != (i // unit) ): percent = i // unit if( percent > total_range): percent = total_range #print("\r" + "In progress",percent,"%",end = "",flush=True) if(total_range == 100): print("\r" + '[%s%s] %d%%' % ('*'*percent,'-'*(total_range-percent), percent),end = "",flush=True) else: print("\r" + '[%s%s] %d' % ('*'*percent,'-'*(total_range-percent), percent) + "/" + '%d' %total_range, end = "",flush=True) print("") def mem_read(addr, size, cmd = CMD_MEM_READ, dbg = False): buf_max = UART_BLOCK rloop = 0 rlen = 0 percent = 0 unit = 0 total_range = 100 buf = [] #print("\n") if (size % buf_max == 0): rloop = (size // buf_max) else: rloop = (size // buf_max) + 1 if(rloop < total_range): total_range = rloop - 1 if(total_range == 0): total_range = 1 #print('%d = (%d // %d)' %((rloop // total_range), rloop, total_range)) if (rloop % total_range == 0): unit = (rloop // total_range) - 1 else: unit = (rloop // total_range) if(unit == 0): unit = 1 for i in range(rloop): if i == rloop - 1: rlen = size - i * buf_max else: rlen = buf_max sbuf = mem_block_read(addr + buf_max * i, rlen, cmd) buf.extend(sbuf) #if dbg: # print('Read status [%d/%d]'%(i, rloop)) if( percent != (i // unit) ): percent = i // unit if( percent > total_range): percent = total_range #print("\r" + "In progress",percent,"%",end = "",flush=True) if(total_range == 100): print("\r" + '[%s%s] %d%%' % ('*'*percent,'-'*(total_range-percent), percent),end = "",flush=True) else: print("\r" + '[%s%s] %d' % ('*'*percent,'-'*(total_range-percent), percent) + "/" + '%d' %total_range, end = "",flush=True) print("") return buf def mem_dump(buf, displayLen = 0): ''' Memory dump ''' if buf == None or len(buf) == 0: return pLen = len(buf) if(pLen > 1): print('Lenght=%d' %pLen) #Display index address aStr = "=====" for i in range(16): if displayLen == 0: aStr += ' ' +('%02X' %i) else: aStr += ' ' +('%03X' %i) print(aStr) #Display data aStr = "" if(pLen % 16): loop = (pLen // 16)+1 else: loop = pLen // 16; for i in range(loop): aStr = ('%04X:' %(i * 16)) for j in range(16): if((i*16+j) < pLen): if displayLen==0: aStr += ' ' +('%02X' %buf[i*16+j]) else: aStr += ' ' +('%03X' %buf[i*16+j]) print(aStr) def mem_block_read(addr, size, cmd = CMD_MEM_READ): assert size <= UART_BLOCK, 'block buffer size must less than UART_BLOCK(default:0x100)' data = pack_data(cmd, addr, size, None) intf_write(data, MSG_HDR_SIZE) # time.sleep(0.1) buf = intf_read(size + MSG_HDR_SIZE) if (len(buf) != (size + MSG_HDR_SIZE)): print("****** Not receiving enough bytes ******") else: hdr, chk, cal_chk, cmd, addr, size, data = unpack_data (buf) if (chk != cal_chk): print('Error: intf data read fail. CRC error!') return list(data) return '' def mem_block_write(addr, buf, size, cmd = CMD_MEM_WRITE): assert size <= UART_BLOCK, 'block buffer size must less than UART_BLOCK(def:0x100)' cnt = 1 RETRY_MAX = 3 while(cnt <= RETRY_MAX): bdata = bytearray(buf) data = pack_data(cmd, addr, size, bytearray(buf)) intf_write(data, len(data)) # time.sleep(0.1) if msg_wait_ack(): break else: print('Write, wait ack fail, retry %d/%d' % (cnt, RETRY_MAX)) cnt = cnt + 1 time.sleep(0.5) def mem_clr(addr = 0x60000000, len = 0): ''' DDR/SRAM/NPUSRAM memory clear ''' data = pack_data(CMD_MEM_CLR, addr, len, None) intf_write(data, MSG_HDR_SIZE) time.sleep(1) cnt = 1 RETRY_MAX = 3 while(cnt <= RETRY_MAX): if msg_wait_ack(): return else: print('ERR: Memory clear ack fail, retry %d/%d' % (cnt, RETRY_MAX)) cnt = cnt + 1 time.sleep(0.5) def set_model(input_addr, input_len, output_addr, output_len, buf_addr, buf_len, cmd_addr, cmd_len, weight_addr, weight_len, setup_addr, setup_len): buf = bytearray(struct.pack(CMD_DEMO_SET_TWELVE_FMT, input_addr, input_len, output_addr, output_len, buf_addr, buf_len, cmd_addr, cmd_len, weight_addr, weight_len, setup_addr, setup_len)) data = pack_data(CMD_DEMO_SET_MODEL, setup_addr, 48, bytearray(buf)) intf_write(data, len(data)) time.sleep(0.1) if not msg_wait_ack(): print('set_model: wait ack fail') def set_image(image_addr, image_len, col, row, ch, format): buf = bytearray(struct.pack(CMD_DEMO_SET_SIX_FMT, image_addr, image_len, col, row, ch, format)) data = pack_data(CMD_DEMO_SET_IMAGES, 0, 24, bytearray(buf)) intf_write(data, len(data)) time.sleep(0.1) if not msg_wait_ack(): print('set_images: wait ack fail') def demo_run_once(): data = pack_data(CMD_DEMO_RUN_ONCE, 0, 0, None) intf_write(data, len(data)) time.sleep(0.1) if not msg_wait_ack(): print('set_images: wait ack fail') def mem_wrc(): ''' Memory write, read and compare ''' def pack_data(cmd, addr, size, buf): crc_flag = 1 # always use crc if (buf): mSize = len(buf) + 8 # there is always address & size else: mSize = 8 # print("packet_data: address is %x" %(addr)) pSize = mSize + 4 # add MSG header itself pSize = pSize | PKT_WR_FLAG # we are the host # if CRC is used, append it at the end, so increase pSize by 4 if (crc_flag): pSize = pSize + 4 pSize = pSize | PKT_CRC_FLAG if (PACKET): nbuf = bytearray(struct.pack(MSG_PKT_HDR_FMT, PKTX_PAMB, pSize, cmd, mSize, addr, size)) else: nbuf = bytearray(struct.pack(MSG_HDR_FMT, PKTX_PAMB, 0x00, cmd, addr, size)) if (buf != None): nbuf = nbuf + buf # print("address sent -> %02x %02x %02x %02x" %(nbuf[11], nbuf[10], nbuf[9], nbuf[8])) if (PACKET): if (crc_flag): crc16 = gen_crc16(nbuf[0:len(nbuf)], len(nbuf)) # print('crc is %x' %(crc16)) # print(bytes([crc16 & 0xFF]), bytes([crc16 >> 8])) nbuf = nbuf + bytes([crc16 & 0xFF]) + bytes([crc16 >> 8]) + bytes([0]) + bytes([0]) else: crc16 = gen_crc16(nbuf[4:len(nbuf)], len(nbuf) - 4) #print('get CRC16 %X' %(crc16)) nbuf[2] = crc16 & 0xFF nbuf[3] = crc16 >> 8 return nbuf def unpack_data(buf): #assert len(buf) < 16, "Packet header error" chk = 0 cal_chk = 0 hdrmsg = buf[0:16] if (PACKET): preamble, pSize, cmd, mSize, addr, size = struct.unpack(MSG_PKT_HDR_FMT, hdrmsg) if (pSize & PKT_CRC_FLAG): crc_bytes = intf_read(4) chk = int.from_bytes(crc_bytes, byteorder='little', signed=False) cal_chk = gen_crc16(buf[0:len(buf)], len(buf)) if (mSize < 8): return (preamble, chk, cal_chk, cmd, addr, size, None) else: return (preamble, chk, cal_chk, cmd, addr, size, buf[16 : len(buf)]) else: hdr, chk, cmd, addr, size = struct.unpack(MSG_HDR_FMT, hdrmsg) cal_chk = gen_crc16(buf[4:len(buf)], len(buf) - 4) if (len(buf) < 16): return (hdr, chk, cal_chk, cmd, addr, size, None) else: return (hdr, chk, cal_chk, cmd, addr, size, buf[16 : len(buf)]) ########################################################################## # Msg Command ########################################################################## def msg_wait_ack(): buf = intf_read(MSG_HDR_SIZE) hdr, chk, cal_chk, cmd, addr, size, data = unpack_data (buf) if (PACKET == 0): if cmd == CMD_ACK: return True else: return False # else we are using packet layer if (cmd & 0x8000): # check is it a proper response code? rsp = cmd & 0x0FFF # get the original command if (rsp == CMD_ACK): return True elif (rsp == CMD_MEM_READ): return True elif (rsp == CMD_MEM_WRITE): return True elif (rsp == CMD_MEM_CLR): return True elif (rsp == CMD_TEST_ECHO): return True else: return False else: return False def msg_wait_result(resp, len): buf = intf_read(MSG_HDR_SIZE + len) hdr, chk, cal_chk, cmd, addr, size, data = unpack_data (buf) if (PACKET): expected_response = resp | 0x8000 else: expected_response = resp if cmd == expected_response: print('*** result rcvd: %x [%d]' %(cmd, len)) else: print('*** result rcvd: %x NOT %x' %(cmd, resp)) return list(data) def msg_cmd_general(cmd, wait_time): ''' send run_once command ''' data = pack_data(cmd, 0, 0, None) intf_write(data, MSG_HDR_SIZE) time.sleep(wait_time) cnt = 1 RETRY_MAX = 3 while(cnt <= RETRY_MAX): if msg_wait_ack(): return else: print('ERR: CMD ACK FAIL, retry %d/%d' % (cnt, RETRY_MAX)) cnt = cnt + 1 time.sleep(0.5) ########################################################################## # Xmodem Application ########################################################################## def send_at_command(command): ser.write(bytes(command+"\r", encoding='ascii')) def getc(size, timeout=1): return ser.read(size) def putc_user(data, timeout=1): return ser.write(data) def erase_readline(): response = ser.readline().strip() #print(response) #if(response == b'ERASE'): if b'ERA' in response: return True else: return False def xmodem_send_bin(): count = 0 timeout = 10 print("Please press reset button!!") while(timeout): timeout -= 1 response = ser.readline().strip() if(response == b''): print("Please press reset button!!") if(response == b'\x00'): print("Reset done!!") elif(response != b''): count += 1 print(response) if(count >= 3): break if(timeout == 0): return False cmd="2" print("xmodem_send %s start" %(cmd), end="") send_at_command(cmd) print("\rxmodem_send %s done " %(cmd)) time.sleep(1) ser.flushInput() image_file = './fw_minions_1218.bin' print("xmodem_send bin file start") stream = open(image_file, 'rb') m = xmodem.XMODEM(getc, putc_user) #print("xmodem_sending ... ") ret = m.send(stream) if(ret): print("xmodem_send bin file done!!") else: print("xmodem_send bin file FAIL!!!! Please reset the target and start over!!!!") return ret time.sleep(1) ser.flush() ser.flushInput() ser.close() time.sleep(0.2) print("change baudrate to 921600... ", end="") setup_intf('uart', ser, UART_COM_ID, BAUDRATE_HI, timeout=5) ser.flushInput() ser.flushOutput() print("\rchange baudrate to 921600 done") time.sleep(0.2) return ret ########################################################################## # NPU control ########################################################################## def npu_reset(): ''' 0xC238_004C[2], NPU reset control register. set 0 : reset, 1: release ''' val = reg_read(0xC238004C) reg_write(0xC238004C, val & 0xFFFFFFFB) reg_write(0xC238004C, val | 0x04) ########################################################################## # Utils ########################################################################## def gen_crc16(buf, size): crc16_func = crcmod.predefined.mkPredefinedCrcFun('crc-16') # the size is implied by buf if (len(buf) != size): print("crc error, buffer is", len(buf), "size is", size) out = crc16_func(buf) return out ########################################################################## # File I/O ########################################################################## def file_read_binary(src): ''' Load depth file ''' with open(src, "rb") as f: data = f.read() return data def file_cmp(file1, file2): l1 = l2 = True with open(file1, 'r') as f1, open(file2, 'r') as f2: while l1 and l2: l1 = f1.readline() l2 = f2.readline() if l1 != l2: return False return True def file_cmp_binary(file1, file2): data1 = file_read_binary(file1) data2 = file_read_binary(file2) return (data1 == data2) ########################################################################## # Debug ########################################################################## def DBGPRINT(format, *args): """ print debug message """ if __DEBUG__ == 1: print (format % args)