首页 > 其他分享 >40分钟学 Go 语言高并发:Context包与并发控制

40分钟学 Go 语言高并发:Context包与并发控制

时间:2024-11-23 22:30:55浏览次数:6  
标签:包与 func err ctx 40 并发 context time Context

Context包与并发控制

学习目标

知识点掌握程度应用场景
context原理深入理解实现机制并发控制和请求链路追踪
超时控制掌握超时设置和处理API请求超时、任务限时控制
取消信号传播理解取消机制和传播链优雅退出、资源释放
context最佳实践掌握使用规范和技巧工程实践中的常见场景

1. Context原理

1.1 Context基本结构和实现

让我们先看一个完整的Context使用示例:

package main

import (
    "context"
    "fmt"
    "log"
    "time"
)

// 请求追踪信息
type RequestInfo struct {
    TraceID    string
    SessionID  string
    StartTime  time.Time
}

// 服务接口
type Service interface {
    HandleRequest(ctx context.Context, req string) (string, error)
}

// 业务服务实现
type BusinessService struct {
    name string
}

func NewBusinessService(name string) *BusinessService {
    return &BusinessService{name: name}
}

// 处理请求
func (s *BusinessService) HandleRequest(ctx context.Context, req string) (string, error) {
    // 获取请求追踪信息
    info, ok := ctx.Value("request-info").(*RequestInfo)
    if !ok {
        return "", fmt.Errorf("request info not found in context")
    }

    log.Printf("[%s] Processing request: %s, TraceID: %s, Session: %s\n",
        s.name, req, info.TraceID, info.SessionID)

    // 模拟处理过程
    select {
    case <-time.After(2 * time.Second):
        return fmt.Sprintf("Result for %s", req), nil
    case <-ctx.Done():
        return "", ctx.Err()
    }
}

// 请求中间件
func requestMiddleware(next Service) Service {
    return &middlewareService{next: next}
}

type middlewareService struct {
    next Service
}

func (m *middlewareService) HandleRequest(ctx context.Context, req string) (string, error) {
    // 开始时间
    startTime := time.Now()

    // 添加请求信息到context
    info := &RequestInfo{
        TraceID:   fmt.Sprintf("trace-%d", time.Now().UnixNano()),
        SessionID: fmt.Sprintf("session-%d", time.Now().Unix()),
        StartTime: startTime,
    }
    ctx = context.WithValue(ctx, "request-info", info)

    // 调用下一个处理器
    result, err := m.next.HandleRequest(ctx, req)

    // 记录处理时间
    duration := time.Since(startTime)
    log.Printf("Request completed in %v, TraceID: %s\n", duration, info.TraceID)

    return result, err
}

func main() {
    // 创建服务
    service := requestMiddleware(NewBusinessService("UserService"))

    // 创建基础context
    ctx := context.Background()

    // 添加超时控制
    ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
    defer cancel()

    // 处理请求
    result, err := service.HandleRequest(ctx, "get user profile")
    if err != nil {
        log.Printf("Request failed: %v\n", err)
        return
    }

    log.Printf("Request succeeded: %s\n", result)

    // 模拟超时场景
    ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()

    result, err = service.HandleRequest(ctx, "get user settings")
    if err != nil {
        log.Printf("Request failed: %v\n", err)
        return
    }
}

2. 超时控制

让我们实现一个带有超时控制的HTTP服务:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "time"
)

// 响应结构
type Response struct {
    Data  interface{} `json:"data,omitempty"`
    Error string      `json:"error,omitempty"`
}

// 服务配置
type ServiceConfig struct {
    Timeout        time.Duration
    MaxConcurrent  int
    RetryAttempts  int
    RetryDelay     time.Duration
}

// HTTP客户端包装器
type HTTPClient struct {
    client  *http.Client
    config  ServiceConfig
    limiter chan struct{} // 并发限制器
}

