PHPFixing
Showing posts with label channel.

Tuesday, November 1, 2022

[FIXED] Why is the performance of rust tokio so poor? [release test result updated]

 November 01, 2022     channel, mutex, performance, rust, rust-tokio     No comments   

Issue

The following scenarios are frequently used in asynchronous programming.

  • channel tx/rx;
  • mutex lock/unlock;
  • async task spawn;

So I ran some comparison tests on a low-performance cloud host (roughly equivalent to a J1900), as follows. I found that the performance of rust-tokio is very, very poor compared to Go.

Is there any parameter that needs to be adjusted? Can a single thread executor improve it?
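
As an aside on the single-thread question: in tokio 1.x, #[tokio::test] already defaults to a current-thread (single-threaded) runtime. A minimal sketch (not from the original post, assuming tokio 1.x) of pinning the runtime flavor explicitly so the two schedulers can be compared:

    // Sketch: run the same benchmark body on an explicitly built
    // current-thread runtime to compare against the multi-threaded scheduler.
    #[test]
    fn bench_on_current_thread() {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        rt.block_on(async {
            // place the same tx/rx, mutex or spawn benchmark body here
        });
    }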

Results:

  • channel tx/rx, time per op:

    Go: 112 ns

    tokio::sync::mpsc::channel: 7387 ns

    std::sync::mpsc::channel: 2705 ns

    crossbeam: 1062 ns

  • mutex lock/unlock, time per op:

    tokio::sync::Mutex: 4051 ns

    std::sync::Mutex: 321 ns

  • spawn (not join), time per op:

    tokio::spawn: 8445 ns

Rust tokio test, tx/rx on a channel:

    #[tokio::test]
    async fn test_chan_benchmark() {
        let count = 100_000;
        let (tx, mut rx) = tokio::sync::mpsc::channel(10000);
        let start = std::time::SystemTime::now();
        let handle = tokio::spawn(async move {
            loop {
                let i = rx.recv().await.unwrap();
                if i == count - 1 {
                    break;
                }
            }
        });

        for i in 0..count {
            tx.send(i).await.unwrap();
        }
        drop(tx);

        handle.await.unwrap();
        let stop = std::time::SystemTime::now();
        let dur = stop.duration_since(start).unwrap();
        println!(
            "count={count}, cosume={}ms, ops={}ns",
            dur.as_millis(),
            dur.as_nanos() / count as u128,
        );
    }

Go channel tx/rx:

func TestChanPerformance(t *testing.T) {
    count := 1000000
    ch := make(chan int, count)
    rsp := make(chan int, 1)
    t1 := time.Now()
    go func() {
        for {
            if _, ok := <-ch; !ok {
                rsp <- 0
                break
            }
        }
    }()
    for i := 0; i < count; i++ {
        ch <- i
    }
    close(ch)
    <-rsp

    d := time.Since(t1)
    t.Logf("txrx %d times consumed %d ms, %d nspo", count, d.Milliseconds(), d.Nanoseconds()/int64(count))
}

Mutex test:

    #[tokio::test]
    async fn bench_std_mutex() {
        use std::sync::Arc; // needed for Arc below

        for count in [1_000, 10_000, 100_000] {
            let start = std::time::SystemTime::now();

            let under = Arc::new(std::sync::Mutex::new(0));
            for _ in 0..count {
                let _ = under.lock().unwrap();
            }

            let stop = std::time::SystemTime::now();
            let dur = stop.duration_since(start).unwrap();
            println!(
                "count={count}, cosume={}ms, ops={}ns",
                dur.as_millis(),
                dur.as_nanos() / count as u128,
            );
        }
    }

Tokio spawn test:

    #[tokio::test]
    async fn bench_tokio_spawn() {
        let count = 100_000;
        //let mut ths = Vec::with_capacity(count);
        let start = std::time::SystemTime::now();
        for _ in 0..count {
            tokio::spawn(async move {});
        }
        let stop = std::time::SystemTime::now();
        let dur = stop.duration_since(start).unwrap();
        //for _ in 0..count {
        //    ths.pop().unwrap().await.unwrap();
        //}
        // do not wait for join, just spawn
        println!(
            "count={count}, cosume={}ms, ops={}ns",
            dur.as_millis(),
            dur.as_nanos() / count as u128,
        );
    }

=============UPDATED=========== For --release:

std::sync::Mutex: 13 ns;
tokio::sync::Mutex: 130 ns;
std::sync::mpsc::channel: 200 ns;
tokio::sync::mpsc::channel: 256 ns;
tokio::spawn: 553 ns;

Solution

Add --release to instruct the compiler to perform optimizations.
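
Assuming the benchmarks are ordinary cargo tests, that means running them with the release profile, for example (with --nocapture so the println! output is shown):

    cargo test --release -- --nocapture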

To demonstrate just how much of a difference this makes, here is a simple add function compiled with and without optimizations:

pub fn add(a: u32, b: u32) -> u32 {
    a + b
}
  • with optimizations:
example::add:
        lea     eax, [rdi + rsi]
        ret
  • without optimizations:
example::add:
        push    rax
        add     edi, esi
        mov     dword ptr [rsp + 4], edi
        setb    al
        test    al, 1
        jne     .LBB0_2
        mov     eax, dword ptr [rsp + 4]
        pop     rcx
        ret
.LBB0_2:
        lea     rdi, [rip + str.0]
        lea     rdx, [rip + .L__unnamed_1]
        mov     rax, qword ptr [rip + core::panicking::panic@GOTPCREL]
        mov     esi, 28
        call    rax
        ud2

.L__unnamed_2:
        .ascii  "/app/example.rs"

.L__unnamed_1:
        .quad   .L__unnamed_2
        .asciz  "\017\000\000\000\000\000\000\000\002\000\000\000\005\000\000"

str.0:
        .ascii  "attempt to add with overflow"

Note that the optimized version no longer contains an overflow check. The overflow check is very useful during debugging, but it is also very slow.
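
If you want to keep the overflow check while still optimizing, Cargo lets you opt back in per profile. A minimal sketch of the relevant Cargo.toml keys (not part of the original answer):

    # Cargo.toml: re-enable integer overflow checks in optimized builds
    [profile.release]
    overflow-checks = true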



Answered By - Finomnis
Answer Checked By - Clifford M. (PHPFixing Volunteer)

Friday, September 30, 2022

[FIXED] How to start a new goroutine each time a channel is updated

 September 30, 2022     channel, concurrency, go, goroutine     No comments   

Issue

I am making a program that monitors different web pages. Each time a new URL is added to a page, I would like to start a new goroutine to scrape that URL. I am trying to simulate this like so:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var Wg sync.WaitGroup
    link := make(chan string)
    startList := []string{}
    go func() {
        for i := 0; i < 20; i++ {
            //should simulate the monitoring of the original web page
            nextLink := fmt.Sprintf("cool-website-%d", i)
            link <- nextLink
        }
    }()

    for i := 0; i < 20; i++ {
        newLink := <-link
        startList = append(startList, newLink)
        Wg.Add(1)
        go simulateScraping(i, startList[i])
        Wg.Done()
    }
    Wg.Wait()
}

func simulateScraping(i int, link string) {
    fmt.Printf("Simulating process %d\n", i)
    fmt.Printf("scraping www.%s.com\n", link)
    time.Sleep(time.Duration(30) * time.Second)
    fmt.Printf("Finished process %d\n", i)
}

This results in the following error: fatal error: all goroutines are asleep - deadlock!. How do I start the simulateScraping function only when newLink is updated (i.e. when startList is appended to)?

Thanks!


Solution

I see several problems with the code.

  1. The wait group is useless in this code, because Wg.Done is called immediately; it does not wait until simulateScraping finishes, since that runs in parallel.

To fix this, a closure can be used:

        go func(i int) {
            simulateScraping(i, newLink)
            Wg.Done()
        }(i)

  2. Instead of an increment loop, I would use a for-range loop over the channel. It lets the code run as soon as a new value arrives on the channel and automatically stops when the channel is closed:
    var i int
    for newLink := range link {
        Wg.Add(1)
        go func(i int) {
            simulateScraping(i, newLink)
            Wg.Done()
        }(i)
        i++
    }
    Wg.Wait()
  3. startList := []string{} looks useless; it is not clear how it was supposed to be used.

  4. The channel must be closed:

    go func() {
        for i := 0; i < 20; i++ {
            //should simulate the monitoring of the original web page
            nextLink := fmt.Sprintf("cool-website-%d", i)
            link <- nextLink
        }
        close(link) // Closing the channel
    }()

The whole code

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var Wg sync.WaitGroup
    link := make(chan string)
    go func() {
        for i := 0; i < 20; i++ {
            //should simulate the monitoring of the original web page
            nextLink := fmt.Sprintf("cool-website-%d", i)
            link <- nextLink
        }
        close(link)
    }()

    var i int
    for newLink := range link {
        Wg.Add(1)
        go func(i int) {
            simulateScraping(i, newLink)
            Wg.Done()
        }(i)
        i++
    }
    Wg.Wait()
}

func simulateScraping(i int, link string) {
    fmt.Printf("Simulating process %d\n", i)
    fmt.Printf("scraping www.%s.com\n", link)
    time.Sleep(3 * time.Second)
    fmt.Printf("Finished process %d\n", i)
}

Here is a good talk about "Concurrency Patterns In Go"



Answered By - Alex
Answer Checked By - Marie Seifert (PHPFixing Admin)

[FIXED] When to use a finalizer to close a channel?

 September 30, 2022     channel, concurrency, finalizer, go     No comments   

Issue

This is the second of two questions (this is the first one) to help make sense of the Go generics proposal examples.

In particular, I am having trouble (so far) understanding two bits of code from the examples section of the proposal entitled "Channels":

The second issue I have is in the following definition of the Ranger function.

Namely, I don't understand the need to call runtime.SetFinalizer(r, r.finalize), when all that the finalize() method of the *Receiver[T] type is supposed to do is signal that the receiver is done receiving values (close(r.done)).

The way I see it, by providing a finalizer for a *Receiver[T] the code is delegating the obligation to close the receiver to the runtime.

The way I understand this piece of code, the *Receiver[T] signals to the *Sender[T] that it won't be receiving any more values when the GC decides that the former is unreachable, i.e. no more references to it are available.

If my interpretation is correct, why wait that long for the receiver to signal it's done? Isn't it possible to handle the close operation explicitly in the code somehow?

Thanks.

Code:

// Ranger provides a convenient way to exit a goroutine sending values
// when the receiver stops reading them.
//
// Ranger returns a Sender and a Receiver. The Receiver provides a
// Next method to retrieve values. The Sender provides a Send method
// to send values and a Close method to stop sending values. The Next
// method indicates when the Sender has been closed, and the Send
// method indicates when the Receiver has been freed.
func Ranger[T any]() (*Sender[T], *Receiver[T]) {
    c := make(chan T)
    d := make(chan bool)
    s := &Sender[T]{values: c, done: d}
    r := &Receiver[T]{values: c, done: d}
    // The finalizer on the receiver will tell the sender
    // if the receiver stops listening.
    runtime.SetFinalizer(r, r.finalize)
    return s, r
}

// A Sender is used to send values to a Receiver.
type Sender[T any] struct {
    values chan<- T
    done   <-chan bool
}

// Send sends a value to the receiver. It reports whether any more
// values may be sent; if it returns false the value was not sent.
func (s *Sender[T]) Send(v T) bool {
    select {
    case s.values <- v:
        return true
    case <-s.done:
        // The receiver has stopped listening.
        return false
    }
}

// Close tells the receiver that no more values will arrive.
// After Close is called, the Sender may no longer be used.
func (s *Sender[T]) Close() {
    close(s.values)
}

// A Receiver receives values from a Sender.
type Receiver[T any] struct {
    values <-chan T
    done   chan<- bool
}

// Next returns the next value from the channel. The bool result
// reports whether the value is valid. If the value is not valid, the
// Sender has been closed and no more values will be received.
func (r *Receiver[T]) Next() (T, bool) {
    v, ok := <-r.values
    return v, ok
}

// finalize is a finalizer for the receiver.
// It tells the sender that the receiver has stopped listening.
func (r *Receiver[T]) finalize() {
    close(r.done)
}

Solution

TLDR: Your understanding is correct, the done channel may simply be closed by the receiver "manually" to signal the loss of interest (to stop the communication and relieve the sender from its duty).


Channels are used for goroutines to communicate in a concurrency safe manner. The idiomatic use is that the sender party keeps sending values, and once there are no more values to send, it is signaled by the sender closing the channel.

The receiver party keeps receiving from the channel until it is closed, which signals that there won't be (can't be) any more values coming on the channel. This is usually (and most easily) done using a for range over the channel.

So usually the receiver has to keep receiving until the channel is closed, else the sender party would get blocked forever. Often this is OK / sufficient.

The demonstrated Ranger() construct is for the less general case where there is a need (or possibility) for the receiver to stop the communication.

A single channel does not provide a means for the receiver party to signal the sender that the receiver has lost interest and no more values are needed. This requires an additional channel, which the receiver has to close (and the sender has to monitor, of course). As long as there is a single receiver, this is also OK. But if there are multiple receivers, closing the done channel gets a little more complicated: it's not OK for all the receivers to close the done channel, since closing an already closed channel panics. So the receivers also have to be coordinated, so that only a single receiver, or rather a coordinator party, closes the done channel, once only; and this has to happen after all receivers have abandoned the channel.

Ranger() helps with this in a simple way, by delegating the closing of the done channel to a finalizer. This is acceptable because usually it wouldn't even be the receiver's task to stop the communication, but in the rare case where this still arises, it is dealt with easily, without the need for an additional coordinator goroutine.
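
If you do want the receiver side to signal explicitly instead of waiting for the garbage collector, here is a minimal sketch of that idea. The Stop method and the once field are hypothetical additions, not part of the proposal's Ranger example, and it requires Go 1.18+:

package ranger

import "sync"

// StoppableReceiver mirrors Receiver but adds an explicit, idempotent Stop.
type StoppableReceiver[T any] struct {
    values <-chan T
    done   chan<- bool
    once   sync.Once
}

// Stop tells the sender that this receiver has lost interest.
// sync.Once makes repeated calls (or calls from several receivers that
// share the same done channel) safe: close runs exactly once.
func (r *StoppableReceiver[T]) Stop() {
    r.once.Do(func() { close(r.done) })
}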



Answered By - icza
Answer Checked By - Clifford M. (PHPFixing Volunteer)

[FIXED] Why set a channel input parameter to nil?

 September 30, 2022     channel, concurrency, go     No comments   

Issue

I need some help making sense of the Go generics proposal examples.

In particular, I am having trouble (so far) understanding two bits of code from the examples section of the proposal entitled "Channels":

The first point I don't get is found in the definition of func Merge[T any](c1, c2 <-chan T) <-chan T: the function creates a chan T variable that will hold the result and then spins up a goroutine that handles the actual merging.

In particular, there is an infinite loop that runs as long as at least one of the input channels (to be merged) is not nil.

Code:

// Merge merges two channels of some element type into a single channel.
func Merge[T any](c1, c2 <-chan T) <-chan T {
    r := make(chan T)
    go func(c1, c2 <-chan T, r chan<- T) {
        defer close(r)
        for c1 != nil || c2 != nil {
            select {
            case v1, ok := <-c1:
                if ok {
                    r <- v1
                } else {
                    c1 = nil
                }
            case v2, ok := <-c2:
                if ok {
                    r <- v2
                } else {
                    c2 = nil
                }
            }
        }
    }(c1, c2, r)
    return r
}

In the loop, a select statement combs through the input channels for validly received values (the vn, ok := <-cn expressions), and then comes the weird part:

If the channel delivers a valid value, it is merged and we are done with it (r <- vn); but if the channel reports that it just supplied the zero value for its element type (ok == false), then the else branch of both cases does something incomprehensible (to me): it sets the channel variable to nil!

So why is the merging goroutine allowed to set its input channels to nil? Isn't this supposed to be the responsibility of the goroutine that owns the input channels cn? I'm obviously missing something, please enlighten me.


Solution

Setting the channel to nil prevents that case from being executed from there on. Note that only the function parameter, which is a local variable of the function, is set to nil; the channel itself is left intact.

The function should loop until both input channels are closed, which will likely not happen at the same time. Once a channel is closed, receiving from it can proceed immediately. So if a channel is closed, the case receiving from it could always be executed from there on, immediately, unnecessarily using high CPU, and it could also prevent receiving from the other, non-closed channel (if multiple cases are ready, one is chosen pseudo-randomly, see How does select work when multiple channels are involved?).

On the other hand, if a (closed) channel variable is set to nil, it is essentially taken out of the select, because receiving from a nil channel blocks forever (so only the other channel will be monitored from there on). For the channel axioms, see How does a non-initialized channel behave?
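
A tiny standalone illustration of that behavior (not from the answer): once a channel variable is nil, its case can never fire, so the select effectively watches only the remaining channel.

package main

import "fmt"

func main() {
    var dead chan int // nil channel: receiving from it blocks forever
    live := make(chan int, 1)
    live <- 42

    select {
    case v := <-dead:
        fmt.Println("never reached", v) // a nil-channel case can never be chosen
    case v := <-live:
        fmt.Println("got", v) // always this branch
    }
}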



Answered By - icza
Answer Checked By - Katrina (PHPFixing Volunteer)

Tuesday, September 20, 2022

[FIXED] How to get a response from the consumer in a producer / consumer scenario?

 September 20, 2022     channel, consumer, go, goroutine, producer     No comments   

Issue

I've been trying to use channels to build some kind of producer / consumer. I have a requests channel where many producers push requests, and a processRequests goroutine that handles the requests.

package main

var requests chan string

func processRequests() {
    for {
        request := <-requests
        // Process request...
        // And return response - how?
    }
}

func main() {
    requests = make(chan string)

    go processRequests()

    requests <- "doSomething"
    requests <- "doSomethingElse"
    requests <- "etc"

    select {} // Block forever
}

What I'm wondering is: what would be the best way to send a response back to the producer (and to the right one, since there's more than one) once the request is fulfilled? Basically, how do I make this a two-way channel?

Any idea how it could be done?


Solution

You should really use two channels. Trying to make it work with one channel will be messy.

There's a Google Sites page on the Producer/Consumer pattern that may be useful.

For the producer to know what the consumer is responding to, you could use a struct for the response:

type responseMessage struct {
    Request string
    Response string
}

var requests chan string
var responses chan *responseMessage

func processRequests() {
    for {
        request := <-requests
        // Process request...
        responses <- &responseMessage{request, "some response string"}
    }
}

func processResponses() {
    // loop so every response is handled, not just the first one
    for {
        someResponseMessage := <-responses
        if someResponseMessage.Request == "doSomething" {
            // do something!
        }
    }
}

func main() {
    requests = make(chan string)
    responses = make(chan *responseMessage)

    go processRequests()
    go processResponses()

    requests <- "doSomething"
    requests <- "doSomethingElse"
    requests <- "etc"

    select {} // Block forever
}


Answered By - Tyson
Answer Checked By - Candace Johnson (PHPFixing Volunteer)

Thursday, August 4, 2022

[FIXED] How to prevent that a channel exception make a WCF service stop working?

 August 04, 2022     .net, channel, exception, timeout, wcf     No comments   

Issue

I created a WCF singleton service with netNamedPipeBinding. When a channel exception occurs, it leaves the channel in a faulted state, and all subsequent operations throw exceptions. How can I prevent this? I want a TimeoutException, or any of the other common exceptions, to make only one operation fail, not to make the whole service unresponsive.


Solution

There is another way: you can instantiate the client proxy for each request instead of using a single instance for more than one request. This way, if the channel enters a faulted state, it is discarded anyway.

This is a bit tricky, because you shouldn't simply dispose a channel, even though it implements IDisposable.

It won't work:

using(var channel = channelFactory.CreateChannel())
{
    return channel.ServiceMethod(parameter);
}

Instead you should:

public static class Service<T>
{
    public static ChannelFactory<T> _channelFactory = new ChannelFactory<T>("");

    public static TResult Use<TResult>(Func<T, TResult> func)
    {
        TResult output;
        var channel = (IClientChannel)_channelFactory.CreateChannel();
        bool success = false;
        try
        {
            output = func((T)channel);
            channel.Close();
            success = true;
        }
        finally
        {
            if (!success)
            {
                channel.Abort();
            }
        }
        return output;
    }
}

return Service<IService>.Use(channel =>
{
    return channel.ServiceMethod(parameter);
});


Answered By - Jader Dias
Answer Checked By - Dawn Plyler (PHPFixing Volunteer)

Tuesday, July 12, 2022

[FIXED] How to Properly forward a message from another user's channel into a group?

 July 12, 2022     api, channel, forward, message, telegram     No comments   

Issue

I would like to know how I can forward a message from a given channel id and message number (like when you copy the message link from inside Telegram itself) to a given group join link, using the messages.forwardMessage function of the Telegram API.

i.e. this is the join link of the destination:
https://t.me/joinchat/AAAAAAlw6NYyLMlMES5hbw

and this is the message which should be forwarded to that above link: https://t.me/kjdfvbjkdfbvkj/3

Thank you (especially to @apadana).


Solution

Here is how to forward from a group (not supergroup) to a channel. If you are looking for something else please comment.

# Assumes an already connected Telethon client named `client`.

# source channel (testChannel)
source_chat_id = 12345
source_access_hash = 1234567890
source_chat = InputPeerChannel(source_chat_id, source_access_hash)

# destination group (testGroup)
dest_chat_id = 123456
dest_chat = InputPeerChat(dest_chat_id)

# fetch the most recent messages from the source channel
total_count, messages, senders = client.get_message_history(
    source_chat, limit=10)

msg = messages[0]  # the message to forward

result = client.invoke(ForwardMessagesRequest(
    from_peer=source_chat,
    id=[msg.id],
    random_id=[generate_random_long()],
    to_peer=dest_chat))


Answered By - apadana
Answer Checked By - David Marino (PHPFixing Volunteer)