# Run a fuzz test to verify robustness against corrupted/malicious data.

import sys
import time
import zipfile
import random
import subprocess

Import("env", "malloc_env")


def set_pkgname(src, dst, pkgname):
    """Copy the .proto template `src` to `dst`, inserting a package statement.

    The line '// package name placeholder' in the template is replaced by
    'package <pkgname>;'.

    src, dst: anything whose str() is a file path (e.g. SCons nodes).
    pkgname:  package name to write into the generated file.

    Raises ValueError if the placeholder is missing from the template; the
    destination file is not touched in that case.
    """
    placeholder = '// package name placeholder'

    with open(str(src)) as infile:
        data = infile.read()

    # Explicit check instead of assert, so that it also works under python -O.
    if placeholder not in data:
        raise ValueError("%s does not contain %r" % (str(src), placeholder))

    with open(str(dst), 'w') as outfile:
        outfile.write(data.replace(placeholder, 'package %s;' % pkgname))


# We want both pointer and static versions of the AllTypes message.
# Prefix them with package name.
env.Command("alltypes_static.proto", "#alltypes/alltypes.proto",
            lambda target, source, env: set_pkgname(source[0], target[0], 'alltypes_static'))
env.Command("alltypes_pointer.proto", "#alltypes/alltypes.proto",
            lambda target, source, env: set_pkgname(source[0], target[0], 'alltypes_pointer'))

env.NanopbProto(["alltypes_pointer", "alltypes_pointer.options"])
env.NanopbProto(["alltypes_static", "alltypes_static.options"])

# Do the same for proto3 versions
env.Command("alltypes_proto3_static.proto", "#alltypes_proto3/alltypes.proto",
            lambda target, source, env: set_pkgname(source[0], target[0], 'alltypes_proto3_static'))
env.Command("alltypes_proto3_pointer.proto", "#alltypes_proto3/alltypes.proto",
            lambda target, source, env: set_pkgname(source[0], target[0], 'alltypes_proto3_pointer'))

env.NanopbProto(["alltypes_proto3_pointer", "alltypes_proto3_pointer.options"])
env.NanopbProto(["alltypes_proto3_static", "alltypes_proto3_static.options"])

# And also a callback version
env.Command("alltypes_callback.proto", "#alltypes/alltypes.proto",
            lambda target, source, env: set_pkgname(source[0], target[0], 'alltypes_callback'))
env.NanopbProto(["alltypes_callback", "alltypes_callback.options"])

common_objs = [env.Object("random_data.c"),
               env.Object("validation.c"),
               env.Object("flakystream.c"),
               env.Object("alltypes_pointer.pb.c"),
               env.Object("alltypes_static.pb.c"),
               env.Object("alltypes_callback.pb.c"),
               env.Object("alltypes_proto3_pointer.pb.c"),
               env.Object("alltypes_proto3_static.pb.c"),
               "$COMMON/malloc_wrappers.o"]
objs_malloc = ["$COMMON/pb_encode_with_malloc.o",
               "$COMMON/pb_decode_with_malloc.o",
               "$COMMON/pb_common_with_malloc.o"] + common_objs
objs_static = ["$COMMON/pb_encode.o",
               "$COMMON/pb_decode.o",
               "$COMMON/pb_common.o"] + common_objs

fuzz = malloc_env.Program(["fuzztest.c"] + objs_malloc)

# Run the stand-alone fuzz tester
seed = int(time.time())
if env.get('EMBEDDED'):
    iterations = 100
else:
    iterations = 1000
env.RunTest(fuzz, ARGS = [str(seed), str(iterations)])

generate_message = malloc_env.Program(["generate_message.c"] + objs_static)

# Test the message generator
env.RunTest(generate_message, ARGS = [str(seed)])
env.RunTest("generate_message.output.fuzzed", [fuzz, "generate_message.output"])


