From b72603d3863e025ed978ce38cfe482f5975bab37 Mon Sep 17 00:00:00 2001 From: Alina Lenk Date: Fri, 30 Sep 2022 20:29:38 +0200 Subject: [PATCH 1/4] generate_packets.py: Move script configuration into a class See osdn#45752 Signed-off-by: Alina Lenk --- common/generate_packets.py | 433 +++++++++++++++++++------------------ 1 file changed, 226 insertions(+), 207 deletions(-) diff --git a/common/generate_packets.py b/common/generate_packets.py index bf6a5c13cf..402e487eea 100755 --- a/common/generate_packets.py +++ b/common/generate_packets.py @@ -19,7 +19,7 @@ import re import argparse from pathlib import Path -from contextlib import contextmanager +from contextlib import contextmanager, ExitStack from functools import partial from itertools import chain, combinations, takewhile, zip_longest from collections import deque @@ -39,35 +39,6 @@ T_co = typing.TypeVar("T_co", covariant = True) ###################### Parsing Command Line Arguments ###################### -### Script configuration -# See get_argparser for what each of these does -# Keep initial values in sync with argparser defaults -is_verbose = False -lazy_overwrite = False -generate_stats = False -generate_logs = True -use_log_macro = "log_packet_detailed" -fold_bool_into_header = True - -def config_script(args): - """Update script configuration from the given argument namespace. - - args should be a namespace of a shape like those - produced by get_argparser().parse_args()""" - - global lazy_overwrite, is_verbose - global generate_stats, generate_logs, use_log_macro - global fold_bool_into_header - - is_verbose = args.verbose - lazy_overwrite = args.lazy_overwrite - - generate_stats = args.gen_stats - generate_logs = args.log_macro is not None - use_log_macro = args.log_macro - - fold_bool_into_header = args.fold_bool - def file_path(s: "str | Path") -> Path: """Parse the given path and check basic validity.""" path = Path(s) @@ -79,97 +50,123 @@ def file_path(s: "str | Path") -> Path: return path -def get_argparser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser( - description = "Generate packet-related code from packets.def", - add_help = False, # we'll add a help option explicitly - ) - - # Argument groups - # Note the order: - # We want the path arguments to show up *first* in the help text - - paths = parser.add_argument_group( - "Output paths", - "The following parameters decide which output files are generated," - " and where the generated code is written.", - ) - - script = parser.add_argument_group( - "Script configuration", - "The following parameters change how the script operates.", - ) - - output = parser.add_argument_group( - "Output configuration", - "The following parameters change the amount of output.", - ) - - protocol = parser.add_argument_group( - "Protocol configuration", - "The following parameters CHANGE the protocol." - " You have been warned.", - ) - - # Individual arguments - # Note the order: - # We want the path arguments to show up *last* in the usage summary - - script.add_argument("-h", "--help", action = "help", - help = "show this help message and exit") - - script.add_argument("-v", "--verbose", action = "store_true", - help = "enable log messages during code generation") - - # When enabled: Only overwrite existing output files when they - # actually changed. This prevents make from rebuilding all dependents - # in cases where that wouldn't even be necessary. - script.add_argument("--lazy-overwrite", action = "store_true", - help = "only overwrite output files when their" - " contents actually changed") - - output.add_argument("-s", "--gen-stats", action = "store_true", - help = "generate code reporting packet usage" - " statistics; call delta_stats_report to get these") - - logs = output.add_mutually_exclusive_group() - logs.add_argument("-l", "--log-macro", default = "log_packet_detailed", - help = "use the given macro for generated log calls") - logs.add_argument("-L", "--no-logs", dest = "log_macro", - action = "store_const", const = None, - help = "disable generating log calls") - - protocol.add_argument("-B", "--no-fold-bool", - dest = "fold_bool", action = "store_false", - help = "explicitly encode boolean values in the" - " packet body, rather than folding them into the" - " packet header") - - path_args = ( - # (dest, option, canonical path) - ("common_header_path", "--common-h", "common/packets_gen.h"), - ("common_impl_path", "--common-c", "common/packets_gen.c"), - ("client_header_path", "--client-h", "client/packhand_gen.h"), - ("client_impl_path", "--client-c", "client/packhand_gen.c"), - ("server_header_path", "--server-h", "server/hand_gen.h"), - ("server_impl_path", "--server-c", "server/hand_gen.c"), - ) - - for dest, option, canonical in path_args: - paths.add_argument(option, dest = dest, type = file_path, - help = "output path for %s" % canonical) - - return parser - -def verbose(s): - if is_verbose: - print(s) - - -####################### File access helper functions ####################### - -def write_disclaimer(f: typing.TextIO): - f.write("""\ + +class ScriptConfig: + """Contains configuration info for the script's execution, along with + functions closely tied to that configuration""" + + @staticmethod + def get_argparser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description = "Generate packet-related code from packets.def", + add_help = False, # we'll add a help option explicitly + ) + + # Argument groups + # Note the order: + # We want the path arguments to show up *first* in the help text + + paths = parser.add_argument_group( + "Output paths", + "The following parameters decide which output files are generated," + " and where the generated code is written.", + ) + + script = parser.add_argument_group( + "Script configuration", + "The following parameters change how the script operates.", + ) + + output = parser.add_argument_group( + "Output configuration", + "The following parameters change the amount of output.", + ) + + protocol = parser.add_argument_group( + "Protocol configuration", + "The following parameters CHANGE the protocol." + " You have been warned.", + ) + + # Individual arguments + # Note the order: + # We want the path arguments to show up *last* in the usage summary + + script.add_argument("-h", "--help", action = "help", + help = "show this help message and exit") + + script.add_argument("-v", "--verbose", action = "store_true", + help = "enable log messages during code generation") + + # When enabled: Only overwrite existing output files when they + # actually changed. This prevents make from rebuilding all dependents + # in cases where that wouldn't even be necessary. + script.add_argument("--lazy-overwrite", action = "store_true", + help = "only overwrite output files when their" + " contents actually changed") + + output.add_argument("-s", "--gen-stats", action = "store_true", + help = "generate code reporting packet usage" + " statistics; call delta_stats_report to get these") + + logs = output.add_mutually_exclusive_group() + logs.add_argument("-l", "--log-macro", default = "log_packet_detailed", + help = "use the given macro for generated log calls") + logs.add_argument("-L", "--no-logs", dest = "log_macro", + action = "store_const", const = None, + help = "disable generating log calls") + + protocol.add_argument("-B", "--no-fold-bool", + dest = "fold_bool", action = "store_false", + help = "explicitly encode boolean values in the" + " packet body, rather than folding them into the" + " packet header") + + path_args = ( + # (dest, option, canonical path) + ("common_header_path", "--common-h", "common/packets_gen.h"), + ("common_impl_path", "--common-c", "common/packets_gen.c"), + ("client_header_path", "--client-h", "client/packhand_gen.h"), + ("client_impl_path", "--client-c", "client/packhand_gen.c"), + ("server_header_path", "--server-h", "server/hand_gen.h"), + ("server_impl_path", "--server-c", "server/hand_gen.c"), + ) + + for dest, option, canonical in path_args: + paths.add_argument(option, dest = dest, type = file_path, + help = "output path for %s" % canonical) + + return parser + + def __init__(self, args: "typing.Sequence[str] | None" = None): + # type hints for fields + # FIXME: Once we can use Python 3.6 features, turn these into + # (class-level) variable annotations + if typing.TYPE_CHECKING: + optional_path = (file_path(""), None)[int()] + self.common_header_path = optional_path + self.common_impl_path = optional_path + self.server_header_path = optional_path + self.server_impl_path = optional_path + self.client_header_path = optional_path + self.client_impl_path = optional_path + + self.verbose = False + self.lazy_overwrite = False + + self.gen_stats = False + self.log_macro = str() or None + + self.fold_bool = True + + __class__.get_argparser().parse_args(args, namespace = self) + + def log_verbose(self, *args): + if self.verbose: + print(*args) + + def _write_disclaimer(self, f: typing.TextIO): + f.write("""\ /**************************************************************************** * THIS FILE WAS GENERATED * * Script: common/generate_packets.py * @@ -179,28 +176,33 @@ def write_disclaimer(f: typing.TextIO): """) -@contextmanager -def wrap_header(file: typing.TextIO, header_name: str, cplusplus: bool = True) -> typing.Iterator[None]: - """Add multiple inclusion protection to the given file. If cplusplus - is given (default), also add code for `extern "C" {}` wrapping""" - name = "FC__%s_H" % header_name.upper() - file.write("""\ + @contextmanager + def _wrap_header(self, file: typing.TextIO, header_name: str) -> typing.Iterator[None]: + """Add multiple inclusion protection to the given file""" + name = "FC__%s_H" % header_name.upper() + file.write("""\ #ifndef {name} #define {name} """.format(name = name)) - if cplusplus: + yield + + file.write("""\ + +#endif /* {name} */ +""".format(name = name)) + + @contextmanager + def _wrap_cplusplus(self, file: typing.TextIO) -> typing.Iterator[None]: + """Add code for `extern "C" {}` wrapping""" file.write("""\ #ifdef __cplusplus extern "C" { #endif /* __cplusplus */ """) - - yield - - if cplusplus: + yield file.write("""\ #ifdef __cplusplus @@ -208,61 +210,65 @@ extern "C" { #endif /* __cplusplus */ """) - file.write("""\ - -#endif /* {name} */ -""".format(name = name)) + @contextmanager + def open_write(self, path: "str | Path", wrap_header: "str | None" = None, cplusplus: bool = True) -> typing.Iterator[typing.TextIO]: + """Open a file for writing and write disclaimer. -@contextmanager -def fc_open(path: "str | Path") -> typing.Iterator[typing.TextIO]: - """Open a file for writing and write disclaimer. + If enabled, lazily overwrites the given file. + If wrap_header is given, add multiple inclusion protection; if + cplusplus is also given (default), also add code for `extern "C"` + wrapping.""" + path = Path(path) # no-op if path is already a Path object + self.log_verbose("writing %s" % path) - If enabled, lazily overwrites the given file.""" - path = Path(path) # no-op if path is already a Path object - verbose("writing %s" % path) + with ExitStack() as stack: + if self.lazy_overwrite: + file = stack.enter_context(self.lazy_overwrite_open(path)) + else: + file = stack.enter_context(path.open("w")) + + self._write_disclaimer(file) + + if wrap_header is not None: + stack.enter_context(self._wrap_header(file, wrap_header)) + if cplusplus: + stack.enter_context(self._wrap_cplusplus(file)) + yield file + self.log_verbose("done writing %s" % path) + + @contextmanager + def lazy_overwrite_open(self, path: "str | Path", suffix: str = ".tmp") -> typing.Iterator[typing.TextIO]: + """Open a file for writing, but only actually overwrite it if the new + content differs from the old content. + + This creates a temporary file by appending the given suffix to the given + file path. In the event of an error, this temporary file might remain in + the target file's directory.""" + + path = Path(path) + tmp_path = path.with_name(path.name + suffix) + + # if tmp_path already exists, assume it's left over from a previous, + # failed run and can be overwritten without trouble + self.log_verbose("lazy: using %s" % tmp_path) + with tmp_path.open("w") as file: + yield file + + if path.exists() and files_equal(tmp_path, path): + self.log_verbose("lazy: no change, deleting...") + tmp_path.unlink() + else: + self.log_verbose("lazy: content changed, replacing...") + tmp_path.replace(path) - if lazy_overwrite: - open_fun = lazy_overwrite_open - else: - open_fun = partial(Path.open, mode = "w") - with open_fun(path) as file: - write_disclaimer(file) - yield file - verbose("done writing %s" % path) +################### General helper functions and classes ################### def files_equal(path_a: "str | Path", path_b: "str | Path") -> bool: """Return whether the contents of two text files are identical""" with Path(path_a).open() as file_a, Path(path_b).open() as file_b: return all(a == b for a, b in zip_longest(file_a, file_b)) -@contextmanager -def lazy_overwrite_open(path: "str | Path", suffix: str = ".tmp") -> typing.Iterator[typing.TextIO]: - """Open a file for writing, but only actually overwrite it if the new - content differs from the old content. - - This creates a temporary file by appending the given suffix to the given - file path. In the event of an error, this temporary file might remain in - the target file's directory.""" - - path = Path(path) - tmp_path = path.with_name(path.name + suffix) - - # if tmp_path already exists, assume it's left over from a previous, - # failed run and can be overwritten without trouble - verbose("lazy: using %s" % tmp_path) - with tmp_path.open("w") as file: - yield file - - if path.exists() and files_equal(tmp_path, path): - verbose("lazy: no change, deleting...") - tmp_path.unlink() - else: - verbose("lazy: content changed, replacing...") - tmp_path.replace(path) - -################### General helper functions and classes ################### - # Taken from https://docs.python.org/3.4/library/itertools.html#itertools-recipes def powerset(iterable: typing.Iterable[T_co]) -> "typing.Iterator[tuple[T_co, ...]]": "powerset([1,2,3]) --> () (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)" @@ -1260,7 +1266,7 @@ class Field: - the final array size""" @classmethod - def parse(cls, line: str, resolve_type: typing.Callable[[str], RawFieldType]) -> "typing.Iterable[Field]": + def parse(cls, cfg: ScriptConfig, line: str, resolve_type: typing.Callable[[str], RawFieldType]) -> "typing.Iterable[Field]": """Parse a single line defining one or more fields""" mo = cls.FIELDS_LINE_PATTERN.fullmatch(line) if mo is None: @@ -1284,9 +1290,10 @@ class Field: if not isinstance(field_type, FieldType): raise ValueError("need an array size to use type %s" % field_type) - yield Field(field_text, field_type, flag_info) + yield Field(cfg, field_text, field_type, flag_info) - def __init__(self, name: str, type_info: FieldType, flags: FieldFlags): + def __init__(self, cfg: ScriptConfig, name: str, type_info: FieldType, flags: FieldFlags): + self.cfg = cfg self.name = name """Field name""" @@ -1346,7 +1353,7 @@ class Field: @property def folded_into_head(self) -> bool: return ( - fold_bool_into_header + self.cfg.fold_bool and self.type_info.foldable ) @@ -1477,10 +1484,6 @@ field_addr.name = \"{self.name}\"; class Variant: def __init__(self, poscaps: typing.Iterable[str], negcaps: typing.Iterable[str], packet: "Packet", no: int): - self.log_macro=use_log_macro - self.gen_stats=generate_stats - self.gen_log=generate_logs - self.packet = packet self.no=no self.name = "%s_%d" % (packet.name, no) @@ -1502,6 +1505,22 @@ class Variant: if not self.fields and packet.fields: raise ValueError("empty variant for nonempty {self.packet_name} with capabilities {self.poscaps}".format(self = self)) + @property + def cfg(self) -> ScriptConfig: + return self.packet.cfg + + @property + def gen_stats(self) -> bool: + return self.cfg.gen_stats + + @property + def log_macro(self) -> "str | None": + return self.cfg.log_macro + + @property + def gen_log(self) -> bool: + return self.log_macro is not None + @property def packet_name(self) -> str: """Name of the packet this is a variant of @@ -2234,8 +2253,9 @@ class Packet: """Whether this packet's handle function should be called with the connection instead of the attached player""" - def __init__(self, packet_type: str, packet_number: int, flags_text: str, + def __init__(self, cfg: ScriptConfig, packet_type: str, packet_number: int, flags_text: str, lines: typing.Iterable[str], resolve_type: typing.Callable[[str], RawFieldType]): + self.cfg = cfg self.type = packet_type self.type_number = packet_number @@ -2301,7 +2321,7 @@ class Packet: self.fields = [ field for line in lines - for field in Field.parse(line, resolve_type) + for field in Field.parse(self.cfg, line, resolve_type) ] self.key_fields = [field for field in self.fields if field.is_key] self.other_fields = [field for field in self.fields if not field.is_key] @@ -2677,7 +2697,7 @@ class PacketsDefinition(typing.Iterable[Packet]): )) packet = Packet( - packet_type, packet_number, flags_text, + self.cfg, packet_type, packet_number, flags_text, takewhile( lambda line: self.PACKET_END_PATTERN.fullmatch(line) is None, lines_iter, # advance the iterator used by this for loop @@ -2703,7 +2723,7 @@ class PacketsDefinition(typing.Iterable[Packet]): """Define a type alias""" if alias in self.types: if meaning == self.types[alias]: - verbose("duplicate typedef: %r = %r" % (alias, meaning)) + self.cfg.log_verbose("duplicate typedef: %r = %r" % (alias, meaning)) return else: raise ValueError("duplicate type alias %r: %r and %r" @@ -2711,7 +2731,8 @@ class PacketsDefinition(typing.Iterable[Packet]): self.types[alias] = self.resolve_type(meaning) - def __init__(self, type_registry: "TypeRegistry | None" = None): + def __init__(self, cfg: ScriptConfig, type_registry: "TypeRegistry | None" = None): + self.cfg = cfg self.type_registry = type_registry or DEFAULT_REGISTRY self.types = {} self.packets = [] @@ -2755,7 +2776,7 @@ const char *const packet_functional_capability = "%s"; @property def code_delta_stats_report(self) -> str: """Code fragment implementing the delta_stats_report() function""" - if not generate_stats: return """\ + if not self.cfg.gen_stats: return """\ void delta_stats_report(void) {} """ @@ -2777,7 +2798,7 @@ void delta_stats_report(void) { @property def code_delta_stats_reset(self) -> str: """Code fragment implementing the delta_stats_reset() function""" - if not generate_stats: return """\ + if not self.cfg.gen_stats: return """\ void delta_stats_reset(void) {} """ @@ -3045,7 +3066,7 @@ def write_common_header(path: "str | Path | None", packets: PacketsDefinition): """Write contents for common/packets_gen.h to the given path""" if path is None: return - with fc_open(path) as output_h, wrap_header(output_h, "packets_gen"): + with packets.cfg.open_write(path, wrap_header = "packets_gen") as output_h: output_h.write("""\ /* common */ #include "actions.h" @@ -3076,7 +3097,7 @@ def write_common_impl(path: "str | Path | None", packets: PacketsDefinition): """Write contents for common/packets_gen.c to the given path""" if path is None: return - with fc_open(path) as output_c: + with packets.cfg.open_write(path) as output_c: output_c.write("""\ #ifdef HAVE_CONFIG_H #include @@ -3117,7 +3138,7 @@ static bool cmp_const(const void *vkey1, const void *vkey2) """) - if generate_stats: + if packets.cfg.gen_stats: output_c.write("""\ static int stats_total_sent; @@ -3143,11 +3164,11 @@ static int stats_total_sent; output_c.write(packets.code_packet_handlers_fill_initial) output_c.write(packets.code_packet_handlers_fill_capability) -def write_server_header(path: "str | Path | None", packets: typing.Iterable[Packet]): +def write_server_header(path: "str | Path | None", packets: PacketsDefinition): """Write contents for server/hand_gen.h to the given path""" if path is None: return - with fc_open(path) as f, wrap_header(f, "hand_gen", cplusplus = False): + with packets.cfg.open_write(path, wrap_header = "hand_gen", cplusplus = False) as f: f.write("""\ /* utility */ #include "shared.h" @@ -3184,11 +3205,11 @@ void handle_%s(%s, const struct %s *packet); void handle_%s(%s%s); """ % (a, sender, b)) -def write_client_header(path: "str | Path | None", packets: typing.Iterable[Packet]): +def write_client_header(path: "str | Path | None", packets: PacketsDefinition): """Write contents for client/packhand_gen.h to the given path""" if path is None: return - with fc_open(path) as f, wrap_header(f, "packhand_gen"): + with packets.cfg.open_write(path, wrap_header = "packhand_gen") as f: f.write("""\ /* utility */ #include "shared.h" @@ -3217,11 +3238,11 @@ void handle_%s(const struct %s *packet); void handle_%s(%s); """ % (a, b)) -def write_server_impl(path: "str | Path | None", packets: typing.Iterable[Packet]): +def write_server_impl(path: "str | Path | None", packets: PacketsDefinition): """Write contents for server/hand_gen.c to the given path""" if path is None: return - with fc_open(path) as f: + with packets.cfg.open_write(path) as f: f.write("""\ #ifdef HAVE_CONFIG_H #include @@ -3270,11 +3291,11 @@ bool server_handle_packet(enum packet_type type, const void *packet, } """) -def write_client_impl(path: "str | Path | None", packets: typing.Iterable[Packet]): +def write_client_impl(path: "str | Path | None", packets: PacketsDefinition): """Write contents for client/packhand_gen.c to the given path""" if path is None: return - with fc_open(path) as f: + with packets.cfg.open_write(path) as f: f.write("""\ #ifdef HAVE_CONFIG_H #include @@ -3321,15 +3342,13 @@ bool client_handle_packet(enum packet_type type, const void *packet) # various files. def main(raw_args: "typing.Sequence[str] | None" = None): ### parsing arguments - global is_verbose - script_args = get_argparser().parse_args(raw_args) - config_script(script_args) + script_args = ScriptConfig(raw_args) ### parsing input src_dir = Path(__file__).parent input_path = src_dir / "networking" / "packets.def" - packets = PacketsDefinition() + packets = PacketsDefinition(script_args) with input_path.open() as input_file: packets.parse_lines(input_file) ### parsing finished -- 2.34.1