ProtoElement.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. from __future__ import annotations
  2. from functools import partial
  3. from typing import ClassVar, List, Any, Dict
  4. from typing import Callable
  5. from google.protobuf import message
  6. from google.protobuf.descriptor import Descriptor, FieldDescriptor, FileDescriptor
  7. from google.protobuf.descriptor_pb2 import FieldDescriptorProto,DescriptorProto,EnumDescriptorProto
  8. import google.protobuf.descriptor_pool as descriptor_pool
  9. import logging
  10. import copy
  11. # import custom_options_pb2 as custom
  12. RendererType = Callable[['ProtoElement'], Dict]
  13. class ProtoElement:
  14. childs:List[ProtoElement]
  15. descriptor:Descriptor|FieldDescriptor
  16. comments: Dict[str,str]
  17. enum_type:EnumDescriptorProto
  18. _comments: Dict[str,str] ={}
  19. pool:descriptor_pool.DescriptorPool
  20. prototypes: dict[str, type[message.Message]]
  21. renderer:RendererType
  22. package:str
  23. file:FileDescriptor
  24. message:str
  25. options:Dict[str,any]
  26. _message_instance:ClassVar
  27. @classmethod
  28. def set_prototypes(cls,prototypes:dict[str, type[message.Message]]):
  29. cls.prototypes = prototypes
  30. @classmethod
  31. def set_comments_base(cls,comments:Dict[str,str]):
  32. cls._comments = comments
  33. @classmethod
  34. def set_pool(cls,pool:descriptor_pool.DescriptorPool):
  35. cls.pool = pool
  36. @classmethod
  37. def set_logger(cls,logger = None):
  38. if not logger and not cls.logger:
  39. cls.logger = logging.getLogger(__name__)
  40. logging.basicConfig(
  41. level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
  42. )
  43. elif logger:
  44. cls.logger = logger
  45. @classmethod
  46. def set_render(cls,render):
  47. cls.render_class = render
  48. def __init__(self, descriptor: Descriptor|FieldDescriptor, parent=None):
  49. ProtoElement.set_logger()
  50. self.descriptor = descriptor
  51. self.file = descriptor.file
  52. self.package = getattr(descriptor,"file",parent).package
  53. self.descriptorname = descriptor.name
  54. self.json_name = getattr(descriptor,'json_name','')
  55. self.type_name = getattr(descriptor,'type_name',descriptor.name)
  56. self.parent = parent
  57. self.fullname = descriptor.full_name
  58. self.type = getattr(descriptor,'type',FieldDescriptor.TYPE_MESSAGE)
  59. if self.type ==FieldDescriptor.TYPE_MESSAGE:
  60. try:
  61. self._message_instance = self.prototypes[self.descriptor.message_type.full_name]()
  62. # self.logger.debug(f'Found instance for {self.descriptor.message_type.full_name}')
  63. except:
  64. # self.logger.error(f'Could not find instance for {self.descriptor.full_name}')
  65. self._message_instance = self.prototypes[self.descriptor.full_name]()
  66. self.label = getattr(descriptor,'label',None)
  67. self.childs = []
  68. if descriptor.has_options:
  69. self.options = {descr.name: value for descr, value in descriptor.GetOptions().ListFields()}
  70. else:
  71. self.options = {}
  72. try:
  73. if descriptor.containing_type.has_options:
  74. self.options.update({descr.name: value for descr, value in descriptor.containing_type.GetOptions().ListFields()})
  75. except:
  76. pass
  77. self.render = partial(self.render_class, self)
  78. self.comments = {comment.split('.')[-1]:self._comments[comment] for comment in self._comments.keys() if comment.startswith(self.path)}
  79. @property
  80. def cpp_type(self)->str:
  81. return f'{self.package}_{self.descriptor.containing_type.name}'
  82. @property
  83. def cpp_member(self)->str:
  84. return self.name
  85. @property
  86. def cpp_type_member_prefix(self)->str:
  87. return f'{self.cpp_type}_{self.cpp_member}'
  88. @property
  89. def cpp_type_member(self)->str:
  90. return f'{self.cpp_type}.{self.cpp_member}'
  91. @property
  92. def main_message(self)->bool:
  93. return self.parent == None
  94. @property
  95. def parent(self)->ProtoElement:
  96. return self._parent
  97. @parent.setter
  98. def parent(self,value:ProtoElement):
  99. self._parent = value
  100. if value:
  101. self._parent.childs.append(self)
  102. @property
  103. def root(self)->ProtoElement:
  104. return self if not self.parent else self.parent
  105. @property
  106. def enum_type(self)->EnumDescriptorProto:
  107. return self.descriptor.enum_type
  108. @property
  109. def cpp_root(self):
  110. return f'{self.cpp_type}_ROOT'
  111. @property
  112. def cpp_child(self):
  113. return f'{self.cpp_type}_CHILD'
  114. @property
  115. def message_instance(self):
  116. return getattr(self,'_message_instance',getattr(self.parent,'message_instance',None))
  117. @property
  118. def tree(self):
  119. childs = '->('+', '.join(c.tree for c in self.childs ) + ')' if len(self.childs)>0 else ''
  120. return f'{self.name}{childs}'
  121. @property
  122. def name(self):
  123. return self.descriptorname if len(self.descriptorname)>0 else self.parent.name if self.parent else self.package
  124. @property
  125. def enum_values(self)->List[str]:
  126. return [n.name for n in getattr(self.enum_type,"values",getattr(self.enum_type,"value",[])) ]
  127. @property
  128. def enum_values_str(self)->str:
  129. return ', '.join(self.enum_values)
  130. @property
  131. def fields(self)->List[FieldDescriptor]:
  132. return getattr(self.descriptor,"fields",getattr(getattr(self.descriptor,"message_type",None),"fields",None))
  133. @property
  134. def _default_value(self):
  135. if 'default_value' in self.options:
  136. return self.options['default_value']
  137. if self.type in [FieldDescriptorProto.TYPE_INT32, FieldDescriptorProto.TYPE_INT64,
  138. FieldDescriptorProto.TYPE_UINT32, FieldDescriptorProto.TYPE_UINT64,
  139. FieldDescriptorProto.TYPE_SINT32, FieldDescriptorProto.TYPE_SINT64,
  140. FieldDescriptorProto.TYPE_FIXED32, FieldDescriptorProto.TYPE_FIXED64,
  141. FieldDescriptorProto.TYPE_SFIXED32, FieldDescriptorProto.TYPE_SFIXED64]:
  142. return 0
  143. elif self.type in [FieldDescriptorProto.TYPE_FLOAT, FieldDescriptorProto.TYPE_DOUBLE]:
  144. return 0.0
  145. elif self.type == FieldDescriptorProto.TYPE_BOOL:
  146. return False
  147. elif self.type in [FieldDescriptorProto.TYPE_STRING, FieldDescriptorProto.TYPE_BYTES]:
  148. return ""
  149. elif self.is_enum:
  150. return self.enum_values[0] if self.enum_values else 0
  151. @property
  152. def detached_leading_comments(self)->str:
  153. return self.comments["leading"] if "detached" in self.comments else ""
  154. @property
  155. def leading_comment(self)->str:
  156. return self.comments["leading"] if "leading" in self.comments else ""
  157. @property
  158. def trailing_comment(self)->str:
  159. return self.comments["trailing"] if "trailing" in self.comments else ""
  160. @property
  161. def is_enum(self):
  162. return self.type == FieldDescriptorProto.TYPE_ENUM
  163. @property
  164. def path(self) -> str:
  165. return self.descriptor.full_name
  166. @property
  167. def enum_name(self)-> str:
  168. return self.type_name.split('.', maxsplit=1)[-1]
  169. @property
  170. def repeated(self)->bool:
  171. return self.label== FieldDescriptor.LABEL_REPEATED