// 创建新的HTTP客户端
func NewHTTPClient(config ServiceConfig) *HTTPClient {
    return &HTTPClient{
        client: &http.Client{
            Timeout: config.Timeout,
        },
        config: config,
        limiter: make(chan struct{}, config.MaxConcurrent),
    }
}

// 发送HTTP请求
func (c *HTTPClient) DoRequest(ctx context.Context, method, url string) (*Response, error) {
    var lastErr error
    
    for attempt := 0; attempt <= c.config.RetryAttempts; attempt++ {
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case c.limiter <- struct{}{}: // 获取并发许可
        }
        
        // 确保释放并发许可
        defer func() {
            <-c.limiter
        }()
        
        // 创建请求
        req, err := http.NewRequestWithContext(ctx, method, url, nil)
        if err != nil {
            return nil, fmt.Errorf("create request failed: %w", err)
        }
        
        // 设置请求超时
        reqCtx, cancel := context.WithTimeout(ctx, c.config.Timeout)
        defer cancel()
        
        // 执行请求
        resp, err := c.client.Do(req.WithContext(reqCtx))
        if err != nil {
            lastErr = err
            log.Printf("Request failed (attempt %d): %v\n", attempt+1, err)
            
            // 如果不是最后一次尝试,等待后重试
            if attempt < c.config.RetryAttempts {
                select {
                case <-ctx.Done():
                    return nil, ctx.Err()
                case <-time.After(c.config.RetryDelay):
                    continue
                }
            }
            continue
        }
        defer resp.Body.Close()
        
        // 解析响应
        var result Response
        if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
            return nil, fmt.Errorf("decode response failed: %w", err)
        }
        
        return &result, nil
    }
    
    return nil, fmt.Errorf("all retry attempts failed, last error: %v", lastErr)
}

// 处理HTTP请求的处理器
func handleRequest(w http.ResponseWriter, r *http.Request) {
    // 创建context
    ctx := r.Context()
    
    // 模拟长时间处理
    select {
    case <-time.After(2 * time.Second):
        response := Response{
            Data: "Request processed successfully",
        }
        json.NewEncoder(w).Encode(response)
    case <-ctx.Done():
        response := Response{
            Error: "Request timeout",
        }
        w.WriteHeader(http.StatusGatewayTimeout)
        json.NewEncoder(w).Encode(response)
    }
}

func main() {
    // 配置HTTP客户端
    config := ServiceConfig{
        Timeout:       5 * time.Second,
        MaxConcurrent: 10,
        RetryAttempts: 3,
        RetryDelay:    time.Second,
    }
    
    client := NewHTTPClient(config)
    
    // 创建HTTP服务器
    http.HandleFunc("/api", handleRequest)
    
    // 启动服务器
    go func() {
        log.Println("Server starting on :8080")
        if err := http.ListenAndServe(":8080", nil); err != nil {
            log.Fatal(err)
        }
    }()
    
    // 等待服务器启动
    time.Sleep(time.Second)
    
    // 测试请求
    ctx := context.Background()
    
    // 测试正常请求
    resp, err := client.DoRequest(ctx, "GET", "http://localhost:8080/api")
    if err != nil {
        log.Printf("Request failed: %v\n", err)
    } else {
        log.Printf("Response: %+v\n", resp)
    }
    
    // 测试超时请求
    ctx, cancel := context.WithTimeout(ctx, time.Second)
    defer cancel()
    
    resp, err = client.DoRequest(ctx, "GET", "http://localhost:8080/api")
    if err != nil {
        log.Printf("Request failed (expected): %v\n", err)
    } else {
        log.Printf("Response: %+v\n", resp)
    }
    
    // 保持主程序运行
    select {}
}
package main

