본문 바로가기
Python, C, C++

Python 소스 ( Serial Protocol, USART, 8-bit bootloader ) 분석

by 소나무기운 2022. 2. 11.
반응형

[2022/02/18] 제목 이미지 추가

[2022/02/11] 일부 업데이트

[2022/02/11] 처음 시작.

 

소나무 기운 ,  전자제품 개발/생산

8-bit Bootloader F/W Update 프로그램 분석

microchip 8-bit bootloader와 serial통신을 통해서 F/W를 다운로드 하는 Python소스코드가 있어요. 시리얼 통신에 대한 좋은 예제가 될것 같아 분석해 보기로 했어요.

 

원본 소스는 아래 참고자료 링크를 참고하세요. 함수별로 분석해 보도록 할께요. 

 

 

main함수

if len(sys.argv) == 1:
    parser.print_help()
    sys.exit(1)
args = parser.parse_args()

# Command line arguments
File = sys.argv[1]
FlashSize = int(sys.argv[2], 16)
ComPort = sys.argv[3]
Baudrate = int(sys.argv[4], 10)


#***********************************************************************************************************************
# Function : main
#***********************************************************************************************************************
if __name__ == "__main__":
    os.system('')
    print("\033[1;34;40m")
    if open_uart() == True:
        print("**********************BOOTLOAD START*************************\n")    
        APPStartAddr = hex2bin(File, FlashSize)
        get_version()
        erase_flash(APPStartAddr, FlashSize, (EraseSizeW<<1))
        write_flash(APPStartAddr, FlashSize, (WriteSizeW<<1))
        calculate_checksum(APPStartAddr)
        reset_device()
        print("*********************BOOTLOAD COMPLETE***********************\n")     
        sys.exit(1)
    else:
        sys.exit(1)
    print("\033[0m")

@ if __name__ == "__main__":

     임포트될때가 아닌 직접 실행되어 main함수가 필요할 때만 실행합니다.

@ os.system('')

     ''안에 명령을 실행합니다. os.system('dir')이면 dir이란 dos명령을 실행합니다.

     ''는 어떤것일까요? 아직 정확하지 않아요.

@ pinrt("\033[1:34;40m")

     글씨 색깔을 변경합니다.

@ open_uart()로 uart를 열어서 정상적으로 열리면 업데이트를 진행합니다.

@ hex2bin()로 HEX파일을 분석하여 시작 주소를 APPStartAddr로 가져옵니다.

    FlashSize는 0x3FFF로 업로드할 마이컴의 플래시 메모리의 크기를 입력 받습니다.

@ get_version()으로 번전, Flash block사이즈등의 라이팅하기 위한 정보를 가져옵니다.

@ erase_flash()로 플래시 메모리를 지웁니다.

@ write_flash()로 플래시 메모리를 씁니다.

@ calculate_checksum()으로 잘 써졌는지 Flash메모리를 읽어 checksum을 비교합니다.

@ device를 리셋하여 재부팅하도록 한다.

@ sys.exit(1) 프로그램을 종료한다. 비정상일경우도 강제 종료한다.

 

 

 

open_uart(), 시리얼 포트 열기

#***********************************************************************************************************************
# Function : open_uart()
#***********************************************************************************************************************
def open_uart():
    global UART

    if UART == None:
        try:
            UART = serial.Serial(ComPort, baudrate=Baudrate, timeout=1)
            UART.reset_input_buffer()
        except:
            print('Status: ' + 'open ' + ComPort + ' fail!!')
            return False
    return True

@ global UART 전역변수를 선언한다.

     함수가 여러번 실행되더라도 시리얼 토트는 한번만 실행될수 있도록 선언됨.(하지만 여러번 실행되지 않음)

@ serial.Serial()함수를 이용하여 시리얼 포트를 연다. 실생시 연락 받은 COM포트, Baudrate로 연다.

 

hex2bin(), hex파일에서 시작주소와 데이터를 가져온다.

#***********************************************************************************************************************
# Function : hex2bin(hex_file, flash_size)
#***********************************************************************************************************************
def hex2bin(hex_file, flash_size):
    # Load application hex file and convert to bin file
    ih = IntelHex()
    fileextension = hex_file[-3:]
    ih.loadfile(hex_file, format=fileextension)

    appstart = ih.minaddr()
    # print("\n", "Start Address: %#06x" % (appstart))

    start_app = ih.tobinarray(end=flash_size - 1)
    bin_start_file = os.path.splitext(hex_file)[0] + ".bin"

    # Save original file
    fq = open(bin_start_file, 'wb')
    fq.write(start_app)
    fq.close()

    return appstart

