首页 > 其他分享 >探索:优雅地实现异步方法的并行化

探索:优雅地实现异步方法的并行化

时间:2023-02-09 11:44:05浏览次数:64  
标签:异步 string 并行 resultList 优雅 people1 accompanyInfo id

接上篇 通过一个示例形象地理解C# async await 非并行异步、并行异步、并行异步的并发量控制

前些天写了两篇关于C# async await异步的博客,
第一篇博客看的人多,点赞评论也多,我想应该都看懂了,比较简单。
第二篇博客看的人少,点赞的也少,没有评论。

我很纳闷,第二篇博客才是重点,如此吊炸天的代码,居然没人评论。
这个代码,就是.NET圈的顶级大佬也没有写过,为什么这么说,这就要说到C# async await的语法糖:
没有语法糖,代码一样写,java8没有语法糖,一样写出很厉害的代码。但有了C# async await语法糖,普通的水平一般的业务程序员,哪怕是菜B,也能写出高吞吐高性能的代码,这就是意义!
所以我说顶级大佬没写过,因为他们水平高,脑力好,手段多,自然不需要这么写。但普通程序员要那样写代码,麻烦不说,BUG频出。
标题我用了"探索"这个词,有没有更好的实践,让小白们都会写的并行异步的实践?

ElasticSearch的性能

代码的实用价值,是查询es。
最近发现es的性能非常好!先给大家看个控制台输出的截图。服务我是部署在服务器上的,真实环境,不是自己电脑。

379次es查询,仅需0.185秒,当然耗时会有波动,零点几秒都是正常的,超过1秒也有可能。
es最怕的是什么,是慢查询,是条件复杂的查询,是范围查询。
我的策略是多次精确查询,这样可以利用es极高的吞吐能力。

并行异步

既然查询次数多,单线程或者说同步肯定是不行的,必须并行。
并行代码,python能写吗?java能写吗?肯定能啊!
但我前同事写的python多次查询es写的就是同步代码,为什么不并行呢?并行肯定可以写,但是能不写就不写,为什么?因为写起来复杂,不好写。你以为自己技术好,脑力好没问题,但别人呢?
重点是什么?不仅要写并行代码,还要写的简单,不破坏代码原有逻辑结构。

异步方法

大家都会写的,用async await就行了,很简单,放个我写的,代码主要是在双循环中多次异步请求(大致看一下先跳过):

