首页 > 其他分享 >函数计算与对象存储实现MapReduce

函数计算与对象存储实现MapReduce

时间:2023-01-26 10:00:37浏览次数:44  
标签:src 存储 file MapReduce key path 函数

Serverless架构可以在很多领域发挥极具价值的作用,包括监控告警、人工智能、图像处理、音视频处理等。同样,在大数据领域,Serverless架构仍然可以有良好的表现。以WordCount为例,可以依靠Serverless架构实现一个Serverless版本的MapReduce。

MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。Map(映射)和Reduce(归约),是其主要思想,它们都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。Mapreduce极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一对共享相同的键组。

通过这段描述,可以明确MapReduce是面向大数据并行处理的计算模型、框架和平台。在传统学习中,通常会在Hadoop等分布式框架下进行MapReduce相关工作。随着云计算的逐渐发展,各个云厂商也都先后推出了在线的MapReduce业务。

通过MapReduce模型实现一个简单的WordCount算法。区别于传统使用Hadoop等大数据框架,使用对象存储与函数计算的结合体,即搭建在Serverless架构上的MapReduce模型。

1 理论基础

根据MapReduce模型,基于Serverless架构,将存储部分替换为对象存储,将计算部分替换成函数计算,绘制Serverless架构版本的MapReducde流程简图,如下所示。

 

基于Serverless架构的MapReduce模型流程简图

在流程图中可以看到,需要2个函数,分别作为Mapper和Reducer,以及3个对象存储的存储桶,分别作为输入的存储桶、中间临时缓存的存储桶以及结果的存储桶。以阿里云函数计算为例,在项目开始之前,先准备3个对象存储。

  • 对象存储1:serverless-book-mr-origin。
  • 对象存储2:serverless-book-mr-middle。
  • 对象存储3:serverless-book-mr-target。

为了让整个Mapper和Reducer的逻辑更加清晰,先对传统的WordCount结构进行改造,使其可以和Serverless架构下的FaaS平台更好地适配。Mapper和Reducer的工作原理可以简化为如下所示结构。

 

基于Serverless架构的MapReduce模型工作原理简图

2 功能实现

编写Mapper相关逻辑。通过存储桶1(即输入的存储桶)触发Mapper函数,然后通过Mapper函数完成如下步骤:

  • 通过事件信息,确定对象存储中的对象内容,并将文件缓存到函数实例中;
  • 读取被缓存文件;
  • 对文件内容进行切割;
  • 将结果生成<Key, Value>形式(完成映射关系),并将结果存储到存储桶2(即中间临时缓存的存储桶)。

Mapper的实现逻辑基本上与传统MapReduce的逻辑类似,只是读取数据以及存储数据的过程会变成通过对象存储进行下载对象和上传对象的过程:

# -*- coding: utf8 -*-
import datetime
import oss2
import re
import os
import sys
import json
import logging
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logger = logging.getLogger()
logger.setLevel(level=logging.INFO)
auth = oss2.Auth('<AccessKeyID>', '<AccessKeySecret>')
source_bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'serverless-book-mr-origin')
middle_bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'serverless-book-mr-middle')
def delete_file_folder(src):    
if os.path.isfile(src):        
try:            
os.remove(src)        
except:            
pass    
elif os.path.isdir(src):       
for item in os.listdir(src):            
itemsrc = os.path.join(src, item)            
delete_file_folder(itemsrc)        
try:            
os.rmdir(src)        
except:            
pass
def download_file(key, download_path):    
logger.info("Download file [%s]" % (key))    
try:        
source_bucket.get_object_to_file(key, download_path)    
except Exception as e:        
print(e)        
return -1    
return 0
def upload_file(key, local_file_path):    
logger.info("Start to upload file to oss")    
try:        
middle_bucket.put_object_from_file(key, local_file_path)   
except Exception as e:        
print(e)        
return -1    
logger.info("Upload data map file [%s] Success" % key)    
return 0
def do_mapping(key, middle_file_key):    
src_file_path = u'/tmp/' + key.split('/')[-1]    
middle_file_path = u'/tmp/' + u'mapped_' + key.split('/')[-1]    
download_ret = download_file(key, src_file_path)  
# download src file    
if download_ret == 0:        
inputfile = open(src_file_path, 'r')   
# open local /tmp file        
mapfile = open(middle_file_path, 'w')  
# open a new file write stream        
for line in inputfile:            
line = re.sub('[^a-zA-Z0-9]', ' ', line)  
# replace non-alphabetic/                                                      
# number characters            
words = line.split()            
for word in words:                
mapfile.write('%s\t%s' % (word, 1))   
# count for 1                
mapfile.write('\n')        
inputfile.close()        
mapfile.close()        
upload_ret = upload_file(middle_file_key, middle_file_path)            
# upload the file's each word        
delete_file_folder(src_file_path)        
delete_file_folder(middle_file_path)        
return upload_ret    
else:        
return -1
def map_caller(event):    
key = event["events"][0]["oss"]["object"]["key"]   
logger.info("Key is " + key)    
middle_file_key = 'middle_' + key.split('/')[-1]    
return do_mapping(key, middle_file_key)
def handler(event, context):    
logger.info("start main handler")    
start_time = datetime.datetime.now()    
res = map_caller(json.loads(event.decode("utf-8")))    
end_time = datetime.datetime.now()    
print("data mapping duration: " + str((end_time - start_time).microseconds / 1000) + "ms")    
if res == 0:        
return "Data mapping SUCCESS"    
else:        
return "Data mapping FAILED"

