第六章:并发编程基础
6.1 Goroutine
创建Goroutine
package main
import (
"fmt"
"time"
)
// sayHello prints "Hello" five times, pausing 100ms after each line.
func sayHello() {
	const (
		repeats = 5
		pause   = 100 * time.Millisecond
	)
	for left := repeats; left > 0; left-- {
		fmt.Println("Hello")
		time.Sleep(pause)
	}
}
// sayWorld prints "World" five times, pausing 100ms after each line.
func sayWorld() {
	const (
		repeats = 5
		pause   = 100 * time.Millisecond
	)
	for left := repeats; left > 0; left-- {
		fmt.Println("World")
		time.Sleep(pause)
	}
}
// main runs sayHello and sayWorld concurrently and prints a final message
// only after both of them have returned.
func main() {
	// Sequential version, kept for comparison:
	// sayHello()
	// sayWorld()

	// Concurrent version. Each goroutine signals on done when its function
	// returns. Waiting on the channel replaces a fixed sleep, which would
	// silently cut the goroutines short if they ever ran longer than the
	// guessed duration.
	tasks := []func(){sayHello, sayWorld}
	done := make(chan struct{}, len(tasks))
	for _, task := range tasks {
		go func(run func()) {
			defer func() { done <- struct{}{} }()
			run()
		}(task)
	}

	// Receive one completion signal per started goroutine.
	for range tasks {
		<-done
	}
	fmt.Println("主函数结束")
}
匿名函数Goroutine
package main
import (
"fmt"
"time"
)
// main starts goroutines from anonymous functions in two ways, first passing
// the loop variable as an argument and then capturing it in a closure, and
// finally sleeps for one second before returning.
func main() {
// Anonymous function that takes a parameter
for i := 0; i < 3; i++ {
go func(n int) {
fmt.Printf("Goroutine %d\n", n)
}(i) // Pass i as an argument right away so each goroutine gets its own copy
}
// Closure pitfall example
for i := 0; i < 3; i++ {
go func() {
// i is captured from the loop instead of being passed in. Before Go 1.22
// all iterations share a single i, so the same value may be printed more
// than once; from Go 1.22 on each iteration has its own i.
fmt.Printf("闭包: %d\n", i)
}()
}
time.Sleep(1 * time.Second)
}
6.2 Channel
基本使用
package main
import "fmt"
// main demonstrates an unbuffered channel hand-off followed by a buffered
// channel that is filled and then drained.
func main() {
	// Unbuffered channel: a goroutine sends one value and main receives it.
	unbuffered := make(chan int)
	go func() { unbuffered <- 100 }()
	fmt.Println(<-unbuffered)

	// Buffered channel with capacity 3: the three sends complete without a
	// receiver, and the values are then read back in the order they were sent.
	const capacity = 3
	buffered := make(chan int, capacity)
	for v := 1; v <= capacity; v++ {
		buffered <- v
	}
	for k := 0; k < capacity; k++ {
		fmt.Println(<-buffered)
	}
}
关闭Channel
package main
import "fmt"
// producer sends the integers 0 through 4 on ch, in order, and then closes ch
// so that a receiver ranging over it terminates.
func producer(ch chan int) {
	const total = 5
	next := 0
	for next < total {
		ch <- next
		next++
	}
	close(ch) // the sending side closes the channel
}
// main prints everything producer sends, then shows the two-value receive
// form on a channel that has been closed while still holding one value.
func main() {
	stream := make(chan int)
	go producer(stream)

	// The range loop ends once producer closes the channel.
	for n := range stream {
		fmt.Println(n)
	}

	// Detecting a closed channel with the two-value receive.
	closed := make(chan int, 2)
	closed <- 1
	close(closed)
	for attempt := 0; attempt < 2; attempt++ {
		// First receive prints "1 true" (buffered value); the second prints
		// "0 false" because the channel is closed and empty.
		v, ok := <-closed
		fmt.Println(v, ok)
	}
}
Select多路复用
package main
import (
"fmt"
"time"
)
// main demonstrates three uses of select: waiting on several channels,
// receiving with a timeout, and a non-blocking receive.
func main() {
	// sendAfter starts a goroutine that sleeps for d and then sends msg on ch.
	sendAfter := func(ch chan<- string, d time.Duration, msg string) {
		go func() {
			time.Sleep(d)
			ch <- msg
		}()
	}

	first := make(chan string)
	second := make(chan string)
	sendAfter(first, 1*time.Second, "来自ch1")
	sendAfter(second, 2*time.Second, "来自ch2")

	// Wait on both channels; two iterations because two messages are sent.
	for received := 0; received < 2; received++ {
		var msg string
		select {
		case msg = <-first:
		case msg = <-second:
		}
		fmt.Println(msg)
	}

	// Receive with a timeout: nothing is ever sent on idle, so the timer
	// case fires after one second.
	idle := make(chan string)
	select {
	case msg := <-idle:
		fmt.Println(msg)
	case <-time.After(1 * time.Second):
		fmt.Println("超时")
	}

	// Non-blocking receive: default runs when no case is ready.
	select {
	case msg := <-idle:
		fmt.Println(msg)
	default:
		fmt.Println("没有数据")
	}
}
6.3 同步原语
WaitGroup
package main
import (
"fmt"
"sync"
"time"
)
// worker prints a start message for id, sleeps for one second, prints a
// completion message, and marks itself done on wg when it returns.
func worker(id int, wg *sync.WaitGroup) {
	// Deferred so the WaitGroup counter is decremented on return.
	defer wg.Done()

	const workDuration = time.Second
	fmt.Printf("Worker %d 开始\n", id)
	time.Sleep(workDuration)
	fmt.Printf("Worker %d 完成\n", id)
}
// main starts three workers (ids 1 to 3) and blocks until every one of them
// has called Done on the shared WaitGroup.
func main() {
	const workerCount = 3

	wg := new(sync.WaitGroup)
	for id := 1; id <= workerCount; id++ {
		wg.Add(1) // register the worker before starting it
		go worker(id, wg)
	}

	wg.Wait() // returns once the counter is back to zero
	fmt.Println("所有worker完成")
}
Mutex
package main
import (
"fmt"
"sync"
)
// Counter is an integer counter whose field is guarded by a mutex.
type Counter struct {
	mu    sync.Mutex
	count int
}