import (
    "context"
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// 请求处理器
type RequestHandler struct {
    requests  chan Request
    responses chan Response
    done      chan struct{}
    wg        sync.WaitGroup
}

// 请求结构
type Request struct {
    ID      int
    Timeout time.Duration
    Data    string
}

// 响应结构
type Response struct {
    RequestID int
    Result    string
    Error     error
}

// 创建新的请求处理器
func NewRequestHandler() *RequestHandler {
    return &RequestHandler{
        requests:  make(chan Request, 100),
        responses: make(chan Response, 100),
        done:      make(chan struct{}),
    }
}

// 启动处理器
func (h *RequestHandler) Start(workers int) {
    for i := 0; i < workers; i++ {
        h.wg.Add(1)
        go h.worker(i)
    }
}

// 工作协程
func (h *RequestHandler) worker(id int) {
    defer h.wg.Done()
    
    for {
        select {
        case req, ok := <-h.requests:
            if !ok {
                fmt.Printf("Worker %d: request channel closed\n", id)
                return
            }
            
            // 创建context用于超时控制
            ctx, cancel := context.WithTimeout(context.Background(), req.Timeout)
            
            // 处理请求
            response := h.processRequest(ctx, req)
            
            // 发送响应
            select {
            case h.responses <- response:
                fmt.Printf("Worker %d: sent response for request %d\n", 
                    id, req.ID)
            case <-h.done:
                cancel()
                return
            }
            
            cancel() // 清理context
            
        case <-h.done:
            fmt.Printf("Worker %d: received stop signal\n", id)
            return
        }
    }
}

// 处理单个请求
func (h *RequestHandler) processRequest(ctx context.Context, req Request) Response {
    // 模拟处理时间
    processTime := time.Duration(rand.Intn(int(req.Timeout))) + req.Timeout/2
    
    select {
    case <-time.After(processTime):
        return Response{
            RequestID: req.ID,
            Result:   fmt.Sprintf("Processed: %s", req.Data),
        }
    case <-ctx.Done():
        return Response{
            RequestID: req.ID,
            Error:    ctx.Err(),
        }
    }
}

// 提交请求
func (h *RequestHandler) SubmitRequest(req Request) error {
    select {
    case h.requests <- req:
        return nil
    case <-h.done:
        return fmt.Errorf("handler is stopped")
    }
}

// 获取响应
func (h *RequestHandler) GetResponse() (Response, error) {
    select {
    case resp := <-h.responses:
        return resp, nil
    case <-h.done:
        return Response{}, fmt.Errorf("handler is stopped")
    }
}

// 停止处理器
func (h *RequestHandler) Stop() {
    close(h.done)
    h.wg.Wait()
    close(h.requests)
    close(h.responses)
}

func main() {
    // 创建请求处理器
    handler := NewRequestHandler()
    handler.Start(3)
    
    // 发送一些测试请求
    requests := []Request{
        {ID: 1, Timeout: time.Second, Data: "Fast request"},
        {ID: 2, Timeout: time.Second * 2, Data: "Normal request"},
        {ID: 3, Timeout: time.Millisecond * 500, Data: "Quick request"},
        {ID: 4, Timeout: time.Second * 3, Data: "Slow request"},
    }
    
    // 提交请求
    for _, req := range requests {
        if err := handler.SubmitRequest(req); err != nil {
            fmt.Printf("Failed to submit request %d: %v\n", req.ID, err)
            continue
        }
        fmt.Printf("Submitted request %d\n", req.ID)
    }
    
    // 收集响应
    var wg sync.WaitGroup
    wg.Add(1)
    
    go func() {
        defer wg.Done()
        for i := 0; i < len(requests); i++ {
            resp, err := handler.GetResponse()
            if err != nil {
                fmt.Printf("Failed to get response: %v\n", err)
                continue
            }
            
            if resp.Error != nil {
                fmt.Printf("Request %d failed: %v\n", resp.RequestID, resp.Error)
            } else {
                fmt.Printf("Request %d succeeded: %s\n", resp.RequestID, resp.Result)
            }
        }
    }()
    
    // 等待所有响应处理完成
    wg.Wait()
    
    // 停止处理器
    handler.Stop()
    fmt.Println("Main: processing completed")
}

3. 取消信号传播

让我们继续完成取消信号传播的示例代码:

package main

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"
)

// 任务定义
type Task struct {
    ID      string
    Process func(ctx context.Context) (interface{}, error)
}