@ 컴파일을 하게되면 HEX파일과 BIN파일이 생성된다.

@ HEX파일에서 시작 주소값을 가져온다.

@ HEX파일에서 binery로 데이터를 가져와서 파일명.bin으로 저장한다.

 

get_version(), get version명령을 보내서 여러가지 정보를 가져온다.

#***********************************************************************************************************************
# Function : get_version()
#***********************************************************************************************************************
def get_version():
    global GoBuf, EraseSizeW, WriteSizeW
    
    print("*******************Read Version Command...*******************\n")
    print("Hint: Getting version ...\n")

    GoBuf = bytearray(b'\x00\x00\x00\x00\x00\x00\x00\x00\x00')
    
    
    print("Tx ->", GoBuf.hex(' '), "\n")

    
    if execute_result(1.0) == False: sys.exit(1)
    print("Rx ->", RcvBuf.hex(' '), "\n")
    
    FWInfo = RcvBuf[10:]			# 펌웨어 버전

    EraseSizeW = FWInfo[10]			# 지우기 기능의 크기
    WriteSizeW = FWInfo[11]			# 쓰기 기능의 크기

    print("Status: Get version completely", 'successful!\n')
    print("*************************************************************\n")

@ get version명령을 보내서 펌웨어 정보, EraseSizeW, WriteSizeW의 값을 받는다.

 

 

erase_flash(), 플래시를 사이즈만큼 지운다.

#***********************************************************************************************************************
# Function : erase_flash(MinAddr, MaxAddr, RowSize)
# Note     :every parameter is based on bytes...
#***********************************************************************************************************************
def erase_flash(MinAddr, MaxAddr, RowSize):
    global GoBuf, Erased

    
    print("*******************Erase Flash Command...********************\n")
    print("Hint: Erasing flash ...\n")
    
    EraseCnt = int((MaxAddr - MinAddr) / RowSize)

    # Only for Erase command of PIC16F1_bootload.c
    # 보낼 명령어 생성
    GoBuf = bytearray(b'\x03') + EraseCnt.to_bytes(2, byteorder='little') + bytearray(b'\x55\xaa') + (
                MinAddr >> 1).to_bytes(4, byteorder='little')
    print("Tx ->", GoBuf.hex(' '), "\n")                

    if execute_result(10.0) == False: sys.exit(1)	# 전송 & 수신
    print("Rx ->", RcvBuf.hex(' '), "\n")

    print("Status: Erase flash memory", 'successful!\n')
    print("*************************************************************\n")

@ 지우기 명령을 보내 플래시를 지운다. 

 

 

 

write_flash(), 플래시에 사이즈만큼의 데이터를 쓴다.

#***********************************************************************************************************************
# Function : write_flash(MinAddr, MaxAddr, RowSize)
#***********************************************************************************************************************
def write_flash(MinAddr, MaxAddr, RowSize):
    global FBuf, File, GoBuf

    print("*******************write Flash Command...********************\n")
    print("Hint: Writing flash ...\n")

    bin_file = os.path.splitext(File)[0] + ".bin"
    size = os.path.getsize(bin_file);
    print("Uploading", size, "bytes from bin file...\n")

    with open(bin_file, "rb") as f:			# 파일 읽어 오기
        FBuf += f.read()

    EmptyArray = bytearray(b'\xff' * RowSize)	# 0xFF로 채운 버퍼 생성

    for Address in range(MaxAddr - RowSize, MinAddr - RowSize, -RowSize):
        # Only for Write command of PIC16F1_bootload.c
        # 쓰기 데이터 명령 생성
        GoBuf = bytearray(b'\x02') + RowSize.to_bytes(2, byteorder='little') + bytearray(b'\x55\xaa') + (
                Address >> 1).to_bytes(4, byteorder='little')
        if EmptyArray == FBuf[Address - MinAddr: Address - MinAddr + RowSize]:
            continue
		# 쓰기 데이터 생성
        GoBuf += FBuf[Address - MinAddr: Address - MinAddr + RowSize]
		# 상황 표시
        print("Programming range from 0X%08XH to 0X%08XH. (Whole range is from 0X%08Xh to 0X%08Xh)"
              % (Address, Address + RowSize - 1, MinAddr, MaxAddr - 1), '...\n')
        print("Tx ->", GoBuf.hex(' '), "\n") 

        if execute_result(10.0) == False:	# 송신 & 수신
            sys.exit(1)
        print("Rx ->", RcvBuf.hex(' '), "\n")

    print("Status: Writing flash memory successfully !!  Range from 0X%08Xh to 0X%08Xh.\n" % (MinAddr, MaxAddr - 1))
    print("*************************************************************\n")

