MapReduce from Beginner to Mastery: hands-on word_count
The most basic word_count:
map.py:
import sys
import re

p = re.compile(r'\w+')  # keep only runs of word characters

for line in sys.stdin:
    ss = line.strip().split(' ')
    for s in ss:
        words = p.findall(s)
        if not words:
            continue
        word = words[0].lower()
        print("%s\t%s" % (word, 1))
red.py:
#!/usr/bin/python
import sys

current_word = None
count = 0

for line in sys.stdin:
    word, val = line.strip().split('\t')
    if current_word is None:
        current_word = word
    if current_word != word:
        # input is sorted by word, so a new word means the previous one is done
        print("%s\t%s" % (current_word, count))
        current_word = word
        count = 0
    count += int(val)

# flush the final word
if current_word is not None:
    print("%s\t%s" % (current_word, count))
Run:
cat The_Man_of_Property.txt | python map.py | sort -k1 | python red.py
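Before running anything on a cluster, the same map, sort, reduce flow can be simulated in plain Python 3; a minimal sketch (the sample lines below are made up):

```python
import re
from itertools import groupby

def map_words(lines):
    # emit (word, 1) pairs, mirroring map.py
    pattern = re.compile(r'\w+')
    for line in lines:
        for token in line.strip().split(' '):
            match = pattern.findall(token)
            if match:
                yield (match[0].lower(), 1)

def reduce_counts(pairs):
    # group sorted pairs by word and sum the counts, mirroring red.py
    for word, group in groupby(sorted(pairs), key=lambda kv: kv[0]):
        yield (word, sum(v for _, v in group))

sample = ["the man of property", "the man"]
print(dict(reduce_counts(map_words(sample))))
# {'man': 2, 'of': 1, 'property': 1, 'the': 2}
```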
Whitelist word_count:
map2.py:
import sys

def read_local_file_func(f):
    # load the whitelist into a set for O(1) membership tests
    word_set = set()
    with open(f, 'r') as file_in:
        for line in file_in:
            word_set.add(line.strip())
    return word_set

def mapper_func(white_list_fd):
    word_set = read_local_file_func(white_list_fd)
    for line in sys.stdin:
        ss = line.strip().split(' ')
        for s in ss:
            word = s.strip()
            if word != "" and word in word_set:
                print("%s\t%s" % (word, 1))

if __name__ == "__main__":
    # dispatch: call the function named by argv[1] with the remaining args
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    func(*sys.argv[2:])
red.py: same as above.
Run:
cat The_Man_of_Property.txt | python map2.py mapper_func white_list | sort -k1 | python red.py
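The `__main__` block of map2.py uses a small dispatch trick: `getattr` resolves the function named by the first command-line argument and calls it with the remaining arguments. A toy sketch of the same pattern (`shout` is a made-up stand-in for `mapper_func`):

```python
import sys

def shout(word):
    # made-up stand-in for mapper_func
    return word.upper()

def dispatch(func_name, args):
    # same pattern as map2.py: resolve a function in this module by name
    module = sys.modules[__name__]
    func = getattr(module, func_name)
    return func(*args)

print(dispatch("shout", ["hadoop"]))  # HADOOP
```

This is why the run line passes `mapper_func` as the first argument and `white_list` as the second.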
Two files share the same keys but hold different values; we want to merge them into one file (for each common key, pull out both the product name and the price). How do we solve this?
map_a.py:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys

for line in sys.stdin:
    ss = line.strip().split(' ')
    key = ss[0]
    value = ss[1]
    # tag every record from file a with flag 1
    print("%s\t1\t%s" % (key, value))
map_b.py:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys

for line in sys.stdin:
    ss = line.strip().split(' ')
    key = ss[0]
    value = ss[1]
    # tag every record from file b with flag 2
    print("%s\t2\t%s" % (key, value))
Run: cat a_join.txt | python map_a.py > a1
Run: cat b_join.txt | python map_b.py > b2
Write map_join.py:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys

val_1 = ''
for line in sys.stdin:
    key, flag, val = line.strip().split('\t')
    if flag == '1':
        # remember the value from file a until its flag-2 partner arrives
        val_1 = val
    elif flag == '2' and val_1 != '':
        val_2 = val
        print("%s\t%s\t%s" % (key, val_1, val_2))
        val_1 = ''
Run cat a1 b2 | sort -k1 | python map_join.py to complete the map stage.
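The join logic is easy to check locally with a tiny made-up dataset (here the flag-1 rows carry a product name and the flag-2 rows a price, mirroring a1 and b2):

```python
def join(lines):
    # mirrors map_join.py: hold the flag-1 value until its flag-2 partner arrives
    results = []
    val_1 = ''
    for line in lines:
        key, flag, val = line.strip().split('\t')
        if flag == '1':
            val_1 = val
        elif flag == '2' and val_1 != '':
            results.append("%s\t%s\t%s" % (key, val_1, val))
            val_1 = ''
    return results

# already in the order `sort -k1` produces: flag 1 before flag 2 for each key
print(join(["1\t1\tapple", "1\t2\t5.0", "2\t1\tpear", "2\t2\t3.0"]))
# ['1\tapple\t5.0', '2\tpear\t3.0']
```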
Shell script to run it on Hadoop:
#!/bin/bash
set -e -x
HADOOP_CMD="/usr/local/src/hadoop-2.7.3/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar"
INPUT_FILE_PATH_A="/data/a_join.txt"
INPUT_FILE_PATH_B="/data/b_join.txt"
OUTPUT_A_PATH="/output/a"
OUTPUT_B_PATH="/output/b"
OUTPUT_JOIN_PATH="/output/join"
# clear old outputs; ignore the error if they do not exist yet
$HADOOP_CMD fs -rm -r -skipTrash $OUTPUT_A_PATH $OUTPUT_B_PATH $OUTPUT_JOIN_PATH || true
# Step 1: tag file a with flag 1
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_A \
    -output $OUTPUT_A_PATH \
    -mapper "python map_a.py" \
    -file ./map_a.py
# Step 2: tag file b with flag 2
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_B \
    -output $OUTPUT_B_PATH \
    -mapper "python map_b.py" \
    -file ./map_b.py
# Step 3: read both tagged outputs and join in the reducer.
# red_join.py holds the same join logic as map_join.py above.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $OUTPUT_A_PATH,$OUTPUT_B_PATH \
    -output $OUTPUT_JOIN_PATH \
    -mapper "cat" \
    -reducer "python red_join.py" \
    -file ./red_join.py \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
    -jobconf stream.num.map.output.key.fields=2 \
    -jobconf num.key.fields.for.partition=1
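The two -jobconf lines are the crux of Step 3: the shuffle sorts map output on the first two tab-separated fields (key, then flag) but partitions on the key alone, so inside one reducer each key's flag-1 record always precedes its flag-2 record. A rough Python 3 illustration (the modulo partitioner is a made-up stand-in for Hadoop's hash partitioning):

```python
# Illustrative only: sort on (key, flag), but partition on key alone.
records = ["2\t2\t3.0", "1\t1\tapple", "2\t1\tpear", "1\t2\t5.0"]

# shuffle sort key = first two tab-separated fields,
# like stream.num.map.output.key.fields=2
shuffled = sorted(records, key=lambda r: r.split('\t')[:2])

def partition(record, num_reducers=2):
    # like num.key.fields.for.partition=1: route by field 1 only,
    # so both records of a key land in the same reducer (toy modulo hash)
    return int(record.split('\t')[0]) % num_reducers

print(shuffled)
# ['1\t1\tapple', '1\t2\t5.0', '2\t1\tpear', '2\t2\t3.0']
```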
Result: