Extract rows from a CSV file containing specific values using MapReduce, Pig, Hive, Apache Drill and Spark

example.csv

101,87,65,67
102,43,45,40
103,23,56,34
104,65,55,40
105,87,96,40

Q. For every row whose last column is 40, retrieve the value in its first column.
Expected output:
40 102
40 104
40 105
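
The filter itself can be sanity-checked outside Hadoop; a tiny plain-Scala sketch over the sample rows above:

val rows = Seq(Seq(101, 87, 65, 67), Seq(102, 43, 45, 40), Seq(103, 23, 56, 34), Seq(104, 65, 55, 40), Seq(105, 87, 96, 40))
// keep rows whose last column is 40, print "lastColumn firstColumn"
rows.filter(_.last == 40).foreach(r => println(s"${r.last} ${r.head}"))
// prints: 40 102, 40 104, 40 105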

# PIG:

example.pig
-----------
-- HDFS path (example_hdfs.pig)
input_file = LOAD '/user/samples/input/csv/example/example.csv' USING PigStorage(',');
-- Local path (example_local.pig, for pig -x local)
-- input_file = LOAD '/Users/ArvindGudiseva/workspace/hadoop/samples/input/example.csv' USING PigStorage(',');

filtered_records = FILTER input_file BY $3 == 40;
final_records = FOREACH filtered_records GENERATE $3 AS num1, $0 AS num2;
DUMP final_records;

Run:

1) HDFS Mode: pig -f /Users/ArvindGudiseva/workspace/hadoop/pig/example_hdfs.pig
If the job fails, start the MapReduce JobHistory server and rerun explicitly in MapReduce mode:
mr-jobhistory-daemon.sh start historyserver
pig -x mapreduce /Users/ArvindGudiseva/workspace/hadoop/pig/example_hdfs.pig

2) Local Mode: pig -x local /Users/ArvindGudiseva/workspace/hadoop/pig/example_local.pig
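
For the sample data, DUMP should print the matching tuples:

(40,102)
(40,104)
(40,105)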

# HIVE:
----
example.hql (OR) example.sql
----------------------------
CREATE EXTERNAL TABLE IF NOT EXISTS example(num0 INT, num1 INT, num2 INT, num3 INT)
COMMENT 'extract rows containing a specific value'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
-- LOCATION '/user/samples/input/csv/example/'            -- errored with the trailing slash at the end
-- LOCATION '/user/samples/input/csv/example/example.csv' -- not valid: LOCATION must be a directory, not a file
LOCATION '/user/samples/input/csv/example';
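
If the CSV had been uploaded to some other HDFS directory, Hive can move it under the table's LOCATION with LOAD DATA; a minimal sketch, using a hypothetical staging path:

LOAD DATA INPATH '/user/samples/staging/example.csv' INTO TABLE example;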

SELECT num3, num0 FROM example WHERE num3=40;

Run:

hive -f '/Users/ArvindGudiseva/workspace/hadoop/hive/example.sql'
(OR)
hive -f '/Users/ArvindGudiseva/workspace/hadoop/hive/example.hql'
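
The query can also be run inline, without a script file, once the table exists:

hive -e "SELECT num3, num0 FROM example WHERE num3=40;"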

# DRILL:
-----
Start Drill (embedded mode)
---------------------------
$ drill-embedded
(OR)
$ sqlline -u jdbc:drill:zk=local
Web UI for queries: http://localhost:8047/query

Run:

1) Local File System:
SELECT columns[3], columns[0] FROM dfs.`/Users/ArvindGudiseva/workspace/hadoop/samples/input/example.csv`
WHERE columns[3]=40;

2) HDFS File System:
--#-- Storage Plugin for HDFS (register it under the name "hdfs" in the Web UI Storage tab) ---
{
  "type": "file",
  "enabled": true,
  "connection": "hdfs://localhost:9000/",
  "workspaces": {
    "root": {
      "location": "/",
      "writable": true,
      "defaultInputFormat": null
    }
  },
  "formats": {
    "json": {
      "type": "json"
    },
    "csv": {
      "type": "text",
      "extensions": [
        "csv"
      ],
      "delimiter": ","
    }
  }
}

--#-- Query ---
SELECT columns[3], columns[0] FROM hdfs.`/user/samples/input/csv/example/example.csv`
WHERE columns[3]=40;
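
Drill reads the CSV columns as VARCHAR, so if the bare numeric comparison above does not match in your Drill version, an explicit cast is the safer form (same query, sketched with a CAST):

SELECT columns[3], columns[0] FROM hdfs.`/user/samples/input/csv/example/example.csv`
WHERE CAST(columns[3] AS INT) = 40;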

# SPARK:
-----
Start Spark Shell
-----------------
$ spark-shell --packages com.databricks:spark-csv_2.10:1.3.0
(use spark-csv_2.11 if your Spark build runs on Scala 2.11)

Run:

val csv_file = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").option("inferSchema", "true").load("/user/samples/input/csv/example/example.csv")
// Schema is inferred -> csv_file: org.apache.spark.sql.DataFrame = [C0: int, C1: int, C2: int, C3: int]
csv_file.registerTempTable("example")
val result = sqlContext.sql("SELECT C3, C0 FROM example WHERE C3=40")
result.collect.foreach(println)
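
On Spark 2.x and later the external spark-csv package is not needed; a minimal sketch with the built-in CSV reader (default column names are _c0 ... _c3, and temp views replace registerTempTable):

val df = spark.read.option("header", "false").option("inferSchema", "true").csv("/user/samples/input/csv/example/example.csv")
df.createOrReplaceTempView("example")
spark.sql("SELECT _c3, _c0 FROM example WHERE _c3 = 40").collect.foreach(println)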

# MAP REDUCE:
----------
Note: This is a mapper-only job; the filter needs no shuffle or aggregation, so the reducer is optional and the driver sets the number of reduce tasks to 0.

--#-- Class Mapper ---
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ExampleMapper extends Mapper<Object, Text, IntWritable, IntWritable> {

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        // Each input line is one CSV record: col0,col1,col2,col3
        String[] tokens = value.toString().split(",");

        int col0 = Integer.parseInt(tokens[0]);
        int col3 = Integer.parseInt(tokens[3]);

        // If col3 == 40, emit (col3, col0)
        if (col3 == 40) {
            context.write(new IntWritable(col3), new IntWritable(col0));
        }
    }
}

--#-- Class Reducer (Not Required) ---
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class ExampleReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

    @Override
    public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        // Pass-through: re-emit every (col3, col0) pair unchanged
        for (IntWritable value : values) {
            context.write(key, value);
        }
    }
}

--#-- Class Driver ---
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ExampleDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new ExampleDriver(), args);
        System.exit(exitCode);
    }

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>\n",
                    getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }

        // Use the Configuration injected by ToolRunner so generic options are honoured
        Job job = Job.getInstance(getConf(), "ExampleDriver");
        job.setJarByClass(ExampleDriver.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Delete the output folder if it already exists
        Configuration conf = job.getConfiguration();
        FileSystem.get(conf).delete(new Path(args[1]), true);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapperClass(ExampleMapper.class);
        //job.setReducerClass(ExampleReducer.class);

        // Mapper-only job: run with zero reduce tasks
        job.setNumReduceTasks(0);

        int returnValue = job.waitForCompletion(true) ? 0 : 1;
        System.out.println("job.isSuccessful " + job.isSuccessful());
        return returnValue;
    }
}

Run:

$ mvn clean install
$ hadoop jar target/HadoopMapReduce-1.0.jar example.ExampleDriver /user/samples/input/csv/example/example.csv /user/samples/output
$ hdfs dfs -cat /user/samples/output/part-m-00000
$ hdfs dfs -cat /user/samples/output/part-r-00000 (if Reducer is used)
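
For the sample input, the mapper output should contain the key and value separated by a tab (the TextOutputFormat default):

40	102
40	104
40	105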

 
