|
- #!/usr/bin/env python
- from json import JSONDecodeError
- import math
- import pathlib
- import time
- import traceback
- from typing import Callable, Dict
- import pkg_resources
- import sys
- import os
- import io
- from os import walk
- from requests import Response
-
- class Logger:
- NEWLINE_CHAR = '\n'
- with_crlf = False
- @classmethod
- def print_message(cls,message,prefix=''):
- if not Logger.with_crlf:
- trimmed=re.sub(r'\n', r'%0A', message,flags=re.MULTILINE)
- print(f'{prefix}{trimmed}')
- @classmethod
- def debug(cls,message):
- cls.print_message(message,'::debug::')
- @classmethod
- def error(cls,message):
- cls.print_message(message,'::error::')
- @classmethod
- def notice(cls,message):
- cls.print_message(message,'::notice::')
- @classmethod
- def warning(cls,message):
- cls.print_message(message,'::notice::')
- try:
- import argparse
- import collections
- import copy
- import enum
- import glob
- import json
- import re
- import shutil
- import stat
- import tempfile
- import zipfile
- from ast import literal_eval
- from collections import namedtuple
- from datetime import datetime, timedelta, timezone
- from json import JSONDecoder
- from operator import contains
- from platform import platform, release
- from pydoc import describe
- from time import strftime
- from typing import OrderedDict
- from urllib import response
- from urllib.parse import urlparse
- from urllib.request import Request
- from webbrowser import get
- import pygit2
- from pygit2 import Commit, Repository, GitError, Reference, UserPass, Index, Signature, RemoteCallbacks, Remote
- import requests
- from genericpath import isdir
- except ImportError as ex:
- Logger.error(
- f'Failed importing module {ex.name}, using interpreter {sys.executable}. {Logger.NEWLINE_CHAR} Installed packages:')
- installed_packages = pkg_resources.working_set
- installed_packages_list = sorted(
- ["%s==%s" % (i.key, i.version) for i in installed_packages])
- print(Logger.NEWLINE_CHAR.join(installed_packages_list))
- print(f'Environment: ')
- envlist = "\n".join([f"{k}={v}" for k, v in sorted(os.environ.items())])
- print(f'{envlist}')
- raise
- tool_version = "1.0.7"
- WEB_INSTALLER_DEFAULT_PATH = './web_installer/'
- FORMAT = '%(asctime)s %(message)s'
- github_env = type('', (), {})()
- manifest = {
- "name": "",
- "version": "",
- "home_assistant_domain": "slim_player",
- "funding_url": "https://esphome.io/guides/supporters.html",
- "new_install_prompt_erase": True,
- "new_install_improv_wait_time" : 20,
- "builds": [
- {
- "chipFamily": "ESP32",
- "parts": [
- ]
- }
- ]
- }
- artifacts_formats_outdir = '$OUTDIR'
- artifacts_formats_prefix = '$PREFIX'
- artifacts_formats = [
- ['build/squeezelite.bin', '$OUTDIR/$PREFIX-squeezelite.bin'],
- ['build/recovery.bin', '$OUTDIR/$PREFIX-recovery.bin'],
- ['build/ota_data_initial.bin', '$OUTDIR/$PREFIX-ota_data_initial.bin'],
- ['build/bootloader/bootloader.bin', '$OUTDIR/$PREFIX-bootloader.bin'],
- ['build/partition_table/partition-table.bin ',
- '$OUTDIR/$PREFIX-partition-table.bin'],
- ]
- class AttributeDict(dict):
- __slots__ = ()
- def __getattr__(self, name: str):
- try:
- return self[name.upper()]
- except Exception:
- try:
- return self[name.lower()]
- except Exception:
- for attr in self.keys():
- if name.lower() == attr.replace("'", "").lower():
- return self[attr]
- __setattr__ = dict.__setitem__
- parser = argparse.ArgumentParser(
- description='Handles some parts of the squeezelite-esp32 build process')
- parser.add_argument('--cwd', type=str,
- help='Working directory', default=os.getcwd())
- parser.add_argument('--with_crlf', action='store_true',help='To prevent replacing cr/lf with hex representation')
- parser.add_argument('--loglevel', type=str, choices={
- 'CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG', 'NOTSET'}, help='Logging level', default='INFO')
- subparsers = parser.add_subparsers(dest='command', required=True)
- parser_commits = subparsers.add_parser("list_commits",add_help=False,
- description="Commits list",
- help="Lists the last commits"
- )
- parser_dir = subparsers.add_parser("list_files",
- add_help=False,
- description="List Files parser",
- help="Display the content of the folder")
- parser_manifest = subparsers.add_parser("manifest",
- add_help=False,
- description="Manifest parser",
- help="Handles the web installer manifest creation")
- parser_manifest.add_argument('--flash_file', required=True, type=str,
- help='The file path which contains the firmware flashing definition')
- parser_manifest.add_argument(
- '--max_count', type=int, help='The maximum number of releases to keep', default=3)
- parser_manifest.add_argument(
- '--manif_name', required=True, type=str, help='Manifest files name and prefix')
- parser_manifest.add_argument(
- '--outdir', required=True, type=str, help='Output directory for files and manifests')
- parser_pushinstaller = subparsers.add_parser("pushinstaller",
- add_help=False,
- description="Web Installer Checkout parser",
- help="Handles the creation of artifacts files")
- parser_pushinstaller.add_argument(
- '--target', type=str, help='Output directory for web installer repository', default=WEB_INSTALLER_DEFAULT_PATH)
- parser_pushinstaller.add_argument(
- '--artifacts', type=str, help='Target subdirectory for web installer artifacts', default=WEB_INSTALLER_DEFAULT_PATH)
- parser_pushinstaller.add_argument(
- '--source', type=str, help='Source directory for the installer artifacts', default=WEB_INSTALLER_DEFAULT_PATH)
- parser_pushinstaller.add_argument('--url', type=str, help='Web Installer clone url ',
- default='https://github.com/sle118/squeezelite-esp32-installer.git')
- parser_pushinstaller.add_argument(
- '--web_installer_branch', type=str, help='Web Installer branch to use ', default='main')
- parser_pushinstaller.add_argument(
- '--token', type=str, help='Auth token for pushing changes')
- parser_pushinstaller.add_argument(
- '--flash_file', type=str, help='Manifest json file path')
- parser_pushinstaller.add_argument(
- '--manif_name', required=True, type=str, help='Manifest files name and prefix')
- parser_environment = subparsers.add_parser("environment",
- add_help=False,
- description="Environment parser",
- help="Updates the build environment")
- parser_environment.add_argument(
- '--env_file', type=str, help='Environment File', default=os.environ.get('GITHUB_ENV'))
- parser_environment.add_argument(
- '--build', required=True, type=int, help='The build number')
- parser_environment.add_argument(
- '--node', required=True, type=str, help='The matrix node being built')
- parser_environment.add_argument(
- '--depth', required=True, type=int, help='The bit depth being built')
- parser_environment.add_argument(
- '--major', type=str, help='Major version', default='2')
- parser_environment.add_argument(
- '--docker', type=str, help='Docker image to use', default='sle118/squeezelite-esp32-idfv43')
- parser_show = subparsers.add_parser("show",
- add_help=False,
- description="Show parser",
- help="Show the build environment")
- parser_build_flags = subparsers.add_parser("build_flags",
- add_help=False,
- description="Build Flags",
- help="Updates the build environment with build flags")
- parser_build_flags.add_argument(
- '--mock', action='store_true', help='Mock release')
- parser_build_flags.add_argument(
- '--force', action='store_true', help='Force a release build')
- parser_build_flags.add_argument(
- '--ui_build', action='store_true', help='Include building the web UI')
- def format_commit(commit):
- # 463a9d8b7 Merge branch 'bugfix/ci_deploy_tags_v4.0' into 'release/v4.0' (2020-01-11T14:08:55+08:00)
- dt = datetime.fromtimestamp(float(commit.author.time), timezone(
- timedelta(minutes=commit.author.offset)))
- #timestr = dt.strftime('%c%z')
- timestr = dt.strftime('%F %R %Z')
- cmesg:str = commit.message.replace('\n', ' ').replace('\r','').replace('*','-')
- return f'{commit.short_id} {cmesg} ({timestr}) <{commit.author.name}>'.replace(' ', ' ', )
- def get_github_data(repo: Repository, api):
- base_url = urlparse(repo.remotes['origin'].url)
- print(
- f'Base URL is {base_url.path} from remote URL {repo.remotes["origin"].url}')
- url_parts = base_url.path.split('.')
- for p in url_parts:
- print(f'URL Part: {p}')
- api_url = f"{url_parts[0]}/{api}"
- print(f'API to call: {api_url}')
- url = f"https://api.github.com/repos{api_url}"
- resp = requests.get(
- url, headers={"Content-Type": "application/vnd.github.v3+json"})
- return json.loads(resp.text)
- def dump_directory(dir_path):
- # list to store files name
- res = []
- for (dir_path, dir_names, file_names) in walk(dir_path):
- res.extend(file_names)
- print(res)
- class ReleaseDetails():
- version: str
- idf: str
- platform: str
- branch: str
- bitrate: str
- def __init__(self, tag: str) -> None:
- self.version, self.idf, self.platform, self.branch = tag.split('#')
- try:
- self.version, self.bitrate = self.version.split('-')
- except Exception:
- pass
- def get_attributes(self):
- return {
- 'version': self.version,
- 'idf': self.idf,
- 'platform': self.platform,
- 'branch': self.branch,
- 'bitrate': self.bitrate
- }
- def format_prefix(self) -> str:
- return f'{self.branch}-{self.platform}-{self.version}'
- def get_full_platform(self):
- return f"{self.platform}{f'-{self.bitrate}' if self.bitrate is not None else ''}"
- class BinFile():
- name: str
- offset: int
- source_full_path: str
- target_name: str
- target_fullpath: str
- artifact_relpath: str
- def __init__(self, source_path, file_build_path: str, offset: int, release_details: ReleaseDetails, build_dir) -> None:
- self.name = os.path.basename(file_build_path).rstrip()
- self.artifact_relpath = os.path.relpath(
- file_build_path, build_dir).rstrip()
- self.source_path = source_path
- self.source_full_path = os.path.join(
- source_path, file_build_path).rstrip()
- self.offset = offset
- self.target_name = f'{release_details.format_prefix()}-{release_details.bitrate}-{self.name}'.rstrip()
- def get_manifest(self):
- return {"path": self.target_name, "offset": self.offset}
- def copy(self, target_folder) -> str:
- self.target_fullpath = os.path.join(target_folder, self.target_name)
- Logger.debug(
- f'File {self.source_full_path} will be copied to {self.target_fullpath}')
- try:
- os.makedirs(target_folder, exist_ok=True)
- shutil.copyfile(self.source_full_path,
- self.target_fullpath, follow_symlinks=True)
- except Exception as ex:
- Logger.error(f"Error while copying {self.source_full_path} to {self.target_fullpath}{Logger.NEWLINE_CHAR}Content of {os.path.dirname(self.source_full_path.rstrip())}:{Logger.NEWLINE_CHAR}{Logger.NEWLINE_CHAR.join(get_file_list(os.path.dirname(self.source_full_path.rstrip())))}")
-
-
-
- raise
- return self.target_fullpath
- def get_attributes(self):
- return {
- 'name': self.target_name,
- 'offset': self.offset,
- 'artifact_relpath': self.artifact_relpath
- }
- class PlatformRelease():
- name: str
- description: str
- url: str = ''
- zipfile: str = ''
- tempfolder: str
- release_details: ReleaseDetails
- flash_parms = {}
- build_dir: str
- has_artifacts: bool
- branch: str
- assets: list
- bin_files: list
- name_prefix: str
- flash_file_path: str
- def get_manifest_name(self) -> str:
- return f'{self.name_prefix}-{self.release_details.format_prefix()}-{self.release_details.bitrate}.json'
- def __init__(self, flash_file_path, git_release, build_dir, branch, name_prefix) -> None:
- self.name = git_release.tag_name
- self.description = git_release.body
- self.assets = git_release['assets']
- self.has_artifacts = False
- self.name_prefix = name_prefix
- if len(self.assets) > 0:
- if self.has_asset_type():
- self.url = self.get_asset_from_extension().browser_download_url
- if self.has_asset_type('.zip'):
- self.zipfile = self.get_asset_from_extension(
- ext='.zip').browser_download_url
- self.has_artifacts = True
- self.release_details = ReleaseDetails(git_release.name)
- self.bin_files = list()
- self.flash_file_path = flash_file_path
- self.build_dir = os.path.relpath(build_dir)
- self.branch = branch
- def process_files(self, outdir: str) -> list:
- parts = []
- for f in self.bin_files:
- f.copy(outdir)
- parts.append(f.get_manifest())
- return parts
- def get_asset_from_extension(self, ext='.bin'):
- for a in self.assets:
- filename = AttributeDict(a).name
- file_name, file_extension = os.path.splitext(filename)
- if file_extension == ext:
- return AttributeDict(a)
- return None
- def has_asset_type(self, ext='.bin') -> bool:
- return self.get_asset_from_extension(ext) is not None
- def platform(self):
- return self.release_details.get_full_platform()
- def get_zip_file(self):
- self.tempfolder = extract_files_from_archive(self.zipfile)
- print(
- f'Artifacts for {self.name} extracted to {self.tempfolder}')
- flash_parms_file = os.path.relpath(
- self.tempfolder+self.flash_file_path)
- line: str
- with open(flash_parms_file) as fin:
- for line in fin:
- components = line.split()
- if len(components) == 2:
- self.flash_parms[os.path.basename(
- components[1]).rstrip().lstrip()] = components[0]
- try:
- for artifact in artifacts_formats:
- base_name = os.path.basename(artifact[0]).rstrip().lstrip()
- self.bin_files.append(BinFile(
- self.tempfolder, artifact[0], self.flash_parms[base_name], self.release_details, self.build_dir))
- has_artifacts = True
- except Exception:
- self.has_artifacts = False
- def cleanup(self):
- Logger.debug(f'removing temp directory for platform release {self.name}')
- shutil.rmtree(self.tempfolder)
- def get_attributes(self):
- return {
- 'name': self.name,
- 'branch': self.branch,
- 'description': self.description,
- 'url': self.url,
- 'zipfile': self.zipfile,
- 'release_details': self.release_details.get_attributes(),
- 'bin_files': [b.get_attributes() for b in self.bin_files],
- 'manifest_name': self.get_manifest_name()
- }
- class Releases():
- _dict: dict = collections.OrderedDict()
- maxcount: int = 0
- branch: str = ''
- repo: Repository = None
- last_commit: Commit = None
- manifest_name: str
- def __init__(self, branch: str, maxcount: int = 3) -> None:
- self.maxcount = maxcount
- self.branch = branch
- def count(self, value: PlatformRelease) -> int:
- content = self._dict.get(value.platform())
- if content == None:
- return 0
- return len(content)
- def get_platform(self, platform: str) -> list:
- return self._dict[platform]
- def get_platform_keys(self):
- return self._dict.keys()
- def get_all(self) -> list:
- result: list = []
- for platform in [self.get_platform(platform) for platform in self.get_platform_keys()]:
- for release in platform:
- result.append(release)
- return result
- def append(self, value: PlatformRelease):
- if self.count(value) == 0:
- self._dict[value.platform()] = []
- if self.should_add(value):
- print(f'Adding release {value.name} to the list')
- self._dict[value.platform()].append(value)
- else:
- print(f'Skipping release {value.name}')
- def get_attributes(self):
- res = []
- release: PlatformRelease
- for release in self.get_all():
- res.append(release.get_attributes())
- return res
- def get_minlen(self) -> int:
- return min([len(self.get_platform(p)) for p in self.get_platform_keys()])
- def got_all_packages(self) -> bool:
- return self.get_minlen() >= self.maxcount
- def should_add(self, release: PlatformRelease) -> bool:
- return self.count(release) <= self.maxcount
- def add_package(self, package: PlatformRelease, with_artifacts: bool = True):
- if self.branch != package.branch:
- Logger.debug(f'Skipping release {package.name} from branch {package.branch}')
- elif package.has_artifacts or not with_artifacts:
- self.append(package)
- @classmethod
- def get_last_commit_message(cls, repo_obj: Repository = None) -> str:
- last: Commit = cls.get_last_commit(repo_obj)
- if last is None:
- return ''
- else:
- return last.message.replace(Logger.NEWLINE_CHAR, ' ')
- @classmethod
- def get_last_author(cls, repo_obj: Repository = None) -> Signature:
- last: Commit = cls.get_last_commit(repo_obj)
- return last.author
- @classmethod
- def get_last_committer(cls, repo_obj: Repository = None) -> Signature:
- last: Commit = cls.get_last_commit(repo_obj)
- return last.committer
- @classmethod
- def get_last_commit(cls, repo_obj: Repository = None) -> Commit:
- loc_repo = repo_obj
- if cls.repo is None:
- cls.load_repository(os.getcwd())
- if loc_repo is None:
- loc_repo = cls.repo
- head: Reference = loc_repo.head
- target = head.target
- ref: Reference
- if cls.last_commit is None:
- try:
- cls.last_commit = loc_repo[target]
- print(
- f'Last commit for {head.shorthand} is {format_commit(cls.last_commit)}')
- except Exception as e:
- Logger.error(
- f'Unable to retrieve last commit for {head.shorthand}/{target}: {e}')
- cls.last_commit = None
- return cls.last_commit
- @classmethod
- def load_repository(cls, path: str = os.getcwd()) -> Repository:
- if cls.repo is None:
- try:
- print(f'Opening repository from {path}')
- cls.repo = Repository(path=path)
- except GitError as ex:
- Logger.error(f"Unable to access the repository({ex}).\nContent of {path}:\n{Logger.NEWLINE_CHAR.join(get_file_list(path, 1))}")
- raise
- return cls.repo
- @classmethod
- def resolve_commit(cls, repo: Repository, commit_id: str) -> Commit:
- commit: Commit
- reference: Reference
- commit, reference = repo.resolve_refish(commit_id)
- return commit
- @classmethod
- def get_branch_name(cls) -> str:
- return re.sub('[^a-zA-Z0-9\-~!@_\.]', '', cls.load_repository().head.shorthand)
- @classmethod
- def get_release_branch(cls, repo: Repository, platform_release) -> str:
- match = [t for t in repo.branches.with_commit(
- platform_release.target_commitish)]
- no_origin = [t for t in match if 'origin' not in t]
- if len(no_origin) == 0 and len(match) > 0:
- return match[0].split('/')[1]
- elif len(no_origin) > 0:
- return no_origin[0]
- return ''
- @classmethod
- def get_flash_parms(cls, file_path):
- flash = parse_json(file_path)
- od: collections.OrderedDict = collections.OrderedDict()
- for z in flash['flash_files'].items():
- base_name: str = os.path.basename(z[1])
- od[base_name.rstrip().lstrip()] = literal_eval(z[0])
- return collections.OrderedDict(sorted(od.items()))
- @classmethod
- def get_releases(cls, flash_file_path, maxcount: int, name_prefix):
- repo = Releases.load_repository(os.getcwd())
- packages: Releases = cls(branch=repo.head.shorthand, maxcount=maxcount)
- build_dir = os.path.dirname(flash_file_path)
- for page in range(1, 999):
- Logger.debug(f'Getting releases page {page}')
- releases = get_github_data(
- repo, f'releases?per_page=50&page={page}')
- if len(releases) == 0:
- Logger.debug(f'No more release found for page {page}')
- break
- for release_entry in [AttributeDict(platform) for platform in releases]:
- packages.add_package(PlatformRelease(flash_file_path, release_entry, build_dir,
- Releases.get_release_branch(repo, release_entry), name_prefix))
- if packages.got_all_packages():
- break
- if packages.got_all_packages():
- break
- return packages
- @classmethod
- def get_commit_list(cls) -> list:
- commit_list = []
- last: Commit = Releases.get_last_commit()
- if last is None:
- return commit_list
- try:
- for c in Releases.load_repository().walk(last.id, pygit2.GIT_SORT_TIME):
- if '[skip actions]' not in c.message:
- commit_list.append(format_commit(c))
- if len(commit_list) > 10:
- break
- except Exception as e:
- Logger.error(
- f'Unable to get commit list starting at {last.id}: {e}')
- return commit_list
- @classmethod
- def get_commit_list_descriptions(cls) -> str:
- return '<<~EOD\n### Revision Log\n'+Logger.NEWLINE_CHAR.join(cls.get_commit_list())+'\n~EOD'
- def update(self, *args, **kwargs):
- if args:
- if len(args) > 1:
- raise TypeError("update expected at most 1 arguments, "
- "got %d" % len(args))
- other = dict(args[0])
- for key in other:
- self[key] = other[key]
- for key in kwargs:
- self[key] = kwargs[key]
- def setdefault(self, key, value=None):
- if key not in self:
- self[key] = value
- return self[key]
- def set_workdir(args):
- print(f'setting work dir to: {args.cwd}')
- os.chdir(os.path.abspath(args.cwd))
- def parse_json(filename: str):
- fname = os.path.abspath(filename)
- folder: str = os.path.abspath(os.path.dirname(filename))
- print(f'Opening json file {fname} from {folder}')
- try:
- with open(fname) as f:
- content = f.read()
- Logger.debug(f'Loading json\n{content}')
- return json.loads(content)
- except JSONDecodeError as ex:
- Logger.error(f'Error parsing {content}')
- except Exception as ex:
- Logger.error(
- f"Unable to parse flasher args json file. Content of {folder}:{Logger.NEWLINE_CHAR.join(get_file_list(folder))}")
- raise
- def write_github_env_file(values,env_file):
- print(f'Writing content to {env_file}...')
- with open(env_file, "w") as env_file:
- for attr in [attr for attr in dir(values) if not attr.startswith('_')]:
- line = f'{attr}{"=" if attr != "description" else ""}{getattr(values,attr)}'
- print(line)
- env_file.write(f'{line}\n')
- os.environ[attr] = str(getattr(values, attr))
- print(f'Done writing to {env_file}!')
- def format_artifact_from_manifest(manif_json: AttributeDict):
- if len(manif_json) == 0:
- return 'Newest release'
- first = manif_json[0]
- return f'{first["branch"]}-{first["release_details"]["version"]}'
- def format_artifact_name(base_name: str = '', args=AttributeDict(os.environ)):
- return f'{base_name}{args.branch_name}-{args.node}-{args.depth}-{args.major}{args.build}'
- def handle_build_flags(args):
- set_workdir(args)
- print('Setting global build flags')
- commit_message: str = Releases.get_last_commit_message()
- github_env.mock = 1 if args.mock else 0
- github_env.release_flag = 1 if args.mock or args.force or 'release' in commit_message.lower() else 0
- github_env.ui_build = 1 if args.mock or args.ui_build or '[ui-build]' in commit_message.lower(
- ) or github_env.release_flag == 1 else 0
- write_github_env_file(github_env,os.environ.get('GITHUB_OUTPUT'))
- def write_version_number(file_path:str,env_details):
- # app_name="${TARGET_BUILD_NAME}.${DEPTH}.dev-$(git log --pretty=format:'%h' --max-count=1).${branch_name}"
- # echo "${app_name}">version.txt
- try:
- version:str = f'{env_details.TARGET_BUILD_NAME}.{env_details.DEPTH}.{env_details.major}.{env_details.BUILD_NUMBER}.{env_details.branch_name}'
- with open(file_path, "w") as version_file:
- version_file.write(version)
- except Exception as ex:
- Logger.error(f'Unable to set version string {version} in file {file_path}')
- raise Exception('Version error')
- Logger.notice(f'Firmware version set to {version}')
- def handle_environment(args):
- set_workdir(args)
- print('Setting environment variables...')
- commit_message: str = Releases.get_last_commit_message()
- last: Commit = Releases.get_last_commit()
- if last is not None:
- github_env.author_name = last.author.name
- github_env.author_email = last.author.email
- github_env.committer_name = last.committer.name
- github_env.committer_email = last.committer.email
- github_env.node = args.node
- github_env.depth = args.depth
- github_env.major = args.major
- github_env.build = args.build
- github_env.DEPTH = args.depth
- github_env.TARGET_BUILD_NAME = args.node
- github_env.build_version_prefix = args.major
- github_env.branch_name = Releases.get_branch_name()
- github_env.BUILD_NUMBER = str(args.build)
- github_env.tag = f'{args.node}.{args.depth}.{args.build}.{github_env.branch_name}'.rstrip()
- github_env.last_commit = commit_message
- github_env.DOCKER_IMAGE_NAME = args.docker
- github_env.name = f"{args.major}.{str(args.build)}-{args.depth}#v4.3#{args.node}#{github_env.branch_name}"
- github_env.artifact_prefix = format_artifact_name(
- 'squeezelite-esp32-', github_env)
- github_env.artifact_file_name = f"{github_env.artifact_prefix}.zip"
- github_env.artifact_bin_file_name = f"{github_env.artifact_prefix}.bin"
- github_env.PROJECT_VER = f'{args.node}-{ args.build }'
- github_env.description = Releases.get_commit_list_descriptions()
- write_github_env_file(github_env,args.env_file)
- write_version_number("version.txt",github_env)
- def handle_artifacts(args):
- set_workdir(args)
- print(f'Handling artifacts')
- for attr in artifacts_formats:
- target: str = os.path.relpath(attr[1].replace(artifacts_formats_outdir, args.outdir).replace(
- artifacts_formats_prefix, format_artifact_name()))
- source: str = os.path.relpath(attr[0])
- target_dir: str = os.path.dirname(target)
- print(f'Copying file {source} to {target}')
- try:
- os.makedirs(target_dir, exist_ok=True)
- shutil.copyfile(source, target, follow_symlinks=True)
- except Exception as ex:
- Logger.error(f"Error while copying {source} to {target}\nContent of {target_dir}:\n{Logger.NEWLINE_CHAR.join(get_file_list(os.path.dirname(attr[0].rstrip())))}")
- raise
- def delete_folder(path):
- '''Remov Read Only Files'''
- for root, dirs, files in os.walk(path, topdown=True):
- for dir in dirs:
- fulldirpath = os.path.join(root, dir)
- Logger.debug(f'Drilling down in {fulldirpath}')
- delete_folder(fulldirpath)
- for fname in files:
- full_path = os.path.join(root, fname)
- Logger.debug(f'Setting file read/write {full_path}')
- os.chmod(full_path, stat.S_IWRITE)
- Logger.debug(f'Deleting file {full_path}')
- os.remove(full_path)
- if os.path.exists(path):
- Logger.debug(f'Changing folder read/write {path}')
- os.chmod(path, stat.S_IWRITE)
- print(f'WARNING: Deleting Folder {path}')
- os.rmdir(path)
- def get_file_stats(path):
- fstat: os.stat_result = pathlib.Path(path).stat()
- # Convert file size to MB, KB or Bytes
- mtime = time.strftime("%X %x", time.gmtime(fstat.st_mtime))
- if (fstat.st_size > 1024 * 1024):
- return math.ceil(fstat.st_size / (1024 * 1024)), "MB", mtime
- elif (fstat.st_size > 1024):
- return math.ceil(fstat.st_size / 1024), "KB", mtime
- return fstat.st_size, "B", mtime
- def get_file_list(root_path, max_levels: int = 2) -> list:
- outlist: list = []
- for root, dirs, files in os.walk(root_path):
- path = os.path.relpath(root).split(os.sep)
- if len(path) <= max_levels:
- outlist.append(f'\n{root}')
- for file in files:
- full_name = os.path.join(root, file)
- fsize, unit, mtime = get_file_stats(full_name)
- outlist.append('{:s} {:8d} {:2s} {:18s}\t{:s}'.format(
- len(path) * "---", fsize, unit, mtime, file))
- return outlist
- def get_recursive_list(path) -> list:
- outlist: list = []
- for root, dirs, files in os.walk(path, topdown=True):
- for fname in files:
- outlist.append((fname, os.path.join(root, fname)))
- return outlist
- def handle_manifest(args):
- set_workdir(args)
- print(f'Creating the web installer manifest')
- outdir: str = os.path.relpath(args.outdir)
- if not os.path.exists(outdir):
- print(f'Creating target folder {outdir}')
- os.makedirs(outdir, exist_ok=True)
- releases: Releases = Releases.get_releases(
- args.flash_file, args.max_count, args.manif_name)
- release: PlatformRelease
- for release in releases.get_all():
- manifest_name = release.get_manifest_name()
- release.get_zip_file()
- man = copy.deepcopy(manifest)
- man['manifest_name'] = manifest_name
- man['builds'][0]['parts'] = release.process_files(args.outdir)
- man['name'] = release.platform()
- man['version'] = release.release_details.version
- Logger.debug(f'Generated manifest: \n{json.dumps(man)}')
- fullpath = os.path.join(args.outdir, release.get_manifest_name())
- print(f'Writing manifest to {fullpath}')
- with open(fullpath, "w") as f:
- json.dump(man, f, indent=4)
- release.cleanup()
- mainmanifest = os.path.join(args.outdir, args.manif_name)
- print(f'Writing main manifest {mainmanifest}')
- with open(mainmanifest, 'w') as f:
- json.dump(releases.get_attributes(), f, indent=4)
- def get_new_file_names(manif_json) -> collections.OrderedDict():
- new_release_files: dict = collections.OrderedDict()
- for artifact in manif_json:
- for name in [f["name"] for f in artifact["bin_files"]]:
- new_release_files[name] = artifact
- new_release_files[artifact["manifest_name"]] = artifact["name"]
- return new_release_files
- def copy_no_overwrite(source: str, target: str):
- sfiles = os.listdir(source)
- for f in sfiles:
- source_file = os.path.join(source, f)
- target_file = os.path.join(target, f)
- if not os.path.exists(target_file):
- print(f'Copying {f} to target')
- shutil.copy(source_file, target_file)
- else:
- Logger.debug(f'Skipping existing file {f}')
- def get_changed_items(repo: Repository) -> Dict:
- changed_filemode_status_code: int = pygit2.GIT_FILEMODE_TREE
- original_status_dict: Dict[str, int] = repo.status()
- # transfer any non-filemode changes to a new dictionary
- status_dict: Dict[str, int] = {}
- for filename, code in original_status_dict.items():
- if code != changed_filemode_status_code:
- status_dict[filename] = code
- return status_dict
- def is_dirty(repo: Repository) -> bool:
- return len(get_changed_items(repo)) > 0
- def push_with_method(auth_method:str,token:str,remote: Remote,reference):
- success:bool = False
- try:
- remote.push(reference, callbacks=RemoteCallbacks(pygit2.UserPass(auth_method, token)))
- success=True
- except Exception as ex:
- Logger.error(f'Error pushing with auth method {auth_method}: {ex}.')
- return success
- def push_if_change(repo: Repository, token: str, source_path: str, manif_json):
- if is_dirty(repo):
- print(f'Changes found. Preparing commit')
- env = AttributeDict(os.environ)
- index: Index = repo.index
- index.add_all()
- index.write()
- reference = repo.head.name
- message = f'Web installer for {format_artifact_from_manifest(manif_json)}'
- tree = index.write_tree()
- Releases.load_repository(source_path)
- commit = repo.create_commit(reference, Releases.get_last_author(
- ), Releases.get_last_committer(), message, tree, [repo.head.target])
- origin: Remote = repo.remotes['origin']
- print(
- f'Pushing commit {format_commit(repo[commit])} to url {origin.url}')
- remote: Remote = repo.remotes['origin']
- auth_methods = ['x-access-token','x-oauth-basic']
- for method in auth_methods:
- if push_with_method(method, token, remote, [reference]):
- print(f'::notice Web installer updated for {format_artifact_from_manifest(manif_json)}')
- return
- raise Exception('Unable to push web installer.')
- else:
- print(f'WARNING: No change found. Skipping update')
- def update_files(target_artifacts: str, manif_json, source: str):
- new_list: dict = get_new_file_names(manif_json)
- if os.path.exists(target_artifacts):
- print(f'Removing obsolete files from {target_artifacts}')
- for entry in get_recursive_list(target_artifacts):
- f = entry[0]
- full_target = entry[1]
- if f not in new_list.keys():
- print(f'WARNING: Removing obsolete file {f}')
- os.remove(full_target)
- else:
- print(f'Creating target folder {target_artifacts}')
- os.makedirs(target_artifacts, exist_ok=True)
- print(f'Copying installer files to {target_artifacts}:')
- copy_no_overwrite(os.path.abspath(source), target_artifacts)
- def handle_pushinstaller(args):
- set_workdir(args)
- print('Pushing web installer updates... ')
- target_artifacts = os.path.join(args.target, args.artifacts)
- if os.path.exists(args.target):
- print(f'Removing files (if any) from {args.target}')
- delete_folder(args.target)
- print(f'Cloning from {args.url} into {args.target}')
- repo = pygit2.clone_repository(args.url, args.target)
- repo.checkout_head()
- manif_json = parse_json(os.path.join(args.source, args.manif_name))
- update_files(target_artifacts, manif_json, args.source)
- push_if_change(repo, args.token, args.cwd, manif_json)
- repo.state_cleanup()
- def handle_show(args):
- print('Show')
- def extract_files_from_archive(url):
- tempfolder = tempfile.mkdtemp()
- platform:Response = requests.get(url)
- Logger.debug(f'Downloading {url} to {tempfolder}')
- Logger.debug(f'Transfer status code: {platform.status_code}. Expanding content')
- z = zipfile.ZipFile(io.BytesIO(platform.content))
- z.extractall(tempfolder)
- return tempfolder
- def handle_list_files(args):
- print(f'Content of {args.cwd}:')
- print(Logger.NEWLINE_CHAR.join(get_file_list(args.cwd)))
- def handle_commits(args):
- set_workdir(args)
- print(Releases.get_commit_list_descriptions())
- parser_environment.set_defaults(func=handle_environment, cmd='environment')
- parser_manifest.set_defaults(func=handle_manifest, cmd='manifest')
- parser_pushinstaller.set_defaults(func=handle_pushinstaller, cmd='installer')
- parser_show.set_defaults(func=handle_show, cmd='show')
- parser_build_flags.set_defaults(func=handle_build_flags, cmd='build_flags')
- parser_dir.set_defaults(func=handle_list_files, cmd='list_files')
- parser_commits.set_defaults(func=handle_commits,cmd='list_commits')
- def main():
- exit_result_code = 0
- args = parser.parse_args()
- Logger.with_crlf = args.with_crlf
- print(f'::group::{args.command}')
- print(f'build_tools version : {tool_version}')
- print(f'Processing command {args.command}')
- func: Callable = getattr(args, 'func', None)
- if func is not None:
- # Call whatever subcommand function was selected
-
- e: Exception
- try:
- func(args)
- except Exception as e:
- Logger.error(f'Critical error while running {args.command}\n{" ".join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))}')
- exit_result_code = 1
- else:
- # No subcommand was provided, so call help
- parser.print_usage()
- print(f'::endgroup::')
- sys.exit(exit_result_code)
- if __name__ == '__main__':
- main()
|