package applications;

import java.io.IOException;
import java.util.Iterator;

import mybeans.Reading;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

/**
 * Hadoop job (old {@code org.apache.hadoop.mapred} API) that aggregates the tank
 * temperature of sensor readings per unit.
 *
 * <p>For every disguised unit name the job outputs one line whose value is
 * {@code count,min,max,sum,mean} of the tank temperature in Celsius.
 *
 * <p>Usage: {@code SensorMapReduce inputpath outputpath [Y|N]} where the optional
 * third argument selects whether {@link Reduce} is also installed as a combiner
 * ({@code Y}) or not (anything else, or omitted).
 */
public class SensorMapReduce
{
    /** Separator between the fields of an intermediate or final value. */
    private static final String FIELD_SEPARATOR = ",";

    /**
     * Mapper: turns one input line into {@code (disguisedUnitName, "1,temp,temp,temp")},
     * i.e. a partial aggregate (count, min, max, sum) covering a single reading.
     * Lines whose unit name does not pass the filter produce no output.
     */
    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>
    {
        /** Unit names of this length or longer are treated as garbage. */
        private static final int MAX_UNIT_NAME_LENGTH = 20;

        /** Number of leading characters of the unit name replaced by {@link #DISGUISE_PREFIX}. */
        private static final int DISGUISED_PREFIX_LENGTH = 4;

        /** Replacement for the leading characters of the real unit name. */
        private static final String DISGUISE_PREFIX = "XXXX";

        // Reused across map() calls to avoid allocating two Text objects per record.
        private final Text outKey = new Text();
        private final Text outValue = new Text();

        /**
         * Parses one line and, if its unit is of interest, emits a single-reading aggregate.
         *
         * @param key      offset of the line in the input (unused)
         * @param value    one line of input, parsed by {@code Reading.parse}
         * @param output   receives {@code (disguisedUnitName, "1,temp,temp,temp")}
         * @param reporter progress reporter (unused)
         * @throws IOException if the collector fails to accept the record
         */
        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
        {
            Reading rec = Reading.parse(value.toString());
            if (rec == null)
            {
                // NOTE(review): Reading.parse is defined elsewhere; guarding in case it
                // returns null for an unparseable line -- confirm against its contract.
                return;
            }

            String unitName = rec.getUnitName();
            if (unitName == null)
            {
                return;
            }
            unitName = unitName.trim();

            // Filter out garbage: only reasonably short names with a known prefix are kept.
            boolean wanted = unitName.length() < MAX_UNIT_NAME_LENGTH
                && (unitName.startsWith("CMO5") || unitName.startsWith("UP2"));
            if (!wanted)
            {
                return;
            }

            // Disguise the data. The cut index is clamped because a name that is exactly
            // "UP2" passes the filter but is shorter than the disguised prefix length;
            // an unclamped substring(4) would throw StringIndexOutOfBoundsException.
            int cut = Math.min(DISGUISED_PREFIX_LENGTH, unitName.length());
            outKey.set(DISGUISE_PREFIX + unitName.substring(cut));

            // For a single reading min, max and sum are all the reading itself.
            String temp = "" + rec.getTankTempCelcius();
            outValue.set("1"                        // count
                + FIELD_SEPARATOR + temp            // min
                + FIELD_SEPARATOR + temp            // max
                + FIELD_SEPARATOR + temp);          // sum
            output.collect(outKey, outValue);
        }
    }

    /**
     * Reducer (also usable as combiner): merges partial aggregates of one key.
     *
     * <p>Each input value is {@code count,min,max,sum[,mean]}; only the first four
     * fields are read, so the reducer can consume its own output when it runs as a
     * combiner (the trailing mean is ignored and recomputed from sum and count).
     * The output value is {@code count,min,max,sum,mean}.
     */
    public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text>
    {
        /** Number of leading fields every input value must provide. */
        private static final int REQUIRED_FIELDS = 4;

        // Reused across reduce() calls, mirroring the mapper's buffers.
        private final Text outValue = new Text();

        /**
         * Merges all partial aggregates of {@code key} into one.
         *
         * @param key      disguised unit name
         * @param values   partial aggregates, each {@code count,min,max,sum[,mean]}
         * @param output   receives {@code (key, "count,min,max,sum,mean")}
         * @param reporter progress reporter (unused)
         * @throws IOException           if a value has fewer than four fields or the
         *                               collector fails
         * @throws NumberFormatException if a field is not a valid number
         */
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
        {
            // long, not int: combined counts over a large input can exceed Integer.MAX_VALUE.
            long countOut = 0;
            // Identity elements for min/max. Double.MIN_VALUE must NOT be used as the
            // seed for max: it is the smallest positive double, so all-negative
            // temperatures would never replace it.
            double minOut = Double.POSITIVE_INFINITY;
            double maxOut = Double.NEGATIVE_INFINITY;
            double sumOut = 0;

            while (values.hasNext())
            {
                String raw = values.next().toString();
                String[] vals = raw.split(FIELD_SEPARATOR);
                if (vals.length < REQUIRED_FIELDS)
                {
                    throw new IOException("Malformed aggregate for key '" + key
                        + "': expected at least " + REQUIRED_FIELDS
                        + " comma-separated fields but got '" + raw + "'");
                }

                long countIn = Long.parseLong(vals[0]);
                double minIn = Double.parseDouble(vals[1]);
                double maxIn = Double.parseDouble(vals[2]);
                double sumIn = Double.parseDouble(vals[3]);

                countOut += countIn;
                if (minIn < minOut)
                {
                    minOut = minIn;
                }
                if (maxIn > maxOut)
                {
                    maxOut = maxIn;
                }
                sumOut += sumIn;
            }

            double meanOut = (countOut == 0 ? 0 : sumOut / countOut);

            outValue.set("" + countOut
                + FIELD_SEPARATOR + minOut
                + FIELD_SEPARATOR + maxOut
                + FIELD_SEPARATOR + sumOut
                + FIELD_SEPARATOR + meanOut);
            output.collect(key, outValue);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code inputpath outputpath [Y|N]}; the third argument is optional
     *             and enables the combiner only when it is exactly {@code "Y"}
     * @throws Exception if the job cannot be submitted or fails
     */
    public static void main(String[] args) throws Exception
    {
        if (args.length < 2)
        {
            System.err.println("usage: SensorMapReduce inputpath outputpath [Y|N]");
            System.err.println("       where Y/N selects whether a combiner is used (default N)");
            System.exit(2);
        }

        // The flag used to be read unconditionally, which threw
        // ArrayIndexOutOfBoundsException when it was omitted.
        boolean useCombiner = args.length > 2 && "Y".equals(args[2]);

        JobConf conf = new JobConf(SensorMapReduce.class);
        conf.setJobName("sensor_map_reduce");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(Map.class);
        if (useCombiner)
        {
            conf.setCombinerClass(Reduce.class);
        }
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}