/// <summary>
/// xxx查询
/// </summary>
public async Task<List<AccompanyInfo>> Query2(string strStartTime, string strEndTime, int kpCountThreshold, int countThreshold, int distanceThreshold, int timeThreshold, List<PeopleCluster> peopleClusterList)
{
    List<AccompanyInfo> resultList = new List<AccompanyInfo>();
    Stopwatch sw = Stopwatch.StartNew();

    //创建字典
    Dictionary<string, PeopleCluster> clusterIdPeopleDict = new Dictionary<string, PeopleCluster>();
    foreach (PeopleCluster peopleCluster in peopleClusterList)
    {
        foreach (string clusterId in peopleCluster.ClusterIds)
        {
            if (!clusterIdPeopleDict.ContainsKey(clusterId))
            {
                clusterIdPeopleDict.Add(clusterId, peopleCluster);
            }
        }
    }

    int queryCount = 0;
    Dictionary<string, AccompanyInfo> dict = new Dictionary<string, AccompanyInfo>();
    foreach (PeopleCluster people1 in peopleClusterList)
    {
        List<PeopleFeatureInfo> peopleFeatureList = await ServiceFactory.Get<PeopleFeatureQueryService>().Query(strStartTime, strEndTime, people1);
        queryCount++;
        foreach (PeopleFeatureInfo peopleFeatureInfo1 in peopleFeatureList)
        {
            DateTime capturedTime = DateTime.ParseExact(peopleFeatureInfo1.captured_time, "yyyyMMddHHmmss", CultureInfo.InvariantCulture);
            string strStartTime2 = capturedTime.AddSeconds(-timeThreshold).ToString("yyyyMMddHHmmss");
            string strEndTime2 = capturedTime.AddSeconds(timeThreshold).ToString("yyyyMMddHHmmss");
            List<PeopleFeatureInfo> peopleFeatureList2 = await ServiceFactory.Get<PeopleFeatureQueryService>().QueryExcludeSelf(strStartTime2, strEndTime2, people1);
            queryCount++;
            if (peopleFeatureList2.Count > 0)
            {
                foreach (PeopleFeatureInfo peopleFeatureInfo2 in peopleFeatureList2)
                {
                    string key = null;
                    PeopleCluster people2 = null;
                    string people2ClusterId = null;
                    if (clusterIdPeopleDict.ContainsKey(peopleFeatureInfo2.cluster_id.ToString()))
                    {
                        people2 = clusterIdPeopleDict[peopleFeatureInfo2.cluster_id.ToString()];
                        key = $"{string.Join(",", people1.ClusterIds)}_{string.Join(",", people2.ClusterIds)}";
                    }
                    else
                    {
                        people2ClusterId = peopleFeatureInfo2.cluster_id.ToString();
                        key = $"{string.Join(",", people1.ClusterIds)}_{string.Join(",", people2ClusterId)}";
                    }

                    double distance = LngLatUtil.CalcDistance(peopleFeatureInfo1.Longitude, peopleFeatureInfo1.Latitude, peopleFeatureInfo2.Longitude, peopleFeatureInfo2.Latitude);
                    if (distance > distanceThreshold) continue;

                    AccompanyInfo accompanyInfo;
                    if (dict.ContainsKey(key))
                    {
                        accompanyInfo = dict[key];
                    }
                    else
                    {
                        accompanyInfo = new AccompanyInfo();
                        dict.Add(key, accompanyInfo);
                    }

                    accompanyInfo.People1 = people1;
                    if (people2 != null)
                    {
                        accompanyInfo.People2 = people2;
                    }
                    else
                    {
                        accompanyInfo.ClusterId2 = people2ClusterId;
                    }

                    AccompanyItem accompanyItem = new AccompanyItem();
                    accompanyItem.Info1 = peopleFeatureInfo1;
                    accompanyItem.Info2 = peopleFeatureInfo2;
                    accompanyInfo.List.Add(accompanyItem);

                    accompanyInfo.Count++;

                    resultList.Add(accompanyInfo);
                }
            }
        }
    }

    resultList = resultList.FindAll(a => (a.People2 != null && a.Count >= kpCountThreshold) || a.Count >= countThreshold);

    //去重
    int beforeDistinctCount = resultList.Count;
    resultList = resultList.DistinctBy(a =>
    {
        string str1 = string.Join(",", a.People1.ClusterIds);
        string str2 = a.People2 != null ? string.Join(",", a.People2.ClusterIds) : string.Empty;
        string str3 = a.ClusterId2 ?? string.Empty;
        StringBuilder sb = new StringBuilder();
        foreach (AccompanyItem item in a.List)
        {
            var info2 = item.Info2;
            sb.Append($"{info2.camera_id},{info2.captured_time},{info2.cluster_id}");
        }
        return $"{str1}_{str2}_{str3}_{sb}";
    }).ToList();

    sw.Stop();
    string msg = $"xxx查询,耗时:{sw.Elapsed.TotalSeconds:0.000} 秒,查询次数:{queryCount},去重:{beforeDistinctCount}-->{resultList.Count}";
    Console.WriteLine(msg);
    LogUtil.Info(msg);

    return resultList;
}

异步方法的并行化

上述代码是没有问题的,但也有问题。就是在双循环中多次请求,虽然用了async await,但不是并行,耗时会很长,如何优化?请看如下代码:

