# Run a fuzz test to verify robustness against corrupted/malicious data.

import sys
import time
import zipfile
import random
import subprocess

Import("env", "malloc_env")


def set_pkgname(src, dst, pkgname):
    """Copy a .proto file, replacing the package placeholder comment.

    Reads ``src``, substitutes the line ``// package name placeholder`` with
    ``package <pkgname>;`` and writes the result to ``dst``.

    Args:
        src: Source file (anything whose ``str()`` is a readable path).
        dst: Destination file (anything whose ``str()`` is a writable path).
        pkgname: Package name to insert into the generated file.

    Raises:
        ValueError: If the placeholder is not present in ``src``.
    """
    placeholder = '// package name placeholder'

    with open(str(src)) as infile:
        data = infile.read()

    # Explicit check instead of assert, so that it still fires under -O.
    if placeholder not in data:
        raise ValueError("%s does not contain %r" % (src, placeholder))

    with open(str(dst), 'w') as outfile:
        outfile.write(data.replace(placeholder, 'package %s;' % pkgname))


# We want both pointer and static versions of the AllTypes message.
# Prefix them with package name.
env.Command("alltypes_static.proto", "#alltypes/alltypes.proto",
            lambda target, source, env: set_pkgname(source[0], target[0], 'alltypes_static'))
env.Command("alltypes_pointer.proto", "#alltypes/alltypes.proto",
            lambda target, source, env: set_pkgname(source[0], target[0], 'alltypes_pointer'))

env.NanopbProto(["alltypes_pointer", "alltypes_pointer.options"])
env.NanopbProto(["alltypes_static", "alltypes_static.options"])

# Do the same for proto3 versions
env.Command("alltypes_proto3_static.proto", "#alltypes_proto3/alltypes.proto",
            lambda target, source, env: set_pkgname(source[0], target[0], 'alltypes_proto3_static'))
env.Command("alltypes_proto3_pointer.proto", "#alltypes_proto3/alltypes.proto",
            lambda target, source, env: set_pkgname(source[0], target[0], 'alltypes_proto3_pointer'))

env.NanopbProto(["alltypes_proto3_pointer", "alltypes_proto3_pointer.options"])
env.NanopbProto(["alltypes_proto3_static", "alltypes_proto3_static.options"])

# And also a callback version
env.Command("alltypes_callback.proto", "#alltypes/alltypes.proto",
            lambda target, source, env: set_pkgname(source[0], target[0], 'alltypes_callback'))
env.NanopbProto(["alltypes_callback", "alltypes_callback.options"])

# Objects shared by every fuzzer binary built below.
common_objs = [env.Object("random_data.c"),
               env.Object("validation.c"),
               env.Object("flakystream.c"),
               env.Object("alltypes_pointer.pb.c"),
               env.Object("alltypes_static.pb.c"),
               env.Object("alltypes_callback.pb.c"),
               env.Object("alltypes_proto3_pointer.pb.c"),
               env.Object("alltypes_proto3_static.pb.c"),
               "$COMMON/malloc_wrappers.o"]

objs_malloc = ["$COMMON/pb_encode_with_malloc.o",
               "$COMMON/pb_decode_with_malloc.o",
               "$COMMON/pb_common_with_malloc.o"] + common_objs

objs_static = ["$COMMON/pb_encode.o",
               "$COMMON/pb_decode.o",
               "$COMMON/pb_common.o"] + common_objs

fuzz = malloc_env.Program(["fuzztest.c"] + objs_malloc)

# Run the stand-alone fuzz tester
seed = int(time.time())
if env.get('EMBEDDED'):
    iterations = 100
else:
    iterations = 1000
env.RunTest(fuzz, ARGS = [str(seed), str(iterations)])

generate_message = malloc_env.Program(["generate_message.c"] + objs_static)

# Test the message generator
env.RunTest(generate_message, ARGS = [str(seed)])
env.RunTest("generate_message.output.fuzzed", [fuzz, "generate_message.output"])


