#!/usr/bin/env python
from json import JSONDecodeError
import math
import pathlib
import re  # used by Logger below, even if the guarded imports further down fail
import time
import traceback
from typing import Callable, Dict
import pkg_resources
import sys
import os
import io
from os import walk
from requests import Response
 
class Logger:
    """Minimal logger that emits GitHub Actions workflow commands."""
    NEWLINE_CHAR = '\n'
    with_crlf = False

    @classmethod
    def print_message(cls, message, prefix=''):
        trimmed = message
        if not cls.with_crlf:
            # workflow commands need newlines encoded so the annotation stays on one line
            trimmed = re.sub(r'\n', r'%0A', message, flags=re.MULTILINE)
        print(f'{prefix}{trimmed}')

    @classmethod
    def debug(cls, message):
        cls.print_message(message, '::debug::')

    @classmethod
    def error(cls, message):
        cls.print_message(message, '::error::')

    @classmethod
    def notice(cls, message):
        cls.print_message(message, '::notice::')

    @classmethod
    def warning(cls, message):
        cls.print_message(message, '::warning::')
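# The '::debug::', '::error::', '::notice::' and '::warning::' prefixes are GitHub Actions
# workflow commands picked up by the runner; embedded newlines must be sent as %0A for a
# multi-line annotation. Illustrative:
#   Logger.error('step failed\nsee log above')  ->  ::error::step failed%0Asee log above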
 
try:
    import argparse
    import collections
    import copy
    import enum
    import glob
    import json
    import re
    import shutil
    import stat
    import tempfile
    import zipfile
    from ast import literal_eval
    from collections import namedtuple
    from datetime import datetime, timedelta, timezone
    from json import JSONDecoder
    from operator import contains
    from platform import platform, release
    from pydoc import describe
    from time import strftime
    from typing import OrderedDict
    from urllib import response
    from urllib.parse import urlparse
    from urllib.request import Request
    from webbrowser import get
    import pygit2
    from pygit2 import Commit, Repository, GitError, Reference, UserPass, Index, Signature, RemoteCallbacks, Remote
    import requests
    from genericpath import isdir
except ImportError as ex:
    Logger.error(
        f'Failed importing module {ex.name}, using interpreter {sys.executable}. {Logger.NEWLINE_CHAR} Installed packages:')
    installed_packages = pkg_resources.working_set
    installed_packages_list = sorted(
        ["%s==%s" % (i.key, i.version) for i in installed_packages])
    print(Logger.NEWLINE_CHAR.join(installed_packages_list))
    print('Environment: ')
    envlist = "\n".join([f"{k}={v}" for k, v in sorted(os.environ.items())])
    print(f'{envlist}')
    raise
 
tool_version = "1.0.7"
WEB_INSTALLER_DEFAULT_PATH = './web_installer/'
FORMAT = '%(asctime)s %(message)s'
github_env = type('', (), {})()
manifest = {
    "name": "",
    "version": "",
    "home_assistant_domain": "slim_player",
    "funding_url": "https://esphome.io/guides/supporters.html",
    "new_install_prompt_erase": True,
    "new_install_improv_wait_time": 20,
    "builds": [
        {
            "chipFamily": "ESP32",
            "parts": [
            ]
        }
    ]
}
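# This skeleton appears to follow the ESP Web Tools manifest layout: each build entry names a
# chipFamily and the binary "parts" (path + flash offset) that handle_manifest() fills in below
# from release.process_files(). Illustrative populated manifest (hypothetical names/offsets):
# {"name": "I2S-4MFlash-16", "version": "2.1234",
#  "builds": [{"chipFamily": "ESP32",
#              "parts": [{"path": "master-I2S-4MFlash-2.1234-16-bootloader.bin", "offset": "0x1000"}]}]}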
 
artifacts_formats_outdir = '$OUTDIR'
artifacts_formats_prefix = '$PREFIX'
artifacts_formats = [
    ['build/squeezelite.bin', '$OUTDIR/$PREFIX-squeezelite.bin'],
    ['build/recovery.bin', '$OUTDIR/$PREFIX-recovery.bin'],
    ['build/ota_data_initial.bin', '$OUTDIR/$PREFIX-ota_data_initial.bin'],
    ['build/bootloader/bootloader.bin', '$OUTDIR/$PREFIX-bootloader.bin'],
    ['build/partition_table/partition-table.bin',
        '$OUTDIR/$PREFIX-partition-table.bin'],
]
 
class AttributeDict(dict):
    """Dictionary exposing its keys as attributes, matched case-insensitively."""
    __slots__ = ()

    def __getattr__(self, name: str):
        try:
            return self[name.upper()]
        except Exception:
            try:
                return self[name.lower()]
            except Exception:
                for attr in self.keys():
                    if name.lower() == attr.replace("'", "").lower():
                        return self[attr]
    __setattr__ = dict.__setitem__
 
parser = argparse.ArgumentParser(
    description='Handles some parts of the squeezelite-esp32 build process')
parser.add_argument('--cwd', type=str,
                    help='Working directory', default=os.getcwd())
parser.add_argument('--with_crlf', action='store_true', help='Prevent replacing cr/lf with their hex representation')
parser.add_argument('--loglevel', type=str, choices={
                    'CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG', 'NOTSET'}, help='Logging level', default='INFO')
subparsers = parser.add_subparsers(dest='command', required=True)
parser_commits = subparsers.add_parser("list_commits", add_help=False,
                                       description="Commits list",
                                       help="Lists the last commits")
parser_dir = subparsers.add_parser("list_files",
                                   add_help=False,
                                   description="List Files parser",
                                   help="Display the content of the folder")
parser_manifest = subparsers.add_parser("manifest",
                                        add_help=False,
                                        description="Manifest parser",
                                        help="Handles the web installer manifest creation")
parser_manifest.add_argument('--flash_file', required=True, type=str,
                             help='The file path which contains the firmware flashing definition')
parser_manifest.add_argument(
    '--max_count', type=int, help='The maximum number of releases to keep', default=3)
parser_manifest.add_argument(
    '--manif_name', required=True, type=str, help='Manifest files name and prefix')
parser_manifest.add_argument(
    '--outdir', required=True, type=str, help='Output directory for files and manifests')
parser_pushinstaller = subparsers.add_parser("pushinstaller",
                                             add_help=False,
                                             description="Web Installer Checkout parser",
                                             help="Handles the creation of artifacts files")
parser_pushinstaller.add_argument(
    '--target', type=str, help='Output directory for web installer repository', default=WEB_INSTALLER_DEFAULT_PATH)
parser_pushinstaller.add_argument(
    '--artifacts', type=str, help='Target subdirectory for web installer artifacts', default=WEB_INSTALLER_DEFAULT_PATH)
parser_pushinstaller.add_argument(
    '--source', type=str, help='Source directory for the installer artifacts', default=WEB_INSTALLER_DEFAULT_PATH)
parser_pushinstaller.add_argument('--url', type=str, help='Web Installer clone url',
                                  default='https://github.com/sle118/squeezelite-esp32-installer.git')
parser_pushinstaller.add_argument(
    '--web_installer_branch', type=str, help='Web Installer branch to use', default='main')
parser_pushinstaller.add_argument(
    '--token', type=str, help='Auth token for pushing changes')
parser_pushinstaller.add_argument(
    '--flash_file', type=str, help='Manifest json file path')
parser_pushinstaller.add_argument(
    '--manif_name', required=True, type=str, help='Manifest files name and prefix')
parser_environment = subparsers.add_parser("environment",
                                           add_help=False,
                                           description="Environment parser",
                                           help="Updates the build environment")
parser_environment.add_argument(
    '--env_file', type=str, help='Environment File', default=os.environ.get('GITHUB_ENV'))
parser_environment.add_argument(
    '--build', required=True, type=int, help='The build number')
parser_environment.add_argument(
    '--node', required=True, type=str, help='The matrix node being built')
parser_environment.add_argument(
    '--depth', required=True, type=int, help='The bit depth being built')
parser_environment.add_argument(
    '--major', type=str, help='Major version', default='2')
parser_environment.add_argument(
    '--docker', type=str, help='Docker image to use', default='sle118/squeezelite-esp32-idfv43')
parser_show = subparsers.add_parser("show",
                                    add_help=False,
                                    description="Show parser",
                                    help="Show the build environment")
parser_build_flags = subparsers.add_parser("build_flags",
                                           add_help=False,
                                           description="Build Flags",
                                           help="Updates the build environment with build flags")
parser_build_flags.add_argument(
    '--mock', action='store_true', help='Mock release')
parser_build_flags.add_argument(
    '--force', action='store_true', help='Force a release build')
parser_build_flags.add_argument(
    '--ui_build', action='store_true', help='Include building the web UI')
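# Typical invocations (illustrative; the script name, paths and values are placeholders):
#   python build_tools.py build_flags --force
#   python build_tools.py environment --build 1234 --node I2S-4MFlash --depth 16 --major 2
#   python build_tools.py manifest --flash_file build/flasher_args.json --outdir ./out --manif_name web_installer.json
#   python build_tools.py pushinstaller --source ./out --manif_name web_installer.json --token <token>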
 
def format_commit(commit):
    # 463a9d8b7 Merge branch 'bugfix/ci_deploy_tags_v4.0' into 'release/v4.0' (2020-01-11T14:08:55+08:00)
    dt = datetime.fromtimestamp(float(commit.author.time), timezone(
        timedelta(minutes=commit.author.offset)))
    # timestr = dt.strftime('%c%z')
    timestr = dt.strftime('%F %R %Z')
    cmesg: str = commit.message.replace('\n', ' ').replace('\r', '').replace('*', '-')
    return f'{commit.short_id} {cmesg} ({timestr}) <{commit.author.name}>'.replace('  ', ' ')
 
def get_github_data(repo: Repository, api):
    base_url = urlparse(repo.remotes['origin'].url)
    print(
        f'Base URL is {base_url.path} from remote URL {repo.remotes["origin"].url}')
    url_parts = base_url.path.split('.')
    for p in url_parts:
        print(f'URL Part: {p}')
    api_url = f"{url_parts[0]}/{api}"
    print(f'API to call: {api_url}')
    url = f"https://api.github.com/repos{api_url}"
    resp = requests.get(
        url, headers={"Content-Type": "application/vnd.github.v3+json"})
    return json.loads(resp.text)
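# Example derivation (illustrative, assuming origin is https://github.com/sle118/squeezelite-esp32.git):
#   the path '/sle118/squeezelite-esp32.git' is split on '.' to drop the '.git' suffix, so
#   get_github_data(repo, 'releases?per_page=50&page=1') calls
#   https://api.github.com/repos/sle118/squeezelite-esp32/releases?per_page=50&page=1
# Note this assumes the repository path contains no other '.' characters.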
 
def dump_directory(dir_path):
    # list to store files name
    res = []
    for (dir_path, dir_names, file_names) in walk(dir_path):
        res.extend(file_names)
    print(res)
 
class ReleaseDetails():
    version: str
    idf: str
    platform: str
    branch: str
    bitrate: str

    def __init__(self, tag: str) -> None:
        self.bitrate = None  # default when the version carries no bitrate suffix
        self.version, self.idf, self.platform, self.branch = tag.split('#')
        try:
            self.version, self.bitrate = self.version.split('-')
        except Exception:
            pass

    def get_attributes(self):
        return {
            'version': self.version,
            'idf': self.idf,
            'platform': self.platform,
            'branch': self.branch,
            'bitrate': self.bitrate
        }

    def format_prefix(self) -> str:
        return f'{self.branch}-{self.platform}-{self.version}'

    def get_full_platform(self):
        return f"{self.platform}{f'-{self.bitrate}' if self.bitrate is not None else ''}"
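# Illustrative tag parse (the release name written by handle_environment below has the form
# '<major>.<build>-<depth>#<idf>#<node>#<branch>'):
#   ReleaseDetails('2.1234-16#v4.3#I2S-4MFlash#master')
#     -> version='2.1234', bitrate='16', idf='v4.3', platform='I2S-4MFlash', branch='master'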
 
class BinFile():
    name: str
    offset: int
    source_full_path: str
    target_name: str
    target_fullpath: str
    artifact_relpath: str

    def __init__(self, source_path, file_build_path: str, offset: int, release_details: ReleaseDetails, build_dir) -> None:
        self.name = os.path.basename(file_build_path).rstrip()
        self.artifact_relpath = os.path.relpath(
            file_build_path, build_dir).rstrip()
        self.source_path = source_path
        self.source_full_path = os.path.join(
            source_path, file_build_path).rstrip()
        self.offset = offset
        self.target_name = f'{release_details.format_prefix()}-{release_details.bitrate}-{self.name}'.rstrip()

    def get_manifest(self):
        return {"path": self.target_name, "offset": self.offset}

    def copy(self, target_folder) -> str:
        self.target_fullpath = os.path.join(target_folder, self.target_name)
        Logger.debug(
            f'File {self.source_full_path} will be copied to {self.target_fullpath}')
        try:
            os.makedirs(target_folder, exist_ok=True)
            shutil.copyfile(self.source_full_path,
                            self.target_fullpath, follow_symlinks=True)
        except Exception as ex:
            Logger.error(f"Error while copying {self.source_full_path} to {self.target_fullpath}{Logger.NEWLINE_CHAR}Content of {os.path.dirname(self.source_full_path.rstrip())}:{Logger.NEWLINE_CHAR}{Logger.NEWLINE_CHAR.join(get_file_list(os.path.dirname(self.source_full_path.rstrip())))}")
            raise
        return self.target_fullpath

    def get_attributes(self):
        return {
            'name': self.target_name,
            'offset': self.offset,
            'artifact_relpath': self.artifact_relpath
        }
 
class PlatformRelease():
    name: str
    description: str
    url: str = ''
    zipfile: str = ''
    tempfolder: str
    release_details: ReleaseDetails
    flash_parms = {}
    build_dir: str
    has_artifacts: bool
    branch: str
    assets: list
    bin_files: list
    name_prefix: str
    flash_file_path: str

    def get_manifest_name(self) -> str:
        return f'{self.name_prefix}-{self.release_details.format_prefix()}-{self.release_details.bitrate}.json'

    def __init__(self, flash_file_path, git_release, build_dir, branch, name_prefix) -> None:
        self.name = git_release.tag_name
        self.description = git_release.body
        self.assets = git_release['assets']
        self.has_artifacts = False
        self.name_prefix = name_prefix
        if len(self.assets) > 0:
            if self.has_asset_type():
                self.url = self.get_asset_from_extension().browser_download_url
            if self.has_asset_type('.zip'):
                self.zipfile = self.get_asset_from_extension(
                    ext='.zip').browser_download_url
                self.has_artifacts = True
        self.release_details = ReleaseDetails(git_release.name)
        self.bin_files = list()
        self.flash_file_path = flash_file_path
        self.build_dir = os.path.relpath(build_dir)
        self.branch = branch

    def process_files(self, outdir: str) -> list:
        parts = []
        for f in self.bin_files:
            f.copy(outdir)
            parts.append(f.get_manifest())
        return parts

    def get_asset_from_extension(self, ext='.bin'):
        for a in self.assets:
            filename = AttributeDict(a).name
            file_name, file_extension = os.path.splitext(filename)
            if file_extension == ext:
                return AttributeDict(a)
        return None

    def has_asset_type(self, ext='.bin') -> bool:
        return self.get_asset_from_extension(ext) is not None

    def platform(self):
        return self.release_details.get_full_platform()

    def get_zip_file(self):
        self.tempfolder = extract_files_from_archive(self.zipfile)
        print(
            f'Artifacts for {self.name} extracted to {self.tempfolder}')
        flash_parms_file = os.path.relpath(
            self.tempfolder + self.flash_file_path)
        line: str
        with open(flash_parms_file) as fin:
            for line in fin:
                components = line.split()
                if len(components) == 2:
                    self.flash_parms[os.path.basename(
                        components[1]).rstrip().lstrip()] = components[0]
        try:
            for artifact in artifacts_formats:
                base_name = os.path.basename(artifact[0]).rstrip().lstrip()
                self.bin_files.append(BinFile(
                    self.tempfolder, artifact[0], self.flash_parms[base_name], self.release_details, self.build_dir))
                self.has_artifacts = True
        except Exception:
            self.has_artifacts = False

    def cleanup(self):
        Logger.debug(f'removing temp directory for platform release {self.name}')
        shutil.rmtree(self.tempfolder)

    def get_attributes(self):
        return {
            'name': self.name,
            'branch': self.branch,
            'description': self.description,
            'url': self.url,
            'zipfile': self.zipfile,
            'release_details': self.release_details.get_attributes(),
            'bin_files': [b.get_attributes() for b in self.bin_files],
            'manifest_name': self.get_manifest_name()
        }
 
class Releases():
    _dict: dict = collections.OrderedDict()
    maxcount: int = 0
    branch: str = ''
    repo: Repository = None
    last_commit: Commit = None
    manifest_name: str

    def __init__(self, branch: str, maxcount: int = 3) -> None:
        self.maxcount = maxcount
        self.branch = branch

    def count(self, value: PlatformRelease) -> int:
        content = self._dict.get(value.platform())
        if content is None:
            return 0
        return len(content)

    def get_platform(self, platform: str) -> list:
        return self._dict[platform]

    def get_platform_keys(self):
        return self._dict.keys()

    def get_all(self) -> list:
        result: list = []
        for platform in [self.get_platform(platform) for platform in self.get_platform_keys()]:
            for release in platform:
                result.append(release)
        return result

    def append(self, value: PlatformRelease):
        if self.count(value) == 0:
            self._dict[value.platform()] = []
        if self.should_add(value):
            print(f'Adding release {value.name} to the list')
            self._dict[value.platform()].append(value)
        else:
            print(f'Skipping release {value.name}')

    def get_attributes(self):
        res = []
        release: PlatformRelease
        for release in self.get_all():
            res.append(release.get_attributes())
        return res

    def get_minlen(self) -> int:
        return min([len(self.get_platform(p)) for p in self.get_platform_keys()])

    def got_all_packages(self) -> bool:
        return self.get_minlen() >= self.maxcount

    def should_add(self, release: PlatformRelease) -> bool:
        return self.count(release) <= self.maxcount
 
    def add_package(self, package: PlatformRelease, with_artifacts: bool = True):
        if self.branch != package.branch:
            Logger.debug(f'Skipping release {package.name} from branch {package.branch}')
        elif package.has_artifacts or not with_artifacts:
            self.append(package)

    @classmethod
    def get_last_commit_message(cls, repo_obj: Repository = None) -> str:
        last: Commit = cls.get_last_commit(repo_obj)
        if last is None:
            return ''
        else:
            return last.message.replace(Logger.NEWLINE_CHAR, ' ')

    @classmethod
    def get_last_author(cls, repo_obj: Repository = None) -> Signature:
        last: Commit = cls.get_last_commit(repo_obj)
        return last.author

    @classmethod
    def get_last_committer(cls, repo_obj: Repository = None) -> Signature:
        last: Commit = cls.get_last_commit(repo_obj)
        return last.committer

    @classmethod
    def get_last_commit(cls, repo_obj: Repository = None) -> Commit:
        loc_repo = repo_obj
        if cls.repo is None:
            cls.load_repository(os.getcwd())
        if loc_repo is None:
            loc_repo = cls.repo
        head: Reference = loc_repo.head
        target = head.target
        ref: Reference
        if cls.last_commit is None:
            try:
                cls.last_commit = loc_repo[target]
                print(
                    f'Last commit for {head.shorthand} is {format_commit(cls.last_commit)}')
            except Exception as e:
                Logger.error(
                    f'Unable to retrieve last commit for {head.shorthand}/{target}: {e}')
                cls.last_commit = None
        return cls.last_commit

    @classmethod
    def load_repository(cls, path: str = os.getcwd()) -> Repository:
        if cls.repo is None:
            try:
                print(f'Opening repository from {path}')
                cls.repo = Repository(path=path)
            except GitError as ex:
                Logger.error(f"Unable to access the repository({ex}).\nContent of {path}:\n{Logger.NEWLINE_CHAR.join(get_file_list(path, 1))}")
                raise
        return cls.repo
 
    @classmethod
    def resolve_commit(cls, repo: Repository, commit_id: str) -> Commit:
        commit: Commit
        reference: Reference
        commit, reference = repo.resolve_refish(commit_id)
        return commit

    @classmethod
    def get_branch_name(cls) -> str:
        return re.sub(r'[^a-zA-Z0-9\-~!@_\.]', '', cls.load_repository().head.shorthand)

    @classmethod
    def get_release_branch(cls, repo: Repository, platform_release) -> str:
        match = [t for t in repo.branches.with_commit(
            platform_release.target_commitish)]
        no_origin = [t for t in match if 'origin' not in t]
        if len(no_origin) == 0 and len(match) > 0:
            return match[0].split('/')[1]
        elif len(no_origin) > 0:
            return no_origin[0]
        return ''

    @classmethod
    def get_flash_parms(cls, file_path):
        flash = parse_json(file_path)
        od: collections.OrderedDict = collections.OrderedDict()
        for z in flash['flash_files'].items():
            base_name: str = os.path.basename(z[1])
            od[base_name.rstrip().lstrip()] = literal_eval(z[0])
        return collections.OrderedDict(sorted(od.items()))
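    # get_flash_parms() appears to expect the ESP-IDF flasher_args.json layout, e.g.
    # (illustrative): {"flash_files": {"0x1000": "bootloader/bootloader.bin",
    #                                  "0x8000": "partition_table/partition-table.bin"}}
    # It returns an OrderedDict mapping each file's basename to its integer flash offset.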
 
    @classmethod
    def get_releases(cls, flash_file_path, maxcount: int, name_prefix):
        repo = Releases.load_repository(os.getcwd())
        packages: Releases = cls(branch=repo.head.shorthand, maxcount=maxcount)
        build_dir = os.path.dirname(flash_file_path)
        for page in range(1, 999):
            Logger.debug(f'Getting releases page {page}')
            releases = get_github_data(
                repo, f'releases?per_page=50&page={page}')
            if len(releases) == 0:
                Logger.debug(f'No more release found for page {page}')
                break
            for release_entry in [AttributeDict(platform) for platform in releases]:
                packages.add_package(PlatformRelease(flash_file_path, release_entry, build_dir,
                                     Releases.get_release_branch(repo, release_entry), name_prefix))
                if packages.got_all_packages():
                    break
            if packages.got_all_packages():
                break
        return packages
 
    @classmethod
    def get_commit_list(cls) -> list:
        commit_list = []
        last: Commit = Releases.get_last_commit()
        if last is None:
            return commit_list
        try:
            for c in Releases.load_repository().walk(last.id, pygit2.GIT_SORT_TIME):
                if '[skip actions]' not in c.message:
                    commit_list.append(format_commit(c))
                    if len(commit_list) > 10:
                        break
        except Exception as e:
            Logger.error(
                f'Unable to get commit list starting at {last.id}: {e}')
        return commit_list

    @classmethod
    def get_commit_list_descriptions(cls) -> str:
        return '<<~EOD\n### Revision Log\n' + Logger.NEWLINE_CHAR.join(cls.get_commit_list()) + '\n~EOD'
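    # The '<<~EOD ... ~EOD' wrapper matches the GitHub Actions multi-line value syntax:
    # write_github_env_file() below emits 'description' without an '=', so the env file ends up
    # containing 'description<<~EOD', the revision log lines, then the closing '~EOD' delimiter.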
 
    # Note: these two helpers use dict-style item access on self; Releases does not
    # subclass dict, and nothing in this script appears to call them.
    def update(self, *args, **kwargs):
        if args:
            if len(args) > 1:
                raise TypeError("update expected at most 1 arguments, "
                                "got %d" % len(args))
            other = dict(args[0])
            for key in other:
                self[key] = other[key]
        for key in kwargs:
            self[key] = kwargs[key]

    def setdefault(self, key, value=None):
        if key not in self:
            self[key] = value
        return self[key]
 
def set_workdir(args):
    print(f'setting work dir to: {args.cwd}')
    os.chdir(os.path.abspath(args.cwd))

def parse_json(filename: str):
    fname = os.path.abspath(filename)
    folder: str = os.path.abspath(os.path.dirname(filename))
    print(f'Opening json file {fname} from {folder}')
    try:
        with open(fname) as f:
            content = f.read()
            Logger.debug(f'Loading json\n{content}')
            return json.loads(content)
    except JSONDecodeError as ex:
        Logger.error(f'Error parsing {content}')
        raise
    except Exception as ex:
        Logger.error(
            f"Unable to parse flasher args json file. Content of {folder}:{Logger.NEWLINE_CHAR.join(get_file_list(folder))}")
        raise
 
def write_github_env_file(values, env_file):
    print(f'Writing content to {env_file}...')
    with open(env_file, "w") as f:
        for attr in [attr for attr in dir(values) if not attr.startswith('_')]:
            # 'description' already embeds the multi-line '<<~EOD' syntax, so it gets no '='
            line = f'{attr}{"=" if attr != "description" else ""}{getattr(values, attr)}'
            print(line)
            f.write(f'{line}\n')
            os.environ[attr] = str(getattr(values, attr))
    print(f'Done writing to {env_file}!')

def format_artifact_from_manifest(manif_json: AttributeDict):
    if len(manif_json) == 0:
        return 'Newest release'
    first = manif_json[0]
    return f'{first["branch"]}-{first["release_details"]["version"]}'

def format_artifact_name(base_name: str = '', args=AttributeDict(os.environ)):
    return f'{base_name}{args.branch_name}-{args.node}-{args.depth}-{args.major}{args.build}'
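# Illustrative result with branch_name=master, node=I2S-4MFlash, depth=16, major=2, build=1234:
#   format_artifact_name('squeezelite-esp32-') -> 'squeezelite-esp32-master-I2S-4MFlash-16-21234'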
 
def handle_build_flags(args):
    set_workdir(args)
    print('Setting global build flags')
    commit_message: str = Releases.get_last_commit_message()
    github_env.mock = 1 if args.mock else 0
    github_env.release_flag = 1 if args.mock or args.force or 'release' in commit_message.lower() else 0
    github_env.ui_build = 1 if args.mock or args.ui_build or '[ui-build]' in commit_message.lower(
    ) or github_env.release_flag == 1 else 0
    write_github_env_file(github_env, os.environ.get('GITHUB_OUTPUT'))
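# Flag semantics, as implemented above: a release build is flagged when --mock or --force is
# passed or the last commit message contains 'release'; the web UI is rebuilt when --mock or
# --ui_build is passed, the message contains '[ui-build]', or a release build is flagged.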
 
def write_version_number(file_path: str, env_details):
    # app_name="${TARGET_BUILD_NAME}.${DEPTH}.dev-$(git log --pretty=format:'%h' --max-count=1).${branch_name}"
    # echo "${app_name}">version.txt
    version: str = ''
    try:
        version = f'{env_details.TARGET_BUILD_NAME}.{env_details.DEPTH}.{env_details.major}.{env_details.BUILD_NUMBER}.{env_details.branch_name}'
        with open(file_path, "w") as version_file:
            version_file.write(version)
    except Exception as ex:
        Logger.error(f'Unable to set version string {version} in file {file_path}')
        raise Exception('Version error')
    Logger.notice(f'Firmware version set to {version}')
 
def handle_environment(args):
    set_workdir(args)
    print('Setting environment variables...')
    commit_message: str = Releases.get_last_commit_message()
    last: Commit = Releases.get_last_commit()
    if last is not None:
        github_env.author_name = last.author.name
        github_env.author_email = last.author.email
        github_env.committer_name = last.committer.name
        github_env.committer_email = last.committer.email
    github_env.node = args.node
    github_env.depth = args.depth
    github_env.major = args.major
    github_env.build = args.build
    github_env.DEPTH = args.depth
    github_env.TARGET_BUILD_NAME = args.node
    github_env.build_version_prefix = args.major
    github_env.branch_name = Releases.get_branch_name()
    github_env.BUILD_NUMBER = str(args.build)
    github_env.tag = f'{args.node}.{args.depth}.{args.build}.{github_env.branch_name}'.rstrip()
    github_env.last_commit = commit_message
    github_env.DOCKER_IMAGE_NAME = args.docker
    github_env.name = f"{args.major}.{str(args.build)}-{args.depth}#v4.3#{args.node}#{github_env.branch_name}"
    github_env.artifact_prefix = format_artifact_name(
        'squeezelite-esp32-', github_env)
    github_env.artifact_file_name = f"{github_env.artifact_prefix}.zip"
    github_env.artifact_bin_file_name = f"{github_env.artifact_prefix}.bin"
    github_env.PROJECT_VER = f'{args.node}-{args.build}'
    github_env.description = Releases.get_commit_list_descriptions()
    write_github_env_file(github_env, args.env_file)
    write_version_number("version.txt", github_env)
 
def handle_artifacts(args):
    set_workdir(args)
    print('Handling artifacts')
    for attr in artifacts_formats:
        target: str = os.path.relpath(attr[1].replace(artifacts_formats_outdir, args.outdir).replace(
            artifacts_formats_prefix, format_artifact_name()))
        source: str = os.path.relpath(attr[0])
        target_dir: str = os.path.dirname(target)
        print(f'Copying file {source} to {target}')
        try:
            os.makedirs(target_dir, exist_ok=True)
            shutil.copyfile(source, target, follow_symlinks=True)
        except Exception as ex:
            Logger.error(f"Error while copying {source} to {target}\nContent of {target_dir}:\n{Logger.NEWLINE_CHAR.join(get_file_list(os.path.dirname(attr[0].rstrip())))}")
            raise
 
def delete_folder(path):
    '''Remove read-only flags and recursively delete the folder.'''
    for root, dirs, files in os.walk(path, topdown=True):
        for dir in dirs:
            fulldirpath = os.path.join(root, dir)
            Logger.debug(f'Drilling down in {fulldirpath}')
            delete_folder(fulldirpath)
        for fname in files:
            full_path = os.path.join(root, fname)
            Logger.debug(f'Setting file read/write {full_path}')
            os.chmod(full_path, stat.S_IWRITE)
            Logger.debug(f'Deleting file {full_path}')
            os.remove(full_path)
    if os.path.exists(path):
        Logger.debug(f'Changing folder read/write {path}')
        os.chmod(path, stat.S_IWRITE)
        print(f'WARNING: Deleting Folder {path}')
        os.rmdir(path)

def get_file_stats(path):
    fstat: os.stat_result = pathlib.Path(path).stat()
    # Convert file size to MB, KB or Bytes
    mtime = time.strftime("%X %x", time.gmtime(fstat.st_mtime))
    if fstat.st_size > 1024 * 1024:
        return math.ceil(fstat.st_size / (1024 * 1024)), "MB", mtime
    elif fstat.st_size > 1024:
        return math.ceil(fstat.st_size / 1024), "KB", mtime
    return fstat.st_size, "B", mtime
 
def get_file_list(root_path, max_levels: int = 2) -> list:
    outlist: list = []
    for root, dirs, files in os.walk(root_path):
        path = os.path.relpath(root).split(os.sep)
        if len(path) <= max_levels:
            outlist.append(f'\n{root}')
            for file in files:
                full_name = os.path.join(root, file)
                fsize, unit, mtime = get_file_stats(full_name)
                outlist.append('{:s} {:8d} {:2s} {:18s}\t{:s}'.format(
                    len(path) * "---", fsize, unit, mtime, file))
    return outlist

def get_recursive_list(path) -> list:
    outlist: list = []
    for root, dirs, files in os.walk(path, topdown=True):
        for fname in files:
            outlist.append((fname, os.path.join(root, fname)))
    return outlist
 
def handle_manifest(args):
    set_workdir(args)
    print('Creating the web installer manifest')
    outdir: str = os.path.relpath(args.outdir)
    if not os.path.exists(outdir):
        print(f'Creating target folder {outdir}')
        os.makedirs(outdir, exist_ok=True)
    releases: Releases = Releases.get_releases(
        args.flash_file, args.max_count, args.manif_name)
    release: PlatformRelease
    for release in releases.get_all():
        manifest_name = release.get_manifest_name()
        release.get_zip_file()
        man = copy.deepcopy(manifest)
        man['manifest_name'] = manifest_name
        man['builds'][0]['parts'] = release.process_files(args.outdir)
        man['name'] = release.platform()
        man['version'] = release.release_details.version
        Logger.debug(f'Generated manifest: \n{json.dumps(man)}')
        fullpath = os.path.join(args.outdir, release.get_manifest_name())
        print(f'Writing manifest to {fullpath}')
        with open(fullpath, "w") as f:
            json.dump(man, f, indent=4)
        release.cleanup()
    mainmanifest = os.path.join(args.outdir, args.manif_name)
    print(f'Writing main manifest {mainmanifest}')
    with open(mainmanifest, 'w') as f:
        json.dump(releases.get_attributes(), f, indent=4)
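# handle_manifest() thus produces, for each kept release, the renamed firmware binaries plus a
# per-release manifest JSON, and a single main manifest (args.manif_name) listing every
# release's attributes; the pushinstaller command below consumes that main manifest.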
 
def get_new_file_names(manif_json) -> collections.OrderedDict:
    new_release_files: dict = collections.OrderedDict()
    for artifact in manif_json:
        for name in [f["name"] for f in artifact["bin_files"]]:
            new_release_files[name] = artifact
        new_release_files[artifact["manifest_name"]] = artifact["name"]
    return new_release_files
 
def copy_no_overwrite(source: str, target: str):
    sfiles = os.listdir(source)
    for f in sfiles:
        source_file = os.path.join(source, f)
        target_file = os.path.join(target, f)
        if not os.path.exists(target_file):
            print(f'Copying {f} to target')
            shutil.copy(source_file, target_file)
        else:
            Logger.debug(f'Skipping existing file {f}')

def get_changed_items(repo: Repository) -> Dict:
    changed_filemode_status_code: int = pygit2.GIT_FILEMODE_TREE
    original_status_dict: Dict[str, int] = repo.status()
    # transfer any non-filemode changes to a new dictionary
    status_dict: Dict[str, int] = {}
    for filename, code in original_status_dict.items():
        if code != changed_filemode_status_code:
            status_dict[filename] = code
    return status_dict

def is_dirty(repo: Repository) -> bool:
    return len(get_changed_items(repo)) > 0
 
def push_with_method(auth_method: str, token: str, remote: Remote, reference):
    success: bool = False
    try:
        remote.push(reference, callbacks=RemoteCallbacks(pygit2.UserPass(auth_method, token)))
        success = True
    except Exception as ex:
        Logger.error(f'Error pushing with auth method {auth_method}: {ex}.')
    return success
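# The caller below tries each credential username in turn ('x-access-token', then
# 'x-oauth-basic') with the token as the password, stopping at the first push that succeeds;
# 'x-access-token' is the username GitHub commonly expects for app/installation tokens over HTTPS.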
 
def push_if_change(repo: Repository, token: str, source_path: str, manif_json):
    if is_dirty(repo):
        print('Changes found. Preparing commit')
        env = AttributeDict(os.environ)
        index: Index = repo.index
        index.add_all()
        index.write()
        reference = repo.head.name
        message = f'Web installer for {format_artifact_from_manifest(manif_json)}'
        tree = index.write_tree()
        Releases.load_repository(source_path)
        commit = repo.create_commit(reference, Releases.get_last_author(
        ), Releases.get_last_committer(), message, tree, [repo.head.target])
        origin: Remote = repo.remotes['origin']
        print(
            f'Pushing commit {format_commit(repo[commit])} to url {origin.url}')
        remote: Remote = repo.remotes['origin']
        auth_methods = ['x-access-token', 'x-oauth-basic']
        for method in auth_methods:
            if push_with_method(method, token, remote, [reference]):
                print(f'::notice::Web installer updated for {format_artifact_from_manifest(manif_json)}')
                return
        raise Exception('Unable to push web installer.')
    else:
        print('WARNING: No change found. Skipping update')
 
def update_files(target_artifacts: str, manif_json, source: str):
    new_list: dict = get_new_file_names(manif_json)
    if os.path.exists(target_artifacts):
        print(f'Removing obsolete files from {target_artifacts}')
        for entry in get_recursive_list(target_artifacts):
            f = entry[0]
            full_target = entry[1]
            if f not in new_list.keys():
                print(f'WARNING: Removing obsolete file {f}')
                os.remove(full_target)
    else:
        print(f'Creating target folder {target_artifacts}')
        os.makedirs(target_artifacts, exist_ok=True)
    print(f'Copying installer files to {target_artifacts}:')
    copy_no_overwrite(os.path.abspath(source), target_artifacts)
 
def handle_pushinstaller(args):
    set_workdir(args)
    print('Pushing web installer updates... ')
    target_artifacts = os.path.join(args.target, args.artifacts)
    if os.path.exists(args.target):
        print(f'Removing files (if any) from {args.target}')
        delete_folder(args.target)
    print(f'Cloning from {args.url} into {args.target}')
    repo = pygit2.clone_repository(args.url, args.target)
    repo.checkout_head()
    manif_json = parse_json(os.path.join(args.source, args.manif_name))
    update_files(target_artifacts, manif_json, args.source)
    push_if_change(repo, args.token, args.cwd, manif_json)
    repo.state_cleanup()
 
def handle_show(args):
    print('Show')

def extract_files_from_archive(url):
    tempfolder = tempfile.mkdtemp()
    platform: Response = requests.get(url)
    Logger.debug(f'Downloading {url} to {tempfolder}')
    Logger.debug(f'Transfer status code: {platform.status_code}. Expanding content')
    z = zipfile.ZipFile(io.BytesIO(platform.content))
    z.extractall(tempfolder)
    return tempfolder

def handle_list_files(args):
    print(f'Content of {args.cwd}:')
    print(Logger.NEWLINE_CHAR.join(get_file_list(args.cwd)))
 
def handle_commits(args):
    set_workdir(args)
    print(Releases.get_commit_list_descriptions())

parser_environment.set_defaults(func=handle_environment, cmd='environment')
parser_manifest.set_defaults(func=handle_manifest, cmd='manifest')
parser_pushinstaller.set_defaults(func=handle_pushinstaller, cmd='installer')
parser_show.set_defaults(func=handle_show, cmd='show')
parser_build_flags.set_defaults(func=handle_build_flags, cmd='build_flags')
parser_dir.set_defaults(func=handle_list_files, cmd='list_files')
parser_commits.set_defaults(func=handle_commits, cmd='list_commits')
 
def main():
    exit_result_code = 0
    args = parser.parse_args()
    Logger.with_crlf = args.with_crlf
    print(f'::group::{args.command}')
    print(f'build_tools version : {tool_version}')
    print(f'Processing command {args.command}')
    func: Callable = getattr(args, 'func', None)
    if func is not None:
        # Call whatever subcommand function was selected
        try:
            func(args)
        except Exception as e:
            Logger.error(f'Critical error while running {args.command}\n{" ".join(traceback.format_exception(type(e), e, e.__traceback__))}')
            exit_result_code = 1
    else:
        # No subcommand was provided, so call help
        parser.print_usage()
    print('::endgroup::')
    sys.exit(exit_result_code)

if __name__ == '__main__':
    main()
 
 