/// <summary>
/// xxx查询
/// </summary>
public async Task<List<AccompanyInfo>> Query(string strStartTime, string strEndTime, int kpCountThreshold, int countThreshold, int distanceThreshold, int timeThreshold, List<PeopleCluster> peopleClusterList)
{
    List<AccompanyInfo> resultList = new List<AccompanyInfo>();
    Stopwatch sw = Stopwatch.StartNew();

    //创建字典
    Dictionary<string, PeopleCluster> clusterIdPeopleDict = new Dictionary<string, PeopleCluster>();
    foreach (PeopleCluster peopleCluster in peopleClusterList)
    {
        foreach (string clusterId in peopleCluster.ClusterIds)
        {
            if (!clusterIdPeopleDict.ContainsKey(clusterId))
            {
                clusterIdPeopleDict.Add(clusterId, peopleCluster);
            }
        }
    }

    //组织第一层循环task
    Dictionary<PeopleCluster, Task<List<PeopleFeatureInfo>>> tasks1 = new Dictionary<PeopleCluster, Task<List<PeopleFeatureInfo>>>();
    foreach (PeopleCluster people1 in peopleClusterList)
    {
        var task1 = ServiceFactory.Get<PeopleFeatureQueryService>().Query(strStartTime, strEndTime, people1);
        tasks1.Add(people1, task1);
    }

    //计算第一层循环task并缓存结果,组织第二层循环task
    Dictionary<string, Task<List<PeopleFeatureInfo>>> tasks2 = new Dictionary<string, Task<List<PeopleFeatureInfo>>>();
    Dictionary<PeopleCluster, List<PeopleFeatureInfo>> cache1 = new Dictionary<PeopleCluster, List<PeopleFeatureInfo>>();
    foreach (PeopleCluster people1 in peopleClusterList)
    {
        List<PeopleFeatureInfo> peopleFeatureList = await tasks1[people1];
        cache1.Add(people1, peopleFeatureList);
        foreach (PeopleFeatureInfo peopleFeatureInfo1 in peopleFeatureList)
        {
            DateTime capturedTime = DateTime.ParseExact(peopleFeatureInfo1.captured_time, "yyyyMMddHHmmss", CultureInfo.InvariantCulture);
            string strStartTime2 = capturedTime.AddSeconds(-timeThreshold).ToString("yyyyMMddHHmmss");
            string strEndTime2 = capturedTime.AddSeconds(timeThreshold).ToString("yyyyMMddHHmmss");
            var task2 = ServiceFactory.Get<PeopleFeatureQueryService>().QueryExcludeSelf(strStartTime2, strEndTime2, people1);
            string task2Key = $"{strStartTime2}_{strEndTime2}_{string.Join(",", people1.ClusterIds)}";
            tasks2.TryAdd(task2Key, task2);
        }
    }

    //读取第一层循环task缓存结果,计算第二层循环task
    Dictionary<string, AccompanyInfo> dict = new Dictionary<string, AccompanyInfo>();
    foreach (PeopleCluster people1 in peopleClusterList)
    {
        List<PeopleFeatureInfo> peopleFeatureList = cache1[people1];
        foreach (PeopleFeatureInfo peopleFeatureInfo1 in peopleFeatureList)
        {
            DateTime capturedTime = DateTime.ParseExact(peopleFeatureInfo1.captured_time, "yyyyMMddHHmmss", CultureInfo.InvariantCulture);
            string strStartTime2 = capturedTime.AddSeconds(-timeThreshold).ToString("yyyyMMddHHmmss");
            string strEndTime2 = capturedTime.AddSeconds(timeThreshold).ToString("yyyyMMddHHmmss");
            string task2Key = $"{strStartTime2}_{strEndTime2}_{string.Join(",", people1.ClusterIds)}";
            List<PeopleFeatureInfo> peopleFeatureList2 = await tasks2[task2Key];
            if (peopleFeatureList2.Count > 0)
            {
                foreach (PeopleFeatureInfo peopleFeatureInfo2 in peopleFeatureList2)
                {
                    string key = null;
                    PeopleCluster people2 = null;
                    string people2ClusterId = null;
                    if (clusterIdPeopleDict.ContainsKey(peopleFeatureInfo2.cluster_id.ToString()))
                    {
                        people2 = clusterIdPeopleDict[peopleFeatureInfo2.cluster_id.ToString()];
                        key = $"{string.Join(",", people1.ClusterIds)}_{string.Join(",", people2.ClusterIds)}";
                    }
                    else
                    {
                        people2ClusterId = peopleFeatureInfo2.cluster_id.ToString();
                        key = $"{string.Join(",", people1.ClusterIds)}_{string.Join(",", people2ClusterId)}";
                    }

                    double distance = LngLatUtil.CalcDistance(peopleFeatureInfo1.Longitude, peopleFeatureInfo1.Latitude, peopleFeatureInfo2.Longitude, peopleFeatureInfo2.Latitude);
                    if (distance > distanceThreshold) continue;

                    AccompanyInfo accompanyInfo;
                    if (dict.ContainsKey(key))
                    {
                        accompanyInfo = dict[key];
                    }
                    else
                    {
                        accompanyInfo = new AccompanyInfo();
                        dict.Add(key, accompanyInfo);
                    }

                    accompanyInfo.People1 = people1;
                    if (people2 != null)
                    {
                        accompanyInfo.People2 = people2;
                    }
                    else
                    {
                        accompanyInfo.ClusterId2 = people2ClusterId;
                    }

                    AccompanyItem accompanyItem = new AccompanyItem();
                    accompanyItem.Info1 = peopleFeatureInfo1;
                    accompanyItem.Info2 = peopleFeatureInfo2;
                    accompanyInfo.List.Add(accompanyItem);

                    accompanyInfo.Count++;

                    resultList.Add(accompanyInfo);
                }
            }
        }
    }

    resultList = resultList.FindAll(a => (a.People2 != null && a.Count >= kpCountThreshold) || a.Count >= countThreshold);

    //去重
    int beforeDistinctCount = resultList.Count;
    resultList = resultList.DistinctBy(a =>
    {
        string str1 = string.Join(",", a.People1.ClusterIds);
        string str2 = a.People2 != null ? string.Join(",", a.People2.ClusterIds) : string.Empty;
        string str3 = a.ClusterId2 ?? string.Empty;
        StringBuilder sb = new StringBuilder();
        foreach (AccompanyItem item in a.List)
        {
            var info2 = item.Info2;
            sb.Append($"{info2.camera_id},{info2.captured_time},{info2.cluster_id}");
        }
        return $"{str1}_{str2}_{str3}_{sb}";
    }).ToList();

    //抓拍记录排序
    foreach (AccompanyInfo item in resultList)
    {
        item.List.Sort((a, b) => -string.Compare(a.Info1.captured_time, b.Info1.captured_time));
    }

    sw.Stop();
    string msg = $"xxx查询,耗时:{sw.Elapsed.TotalSeconds:0.000} 秒,查询次数:{tasks1.Count + tasks2.Count},去重:{beforeDistinctCount}-->{resultList.Count}";
    Console.WriteLine(msg);
    LogUtil.Info(msg);

    return resultList;
}