# Run against the latest corpus from ossfuzz
# This allows quick testing against regressions and also lets us more
# completely test slow embedded targets. To reduce runtime, only a subset
# of the corpus is fuzzed each time.
def run_against_corpus(target, source, env):
    """Feed a random sample of a zipped corpus to the fuzzer via stdin.

    Args:
        target: ``target[0]`` is the file that receives the number of corpus
            entries run, written only when every entry passes.
        source: ``source[0]`` is the fuzzer executable, ``source[1]`` the
            corpus zip archive.
        env: Construction environment. Reads the optional keys
            ``TEST_RUNNER`` (prepended to the command line),
            ``FUZZTEST_CORPUS_SAMPLESIZE``, ``EMBEDDED`` and
            ``CPPDEFINES['FUZZTEST_BUFSIZE']``.

    Returns:
        The non-zero exit status of the first failing run, or None if all
        sampled entries passed.
    """
    args = [str(source[0])]
    if "TEST_RUNNER" in env:
        args = [env["TEST_RUNNER"]] + args

    if "FUZZTEST_CORPUS_SAMPLESIZE" in env:
        samplesize = int(env["FUZZTEST_CORPUS_SAMPLESIZE"])
    elif env.get('EMBEDDED'):
        samplesize = 100
    else:
        samplesize = 4096

    # Inputs are truncated to this many bytes before being passed on.
    maxsize = env.get('CPPDEFINES', {}).get('FUZZTEST_BUFSIZE', 256*1024)

    count = 0

    # The context manager closes the archive on the early failure return too.
    with zipfile.ZipFile(str(source[1]), 'r') as corpus:
        # Skip directory entries, then pick a random subset to limit runtime.
        files = [n for n in corpus.namelist() if not n.endswith('/')]
        files = random.sample(files, min(samplesize, len(files)))

        for filename in files:
            sys.stdout.write("Fuzzing: %5d/%5d: %-40.40s\r" % (count, len(files), filename))
            sys.stdout.flush()

            count += 1

            data_in = corpus.read(filename)[:maxsize]

            # Created outside the try block so that 'process' is always bound
            # in the handler; a failure to start the program propagates as-is.
            process = subprocess.Popen(args, stdin=subprocess.PIPE,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE)

            # Defaults for the case where communicate() fails before
            # returning any captured output.
            stdout = stderr = b''
            try:
                stdout, stderr = process.communicate(input = data_in)
                result = process.wait()
            except OSError as e:
                if e.errno == 22:
                    print("Warning: OSError 22 when running with input " + filename)
                    result = process.wait()
                else:
                    raise

            if result != 0:
                output = stdout + stderr
                if not isinstance(output, str):
                    # Python 3 captures bytes; decode for readable printing.
                    output = output.decode('utf-8', 'replace')
                print(output)
                print('\033[31m[FAIL]\033[0m Program ' + str(args) + ' returned ' + str(result) + ' with input ' + filename + ' from ' + str(source[1]))
                return result

    with open(str(target[0]), 'w') as outfile:
        outfile.write(str(count))

    print('\033[32m[ OK ]\033[0m Ran ' + str(args) + " against " + str(source[1]) + " (" + str(count) + " entries)")


env.Command("corpus.zip.fuzzed", [fuzz, "corpus.zip"], run_against_corpus)
env.Command("regressions.zip.fuzzed", [fuzz, "regressions.zip"], run_against_corpus)

# Build separate fuzzers for each test case.
# Having them separate speeds up control flow based fuzzer engines.
# These are mainly used by oss-fuzz project.

env_proto2_static = env.Clone()
env_proto2_static.Append(CPPDEFINES = {'FUZZTEST_PROTO2_STATIC': '1'})
env_proto2_static.Program("fuzztest_proto2_static",
    [env_proto2_static.Object("fuzztest_proto2_static.o", "fuzztest.c")] + objs_static)

env_proto2_pointer = malloc_env.Clone()
env_proto2_pointer.Append(CPPDEFINES = {'FUZZTEST_PROTO2_POINTER': '1'})
env_proto2_pointer.Program("fuzztest_proto2_pointer",
    [env_proto2_pointer.Object("fuzztest_proto2_pointer.o", "fuzztest.c")] + objs_malloc)

env_proto3_static = env.Clone()
env_proto3_static.Append(CPPDEFINES = {'FUZZTEST_PROTO3_STATIC': '1'})
env_proto3_static.Program("fuzztest_proto3_static",
    [env_proto3_static.Object("fuzztest_proto3_static.o", "fuzztest.c")] + objs_static)

env_proto3_pointer = malloc_env.Clone()
env_proto3_pointer.Append(CPPDEFINES = {'FUZZTEST_PROTO3_POINTER': '1'})
env_proto3_pointer.Program("fuzztest_proto3_pointer",
    [env_proto3_pointer.Object("fuzztest_proto3_pointer.o", "fuzztest.c")] + objs_malloc)

env_io_errors = malloc_env.Clone()
env_io_errors.Append(CPPDEFINES = {'FUZZTEST_IO_ERRORS': '1'})
env_io_errors.Program("fuzztest_io_errors",
    [env_io_errors.Object("fuzztest_io_errors.o", "fuzztest.c")] + objs_malloc)