| // Copyright 2020 The Go Authors. All rights reserved. | |
| // Use of this source code is governed by a BSD-style | |
| // license that can be found in the LICENSE file. | |
| package fuzz | |
| import ( | |
| "bytes" | |
| "context" | |
| "crypto/sha256" | |
| "encoding/json" | |
| "errors" | |
| "fmt" | |
| "io" | |
| "os" | |
| "os/exec" | |
| "reflect" | |
| "runtime" | |
| "sync" | |
| "time" | |
| ) | |
const (
	// workerFuzzDuration is the amount of time a worker can spend testing random
	// variations of an input given by the coordinator.
	workerFuzzDuration = 100 * time.Millisecond

	// workerTimeoutDuration is the amount of time a worker can go without
	// responding to the coordinator before being stopped.
	workerTimeoutDuration = 1 * time.Second

	// workerExitCode is used as an exit code by fuzz worker processes after an internal error.
	// This distinguishes internal errors from uncontrolled panics and other crashes.
	// Keep in sync with internal/fuzz.workerExitCode.
	workerExitCode = 70

	// workerSharedMemSize is the maximum size of the shared memory file used to
	// communicate with workers. This limits the size of fuzz inputs.
	workerSharedMemSize = 100 << 20 // 100 MB
)
// worker manages a worker process running a test binary. The worker object
// exists only in the coordinator (the process started by 'go test -fuzz').
// workerClient is used by the coordinator to send RPCs to the worker process,
// which handles them with workerServer.
type worker struct {
	dir     string   // working directory, same as package directory
	binPath string   // path to test executable
	args    []string // arguments for test executable
	env     []string // environment for test executable

	coordinator *coordinator

	memMu chan *sharedMem // mutex guarding shared memory with worker; persists across processes.

	cmd         *exec.Cmd     // current worker process
	client      *workerClient // used to communicate with worker process
	waitErr     error         // last error returned by wait, set before termC is closed.
	interrupted bool          // true after stop interrupts a running worker.
	termC       chan struct{} // closed by wait when worker process terminates
}
| func newWorker(c *coordinator, dir, binPath string, args, env []string) (*worker, error) { | |
| mem, err := sharedMemTempFile(workerSharedMemSize) | |
| if err != nil { | |
| return nil, err | |
| } | |
| memMu := make(chan *sharedMem, 1) | |
| memMu <- mem | |
| return &worker{ | |
| dir: dir, | |
| binPath: binPath, | |
| args: args, | |
| env: env[:len(env):len(env)], // copy on append to ensure workers don't overwrite each other. | |
| coordinator: c, | |
| memMu: memMu, | |
| }, nil | |
| } | |
| // cleanup releases persistent resources associated with the worker. | |
| func (w *worker) cleanup() error { | |
| mem := <-w.memMu | |
| if mem == nil { | |
| return nil | |
| } | |
| close(w.memMu) | |
| return mem.Close() | |
| } | |
// coordinate runs the test binary to perform fuzzing.
//
// coordinate loops until ctx is canceled or a fatal error is encountered.
// If a test process terminates unexpectedly while fuzzing, coordinate will
// attempt to restart and continue unless the termination can be attributed
// to an interruption (from a timer or the user).
//
// While looping, coordinate receives inputs from the coordinator, passes
// those inputs to the worker process, then passes the results back to
// the coordinator.
func (w *worker) coordinate(ctx context.Context) error {
	// Main event loop.
	for {
		// Start or restart the worker if it's not running.
		if !w.isRunning() {
			if err := w.startAndPing(ctx); err != nil {
				return err
			}
		}

		select {
		case <-ctx.Done():
			// Worker was told to stop.
			err := w.stop()
			if err != nil && !w.interrupted && !isInterruptError(err) {
				return err
			}
			return ctx.Err()

		case <-w.termC:
			// Worker process terminated unexpectedly while waiting for input.
			err := w.stop()
			if w.interrupted {
				// stop only sets interrupted after sending a signal, which cannot
				// have happened before this termination was observed.
				panic("worker interrupted after unexpected termination")
			}
			if err == nil || isInterruptError(err) {
				// Worker stopped, either by exiting with status 0 or after being
				// interrupted with a signal that was not sent by the coordinator.
				//
				// When the user presses ^C, on POSIX platforms, SIGINT is delivered to
				// all processes in the group concurrently, and the worker may see it
				// before the coordinator. The worker should exit 0 gracefully (in
				// theory).
				//
				// This condition is probably intended by the user, so suppress
				// the error.
				return nil
			}
			if exitErr, ok := err.(*exec.ExitError); ok && exitErr.ExitCode() == workerExitCode {
				// Worker exited with a code indicating F.Fuzz was not called correctly,
				// for example, F.Fail was called first.
				return fmt.Errorf("fuzzing process exited unexpectedly due to an internal failure: %w", err)
			}
			// Worker exited non-zero or was terminated by a non-interrupt
			// signal (for example, SIGSEGV) while fuzzing.
			return fmt.Errorf("fuzzing process hung or terminated unexpectedly: %w", err)
			// TODO(jayconrod,katiehockman): if -keepfuzzing, restart worker.

		case input := <-w.coordinator.inputC:
			// Received input from coordinator.
			args := fuzzArgs{
				Limit:        input.limit,
				Timeout:      input.timeout,
				Warmup:       input.warmup,
				CoverageData: input.coverageData,
			}
			entry, resp, isInternalError, err := w.client.fuzz(ctx, input.entry, args)
			canMinimize := true
			if err != nil {
				// Error communicating with worker.
				w.stop()
				if ctx.Err() != nil {
					// Timeout or interruption.
					return ctx.Err()
				}
				if w.interrupted {
					// Communication error before we stopped the worker.
					// Report an error, but don't record a crasher.
					return fmt.Errorf("communicating with fuzzing process: %v", err)
				}
				if sig, ok := terminationSignal(w.waitErr); ok && !isCrashSignal(sig) {
					// Worker terminated by a signal that probably wasn't caused by a
					// specific input to the fuzz function. For example, on Linux,
					// the kernel (OOM killer) may send SIGKILL to a process using a lot
					// of memory. Or the shell might send SIGHUP when the terminal
					// is closed. Don't record a crasher.
					return fmt.Errorf("fuzzing process terminated by unexpected signal; no crash will be recorded: %v", w.waitErr)
				}
				if isInternalError {
					// An internal error occurred which shouldn't be considered
					// a crash.
					return err
				}
				// Unexpected termination. Set error message and fall through.
				// We'll restart the worker on the next iteration.
				// Don't attempt to minimize this since it crashed the worker.
				resp.Err = fmt.Sprintf("fuzzing process hung or terminated unexpectedly: %v", w.waitErr)
				canMinimize = false
			}
			result := fuzzResult{
				limit:         input.limit,
				count:         resp.Count,
				totalDuration: resp.TotalDuration,
				entryDuration: resp.InterestingDuration,
				entry:         entry,
				crasherMsg:    resp.Err,
				coverageData:  resp.CoverageData,
				canMinimize:   canMinimize,
			}
			w.coordinator.resultC <- result

		case input := <-w.coordinator.minimizeC:
			// Received input to minimize from coordinator.
			result, err := w.minimize(ctx, input)
			if err != nil {
				// Error minimizing. Send back the original input. If it didn't cause
				// an error before, report it as causing an error now.
				// TODO: double-check this is handled correctly when
				// implementing -keepfuzzing.
				result = fuzzResult{
					entry:       input.entry,
					crasherMsg:  input.crasherMsg,
					canMinimize: false,
					limit:       input.limit,
				}
				if result.crasherMsg == "" {
					result.crasherMsg = err.Error()
				}
			}
			if shouldPrintDebugInfo() {
				w.coordinator.debugLogf(
					"input minimized, id: %s, original id: %s, crasher: %t, originally crasher: %t, minimizing took: %s",
					result.entry.Path,
					input.entry.Path,
					result.crasherMsg != "",
					input.crasherMsg != "",
					result.totalDuration,
				)
			}
			w.coordinator.resultC <- result
		}
	}
}
// minimize tells a worker process to attempt to find a smaller value that
// either causes an error (if we started minimizing because we found an input
// that causes an error) or preserves new coverage (if we started minimizing
// because we found an input that expands coverage).
//
// On success, the returned fuzzResult holds the minimized entry. On a
// communication failure with the worker, minimize returns a fuzzResult
// describing the failure rather than an error, so the coordinator can still
// record the original crasher.
func (w *worker) minimize(ctx context.Context, input fuzzMinimizeInput) (min fuzzResult, err error) {
	if w.coordinator.opts.MinimizeTimeout != 0 {
		var cancel func()
		ctx, cancel = context.WithTimeout(ctx, w.coordinator.opts.MinimizeTimeout)
		defer cancel()
	}

	args := minimizeArgs{
		Limit:        input.limit,
		Timeout:      input.timeout,
		KeepCoverage: input.keepCoverage,
	}
	entry, resp, err := w.client.minimize(ctx, input.entry, args)
	if err != nil {
		// Error communicating with worker.
		w.stop()
		if ctx.Err() != nil || w.interrupted || isInterruptError(w.waitErr) {
			// Worker was interrupted, possibly by the user pressing ^C.
			// Normally, workers can handle interrupts and timeouts gracefully and
			// will return without error. An error here indicates the worker
			// may not have been in a good state, but the error won't be meaningful
			// to the user. Just return the original crasher without logging anything.
			return fuzzResult{
				entry:        input.entry,
				crasherMsg:   input.crasherMsg,
				coverageData: input.keepCoverage,
				canMinimize:  false,
				limit:        input.limit,
			}, nil
		}
		return fuzzResult{
			entry:         entry,
			crasherMsg:    fmt.Sprintf("fuzzing process hung or terminated unexpectedly while minimizing: %v", err),
			canMinimize:   false,
			limit:         input.limit,
			count:         resp.Count,
			totalDuration: resp.Duration,
		}, nil
	}

	if input.crasherMsg != "" && resp.Err == "" {
		// We were minimizing a crasher but the worker could not reproduce the
		// crash at all; treat the original finding as a flake.
		return fuzzResult{}, fmt.Errorf("attempted to minimize a crash but could not reproduce")
	}

	return fuzzResult{
		entry:         entry,
		crasherMsg:    resp.Err,
		coverageData:  resp.CoverageData,
		canMinimize:   false,
		limit:         input.limit,
		count:         resp.Count,
		totalDuration: resp.Duration,
	}, nil
}
// isRunning reports whether a worker process has been started and not yet
// cleaned up by stop. w.cmd is set by start and cleared by stop.
func (w *worker) isRunning() bool {
	return w.cmd != nil
}
| // startAndPing starts the worker process and sends it a message to make sure it | |
| // can communicate. | |
| // | |
| // startAndPing returns an error if any part of this didn't work, including if | |
| // the context is expired or the worker process was interrupted before it | |
| // responded. Errors that happen after start but before the ping response | |
| // likely indicate that the worker did not call F.Fuzz or called F.Fail first. | |
| // We don't record crashers for these errors. | |
| func (w *worker) startAndPing(ctx context.Context) error { | |
| if ctx.Err() != nil { | |
| return ctx.Err() | |
| } | |
| if err := w.start(); err != nil { | |
| return err | |
| } | |
| if err := w.client.ping(ctx); err != nil { | |
| w.stop() | |
| if ctx.Err() != nil { | |
| return ctx.Err() | |
| } | |
| if isInterruptError(err) { | |
| // User may have pressed ^C before worker responded. | |
| return err | |
| } | |
| // TODO: record and return stderr. | |
| return fmt.Errorf("fuzzing process terminated without fuzzing: %w", err) | |
| } | |
| return nil | |
| } | |
// start runs a new worker process.
//
// If the process couldn't be started, start returns an error. Start won't
// return later termination errors from the process if they occur.
//
// If the process starts successfully, start returns nil. stop must be called
// once later to clean up, even if the process terminates on its own.
//
// When the process terminates, w.waitErr is set to the error (if any), and
// w.termC is closed.
func (w *worker) start() (err error) {
	if w.isRunning() {
		panic("worker already started")
	}
	// Reset per-process state from any previous run of this worker.
	w.waitErr = nil
	w.interrupted = false
	w.termC = nil

	cmd := exec.Command(w.binPath, w.args...)
	cmd.Dir = w.dir
	cmd.Env = w.env[:len(w.env):len(w.env)] // copy on append to ensure workers don't overwrite each other.

	// Create the "fuzz_in" and "fuzz_out" pipes so we can communicate with
	// the worker. We don't use stdin and stdout, since the test binary may
	// do something else with those.
	//
	// Each pipe has a reader and a writer. The coordinator writes to fuzzInW
	// and reads from fuzzOutR. The worker inherits fuzzInR and fuzzOutW.
	// The coordinator closes fuzzInR and fuzzOutW after starting the worker,
	// since we have no further need of them.
	fuzzInR, fuzzInW, err := os.Pipe()
	if err != nil {
		return err
	}
	defer fuzzInR.Close()
	fuzzOutR, fuzzOutW, err := os.Pipe()
	if err != nil {
		fuzzInW.Close()
		return err
	}
	defer fuzzOutW.Close()
	setWorkerComm(cmd, workerComm{fuzzIn: fuzzInR, fuzzOut: fuzzOutW, memMu: w.memMu})

	// Start the worker process.
	if err := cmd.Start(); err != nil {
		// Close the coordinator's ends too; the deferred closes handle the
		// worker's ends.
		fuzzInW.Close()
		fuzzOutR.Close()
		return err
	}

	// Worker started successfully.
	// After this, w.client owns fuzzInW and fuzzOutR, so w.client.Close must be
	// called later by stop.
	w.cmd = cmd
	w.termC = make(chan struct{})
	comm := workerComm{fuzzIn: fuzzInW, fuzzOut: fuzzOutR, memMu: w.memMu}
	m := newMutator()
	w.client = newWorkerClient(comm, m)

	// Reap the process in the background so stop (or coordinate) can observe
	// termination via termC and read waitErr.
	go func() {
		w.waitErr = w.cmd.Wait()
		close(w.termC)
	}()

	return nil
}
// stop tells the worker process to exit by closing w.client, then blocks until
// it terminates. If the worker doesn't terminate after a short time, stop
// signals it with os.Interrupt (where supported), then os.Kill.
//
// stop returns the error the process terminated with, if any (same as
// w.waitErr).
//
// stop must be called at least once after start returns successfully, even if
// the worker process terminates unexpectedly.
func (w *worker) stop() error {
	if w.termC == nil {
		panic("worker was not started successfully")
	}
	select {
	case <-w.termC:
		// Worker already terminated.
		if w.client == nil {
			// stop already called.
			return w.waitErr
		}
		// Possible unexpected termination.
		w.client.Close()
		w.cmd = nil
		w.client = nil
		return w.waitErr
	default:
		// Worker still running.
	}

	// Tell the worker to stop by closing fuzz_in. It won't actually stop until it
	// finishes with earlier calls.
	closeC := make(chan struct{})
	go func() {
		w.client.Close()
		close(closeC)
	}()

	sig := os.Interrupt
	if runtime.GOOS == "windows" {
		// Per https://golang.org/pkg/os/#Signal, “Interrupt is not implemented on
		// Windows; using it with os.Process.Signal will return an error.”
		// Fall back to Kill instead.
		sig = os.Kill
	}

	// Escalate: wait, then SIGINT, wait, then SIGKILL, then keep waiting,
	// printing a note so the user knows why the coordinator appears hung.
	t := time.NewTimer(workerTimeoutDuration)
	for {
		select {
		case <-w.termC:
			// Worker terminated.
			t.Stop()
			<-closeC
			w.cmd = nil
			w.client = nil
			return w.waitErr

		case <-t.C:
			// Timer fired before worker terminated.
			w.interrupted = true
			switch sig {
			case os.Interrupt:
				// Try to stop the worker with SIGINT and wait a little longer.
				w.cmd.Process.Signal(sig)
				sig = os.Kill
				t.Reset(workerTimeoutDuration)

			case os.Kill:
				// Try to stop the worker with SIGKILL and keep waiting.
				w.cmd.Process.Signal(sig)
				sig = nil
				t.Reset(workerTimeoutDuration)

			case nil:
				// Still waiting. Print a message to let the user know why.
				fmt.Fprintf(w.coordinator.opts.Log, "waiting for fuzzing process to terminate...\n")
			}
		}
	}
}
| // RunFuzzWorker is called in a worker process to communicate with the | |
| // coordinator process in order to fuzz random inputs. RunFuzzWorker loops | |
| // until the coordinator tells it to stop. | |
| // | |
| // fn is a wrapper on the fuzz function. It may return an error to indicate | |
| // a given input "crashed". The coordinator will also record a crasher if | |
| // the function times out or terminates the process. | |
| // | |
| // RunFuzzWorker returns an error if it could not communicate with the | |
| // coordinator process. | |
| func RunFuzzWorker(ctx context.Context, fn func(CorpusEntry) error) error { | |
| comm, err := getWorkerComm() | |
| if err != nil { | |
| return err | |
| } | |
| srv := &workerServer{ | |
| workerComm: comm, | |
| fuzzFn: func(e CorpusEntry) (time.Duration, error) { | |
| timer := time.AfterFunc(10*time.Second, func() { | |
| panic("deadlocked!") // this error message won't be printed | |
| }) | |
| defer timer.Stop() | |
| start := time.Now() | |
| err := fn(e) | |
| return time.Since(start), err | |
| }, | |
| m: newMutator(), | |
| } | |
| return srv.serve(ctx) | |
| } | |
// call is serialized and sent from the coordinator on fuzz_in. It acts as
// a minimalist RPC mechanism. Exactly one of its fields must be set to indicate
// which method to call.
type call struct {
	Ping     *pingArgs
	Fuzz     *fuzzArgs
	Minimize *minimizeArgs
}
// minimizeArgs contains arguments to workerServer.minimize. The value to
// minimize is already in shared memory.
type minimizeArgs struct {
	// Timeout is the time to spend minimizing. This may include time to start up,
	// especially if the input causes the worker process to terminate, requiring
	// repeated restarts.
	Timeout time.Duration

	// Limit is the maximum number of values to test, without spending more time
	// than Timeout. 0 indicates no limit.
	Limit int64

	// KeepCoverage is a set of coverage counters the worker should attempt to
	// keep in minimized values. When provided, the worker will reject inputs that
	// don't cause at least one of these bits to be set.
	KeepCoverage []byte

	// Index is the index of the fuzz target parameter to be minimized.
	Index int
}
// minimizeResponse contains results from workerServer.minimize.
type minimizeResponse struct {
	// WroteToMem is true if the worker found a smaller input and wrote it to
	// shared memory. If minimizeArgs.KeepCoverage was set, the minimized input
	// preserved at least one coverage bit and did not cause an error.
	// Otherwise, the minimized input caused some error, recorded in Err.
	WroteToMem bool

	// Err is the error string caused by the value in shared memory, if any.
	Err string

	// CoverageData is the set of coverage bits activated by the minimized value
	// in shared memory. When set, it contains at least one bit from KeepCoverage.
	// CoverageData will be nil if Err is set or if minimization failed.
	CoverageData []byte

	// Duration is the time spent minimizing, not including starting or cleaning up.
	Duration time.Duration

	// Count is the number of values tested.
	Count int64
}
// fuzzArgs contains arguments to workerServer.fuzz. The value to fuzz is
// passed in shared memory.
type fuzzArgs struct {
	// Timeout is the time to spend fuzzing, not including starting or
	// cleaning up.
	Timeout time.Duration

	// Limit is the maximum number of values to test, without spending more time
	// than Timeout. 0 indicates no limit.
	Limit int64

	// Warmup indicates whether this is part of a warmup run, meaning that
	// fuzzing should not occur. If coverageEnabled is true, then coverage data
	// should be reported.
	Warmup bool

	// CoverageData is the coverage data. If set, the worker should update its
	// local coverage data prior to fuzzing.
	CoverageData []byte
}
// fuzzResponse contains results from workerServer.fuzz.
type fuzzResponse struct {
	// TotalDuration is the time spent fuzzing, not including starting or
	// cleaning up.
	TotalDuration time.Duration

	// InterestingDuration is the time spent running the input that was deemed
	// interesting (a crasher or an input expanding coverage), when one was found.
	InterestingDuration time.Duration

	// Count is the number of values tested.
	Count int64

	// CoverageData is set if the value in shared memory expands coverage
	// and therefore may be interesting to the coordinator.
	CoverageData []byte

	// Err is the error string caused by the value in shared memory, which is
	// non-empty if the value in shared memory caused a crash.
	Err string

	// InternalErr is the error string caused by an internal error in the
	// worker. This shouldn't be considered a crasher.
	InternalErr string
}
// pingArgs contains arguments to workerServer.ping.
type pingArgs struct{}

