什么是并发?

  • 独立执行计算单元的合成
  • 是组织软件结构的一种途径(尤其是写代码的)

并发不是并行

  • 如果你只有一个处理器,你的程序可以并发不能并行
  • 另一方面,良好的并发程序代码应该能在多处理器上高效的运行

基础概念

Goroutines

  • 一个 goroutine 是一个独立的执行函数,有独立的调用栈,不是线程,一个程序中只有一个线程和无数 goroutine
  • Goroutines 被动态的多路复用到线程上
  • 可以认为是一种廉价的线程?

Channels

  • 起联系两个 goroutine 的作用
//declaring and initializing
 var c chan int
 c = make(chan int)
 //or
 c := make(chan int)
//Sending on a channel
c <- 1
//Receiving from a channel
value = <-c
  • channel 在 go 中是一等公民,与 Integers、Strings 地位相当

Select

  • 在没有 default 的情况下都没有 ready 会永远阻塞,有 default 如果当前没有能立即执行的则执行 default
  • 同时准备好则随机选择
select {
	case v1 := <-c1:
		fmt.Printf("receive %v from c1\n", v1)
	case c3 <- 23:
		fmt.Printf("sent %v to c3\n", 23)
	default:
		fmt.Printf("no one was ready to communicate \n")
}

Go Approach

Don't communicate by sharing memory, share memory by communicating


示例代码与说明

默认基础

  • 基本函数
func boring(msg string) {
	for i := 0; i++ {
		fmt.Println(msg, i)
		time.Sleep(time.Second)
	}
}
  • Goroutines 方式运行并使用 Channels(
  • 主函数执行 <-c 时等待值被送来,boring 函数执行 c <- value时,等待接收方准备好
  • 发送者和接收者都必须准备完毕才能通信否则必须等待
func main() {
	c := make(chan string)
	go boring("boring!", c)
	for i := 0; i<5; i++ {
		fmt.Printf("You say: %q\n", <-c) //Receive expression is just a value
	}
	fmt.Println("You're boring; I'm leaving.")
}
func boring(msg string, c chan string) {
	for i := 0; i++ {
		c <- fmt.Printf("%s %d", msg, i) //Expression to be sent can be any suitable value
		time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
	}
}

生成器:返回 channel 的函数

  • 在函数内的 goroutine
func boring(msg string) <-chan string { // Return receive-only channel of strings
	c := make(chan string)
	go func() {
		for i := 0; i++ {
			c <- fmt.Printf("%s %d", msg, i) //Expression to be sent can be any suitable value
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
		}
	}()
	return c // Return the channel to the caller
}
  • 多路复用:有序运行的 goroutine(joe 和 ann 轮流)
func main() {
	joe := boring("Joe")
	ann := boring("Ann")
	for i := 0; i < 5; i++ {
		fmt.Println(<-joe)
		fmt.Println(<-ann)
	}
	fmt.Println("You're boring; I'm leaving.")
}
  • 无序运行:
graph LR
a(Joe) --> b(fanIn)
c(Ann) --> b
b --> d(main)
func fanIn(input1, input2 <- chan string) <-chan string {
	c := make(chan string)
	go func() { for { c <- <- input1 } }() // double '<-' means sending string to channel, not input channel itself
	go func() { for { c <- <- input2 } }()
	return c
}
func main() {
	c := fanIn(boring("Joe"), boring("Ann"))
	for i := 0; i < 10; i++ {
		fmt.Println(<-c)
	}
	fmt.Println("You're boring; I'm leaving.")
}
  • 在无序的基础上重新恢复有序(共享一个 wait channel,等待信号重新输出)
type Message struct {
	str string
	wait chan bool
}
// Each speaker must wait for a go-ahead
for i := 0; i < 5; i++ {
	msg1 := <-c;
	fmt.Println(msg1.str)
	msg2 := <-c;
	fmt.Println(msg2.str)
	msg1.wait <- true
	msg2.wait <- true
}
// main function
waitForIt := make(chan bool) // Shared between all messages

c <- Message { fmt.Sprintf("%s: %d", msg, i), waitForIt }
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond) <- waitForIt

Select

  • 超时设置1(此示例每一轮耗时 1 s)
func main() {
	c := boring("Joe")
	for {
		select {
			case s := <-c:
				fmt.Printf(s)
			case <- time.After(1 * time.Second): // Return a channel that will deliver a value after the specified interval
				fmt.Printf("slow")
				return
		}
	}
}
  • 超时设置2(此示例整体耗时 5 s)
func main() {
	c := boring("Joe")
	timeout := time.After(5 * time.Second)
	for {
		select {
			case s := <-c:
				fmt.Printf(s)
			case <- timeout: // Return a channel that will deliver a value after the specified interval
				fmt.Printf("slow")
				return
		}
	}
}
  • cleanup and quit
quit := make(chan string)
c := boring("Joe", quit)
for i := rand.Intn(10); i >= 0: i-- {
	fmt.Println(<-c)
}
quit <- "Bye!"
fmt.Printf("Joe says: %q\n", <-quit)
// select
select {
	case c <- fmt.Sprintf("%s: %d", msg, i):
		//sth
	case <- quit: // Return a channel that will deliver a value after the specified interval
		cleanup()
		quit <- "See you"
		return
	}

Daisy-Chain

enter image description here

func f(left, right chan int) {
	left <- 1 + <- right
}
func main() {
	const n = 100
	leftmost := make(chan int)
	right := leftmost
	left := leftmost
	for i := 0; i < n; i++ {
		right = make(chan int)
		go f(left, right)
		left = right
	}
	go func(c chan int) { c <- 1}(right)
	fmt.Printf(<-leftmost)
}

伪实战

Google Search

var {
	Web = fakeSearch("web")
	Image = fakeSearch("image")
	Video = fakeSearch("video")
}

type Search func(query string) Result

func fakeSearch(query string) Search {
	return func(query string) Result {
		time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
		return Result(fmt.Sprintf("%s result for %q\n", kind, query))
	}
}
func Google(query string) (results []Result) {
	c := make(chan Results)
	go func() { c <- Web(query) } ()
	go func() { c <- Image(query) } ()
	go func() { c <- Video(query) } ()
	for i := 0; i < 3; i++ {
		result := <- c
		results = append(result, Video(query))
	}
	return
}
  • 加上超时设定
func Google(query string) (results []Result) {
	c := make(chan Results)
	go func() { c <- Web(query) } ()
	go func() { c <- Image(query) } ()
	go func() { c <- Video(query) } ()
	timeout = time.After(80 * time.Millisecond)
	for i := 0; i < 3; i++ {
		select {
			case result := <- c:
				results = append(results, result)
			case <- timeout:
				fmt.Println("timed out")
				return
			}
		}
	}
	return
}
  • replication(在超时限制下多个副本同时执行一个操作,只要有一个不超时就相当于成功,返回结果)
  • 是一种策略,并非 go 原生的语法支持一类的用法
func First(query string, replicas ...Search) Result {
	c := make(chan Result)
	searchReplica := func(i int) { c <- replicas[i](query) }
	for i := range replicas {
		go searchReplica(i)
	}
	return <-c
}
func main() {
	rand.Seed(time.Now().UnixNano())
	start := time.Now()
	result := First("golang", fakeSearch("replica 1"), fakeSearch("replica 2"))
	elapsed := time.Since(start)
	fmt.Println(result)
	fmt.Println(elapsed)
}

Written with StackEdit.