No.6大数据入门 | MR实践:文件分发方式-cacheArchive
上文我们介绍了两种分发文件方式,接下来介绍第三种:通过 -cacheArchive 以压缩包形式进行文件分发。其中 white_list 以压缩包形式上传,同样需要先上传至 HDFS。这里我们的压缩包如下:
将其上传至hdfs上:
3)-cacheArchive形式分发
map.py需要重新开发,对目录进行遍历并读取压缩包文件的内容:
#!/usr/bin/python
import os
import sys
import gzip
import time
def get_file_handler(f):
    """Open the local file ``f`` for reading and return the file object.

    The caller is responsible for closing the returned handle.
    """
    return open(f, 'r')
def get_cachefile_handlers(f):
    """Return open read handles for every entry directly under directory ``f``.

    If ``f`` is not a directory, an empty list is returned.  The caller must
    close every returned handle.
    """
    if not os.path.isdir(f):
        return []
    return [get_file_handler(f + '/' + entry) for entry in os.listdir(f)]
def read_local_file_func(f):
    """Build the whitelist word set from every file under directory ``f``.

    Each line of each file is stripped and added to the set.

    Fix: the original never closed the handles opened by
    get_cachefile_handlers(), leaking one file descriptor per whitelist file.
    """
    word_set = set()
    for cachefile in get_cachefile_handlers(f):
        try:
            for line in cachefile:
                word_set.add(line.strip())
        finally:
            cachefile.close()  # always release the fd, even on a read error
    return word_set
def mapper_func(white_list_fd):
    """Streaming map step: emit "<word>\\t1" for each stdin word found in the
    whitelist.

    white_list_fd: path of the directory the -cacheArchive tarball was
    unpacked into (the alias after '#', e.g. "WH.gz").
    """
    word_set = read_local_file_func(white_list_fd)
    for line in sys.stdin:
        # Fix: removed the debug `time.sleep(100)` that stalled the job for
        # 100 seconds on EVERY input line.
        ss = line.strip().split(' ')
        for s in ss:
            word = s.strip()
            if word != "" and (word in word_set):
                # print(...) form runs unchanged on both Python 2 and 3.
                print("%s\t%s" % (s, 1))
if __name__ == "__main__":
    # Dispatch: argv[1] names the function to run, the rest are its arguments.
    # Fix: the original indexed sys.argv[1] BEFORE checking len(sys.argv) > 1
    # (IndexError on a bare invocation), and initialized args = None, which
    # would have made func(*args) raise TypeError had the check ever failed.
    if len(sys.argv) < 2:
        sys.stderr.write("Usage: map.py <function_name> [args...]\n")
        sys.exit(1)
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    # Slicing argv[2:] is always safe; it is simply [] when no args are given.
    func(*sys.argv[2:])
run.sh脚本如下:
HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
OUTPUT_PATH="/output_cachearchive_broadcast"

# Remove any previous output; a streaming job fails if the output dir exists.
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
# -cacheArchive: w.tar.gz on HDFS is unpacked into each task's working dir
# under the alias "WH.gz" (the name after '#'); map.py walks that directory.
# Fix: the original had an inline comment AFTER the trailing backslash on the
# -cacheArchive line, which broke the line continuation and injected the
# comment text into the command as stray arguments.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func WH.gz" \
    -reducer "python red.py reduer_func" \
    -jobconf "mapred.reduce.tasks=2" \
    -jobconf "mapred.job.name=cachefile_demo" \
    -cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz" \
    -file "./map.py" \
    -file "./red.py"
运行bash run.sh即可
欢迎关注:【大数据学习笔记】个人公众号,一起交流学习!