123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260 |
- #!/opt/esp/python_env/idf4.4_py3.8_env/bin/python
- import io
- import os
- import logging
- import json
- from pathlib import Path
- import sys
- from typing import Dict, List
- from google.protobuf.compiler import plugin_pb2 as plugin
- from google.protobuf.descriptor_pb2 import FileDescriptorProto, DescriptorProto, FieldDescriptorProto,FieldOptions
- from google.protobuf.descriptor import FieldDescriptor, Descriptor, FileDescriptor
- from ProtoElement import ProtoElement
- from ProtocParser import ProtocParser
- file_suffix = "_options_pb2"
- head_comments ="""
- /* Automatically generated nanopb header */
- /* Generated by protoc-plugin-options */
- """
- PROCESSED_PREFIX = 'processed_'
- logger = logging.getLogger(__name__)
- logging.basicConfig(
- level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
- )
- class OptionsParser(ProtocParser) :
-
- def start_message(self,message:ProtoElement) :
- super().start_message(message)
- def reset_data(self):
- self.all_members_defaults:Dict([str,str]) = {}
- self.lines = {}
- self.lines['init_from_mac'] = []
- # self.lines['read_only'] = {}
- # self.lines['default_value'] = {}
- # self.lines['init_from_mac']['source'] = ['void set_init_from_mac(self.message_type_name * data){']
- # self.lines['init_from_mac']['header'] = []
- # self.lines['read_only']['source'] = []
- # self.lines['read_only']['header'] = []
- # self.lines['default_value']['source'] = []
- # self.lines['default_value']['header'] = []
- def writelines(self,lines, out:io.StringIO):
- for l in lines:
- out.write(f'{l}\r\n')
- # def render_all_members_for_message(self,name,roottype:bool=False):
- # elements= self.lines['init_from_mac']
- # for element in [element for element in elements if element.descriptor.containing_type.name == name]:
- # suffix = "ROOT" if roottype else "CHILD(msg, member)"
- # const_prefix = self.get_option(element,'const_prefix') or ""
- # memberprefix = '' if roottype else 'member.'
- # msg = element.name if roottype else "msg"
- # self.c_header.writelines(f'MSG_INIT_FROM_MAC_DECLARATION({element.cpp_type})')
- # self.c_header.writelines(f'#define {element.cpp_type}_ALL_MEMBERS_{suffix}')
- # if(elements.index(element)>0):
- # self.c_header.writelines(f',\\\n')
- # else:
- # self.c_source.writelines(f'MSG_INIT_FROM_MAC_IMPLEMENTATION({msg_ctype});')
-
- def render_all_members(self):
-
- for key in [key for key in self.all_members_defaults.keys() if not '.' in key and not key.startswith(PROCESSED_PREFIX)]:
- # make each line unique
- self.all_members_defaults[key] = set(self.all_members_defaults[key])
- self.all_members_defaults[key] = '\\\n'.join(self.all_members_defaults[key])
-
- # WRITE DEPENDENCIES FOR THE CURRENT FILE
- try:
- member_defines = '\n'.join([ self.all_members_defaults.get(key) for key in self.all_members_defaults.keys() if '.' in key])
- self.c_header.writelines(member_defines)
- except Exception as e:
- logger.error(f'{e}')
- sys.exit(1) # Exit with error status
- message_defines = ',\\\n'.join([key for key in self.all_members_defaults.keys() if not '.' in key])
- self.c_header.writelines(message_defines)
-
- def end_message(self,message:ProtoElement):
- super().end_message(message)
- self.message_type_name = message.path.replace('.','_')
- self.global_name = message.options.get("global_name",message.path)
- message.render()
- self.render_all_members()
- # self.c_source.writelines()
- def start_file(self,file:FileDescriptor) :
- super().start_file(file)
- self.set_source(Path(file.name).stem)
- self.reset_data()
- def end_file(self,file:ProtoElement) :
- super().end_file(file)
- # Parse request
- # self.lines['init_from_mac']['source'] = ['}']
- # self.c_header.writelines(self.lines['init_from_mac']['source'])
- # self.c_source.writelines(self.lines['init_from_mac']['header'])
- self.set_source(None)
- def get_name(self)->str:
- return 'protoc_plugin_options'
-
- def add_comment_if_exists(element, comment_type: str, path: str) -> dict:
- comment = getattr(element, f"{comment_type}_comment", "").strip()
- return {f"__{comment_type}_{path}": comment} if comment else {}
- def repeated_render(self,element:ProtoElement,obj:any):
- return [obj] if element.repeated else obj
- def get_option(self,element:ProtoElement,optname:str):
- options = element.options.get('cust_field',dict())
- msg_options = element.options.get('cust_msg',dict())
- return getattr(options,optname,None) or getattr(msg_options,optname,None)
- def get_nanoppb_option(self,element:ProtoElement,optname:str):
- options = element.options.get('nanopb_msgopt',dict())
- msg_options = element.options.get('nanopb',dict())
- return getattr(options,optname,None) or getattr(msg_options,optname,None)
- def get_mkey(self,key)->list([str]):
- if not self.all_members_defaults.get(key):
- self.all_members_defaults[key] = list()
- return self.all_members_defaults[key]
- def append_line(self,key,line):
- self.get_mkey(key).append(line)
- def get_member_assignment(self, element: ProtoElement, value) -> str:
- member = "default_value."
- cpptype = element.descriptor.cpp_type
- if cpptype == FieldDescriptor.CPPTYPE_ENUM:
- member += f'v_enum = {element.cpp_type_member_prefix}_{value}'
- elif cpptype == FieldDescriptor.CPPTYPE_INT32:
- member += f'v_int32 = {value}'
- elif cpptype == FieldDescriptor.CPPTYPE_INT64:
- member += f'v_int64 = {value}'
- elif cpptype == FieldDescriptor.CPPTYPE_UINT32:
- member += f'v_uint32 = {value}'
- elif cpptype == FieldDescriptor.CPPTYPE_UINT64:
- member += f'v_uint64 = {value}'
- elif cpptype == FieldDescriptor.CPPTYPE_DOUBLE:
- member += f'v_double = {value}'
- elif cpptype == FieldDescriptor.CPPTYPE_FLOAT:
- member += f'v_float = {value}'
- elif cpptype == FieldDescriptor.CPPTYPE_BOOL:
- member += f'v_bool = {value}'
- elif cpptype == FieldDescriptor.CPPTYPE_STRING:
- member += f'v_string = {value}'
- elif cpptype == FieldDescriptor.CPPTYPE_MESSAGE:
- # Assuming value is a serialized string or similar
- member += f'v_bytes = {value}'
- else:
- raise ValueError(f"Unsupported C++ type: {cpptype}")
- return member
- def render(self,element: ProtoElement) -> Dict:
- result = {}
- if len(element.childs)>0:
- # oneof = getattr(element.descriptor,'containing_oneof',None)
- # if oneof:
- # result[f'__one_of_{element.name}'] = f'Choose only one structure for {oneof.full_name}'
- for child in element.childs:
- child.render()
-
- else:
- options = element.options.get('cust_field',dict())
- nanopb_msgopt = element.options.get('',dict())
- nanopb = element.options.get('nanopb',dict())
- msg_options = element.options.get('cust_msg',dict())
- init_from_mac = getattr(options,'init_from_mac',False) or getattr(msg_options,'init_from_mac',False)
- read_only = getattr(options,'read_only',False) or getattr(msg_options,'read_only',False)
- default_value = getattr(options,'default_value',None) or getattr(msg_options,'default_value',None)
- init_from_mac = self.get_option(element,'init_from_mac') or False
- const_prefix = self.get_option(element,'const_prefix') or False
- read_only = self.get_option(element,'read_only') or False
- default_value = self.get_option(element,'default_value') or False
- global_name = self.get_option(element,'global_name') or False
- # pb_size_t which_default_value;
- # union {
- # char *v_string;
- # uint32_t v_uint32;
- # int32_t v_int32;
- # uint64_t v_uint64;
- # int64_t v_int64;
- # double v_double;
- # float v_float;
- # bool v_bool;
- # int16_t v_enum;
- # pb_bytes_array_t *v_bytes;
- # } default_value;
- if element.descriptor.cpp_type in [ FieldDescriptor.CPPTYPE_STRING ] and ( init_from_mac or const_prefix or read_only or default_value or global_name ):
- if not self.all_members_defaults.get(f'{PROCESSED_PREFIX}{element.cpp_type}'):
- self.get_mkey(element.cpp_child).append(f'#define {element.cpp_type}_ALL_MEMBERS_CHILD(msg,member)')
- self.get_mkey(element.cpp_root).append(f'#define {element.cpp_type}_ALL_MEMBERS_ROOT ')
- c_source, c_header = self.get_source_names(Path(element.file.name).stem)
- self.get_mkey(f'{element.cpp_type}_INCLUDES').append(f'#include "{c_header}"')
- self.all_members_defaults[f'processed_{element.cpp_type}'] = True
- member_prefix = f'{element.cpp_type_member.replace(".","_")}_OPTIONS'
- init_from_mac_str = 'true' if init_from_mac else 'false'
- const_prefix_str = f'"{const_prefix}"' if const_prefix else '""'
- read_only_str = "true" if read_only else "false"
- default_value_str = f'"{default_value}"' if default_value else '""'
- global_name_str = f'"{global_name}"' if global_name else '""'
- opt_member = 'STRING_ARRAY' if self.get_nanoppb_option(element,'max_length') >0 else 'STRING_POINTER'
- opt_member_type = opt_member + '_MEMBER'
-
- self.get_mkey(element.cpp_type).append(f'#define {member_prefix} {opt_member_type}({init_from_mac_str}, {const_prefix_str}, {read_only_str}, {default_value_str}, {global_name_str})')
- if element.detached_leading_comments:
- logger.debug(f'{element.detached_leading_comments}')
- logger.info(f'INITFROMMAC: {self.global_name}/{element.path}')
- self.get_mkey(element.cpp_child).append(f'{opt_member}(msg, member.{element.cpp_member},{opt_member_type}) ')
- self.get_mkey(element.cpp_root).append(f'{opt_member}({element.cpp_type},{element.cpp_member},{opt_member_type})')
- return self.repeated_render(element,result)
-
- def add_file(self,name:str,stream:io.StringIO):
- if stream and not stream.closed:
- f = self.response.file.add()
- f.name = name
- f.content = stream.getvalue()
- stream.close()
- def get_source_names(self,name)->(str,str):
- csource_name = f'{name}{file_suffix}'
- return (f'{csource_name}.c',f'{csource_name}.h')
-
- def set_source(self,name):
- if hasattr(self,"c_source"):
- self.add_file(self.c_source_name,self.c_source)
- self.add_file(self.c_header_name,self.c_header)
- if name:
- self.c_source_name,self.c_header_name = self.get_source_names(name)
- self.c_header = io.StringIO()
- self.c_header.write(head_comments)
- self.c_header.write(f'#pragma once\n')
- self.c_header.write(f'#include "sys_options.h"\n')
- self.c_header.write(f'#include "configuration.pb.h"\n')
- self.c_source = io.StringIO()
- self.c_source.write(head_comments)
- self.c_source.write(f'#include "{self.c_header_name}"\n')
- if __name__ == '__main__':
- data = ProtocParser.get_data()
- logger.info(f"Generating c source file(s) for options")
- protocParser:OptionsParser = OptionsParser(data)
- protocParser.process()
- logger.info('Done generating c source file(s) for options')
|