protoc-gen-options.py 12 KB


  1. #!/opt/esp/python_env/idf4.4_py3.8_env/bin/python
  2. import io
  3. import os
  4. import logging
  5. import json
  6. from pathlib import Path
  7. import sys
  8. from typing import Dict, List
  9. from google.protobuf.compiler import plugin_pb2 as plugin
  10. from google.protobuf.descriptor_pb2 import FileDescriptorProto, DescriptorProto, FieldDescriptorProto,FieldOptions
  11. from google.protobuf.descriptor import FieldDescriptor, Descriptor, FileDescriptor
  12. from ProtoElement import ProtoElement
  13. from ProtocParser import ProtocParser
  14. file_suffix = "_options_pb2"
  15. head_comments ="""
  16. /* Automatically generated nanopb header */
  17. /* Generated by protoc-plugin-options */
  18. """
  19. PROCESSED_PREFIX = 'processed_'
  20. logger = logging.getLogger(__name__)
  21. logging.basicConfig(
  22. level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
  23. )
  24. class OptionsParser(ProtocParser) :
  25. def start_message(self,message:ProtoElement) :
  26. super().start_message(message)
  27. def reset_data(self):
  28. self.all_members_defaults:Dict([str,str]) = {}
  29. self.lines = {}
  30. self.lines['init_from_mac'] = []
  31. # self.lines['read_only'] = {}
  32. # self.lines['default_value'] = {}
  33. # self.lines['init_from_mac']['source'] = ['void set_init_from_mac(self.message_type_name * data){']
  34. # self.lines['init_from_mac']['header'] = []
  35. # self.lines['read_only']['source'] = []
  36. # self.lines['read_only']['header'] = []
  37. # self.lines['default_value']['source'] = []
  38. # self.lines['default_value']['header'] = []
  39. def writelines(self,lines, out:io.StringIO):
  40. for l in lines:
  41. out.write(f'{l}\r\n')
  42. # def render_all_members_for_message(self,name,roottype:bool=False):
  43. # elements= self.lines['init_from_mac']
  44. # for element in [element for element in elements if element.descriptor.containing_type.name == name]:
  45. # suffix = "ROOT" if roottype else "CHILD(msg, member)"
  46. # const_prefix = self.get_option(element,'const_prefix') or ""
  47. # memberprefix = '' if roottype else 'member.'
  48. # msg = element.name if roottype else "msg"
  49. # self.c_header.writelines(f'MSG_INIT_FROM_MAC_DECLARATION({element.cpp_type})')
  50. # self.c_header.writelines(f'#define {element.cpp_type}_ALL_MEMBERS_{suffix}')
  51. # if(elements.index(element)>0):
  52. # self.c_header.writelines(f',\\\n')
  53. # else:
  54. # self.c_source.writelines(f'MSG_INIT_FROM_MAC_IMPLEMENTATION({msg_ctype});')
  55. def render_all_members(self):
  56. for key in [key for key in self.all_members_defaults.keys() if not '.' in key and not key.startswith(PROCESSED_PREFIX)]:
  57. # make each line unique
  58. self.all_members_defaults[key] = set(self.all_members_defaults[key])
  59. self.all_members_defaults[key] = '\\\n'.join(self.all_members_defaults[key])
  60. # WRITE DEPENDENCIES FOR THE CURRENT FILE
  61. try:
  62. member_defines = '\n'.join([ self.all_members_defaults.get(key) for key in self.all_members_defaults.keys() if '.' in key])
  63. self.c_header.writelines(member_defines)
  64. except Exception as e:
  65. logger.error(f'{e}')
  66. sys.exit(1) # Exit with error status
  67. message_defines = ',\\\n'.join([key for key in self.all_members_defaults.keys() if not '.' in key])
  68. self.c_header.writelines(message_defines)
  69. def end_message(self,message:ProtoElement):
  70. super().end_message(message)
  71. self.message_type_name = message.path.replace('.','_')
  72. self.global_name = message.options.get("global_name",message.path)
  73. message.render()
  74. self.render_all_members()
  75. # self.c_source.writelines()
  76. def start_file(self,file:FileDescriptor) :
  77. super().start_file(file)
  78. self.set_source(Path(file.name).stem)
  79. self.reset_data()
  80. def end_file(self,file:ProtoElement) :
  81. super().end_file(file)
  82. # Parse request
  83. # self.lines['init_from_mac']['source'] = ['}']
  84. # self.c_header.writelines(self.lines['init_from_mac']['source'])
  85. # self.c_source.writelines(self.lines['init_from_mac']['header'])
  86. self.set_source(None)
  87. def get_name(self)->str:
  88. return 'protoc_plugin_options'
  89. def add_comment_if_exists(element, comment_type: str, path: str) -> dict:
  90. comment = getattr(element, f"{comment_type}_comment", "").strip()
  91. return {f"__{comment_type}_{path}": comment} if comment else {}
  92. def repeated_render(self,element:ProtoElement,obj:any):
  93. return [obj] if element.repeated else obj
  94. def get_option(self,element:ProtoElement,optname:str):
  95. options = element.options.get('cust_field',dict())
  96. msg_options = element.options.get('cust_msg',dict())
  97. return getattr(options,optname,None) or getattr(msg_options,optname,None)
  98. def get_nanoppb_option(self,element:ProtoElement,optname:str):
  99. options = element.options.get('nanopb_msgopt',dict())
  100. msg_options = element.options.get('nanopb',dict())
  101. return getattr(options,optname,None) or getattr(msg_options,optname,None)
  102. def get_mkey(self,key)->list([str]):
  103. if not self.all_members_defaults.get(key):
  104. self.all_members_defaults[key] = list()
  105. return self.all_members_defaults[key]
  106. def append_line(self,key,line):
  107. self.get_mkey(key).append(line)
  108. def get_member_assignment(self, element: ProtoElement, value) -> str:
  109. member = "default_value."
  110. cpptype = element.descriptor.cpp_type
  111. if cpptype == FieldDescriptor.CPPTYPE_ENUM:
  112. member += f'v_enum = {element.cpp_type_member_prefix}_{value}'
  113. elif cpptype == FieldDescriptor.CPPTYPE_INT32:
  114. member += f'v_int32 = {value}'
  115. elif cpptype == FieldDescriptor.CPPTYPE_INT64:
  116. member += f'v_int64 = {value}'
  117. elif cpptype == FieldDescriptor.CPPTYPE_UINT32:
  118. member += f'v_uint32 = {value}'
  119. elif cpptype == FieldDescriptor.CPPTYPE_UINT64:
  120. member += f'v_uint64 = {value}'
  121. elif cpptype == FieldDescriptor.CPPTYPE_DOUBLE:
  122. member += f'v_double = {value}'
  123. elif cpptype == FieldDescriptor.CPPTYPE_FLOAT:
  124. member += f'v_float = {value}'
  125. elif cpptype == FieldDescriptor.CPPTYPE_BOOL:
  126. member += f'v_bool = {value}'
  127. elif cpptype == FieldDescriptor.CPPTYPE_STRING:
  128. member += f'v_string = {value}'
  129. elif cpptype == FieldDescriptor.CPPTYPE_MESSAGE:
  130. # Assuming value is a serialized string or similar
  131. member += f'v_bytes = {value}'
  132. else:
  133. raise ValueError(f"Unsupported C++ type: {cpptype}")
  134. return member
  135. def render(self,element: ProtoElement) -> Dict:
  136. result = {}
  137. if len(element.childs)>0:
  138. # oneof = getattr(element.descriptor,'containing_oneof',None)
  139. # if oneof:
  140. # result[f'__one_of_{element.name}'] = f'Choose only one structure for {oneof.full_name}'
  141. for child in element.childs:
  142. child.render()
  143. else:
  144. options = element.options.get('cust_field',dict())
  145. nanopb_msgopt = element.options.get('',dict())
  146. nanopb = element.options.get('nanopb',dict())
  147. msg_options = element.options.get('cust_msg',dict())
  148. init_from_mac = getattr(options,'init_from_mac',False) or getattr(msg_options,'init_from_mac',False)
  149. read_only = getattr(options,'read_only',False) or getattr(msg_options,'read_only',False)
  150. default_value = getattr(options,'default_value',None) or getattr(msg_options,'default_value',None)
  151. init_from_mac = self.get_option(element,'init_from_mac') or False
  152. const_prefix = self.get_option(element,'const_prefix') or False
  153. read_only = self.get_option(element,'read_only') or False
  154. default_value = self.get_option(element,'default_value') or False
  155. global_name = self.get_option(element,'global_name') or False
  156. # pb_size_t which_default_value;
  157. # union {
  158. # char *v_string;
  159. # uint32_t v_uint32;
  160. # int32_t v_int32;
  161. # uint64_t v_uint64;
  162. # int64_t v_int64;
  163. # double v_double;
  164. # float v_float;
  165. # bool v_bool;
  166. # int16_t v_enum;
  167. # pb_bytes_array_t *v_bytes;
  168. # } default_value;
  169. if element.descriptor.cpp_type in [ FieldDescriptor.CPPTYPE_STRING ] and ( init_from_mac or const_prefix or read_only or default_value or global_name ):
  170. if not self.all_members_defaults.get(f'{PROCESSED_PREFIX}{element.cpp_type}'):
  171. self.get_mkey(element.cpp_child).append(f'#define {element.cpp_type}_ALL_MEMBERS_CHILD(msg,member)')
  172. self.get_mkey(element.cpp_root).append(f'#define {element.cpp_type}_ALL_MEMBERS_ROOT ')
  173. c_source, c_header = self.get_source_names(Path(element.file.name).stem)
  174. self.get_mkey(f'{element.cpp_type}_INCLUDES').append(f'#include "{c_header}"')
  175. self.all_members_defaults[f'processed_{element.cpp_type}'] = True
  176. member_prefix = f'{element.cpp_type_member.replace(".","_")}_OPTIONS'
  177. init_from_mac_str = 'true' if init_from_mac else 'false'
  178. const_prefix_str = f'"{const_prefix}"' if const_prefix else '""'
  179. read_only_str = "true" if read_only else "false"
  180. default_value_str = f'"{default_value}"' if default_value else '""'
  181. global_name_str = f'"{global_name}"' if global_name else '""'
  182. opt_member = 'STRING_ARRAY' if self.get_nanoppb_option(element,'max_length') >0 else 'STRING_POINTER'
  183. opt_member_type = opt_member + '_MEMBER'
  184. self.get_mkey(element.cpp_type).append(f'#define {member_prefix} {opt_member_type}({init_from_mac_str}, {const_prefix_str}, {read_only_str}, {default_value_str}, {global_name_str})')
  185. if element.detached_leading_comments:
  186. logger.debug(f'{element.detached_leading_comments}')
  187. logger.info(f'INITFROMMAC: {self.global_name}/{element.path}')
  188. self.get_mkey(element.cpp_child).append(f'{opt_member}(msg, member.{element.cpp_member},{opt_member_type}) ')
  189. self.get_mkey(element.cpp_root).append(f'{opt_member}({element.cpp_type},{element.cpp_member},{opt_member_type})')
  190. return self.repeated_render(element,result)
  191. def add_file(self,name:str,stream:io.StringIO):
  192. if stream and not stream.closed:
  193. f = self.response.file.add()
  194. f.name = name
  195. f.content = stream.getvalue()
  196. stream.close()
  197. def get_source_names(self,name)->(str,str):
  198. csource_name = f'{name}{file_suffix}'
  199. return (f'{csource_name}.c',f'{csource_name}.h')
  200. def set_source(self,name):
  201. if hasattr(self,"c_source"):
  202. self.add_file(self.c_source_name,self.c_source)
  203. self.add_file(self.c_header_name,self.c_header)
  204. if name:
  205. self.c_source_name,self.c_header_name = self.get_source_names(name)
  206. self.c_header = io.StringIO()
  207. self.c_header.write(head_comments)
  208. self.c_header.write(f'#pragma once\n')
  209. self.c_header.write(f'#include "sys_options.h"\n')
  210. self.c_header.write(f'#include "configuration.pb.h"\n')
  211. self.c_source = io.StringIO()
  212. self.c_source.write(head_comments)
  213. self.c_source.write(f'#include "{self.c_header_name}"\n')
  214. if __name__ == '__main__':
  215. data = ProtocParser.get_data()
  216. logger.info(f"Generating c source file(s) for options")
  217. protocParser:OptionsParser = OptionsParser(data)
  218. protocParser.process()
  219. logger.info('Done generating c source file(s) for options')