| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995 | #!/usr/bin/env pythonfrom json import JSONDecodeErrorimport mathimport pathlibimport timeimport tracebackfrom typing import Callable, Dictimport pkg_resourcesimport sysimport osimport iofrom os import walkfrom requests import Response class Logger:    NEWLINE_CHAR = '\n'    with_crlf = False    @classmethod    def print_message(cls,message,prefix=''):        if not Logger.with_crlf:            trimmed=re.sub(r'\n', r'%0A', message,flags=re.MULTILINE)        print(f'{prefix}{trimmed}')    @classmethod    def debug(cls,message):        cls.print_message(message,'::debug::')    @classmethod    def error(cls,message):        cls.print_message(message,'::error::')    @classmethod    def notice(cls,message):        cls.print_message(message,'::notice::')        @classmethod    def warning(cls,message):        cls.print_message(message,'::notice::')   try:    import argparse    import collections    import copy    import enum    import glob    import json    import re    import shutil    import stat    import tempfile    import zipfile    from ast import literal_eval    from collections import namedtuple    from datetime import datetime, timedelta, timezone    from json import JSONDecoder    from operator import contains    from platform import platform, release    from pydoc import describe    from time import strftime    from typing import OrderedDict    from urllib import response    from urllib.parse import urlparse    from urllib.request import Request    from webbrowser import get    import pygit2    from pygit2 import Commit, Repository, GitError, Reference, UserPass, Index, Signature, RemoteCallbacks, Remote    import requests    from genericpath import isdirexcept ImportError as ex:    Logger.error(        f'Failed importing module {ex.name}, using interpreter {sys.executable}. {Logger.NEWLINE_CHAR} Installed packages:')    installed_packages = pkg_resources.working_set    installed_packages_list = sorted(        ["%s==%s" % (i.key, i.version) for i in installed_packages])    print(Logger.NEWLINE_CHAR.join(installed_packages_list))    print(f'Environment: ')    envlist = "\n".join([f"{k}={v}" for k, v in sorted(os.environ.items())])    print(f'{envlist}')    raisetool_version = "1.0.7"WEB_INSTALLER_DEFAULT_PATH = './web_installer/'FORMAT = '%(asctime)s %(message)s'github_env = type('', (), {})()manifest = {    "name": "",    "version": "",    "home_assistant_domain": "slim_player",    "funding_url": "https://esphome.io/guides/supporters.html",    "new_install_prompt_erase": True,    "new_install_improv_wait_time" : 20,    "builds": [        {            "chipFamily": "ESP32",            "parts": [            ]        }    ]}artifacts_formats_outdir = '$OUTDIR'artifacts_formats_prefix = '$PREFIX'artifacts_formats = [    ['build/squeezelite.bin', '$OUTDIR/$PREFIX-squeezelite.bin'],    ['build/recovery.bin', '$OUTDIR/$PREFIX-recovery.bin'],    ['build/ota_data_initial.bin', '$OUTDIR/$PREFIX-ota_data_initial.bin'],    ['build/bootloader/bootloader.bin', '$OUTDIR/$PREFIX-bootloader.bin'],    ['build/partition_table/partition-table.bin ',        '$OUTDIR/$PREFIX-partition-table.bin'],]class AttributeDict(dict):    __slots__ = ()    def __getattr__(self, name: str):        try:            return self[name.upper()]        except Exception:            try:                return self[name.lower()]            except Exception:                for attr in self.keys():                    if name.lower() == attr.replace("'", "").lower():                        return self[attr]    __setattr__ = dict.__setitem__parser = argparse.ArgumentParser(    description='Handles some parts of the squeezelite-esp32 build process')parser.add_argument('--cwd', type=str,                    help='Working directory', default=os.getcwd())parser.add_argument('--with_crlf', action='store_true',help='To prevent replacing cr/lf with hex representation')parser.add_argument('--loglevel', type=str, choices={                    'CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG', 'NOTSET'}, help='Logging level', default='INFO')subparsers = parser.add_subparsers(dest='command', required=True)parser_commits = subparsers.add_parser("list_commits",add_help=False,                                    description="Commits list",                                    help="Lists the last commits"                                    )parser_dir = subparsers.add_parser("list_files",                                   add_help=False,                                   description="List Files parser",                                   help="Display the content of the folder")parser_manifest = subparsers.add_parser("manifest",                                        add_help=False,                                        description="Manifest parser",                                        help="Handles the web installer manifest creation")parser_manifest.add_argument('--flash_file', required=True, type=str,                             help='The file path which contains the firmware flashing definition')parser_manifest.add_argument(    '--max_count', type=int, help='The maximum number of releases to keep', default=3)parser_manifest.add_argument(    '--manif_name', required=True, type=str, help='Manifest files name and prefix')parser_manifest.add_argument(    '--outdir', required=True, type=str, help='Output directory for files and manifests')parser_pushinstaller = subparsers.add_parser("pushinstaller",                                             add_help=False,                                             description="Web Installer Checkout parser",                                             help="Handles the creation of artifacts files")parser_pushinstaller.add_argument(    '--target', type=str, help='Output directory for web installer repository', default=WEB_INSTALLER_DEFAULT_PATH)parser_pushinstaller.add_argument(    '--artifacts', type=str, help='Target subdirectory for web installer artifacts', default=WEB_INSTALLER_DEFAULT_PATH)parser_pushinstaller.add_argument(    '--source', type=str, help='Source directory for the installer artifacts', default=WEB_INSTALLER_DEFAULT_PATH)parser_pushinstaller.add_argument('--url', type=str, help='Web Installer clone url ',                                  default='https://github.com/sle118/squeezelite-esp32-installer.git')parser_pushinstaller.add_argument(    '--web_installer_branch', type=str, help='Web Installer branch to use ', default='main')parser_pushinstaller.add_argument(    '--token', type=str, help='Auth token for pushing changes')parser_pushinstaller.add_argument(    '--flash_file', type=str, help='Manifest json file path')parser_pushinstaller.add_argument(    '--manif_name', required=True, type=str, help='Manifest files name and prefix')parser_environment = subparsers.add_parser("environment",                                           add_help=False,                                           description="Environment parser",                                           help="Updates the build environment")parser_environment.add_argument(    '--env_file', type=str, help='Environment File',  default=os.environ.get('GITHUB_ENV'))parser_environment.add_argument(    '--build', required=True, type=int, help='The build number')parser_environment.add_argument(    '--node', required=True, type=str, help='The matrix node being built')parser_environment.add_argument(    '--depth', required=True, type=int, help='The bit depth being built')parser_environment.add_argument(    '--major', type=str, help='Major version', default='2')parser_environment.add_argument(    '--docker', type=str, help='Docker image to use', default='sle118/squeezelite-esp32-idfv43')parser_show = subparsers.add_parser("show",                                    add_help=False,                                    description="Show parser",                                    help="Show the build environment")parser_build_flags = subparsers.add_parser("build_flags",                                           add_help=False,                                           description="Build Flags",                                           help="Updates the build environment with build flags")parser_build_flags.add_argument(    '--mock', action='store_true', help='Mock release')parser_build_flags.add_argument(    '--force', action='store_true', help='Force a release build')parser_build_flags.add_argument(    '--ui_build', action='store_true', help='Include building the web UI')def format_commit(commit):    # 463a9d8b7 Merge branch 'bugfix/ci_deploy_tags_v4.0' into 'release/v4.0' (2020-01-11T14:08:55+08:00)    dt = datetime.fromtimestamp(float(commit.author.time), timezone(        timedelta(minutes=commit.author.offset)))    #timestr = dt.strftime('%c%z')    timestr = dt.strftime('%F %R %Z')    cmesg:str = commit.message.replace('\n', ' ').replace('\r','').replace('*','-')    return f'{commit.short_id} {cmesg} ({timestr}) <{commit.author.name}>'.replace('  ', ' ', )def get_github_data(repo: Repository, api):    base_url = urlparse(repo.remotes['origin'].url)    print(        f'Base URL is {base_url.path} from remote URL {repo.remotes["origin"].url}')    url_parts = base_url.path.split('.')    for p in url_parts:        print(f'URL Part: {p}')    api_url = f"{url_parts[0]}/{api}"    print(f'API to call: {api_url}')    url = f"https://api.github.com/repos{api_url}"    resp = requests.get(        url, headers={"Content-Type": "application/vnd.github.v3+json"})    return json.loads(resp.text)def dump_directory(dir_path):    # list to store files name    res = []    for (dir_path, dir_names, file_names) in walk(dir_path):        res.extend(file_names)    print(res)class ReleaseDetails():    version: str    idf: str    platform: str    branch: str    bitrate: str    def __init__(self, tag: str) -> None:        self.version, self.idf, self.platform, self.branch = tag.split('#')        try:            self.version, self.bitrate = self.version.split('-')        except Exception:            pass    def get_attributes(self):        return {            'version': self.version,            'idf': self.idf,            'platform': self.platform,            'branch': self.branch,            'bitrate': self.bitrate        }    def format_prefix(self) -> str:        return f'{self.branch}-{self.platform}-{self.version}'    def get_full_platform(self):        return f"{self.platform}{f'-{self.bitrate}' if self.bitrate is not None else ''}"class BinFile():    name: str    offset: int    source_full_path: str    target_name: str    target_fullpath: str    artifact_relpath: str    def __init__(self, source_path, file_build_path: str, offset: int, release_details: ReleaseDetails, build_dir) -> None:        self.name = os.path.basename(file_build_path).rstrip()        self.artifact_relpath = os.path.relpath(            file_build_path, build_dir).rstrip()        self.source_path = source_path        self.source_full_path = os.path.join(            source_path, file_build_path).rstrip()        self.offset = offset        self.target_name = f'{release_details.format_prefix()}-{release_details.bitrate}-{self.name}'.rstrip()    def get_manifest(self):        return {"path": self.target_name, "offset": self.offset}    def copy(self, target_folder) -> str:        self.target_fullpath = os.path.join(target_folder, self.target_name)        Logger.debug(            f'File {self.source_full_path} will be copied to {self.target_fullpath}')        try:            os.makedirs(target_folder, exist_ok=True)            shutil.copyfile(self.source_full_path,                            self.target_fullpath, follow_symlinks=True)        except Exception as ex:            Logger.error(f"Error while copying {self.source_full_path} to {self.target_fullpath}{Logger.NEWLINE_CHAR}Content of {os.path.dirname(self.source_full_path.rstrip())}:{Logger.NEWLINE_CHAR}{Logger.NEWLINE_CHAR.join(get_file_list(os.path.dirname(self.source_full_path.rstrip())))}")                                                    raise        return self.target_fullpath    def get_attributes(self):        return {            'name': self.target_name,            'offset': self.offset,            'artifact_relpath': self.artifact_relpath        }class PlatformRelease():    name: str    description: str    url: str = ''    zipfile: str = ''    tempfolder: str    release_details: ReleaseDetails    flash_parms = {}    build_dir: str    has_artifacts: bool    branch: str    assets: list    bin_files: list    name_prefix: str    flash_file_path: str    def get_manifest_name(self) -> str:        return f'{self.name_prefix}-{self.release_details.format_prefix()}-{self.release_details.bitrate}.json'    def __init__(self, flash_file_path, git_release, build_dir, branch, name_prefix) -> None:        self.name = git_release.tag_name        self.description = git_release.body        self.assets = git_release['assets']        self.has_artifacts = False        self.name_prefix = name_prefix        if len(self.assets) > 0:            if self.has_asset_type():                self.url = self.get_asset_from_extension().browser_download_url            if self.has_asset_type('.zip'):                self.zipfile = self.get_asset_from_extension(                    ext='.zip').browser_download_url                self.has_artifacts = True        self.release_details = ReleaseDetails(git_release.name)        self.bin_files = list()        self.flash_file_path = flash_file_path        self.build_dir = os.path.relpath(build_dir)        self.branch = branch    def process_files(self, outdir: str) -> list:        parts = []        for f in self.bin_files:            f.copy(outdir)            parts.append(f.get_manifest())        return parts    def get_asset_from_extension(self, ext='.bin'):        for a in self.assets:            filename = AttributeDict(a).name            file_name, file_extension = os.path.splitext(filename)            if file_extension == ext:                return AttributeDict(a)        return None    def has_asset_type(self, ext='.bin') -> bool:        return self.get_asset_from_extension(ext) is not None    def platform(self):        return self.release_details.get_full_platform()    def get_zip_file(self):        self.tempfolder = extract_files_from_archive(self.zipfile)        print(            f'Artifacts for {self.name} extracted to {self.tempfolder}')        flash_parms_file = os.path.relpath(            self.tempfolder+self.flash_file_path)        line: str        with open(flash_parms_file) as fin:            for line in fin:                components = line.split()                if len(components) == 2:                    self.flash_parms[os.path.basename(                        components[1]).rstrip().lstrip()] = components[0]        try:            for artifact in artifacts_formats:                base_name = os.path.basename(artifact[0]).rstrip().lstrip()                self.bin_files.append(BinFile(                    self.tempfolder, artifact[0], self.flash_parms[base_name], self.release_details, self.build_dir))                has_artifacts = True        except Exception:            self.has_artifacts = False    def cleanup(self):        Logger.debug(f'removing temp directory for platform release {self.name}')        shutil.rmtree(self.tempfolder)    def get_attributes(self):        return {            'name': self.name,            'branch': self.branch,            'description': self.description,            'url': self.url,            'zipfile': self.zipfile,            'release_details': self.release_details.get_attributes(),            'bin_files': [b.get_attributes() for b in self.bin_files],            'manifest_name': self.get_manifest_name()        }class Releases():    _dict: dict = collections.OrderedDict()    maxcount: int = 0    branch: str = ''    repo: Repository = None    last_commit: Commit = None    manifest_name: str    def __init__(self, branch: str, maxcount: int = 3) -> None:        self.maxcount = maxcount        self.branch = branch    def count(self, value: PlatformRelease) -> int:        content = self._dict.get(value.platform())        if content == None:            return 0        return len(content)    def get_platform(self, platform: str) -> list:        return self._dict[platform]    def get_platform_keys(self):        return self._dict.keys()    def get_all(self) -> list:        result: list = []        for platform in [self.get_platform(platform) for platform in self.get_platform_keys()]:            for release in platform:                result.append(release)        return result    def append(self, value: PlatformRelease):        if self.count(value) == 0:            self._dict[value.platform()] = []        if self.should_add(value):            print(f'Adding release {value.name} to the list')            self._dict[value.platform()].append(value)        else:            print(f'Skipping release {value.name}')    def get_attributes(self):        res = []        release: PlatformRelease        for release in self.get_all():            res.append(release.get_attributes())        return res    def get_minlen(self) -> int:        return min([len(self.get_platform(p)) for p in self.get_platform_keys()])    def got_all_packages(self) -> bool:        return self.get_minlen() >= self.maxcount    def should_add(self, release: PlatformRelease) -> bool:        return self.count(release) <= self.maxcount    def add_package(self, package: PlatformRelease, with_artifacts: bool = True):        if self.branch != package.branch:            Logger.debug(f'Skipping release {package.name} from branch {package.branch}')        elif package.has_artifacts or not with_artifacts:            self.append(package)    @classmethod    def get_last_commit_message(cls, repo_obj: Repository = None) -> str:        last: Commit = cls.get_last_commit(repo_obj)        if last is None:            return ''        else:            return last.message.replace(Logger.NEWLINE_CHAR, ' ')    @classmethod    def get_last_author(cls, repo_obj: Repository = None) -> Signature:        last: Commit = cls.get_last_commit(repo_obj)        return last.author    @classmethod    def get_last_committer(cls, repo_obj: Repository = None) -> Signature:        last: Commit = cls.get_last_commit(repo_obj)        return last.committer    @classmethod    def get_last_commit(cls, repo_obj: Repository = None) -> Commit:        loc_repo = repo_obj        if cls.repo is None:            cls.load_repository(os.getcwd())        if loc_repo is None:            loc_repo = cls.repo        head: Reference = loc_repo.head        target = head.target        ref: Reference        if cls.last_commit is None:            try:                cls.last_commit = loc_repo[target]                print(                    f'Last commit for {head.shorthand} is {format_commit(cls.last_commit)}')            except Exception as e:                Logger.error(                    f'Unable to retrieve last commit for {head.shorthand}/{target}: {e}')                cls.last_commit = None        return cls.last_commit    @classmethod    def load_repository(cls, path: str = os.getcwd()) -> Repository:        if cls.repo is None:            try:                print(f'Opening repository from {path}')                cls.repo = Repository(path=path)            except GitError as ex:                Logger.error(f"Unable to access the repository({ex}).\nContent of {path}:\n{Logger.NEWLINE_CHAR.join(get_file_list(path, 1))}")                raise        return cls.repo    @classmethod    def resolve_commit(cls, repo: Repository, commit_id: str) -> Commit:        commit: Commit        reference: Reference        commit, reference = repo.resolve_refish(commit_id)        return commit    @classmethod    def get_branch_name(cls) -> str:        return re.sub('[^a-zA-Z0-9\-~!@_\.]', '', cls.load_repository().head.shorthand)    @classmethod    def get_release_branch(cls, repo: Repository, platform_release) -> str:        match = [t for t in repo.branches.with_commit(            platform_release.target_commitish)]        no_origin = [t for t in match if 'origin' not in t]        if len(no_origin) == 0 and len(match) > 0:            return match[0].split('/')[1]        elif len(no_origin) > 0:            return no_origin[0]        return ''    @classmethod    def get_flash_parms(cls, file_path):        flash = parse_json(file_path)        od: collections.OrderedDict = collections.OrderedDict()        for z in flash['flash_files'].items():            base_name: str = os.path.basename(z[1])            od[base_name.rstrip().lstrip()] = literal_eval(z[0])        return collections.OrderedDict(sorted(od.items()))    @classmethod    def get_releases(cls, flash_file_path, maxcount: int, name_prefix):        repo = Releases.load_repository(os.getcwd())        packages: Releases = cls(branch=repo.head.shorthand, maxcount=maxcount)        build_dir = os.path.dirname(flash_file_path)        for page in range(1, 999):            Logger.debug(f'Getting releases page {page}')            releases = get_github_data(                repo, f'releases?per_page=50&page={page}')            if len(releases) == 0:                Logger.debug(f'No more release found for page {page}')                break            for release_entry in [AttributeDict(platform) for platform in releases]:                packages.add_package(PlatformRelease(flash_file_path, release_entry, build_dir,                                     Releases.get_release_branch(repo, release_entry), name_prefix))                if packages.got_all_packages():                    break            if packages.got_all_packages():                break        return packages    @classmethod    def get_commit_list(cls) -> list:        commit_list = []        last: Commit = Releases.get_last_commit()        if last is None:            return commit_list        try:            for c in Releases.load_repository().walk(last.id, pygit2.GIT_SORT_TIME):                if '[skip actions]' not in c.message:                    commit_list.append(format_commit(c))                    if len(commit_list) > 10:                        break        except Exception as e:            Logger.error(                f'Unable to get commit list starting at {last.id}: {e}')        return commit_list    @classmethod    def get_commit_list_descriptions(cls) -> str:        return '<<~EOD\n### Revision Log\n'+Logger.NEWLINE_CHAR.join(cls.get_commit_list())+'\n~EOD'    def update(self, *args, **kwargs):        if args:            if len(args) > 1:                raise TypeError("update expected at most 1 arguments, "                                "got %d" % len(args))            other = dict(args[0])            for key in other:                self[key] = other[key]        for key in kwargs:            self[key] = kwargs[key]    def setdefault(self, key, value=None):        if key not in self:            self[key] = value        return self[key]def set_workdir(args):    print(f'setting work dir to: {args.cwd}')    os.chdir(os.path.abspath(args.cwd))def parse_json(filename: str):    fname = os.path.abspath(filename)    folder: str = os.path.abspath(os.path.dirname(filename))    print(f'Opening json file {fname} from {folder}')    try:        with open(fname) as f:            content = f.read()            Logger.debug(f'Loading json\n{content}')            return json.loads(content)    except JSONDecodeError as ex:        Logger.error(f'Error parsing {content}')    except Exception as ex:        Logger.error(            f"Unable to parse flasher args json file. Content of {folder}:{Logger.NEWLINE_CHAR.join(get_file_list(folder))}")        raisedef write_github_env_file(values,env_file):    print(f'Writing content to {env_file}...')    with open(env_file,  "w") as env_file:        for attr in [attr for attr in dir(values) if not attr.startswith('_')]:            line = f'{attr}{"=" if attr != "description" else ""}{getattr(values,attr)}'            print(line)            env_file.write(f'{line}\n')            os.environ[attr] = str(getattr(values, attr))    print(f'Done writing to {env_file}!')def format_artifact_from_manifest(manif_json: AttributeDict):    if len(manif_json) == 0:        return 'Newest release'    first = manif_json[0]    return f'{first["branch"]}-{first["release_details"]["version"]}'def format_artifact_name(base_name: str = '', args=AttributeDict(os.environ)):    return f'{base_name}{args.branch_name}-{args.node}-{args.depth}-{args.major}{args.build}'def handle_build_flags(args):    set_workdir(args)    print('Setting global build flags')    commit_message: str = Releases.get_last_commit_message()    github_env.mock = 1 if args.mock else 0    github_env.release_flag = 1 if args.mock or args.force or 'release' in commit_message.lower() else 0    github_env.ui_build = 1 if args.mock or args.ui_build or '[ui-build]' in commit_message.lower(    ) or github_env.release_flag == 1 else 0    write_github_env_file(github_env,os.environ.get('GITHUB_OUTPUT'))def write_version_number(file_path:str,env_details):    #     app_name="${TARGET_BUILD_NAME}.${DEPTH}.dev-$(git log --pretty=format:'%h' --max-count=1).${branch_name}"     #     echo "${app_name}">version.txt    try:        version:str = f'{env_details.TARGET_BUILD_NAME}.{env_details.DEPTH}.{env_details.major}.{env_details.BUILD_NUMBER}.{env_details.branch_name}'        with open(file_path,  "w") as version_file:            version_file.write(version)    except Exception as ex:        Logger.error(f'Unable to set version string {version} in file {file_path}')        raise Exception('Version error')    Logger.notice(f'Firmware version set to {version}')def handle_environment(args):    set_workdir(args)    print('Setting environment variables...')    commit_message: str = Releases.get_last_commit_message()    last: Commit = Releases.get_last_commit()    if last is not None:        github_env.author_name = last.author.name        github_env.author_email = last.author.email        github_env.committer_name = last.committer.name        github_env.committer_email = last.committer.email    github_env.node = args.node    github_env.depth = args.depth    github_env.major = args.major    github_env.build = args.build    github_env.DEPTH = args.depth    github_env.TARGET_BUILD_NAME = args.node    github_env.build_version_prefix = args.major    github_env.branch_name = Releases.get_branch_name()    github_env.BUILD_NUMBER = str(args.build)    github_env.tag = f'{args.node}.{args.depth}.{args.build}.{github_env.branch_name}'.rstrip()    github_env.last_commit = commit_message    github_env.DOCKER_IMAGE_NAME = args.docker    github_env.name = f"{args.major}.{str(args.build)}-{args.depth}#v4.3#{args.node}#{github_env.branch_name}"    github_env.artifact_prefix = format_artifact_name(        'squeezelite-esp32-', github_env)    github_env.artifact_file_name = f"{github_env.artifact_prefix}.zip"    github_env.artifact_bin_file_name = f"{github_env.artifact_prefix}.bin"    github_env.PROJECT_VER = f'{args.node}-{ args.build }'    github_env.description = Releases.get_commit_list_descriptions()    write_github_env_file(github_env,args.env_file)    write_version_number("version.txt",github_env)def handle_artifacts(args):    set_workdir(args)    print(f'Handling artifacts')    for attr in artifacts_formats:        target: str = os.path.relpath(attr[1].replace(artifacts_formats_outdir, args.outdir).replace(            artifacts_formats_prefix, format_artifact_name()))        source: str = os.path.relpath(attr[0])        target_dir: str = os.path.dirname(target)        print(f'Copying file {source} to {target}')        try:            os.makedirs(target_dir, exist_ok=True)            shutil.copyfile(source, target, follow_symlinks=True)        except Exception as ex:            Logger.error(f"Error while copying {source} to {target}\nContent of {target_dir}:\n{Logger.NEWLINE_CHAR.join(get_file_list(os.path.dirname(attr[0].rstrip())))}")            raisedef delete_folder(path):    '''Remov Read Only Files'''    for root, dirs, files in os.walk(path, topdown=True):        for dir in dirs:            fulldirpath = os.path.join(root, dir)            Logger.debug(f'Drilling down in {fulldirpath}')            delete_folder(fulldirpath)        for fname in files:            full_path = os.path.join(root, fname)            Logger.debug(f'Setting file read/write {full_path}')            os.chmod(full_path, stat.S_IWRITE)            Logger.debug(f'Deleting file {full_path}')            os.remove(full_path)    if os.path.exists(path):        Logger.debug(f'Changing folder read/write {path}')        os.chmod(path, stat.S_IWRITE)        print(f'WARNING: Deleting Folder {path}')        os.rmdir(path)def get_file_stats(path):    fstat: os.stat_result = pathlib.Path(path).stat()    # Convert file size to MB, KB or Bytes    mtime = time.strftime("%X %x", time.gmtime(fstat.st_mtime))    if (fstat.st_size > 1024 * 1024):        return math.ceil(fstat.st_size / (1024 * 1024)), "MB", mtime    elif (fstat.st_size > 1024):        return math.ceil(fstat.st_size / 1024), "KB", mtime    return fstat.st_size, "B", mtimedef get_file_list(root_path, max_levels: int = 2) -> list:    outlist: list = []    for root, dirs, files in os.walk(root_path):        path = os.path.relpath(root).split(os.sep)        if len(path) <= max_levels:            outlist.append(f'\n{root}')            for file in files:                full_name = os.path.join(root, file)                fsize, unit, mtime = get_file_stats(full_name)                outlist.append('{:s} {:8d} {:2s} {:18s}\t{:s}'.format(                    len(path) * "---", fsize, unit, mtime, file))    return outlistdef get_recursive_list(path) -> list:    outlist: list = []    for root, dirs, files in os.walk(path, topdown=True):        for fname in files:            outlist.append((fname, os.path.join(root, fname)))    return outlistdef handle_manifest(args):    set_workdir(args)    print(f'Creating the web installer manifest')    outdir: str = os.path.relpath(args.outdir)    if not os.path.exists(outdir):        print(f'Creating target folder {outdir}')        os.makedirs(outdir, exist_ok=True)    releases: Releases = Releases.get_releases(        args.flash_file, args.max_count, args.manif_name)    release: PlatformRelease    for release in releases.get_all():        manifest_name = release.get_manifest_name()        release.get_zip_file()        man = copy.deepcopy(manifest)        man['manifest_name'] = manifest_name        man['builds'][0]['parts'] = release.process_files(args.outdir)        man['name'] = release.platform()        man['version'] = release.release_details.version        Logger.debug(f'Generated manifest: \n{json.dumps(man)}')        fullpath = os.path.join(args.outdir, release.get_manifest_name())        print(f'Writing manifest to {fullpath}')        with open(fullpath, "w") as f:            json.dump(man, f, indent=4)        release.cleanup()    mainmanifest = os.path.join(args.outdir, args.manif_name)    print(f'Writing main manifest {mainmanifest}')    with open(mainmanifest, 'w') as f:        json.dump(releases.get_attributes(), f, indent=4)def get_new_file_names(manif_json) -> collections.OrderedDict():    new_release_files: dict = collections.OrderedDict()    for artifact in manif_json:        for name in [f["name"] for f in artifact["bin_files"]]:            new_release_files[name] = artifact        new_release_files[artifact["manifest_name"]] = artifact["name"]    return new_release_filesdef copy_no_overwrite(source: str, target: str):    sfiles = os.listdir(source)    for f in sfiles:        source_file = os.path.join(source, f)        target_file = os.path.join(target, f)        if not os.path.exists(target_file):            print(f'Copying {f} to target')            shutil.copy(source_file, target_file)        else:            Logger.debug(f'Skipping existing file {f}')def get_changed_items(repo: Repository) -> Dict:    changed_filemode_status_code: int = pygit2.GIT_FILEMODE_TREE    original_status_dict: Dict[str, int] = repo.status()    # transfer any non-filemode changes to a new dictionary    status_dict: Dict[str, int] = {}    for filename, code in original_status_dict.items():        if code != changed_filemode_status_code:            status_dict[filename] = code    return status_dictdef is_dirty(repo: Repository) -> bool:    return len(get_changed_items(repo)) > 0def push_with_method(auth_method:str,token:str,remote: Remote,reference):    success:bool = False    try:        remote.push(reference, callbacks=RemoteCallbacks(pygit2.UserPass(auth_method, token)))        success=True    except Exception as ex:        Logger.error(f'Error pushing with auth method {auth_method}: {ex}.')    return successdef push_if_change(repo: Repository, token: str, source_path: str, manif_json):    if is_dirty(repo):        print(f'Changes found. Preparing commit')        env = AttributeDict(os.environ)        index: Index = repo.index        index.add_all()        index.write()        reference = repo.head.name        message = f'Web installer for {format_artifact_from_manifest(manif_json)}'        tree = index.write_tree()        Releases.load_repository(source_path)        commit = repo.create_commit(reference, Releases.get_last_author(        ), Releases.get_last_committer(), message, tree, [repo.head.target])        origin: Remote = repo.remotes['origin']        print(            f'Pushing commit {format_commit(repo[commit])} to url {origin.url}')        remote: Remote = repo.remotes['origin']        auth_methods = ['x-access-token','x-oauth-basic']        for method in auth_methods:            if push_with_method(method, token, remote, [reference]):                print(f'::notice Web installer updated for {format_artifact_from_manifest(manif_json)}')                return        raise Exception('Unable to push web installer.')        else:        print(f'WARNING: No change found. Skipping update')def update_files(target_artifacts: str, manif_json, source: str):    new_list: dict = get_new_file_names(manif_json)    if os.path.exists(target_artifacts):        print(f'Removing obsolete files from {target_artifacts}')        for entry in get_recursive_list(target_artifacts):            f = entry[0]            full_target = entry[1]            if f not in new_list.keys():                print(f'WARNING: Removing obsolete file {f}')                os.remove(full_target)    else:        print(f'Creating target folder {target_artifacts}')        os.makedirs(target_artifacts, exist_ok=True)    print(f'Copying installer files to {target_artifacts}:')    copy_no_overwrite(os.path.abspath(source), target_artifacts)def handle_pushinstaller(args):    set_workdir(args)    print('Pushing web installer updates... ')    target_artifacts = os.path.join(args.target, args.artifacts)    if os.path.exists(args.target):        print(f'Removing files (if any) from {args.target}')        delete_folder(args.target)    print(f'Cloning from {args.url} into {args.target}')    repo = pygit2.clone_repository(args.url, args.target)    repo.checkout_head()    manif_json = parse_json(os.path.join(args.source, args.manif_name))    update_files(target_artifacts, manif_json, args.source)    push_if_change(repo, args.token, args.cwd, manif_json)    repo.state_cleanup()def handle_show(args):    print('Show')def extract_files_from_archive(url):    tempfolder = tempfile.mkdtemp()    platform:Response = requests.get(url)    Logger.debug(f'Downloading {url} to {tempfolder}')    Logger.debug(f'Transfer status code: {platform.status_code}. Expanding content')    z = zipfile.ZipFile(io.BytesIO(platform.content))    z.extractall(tempfolder)    return tempfolderdef handle_list_files(args):    print(f'Content of {args.cwd}:')    print(Logger.NEWLINE_CHAR.join(get_file_list(args.cwd)))def handle_commits(args):    set_workdir(args)    print(Releases.get_commit_list_descriptions())parser_environment.set_defaults(func=handle_environment, cmd='environment')parser_manifest.set_defaults(func=handle_manifest, cmd='manifest')parser_pushinstaller.set_defaults(func=handle_pushinstaller, cmd='installer')parser_show.set_defaults(func=handle_show, cmd='show')parser_build_flags.set_defaults(func=handle_build_flags, cmd='build_flags')parser_dir.set_defaults(func=handle_list_files, cmd='list_files')parser_commits.set_defaults(func=handle_commits,cmd='list_commits')def main():    exit_result_code = 0    args = parser.parse_args()    Logger.with_crlf = args.with_crlf    print(f'::group::{args.command}')    print(f'build_tools version : {tool_version}')    print(f'Processing command {args.command}')    func: Callable = getattr(args, 'func', None)    if func is not None:        # Call whatever subcommand function was selected                e: Exception        try:            func(args)        except Exception as e:            Logger.error(f'Critical error while running {args.command}\n{" ".join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))}')            exit_result_code = 1    else:        # No subcommand was provided, so call help        parser.print_usage()    print(f'::endgroup::')    sys.exit(exit_result_code)if __name__ == '__main__':    main()
 |