HDPCD-Java Study Notes (16)
PIG
Pig uses a high-level, SQL-like programming language named Pig Latin. Pig was created at Yahoo to make it easier to analyze the data in HDFS without the complexities of writing a traditional MapReduce program. The developers of Pig published their philosophy to summarize the goals of Pig, using comparisons to actual pigs:
Pigs eat anything | Pig can process any data, structured or unstructured
Pigs live anywhere | Pig can run on any parallel data processing framework, so Pig scripts do not have to run just on Hadoop
Pigs are domestic animals | Pig is designed to be easily controlled and modified by its users
Pigs fly | Pig is designed to process data quickly
Pig Latin scripts can be executed in one of three ways:
Pig script | Write a Pig Latin program in a text file and execute it using the pig executable
Grunt shell | Enter Pig statements manually one at a time from a CLI tool known as the Grunt interactive shell
Embedded in Java | Use the PigServer class to execute a Pig query from within Java code
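For the embedded option, the following is a minimal sketch of using PigServer; the class name, relation names, and HDFS paths are hypothetical:

import org.apache.pig.ExecType;
import org.apache.pig.PigServer;

public class EmbeddedPig {
    public static void main(String[] args) throws Exception {
        // Connect in MapReduce mode (ExecType.LOCAL runs against the local filesystem)
        PigServer pig = new PigServer(ExecType.MAPREDUCE);
        // Each registerQuery call adds a statement to the logical plan
        pig.registerQuery("employees = LOAD 'pig/input/File1' USING PigStorage(',');");
        pig.registerQuery("sorted = ORDER employees BY $0;");
        // store triggers execution of the plan and writes the results to HDFS
        pig.store("sorted", "pig/output/sorted");
    }
}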
Pig executes in a unique fashion. Some commands build on previous commands, while certain commands trigger a MapReduce job:
• During execution, each statement is processed by the Pig interpreter
• If a statement is valid, it gets added to a logical plan built by the interpreter
• The steps in the logical plan do not actually execute until a DUMP or STORE command is reached
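For example, in the following script (a sketch reusing the hypothetical file and schema from the examples later in these notes), the LOAD and FILTER statements only extend the logical plan; nothing executes until the DUMP:

employees = LOAD 'pig/input/File1' USING PigStorage(',')
    AS (name:chararray, age:int, zip:int, salary:double);
-- Still no MapReduce job: the statements above only build the plan
highpaid = FILTER employees BY salary > 40000.0;
-- DUMP triggers the MapReduce job that executes the plan
DUMP highpaid;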
The Grunt Shell
Grunt is an interactive shell that enables users to enter Pig Latin statements and also interact with HDFS.
To enter the Grunt shell, run the pig executable in the PIG_HOME/bin folder.
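A short sample session (assuming pig is on the PATH; the HDFS path is hypothetical):

$ pig
grunt> fs -ls pig/input
grunt> employees = LOAD 'pig/input/File1' USING PigStorage(',');
grunt> DUMP employees;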
Pig Data Types
Pig has the following built-in scalar data types:
int | A 32-bit signed integer
long | A 64-bit signed integer
float | A 32-bit floating-point number
double | A 64-bit floating-point number
chararray | Strings of Unicode characters, represented as java.lang.String objects
bytearray | A blob or array of bytes
boolean | Can be either true or false (case-sensitive)
datetime | Stores a date and time in the format 1970-01-01T00:00:00.000+00:00
biginteger and bigdecimal | Map to Java's BigInteger and BigDecimal classes, respectively, and are useful when performing precision arithmetic
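As a sketch of how these types are declared, a LOAD statement can assign them to fields in its AS clause (the file and field names here are hypothetical):

employees = LOAD 'pig/input/File1' USING PigStorage(',')
    AS (name:chararray, age:int, salary:double, active:boolean);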
The FOREACH GENERATE Operator
The following example command takes in the salaries relation and generates a new relation that only contains two of the columns in salaries:

> A = FOREACH salaries GENERATE age, salary;
> DESCRIBE A;
A: {age: int,salary: double}
More Pig Examples
The ORDER BY command enables sorting the data in a relation:
employees = LOAD 'pig/input/File1' USING PigStorage(',')
    AS (name:chararray, age:int, zip:int, salary:double);
sorted = ORDER employees BY salary;
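ORDER BY sorts in ascending order by default; add DESC to sort in descending order:

sorted = ORDER employees BY salary DESC;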
The LIMIT command limits the number of output tuples for a relation:

employees = LOAD 'pig/input/File1' USING PigStorage(',')
    AS (name:chararray, age:int, zip:int, salary:double);
agegroup = GROUP employees BY age;
h = LIMIT agegroup 100;
The JOIN command performs a join of two relations:
e1 = LOAD 'pig/input/File1' USING PigStorage(',')
    AS (name:chararray, age:int, zip:int, salary:double);
e2 = LOAD 'pig/input/File2' USING PigStorage(',')
    AS (name:chararray, phone:chararray);
e3 = JOIN e1 BY name, e2 BY name;
DESCRIBE e3;
DUMP e3;

e3: {e1::name: chararray,e1::age: int,e1::zip: int,e1::salary: double,e2::name: chararray,e2::phone: chararray}
(Joe,21,94085,50000.0,Joe,4085559898)
(Joe,21,94085,50000.0,Joe,4085557777)
(Tom,21,94085,5000.0,Tom,4085551211)
(Tom,21,94085,5000.0,Tom,6505550123)
(John,45,95014,25000.0,John,4085554332)
Pig UDFs
The steps for writing a custom evaluation UDF are:
· Write a Java class that extends EvalFunc.
· Deploy the class in a JAR file.
· Register the JAR file in the Pig script using the REGISTER command.
· Optionally define an alias for the UDF using the DEFINE command.
A UDF Example
The following UDF adds a comma between two input strings:
package com.hortonworks.udfs;

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class CONCAT_COMMA extends EvalFunc<String> {
    @Override
    public String exec(Tuple input) throws IOException {
        // Fetch the two arguments and join them with a comma
        String first = input.get(0).toString().trim();
        String second = input.get(1).toString().trim();
        return first + ", " + second;
    }
}
The exec method is called when the UDF is invoked from the Pig script. The input parameter is a Tuple instance, which allows for an arbitrary number of arguments. Use the get method of Tuple to retrieve the arguments passed in.
Invoking a UDF
Before a UDF can be invoked, the function needs to be registered by the Pig script. Use the REGISTER command to register a JAR:

register my.jar;

Once the JAR is registered, call the UDF using its fully-qualified class name:
x = FOREACH logevents
    GENERATE com.hortonworks.udfs.CONCAT_COMMA(level, code);

Optionally, use the DEFINE command to create a shorter alias for the UDF:

DEFINE CONCAT_COMMA com.hortonworks.udfs.CONCAT_COMMA();
Now invoke the UDF using the alias:
x = FOREACH logevents GENERATE CONCAT_COMMA(level, code);
Types of Pig UDFs
There are two types of Pig user-defined functions:
· Evaluation functions
· Filter functions
Evaluation functions have a generic type parameter that specifies the return type of the function. Filter functions do not use a generic because they can only return a boolean. Filter functions are used for filtering relations.
Filter Functions
A custom filter function is written by:
· Extending the FilterFunc class
· Overriding the exec method and returning a Boolean
To demonstrate, the following MinimumMoneyFlow class passes only the stocks whose daily money flow meets a specified minimum.
package stockudfs;

import java.io.IOException;

import org.apache.pig.FilterFunc;
import org.apache.pig.data.Tuple;

public class MinimumMoneyFlow extends FilterFunc {
    private long minFlow;

    // The minimum flow is passed as a constructor argument by the DEFINE statement
    public MinimumMoneyFlow(String minFlow) {
        this.minFlow = Long.parseLong(minFlow);
    }

    @Override
    public Boolean exec(Tuple input) throws IOException {
        Object value1 = input.get(0);
        Object value2 = input.get(1);
        Object value3 = input.get(2);
        Object value4 = input.get(3);
        // Reject tuples with missing fields
        if (value1 == null || value2 == null ||
                value3 == null || value4 == null) {
            return false;
        }
        long volume = Long.parseLong(value1.toString());
        double high = Double.parseDouble(value2.toString());
        double low = Double.parseDouble(value3.toString());
        double close = Double.parseDouble(value4.toString());
        // Money flow = typical price (average of high, low and close) times volume
        double currentFlow = ((high + low + close) / 3) * volume;
        return currentFlow >= minFlow;
    }
}
register /root/java/workspace/Pig/stockudfs.jar;
DEFINE MONEYFLOW stockudfs.MinimumMoneyFlow('100000000');
stockdata = LOAD 'stocksA/NYSE_daily_prices_A.csv' USING PigStorage(',')
    AS (exchange:chararray, symbol:chararray, date:chararray, open:float,
        high:float, low:float, close:float, volume:int);
m = FILTER stockdata BY MONEYFLOW(volume, high, low, close);
Accumulator UDFs
When a UDF implements Accumulator, Pig does not read in all the data at once
but instead reads in only a subset of the records and passes them to the accumulate method of the UDF.
Writing an Accumulator UDF
To write an Accumulator function, extend EvalFunc and implement Accumulator. This means both an exec method and the accumulate method must be written. If those two methods have the exact same behavior, then optionally extend AccumulatorEvalFunc and only implement the accumulate function (a sketch follows the list below).
Accumulator contains three methods:
· accumulate
· getValue
· cleanup
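As a minimal sketch of the AccumulatorEvalFunc approach, the following hypothetical UDF counts tuples; the class name and counting logic are illustrative, not from the original notes:

import java.io.IOException;

import org.apache.pig.AccumulatorEvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

public class SimpleCount extends AccumulatorEvalFunc<Long> {
    private long count = 0L;

    @Override
    public void accumulate(Tuple input) throws IOException {
        // Each call receives a batch of records wrapped in a bag
        DataBag bag = (DataBag) input.get(0);
        for (Tuple t : bag) {
            if (t != null) {
                count++;
            }
        }
    }

    @Override
    public Long getValue() {
        // Called after all batches have been accumulated
        return count;
    }

    @Override
    public void cleanup() {
        // Reset state so the next bag starts from zero
        count = 0L;
    }
}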
Here is the sequence of events when an Accumulator function is invoked:
• The accumulate method is passed a subset of the records. The default is 20,000 records, but that number can be configured using the pig.accumulative.batchsize property.
• The method keeps getting re-invoked until all records have been passed to accumulate.
• The framework then invokes the getValue method, which is responsible for returning the result of the function.
• If the data passed in is a collection of bags in a nested foreach, then the cleanup method is invoked after getValue, and accumulate starts getting called again with the next bag of records.
To demonstrate, the following is a subset of the code from Pig's built-in COUNT function (the full source is linked at the end of these notes):
public class COUNT extends EvalFunc<Long>
        implements Algebraic, Accumulator<Long> {

    private long intermediateCount = 0L;

    public void accumulate(Tuple b) throws IOException {
        try {
            DataBag bag = (DataBag) b.get(0);
            Iterator it = bag.iterator();
            while (it.hasNext()) {
                Tuple t = (Tuple) it.next();
                if (t != null && t.size() > 0 &&
                        t.get(0) != null) {
                    intermediateCount += 1;
                }
            }
        } catch (ExecException ee) {
            throw ee;
        }
    }

    public void cleanup() {
        intermediateCount = 0L;
    }

    public Long getValue() {
        return intermediateCount;
    }
}
Understanding Accumulator Behavior
Keep in mind that implementing Accumulator does not guarantee that the Pig framework will send records in small batches. That is why it is important
to implement the exec method also, in case the framework invokes the UDF with all the records at once.
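A common pattern (a sketch, not from the original notes) is to implement exec in terms of the accumulator methods, so both execution paths share one implementation:

@Override
public Long exec(Tuple input) throws IOException {
    // Fallback path: the framework passed all the records at once
    accumulate(input);
    Long result = getValue();
    cleanup();
    return result;
}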
How the framework decides when to accumulate and when to send all records at once depends on the usage of the UDF:
1. If multiple UDFs are invoked in a FOREACH statement, and all of the UDFs implement Accumulator, then the records will be sent in small batches.
2. If at least one UDF in a FOREACH statement does not implement Accumulator, then all of the function calls in that FOREACH statement will be executed in a single batch of records. This is because the entire bag of records has to be read into memory for at least one function, so there is no gain in accumulating records for the other UDFs in that statement.
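For example (a sketch with hypothetical relation and field names), built-in COUNT and MAX both implement Accumulator, so the records here are sent in small batches; adding a non-Accumulator UDF to the GENERATE clause would force a single batch:

grouped = GROUP logevents BY level;
counts = FOREACH grouped GENERATE group, COUNT(logevents), MAX(logevents.code);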
Overview of Algebraic Functions
A common UDF type in Pig is algebraic functions, which take in a group of tuples and return a scalar value. Examples include built-in Pig functions like MAX, MIN, COUNT, SUM, and so on. What makes a function algebraic is its ability to process a bag of tuples over multiple phases, instead of in a single phase where the entire bag is passed to the function at once.
To write an algebraic function, implement the Algebraic interface, which contains three methods:
getInitial | Returns a String that represents the class name of the Initial function.
getIntermed | Returns a String that represents the class name of the Intermediate function.
getFinal | Returns a String that represents the class name of the Final function.
In a situation where an algebraic function makes sense, developers realize a benefit from the MapReduce framework being able to use a Combiner to minimize network traffic and efficiently process a large number of tuples.
Algebraic functions are similar to Accumulator functions: they implement Algebraic AND extend EvalFunc. If an algebraic function is invoked in a foreach statement along with a non-algebraic function, then the algebraic function will be invoked normally and will ignore its algebraic implementation.
Example of an Algebraic Function
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class COUNT extends EvalFunc<Long>
        implements Algebraic, Accumulator<Long> {

    // The accumulate, getValue and cleanup methods from the earlier listing are omitted here

    public Long exec(Tuple input) throws IOException {
        return count(input);
    }

    public String getInitial() {
        return Initial.class.getName();
    }

    public String getIntermed() {
        return Intermed.class.getName();
    }

    public String getFinal() {
        return Final.class.getName();
    }

    // Runs map-side: counts each incoming bag
    static public class Initial extends EvalFunc<Tuple> {
        public Tuple exec(Tuple input) throws IOException {
            return TupleFactory.getInstance().newTuple(count(input));
        }
    }

    // Runs in the Combiner: sums the partial counts
    static public class Intermed extends EvalFunc<Tuple> {
        public Tuple exec(Tuple input) throws IOException {
            return TupleFactory.getInstance().newTuple(sum(input));
        }
    }

    // Runs reduce-side: produces the final total
    static public class Final extends EvalFunc<Long> {
        public Long exec(Tuple input) throws IOException {
            return sum(input);
        }
    }

    static protected Long count(Tuple input) throws ExecException {
        Object values = input.get(0);
        if (values instanceof DataBag)
            return ((DataBag) values).size();
        else if (values instanceof Map)
            return new Long(((Map) values).size());
        else
            // Anything that is not a bag or a map cannot be counted
            throw new ExecException("Cannot count a " +
                    values.getClass().getName());
    }

    static protected Long sum(Tuple input) throws ExecException {
        DataBag values = (DataBag) input.get(0);
        long sum = 0;
        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
            Tuple t = it.next();
            sum += (Long) t.get(0);
        }
        return sum;
    }
}
Full source: https://svn.apache.org/repos/asf/pig/trunk/src/org/apache/pig/builtin/COUNT.java