上述代码说明

  1. 为了使异步并行化,双循环要写三遍。第一遍只需写第一层循环,省了第二层。第二遍没有数据处理的那层子循环。第三遍是最全的。
  2. 和普通的异步相比,第一、二遍双循环是多出来的,第三遍双循环代码结构和普通的异步代码结构是一样的。
  3. 写的时候,可以先写好普通的异步方法,然后再改造成并行化的异步方法。

你为什么说.NET圈的顶级大佬没有写过?

  1. 不吹个牛,博客没人看,没人点赞啊?!
  2. 我倒是希望有大佬写个更好的实践,把我这种写法淘汰掉,因为这是我能想到的最容易控制的写法了。
    并行代码,很多人都会写,java、python也能写,但问题是,比较菜的普通业务程序员,如何无脑写这种并行代码?
    最差的写法,例如java的CompletableFuture,结合业务代码,写法就很复杂了。真的没法无脑写。
    其次的写法,例如:
List<PeopleFeatureInfo>[] listArray = await Task.WhenAll(tasks2.Values);

在双循环体中,怎么拿结果?肯定能写,但又要思考怎么写了不是?
而我的写法,在双循环体中是可以直接拿结果的:

List<PeopleFeatureInfo> list = await tasks2[task2Key];
  1. 只放C#代码没有说服力,我一个同事python写的很6,他写的挖掘代码很多都是并行,放一段代码:
def get_es_multiprocess(index_list, people_list, core_percent, rev_clusterid_idcard_dict):
    '''
    多进程读取es数据,转为整个数据帧,按时间排序
    :return: 规模较大的数据帧
    '''
    col_list = ["cluster_id", "camera_id", "captured_time"]
    pool = Pool(processes=int(mp.cpu_count() * core_percent))
    input_list = [(i, people_list, col_list) for i in index_list]
    res = pool.map(get_es, input_list)
    if not res:
        return None
    pool.close()
    pool.join()
    df_all = pd.DataFrame(columns=col_list+['longitude', 'latitude'])
    for df in res:
        df_all = pd.concat([df_all, df])
    # 这里强制转换为字符串!
    df_all['cluster_id_'] = df_all['cluster_id'].apply(lambda x: rev_clusterid_idcard_dict[str(x)])
    del df_all['cluster_id']
    df_all.rename(columns={'cluster_id_': 'cluster_id'}, inplace=True)
    df_all.sort_values(by='captured_time', inplace=True)
    print('=' * 100)
    print('整个数据(聚类前):')
    print(df_all.info())
    cluster_id_list = [(i, df) for i, df in df_all.groupby(['cluster_id'])]
    cluster_id_list_split = [j for j in func(cluster_id_list, 1000000)]
    # todo 缩小数据集,用于调试!
    data_all = df_all.iloc[:, :]
    return data_all, cluster_id_list_split

