| |
| |
| |
|
|
| package singleflight |
|
|
| import ( |
| "bytes" |
| "errors" |
| "fmt" |
| "os" |
| "os/exec" |
| "runtime" |
| "runtime/debug" |
| "strings" |
| "sync" |
| "sync/atomic" |
| "testing" |
| "time" |
| ) |
|
|
| func TestDo(t *testing.T) { |
| var g Group[string] |
| v, err, _ := g.Do("key", func() (string, error) { |
| return "bar", nil |
| }) |
| if got, want := fmt.Sprintf("%v (%T)", v, v), "bar (string)"; got != want { |
| t.Errorf("Do = %v; want %v", got, want) |
| } |
| if err != nil { |
| t.Errorf("Do error = %v", err) |
| } |
| } |
|
|
| func TestDoErr(t *testing.T) { |
| var g Group[any] |
| someErr := errors.New("Some error") |
| v, err, _ := g.Do("key", func() (any, error) { |
| return nil, someErr |
| }) |
| if err != someErr { |
| t.Errorf("Do error = %v; want someErr %v", err, someErr) |
| } |
| if v != nil { |
| t.Errorf("unexpected non-nil value %#v", v) |
| } |
| } |
|
|
| func TestDoDupSuppress(t *testing.T) { |
| var g Group[string] |
| var wg1, wg2 sync.WaitGroup |
| c := make(chan string, 1) |
| var calls int32 |
| fn := func() (string, error) { |
| if atomic.AddInt32(&calls, 1) == 1 { |
| |
| wg1.Done() |
| } |
| v := <-c |
| c <- v |
|
|
| time.Sleep(10 * time.Millisecond) |
|
|
| return v, nil |
| } |
|
|
| const n = 10 |
| wg1.Add(1) |
| for i := 0; i < n; i++ { |
| wg1.Add(1) |
| wg2.Add(1) |
| go func() { |
| defer wg2.Done() |
| wg1.Done() |
| v, err, _ := g.Do("key", fn) |
| if err != nil { |
| t.Errorf("Do error: %v", err) |
| return |
| } |
| if v != "bar" { |
| t.Errorf("Do = %T %v; want %q", v, v, "bar") |
| } |
| }() |
| } |
| wg1.Wait() |
| |
| |
| c <- "bar" |
| wg2.Wait() |
| if got := atomic.LoadInt32(&calls); got <= 0 || got >= n { |
| t.Errorf("number of calls = %d; want over 0 and less than %d", got, n) |
| } |
| } |
|
|
| |
| |
| func TestForget(t *testing.T) { |
| var g Group[any] |
|
|
| var ( |
| firstStarted = make(chan struct{}) |
| unblockFirst = make(chan struct{}) |
| firstFinished = make(chan struct{}) |
| ) |
|
|
| go func() { |
| g.Do("key", func() (i any, e error) { |
| close(firstStarted) |
| <-unblockFirst |
| close(firstFinished) |
| return |
| }) |
| }() |
| <-firstStarted |
| g.Forget("key") |
|
|
| unblockSecond := make(chan struct{}) |
| secondResult := g.DoChan("key", func() (i any, e error) { |
| <-unblockSecond |
| return 2, nil |
| }) |
|
|
| close(unblockFirst) |
| <-firstFinished |
|
|
| thirdResult := g.DoChan("key", func() (i any, e error) { |
| return 3, nil |
| }) |
|
|
| close(unblockSecond) |
| <-secondResult |
| r := <-thirdResult |
| if r.Val != 2 { |
| t.Errorf("We should receive result produced by second call, expected: 2, got %d", r.Val) |
| } |
| } |
|
|
| func TestDoChan(t *testing.T) { |
| var g Group[string] |
| ch := g.DoChan("key", func() (string, error) { |
| return "bar", nil |
| }) |
|
|
| res := <-ch |
| v := res.Val |
| err := res.Err |
| if got, want := fmt.Sprintf("%v (%T)", v, v), "bar (string)"; got != want { |
| t.Errorf("Do = %v; want %v", got, want) |
| } |
| if err != nil { |
| t.Errorf("Do error = %v", err) |
| } |
| } |
|
|
| |
| |
| func TestPanicDo(t *testing.T) { |
| var g Group[any] |
| fn := func() (any, error) { |
| panic("invalid memory address or nil pointer dereference") |
| } |
|
|
| const n = 5 |
| waited := int32(n) |
| panicCount := int32(0) |
| done := make(chan struct{}) |
| for i := 0; i < n; i++ { |
| go func() { |
| defer func() { |
| if err := recover(); err != nil { |
| t.Logf("Got panic: %v\n%s", err, debug.Stack()) |
| atomic.AddInt32(&panicCount, 1) |
| } |
|
|
| if atomic.AddInt32(&waited, -1) == 0 { |
| close(done) |
| } |
| }() |
|
|
| g.Do("key", fn) |
| }() |
| } |
|
|
| select { |
| case <-done: |
| if panicCount != n { |
| t.Errorf("Expect %d panic, but got %d", n, panicCount) |
| } |
| case <-time.After(time.Second): |
| t.Fatalf("Do hangs") |
| } |
| } |
|
|
| func TestGoexitDo(t *testing.T) { |
| var g Group[any] |
| fn := func() (any, error) { |
| runtime.Goexit() |
| return nil, nil |
| } |
|
|
| const n = 5 |
| waited := int32(n) |
| done := make(chan struct{}) |
| for i := 0; i < n; i++ { |
| go func() { |
| var err error |
| defer func() { |
| if err != nil { |
| t.Errorf("Error should be nil, but got: %v", err) |
| } |
| if atomic.AddInt32(&waited, -1) == 0 { |
| close(done) |
| } |
| }() |
| _, err, _ = g.Do("key", fn) |
| }() |
| } |
|
|
| select { |
| case <-done: |
| case <-time.After(time.Second): |
| t.Fatalf("Do hangs") |
| } |
| } |
|
|
| func TestPanicDoChan(t *testing.T) { |
| if runtime.GOOS == "js" { |
| t.Skipf("js does not support exec") |
| } |
|
|
| if os.Getenv("TEST_PANIC_DOCHAN") != "" { |
| defer func() { |
| recover() |
| }() |
|
|
| g := new(Group[any]) |
| ch := g.DoChan("", func() (any, error) { |
| panic("Panicking in DoChan") |
| }) |
| <-ch |
| t.Fatalf("DoChan unexpectedly returned") |
| } |
|
|
| t.Parallel() |
|
|
| cmd := exec.Command(os.Args[0], "-test.run="+t.Name(), "-test.v") |
| cmd.Env = append(os.Environ(), "TEST_PANIC_DOCHAN=1") |
| out := new(bytes.Buffer) |
| cmd.Stdout = out |
| cmd.Stderr = out |
| if err := cmd.Start(); err != nil { |
| t.Fatal(err) |
| } |
|
|
| err := cmd.Wait() |
| t.Logf("%s:\n%s", strings.Join(cmd.Args, " "), out) |
| if err == nil { |
| t.Errorf("Test subprocess passed; want a crash due to panic in DoChan") |
| } |
| if bytes.Contains(out.Bytes(), []byte("DoChan unexpectedly")) { |
| t.Errorf("Test subprocess failed with an unexpected failure mode.") |
| } |
| if !bytes.Contains(out.Bytes(), []byte("Panicking in DoChan")) { |
| t.Errorf("Test subprocess failed, but the crash isn't caused by panicking in DoChan") |
| } |
| } |
|
|
| func TestPanicDoSharedByDoChan(t *testing.T) { |
| if runtime.GOOS == "js" { |
| t.Skipf("js does not support exec") |
| } |
|
|
| if os.Getenv("TEST_PANIC_DOCHAN") != "" { |
| blocked := make(chan struct{}) |
| unblock := make(chan struct{}) |
|
|
| g := new(Group[any]) |
| go func() { |
| defer func() { |
| recover() |
| }() |
| g.Do("", func() (any, error) { |
| close(blocked) |
| <-unblock |
| panic("Panicking in Do") |
| }) |
| }() |
|
|
| <-blocked |
| ch := g.DoChan("", func() (any, error) { |
| panic("DoChan unexpectedly executed callback") |
| }) |
| close(unblock) |
| <-ch |
| t.Fatalf("DoChan unexpectedly returned") |
| } |
|
|
| t.Parallel() |
|
|
| cmd := exec.Command(os.Args[0], "-test.run="+t.Name(), "-test.v") |
| cmd.Env = append(os.Environ(), "TEST_PANIC_DOCHAN=1") |
| out := new(bytes.Buffer) |
| cmd.Stdout = out |
| cmd.Stderr = out |
| if err := cmd.Start(); err != nil { |
| t.Fatal(err) |
| } |
|
|
| err := cmd.Wait() |
| t.Logf("%s:\n%s", strings.Join(cmd.Args, " "), out) |
| if err == nil { |
| t.Errorf("Test subprocess passed; want a crash due to panic in Do shared by DoChan") |
| } |
| if bytes.Contains(out.Bytes(), []byte("DoChan unexpectedly")) { |
| t.Errorf("Test subprocess failed with an unexpected failure mode.") |
| } |
| if !bytes.Contains(out.Bytes(), []byte("Panicking in Do")) { |
| t.Errorf("Test subprocess failed, but the crash isn't caused by panicking in Do") |
| } |
| } |
|
|