// 工作池
type WorkerPool struct {
    workers    int
    tasks      chan Task
    results    chan interface{}
    errors     chan error
    done       chan struct{}
    wg         sync.WaitGroup
}

// 创建新的工作池
func NewWorkerPool(workers int) *WorkerPool {
    return &WorkerPool{
        workers: workers,
        tasks:   make(chan Task, workers*2),
        results: make(chan interface{}, workers*2),
        errors:  make(chan error, workers*2),
        done:    make(chan struct{}),
    }
}

// 启动工作池
func (p *WorkerPool) Start(ctx context.Context) {
    // 启动workers
    for i := 0; i < p.workers; i++ {
        p.wg.Add(1)
        go p.worker(ctx, i)
    }

    // 等待所有worker完成
    go func() {
        p.wg.Wait()
        close(p.done)
        close(p.results)
        close(p.errors)
    }()
}

// worker处理任务
func (p *WorkerPool) worker(ctx context.Context, id int) {
    defer p.wg.Done()
    
    log.Printf("Worker %d started\n", id)
    
    for {
        select {
        case <-ctx.Done():
            log.Printf("Worker %d stopped: %v\n", id, ctx.Err())
            return
        case task, ok := <-p.tasks:
            if !ok {
                log.Printf("Worker %d: task channel closed\n", id)
                return
            }

            log.Printf("Worker %d processing task %s\n", id, task.ID)
            
            // 创建任务专用的context
            taskCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
            
            // 执行任务
            result, err := task.Process(taskCtx)
            cancel() // 释放任务context资源
            
            if err != nil {
                select {
                case p.errors <- fmt.Errorf("task %s failed: %w", task.ID, err):
                case <-ctx.Done():
                    return
                }
            } else {
                select {
                case p.results <- result:
                case <-ctx.Done():
                    return
                }
            }
        }
    }
}

// 提交任务
func (p *WorkerPool) Submit(task Task) error {
    select {
    case p.tasks <- task:
        return nil
    case <-p.done:
        return fmt.Errorf("worker pool is closed")
    }
}

// 关闭工作池
func (p *WorkerPool) Close() {
    close(p.tasks)
}

// 获取结果通道
func (p *WorkerPool) Results() <-chan interface{} {
    return p.results
}

// 获取错误通道
func (p *WorkerPool) Errors() <-chan error {
    return p.errors
}

func main() {
    // 创建根context
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // 创建工作池
    pool := NewWorkerPool(3)
    pool.Start(ctx)

    // 创建模拟任务
    tasks := []Task{
        {
            ID: "task-1",
            Process: func(ctx context.Context) (interface{}, error) {
                select {
                case <-time.After(2 * time.Second):
                    return "Task 1 completed", nil
                case <-ctx.Done():
                    return nil, ctx.Err()
                }
            },
        },
        {
            ID: "task-2",
            Process: func(ctx context.Context) (interface{}, error) {
                select {
                case <-time.After(3 * time.Second):
                    return "Task 2 completed", nil
                case <-ctx.Done():
                    return nil, ctx.Err()
                }
            },
        },
        {
            ID: "task-3",
            Process: func(ctx context.Context) (interface{}, error) {
                select {
                case <-time.After(1 * time.Second):
                    return nil, fmt.Errorf("task 3 failed")
                case <-ctx.Done():
                    return nil, ctx.Err()
                }
            },
        },
    }

    // 提交任务
    for _, task := range tasks {
        if err := pool.Submit(task); err != nil {
            log.Printf("Failed to submit task %s: %v\n", task.ID, err)
        }
    }

    // 等待3秒后取消所有任务
    go func() {
        time.Sleep(3 * time.Second)
        log.Println("Cancelling all tasks...")
        cancel()
    }()

    // 收集结果和错误
    completed := 0
    expected := len(tasks)

    for completed < expected {
        select {
        case result, ok := <-pool.Results():
            if !ok {
                continue
            }
            log.Printf("Got result: %v\n", result)
            completed++
        case err, ok := <-pool.Errors():
            if !ok {
                continue
            }
            log.Printf("Got error: %v\n", err)
            completed++
        case <-ctx.Done():
            log.Printf("Main: context cancelled: %v\n", ctx.Err())
            completed = expected // 强制退出循环
        }
    }

    // 关闭工作池
    pool.Close()
    
    // 等待工作池完全关闭
    <-pool.done
    log.Println("All workers stopped")
}