上述python代码解析

  1. 核心代码:
res = pool.map(get_es, input_list)
pool.join()
...省略

其中get_es是查询es的方法,他写的应该不是异步方法,不过这个不是重点
2. res是查询结果,通过并行的方式一把查出来,放到res中,然后把结果再解出来
3. 注意,这只是单循环,想想双层循环怎么写
4. pool.join()是阻塞线程的
5. 同事注释中写的是"多进程",是写错了吗?实际是多线程?还是就是多进程?
6. 当然,python是有async await异步写法的,应该不比C#差,只是同事没有用,可能是因为他用的python版本不够新

未完,待补充

XXX

  1. 我们开发的低代码平台很牛B,C#:我就是低代码!
  2. 我们开发的平台很牛B,支持写脚本、自定义脚本,C#:我就是脚本!
  3. 我们用spark、flink分布式,性能牛B,C#:并行异步性能吊炸天,内存给大些,单机就可以。C#:我的并行异步的性能,能把es干挂,只要不是计算密集型,只要内存够,不用spark、flink。

标签:异步,string,并行,resultList,优雅,people1,accompanyInfo,id
From: https://www.cnblogs.com/s0611163/p/17098841.html

相关文章

  • 一篇文章带了解如何用SpringBoot在RequestBody中优雅的使用枚举参数
    目录确认需求定义枚举和对象实现转换逻辑方案一:精准攻击方案二:全范围攻击测试总结 确认需求需求与前文类似,只不过这里需要是在RequestBody中使用。与前文......
  • 【浪漫法式】淡然而优雅的复古
    优雅、热情,法式风格总是能直击人内心深处对浪漫精致生活的渴求。法式装修中,空间立体感是很重要的,讲究的是线条流畅,层次分明,其次就是对称美。::客厅 LivingRoom强调轴线的......
  • tcp长连接服务几种优雅升级的方案
    1、将长连接服务拆分成:长连接接入服务+ 后端的业务服务保证长连接服务不处理业务,这样基本不需要重启长连接服务,对客户端无感2、客户端的转移结合客户端的转移,在升级......
  • 如何优雅的接入邮件、短信及消息推送
    在日常开发中,我们经常会需要发送邮件、短信、APP消息及任务(报警)通知等内容,按照现有开发规则,每个业务平台在需要发送此类消息时都需要重新对接一次相关平台,不仅会造成业务......
  • 73、商城业务---商品详情---异步化编排优化
    我们的业务逻辑如下:1和2的先后次序无关,但是1必须先于3、4、5执行1、编写线程池的配置@ConfigurationpublicclassMyThreadConfig{@BeanpublicThrea......
  • Redis 异步客户端选型及落地实践
    作者:京东科技王晨Redis异步客户端选型及落地实践可视化服务编排系统是能够通过线上可视化拖拽、配置的方式完成对接口的编排,可在线完成服务的调试、测试,实现业务需求的交付......
  • 并发、并行、串行
    并发并,顾名思义,就是首先是多条“路”共同执行。并发的意思就是一个CPU在某一个时间执行多个任务。例如写代码,既要查数据库,又要码代码,一直在查数据库和码代码中切换......
  • Redis 异步客户端选型及落地实践
    作者:京东科技王晨Redis异步客户端选型及落地实践可视化服务编排系统是能够通过线上可视化拖拽、配置的方式完成对接口的编排,可在线完成服务的调试、测试,实现业务需求的......
  • 大哥,这是并发不是并行,Are You Ok?
    本文内容整理自**博学谷狂野架构师**多线程概述基础概念进程和线程进程是程序运行资源分配的最小单位​ 进程是操作系统进行资源分配的最小单位,其中资源包括......
  • 异步函数(async)的委托类型
    例如,某个委托类型如下:publicdelegateTaskRequestDelegate(HttpContextcontext);根据上面的签名,定义的两个 “示例” 函数如下:【传统的】publicTaskMyTestA(H......