同样的方法,建立reducer.py文件,编写Reducer逻辑。

在传统架构下的MapReduce模型中,在Map处理完之后,Reduce节点会将各个Map节点上属于自己的数据复制到内存缓冲区中,将数据合并成一个大的数据集,并且按照Key值进行聚合,再把聚合后的Value值作为Iterable(迭代器)交给用户使用。这些数据经过用户自定义的Reduce函数进行处理之后,同样会以链值对的形式输出,默认输出到HDFS上的文件。

在Serverless架构下,Reducer的逻辑与传统架构下MapReduce模型中Reducer的逻辑基本类似,首先每个Reduce节点对应的就是Reducer函数的每个实例,而Reducer函数的主要工作是通过存储桶2(即中间临时缓存的存储桶)的触发下载Mapper函数处理完成的数据(类似于传统架构下Reduce节点会将各个Map节点上属于自己的数据复制到内存缓冲区中的过程),然后继续实现传统架构下Reduce的聚合逻辑和排序逻辑,最终将结果存储到存储桶3(即结果的存储桶)中。该部分函数的逻辑代码如下:

# -*- coding: utf8 -*- import oss2 from operator import itemgetter import os import sys import json import datetime import logging logging.basicConfig(level=logging.INFO, stream=sys.stdout) logger = logging.getLogger() logger.setLevel(level=logging.INFO) auth = oss2.Auth('<AccessKeyID>', '<AccessKeySecret>') middle_bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'serverless-book-mr-middle') target_bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'serverless-book-mr-target') def delete_file_folder(src):    if os.path.isfile(src):        try:            os.remove(src)        except:            pass    elif os.path.isdir(src):        for item in os.listdir(src):            itemsrc = os.path.join(src, item)            delete_file_folder(itemsrc)        try:            os.rmdir(src)        except:            pass def download_file(key, download_path):    logger.info("Download file [%s]" % (key))    try:        middle_bucket.get_object_to_file(key, download_path)    except Exception as e:        print(e)        return -1    return 0 def upload_file(key, local_file_path):    logger.info("Start to upload file to oss")    try:        target_bucket.put_object_from_file(key, local_file_path)    except Exception as e:        print(e)        return -1    logger.info("Upload data map file [%s] Success" % key)    return 0 def alifc_reducer(key, result_key):    word2count = {}    src_file_path = u'/tmp/' + key.split('/')[-1]    result_file_path = u'/tmp/' + u'result_' + key.split('/')[-1]    download_ret = download_file(key, src_file_path)    if download_ret == 0:        map_file = open(src_file_path, 'r')        result_file = open(result_file_path, 'w')        for line in map_file:            line = line.strip()            word, count = line.split('\t', 1)            try:                count = int(count)                word2count[word] = word2count.get(word, 0) + count            except ValueError:                logger.error("error value: %s, current line: %s" % (ValueError, line))                continue        map_file.close()        delete_file_folder(src_file_path)        sorted_word2count = sorted(word2count.items(), key=itemgetter(1))[::-1]        for wordcount in sorted_word2count:            res = '%s\t%s' % (wordcount[0], wordcount[1])            result_file.write(res)            result_file.write('\n')        result_file.close()        upload_ret = upload_file(result_key, result_file_path)        delete_file_folder(result_file_path)        return upload_ret    else:        return -1 def reduce_caller(event):    key = event["events"][0]["oss"]["object"]["key"]    logger.info("Key is " + key)    result_key = 'result_' + key.split('/')[-1]    return alifc_reducer(key, result_key) def handler(event, context):    logger.info("start main handler")    start_time = datetime.datetime.now()    res = reduce_caller(json.loads(event.decode("utf-8")))    end_time = datetime.datetime.now()    print("data reducing duration: " + str((end_time - start_time).microseconds / 1000) + "ms")    if res == 0:        return "Data reducing SUCCESS"    else:        return "Data reducing FAILED"