3.1 取消信号传播流程图

在这里插入图片描述

4. Context最佳实践

4.1 Context使用规范

  1. 函数调用链传递
// 推荐
func HandleRequest(ctx context.Context, req *Request) error

// 不推荐
func HandleRequest(timeout time.Duration, req *Request) error
  1. Context应作为第一个参数
// 推荐
func ProcessTask(ctx context.Context, task *Task) error

// 不推荐
func ProcessTask(task *Task, ctx context.Context) error
  1. 不要储存Context在结构体中
// 不推荐
type Service struct {
    ctx context.Context
}

// 推荐
type Service struct {
    // 其他字段
}

func (s *Service) DoWork(ctx context.Context) error

4.2 Context使用注意事项

  1. 不要将nil传递给context参数
// 推荐
ctx := context.Background()
ProcessTask(ctx, task)

// 不推荐
ProcessTask(nil, task)
  1. context.Value应该只用于请求作用域数据
// 推荐
ctx = context.WithValue(ctx, "request-id", requestID)

// 不推荐 - 配置信息应该通过其他方式传递
ctx = context.WithValue(ctx, "db-config", dbConfig)
  1. 正确处理取消信号
select {
case <-ctx.Done():
    return ctx.Err()
default:
    // 继续处理
}

4.3 实践建议

  1. 超时控制
  • 设置合理的超时时间
  • 在不同层级使用不同的超时时间
  • 确保资源正确释放
  1. 错误处理
  • 区分超时和取消错误
  • 传递有意义的错误信息
  • 实现优雅降级
  1. 性能优化
  • 避免创建过多的context
  • 合理使用context.Value
  • 及时取消不需要的操作
  1. 日志追踪
  • 记录关键操作的耗时
  • 追踪请求的完整链路
  • 记录取消原因

总结

关键点回顾

  1. Context原理
  • 继承关系
  • 值传递机制
  • 生命周期管理
  1. 超时控制
  • 设置超时时间
  • 处理超时信号
  • 资源清理
  1. 取消信号传播
  • 信号传递机制
  • 取消处理流程
  • 资源释放
  1. 最佳实践
  • 使用规范
  • 注意事项
  • 优化建议

实践建议

  1. 代码规范
  • 遵循命名约定
  • 合理组织代码结构
  • 添加必要的注释
  1. 错误处理
  • 使用有意义的错误信息
  • 实现错误恢复机制
  • 记录错误日志
  1. 性能优化
  • 减少不必要的context创建
  • 避免context.Value滥用
  • 及时释放资源

怎么样今天的内容还满意吗?再次感谢观众老爷的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!

标签:包与,func,err,ctx,40,并发,context,time,Context
From: https://blog.csdn.net/weixin_40780178/article/details/144000001