@ 생성한 bin파일을 열어 프로토콜 포멧에 맞게 전송 수신을 반복한다.

반응형

 

calculate_checksum(), 체크썸을 계산하여 정상적으로 써졌는지 검사한다.

#***********************************************************************************************************************
# Function : calculate_checksum(MinAddr)
#***********************************************************************************************************************
def calculate_checksum(MinAddr):
    global FBuf, File, GoBuf

    print("****************Calculate Checksum Command...****************\n")
    print("Hint: calculate checksum ...\n")

    bin_file = os.path.splitext(File)[0] + ".bin"
    size = os.path.getsize(bin_file);

    with open(bin_file, "rb") as f:			# BIN 파일 읽어 들임
        FBuf += f.read()

    checksum = 0
    for Address in range(0, size, 2):		# 체크썸 계산
        checksum += FBuf[Address]
        checksum +=((FBuf[Address+1]&0x3f)<<8)
    checksum &= 0xFFFF

	# 응답할 데이터 작성
    GoBuf = bytearray(b'\x08') + size.to_bytes(2, byteorder='little') + bytearray(b'\x55\xaa') + (
            MinAddr>>1).to_bytes(4, byteorder='little')
    print("Tx ->", GoBuf.hex(' '), "\n") 			# 송신 표시

    if execute_result(10.0) == False: sys.exit(1)	# 전송 & 수신
    print("Rx ->", RcvBuf.hex(' '), "\n")			# 수신 표시

    checksum_received = (RcvBuf[10]+(RcvBuf[11]<<8))	# 수신된 체크썸 계산

    if checksum != checksum_received:				# 체크썸 검사
        print("Status: Calculate checksum fail!\n")
        sys.exit(1)
    else:
        print("Status: Calculate checksum successful!\n")
    print("*************************************************************\n")

 

 

 

 

reset_device(), 디바이스를 reset한다.

#***********************************************************************************************************************
# Function : reset_device()
#***********************************************************************************************************************
def reset_device():
    global GoBuf

    print("*******************Reset Device Command...*******************\n")    
    print("Hint: reset device ...\n")

    GoBuf = bytearray(b'\x09\x00\x00\x55\xaa\x00\x00\x00\x00')
    print("Tx ->", GoBuf.hex(' '), "\n") 			# 화면 표시
    
    if execute_result(1.0) == False: sys.exit(1)	# 전송 & 수신
    print("Rx ->", RcvBuf.hex(' '), "\n")			# 화면 표시

    if RcvBuf[10] != True:							# 처리결과 판단
        print("Status: Reset device fail!\n")
        sys.exit(1)
    else:
        print("Status: Reset device successful!\n")
    print("*************************************************************\n")

@ reset명령을 보내고, 처리결과를 응답 받는다.

 

 

execute_result(), 명령을 전송하고 응답을 수신한다.

#***********************************************************************************************************************
# Function : execute_result(TOut)
#***********************************************************************************************************************
def execute_result(TOut):
    global CMDRunning, GoBuf

    out_packet()		# 만들어진 명령 serial 통신으로 전송

    if in_com(TOut) == 0:	# Serial통신으로 응답을 받음.
        #Update_Status("No response error, Process terminated !!")
        print("No response error, Process terminated !")
        CMDRunning = False
        return False

	# 에러 응답 분석
    if GoBuf[1] != 0x00 and GoBuf[1] != 0x08 and GoBuf[1] != 0x09 and RcvBuf[10] != 1:
        if RcvBuf[10] == 0xFE:
            print("ADDRESS OUT OF RANGE ERROR when executing command %0X !!" % GoBuf[1])
        if RcvBuf[10] == 0xFF:
            print("Invalid Command ERROR when executing command %0X !!" % GoBuf[1])
        else:
            print("Unknown ERROR when executing command %0X !!" % GoBuf[1])
        CMDRunning = False
        return False

    return True

@ 명령데이터를 보내고, 응답데이터를 받는다.

 

 