// pingResponse contains results from workerServer.ping.
type pingResponse struct{}
// workerComm holds pipes and shared memory used for communication
// between the coordinator process (client) and a worker process (server).
// These values are unique to each worker; they are shared only with the
// coordinator, not with other workers.
//
// Access to shared memory is synchronized implicitly over the RPC protocol
// implemented in workerServer and workerClient. During a call, the client
// (worker) has exclusive access to shared memory; at other times, the server
// (coordinator) has exclusive access.
type workerComm struct {
	fuzzIn, fuzzOut *os.File
	memMu           chan *sharedMem // mutex guarding shared memory
}
// workerServer is a minimalist RPC server, run by fuzz worker processes.
// It allows the coordinator process (using workerClient) to call methods in a
// worker process. This system allows the coordinator to run multiple worker
// processes in parallel and to collect inputs that caused crashes from shared
// memory after a worker process terminates unexpectedly.
type workerServer struct {
	workerComm
	m *mutator

	// coverageMask is the local coverage data for the worker. It is
	// periodically updated to reflect the data in the coordinator when new
	// coverage is found.
	coverageMask []byte

	// fuzzFn runs the worker's fuzz target on the given input and returns an
	// error if it finds a crasher (the process may also exit or crash), and the
	// time it took to run the input. It sets a deadline of 10 seconds, at which
	// point it will panic with the assumption that the process is hanging or
	// deadlocked.
	fuzzFn func(CorpusEntry) (time.Duration, error)
}
| // serve reads serialized RPC messages on fuzzIn. When serve receives a message, | |
| // it calls the corresponding method, then sends the serialized result back | |
| // on fuzzOut. | |
| // | |
| // serve handles RPC calls synchronously; it will not attempt to read a message | |
| // until the previous call has finished. | |
| // | |
| // serve returns errors that occurred when communicating over pipes. serve | |
| // does not return errors from method calls; those are passed through serialized | |
| // responses. | |
| func (ws *workerServer) serve(ctx context.Context) error { | |
| enc := json.NewEncoder(ws.fuzzOut) | |
| dec := json.NewDecoder(&contextReader{ctx: ctx, r: ws.fuzzIn}) | |
| for { | |
| var c call | |
| if err := dec.Decode(&c); err != nil { | |
| if err == io.EOF || err == ctx.Err() { | |
| return nil | |
| } else { | |
| return err | |
| } | |
| } | |
| var resp any | |
| switch { | |
| case c.Fuzz != nil: | |
| resp = ws.fuzz(ctx, *c.Fuzz) | |
| case c.Minimize != nil: | |
| resp = ws.minimize(ctx, *c.Minimize) | |
| case c.Ping != nil: | |
| resp = ws.ping(ctx, *c.Ping) | |
| default: | |
| return errors.New("no arguments provided for any call") | |
| } | |
| if err := enc.Encode(resp); err != nil { | |
| return err | |
| } | |
| } | |
| } | |
// chainedMutations is how many mutations are applied before the worker
// resets the input to its original state.
// NOTE: this number was picked without much thought. It is low enough that
// it seems to create a significant diversity in mutated inputs. We may want
// to consider looking into this more closely once we have a proper performance
// testing framework. Another option is to randomly pick the number of chained
// mutations on each invocation of the workerServer.fuzz method (this appears to
// be what libFuzzer does, although there seems to be no documentation which
// explains why this choice was made.)
const chainedMutations = 5
// fuzz runs the test function on random variations of the input value in shared
// memory for a limited duration or number of iterations.
//
// fuzz returns early if it finds an input that crashes the fuzz function (with
// fuzzResponse.Err set) or an input that expands coverage (with
// fuzzResponse.InterestingDuration set).
//
// fuzz does not modify the input in shared memory. Instead, it saves the
// initial PRNG state in shared memory and increments a counter in shared
// memory before each call to the test function. The caller may reconstruct
// the crashing input with this information, since the PRNG is deterministic.
func (ws *workerServer) fuzz(ctx context.Context, args fuzzArgs) (resp fuzzResponse) {
	if args.CoverageData != nil {
		// Refuse a coverage update whose size doesn't match our local mask.
		if ws.coverageMask != nil && len(args.CoverageData) != len(ws.coverageMask) {
			resp.InternalErr = fmt.Sprintf("unexpected size for CoverageData: got %d, expected %d", len(args.CoverageData), len(ws.coverageMask))
			return resp
		}
		ws.coverageMask = args.CoverageData
	}
	start := time.Now()
	defer func() { resp.TotalDuration = time.Since(start) }()

	if args.Timeout != 0 {
		var cancel func()
		ctx, cancel = context.WithTimeout(ctx, args.Timeout)
		defer cancel()
	}
	// Take ownership of shared memory for the duration of the call; record the
	// PRNG state so the coordinator can reconstruct mutated inputs.
	mem := <-ws.memMu
	ws.m.r.save(&mem.header().randState, &mem.header().randInc)
	defer func() {
		resp.Count = mem.header().count
		ws.memMu <- mem
	}()
	if args.Limit > 0 && mem.header().count >= args.Limit {
		resp.InternalErr = fmt.Sprintf("mem.header().count %d already exceeds args.Limit %d", mem.header().count, args.Limit)
		return resp
	}

	originalVals, err := unmarshalCorpusFile(mem.valueCopy())
	if err != nil {
		resp.InternalErr = err.Error()
		return resp
	}
	// vals is the working copy that gets mutated; originalVals stays pristine
	// so the input can be periodically reset.
	vals := make([]any, len(originalVals))
	copy(vals, originalVals)

	shouldStop := func() bool {
		return args.Limit > 0 && mem.header().count >= args.Limit
	}
	// fuzzOnce runs the fuzz function on one entry, returning its duration, any
	// new coverage snapshot, and an error message if the entry crashed.
	fuzzOnce := func(entry CorpusEntry) (dur time.Duration, cov []byte, errMsg string) {
		mem.header().count++
		var err error
		dur, err = ws.fuzzFn(entry)
		if err != nil {
			errMsg = err.Error()
			if errMsg == "" {
				errMsg = "fuzz function failed with no input"
			}
			return dur, nil, errMsg
		}
		if ws.coverageMask != nil && countNewCoverageBits(ws.coverageMask, coverageSnapshot) > 0 {
			return dur, coverageSnapshot, ""
		}
		return dur, nil, ""
	}

	if args.Warmup {
		// Warmup run: execute the unmutated input exactly once.
		dur, _, errMsg := fuzzOnce(CorpusEntry{Values: vals})
		if errMsg != "" {
			resp.Err = errMsg
			return resp
		}
		resp.InterestingDuration = dur
		if coverageEnabled {
			resp.CoverageData = coverageSnapshot
		}
		return resp
	}

	for {
		select {
		case <-ctx.Done():
			return resp
		default:
			// Periodically reset to the original input and re-save PRNG state,
			// so chains of mutations never grow longer than chainedMutations.
			if mem.header().count%chainedMutations == 0 {
				copy(vals, originalVals)
				ws.m.r.save(&mem.header().randState, &mem.header().randInc)
			}
			ws.m.mutate(vals, cap(mem.valueRef()))

			entry := CorpusEntry{Values: vals}
			dur, cov, errMsg := fuzzOnce(entry)
			if errMsg != "" {
				resp.Err = errMsg
				return resp
			}
			if cov != nil {
				resp.CoverageData = cov
				resp.InterestingDuration = dur
				return resp
			}
			if shouldStop() {
				return resp
			}
		}
	}
}
// minimize handles a minimize RPC from the coordinator. It reads the value to
// minimize from shared memory, repeatedly shrinks it via minimizeInput, and
// writes the result back to shared memory on success.
func (ws *workerServer) minimize(ctx context.Context, args minimizeArgs) (resp minimizeResponse) {
	start := time.Now()
	defer func() { resp.Duration = time.Since(start) }()
	mem := <-ws.memMu
	defer func() { ws.memMu <- mem }()
	vals, err := unmarshalCorpusFile(mem.valueCopy())
	if err != nil {
		// A corrupt corpus file here is a coordinator-side bug, not a crasher.
		panic(err)
	}
	// Hash the input so we can later tell whether minimization changed it.
	inpHash := sha256.Sum256(mem.valueCopy())
	if args.Timeout != 0 {
		var cancel func()
		ctx, cancel = context.WithTimeout(ctx, args.Timeout)
		defer cancel()
	}

	// Minimize the values in vals, then write to shared memory. We only write
	// to shared memory after completing minimization.
	success, err := ws.minimizeInput(ctx, vals, mem, args)
	if success {
		writeToMem(vals, mem)
		outHash := sha256.Sum256(mem.valueCopy())
		mem.header().rawInMem = false
		resp.WroteToMem = true
		if err != nil {
			resp.Err = err.Error()
		} else {
			// If the values didn't change during minimization then coverageSnapshot is likely
			// a dirty snapshot which represents the very last step of minimization, not the
			// coverage for the initial input. In that case just return the coverage we were
			// given initially, since it more accurately represents the coverage map for the
			// input we are returning.
			if outHash != inpHash {
				resp.CoverageData = coverageSnapshot
			} else {
				resp.CoverageData = args.KeepCoverage
			}
		}
	}
	return resp
}
// minimizeInput applies a series of minimizing transformations on the provided
// vals, ensuring that each minimization still causes an error, or keeps
// coverage, in fuzzFn. It uses the context to determine how long to run,
// stopping once closed. It returns a bool indicating whether minimization was
// successful and an error if one was found.
func (ws *workerServer) minimizeInput(ctx context.Context, vals []any, mem *sharedMem, args minimizeArgs) (success bool, retErr error) {
	keepCoverage := args.KeepCoverage
	memBytes := mem.valueRef()
	bPtr := &memBytes
	count := &mem.header().count
	shouldStop := func() bool {
		return ctx.Err() != nil ||
			(args.Limit > 0 && *count >= args.Limit)
	}
	if shouldStop() {
		return false, nil
	}

	// Check that the original value preserves coverage or causes an error.
	// If not, then whatever caused us to think the value was interesting may
	// have been a flake, and we can't minimize it.
	*count++
	_, retErr = ws.fuzzFn(CorpusEntry{Values: vals})
	if keepCoverage != nil {
		if !hasCoverageBit(keepCoverage, coverageSnapshot) || retErr != nil {
			return false, nil
		}
	} else if retErr == nil {
		return false, nil
	}
	// Mark shared memory as holding a raw (unmarshaled) candidate so a crash
	// mid-minimization can still be recovered by the coordinator.
	mem.header().rawInMem = true

	// tryMinimized runs the fuzz function with candidate replacing the value
	// at index valI. tryMinimized returns whether the input with candidate is
	// interesting for the same reason as the original input: it returns
	// an error if one was expected, or it preserves coverage.
	tryMinimized := func(candidate []byte) bool {
		prev := vals[args.Index]
		switch prev.(type) {
		case []byte:
			vals[args.Index] = candidate
		case string:
			vals[args.Index] = string(candidate)
		default:
			panic("impossible")
		}
		// Keep the raw candidate in shared memory so it survives a crash.
		copy(*bPtr, candidate)
		*bPtr = (*bPtr)[:len(candidate)]
		mem.setValueLen(len(candidate))
		*count++
		_, err := ws.fuzzFn(CorpusEntry{Values: vals})
		if err != nil {
			retErr = err
			if keepCoverage != nil {
				// Now that we've found a crash, that's more important than any
				// minimization of interesting inputs that was being done. Clear out
				// keepCoverage to only minimize the crash going forward.
				keepCoverage = nil
			}
			return true
		}
		// Minimization should preserve coverage bits.
		if keepCoverage != nil && isCoverageSubset(keepCoverage, coverageSnapshot) {
			return true
		}
		// Candidate was not interesting; restore the previous value.
		vals[args.Index] = prev
		return false
	}
	switch v := vals[args.Index].(type) {
	case string:
		minimizeBytes([]byte(v), tryMinimized, shouldStop)
	case []byte:
		minimizeBytes(v, tryMinimized, shouldStop)
	default:
		panic("impossible")
	}
	return true, retErr
}
| func writeToMem(vals []any, mem *sharedMem) { | |
| b := marshalCorpusFile(vals...) | |
| mem.setValue(b) | |
| } | |
| // ping does nothing. The coordinator calls this method to ensure the worker | |
| // has called F.Fuzz and can communicate. | |
| func (ws *workerServer) ping(ctx context.Context, args pingArgs) pingResponse { | |
| return pingResponse{} | |
| } | |
// workerClient is a minimalist RPC client. The coordinator process uses a
// workerClient to call methods in each worker process (handled by
// workerServer).
type workerClient struct {
	workerComm
	// m is the mutator used to reproduce the worker's input mutations
	// locally (see workerClient.fuzz, which restores its RNG state from
	// shared memory before mutating).
	m *mutator
	// mu is the mutex protecting the workerComm.fuzzIn pipe. This must be
	// locked before making calls to the workerServer. It prevents
	// workerClient.Close from closing fuzzIn while workerClient methods are
	// writing to it concurrently, and prevents multiple callers from writing to
	// fuzzIn concurrently.
	mu sync.Mutex
}
| func newWorkerClient(comm workerComm, m *mutator) *workerClient { | |
| return &workerClient{workerComm: comm, m: m} | |
| } | |
| // Close shuts down the connection to the RPC server (the worker process) by | |
| // closing fuzz_in. Close drains fuzz_out (avoiding a SIGPIPE in the worker), | |
| // and closes it after the worker process closes the other end. | |
| func (wc *workerClient) Close() error { | |
| wc.mu.Lock() | |
| defer wc.mu.Unlock() | |
| // Close fuzzIn. This signals to the server that there are no more calls, | |
| // and it should exit. | |
| if err := wc.fuzzIn.Close(); err != nil { | |
| wc.fuzzOut.Close() | |
| return err | |
| } | |
| // Drain fuzzOut and close it. When the server exits, the kernel will close | |
| // its end of fuzzOut, and we'll get EOF. | |
| if _, err := io.Copy(io.Discard, wc.fuzzOut); err != nil { | |
| wc.fuzzOut.Close() | |
| return err | |
| } | |
| return wc.fuzzOut.Close() | |
| } | |
// errSharedMemClosed is returned by workerClient methods that cannot access
// shared memory because it was closed and unmapped by another goroutine. That
// can happen when worker.cleanup is called in the worker goroutine while a
// workerClient.fuzz call runs concurrently.
//
// This error should not be reported. It indicates the operation was
// interrupted. workerClient methods detect this state when a receive from
// memMu fails because the channel was closed.
var errSharedMemClosed = errors.New("internal error: shared memory was closed and unmapped")
// minimize tells the worker to call the minimize method. See
// workerServer.minimize.
func (wc *workerClient) minimize(ctx context.Context, entryIn CorpusEntry, args minimizeArgs) (entryOut CorpusEntry, resp minimizeResponse, retErr error) {
	wc.mu.Lock()
	defer wc.mu.Unlock()
	// Take ownership of the shared memory region. A closed channel means it
	// was unmapped by another goroutine (see errSharedMemClosed).
	mem, ok := <-wc.memMu
	if !ok {
		return CorpusEntry{}, minimizeResponse{}, errSharedMemClosed
	}
	// mem is re-acquired inside the loop below; this deferred send returns
	// whichever region we hold when the function exits.
	defer func() { wc.memMu <- mem }()
	mem.header().count = 0
	inp, err := corpusEntryData(entryIn)
	if err != nil {
		return CorpusEntry{}, minimizeResponse{}, err
	}
	mem.setValue(inp)
	entryOut = entryIn
	entryOut.Values, err = unmarshalCorpusFile(inp)
	if err != nil {
		return CorpusEntry{}, minimizeResponse{}, fmt.Errorf("workerClient.minimize unmarshaling provided value: %v", err)
	}
	// Minimize each value independently, skipping types that cannot be
	// minimized.
	for i, v := range entryOut.Values {
		if !isMinimizable(reflect.TypeOf(v)) {
			continue
		}
		// Hand the shared memory region to the worker for the duration of
		// the call, then take it back to read the result.
		wc.memMu <- mem
		args.Index = i
		c := call{Minimize: &args}
		callErr := wc.callLocked(ctx, c, &resp)
		mem, ok = <-wc.memMu
		if !ok {
			return CorpusEntry{}, minimizeResponse{}, errSharedMemClosed
		}
		if callErr != nil {
			retErr = callErr
			if !mem.header().rawInMem {
				// An unrecoverable error occurred before minimization began.
				return entryIn, minimizeResponse{}, retErr
			}
			// An unrecoverable error occurred during minimization. mem now
			// holds the raw, unmarshaled bytes of entryIn.Values[i] that
			// caused the error.
			switch entryOut.Values[i].(type) {
			case string:
				entryOut.Values[i] = string(mem.valueCopy())
			case []byte:
				entryOut.Values[i] = mem.valueCopy()
			default:
				panic("impossible")
			}
			entryOut.Data = marshalCorpusFile(entryOut.Values...)
			// Stop minimizing; another unrecoverable error is likely to occur.
			break
		}
		if resp.WroteToMem {
			// Minimization succeeded, and mem holds the marshaled data.
			entryOut.Data = mem.valueCopy()
			entryOut.Values, err = unmarshalCorpusFile(entryOut.Data)
			if err != nil {
				return CorpusEntry{}, minimizeResponse{}, fmt.Errorf("workerClient.minimize unmarshaling minimized value: %v", err)
			}
		}
		// Prepare for next iteration of the loop: charge this call's
		// duration and execution count against the remaining budget.
		if args.Timeout != 0 {
			args.Timeout -= resp.Duration
			if args.Timeout <= 0 {
				break
			}
		}
		if args.Limit != 0 {
			args.Limit -= mem.header().count
			if args.Limit <= 0 {
				break
			}
		}
	}
	resp.Count = mem.header().count
	// Name the minimized entry with a short hash of its contents.
	h := sha256.Sum256(entryOut.Data)
	entryOut.Path = fmt.Sprintf("%x", h[:4])
	return entryOut, resp, retErr
}
// fuzz tells the worker to call the fuzz method. See workerServer.fuzz.
func (wc *workerClient) fuzz(ctx context.Context, entryIn CorpusEntry, args fuzzArgs) (entryOut CorpusEntry, resp fuzzResponse, isInternalError bool, err error) {
	wc.mu.Lock()
	defer wc.mu.Unlock()
	// Take ownership of the shared memory region. A closed channel means it
	// was unmapped by another goroutine (see errSharedMemClosed).
	mem, ok := <-wc.memMu
	if !ok {
		return CorpusEntry{}, fuzzResponse{}, true, errSharedMemClosed
	}
	mem.header().count = 0
	inp, err := corpusEntryData(entryIn)
	if err != nil {
		wc.memMu <- mem
		return CorpusEntry{}, fuzzResponse{}, true, err
	}
	mem.setValue(inp)
	// Release the region so the worker process can use it while handling
	// the call.
	wc.memMu <- mem
	c := call{Fuzz: &args}
	callErr := wc.callLocked(ctx, c, &resp)
	if resp.InternalErr != "" {
		return CorpusEntry{}, fuzzResponse{}, true, errors.New(resp.InternalErr)
	}
	mem, ok = <-wc.memMu
	if !ok {
		return CorpusEntry{}, fuzzResponse{}, true, errSharedMemClosed
	}
	defer func() { wc.memMu <- mem }()
	resp.Count = mem.header().count
	// The worker is expected to leave the original input intact in shared
	// memory; any difference is an internal error.
	if !bytes.Equal(inp, mem.valueRef()) {
		return CorpusEntry{}, fuzzResponse{}, true, errors.New("workerServer.fuzz modified input")
	}
	// Only reconstruct the mutated entry when the caller needs it: the call
	// failed (callErr), the fuzz function reported an error, or new coverage
	// was found during a non-warmup run.
	needEntryOut := callErr != nil || resp.Err != "" ||
		(!args.Warmup && resp.CoverageData != nil)
	if needEntryOut {
		valuesOut, err := unmarshalCorpusFile(inp)
		if err != nil {
			return CorpusEntry{}, fuzzResponse{}, true, fmt.Errorf("unmarshaling fuzz input value after call: %v", err)
		}
		// Reconstruct the worker's mutated values locally by restoring its
		// RNG state from shared memory and re-applying the mutation chain.
		wc.m.r.restore(mem.header().randState, mem.header().randInc)
		if !args.Warmup {
			// Only mutate the valuesOut if fuzzing actually occurred.
			numMutations := ((resp.Count - 1) % chainedMutations) + 1
			for i := int64(0); i < numMutations; i++ {
				wc.m.mutate(valuesOut, cap(mem.valueRef()))
			}
		}
		dataOut := marshalCorpusFile(valuesOut...)
		// Name the entry with a short hash of its contents.
		h := sha256.Sum256(dataOut)
		name := fmt.Sprintf("%x", h[:4])
		entryOut = CorpusEntry{
			Parent:     entryIn.Path,
			Path:       name,
			Data:       dataOut,
			Generation: entryIn.Generation + 1,
		}
		if args.Warmup {
			// The bytes weren't mutated, so if entryIn was a seed corpus value,
			// then entryOut is too.
			entryOut.IsSeed = entryIn.IsSeed
		}
	}
	return entryOut, resp, false, callErr
}
| // ping tells the worker to call the ping method. See workerServer.ping. | |
| func (wc *workerClient) ping(ctx context.Context) error { | |
| wc.mu.Lock() | |
| defer wc.mu.Unlock() | |
| c := call{Ping: &pingArgs{}} | |
| var resp pingResponse | |
| return wc.callLocked(ctx, c, &resp) | |
| } | |
| // callLocked sends an RPC from the coordinator to the worker process and waits | |
| // for the response. The callLocked may be canceled with ctx. | |
| func (wc *workerClient) callLocked(ctx context.Context, c call, resp any) (err error) { | |
| enc := json.NewEncoder(wc.fuzzIn) | |
| dec := json.NewDecoder(&contextReader{ctx: ctx, r: wc.fuzzOut}) | |
| if err := enc.Encode(c); err != nil { | |
| return err | |
| } | |
| return dec.Decode(resp) | |
| } | |
| // contextReader wraps a Reader with a Context. If the context is canceled | |
| // while the underlying reader is blocked, Read returns immediately. | |
| // | |
| // This is useful for reading from a pipe. Closing a pipe file descriptor does | |
| // not unblock pending Reads on that file descriptor. All copies of the pipe's | |
| // other file descriptor (the write end) must be closed in all processes that | |
| // inherit it. This is difficult to do correctly in the situation we care about | |
| // (process group termination). | |
| type contextReader struct { | |
| ctx context.Context | |
| r io.Reader | |
| } | |
| func (cr *contextReader) Read(b []byte) (int, error) { | |
| if ctxErr := cr.ctx.Err(); ctxErr != nil { | |
| return 0, ctxErr | |
| } | |
| done := make(chan struct{}) | |
| // This goroutine may stay blocked after Read returns because the underlying | |
| // read is blocked. | |
| var n int | |
| var err error | |
| go func() { | |
| n, err = cr.r.Read(b) | |
| close(done) | |
| }() | |
| select { | |
| case <-cr.ctx.Done(): | |
| return 0, cr.ctx.Err() | |
| case <-done: | |
| return n, err | |
| } | |
| } | |