PHPFixing

Friday, September 30, 2022

[FIXED] How do I terminate an infinite loop from inside of a goroutine?

Tags: concurrency, go

Issue

I'm writing a Go app that interacts with Spotify's API, and I need an infinite for loop that calls an endpoint until the returned slice is shorter than the limit, which signals that I've reached the end of the available entries.

For my user account, there are 1644 saved albums (I determined this by looping through without using goroutines). However, when I add goroutines in, I'm getting back 2544 saved albums with duplicates. I'm also using the semaphore pattern to limit the number of goroutines so that I don't exceed the rate limit.

I assume that the issue is with using the active variable rather than channels, but my attempt at using channels just resulted in an infinite loop.

wg := &sync.WaitGroup{}
sem := make(chan bool, 20)
active := true
offset := 0
for {
    sem <- true
    if active {
        // add each new goroutine to waitgroup
        wg.Add(1)
        go func() error {
            // remove from waitgroup when goroutine is complete
            defer wg.Done()
            // release the worker
            defer func() { <-sem }()
            savedAlbums, err := client.CurrentUsersAlbums(ctx, spotify.Limit(50), spotify.Offset(offset))
            if err != nil {
                return err
            }
            userAlbums = append(userAlbums, savedAlbums.Albums...)
            if len(savedAlbums.Albums) < 50 {
                // since the limit is set to 50, we know that if the number of returned albums
                // is less than 50 that we're done retrieving data
                active = false
                return nil
            } else {
                offset += 50
                return nil
            }
        }()
    } else {
        wg.Wait()
        break
    }
}

Thanks in advance!


Solution

I suspect that your main issue may be a misunderstanding of what the go keyword does; from the Go language specification:

A "go" statement starts the execution of a function call as an independent concurrent thread of control, or goroutine, within the same address space.

So go func() error { ... }() starts the closure running concurrently and returns immediately; the loop does not wait for any of the closure's code to run. Because client.CurrentUsersAlbums takes a while, the loop will probably launch 20 goroutines (the capacity of sem) before any of them has updated offset, so it's likely you will be requesting the first 50 items 20 times. This can be demonstrated with a simplified version of your application (playground):

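package main

import (
    "fmt"
    "sync"
    "time"
)

// NOTE: offset and active are read and written by several goroutines without
// synchronisation; this deliberately reproduces the problem in the question.
func main() {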
    wg := &sync.WaitGroup{}
    sem := make(chan bool, 20)
    active := true
    offset := 0
    for {
        sem <- true
        if active {
            // add each new goroutine to waitgroup
            wg.Add(1)
            go func() error {
                // remove from waitgroup when goroutine is complete
                defer wg.Done()
                // release the worker
                defer func() { <-sem }()
                fmt.Println("Getting from:", offset)
                time.Sleep(time.Millisecond) // Simulate the query
                // Pretend that we got back 50 albums
                offset += 50
                if offset > 2000 {
                    active = false
                }
                return nil
            }()
        } else {
            wg.Wait()
            break
        }
    }
}

Running this will produce somewhat unpredictable results (note that the playground caches results, so try it on your machine), but you will probably see "Getting from: 0" printed around 20 times.

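A further issue is data races. Reading and writing a variable (here offset, active and userAlbums) from multiple goroutines without protection (e.g. a sync.Mutex) is a data race, and the result is unpredictable; running the program with go run -race will report these races.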
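As a minimal sketch of the "protection" mentioned above, assuming the pages to fetch are already known, a sync.Mutex can serialise the appends to a shared slice. fakePage and collectPages are hypothetical names for illustration only. Note that this removes the race on the slice but does not fix the termination problem; that still needs the algorithm change discussed below.

```go
package main

import (
    "fmt"
    "sync"
)

// collectPages launches one goroutine per offset and appends each page's items
// to a shared slice. The mutex serialises the appends, so there is no data race.
// fakePage is a hypothetical stand-in for a real API call.
func collectPages(offsets []int, fakePage func(offset int) []int) []int {
    var (
        mu      sync.Mutex
        wg      sync.WaitGroup
        results []int
    )
    for _, off := range offsets {
        wg.Add(1)
        // off is passed as an argument so each goroutine gets its own copy
        go func(off int) {
            defer wg.Done()
            page := fakePage(off)
            mu.Lock()
            results = append(results, page...)
            mu.Unlock()
        }(off)
    }
    wg.Wait()
    return results
}

func main() {
    // Pretend every request returns 50 consecutive items
    page := func(offset int) []int {
        items := make([]int, 0, 50)
        for i := offset; i < offset+50; i++ {
            items = append(items, i)
        }
        return items
    }
    res := collectPages([]int{0, 50, 100}, page)
    fmt.Println(len(res)) // 150
}
```

The order of the collected items depends on which goroutine finishes first, so sort afterwards if order matters.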

You will want to know how to fix this, but unfortunately you will need to rethink your algorithm. Currently the process you are following is:

  1. Set pos to 0
  2. Get 50 records starting from pos
  3. If we got 50 records then pos=pos+50 and loop back to step 2
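For reference, the three steps above can be sketched as a plain sequential loop. fetchPage is a hypothetical stand-in for client.CurrentUsersAlbums that simulates an account with a fixed number of items; it is not the real Spotify client.

```go
package main

import "fmt"

// fetchPage is a hypothetical stand-in for the real API call. It simulates an
// account holding `total` items and returns at most `limit` of them from `offset`.
func fetchPage(offset, limit, total int) ([]int, error) {
    end := offset + limit
    if end > total {
        end = total
    }
    var page []int
    for i := offset; i < end; i++ {
        page = append(page, i)
    }
    return page, nil
}

// fetchAll implements steps 1-3: start at pos 0, fetch a page, and advance pos
// by the page size until a short page signals the end of the data.
func fetchAll(limit, total int) ([]int, error) {
    var all []int
    for pos := 0; ; pos += limit {
        page, err := fetchPage(pos, limit, total)
        if err != nil {
            return nil, err
        }
        all = append(all, page...)
        if len(page) < limit {
            return all, nil
        }
    }
}

func main() {
    all, err := fetchAll(50, 1644)
    if err != nil {
        panic(err)
    }
    fmt.Println(len(all)) // 1644
}
```

Each iteration needs the previous page's length before it can decide whether to continue, which is exactly why this loop does not parallelise as written.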

This is a sequential algorithm: you don't know whether there is another page until the previous request has returned. You could make speculative queries (and handle the failures), but a better solution would be to find some way to determine the number of results expected (Spotify's paging responses include a total field that may help here) and then split the queries to get that number of records between multiple goroutines.
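The speculative option could look something like the sketch below, assuming that requesting an offset past the end simply returns an empty page. fetchPage and fetchSpeculative are hypothetical names, and the batch size plays the same role as the 20-slot semaphore in the question.

```go
package main

import (
    "fmt"
    "sync"
)

// fetchPage is a hypothetical stand-in for the real API call. It simulates an
// account holding `total` items and returns at most `limit` of them from `offset`.
func fetchPage(offset, limit, total int) []int {
    end := offset + limit
    if end > total {
        end = total
    }
    var page []int
    for i := offset; i < end; i++ {
        page = append(page, i)
    }
    return page
}

// fetchSpeculative requests `batch` consecutive pages concurrently, waits for
// the whole batch, then scans the pages in offset order. The first page shorter
// than `limit` marks the end; pages requested beyond it are discarded.
func fetchSpeculative(limit, batch, total int) []int {
    var all []int
    for base := 0; ; base += limit * batch {
        pages := make([][]int, batch) // one slot per goroutine, so no mutex is needed
        var wg sync.WaitGroup
        for i := 0; i < batch; i++ {
            wg.Add(1)
            go func(i, offset int) {
                defer wg.Done()
                pages[i] = fetchPage(offset, limit, total)
            }(i, base+i*limit)
        }
        wg.Wait()
        for _, p := range pages {
            all = append(all, p...)
            if len(p) < limit {
                return all
            }
        }
    }
}

func main() {
    all := fetchSpeculative(50, 20, 1644)
    fmt.Println(len(all), all[len(all)-1]) // 1644 1643
}
```

The cost is up to batch-1 wasted requests in the final batch, which matters if the service rate limits you; knowing the total in advance avoids that.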

Note that if you do know the number of results, then you can do something like the following (playground):

package main

import (
    "fmt"
    "sort"
    "strconv"
    "sync"
)

func main() {
    noOfResultsToGet := 1644 // In the below we are getting 0-1643
    noOfResultsPerRequest := 50
    // You may not need this, but many services will limit the number of simultaneous
    // requests you can make (or, at least, rate limit them)
    noOfSimultaneousRequests := 20

    requestChan := make(chan int)       // Will be passed the starting #
    responseChan := make(chan []string) // Response from whatever request we are making (can be any type really)

    // Start goroutines to make the requests
    var wg sync.WaitGroup
    wg.Add(noOfSimultaneousRequests)
    for i := 0; i < noOfSimultaneousRequests; i++ {
        go func(routineNo int) {
            defer wg.Done()
            for startPos := range requestChan {
                // Simulate making the request
                maxResult := startPos + noOfResultsPerRequest
                if maxResult > noOfResultsToGet {
                    maxResult = noOfResultsToGet
                }
                rsp := make([]string, 0, noOfResultsPerRequest)
                for x := startPos; x < maxResult; x++ {
                    rsp = append(rsp, strconv.Itoa(x))
                }
                responseChan <- rsp
                fmt.Printf("Goroutine %d handled data from %d to %d\n", routineNo, startPos, maxResult-1)
            }
        }(i)
    }
    // Close the response channel when all goroutines have shut down
    go func() {
        wg.Wait()
        close(responseChan)
    }()

    // Send the requests
    go func() {
        for reqFrom := 0; reqFrom < noOfResultsToGet; reqFrom += noOfResultsPerRequest {
            requestChan <- reqFrom
        }
        close(requestChan) // Allow goroutines to exit
    }()

    // Receive responses (note that these may be out of order)
    result := make([]string, 0, noOfResultsToGet)
    for x := range responseChan {
        result = append(result, x...)
    }

    // Order the results and output (results from the goroutines may come back in any order)
    sort.Slice(result, func(i, j int) bool {
        a, _ := strconv.Atoi(result[i])
        b, _ := strconv.Atoi(result[j])
        return a < b
    })
    fmt.Printf("Result: %v\n", result)
}

Relying on channels to pass messages often makes this kind of thing easier to think about and reduces the chance that you will make a mistake.



Answered By - Brits
Answer Checked By - Timothy Miller (PHPFixing Admin)

Copyright © PHPFixing