Issue
I'm writing an app in Go that interacts with Spotify's API, and I find myself needing an infinite for loop to call an endpoint until the length of the returned slice is less than the limit, signalling that I've reached the end of the available entries.
For my user account, there are 1644 saved albums (I determined this by looping through without using goroutines). However, when I add goroutines in, I'm getting back 2544 saved albums with duplicates. I'm also using the semaphore pattern to limit the number of goroutines so that I don't exceed the rate limit.
I assume that the issue is with using the active variable rather than channels, but my attempt at that just resulted in an infinite loop.
wg := &sync.WaitGroup{}
sem := make(chan bool, 20)
active := true
offset := 0
for {
	sem <- true
	if active {
		// add each new goroutine to waitgroup
		wg.Add(1)
		go func() error {
			// remove from waitgroup when goroutine is complete
			defer wg.Done()
			// release the worker
			defer func() { <-sem }()
			savedAlbums, err := client.CurrentUsersAlbums(ctx, spotify.Limit(50), spotify.Offset(offset))
			if err != nil {
				return err
			}
			userAlbums = append(userAlbums, savedAlbums.Albums...)
			if len(savedAlbums.Albums) < 50 {
				// since the limit is set to 50, we know that if the number of returned albums
				// is less than 50 that we're done retrieving data
				active = false
				return nil
			} else {
				offset += 50
				return nil
			}
		}()
	} else {
		wg.Wait()
		break
	}
}
Thanks in advance!
Solution
I suspect that your main issue may be a misunderstanding of what the go keyword does; from the docs:
A "go" statement starts the execution of a function call as an independent concurrent thread of control, or goroutine, within the same address space.
So go func() error { starts the execution of the closure as a new goroutine; it does not mean that any of the code runs immediately. In fact, because client.CurrentUsersAlbums will take a while, it's likely that you will be requesting the first 50 items 20 times: the loop can start all 20 goroutines (the capacity of sem) before any of them has got as far as incrementing offset. This can be demonstrated with a simplified version of your application (playground):
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	wg := &sync.WaitGroup{}
	sem := make(chan bool, 20)
	active := true
	offset := 0
	for {
		sem <- true
		if active {
			// add each new goroutine to waitgroup
			wg.Add(1)
			go func() error {
				// remove from waitgroup when goroutine is complete
				defer wg.Done()
				// release the worker
				defer func() { <-sem }()
				fmt.Println("Getting from:", offset)
				time.Sleep(time.Millisecond) // Simulate the query
				// Pretend that we got back 50 albums
				offset += 50
				if offset > 2000 {
					active = false
				}
				return nil
			}()
		} else {
			wg.Wait()
			break
		}
	}
}
Running this will produce somewhat unpredictable results (note that the playground caches results, so try it on your machine), but you will probably see "Getting from: 0" printed 20 times.
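A further issue is data races: offset, active and userAlbums are all read and updated from multiple goroutines without protection (e.g. a sync.Mutex), which results in undefined behaviour. Running the program with the race detector (go run -race) will report these.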
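A minimal sketch of the kind of protection meant here, assuming the shared state is just an offset counter and a results slice (the names state, next, add and collect are hypothetical, and the "pages" are simulated integers rather than albums). Every read-modify-write of offset and every append happens while holding a sync.Mutex, so each goroutine claims a distinct offset and no append is lost. Note that this only removes the data race; it does not tell the goroutines when to stop.

```go
package main

import (
	"fmt"
	"sync"
)

// state groups the variables that several goroutines share.
// mu must be held for every read or write of offset and items.
type state struct {
	mu     sync.Mutex
	offset int
	items  []int
}

// next claims the next page offset under the lock, so no two
// goroutines ever receive the same value.
func (s *state) next(pageSize int) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	off := s.offset
	s.offset += pageSize
	return off
}

// add appends a page of results while holding the lock.
func (s *state) add(page []int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.items = append(s.items, page...)
}

// collect starts `workers` goroutines; each claims one offset and stores
// one simulated page containing the item numbers off..off+pageSize-1.
func collect(workers, pageSize int) *state {
	s := &state{}
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			off := s.next(pageSize)
			page := make([]int, pageSize)
			for j := range page {
				page[j] = off + j
			}
			s.add(page)
		}()
	}
	wg.Wait() // after Wait, reading s without the lock is safe
	return s
}

func main() {
	s := collect(20, 50)
	fmt.Println(len(s.items), s.offset) // 1000 1000
}
```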
You will want to know how to fix this, but unfortunately you will need to rethink your algorithm. Currently the process you are following is:
1. Set pos to 0
2. Get 50 records starting from pos
3. If we got 50 records, then pos=pos+50 and loop back to step 2
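For reference, the three steps written as a plain sequential loop. This is a sketch: fetchPage and fetchAll are hypothetical names, and fetchPage is a stand-in for client.CurrentUsersAlbums that serves a fake collection of integers, so none of this is Spotify-library code.

```go
package main

import "fmt"

// fetchPage is a hypothetical stand-in for the real API call: it returns
// up to limit items starting at offset from a fake collection of total items.
func fetchPage(total, offset, limit int) ([]int, error) {
	end := offset + limit
	if end > total {
		end = total
	}
	var page []int
	for i := offset; i < end; i++ {
		page = append(page, i)
	}
	return page, nil
}

// fetchAll follows the three steps: start at 0, fetch a page, and advance
// by limit until a page comes back shorter than limit.
func fetchAll(total, limit int) ([]int, error) {
	var all []int
	pos := 0 // step 1
	for {
		page, err := fetchPage(total, pos, limit) // step 2
		if err != nil {
			return nil, err
		}
		all = append(all, page...)
		if len(page) < limit { // step 3: a short page means we are done
			return all, nil
		}
		pos += limit
	}
}

func main() {
	albums, err := fetchAll(1644, 50)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(albums)) // 1644
}
```

Each request depends on the result of the one before it, which is exactly why goroutines cannot simply be bolted onto this loop.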
This is a sequential algorithm: you don't know whether you have all of the data until you have received the previous page. I guess you could make speculative queries (and handle the failures), but a better solution would be to find some way to determine the number of results expected and then split the queries for that number of records between multiple goroutines.
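One way the speculative approach could look, as a sketch under stated assumptions: request a batch of pages at once, with each goroutine writing into its own element of a slice (so no mutex is needed and the order is preserved), then stop at the first short page. fetchPage and fetchSpeculative are hypothetical names, the fake integer data stands in for the API, and a real version would still need error handling and to respect the rate limit. The price is up to batch-1 more requests than the sequential loop would make.

```go
package main

import (
	"fmt"
	"sync"
)

// fetchPage is a hypothetical stand-in for the real API call: it returns up
// to limit items starting at offset from a fake collection of total items.
func fetchPage(total, offset, limit int) []int {
	end := offset + limit
	if end > total {
		end = total
	}
	var page []int
	for i := offset; i < end; i++ {
		page = append(page, i)
	}
	return page
}

// fetchSpeculative requests `batch` pages concurrently, waits for them all,
// then appends them in offset order. It stops at the first page shorter than
// limit; any later pages in the same batch lie past the end and are discarded.
func fetchSpeculative(total, limit, batch int) []int {
	var all []int
	for base := 0; ; base += batch * limit {
		pages := make([][]int, batch) // one slot per goroutine, so no locking
		var wg sync.WaitGroup
		for i := 0; i < batch; i++ {
			wg.Add(1)
			go func(slot, offset int) {
				defer wg.Done()
				pages[slot] = fetchPage(total, offset, limit)
			}(i, base+i*limit)
		}
		wg.Wait()
		for _, page := range pages {
			all = append(all, page...)
			if len(page) < limit {
				return all
			}
		}
	}
}

func main() {
	albums := fetchSpeculative(1644, 50, 20)
	fmt.Println(len(albums)) // 1644
}
```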
Note that if you do know the number of results, then you can do something like the following (playground):
package main

import (
	"fmt"
	"sort"
	"strconv"
	"sync"
)

func main() {
	noOfResultsToGet := 1644 // In the below we are getting 0-1643
	noOfResultsPerRequest := 50
	noOfSimultaneousRequests := 20 // You may not need this but many services will limit the number of simultaneous requests you can make (or, at least, rate limit them)
	requestChan := make(chan int)       // Will be passed the starting #
	responseChan := make(chan []string) // Response from whatever request we are making (can be any type really)
	// Start goroutines to make the requests
	var wg sync.WaitGroup
	wg.Add(noOfSimultaneousRequests)
	for i := 0; i < noOfSimultaneousRequests; i++ {
		go func(routineNo int) {
			defer wg.Done()
			for startPos := range requestChan {
				// Simulate making the request
				maxResult := startPos + noOfResultsPerRequest
				if maxResult > noOfResultsToGet {
					maxResult = noOfResultsToGet
				}
				rsp := make([]string, 0, noOfResultsPerRequest)
				for x := startPos; x < maxResult; x++ {
					rsp = append(rsp, strconv.Itoa(x))
				}
				responseChan <- rsp
				fmt.Printf("Goroutine %d handling data from %d to %d\n", routineNo, startPos, maxResult-1)
			}
		}(i)
	}
	// Close the response channel when all goroutines have shut down
	go func() {
		wg.Wait()
		close(responseChan)
	}()
	// Send the requests
	go func() {
		for reqFrom := 0; reqFrom < noOfResultsToGet; reqFrom += noOfResultsPerRequest {
			requestChan <- reqFrom
		}
		close(requestChan) // Allow goroutines to exit
	}()
	// Receive responses (note that these may be out of order)
	result := make([]string, 0, noOfResultsToGet)
	for x := range responseChan {
		result = append(result, x...)
	}
	// Order the results and output (results from the goroutines may come back in any order)
	sort.Slice(result, func(i, j int) bool {
		a, _ := strconv.Atoi(result[i])
		b, _ := strconv.Atoi(result[j])
		return a < b
	})
	fmt.Printf("Result: %v\n", result)
}
Relying on channels to pass messages often makes this kind of thing easier to think about and reduces the chance that you will make a mistake.
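If you would rather avoid the sort (and the strconv round-trip), one variation, sketched here with illustrative names (response, fetchKnownTotal), is to send each request's page index through the channel and have the response carry that index back, so the receiver can drop each page into its own slot and concatenate the slots in order. Like the example above, it assumes the total is known and simulates the requests with integers.

```go
package main

import (
	"fmt"
	"sync"
)

// response pairs a page of results with the index of the request that
// produced it, so the receiver can put it back in the right place.
type response struct {
	index int
	items []int
}

// fetchKnownTotal splits total items into pages of perRequest and fetches
// them with `workers` goroutines reading from a request channel. Pages are
// reassembled by index, so no sorting is needed.
func fetchKnownTotal(total, perRequest, workers int) []int {
	noOfPages := (total + perRequest - 1) / perRequest // round up
	requests := make(chan int)
	responses := make(chan response)
	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for idx := range requests {
				// Simulated request for items [start, end)
				start := idx * perRequest
				end := start + perRequest
				if end > total {
					end = total
				}
				items := make([]int, 0, end-start)
				for x := start; x < end; x++ {
					items = append(items, x)
				}
				responses <- response{index: idx, items: items}
			}
		}()
	}
	// Close responses once every worker has exited
	go func() {
		wg.Wait()
		close(responses)
	}()
	// Send one request per page, then let the workers exit
	go func() {
		for idx := 0; idx < noOfPages; idx++ {
			requests <- idx
		}
		close(requests)
	}()
	pages := make([][]int, noOfPages)
	for r := range responses {
		pages[r.index] = r.items // each index arrives exactly once
	}
	result := make([]int, 0, total)
	for _, p := range pages {
		result = append(result, p...)
	}
	return result
}

func main() {
	result := fetchKnownTotal(1644, 50, 20)
	fmt.Println(len(result), result[0], result[len(result)-1]) // 1644 0 1643
}
```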
Answered By - Brits Answer Checked By - Timothy Miller (PHPFixing Admin)