// Increment adds one to the counter while holding the lock.
func (c *Counter) Increment() {
	c.mu.Lock()
	c.count += 1
	c.mu.Unlock()
}

// Value returns the current count, read while holding the lock.
func (c *Counter) Value() int {
	c.mu.Lock()
	current := c.count
	c.mu.Unlock()
	return current
}
// Cache is a string-to-string map guarded by a read-write mutex: Get takes
// the read lock and Set takes the write lock. The zero value is an empty
// cache that is ready to use.
type Cache struct {
	mu   sync.RWMutex
	data map[string]string
}

// Get returns the value stored under key, or the empty string when the key
// is absent.
func (c *Cache) Get(key string) string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.data[key] // reading a nil map is safe and yields ""
}

// Set stores value under key, replacing any previous value.
func (c *Cache) Set(key, value string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	// The map is created on first write; assigning into a nil map would
	// panic, and Cache has no constructor that could allocate it earlier.
	if c.data == nil {
		c.data = make(map[string]string)
	}
	c.data[key] = value
}
// main increments a shared Counter from 1000 goroutines and prints the final
// value after all of them have finished.
func main() {
	const goroutines = 1000

	var (
		counter Counter
		wg      sync.WaitGroup
	)

	wg.Add(goroutines) // register every goroutine up front
	for n := 0; n < goroutines; n++ {
		go func() {
			defer wg.Done()
			counter.Increment()
		}()
	}

	wg.Wait()
	fmt.Println("计数:", counter.Value())
}
Once
package main
import (
"fmt"
"sync"
)
// Singleton is the type of the shared, lazily created instance.
type Singleton struct {
	data string
}

var (
	once     sync.Once  // ensures the instance is constructed only once
	instance *Singleton // assigned during the first GetInstance call
)

// initInstance builds the shared Singleton and reports that it did so.
func initInstance() {
	instance = &Singleton{data: "单例对象"}
	fmt.Println("创建实例")
}

// GetInstance returns the shared Singleton, constructing it on the first
// call. Later calls return the same pointer without constructing again.
func GetInstance() *Singleton {
	once.Do(initInstance)
	return instance
}
// main calls GetInstance from ten goroutines at once and returns after all
// of them have finished; the instance is still created only once.
func main() {
	const callers = 10

	// A WaitGroup waits for exactly the goroutines started here. Blocking on
	// standard input instead would hang until someone presses Enter and
	// would not guarantee that the goroutines had actually run.
	var wg sync.WaitGroup
	wg.Add(callers)
	for i := 0; i < callers; i++ {
		go func() {
			defer wg.Done()
			_ = GetInstance()
		}()
	}
	wg.Wait()
}
6.4 Context
package main
import (
"context"
"fmt"
"time"
)
// worker prints a progress line every 500ms until ctx is cancelled or times
// out, then prints an exit message and returns. If ctx is already done when
// worker is called, only the exit message is printed.
func worker(ctx context.Context) {
	const interval = 500 * time.Millisecond

	for ctx.Err() == nil {
		fmt.Println("工作中...")

		// Wait out the interval, but wake up immediately on cancellation so
		// the worker does not keep sleeping after it has been told to stop.
		select {
		case <-ctx.Done():
		case <-time.After(interval):
		}
	}
	fmt.Println("收到取消信号,退出")
}
// main demonstrates two ways a context ends: a timeout that stops a running
// worker, and an explicit cancel call issued from another goroutine.
func main() {
	// Context with a timeout: it is cancelled automatically after 2 seconds.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// Wait for the worker itself to return rather than sleeping for a
	// guessed duration that merely happens to be longer than the timeout.
	finished := make(chan struct{})
	go func() {
		defer close(finished)
		worker(ctx)
	}()
	<-finished

	// Manual cancellation, triggered from a goroutine after one second.
	ctx2, cancel2 := context.WithCancel(context.Background())
	defer cancel2() // calling cancel more than once is harmless
	go func() {
		time.Sleep(1 * time.Second)
		cancel2()
	}()

	// A plain receive is enough when there is only one channel to wait on.
	<-ctx2.Done()
	fmt.Println("已取消")
}
6.5 项目实战:并发下载器
package main
import (
"fmt"
"io"
"net/http"
"os"
"sync"
"time"
)
// downloadTimeout bounds one whole download: connecting, following
// redirects, and reading the body.
const downloadTimeout = 30 * time.Second