# Run against the latest corpus from ossfuzz.
# This allows quick testing against regressions and also lets us more
# completely test slow embedded targets. To reduce runtime, only a subset
# of the corpus is fuzzed each time.
def run_against_corpus(target, source, env):
    """SCons action: feed a random sample of corpus entries to the fuzzer.

    source[0]: the fuzz test executable.
    source[1]: zip archive containing the corpus files.
    target[0]: stamp file; receives the number of entries that were run.

    The sample size is taken from env['FUZZTEST_CORPUS_SAMPLESIZE'] if set,
    otherwise 100 when env['EMBEDDED'] is set and 4096 when it is not.
    If env['TEST_RUNNER'] is set, it is prepended to the command line.
    Each entry is truncated to FUZZTEST_BUFSIZE bytes (from CPPDEFINES,
    default 256 kB) and passed to the program on stdin.

    Returns the exit status of the first failing run (the stamp file is
    not written in that case), or None when every run exits with status 0.
    """
    args = [str(source[0])]
    if "TEST_RUNNER" in env:
        args = [env["TEST_RUNNER"]] + args

    if "FUZZTEST_CORPUS_SAMPLESIZE" in env:
        samplesize = int(env["FUZZTEST_CORPUS_SAMPLESIZE"])
    elif env.get('EMBEDDED'):
        samplesize = 100
    else:
        samplesize = 4096

    # Same for every entry, so look it up once instead of once per file.
    maxsize = env.get('CPPDEFINES', {}).get('FUZZTEST_BUFSIZE', 256*1024)

    count = 0
    with zipfile.ZipFile(str(source[1]), 'r') as corpus:
        # Skip directory entries, they have no data to feed to the fuzzer.
        files = [n for n in corpus.namelist() if not n.endswith('/')]
        files = random.sample(files, min(samplesize, len(files)))

        for filename in files:
            sys.stdout.write("Fuzzing: %5d/%5d: %-40.40s\r" % (count, len(files), filename))
            sys.stdout.flush()
            count += 1

            data_in = corpus.read(filename)[:maxsize]

            # Pre-bind these so that the error paths below never refer to an
            # unbound name when Popen() or communicate() raises.
            process = None
            stdout = stderr = b''

            try:
                process = subprocess.Popen(args, stdin=subprocess.PIPE,
                                           stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                stdout, stderr = process.communicate(input = data_in)
                result = process.wait()
            except OSError as e:
                # errno 22 is EINVAL. It is tolerated only once the process
                # has been started; the exit status still decides the result.
                if e.errno == 22 and process is not None:
                    print("Warning: OSError 22 when running with input " + filename)
                    result = process.wait()
                else:
                    raise

            if result != 0:
                print(stdout + stderr)
                print('\033[31m[FAIL]\033[0m   Program ' + str(args) + ' returned ' + str(result) + ' with input ' + filename + ' from ' + str(source[1]))
                return result

    with open(str(target[0]), 'w') as outfile:
        outfile.write(str(count))
    print('\033[32m[ OK ]\033[0m   Ran ' + str(args) + " against " + str(source[1]) + " (" + str(count) + " entries)")


env.Command("corpus.zip.fuzzed", [fuzz, "corpus.zip"], run_against_corpus)
env.Command("regressions.zip.fuzzed", [fuzz, "regressions.zip"], run_against_corpus)

# Build separate fuzzers for each test case.
# Having them separate speeds up control flow based fuzzer engines.
# These are mainly used by oss-fuzz project.

env_proto2_static = env.Clone()
env_proto2_static.Append(CPPDEFINES = {'FUZZTEST_PROTO2_STATIC': '1'})
env_proto2_static.Program("fuzztest_proto2_static",
    [env_proto2_static.Object("fuzztest_proto2_static.o", "fuzztest.c")] + objs_static)

env_proto2_pointer = malloc_env.Clone()
env_proto2_pointer.Append(CPPDEFINES = {'FUZZTEST_PROTO2_POINTER': '1'})
env_proto2_pointer.Program("fuzztest_proto2_pointer",
    [env_proto2_pointer.Object("fuzztest_proto2_pointer.o", "fuzztest.c")] + objs_malloc)

env_proto3_static = env.Clone()
env_proto3_static.Append(CPPDEFINES = {'FUZZTEST_PROTO3_STATIC': '1'})
env_proto3_static.Program("fuzztest_proto3_static",
    [env_proto3_static.Object("fuzztest_proto3_static.o", "fuzztest.c")] + objs_static)

env_proto3_pointer = malloc_env.Clone()
env_proto3_pointer.Append(CPPDEFINES = {'FUZZTEST_PROTO3_POINTER': '1'})
env_proto3_pointer.Program("fuzztest_proto3_pointer",
    [env_proto3_pointer.Object("fuzztest_proto3_pointer.o", "fuzztest.c")] + objs_malloc)

env_io_errors = malloc_env.Clone()
env_io_errors.Append(CPPDEFINES = {'FUZZTEST_IO_ERRORS': '1'})
env_io_errors.Program("fuzztest_io_errors",
    [env_io_errors.Object("fuzztest_io_errors.o", "fuzztest.c")] + objs_malloc)