#!/opt/esp/python_env/idf4.4_py3.8_env/bin/python import io import os import logging import json from pathlib import Path import sys from typing import Dict, List from google.protobuf.compiler import plugin_pb2 as plugin from google.protobuf.descriptor_pb2 import FileDescriptorProto, DescriptorProto, FieldDescriptorProto,FieldOptions from google.protobuf.descriptor import FieldDescriptor, Descriptor, FileDescriptor from ProtoElement import ProtoElement from ProtocParser import ProtocParser file_suffix = "_options_pb2" head_comments =""" /* Automatically generated nanopb header */ /* Generated by protoc-plugin-options */ """ PROCESSED_PREFIX = 'processed_' logger = logging.getLogger(__name__) logging.basicConfig( level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) class OptionsParser(ProtocParser) : def start_message(self,message:ProtoElement) : super().start_message(message) def reset_data(self): self.all_members_defaults:Dict([str,str]) = {} self.lines = {} self.lines['init_from_mac'] = [] # self.lines['read_only'] = {} # self.lines['default_value'] = {} # self.lines['init_from_mac']['source'] = ['void set_init_from_mac(self.message_type_name * data){'] # self.lines['init_from_mac']['header'] = [] # self.lines['read_only']['source'] = [] # self.lines['read_only']['header'] = [] # self.lines['default_value']['source'] = [] # self.lines['default_value']['header'] = [] def writelines(self,lines, out:io.StringIO): for l in lines: out.write(f'{l}\r\n') # def render_all_members_for_message(self,name,roottype:bool=False): # elements= self.lines['init_from_mac'] # for element in [element for element in elements if element.descriptor.containing_type.name == name]: # suffix = "ROOT" if roottype else "CHILD(msg, member)" # const_prefix = self.get_option(element,'const_prefix') or "" # memberprefix = '' if roottype else 'member.' # msg = element.name if roottype else "msg" # self.c_header.writelines(f'MSG_INIT_FROM_MAC_DECLARATION({element.cpp_type})') # self.c_header.writelines(f'#define {element.cpp_type}_ALL_MEMBERS_{suffix}') # if(elements.index(element)>0): # self.c_header.writelines(f',\\\n') # else: # self.c_source.writelines(f'MSG_INIT_FROM_MAC_IMPLEMENTATION({msg_ctype});') def render_all_members(self): for key in [key for key in self.all_members_defaults.keys() if not '.' in key and not key.startswith(PROCESSED_PREFIX)]: # make each line unique self.all_members_defaults[key] = set(self.all_members_defaults[key]) self.all_members_defaults[key] = '\\\n'.join(self.all_members_defaults[key]) # WRITE DEPENDENCIES FOR THE CURRENT FILE try: member_defines = '\n'.join([ self.all_members_defaults.get(key) for key in self.all_members_defaults.keys() if '.' in key]) self.c_header.writelines(member_defines) except Exception as e: logger.error(f'{e}') sys.exit(1) # Exit with error status message_defines = ',\\\n'.join([key for key in self.all_members_defaults.keys() if not '.' in key]) self.c_header.writelines(message_defines) def end_message(self,message:ProtoElement): super().end_message(message) self.message_type_name = message.path.replace('.','_') self.global_name = message.options.get("global_name",message.path) message.render() self.render_all_members() # self.c_source.writelines() def start_file(self,file:FileDescriptor) : super().start_file(file) self.set_source(Path(file.name).stem) self.reset_data() def end_file(self,file:ProtoElement) : super().end_file(file) # Parse request # self.lines['init_from_mac']['source'] = ['}'] # self.c_header.writelines(self.lines['init_from_mac']['source']) # self.c_source.writelines(self.lines['init_from_mac']['header']) self.set_source(None) def get_name(self)->str: return 'protoc_plugin_options' def add_comment_if_exists(element, comment_type: str, path: str) -> dict: comment = getattr(element, f"{comment_type}_comment", "").strip() return {f"__{comment_type}_{path}": comment} if comment else {} def repeated_render(self,element:ProtoElement,obj:any): return [obj] if element.repeated else obj def get_option(self,element:ProtoElement,optname:str): options = element.options.get('cust_field',dict()) msg_options = element.options.get('cust_msg',dict()) return getattr(options,optname,None) or getattr(msg_options,optname,None) def get_nanoppb_option(self,element:ProtoElement,optname:str): options = element.options.get('nanopb_msgopt',dict()) msg_options = element.options.get('nanopb',dict()) return getattr(options,optname,None) or getattr(msg_options,optname,None) def get_mkey(self,key)->list([str]): if not self.all_members_defaults.get(key): self.all_members_defaults[key] = list() return self.all_members_defaults[key] def append_line(self,key,line): self.get_mkey(key).append(line) def get_member_assignment(self, element: ProtoElement, value) -> str: member = "default_value." cpptype = element.descriptor.cpp_type if cpptype == FieldDescriptor.CPPTYPE_ENUM: member += f'v_enum = {element.cpp_type_member_prefix}_{value}' elif cpptype == FieldDescriptor.CPPTYPE_INT32: member += f'v_int32 = {value}' elif cpptype == FieldDescriptor.CPPTYPE_INT64: member += f'v_int64 = {value}' elif cpptype == FieldDescriptor.CPPTYPE_UINT32: member += f'v_uint32 = {value}' elif cpptype == FieldDescriptor.CPPTYPE_UINT64: member += f'v_uint64 = {value}' elif cpptype == FieldDescriptor.CPPTYPE_DOUBLE: member += f'v_double = {value}' elif cpptype == FieldDescriptor.CPPTYPE_FLOAT: member += f'v_float = {value}' elif cpptype == FieldDescriptor.CPPTYPE_BOOL: member += f'v_bool = {value}' elif cpptype == FieldDescriptor.CPPTYPE_STRING: member += f'v_string = {value}' elif cpptype == FieldDescriptor.CPPTYPE_MESSAGE: # Assuming value is a serialized string or similar member += f'v_bytes = {value}' else: raise ValueError(f"Unsupported C++ type: {cpptype}") return member def render(self,element: ProtoElement) -> Dict: result = {} if len(element.childs)>0: # oneof = getattr(element.descriptor,'containing_oneof',None) # if oneof: # result[f'__one_of_{element.name}'] = f'Choose only one structure for {oneof.full_name}' for child in element.childs: child.render() else: options = element.options.get('cust_field',dict()) nanopb_msgopt = element.options.get('',dict()) nanopb = element.options.get('nanopb',dict()) msg_options = element.options.get('cust_msg',dict()) init_from_mac = getattr(options,'init_from_mac',False) or getattr(msg_options,'init_from_mac',False) read_only = getattr(options,'read_only',False) or getattr(msg_options,'read_only',False) default_value = getattr(options,'default_value',None) or getattr(msg_options,'default_value',None) init_from_mac = self.get_option(element,'init_from_mac') or False const_prefix = self.get_option(element,'const_prefix') or False read_only = self.get_option(element,'read_only') or False default_value = self.get_option(element,'default_value') or False global_name = self.get_option(element,'global_name') or False # pb_size_t which_default_value; # union { # char *v_string; # uint32_t v_uint32; # int32_t v_int32; # uint64_t v_uint64; # int64_t v_int64; # double v_double; # float v_float; # bool v_bool; # int16_t v_enum; # pb_bytes_array_t *v_bytes; # } default_value; if element.descriptor.cpp_type in [ FieldDescriptor.CPPTYPE_STRING ] and ( init_from_mac or const_prefix or read_only or default_value or global_name ): if not self.all_members_defaults.get(f'{PROCESSED_PREFIX}{element.cpp_type}'): self.get_mkey(element.cpp_child).append(f'#define {element.cpp_type}_ALL_MEMBERS_CHILD(msg,member)') self.get_mkey(element.cpp_root).append(f'#define {element.cpp_type}_ALL_MEMBERS_ROOT ') c_source, c_header = self.get_source_names(Path(element.file.name).stem) self.get_mkey(f'{element.cpp_type}_INCLUDES').append(f'#include "{c_header}"') self.all_members_defaults[f'processed_{element.cpp_type}'] = True member_prefix = f'{element.cpp_type_member.replace(".","_")}_OPTIONS' init_from_mac_str = 'true' if init_from_mac else 'false' const_prefix_str = f'"{const_prefix}"' if const_prefix else '""' read_only_str = "true" if read_only else "false" default_value_str = f'"{default_value}"' if default_value else '""' global_name_str = f'"{global_name}"' if global_name else '""' opt_member = 'STRING_ARRAY' if self.get_nanoppb_option(element,'max_length') >0 else 'STRING_POINTER' opt_member_type = opt_member + '_MEMBER' self.get_mkey(element.cpp_type).append(f'#define {member_prefix} {opt_member_type}({init_from_mac_str}, {const_prefix_str}, {read_only_str}, {default_value_str}, {global_name_str})') if element.detached_leading_comments: logger.debug(f'{element.detached_leading_comments}') logger.info(f'INITFROMMAC: {self.global_name}/{element.path}') self.get_mkey(element.cpp_child).append(f'{opt_member}(msg, member.{element.cpp_member},{opt_member_type}) ') self.get_mkey(element.cpp_root).append(f'{opt_member}({element.cpp_type},{element.cpp_member},{opt_member_type})') return self.repeated_render(element,result) def add_file(self,name:str,stream:io.StringIO): if stream and not stream.closed: f = self.response.file.add() f.name = name f.content = stream.getvalue() stream.close() def get_source_names(self,name)->(str,str): csource_name = f'{name}{file_suffix}' return (f'{csource_name}.c',f'{csource_name}.h') def set_source(self,name): if hasattr(self,"c_source"): self.add_file(self.c_source_name,self.c_source) self.add_file(self.c_header_name,self.c_header) if name: self.c_source_name,self.c_header_name = self.get_source_names(name) self.c_header = io.StringIO() self.c_header.write(head_comments) self.c_header.write(f'#pragma once\n') self.c_header.write(f'#include "sys_options.h"\n') self.c_header.write(f'#include "configuration.pb.h"\n') self.c_source = io.StringIO() self.c_source.write(head_comments) self.c_source.write(f'#include "{self.c_header_name}"\n') if __name__ == '__main__': data = ProtocParser.get_data() logger.info(f"Generating c source file(s) for options") protocParser:OptionsParser = OptionsParser(data) protocParser.process() logger.info('Done generating c source file(s) for options')