What are the options for using Python in Hadoop?
Python developers are looking to transition their Python skills into the Hadoop Ecosystem. In a recent episode of Big Data Big Questions I answered a question about using Python on Hadoop. Let's take a deeper look at how to use Python in the Hadoop Ecosystem by building a Hadoop Python example.
High Demand for Python Skills
Python consistently ranks in the top 5 programming languages. The market is certainly red hot for Python developers; a quick search on Indeed turns up 45K open roles in the US. Python is used heavily in the Data Science world, so crossing over to Data Engineering is a natural move for Python developers. Hadoop Ecosystem tools have been quick to add Python support so the available Data Science talent pool can take advantage of Big Data. Python is also an easy language to pick up, which lets new Data Engineers write their first MapReduce or Spark job faster than they could learn Java.

Why Not Just Use Java?
Java still tops the popularity lists, so why not just use it? The first reason is that setting up and running a Java development environment is fairly complicated, while with Python it's as easy as installing Python and running the interactive shell. The second reason is that Java code has to be compiled before it can run, whereas a Python script runs straight from the command line. The last reason is the sheer amount of code: a Java job takes far more lines than the equivalent Python. Check out the amount of code needed to run a word count in Java versus Python.
Java Word Count
import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
Python Word Count
from mrjob.job import MRJob
import re

WORD_RE = re.compile(r"[\w']+")


class MRWordFreqCount(MRJob):

    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner(self, word, counts):
        yield (word, sum(counts))

    def reducer(self, word, counts):
        yield (word, sum(counts))


if __name__ == '__main__':
    MRWordFreqCount.run()

# Source
# https://github.com/Yelp/mrjob/blob/master/mrjob/examples/mr_word_freq_count.py
Roughly 57 lines of code versus 15. Which code base would you rather maintain?
How Does Hadoop Support Python?
If Java is more complex than Python and Spark supports Python, does Hadoop have a Python option? Yes! Python developers have 4 options for using Python on Hadoop. Let’s dive into each of those options.
Streaming
Hadoop Streaming is one of the most popular ways to write Python on Hadoop. Streaming is built into the Hadoop distribution and lets you pass scripts to jobs via stdin and stdout. If you are using Hadoop, you already have Streaming in the package. With Streaming, a developer can pass in Python (or other language) scripts for the Mapper and Reducer. The functionality works similarly to User Defined Functions in Pig Latin: Java acts as a wrapper around the Python (or other language) scripts, and at run time the job executes through that Java wrapper. A minimal sketch of a Streaming word count follows.
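To make Streaming concrete, here is a minimal word count sketch. The file names (mapper.py, reducer.py) and the streaming jar path are assumptions for illustration; the jar location varies by distribution.

#!/usr/bin/env python
# mapper.py - read lines from stdin, emit "word<TAB>1" pairs to stdout
import sys

for line in sys.stdin:
    for word in line.strip().split():
        print('%s\t%d' % (word.lower(), 1))

#!/usr/bin/env python
# reducer.py - sum counts per word; Hadoop sorts mapper output by key before the reduce
import sys

current_word, current_count = None, 0
for line in sys.stdin:
    word, count = line.rstrip('\n').split('\t', 1)
    if word == current_word:
        current_count += int(count)
    else:
        if current_word is not None:
            print('%s\t%d' % (current_word, current_count))
        current_word, current_count = word, int(count)
if current_word is not None:
    print('%s\t%d' % (current_word, current_count))

# Submit both scripts with the streaming jar (paths are placeholders):
# hadoop jar /path/to/hadoop-streaming.jar \
#   -files mapper.py,reducer.py \
#   -mapper mapper.py -reducer reducer.py \
#   -input /user/input.txt -output /user/wordcount-out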
Hadoopy
Hadoopy is an extension of Hadoop Streaming that uses Cython for Python MapReduce jobs. I'm a huge fan of Hadoopy's documentation; it's extremely developer friendly. However, be aware that the Hadoopy package might not be compatible with the latest Hadoop versions. The project hasn't been updated since the end of 2012.
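For a sense of the API, here is what a Hadoopy word count looks like. This is a minimal sketch based on the project's examples, so verify it against the Hadoopy documentation before relying on it given the project's age.

import hadoopy

def mapper(key, value):
    # value is a line of input text; emit (word, 1) pairs
    for word in value.split():
        yield word.lower(), 1

def reducer(key, values):
    # values is an iterator of counts for one word
    yield key, sum(values)

if __name__ == '__main__':
    hadoopy.run(mapper, reducer, doc=__doc__)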
Pydoop
Pydoop allows developers to write Python scripts natively and then interface directly with data in the Hadoop cluster. This is possible through Pydoop's HDFS API, which reads and writes data in HDFS from Python. Basically, the developer writes a script taking advantage of the full Python library, then wraps it with Mappers and Reducers. Pydoop can be installed with pip install pydoop.
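As a quick illustration of the HDFS API, here is a minimal sketch. The paths are placeholders, and it assumes Pydoop is installed and pointed at a reachable cluster.

import pydoop.hdfs as hdfs

# Write a small text file into HDFS (path is a placeholder)
hdfs.dump('hello from pydoop\n', '/user/example/hello.txt')

# Read the file back as a string
print(hdfs.load('/user/example/hello.txt'))

# List the directory contents
print(hdfs.ls('/user/example'))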
MRJob
MRJob is a Python package that helps you write and run Hadoop Streaming jobs. Heavily used and incubated at Yelp, MRJob supports Amazon EMR, native Hadoop, and Google Cloud Dataproc. The setup is similar to Pydoop, using pip to install, but the project is still very active and the documentation is kept up to date across the different MRJob runners. For the Python Hadoop example below I will use MRJob.
Running Python on Hadoop with MRJob
The example below is the word count application from the MRJob examples on Yelp's GitHub. For data you can use any text file to parse over and count the number of words. You will also need a running Hadoop cluster and Python installed with pip.
Step 1 – Install MRJob with Pip
- Install the MRJob library using Pip.
pip install mrjob
Step 2 – Word Count Script
- Open your favorite text editor (VIM) and save the following script as word_count.py.
from mrjob.job import MRJob
import re

WORD_RE = re.compile(r"[\w']+")


class MRWordFreqCount(MRJob):

    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner(self, word, counts):
        yield (word, sum(counts))

    def reducer(self, word, counts):
        yield (word, sum(counts))


if __name__ == '__main__':
    MRWordFreqCount.run()

# Source
# https://github.com/Yelp/mrjob/blob/master/mrjob/examples/mr_word_freq_count.py
Step 3 – Run the Word Count Script
- Now that we have our MRJob word count script, we can turn it loose on our files in Hadoop. To run the script, call python with the script name (word_count.py), then the file location (an HDFS path such as hdfs:///user/file.txt), and finally -r hadoop.
python word_count.py [file location] -r hadoop > counts
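If the job succeeds, the counts file holds one tab-separated key/value pair per word. By default MRJob JSON-encodes keys and values, so the output looks roughly like this (the words and totals depend on your input file):

"hadoop"	42
"python"	17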
More Python Hadoop Examples
Congratulations, you just ran your first Hadoop Python job using the Python package MRJob! If you want to go further and start experimenting with other examples, be sure to check out more by downloading the MRJob examples. Also make sure to subscribe to my newsletter focused on Building Stronger Data Engineers.