Issue
The idea is to exit the outer loop from within a goroutine. I use a channel to signal when to break the loop, and a semaphore pattern to limit the number of goroutines spawned, so that I do not create an enormous number of goroutines while waiting for the loop to exit.
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"sync"
)

type Task struct {
	ID        int    `json:"id"`
	UserID    int    `json:"user_id"`
	Title     string `json:"title"`
	Completed bool   `json:"completed"`
}

func main() {
	var t Task
	wg := &sync.WaitGroup{}
	stop := make(chan struct{})
	sem := make(chan struct{}, 10)
	results := make(chan Task, 1)

	worker := func(i int) {
		defer wg.Done()
		defer func() { <-sem }()
		res, err := http.Get(fmt.Sprintf("https://jsonplaceholder.typicode.com/todos/%d", i))
		if err != nil {
			log.Fatal(err)
		}
		defer res.Body.Close()
		if err := json.NewDecoder(res.Body).Decode(&t); err != nil {
			log.Fatal(err)
		}
		if i == 20 {
			close(stop)
		}
		results <- t
	}

	i := 0
outer:
	for {
		select {
		case <-stop:
			fmt.Println("I came here")
			close(sem)
			break outer
		case v := <-results:
			fmt.Println(v)
		default:
			wg.Add(1)
			sem <- struct{}{}
			go worker(i)
			i++
		}
	}
	wg.Wait()
	fmt.Println("I am done")
}
The problem right now is that I can see it enters the case where I try to break the loop, but it never reaches "I am done".
The reason is probably that the worker goroutines end up blocked forever on the results channel once nothing is receiving from it.
I would like to know how I can handle this effectively.
Here is another attempt, using context cancellation instead of a stop channel:
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"sync"
)

type Task struct {
	ID        int    `json:"id"`
	UserID    int    `json:"user_id"`
	Title     string `json:"title"`
	Completed bool   `json:"completed"`
}

func main() {
	wg := &sync.WaitGroup{}
	sem := make(chan struct{}, 10)
	ctx, cancel := context.WithCancel(context.Background())
	var ts []Task
	//results := make(chan Task, 1)

	worker := func(i int) {
		var t Task
		defer wg.Done()
		defer func() {
			<-sem
		}()
		res, err := http.Get(fmt.Sprintf("https://jsonplaceholder.typicode.com/todos/%d", i))
		if err != nil {
			log.Fatal(err)
		}
		defer res.Body.Close()
		if err := json.NewDecoder(res.Body).Decode(&t); err != nil {
			log.Fatal(err)
		}
		if i > 20 {
			cancel()
		}
		ts = append(ts, t)
	}

	i := 0
outer:
	for {
		select {
		case <-ctx.Done():
			break outer
		default:
			wg.Add(1)
			sem <- struct{}{}
			go worker(i)
			i++
		}
	}
	wg.Wait()
	fmt.Println(ts)
}
This works, but I end up with duplicate entries in the slice, which I want to avoid (see the sketch after the edit below).
edit: @Davud's solution works. However, I am still interested in how to further optimize and limit the number of goroutines spawned: currently the number of extra goroutines spawned equals the buffer size of sem, which I would somehow like to reduce while still keeping the work concurrent.
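A minimal sketch of one possible alternative, not part of the accepted answer below: a fixed pool of worker goroutines reads IDs from a jobs channel, so the number of goroutines is bounded by the pool size instead of growing with every loop iteration, and the shared slice is only appended to while holding a sync.Mutex, which removes the data race created by the unsynchronized ts = append(ts, t) above. The Task type, the jsonplaceholder URL, and the cutoff at ID 20 are carried over from the question; the pool size of 10 is an assumption.

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"sync"
)

type Task struct {
	ID        int    `json:"id"`
	UserID    int    `json:"user_id"`
	Title     string `json:"title"`
	Completed bool   `json:"completed"`
}

func main() {
	const workers = 10 // assumed pool size

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	jobs := make(chan int)

	var (
		mu sync.Mutex
		ts []Task
		wg sync.WaitGroup
	)

	// Fixed pool: exactly `workers` goroutines, each draining the jobs channel.
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range jobs {
				res, err := http.Get(fmt.Sprintf("https://jsonplaceholder.typicode.com/todos/%d", i))
				if err != nil {
					log.Println(err)
					continue
				}
				var t Task
				err = json.NewDecoder(res.Body).Decode(&t)
				res.Body.Close()
				if err != nil {
					log.Println(err)
					continue
				}
				if i >= 20 {
					cancel() // tell the producer to stop handing out IDs
				}
				mu.Lock()
				ts = append(ts, t) // guarded append: no concurrent writes to the slice
				mu.Unlock()
			}
		}()
	}

	// Producer: hand out IDs until the context is cancelled.
producer:
	for i := 1; ; i++ {
		select {
		case <-ctx.Done():
			break producer
		case jobs <- i:
		}
	}
	close(jobs) // lets each worker's range loop finish
	wg.Wait()

	fmt.Println(ts)
}

With this layout there are exactly 10 goroutines for the whole run, the producer stops handing out IDs once the context is cancelled, and closing jobs lets every worker drain what is left and exit before wg.Wait() returns.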
Solution
It happens because once the stop signal is received and the for loop exits, you are no longer receiving and printing the results; the sends on the results channel then block and prevent the workers from finishing.
As a solution, you can listen to the results channel in a separate goroutine.
Here I removed the case v := <-results: fmt.Println(v) and added a goroutine instead. Try it out:
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"sync"
)

type Task struct {
	ID        int    `json:"id"`
	UserID    int    `json:"user_id"`
	Title     string `json:"title"`
	Completed bool   `json:"completed"`
}

func main() {
	var t Task
	wg := &sync.WaitGroup{}
	stop := make(chan struct{})
	sem := make(chan struct{}, 10)
	results := make(chan Task, 1)

	worker := func(i int) {
		defer wg.Done()
		defer func() { <-sem }()
		res, err := http.Get(fmt.Sprintf("https://jsonplaceholder.typicode.com/todos/%d", i))
		if err != nil {
			log.Fatal(err)
		}
		defer res.Body.Close()
		if err := json.NewDecoder(res.Body).Decode(&t); err != nil {
			log.Fatal(err)
		}
		if i == 20 {
			close(stop)
		}
		results <- t
	}

	i := 0

	go func() {
		for v := range results {
			fmt.Println(v)
		}
	}()

outer:
	for {
		select {
		case <-stop:
			fmt.Println("I came here")
			close(sem)
			break outer
		default:
			wg.Add(1)
			sem <- struct{}{}
			go worker(i)
			i++
		}
	}
	wg.Wait()
	fmt.Println("I am done")
}
Answered By - Davud Safarov