相关文章

  • Thinkphp-Laravel音乐播放器系统的设计与实现 7z140
    目录课题介绍具体实现截图课程项目技术路线代码流程执行分析thinkphp框架解析开发技术介绍PHP核心代码部分展示详细视频演示源码获取课题介绍首先,要掌握有关系统的基本理论,如PHP语言、MySQL数据库等,并对其进行全面的分析。为了给系统打下坚实的技术基础,本系统采用......
  • 并发编程(13)——无锁环形并发队列
    文章目录十三、day131.什么是无锁数据结构?2.环形队列3.实现线程安全的环形队列3.1实现有锁环形队列3.2实现无锁环形队列(有缺陷)3.3实现无锁环形队列(无缺陷)3.3.1pop函数3.3.2push函数3.3.3优化后的pop和push函数3.3.4完整代码4.无锁环形并发队列的优缺点十......
  • 2024-2025-1 20241407《计算机基础与程序设计》第九周学习总结
    作业信息这个作业属于哪个课程2024-2025-1计算机基础与程序设计这个作业要求在哪里2024-2025-1计算机基础与程序设计第九周作业这个作业的目标操作系统责任,内存与进程管理,分时系统,CPU调度,文件、文件系统,文件保护,磁盘调度作业正文本博客教材学习内容总结......
  • CF1404B. Tree Tag
    1405D-树标记让我们独立地考虑几个情况。情况1:$\text{dist}(a,b)\leqda$不出所料,在这种情况下,Alice在第一步就通过标记Bob获胜。情况2:$2da\geq\text{树的直径}$在这里,树的直径被定义为最长简单路径的长度。在这种情况下,Alice可以移动到树的中心。一旦Alice......
  • AO3401A-ASEMI中低压P沟道MOS管AO3401A
    编辑:llAO3401A-ASEMI中低压P沟道MOS管AO3401A型号:AO3401A品牌:ASEMI封装:SOT-23最大漏源电流:-4.2A漏源击穿电压:-30V批号:最新RDS(ON)Max:0.06Ω引脚数量:3沟道类型:P沟道MOS管芯片尺寸:MIL漏电流:恢复时间:5ns芯片材质:封装尺寸:如图特性:中低压MOS管、P沟道MOS管工作结温:-55℃......
  • CCF认证202406-02 | 矩阵重塑(其二)
    题目背景矩阵转置操作是将矩阵的行和列交换的过程。在转置过程中,原矩阵 A 的元素 aij​ 会移动到转置后的矩阵 AT 的 aji​ 的位置。这意味着 A 的第 i 行第 j 列的元素在 AT 中成为了第 j 行第 i 列的元素。例如,有矩阵 A 如下:A=[abcdef]它的转置矩......
  • CCF认证202406-01 | 矩阵重塑(其一)
    题目背景矩阵(二维)的重塑(reshape)操作是指改变矩阵的行数和列数,同时保持矩阵中元素的总数不变。题目描述矩阵的重塑操作可以具体定义为以下步骤:设原矩阵为 M,其维度为n×m,即有 n 行和 m 列。新矩阵为 M′,其维度为p×q。重塑操作要满足 n×m=p×q,这保证了元素的总数不......
  • AO3400A-ASEMI中低压N沟道MOS管AO3400A
    编辑:llAO3400A-ASEMI中低压N沟道MOS管AO3400A型号:AO3400A品牌:ASEMI封装:SOT-23批号:最新最大漏源电流:5.8A漏源击穿电压:30VRDS(ON)Max:30mΩ引脚数量:3芯片个数:沟道类型:N沟道MOS管、低压MOS管漏电流:ua特性:N沟道MOS管、场效应管工作温度:-55℃~150℃备受欢迎的AO3400AMOS管......
  • 【解决MongoDB安装难题!】计算机丢失VCRUNTIME140D.dll?一招教你快速修复!
    在安装或运行Java的MongoDB相关程序时,如果遇到“无法启动此程序,因为计算机丢失VCRUNTIME140D.dll”的错误,通常是由于缺少MicrosoftVisualC++Redistributable包。VCRUNTIME140D.dll是VisualC++2015-2019Redistributable的一部分,用于支持C++应用程序的运行时库。以下是解......
  • 代码随想录算法训练营第十三天| 110.平衡二叉树、257. 二叉树的所有路径、404.左叶子
    110.平衡二叉树题目链接:.-力扣(LeetCode)文章链接:代码随想录视频链接:后序遍历求高度,高度判断是否平衡|LeetCode:110.平衡二叉树_哔哩哔哩_bilibili《代码随想录》算法公开课开讲啦!快来打卡!本期视频的文字讲解版在「代码随想录」刷题网站:programmercarl.com,这里刷题顺序,详......