protoc-gen-options.py 12 KB


  1. # !/usr/bin/env python
  2. import io
  3. import os
  4. import logging
  5. import json
  6. from pathlib import Path
  7. from typing import Dict, List
  8. from google.protobuf.compiler import plugin_pb2 as plugin
  9. from google.protobuf.descriptor_pb2 import FileDescriptorProto, DescriptorProto, FieldDescriptorProto,FieldOptions
  10. from google.protobuf.descriptor import FieldDescriptor, Descriptor, FileDescriptor
  11. from ProtoElement import ProtoElement
  12. from ProtocParser import ProtocParser
  13. file_suffix = "_options_pb2"
  14. head_comments ="""
  15. /* Automatically generated nanopb header */
  16. /* Generated by protoc-plugin-options */
  17. """
  18. PROCESSED_PREFIX = 'processed_'
  19. logger = logging.getLogger(__name__)
  20. logging.basicConfig(
  21. level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
  22. )
  23. class OptionsParser(ProtocParser) :
  24. def start_message(self,message:ProtoElement) :
  25. super().start_message(message)
  26. def reset_data(self):
  27. self.all_members_defaults:Dict([str,str]) = {}
  28. self.lines = {}
  29. self.lines['init_from_mac'] = []
  30. # self.lines['read_only'] = {}
  31. # self.lines['default_value'] = {}
  32. # self.lines['init_from_mac']['source'] = ['void set_init_from_mac(self.message_type_name * data){']
  33. # self.lines['init_from_mac']['header'] = []
  34. # self.lines['read_only']['source'] = []
  35. # self.lines['read_only']['header'] = []
  36. # self.lines['default_value']['source'] = []
  37. # self.lines['default_value']['header'] = []
  38. def writelines(self,lines, out:io.StringIO):
  39. for l in lines:
  40. out.write(f'{l}\r\n')
  41. # def render_all_members_for_message(self,name,roottype:bool=False):
  42. # elements= self.lines['init_from_mac']
  43. # for element in [element for element in elements if element.descriptor.containing_type.name == name]:
  44. # suffix = "ROOT" if roottype else "CHILD(msg, member)"
  45. # const_prefix = self.get_option(element,'const_prefix') or ""
  46. # memberprefix = '' if roottype else 'member.'
  47. # msg = element.name if roottype else "msg"
  48. # self.c_header.writelines(f'MSG_INIT_FROM_MAC_DECLARATION({element.cpp_type})')
  49. # self.c_header.writelines(f'#define {element.cpp_type}_ALL_MEMBERS_{suffix}')
  50. # if(elements.index(element)>0):
  51. # self.c_header.writelines(f',\\\n')
  52. # else:
  53. # self.c_source.writelines(f'MSG_INIT_FROM_MAC_IMPLEMENTATION({msg_ctype});')
  54. def render_all_members(self):
  55. for key in [key for key in self.all_members_defaults.keys() if not '.' in key and not key.startswith(PROCESSED_PREFIX)]:
  56. # make each line unique
  57. self.all_members_defaults[key] = set(self.all_members_defaults[key])
  58. self.all_members_defaults[key] = '\\\n'.join(self.all_members_defaults[key])
  59. # WRITE DEPENDENCIES FOR THE CURRENT FILE
  60. member_defines = '\n'.join([ self.all_members_defaults.get(key) for key in self.all_members_defaults.keys() if '.' in key])
  61. self.c_header.writelines(member_defines)
  62. message_defines = ',\\\n'.join([key for key in self.all_members_defaults.keys() if not '.' in key])
  63. self.c_header.writelines(message_defines)
  64. def end_message(self,message:ProtoElement):
  65. super().end_message(message)
  66. self.message_type_name = message.path.replace('.','_')
  67. self.global_name = message.options.get("global_name",message.path)
  68. message.render()
  69. self.render_all_members()
  70. # self.c_source.writelines()
  71. def start_file(self,file:FileDescriptor) :
  72. super().start_file(file)
  73. self.set_source(Path(file.name).stem)
  74. self.reset_data()
  75. def end_file(self,file:ProtoElement) :
  76. super().end_file(file)
  77. # Parse request
  78. # self.lines['init_from_mac']['source'] = ['}']
  79. # self.c_header.writelines(self.lines['init_from_mac']['source'])
  80. # self.c_source.writelines(self.lines['init_from_mac']['header'])
  81. self.set_source(None)
  82. def get_name(self)->str:
  83. return 'protoc_plugin_options'
  84. def add_comment_if_exists(element, comment_type: str, path: str) -> dict:
  85. comment = getattr(element, f"{comment_type}_comment", "").strip()
  86. return {f"__{comment_type}_{path}": comment} if comment else {}
  87. def repeated_render(self,element:ProtoElement,obj:any):
  88. return [obj] if element.repeated else obj
  89. def get_option(self,element:ProtoElement,optname:str):
  90. options = element.options.get('cust_field',dict())
  91. msg_options = element.options.get('cust_msg',dict())
  92. return getattr(options,optname,None) or getattr(msg_options,optname,None)
  93. def get_nanoppb_option(self,element:ProtoElement,optname:str):
  94. options = element.options.get('nanopb_msgopt',dict())
  95. msg_options = element.options.get('nanopb',dict())
  96. return getattr(options,optname,None) or getattr(msg_options,optname,None)
  97. def get_mkey(self,key)->list([str]):
  98. if not self.all_members_defaults.get(key):
  99. self.all_members_defaults[key] = list()
  100. return self.all_members_defaults[key]
  101. def append_line(self,key,line):
  102. self.get_mkey(key).append(line)
  103. def get_member_assignment(self, element: ProtoElement, value) -> str:
  104. member = "default_value."
  105. cpptype = element.descriptor.cpp_type
  106. if cpptype == FieldDescriptor.CPPTYPE_ENUM:
  107. member += f'v_enum = {element.cpp_type_member_prefix}_{value}'
  108. elif cpptype == FieldDescriptor.CPPTYPE_INT32:
  109. member += f'v_int32 = {value}'
  110. elif cpptype == FieldDescriptor.CPPTYPE_INT64:
  111. member += f'v_int64 = {value}'
  112. elif cpptype == FieldDescriptor.CPPTYPE_UINT32:
  113. member += f'v_uint32 = {value}'
  114. elif cpptype == FieldDescriptor.CPPTYPE_UINT64:
  115. member += f'v_uint64 = {value}'
  116. elif cpptype == FieldDescriptor.CPPTYPE_DOUBLE:
  117. member += f'v_double = {value}'
  118. elif cpptype == FieldDescriptor.CPPTYPE_FLOAT:
  119. member += f'v_float = {value}'
  120. elif cpptype == FieldDescriptor.CPPTYPE_BOOL:
  121. member += f'v_bool = {value}'
  122. elif cpptype == FieldDescriptor.CPPTYPE_STRING:
  123. member += f'v_string = {value}'
  124. elif cpptype == FieldDescriptor.CPPTYPE_MESSAGE:
  125. # Assuming value is a serialized string or similar
  126. member += f'v_bytes = {value}'
  127. else:
  128. raise ValueError(f"Unsupported C++ type: {cpptype}")
  129. return member
  130. def render(self,element: ProtoElement) -> Dict:
  131. result = {}
  132. if len(element.childs)>0:
  133. # oneof = getattr(element.descriptor,'containing_oneof',None)
  134. # if oneof:
  135. # result[f'__one_of_{element.name}'] = f'Choose only one structure for {oneof.full_name}'
  136. for child in element.childs:
  137. child.render()
  138. else:
  139. options = element.options.get('cust_field',dict())
  140. nanopb_msgopt = element.options.get('',dict())
  141. nanopb = element.options.get('nanopb',dict())
  142. msg_options = element.options.get('cust_msg',dict())
  143. init_from_mac = getattr(options,'init_from_mac',False) or getattr(msg_options,'init_from_mac',False)
  144. read_only = getattr(options,'read_only',False) or getattr(msg_options,'read_only',False)
  145. default_value = getattr(options,'default_value',None) or getattr(msg_options,'default_value',None)
  146. init_from_mac = self.get_option(element,'init_from_mac') or False
  147. const_prefix = self.get_option(element,'const_prefix') or False
  148. read_only = self.get_option(element,'read_only') or False
  149. default_value = self.get_option(element,'default_value') or False
  150. global_name = self.get_option(element,'global_name') or False
  151. # pb_size_t which_default_value;
  152. # union {
  153. # char *v_string;
  154. # uint32_t v_uint32;
  155. # int32_t v_int32;
  156. # uint64_t v_uint64;
  157. # int64_t v_int64;
  158. # double v_double;
  159. # float v_float;
  160. # bool v_bool;
  161. # int16_t v_enum;
  162. # pb_bytes_array_t *v_bytes;
  163. # } default_value;
  164. if element.descriptor.cpp_type in [ FieldDescriptor.CPPTYPE_STRING ] and ( init_from_mac or const_prefix or read_only or default_value or global_name ):
  165. if not self.all_members_defaults.get(f'{PROCESSED_PREFIX}{element.cpp_type}'):
  166. self.get_mkey(element.cpp_child).append(f'#define {element.cpp_type}_ALL_MEMBERS_CHILD(msg,member)')
  167. self.get_mkey(element.cpp_root).append(f'#define {element.cpp_type}_ALL_MEMBERS_ROOT ')
  168. c_source, c_header = self.get_source_names(Path(element.file.name).stem)
  169. self.get_mkey(f'{element.cpp_type}_INCLUDES').append(f'#include "{c_header}"')
  170. self.all_members_defaults[f'processed_{element.cpp_type}'] = True
  171. member_prefix = f'{element.cpp_type_member.replace(".","_")}_OPTIONS'
  172. init_from_mac_str = 'true' if init_from_mac else 'false'
  173. const_prefix_str = f'"{const_prefix}"' if const_prefix else '""'
  174. read_only_str = "true" if read_only else "false"
  175. default_value_str = f'"{default_value}"' if default_value else '""'
  176. global_name_str = f'"{global_name}"' if global_name else '""'
  177. opt_member = 'STRING_ARRAY' if self.get_nanoppb_option(element,'max_length') >0 else 'STRING_POINTER'
  178. opt_member_type = opt_member + '_MEMBER'
  179. self.get_mkey(element.cpp_type).append(f'#define {member_prefix} {opt_member_type}({init_from_mac_str}, {const_prefix_str}, {read_only_str}, {default_value_str}, {global_name_str})')
  180. if element.detached_leading_comments:
  181. logger.debug(f'{element.detached_leading_comments}')
  182. logger.info(f'INITFROMMAC: {self.global_name}{element.path}')
  183. self.get_mkey(element.cpp_child).append(f'{opt_member}(msg, member.{element.cpp_member},{opt_member_type}) ')
  184. self.get_mkey(element.cpp_root).append(f'{opt_member}({element.cpp_type},{element.cpp_member},{opt_member_type})')
  185. return self.repeated_render(element,result)
  186. def add_file(self,name:str,stream:io.StringIO):
  187. if stream and not stream.closed:
  188. f = self.response.file.add()
  189. f.name = name
  190. f.content = stream.getvalue()
  191. stream.close()
  192. def get_source_names(self,name)->(str,str):
  193. csource_name = f'{name}{file_suffix}'
  194. return (f'{csource_name}.c',f'{csource_name}.h')
  195. def set_source(self,name):
  196. if hasattr(self,"c_source"):
  197. self.add_file(self.c_source_name,self.c_source)
  198. self.add_file(self.c_header_name,self.c_header)
  199. if name:
  200. self.c_source_name,self.c_header_name = self.get_source_names(name)
  201. self.c_header = io.StringIO()
  202. self.c_header.write(head_comments)
  203. self.c_header.write(f'#pragma once\n')
  204. self.c_header.write(f'#include "sys_options.h"\n')
  205. self.c_header.write(f'#include "configuration.pb.h"\n')
  206. self.c_source = io.StringIO()
  207. self.c_source.write(head_comments)
  208. self.c_source.write(f'#include "{self.c_header_name}"\n')
  209. if __name__ == '__main__':
  210. data = ProtocParser.get_data()
  211. logger.info(f"Generating c source file(s) for options")
  212. protocParser:OptionsParser = OptionsParser(data)
  213. protocParser.process()
  214. logger.info('Done generating c source file(s) for options')