// httpClient is shared by all downloads so that a stalled server cannot block
// a worker forever (http.Get uses a client without any timeout).
var httpClient = &http.Client{Timeout: downloadTimeout}

// Downloader fetches a fixed list of URLs using a bounded number of
// concurrent workers.
type Downloader struct {
	urls    []string
	workers int
}

// Result describes the outcome of downloading one URL. Error is nil on
// success; Size is the number of body bytes read and Duration the time spent.
type Result struct {
	URL      string
	Size     int64
	Duration time.Duration
	Error    error
}

// NewDownloader returns a Downloader for urls that runs at most workers
// downloads at the same time.
func NewDownloader(urls []string, workers int) *Downloader {
	return &Downloader{
		urls:    urls,
		workers: workers,
	}
}

// download fetches url, discards the body while counting its bytes, and
// reports what happened. Request failures, non-2xx statuses, and errors while
// reading the body are all reported through Result.Error.
func (d *Downloader) download(url string) Result {
	start := time.Now()

	resp, err := httpClient.Get(url)
	if err != nil {
		return Result{URL: url, Duration: time.Since(start), Error: err}
	}
	defer resp.Body.Close()

	// A response such as 404 or 500 is not a successful download even though
	// the request itself completed.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return Result{
			URL:      url,
			Duration: time.Since(start),
			Error:    fmt.Errorf("unexpected status %s", resp.Status),
		}
	}

	// A failure in the middle of the body is reported together with the
	// number of bytes that were read before it.
	size, err := io.Copy(io.Discard, resp.Body)
	return Result{
		URL:      url,
		Size:     size,
		Duration: time.Since(start),
		Error:    err,
	}
}

// Run downloads every URL and returns one Result per URL, in completion
// order. It returns nil when there are no URLs. A worker count below one is
// treated as one. Run uses only per-call channels, so it may be called more
// than once on the same Downloader.
func (d *Downloader) Run() []Result {
	total := len(d.urls)
	if total == 0 {
		return nil
	}

	// Clamp the pool size: at least one worker so the queue is always
	// drained, and no more workers than there are URLs.
	workers := d.workers
	if workers < 1 {
		workers = 1
	}
	if workers > total {
		workers = total
	}

	// Queue every URL up front; the buffer holds them all, so these sends
	// never block.
	urlCh := make(chan string, total)
	for _, u := range d.urls {
		urlCh <- u
	}
	close(urlCh)

	// The result channel is buffered for every URL, so workers never block
	// on send and it is safe to collect only after all of them are done.
	resultCh := make(chan Result, total)

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for u := range urlCh {
				resultCh <- d.download(u)
			}
		}()
	}
	wg.Wait()
	close(resultCh)

	results := make([]Result, 0, total)
	for r := range resultCh {
		results = append(results, r)
	}
	return results
}
// main downloads a fixed set of URLs with three workers and prints one line
// per URL. Successful downloads are reported on standard output; failures go
// to standard error, and the process exits with status 1 if any URL failed.
func main() {
	urls := []string{
		"https://golang.org",
		"https://google.com",
		"https://github.com",
	}
	downloader := NewDownloader(urls, 3)

	failed := false
	for _, r := range downloader.Run() {
		if r.Error != nil {
			failed = true
			fmt.Fprintf(os.Stderr, "失败: %s, 错误: %v\n", r.URL, r.Error)
			continue
		}
		fmt.Printf("成功: %s, 大小: %d, 耗时: %v\n", r.URL, r.Size, r.Duration)
	}

	// Report failure through the exit status so scripts can detect it.
	if failed {
		os.Exit(1)
	}
}