out_packet(), 시리얼 통신으로 명령을 보낸다.

#***********************************************************************************************************************
# Function : out_packet()
#***********************************************************************************************************************
def out_packet():  # STX(0x55)+  General Command Format
    global GoBuf

    GoBuf.insert(0, 0x55)  # STX
    UART.write(GoBuf)

@ 0x55로 시작하는 명령어 GoBuf를 Serial로 보냄.

 

 

in_com(), 명령에 대한 응답을 받음.

#***********************************************************************************************************************
# Function : in_com(timeout)
#***********************************************************************************************************************
def in_com(timeout):  # timeout == 0 ==> wait until got data
    global RcvBuf

    RcvBuf = bytearray(b'')
    tStart = time.perf_counter()
    Retry = 3

    while True:
        bdata = UART.read()
        if len(bdata) > 0:
            RcvBuf.extend(bdata)	# 입력된 데이터 저장

        elif len(RcvBuf) != 0:		# 들어오던데이터가 멈추고 데이터가 있으면 수신 완료.
            return len(RcvBuf);

        elif timeout != 0:			# 시간초과 & 재시도
            if (time.perf_counter() - tStart) > timeout:  # timeout loop & retry
                if Retry == 0: return 0
                print(
                    "Status: No response in " + str(timeout) + " S," + " re-trying " + str(Retry) + ' more time(s).')
                Retry -= 1
                UART.write(GoBuf)
                tStart = time.perf_counter()

@ 재시도 횟수 관리, 시관초과 관리

 

 

 

전역변수, 프로그램 인자 처리

@ 프로그램 실행방법

python ./pic16_uploader.py ../NUSI_DI.X\dist\Offset\production/NUSI_DI.X.production.hex 0x4000 COM8 9600
from __future__ import print_function
import sys # sys 모듈을 사용할 수 있도록

try:
    import argparse
    from intelhex import IntelHex
    import os
    import serial
    import time  
except ImportError:
    # 임포트 실패시 에러메세지 표시
    sys.exit("""ImportError: You are probably missing some modules.
To add needed modules, run like 'python -m pip install -U future pyserial intelhex'""")

#-----------------------------------------------------------------------------------------------------------------------
# Generate help and use messages
parser = argparse.ArgumentParser(
    description='Serial bootloader script for Microchip PIC16 family MCUs',
    epilog='Example: pic16_uploader.py ./App/Release/App.hex 0x20000 COM5 9600',
    formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument('file', help='Hex file to upload')
parser.add_argument('flashsize', help='Total device flash size based on Byte')
parser.add_argument('comport', help='UART COM port')
parser.add_argument('baudrate', help='UART baud rate')

if len(sys.argv) == 1: 				# 인자가 없다면 설명을 표시한다.
    parser.print_help() 
    sys.exit(1)
args = parser.parse_args() 			# 인자값들을 가져옴.

# Command line arguments
File = sys.argv[1] 					# HEX file 명
FlashSize = int(sys.argv[2], 16) 	# MCU ROM size
ComPort = sys.argv[3] 				# Comport번호
Baudrate = int(sys.argv[4], 10) 	# Baudrate

#-----------------------------------------------------------------------------------------------------------------------
# Variables
CMDRunning = False      # semaphere flag for running commands
Erased = False          # flag for eraseing function
UART = None             # flag for UART open

GoBuf = bytearray()     # global output buffer for functions OutPacket/InCom
RcvBuf = bytearray()    # global input buffer for functions OutPacket/InCom
FBuf = bytearray()

EraseSizeW = 0x20       # erase row size (words), will be update during get version process.
WriteSizeW = 0x20       # Write latches per row size (words), will be update during get version process.

@ ArgumentParser(argparse)는 라인커맨드를 실행할때 많이 사용됨. 여러가지 형식으로 인수 지정이 가능하다.

   -h, --help등으로 사용법을 표시할 수 있다. 

   인수가 없을 경우에 사용법을 안내하기도 한다.

 

 

마무리

시리얼 통신을 하는 예제를 보았습니다. 시리얼 통신을 다루는 방법에 대한 이해 정도로 보시면 되겠습니다.

컴맨드라인을 사용하는 프로그램에서의 인수 사용방법에 대해서도 좀 공부가 되겠습니다. 

 

 

참고문헌

 

 

 

 

틀린 부분이나 질문은 댓글 달아주세요.

즐거운 하루 보내세요. 감사합니다.

 

 

반응형

댓글