当完成Mapper函数和Reducer函数的核心逻辑后,可以在函数控制台创建对应的函数,如图8-37所示。

 

阿里云函数计算函数列表

创建完函数后,还需要创建三个存储桶,分别用来存储源文件、中间临时文件以及最终结果文件,如下所示。

 

阿里云对象存储桶列表

完成存储桶和函数计算的建设之后,还需要针对存储桶和函数计算进行关系对应:

  • Mapper函数配置对象存储触发器,关联存储桶serverless-book-mr-origin;
  • Reducer函数配置对象存储触发器,关联存储桶serverless-book-mr-middle。

3 测试体验

当完成业务逻辑的开发以及项目部署后,可以进行基于Serverless架构的MapReduce模型的测试工作。此时,准备一个英文短文,并将该短文作为源数据输入,目的是通过MapReduce模型实现该短文的WordCount,如下所示。

 

待进行WordCount文本示例

然后,将短文上传到存储桶serverless-book-mr-origin,如下所示。

 

测试文本上传至存储桶

上传完成后,Mapper函数会被触发,当Mapper函数执行完成之后,可以看到存储桶serverless-book-mr-middle中生成了临时的缓存文件,如下所示。

 

WordCount案例Mapper.py函数执行结果

当缓存文件被投递到存储桶serverless-book-mr-middle中时,说明Reducer函数在对象存储触发器的触发下,已经开始了异步工作的流程。稍等片刻,在Reducer函数执行完成后,可以在存储桶serverless-book-mr-target中看到最终生成的文档,即通过Serverless架构实现的MapReduce模型最终生成的WordCount文件。

如下所示,在这个文件中,已经顺利地统计出了原始短文中的词频。至此,就完成了基于Serverless架构的MapReduce模型的建设,并完成了词频统计功能。

 

WordCount案例Reducer.py函数执行结果

本实例将多个函数部署在同一个服务下,通过3个存储桶和2个函数联动,完成一个MapReduce功能。在实际生产中,每个项目都不会是单独使用某个函数,而是组合应用多个函数,形成一个Service体系。

 

标签:src,存储,file,MapReduce,key,path,函数
From: https://www.cnblogs.com/muzinan110/p/17067578.html

相关文章

  • 读Java8函数式编程笔记01_Lambda表达式
    1. Java8函数式编程1.1. 没有单子1.2. 没有语言层面的惰性求值1.3. 没有为不可变性提供额外支持1.4. 集合类可以拥有一些额外的方法:default方法2. 现实世界中......
  • JavaScript:判断数组对象值是否相同的函数声明
    varobj1={name:"w",};varobj2={name:"w",};functionisObjectValueEqual(a,b){//判断两个对......
  • C++成员初始化列表比在构造函数内部赋值效率更高
    A是个类,B中包含A类的对象在执行构造函数的时候,如果内部有类对象,使用列表初始化效率会更高B中的a和b都是A的对象a是用的列表初始化b是在构造函数内部初始化a只会执行一......
  • 一元函数积分概念、计算及应用
    一、一元函数积分的概念和性质一元函数积分学包括不定积分与定积分两部分。定积分在几何、物理、工程技术、经济等领域均有广泛的应用,是一元积分的核心。不定积分实质是......
  • 【Python基础学习】5.函数和代码复用
    主要参考来源:慕课嵩天老师的“Python语言程序设计”[https://www.icourse163.org/course/BIT-268001?tid=1468130447]5.1函数的定义与使用函数的理解与定义函数是一段具......
  • power函数
    #define_CRT_SECURE_NO_WARNINGS#include<stdio.h>#include<iostream>doublepower(doublex,doubley){doublez=1;while(y){z*=x;--y;}returnz;}voidma......
  • ThinkPHP函数详解:cache方法
    cache方法是3.0版本开始新增的缓存管理方法。注意:3.1.2版本后因cache方法并入原S方法,所以cache方法不再建议使用,用S方法即可。cache用于缓存设置、获取、删除操作用......
  • 在CListView中添加点击右键消息响应函数!
    //简单几步就可以了,整个过程描述和截图如下所下:用ClassWizard中的MessageMaps功能添加对NM_RCLICK消息的响应函数://添加成功后,直接编辑函数代码即可voidCRightMenuView:......
  • 函数的封装之获取进程句柄...
    现在写程序,经常要获取进程的句柄,为此我将它封装成了一个函数,呵呵,以后直接Copy就可以用了...HANDLEGetProHandle::getProcessHandle(CStringcaption){//用于保存指定窗口......
  • 03初识MapReduce
    初识MapReduce一、什么是MapReduceMapReduce是一种编程范式,它借助Map将一个大任务分解成多个小任务,再借助Reduce归并Map的结果。MapReduce虽然原理很简单,但是使用MapRedu......