123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- #!/opt/esp/python_env/idf4.4_py3.8_env/bin/python
- import json
- import os
- import sys
- import argparse
- import importlib.util
- import logging
- from pathlib import Path
- from google.protobuf import json_format
- from google.protobuf.json_format import MessageToJson
- # Assuming this script is in the same directory as the generated Python files
- # script_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)),'generated/src')
- # sys.path.append(script_dir)
# Root logger setup: INFO threshold, records rendered as "<time> - <level> - <message>"
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
def load_protobuf_module(source, includes):
    """Dynamically load the generated ``<stem>_pb2.py`` module for a proto file.

    Args:
        source: Path of the protobuf file. The module ``<stem>_pb2.py`` is
            looked up in the same directory, where ``<stem>`` is
            ``Path(source).stem``.
        includes: Iterable of directories appended to ``sys.path`` before the
            module is executed. ``None`` is treated as "no extra directories".

    Returns:
        The loaded module, or ``None`` (after logging an error) when the file
        does not exist or no import spec/loader can be built for it.

    Raises:
        Any exception raised while executing the module body propagates.
    """
    # ``includes`` is None when the caller received no --include option.
    include_dirs = list(includes or [])
    for include in include_dirs:
        # Avoid piling up duplicate entries when called more than once.
        if include not in sys.path:
            sys.path.append(include)

    module_name = Path(source).stem
    module_file = module_name + '_pb2.py'
    module_location = Path(source).parent / module_file
    logging.debug(
        f'Loading module {module_file} from {module_location} '
        f'with includes [{", ".join(map(str, include_dirs))}]'
    )

    # spec_from_file_location() does not verify that the file exists, so check
    # explicitly; otherwise a missing file only fails later inside exec_module().
    spec = None
    if module_location.is_file():
        spec = importlib.util.spec_from_file_location(name=module_name, location=str(module_location))

    if spec is None or spec.loader is None:
        logging.error(f'Failed to load module {module_file} from {module_location}')
        return None

    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    logging.debug(f'Loaded protobuf module: {module_name}')
    return module
def protobuf_to_dict(message):
    """Return *message* as a plain dictionary by round-tripping it through JSON.

    The message is rendered to JSON text with ``MessageToJson`` (called with
    ``including_default_value_fields=True``) and that text is decoded with
    ``json.loads``.
    """
    return json.loads(MessageToJson(message, including_default_value_fields=True))
def main(argv=None):
    """Convert JSON files into serialized protobuf binaries.

    Parses the command line, loads the generated protobuf module for
    ``--proto_file``, looks up ``--main_class`` in it, then for every ``--json``
    file parses the JSON into a fresh message and writes the serialized bytes
    to ``<target_dir>/<json stem>.bin`` (file name lower-cased). With
    ``--dumpconsole`` the bytes are also printed as a C string literal.

    Args:
        argv: Optional list of command-line arguments. ``None`` (the default)
            lets argparse read ``sys.argv``.

    Exits with status 1, after logging the cause, when the module or class
    cannot be loaded or a file cannot be read, converted or written. argparse
    itself exits with status 2 when a required option is missing.
    """
    parser = argparse.ArgumentParser(description='Process protobuf and JSON files.')
    parser.add_argument('--proto_file', required=True,
                        help='Name of the protobuf file (without extension)')
    parser.add_argument('--main_class', required=True,
                        help='Main message class to process')
    parser.add_argument('--target_dir', required=True,
                        help='Target directory for output files')
    parser.add_argument('--include', default=None, action='append',
                        help='Directory where message python files can be found')
    parser.add_argument('--json', required=True, action='append',
                        help='Source JSON file(s)')
    parser.add_argument('--dumpconsole', action='store_true', help='Dump to console')
    args = parser.parse_args(argv)

    # Load the protobuf module
    logging.debug('Loading modules')
    # --include is optional; hand the loader an empty list rather than None.
    proto_module = load_protobuf_module(args.proto_file, args.include or [])
    if proto_module is None:
        # The loader has already logged why the module could not be loaded.
        sys.exit(1)

    # Determine the main message class
    try:
        main_message_class = getattr(proto_module, args.main_class)
    except AttributeError as e:
        available = [entry for entry in dir(proto_module) if not entry.startswith('_')]
        logging.error(f'Error getting main class: {e}. Available classes: {", ".join(available)}')
        sys.exit(1)  # Exit with error status

    for jsonfile in args.json:
        json_stem = Path(jsonfile).stem
        # Lower-case only the file name: lower-casing the joined path would
        # corrupt a mixed-case target directory on case-sensitive filesystems.
        output_file = os.path.join(args.target_dir, f'{json_stem}.bin'.lower())
        logging.debug(f'Converting JSON file {jsonfile} to binary format')

        try:
            with open(jsonfile, 'r', encoding='utf-8') as json_file:
                json_data = json.load(json_file)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logging.error(f'Error reading JSON file {jsonfile}: {e}')
            sys.exit(1)  # Exit with error status

        try:
            # Start from an empty message for every file: ParseDict merges into
            # its target, so a shared instance would carry fields from earlier
            # JSON files into later outputs.
            message = main_message_class()
            json_format.ParseDict(json_data, message)
            binary_data = message.SerializeToString()
        except json_format.ParseError as e:
            logging.error(f'Parse error in JSON file {jsonfile}: {e}')
            sys.exit(1)  # Exit with error status
        except Exception as e:
            # Top-level CLI boundary: report anything else and fail the run.
            logging.error(f'Error converting JSON file {jsonfile}: {e}')
            sys.exit(1)  # Exit with error status

        try:
            with open(output_file, 'wb') as bin_file:
                bin_file.write(binary_data)
        except OSError as e:
            logging.error(f'Error writing binary file {output_file}: {e}')
            sys.exit(1)  # Exit with error status
        logging.info(f'Binary file written to {output_file}')

        if args.dumpconsole:
            escaped_string = ''.join('\\x{:02x}'.format(byte) for byte in binary_data)
            print(f'escaped string representation: \nconst char * bin_{json_stem} = "{escaped_string}";\n')
# Run the converter only when this file is executed as a script.
if __name__ == "__main__":
    main()
|