#!/usr/bin/env python from json import JSONDecodeError import math import pathlib import time from typing import Callable,Dict, Union import pkg_resources import sys import os import io from os import stat_result, walk try: import argparse import collections import copy import enum import glob import json import logging import re import shutil import stat import tempfile import zipfile from ast import literal_eval from collections import namedtuple from datetime import datetime, timedelta, timezone from json import JSONDecoder from operator import contains from platform import platform, release from pydoc import describe from time import strftime from typing import OrderedDict from urllib import response from urllib.parse import urlparse from urllib.request import Request from webbrowser import get import pygit2 from pygit2 import Commit,Repository,GitError,Reference,UserPass,Index,Signature,RemoteCallbacks, Remote import requests from genericpath import isdir except ImportError as ex: print(f'::error::Failed importing module {ex.name}, using interpreter {sys.executable}. \n Installed packages:') installed_packages = pkg_resources.working_set installed_packages_list = sorted(["%s==%s" % (i.key, i.version) for i in installed_packages]) print('\n'.join(installed_packages_list)) print(f'Environment: ') envlist="\n".join( [f"{k}={v}" for k,v in sorted(os.environ.items())]) print(f'{envlist}') raise FORMAT = '%(asctime)s %(message)s' logging.basicConfig(format=FORMAT) logger:logging.Logger = logging.getLogger(__name__) github_env= type('', (), {})() tool_version= "1.0.5" manifest={ "name": "", "version": "", "home_assistant_domain": "slim_player", "funding_url": "https://esphome.io/guides/supporters.html", "builds": [ { "chipFamily": "ESP32", "parts": [ ] } ] } artifacts_formats_outdir= '$OUTDIR' artifacts_formats_prefix= '$PREFIX' artifacts_formats = [ ['build/squeezelite.bin', '$OUTDIR/$PREFIX-squeezelite.bin'], ['build/recovery.bin', '$OUTDIR/$PREFIX-recovery.bin'], ['build/ota_data_initial.bin', '$OUTDIR/$PREFIX-ota_data_initial.bin'], ['build/bootloader/bootloader.bin', '$OUTDIR/$PREFIX-bootloader.bin'], ['build/partition_table/partition-table.bin ', '$OUTDIR/$PREFIX-partition-table.bin'], ] class AttributeDict(dict): __slots__ = () def __getattr__(self, name:str): try: return self[name.upper()] except Exception: try: return self[name.lower()] except Exception: for attr in self.keys(): if name.lower() == attr.replace("'","").lower() : return self[attr] __setattr__ = dict.__setitem__ parser = argparse.ArgumentParser(description='Handles some parts of the squeezelite-esp32 build process') parser.add_argument('--cwd', type=str,help='Working directory', default=os.getcwd()) parser.add_argument('--loglevel', type=str,choices={'CRITICAL','ERROR','WARNING','INFO','DEBUG','NOTSET'}, help='Logging level', default='INFO') subparsers = parser.add_subparsers( dest='command', required=True) parser_manifest = subparsers.add_parser("manifest", add_help=False, description="Manifest parser", help="Handles the web installer manifest creation") parser_manifest.add_argument('--flash_file', required=True, type=str,help='The file path which contains the firmware flashing definition') parser_manifest.add_argument('--max_count', type=int,help='The maximum number of releases to keep', default=3) parser_manifest.add_argument('--manif_name', required=True,type=str,help='Manifest files name and prefix') parser_manifest.add_argument('--outdir', required=True,type=str,help='Output directory for files and manifests') parser_artifacts = subparsers.add_parser("artifacts", add_help=False, description="Artifacts parser", help="Handles the creation of artifacts files") parser_artifacts.add_argument('--outdir', type=str,help='Output directory for artifact files', default='./artifacts/') parser_pushinstaller = subparsers.add_parser("pushinstaller", add_help=False, description="Web Installer Checkout parser", help="Handles the creation of artifacts files") parser_pushinstaller.add_argument('--target', type=str,help='Output directory for web installer repository', default='./web_installer/') parser_pushinstaller.add_argument('--artifacts', type=str,help='Target subdirectory for web installer artifacts', default='./web_installer/') parser_pushinstaller.add_argument('--source', type=str,help='Source directory for the installer artifacts', default='./web_installer/') parser_pushinstaller.add_argument('--url', type=str,help='Web Installer clone url ', default='https://github.com/sle118/squeezelite-esp32-installer.git') parser_pushinstaller.add_argument('--web_installer_branch', type=str,help='Web Installer branch to use ', default='main') parser_pushinstaller.add_argument('--token', type=str,help='Auth token for pushing changes') parser_pushinstaller.add_argument('--flash_file', type=str,help='Manifest json file path') parser_pushinstaller.add_argument('--manif_name', required=True,type=str,help='Manifest files name and prefix') parser_environment = subparsers.add_parser("environment", add_help=False, description="Environment parser", help="Updates the build environment") parser_environment.add_argument('--env_file', type=str,help='Environment File', default=os.environ.get('GITHUB_ENV')) parser_environment.add_argument('--build', required=True, type=int,help='The build number') parser_environment.add_argument('--node', required=True, type=str,help='The matrix node being built') parser_environment.add_argument('--depth', required=True, type=int,help='The bit depth being built') parser_environment.add_argument('--major', type=str,help='Major version', default='2') parser_environment.add_argument('--docker', type=str,help='Docker image to use',default='sle118/squeezelite-esp32-idfv43') parser_show = subparsers.add_parser("show", add_help=False, description="Show parser", help="Show the build environment") parser_build_flags = subparsers.add_parser("build_flags", add_help=False, description="Build Flags", help="Updates the build environment with build flags") parser_build_flags.add_argument('--mock', action='store_true',help='Mock release') parser_build_flags.add_argument('--force', action='store_true',help='Force a release build') parser_build_flags.add_argument('--ui_build', action='store_true',help='Include building the web UI') def get_github_data(repo:Repository,api): base_url = urlparse(repo.remotes['origin'].url) url = f"https://api.github.com/repos{base_url.path.split('.')[-2]}/{api}" resp= requests.get(url, headers={"Content-Type": "application/vnd.github.v3+json"}) return json.loads(resp.text) def dump_directory(dir_path): # list to store files name res = [] for (dir_path, dir_names, file_names) in walk(dir_path): res.extend(file_names) print(res) class ReleaseDetails(): version:str idf:str platform:str branch:str bitrate:str def __init__(self,tag:str) -> None: self.version,self.idf,self.platform,self.branch=tag.split('#') try: self.version,self.bitrate = self.version.split('-') except Exception: pass def get_attributes(self): return { 'version': self.version, 'idf': self.idf, 'platform': self.platform, 'branch': self.branch, 'bitrate': self.bitrate } def format_prefix(self)->str: return f'{self.branch}-{self.platform}-{self.version}' def get_full_platform(self): return f"{self.platform}{f'-{self.bitrate}' if self.bitrate is not None else ''}" class BinFile(): name:str offset:int source_full_path:str target_name:str target_fullpath:str artifact_relpath:str def __init__(self, source_path,file_build_path:str, offset:int,release_details:ReleaseDetails,build_dir) -> None: self.name = os.path.basename(file_build_path).rstrip() self.artifact_relpath = os.path.relpath(file_build_path,build_dir).rstrip() self.source_path = source_path self.source_full_path = os.path.join(source_path,file_build_path).rstrip() self.offset = offset self.target_name= f'{release_details.format_prefix()}-{self.name}'.rstrip() def get_manifest(self): return { "path": self.target_name , "offset": self.offset } def copy(self,target_folder)->str: self.target_fullpath=os.path.join(target_folder,self.target_name) logger.debug(f'file {self.source_full_path} will be copied to {self.target_fullpath}') try: os.makedirs(target_folder, exist_ok=True) shutil.copyfile(self.source_full_path, self.target_fullpath, follow_symlinks=True) except Exception as ex: print(f'::error::Error while copying {self.source_full_path} to {self.target_fullpath}' ) print(f'::error::Content of {os.path.dirname(self.source_full_path.rstrip())}:') print('\n::error::'.join(get_file_list(os.path.dirname(self.source_full_path.rstrip())))) raise return self.target_fullpath def get_attributes(self): return { 'name':self.target_name, 'offset':self.offset, 'artifact_relpath':self.artifact_relpath } class PlatformRelease(): name:str description:str url:str='' zipfile:str='' tempfolder:str release_details:ReleaseDetails flash_parms={} build_dir:str has_artifacts:bool branch:str assets:list bin_files:list name_prefix:str def get_manifest_name(self)->str: return f'{self.name_prefix}-{self.release_details.format_prefix()}.json' def __init__(self,git_release,flash_parms,build_dir, branch,name_prefix) -> None: self.name = git_release.tag_name self.description=git_release.body self.assets = git_release['assets'] self.has_artifacts = False self.name_prefix = name_prefix if len(self.assets)>0: if self.has_asset_type(): self.url=self.get_asset_from_extension().browser_download_url if self.has_asset_type('.zip'): self.zipfile=self.get_asset_from_extension(ext='.zip').browser_download_url self.has_artifacts = True self.release_details=ReleaseDetails(git_release.name) self.bin_files = list() self.flash_parms = flash_parms self.build_dir = build_dir self.branch = branch def process_files(self,outdir:str)->list: parts = [] for f in self.bin_files: f.copy(outdir) parts.append(f.get_manifest()) def get_asset_from_extension(self,ext='.bin'): for a in self.assets: filename=AttributeDict(a).name file_name, file_extension = os.path.splitext(filename) if file_extension == ext: return AttributeDict(a) return None def has_asset_type(self,ext='.bin')->bool: return self.get_asset_from_extension(ext) is not None def platform(self): return self.release_details.get_full_platform() def get_zip_file(self): self.tempfolder = extract_files_from_archive(self.zipfile) logger.info(f'Artifacts for {self.name} extracted to {self.tempfolder}') try: for artifact in artifacts_formats: base_name = os.path.basename(artifact[0]).rstrip().lstrip() self.bin_files.append(BinFile(self.tempfolder,artifact[0],self.flash_parms[base_name],self.release_details,self.build_dir)) has_artifacts = True except Exception: self.has_artifacts = False def cleanup(self): logger.info(f'removing {self.name} temp directory {self.tempfolder}') shutil.rmtree(self.tempfolder) def get_attributes(self): return { 'name':self.name, 'branch':self.branch, 'description':self.description, 'url':self.url, 'zipfile':self.zipfile, 'release_details':self.release_details.get_attributes(), 'bin_files': [b.get_attributes() for b in self.bin_files], 'manifest_name': self.get_manifest_name() } class Releases(): _dict:dict = collections.OrderedDict() maxcount:int =0 branch:str='' repo:Repository=None manifest_name:str def __init__(self,branch:str,maxcount:int=3) -> None: self.maxcount = maxcount self.branch = branch def count(self,value:PlatformRelease)->int: content=self._dict.get(value.platform()) if content == None: return 0 return len(content) def get_platform(self,platform:str)->list: return self._dict[platform] def get_platform_keys(self): return self._dict.keys() def get_all(self)->list: result:list=[] for platform in [self.get_platform(platform) for platform in self.get_platform_keys()]: for release in platform: result.append(release) return result def append(self,value:PlatformRelease): # optional processing here if self.count(value) == 0: self._dict[value.platform()] = [] if self.should_add(value): logger.info(f'Adding release {value.name} to the list') self._dict[value.platform()].append(value) else: logger.info(f'Skipping release {value.name}') def get_attributes(self): res = [] release:PlatformRelease for release in self.get_all(): res.append(release.get_attributes()) return res def get_minlen(self)->int: return min([len(self.get_platform(p)) for p in self.get_platform_keys()]) def got_all_packages(self)->bool: return self.get_minlen() >=self.maxcount def should_add(self,release:PlatformRelease)->bool: return self.count(release) <=self.maxcount def add_package(self,package:PlatformRelease, with_artifacts:bool=True): if self.branch != package.branch: logger.info(f'Skipping release {package.name} from branch {package.branch}') elif package.has_artifacts or not with_artifacts: self.append(package) @classmethod def get_last_commit(cls)->Commit: if cls.repo is None: cls.get_repository(os.getcwd()) return cls.repo[cls.repo.head.target] @classmethod def get_repository(cls,path:str=os.getcwd())->Repository: if cls.repo is None: try: logger.info(f'Opening repository from {path}') cls.repo=Repository(path=path) except GitError as ex: print(f'::error::Error while trying to access the repository.') print(f'::error::Content of {path}:') print('\n::error::'.join(get_file_list(path))) raise return cls.repo @classmethod def resolve_commit(cls,repo:Repository,commit_id:str)->Commit: commit:Commit reference:Reference commit, reference = repo.resolve_refish(commit_id) return commit @classmethod def get_release_branch(cls,repo:Repository,platform_release)->str: match = [t for t in repo.branches.with_commit(platform_release.target_commitish)] no_origin = [t for t in match if 'origin' not in t] if len(no_origin) == 0 and len(match) > 0: return match[0].split('/')[1] elif len(no_origin) >0: return no_origin[0] return '' @classmethod def get_flash_parms(cls,file_path): flash = parse_json(file_path) od:collections.OrderedDict = collections.OrderedDict() for z in flash['flash_files'].items(): base_name:str = os.path.basename(z[1]) od[base_name.rstrip().lstrip()] = literal_eval( z[0]) return collections.OrderedDict(sorted(od.items())) @classmethod def get_releases(cls,flash_file_path,maxcount:int,name_prefix): repo=Releases.get_repository(os.getcwd()) flash_parms = Releases.get_flash_parms(flash_file_path) packages:Releases = cls(branch=repo.head.shorthand,maxcount=maxcount) build_dir=os.path.dirname(flash_file_path) for page in range(1,999): logger.debug(f'Getting releases page {page}') releases = get_github_data(repo,f'releases?per_page=50&page={page}') if len(releases)==0: logger.debug(f'No more release found for page {page}') break for release_entry in [AttributeDict(platform) for platform in releases]: packages.add_package(PlatformRelease(release_entry,flash_parms,build_dir,Releases.get_release_branch(repo,release_entry),name_prefix)) if packages.got_all_packages(): break if packages.got_all_packages(): break return packages def update(self, *args, **kwargs): if args: if len(args) > 1: raise TypeError("update expected at most 1 arguments, " "got %d" % len(args)) other = dict(args[0]) for key in other: self[key] = other[key] for key in kwargs: self[key] = kwargs[key] def setdefault(self, key, value=None): if key not in self: self[key] = value return self[key] def set_workdir(args): logger.info(f'setting work dir to: {args.cwd}') os.chdir(os.path.abspath(args.cwd)) def parse_json(filename:str): fname = os.path.abspath(filename) folder:str = os.path.abspath(os.path.dirname(filename)) logger.info(f'Opening json file {fname} from {folder}') try: with open(fname) as f: content=f.read() logger.debug(f'Loading json\n{content}') return json.loads(content) except JSONDecodeError as ex: print(f'::error::Error parsing {content}') except Exception as ex: print(f'::error::Unable to parse flasher args json file. Content of {folder}:') print('\n::error::'.join(get_file_list(folder))) raise def write_github_env(args): logger.info(f'Writing environment details to {args.env_file}...') with open(args.env_file, "w") as env_file: for attr in [attr for attr in dir(github_env) if not attr.startswith('_')]: line=f'{attr}={getattr(github_env,attr)}' logger.info(line) env_file.write(f'{line}\n') os.environ[attr] = str(getattr(github_env,attr)) logger.info(f'Done writing environment details to {args.env_file}!') def set_workflow_output(args): logger.info(f'Outputting job variables ...') for attr in [attr for attr in dir(github_env) if not attr.startswith('_')]: # use print instead of logger, as we need the raw output without the date/time prefix from logging print(f'::set-output name={attr}::{getattr(github_env,attr)}') os.environ[attr] = str(getattr(github_env,attr)) logger.info(f'Done outputting job variables!') def format_commit(commit): #463a9d8b7 Merge branch 'bugfix/ci_deploy_tags_v4.0' into 'release/v4.0' (2020-01-11T14:08:55+08:00) dt = datetime.fromtimestamp(float(commit.author.time), timezone( timedelta(minutes=commit.author.offset) )) timestr = dt.strftime('%c%z') cmesg= commit.message.replace('\n', ' ' ) return f'{commit.short_id} {cmesg} ({timestr}) <{commit.author.name}>'.replace(' ', ' ', ) def format_artifact_name(base_name:str='',args = AttributeDict(os.environ)): return f'{base_name}{args.branch_name}-{args.node}-{args.depth}-{args.major}{args.build}' def handle_build_flags(args): set_workdir(args) logger.info('Setting global build flags') last:Commit = Releases.get_last_commit() commit_message:str= last.message.replace('\n', ' ') github_env.mock=1 if args.mock else 0 github_env.release_flag=1 if args.mock or args.force or 'release' in commit_message.lower() else 0 github_env.ui_build=1 if args.mock or args.ui_build or '[ui-build]' in commit_message.lower() or github_env.release_flag==1 else 0 set_workflow_output(github_env) def handle_environment(args): set_workdir(args) logger.info('Setting environment variables...') last:Commit = Releases.get_last_commit() commit_message:str= last.message.replace('\n', ' ') github_env.author_name=last.author.name github_env.author_email=last.author.email github_env.committer_name=last.committer.name github_env.committer_email=last.committer.email github_env.node=args.node github_env.depth=args.depth github_env.major=args.major github_env.build=args.build github_env.DEPTH=args.depth github_env.TARGET_BUILD_NAME=args.node github_env.build_version_prefix=args.major github_env.branch_name=re.sub('[^a-zA-Z0-9\-~!@_\.]', '', Releases.get_repository().head.shorthand) github_env.BUILD_NUMBER=str(args.build) github_env.tag=f'{args.node}.{args.depth}.{args.build}.{github_env.branch_name}'.rstrip() github_env.last_commit=commit_message github_env.DOCKER_IMAGE_NAME=args.docker github_env.name=f"{args.major}.{str(args.build)}-{args.depth}#v4.3#{args.node}#{github_env.branch_name}" github_env.artifact_prefix=format_artifact_name('squeezelite-esp32-',github_env) github_env.artifact_file_name=f"{github_env.artifact_prefix}.zip" github_env.artifact_bin_file_name=f"{github_env.artifact_prefix}.bin" github_env.PROJECT_VER=f'{args.node}-{ args.build }' github_env.description='### Revision Log
<<~EOD\n'+'
\n'.join(format_commit(c) for i,c in enumerate(Releases.get_repository().walk(last.id,pygit2.GIT_SORT_TIME)) if i<10)+'\n~EOD' write_github_env(args) def handle_artifacts(args): set_workdir(args) logger.info(f'Handling artifacts') for attr in artifacts_formats: target:str=attr[1].replace(artifacts_formats_outdir,args.outdir).replace(artifacts_formats_prefix,format_artifact_name()) logger.debug(f'file {attr[0]} will be copied to {target}') try: os.makedirs(os.path.dirname(target), exist_ok=True) shutil.copyfile(attr[0].rstrip(), target, follow_symlinks=True) except Exception as ex: print(f'::error::Error while copying to {target}' ) print(f'::error::Content of {os.path.dirname(attr[0].rstrip())}:') print('\n::error::'.join(get_file_list(os.path.dirname(attr[0].rstrip())))) raise def delete_folder(path): '''Remov Read Only Files''' for root, dirs, files in os.walk(path,topdown=True): for dir in dirs: fulldirpath=os.path.join(root, dir) logger.debug(f'Drilling down in {fulldirpath}') delete_folder(fulldirpath) for fname in files: full_path = os.path.join(root, fname) logger.debug(f'Setting file read/write {full_path}') os.chmod(full_path ,stat.S_IWRITE) logger.debug(f'Deleting file {full_path}') os.remove(full_path) if os.path.exists(path): logger.debug(f'Changing folder read/write {path}') os.chmod(path ,stat.S_IWRITE) logger.warning(f'Deleting Folder {path}') os.rmdir(path) def get_file_list(path)->list: outlist:list=[] for root, dirs, files in os.walk(path,topdown=True): for dir in dirs: outlist.append(f'Content of {os.path.join(root, dir)}') get_file_list(os.path.join(root, dir)) for fname in files: full_name=os.path.join(root, fname) fstat:os.stat_result = pathlib.Path(full_name).stat() # Convert file size to MB, KB or Bytes if (fstat.st_size > 1024 * 1024): fsize = math.ceil(fstat.st_size / (1024 * 1024)) unit = "MB" elif (fstat.st_size > 1024): fsize = math.ceil(fstat.st_size / 1024) unit = "KB" else: fsize = fstat.st_size unit = "B" mtime = time.strftime("%X %x", time.gmtime(fstat.st_mtime)) outlist.append('\t{:15.80s}{:8d} {:2s} {:18s}'.format(fname,fsize,unit,mtime)) if os.path.exists(path): outlist.append(path) outlist.sort() return outlist def get_recursive_list(path)->list: outlist:list=[] for root, dirs, files in os.walk(path,topdown=True): for dir in dirs: get_file_list(os.path.join(root, dir)) for fname in files: outlist.append(fname) # if os.path.exists(path): # outlist.append(path) outlist.sort() return outlist def handle_manifest(args): set_workdir(args) logger.info(f'Creating the web installer manifest') env = AttributeDict(os.environ) if not os.path.exists(os.path.dirname(args.outdir)): logger.info(f'Creating target folder {args.outdir}') os.makedirs(args.outdir, exist_ok=True) releases:Releases = Releases.get_releases(args.flash_file, args.max_count,args.manif_name) release:PlatformRelease for release in releases.get_all(): release.get_zip_file() man = copy.deepcopy(manifest) man['manifest_name'] = release.get_manifest_name() man['builds'][0]['parts'] = release.process_files(args.outdir) man['name'] = release.platform() man['version'] = release.release_details.version logger.debug(f'Generated manifest: \n{json.dumps(man,indent=4)}') fullpath=os.path.join(args.outdir,release.get_manifest_name()) logger.info(f'Writing manifest to {fullpath}') with open(fullpath, "w") as f: json.dump(man,f,indent=4) release.cleanup() mainmanifest=os.path.join(args.outdir,args.manif_name) logger.info(f'Writing main manifest {mainmanifest}') with open(mainmanifest,'w') as f: json.dump(releases.get_attributes(),f,indent=4) def get_new_file_names(manifest:str,source:str)->collections.OrderedDict(): artifacts = parse_json(os.path.join(source,manifest)) new_release_files:dict = collections.OrderedDict() for artifact in artifacts: for name in [f["name"] for f in artifact["bin_files"]]: new_release_files[name] = artifact new_release_files[artifact['manifest_name']] = artifact['name'] return new_release_files def copy_no_overwrite(source:str,target:str) : sfiles = os.listdir(source) for f in sfiles: source_file = os.path.join(source,f) target_file = os.path.join(target,f) if not os.path.exists(target_file): logger.info(f'Copying {f} to target') shutil.copy(source_file, target_file) else: logger.debug(f'Skipping existing file {f}') def get_changed_items(repo:Repository)->Dict: changed_filemode_status_code: int = pygit2.GIT_FILEMODE_TREE original_status_dict: Dict[str, int] = repo.status() # transfer any non-filemode changes to a new dictionary status_dict: Dict[str, int] = {} for filename, code in original_status_dict.items(): if code != changed_filemode_status_code: status_dict[filename] = code return status_dict def is_dirty(repo:Repository)->bool: return len(get_changed_items(repo)) > 0 def push_if_change(repo:Repository, token:str): if is_dirty(repo): logger.info(f'Changes found. Preparing commit') env = AttributeDict(os.environ) index:Index = repo.index index.add_all() index.write() reference=repo.head.name author = Signature(env.author_name,env.author_email) committer = Signature(env.committer_name, env.committer_email) message = f'Web installer for {format_artifact_name()}' tree = index.write_tree() commit = repo.create_commit(reference, author, committer, message, tree,[repo.head.target]) origin:Remote=repo.remotes['origin'] logger.info(f'Pushing commit {format_commit(repo[commit])} to url {origin.url}') credentials = UserPass(token, 'x-oauth-basic') # passing credentials remote:Remote = repo.remotes['origin'] remote.credentials = credentials remote.push([reference],callbacks= RemoteCallbacks(UserPass(token, 'x-oauth-basic'))) else: logger.warning(f'No change found. Skipping update') def update_files(target_artifacts:str,manif_name:str,source:str): new_list:dict = get_new_file_names(manif_name, os.path.abspath(source)) if os.path.exists(target_artifacts): logger.info(f'Removing obsolete files from {target_artifacts}') for f in get_recursive_list(target_artifacts): if f not in new_list.keys(): full_target = os.path.join(target_artifacts,f) logger.warning(f'Removing obsolete file {f}') os.remove(full_target) else: logger.info(f'Creating target folder {target_artifacts}') os.makedirs(target_artifacts, exist_ok=True) logger.info(f'Copying installer files to {target_artifacts}:') copy_no_overwrite(os.path.abspath(source), target_artifacts) def handle_pushinstaller(args): set_workdir(args) logger.info('Pushing web installer updates... ') target_artifacts = os.path.join(args.target,args.artifacts) if os.path.exists(args.target): logger.info(f'Removing files (if any) from {args.target}') delete_folder(args.target) logger.info(f'Cloning from {args.url} into {args.target}') repo = pygit2.clone_repository(args.url,args.target) repo.checkout_head() update_files(target_artifacts,args.manif_name,args.source) push_if_change(repo,args.token) repo.state_cleanup() def handle_show(args): logger.info('Show') def extract_files_from_archive(url): tempfolder= tempfile.mkdtemp() platform = requests.get(url) z = zipfile.ZipFile(io.BytesIO(platform.content)) z.extractall(tempfolder) return tempfolder parser_environment.set_defaults(func=handle_environment, cmd='environment') parser_artifacts.set_defaults(func=handle_artifacts, cmd='artifacts') parser_manifest.set_defaults(func=handle_manifest, cmd='manifest') parser_pushinstaller.set_defaults(func=handle_pushinstaller, cmd='installer') parser_show.set_defaults(func=handle_show, cmd='show') parser_build_flags.set_defaults(func=handle_build_flags, cmd='build_flags') def main(): args = parser.parse_args() logger.setLevel(logging.getLevelName(args.loglevel)) logger.info(f'build_tools version : {tool_version}') logger.debug(f'Processing command {args.command}') func:Callable = getattr(args, 'func', None) if func is not None: # Call whatever subcommand function was selected func(args) else: # No subcommand was provided, so call help parser.print_usage() if __name__ == '__main__': main()