package applications;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import java.util.zip.ZipInputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadZipWriteHDFS
{
    // Reads the zip file passed as argument 1 and appends the bytes of every
    // file it contains (descending into nested zip files) to the single HDFS
    // file named by argument 2.

    public static String ZIP_FILE_IN = "";
    public static String HDFS_FILE_OUT = "";
    public static int FILES_PROCESSED = 0;

    // Size of the scratch buffer used when copying entry data.
    private static final int COPY_BUFFER_SIZE = 2048;

    // File-name suffix that marks an entry as a nested zip archive.
    private static final String ZIP_SUFFIX = ".zip";

    /**
     * Command-line entry point.
     *
     * @param args args[0] is the local zip file to read, args[1] the HDFS file to write
     * @throws Exception if the arguments are missing or the streams cannot be opened or closed
     */
    public static void main(String args[]) throws Exception
    {
        ReadZipWriteHDFS me = new ReadZipWriteHDFS();
        me.go(args);
    }

    /**
     * Opens the input zip and the HDFS output file, streams every zipped file
     * into the output, and prints progress to standard output.
     *
     * @param args args[0] is the local zip file to read, args[1] the HDFS file to write
     * @throws IllegalArgumentException if fewer than two arguments are supplied
     * @throws FileNotFoundException if the zip file cannot be opened
     * @throws IOException if the zip file or the HDFS file cannot be opened or closed
     */
    public void go(String[] args) throws FileNotFoundException, IOException
    {
        if (args == null || args.length < 2)
        {
            throw new IllegalArgumentException(
                "Usage: " + this.getClass().getName() + " <zip file in> <HDFS file out>");
        }

        System.out.println("Begin execution of " + this.getClass().getName());

        ZIP_FILE_IN = args[0];
        HDFS_FILE_OUT = args[1];

        System.out.println("Will read from zip file: " + ZIP_FILE_IN);
        System.out.println("Will write to HDFS file: " + HDFS_FILE_OUT);

        // see Hadoop in Action page 44
        // The FileSystem is deliberately not closed: FileSystem.get can hand
        // back a cached instance that is shared with other users in the JVM.
        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(conf);
        Path hdfsFile = new Path(HDFS_FILE_OUT);

        // The input is opened before the output is created so that a missing
        // or unreadable zip does not create (or overwrite) the HDFS file.
        // Resources are closed in reverse order, so the output is closed first.
        try (ZipFile zipFile = new ZipFile(new File(ZIP_FILE_IN));
             InputStream is = new FileInputStream(ZIP_FILE_IN);
             ZipInputStream inStream = new ZipInputStream(is);
             FSDataOutputStream outStream = hdfs.create(hdfsFile))
        {
            recursive_extract(zipFile, inStream, outStream);
        }

        System.out.println(FILES_PROCESSED + " files processed.");
        System.out.println("End execution of " + this.getClass().getName());
    }

    /**
     * Walks every entry of {@code inStream}, copying the bytes of each regular
     * file to {@code outStream} and recursing into entries whose name ends in
     * ".zip". Directory entries are skipped. Processing is best-effort: a
     * failure is reported and never propagated to the caller.
     *
     * @param zipFile   the top-level archive; only forwarded to recursive calls,
     *                  nested archives are read from {@code inStream} itself
     * @param inStream  zip stream positioned before the first entry to process;
     *                  it is not closed by this method
     * @param outStream destination for the concatenated file contents; it is
     *                  not closed by this method
     */
    public void recursive_extract(ZipFile zipFile, ZipInputStream inStream, OutputStream outStream)
    {
        // create a buffer to improve copy performance later.
        byte[] buffer = new byte[COPY_BUFFER_SIZE];

        try
        {
            // getNextEntry returns a ZipEntry for each item in the stream and
            // null once the stream holds no further entries.
            ZipEntry entry;
            while ((entry = inStream.getNextEntry()) != null)
            {
                if (entry.isDirectory())
                {
                    // Directories carry no data and are not counted as files.
                    continue;
                }

                if (isZipFileName(entry.getName()))
                {
                    extractNestedZip(zipFile, entry, inStream, outStream);
                }
                else
                {
                    copyEntry(entry, inStream, outStream, buffer);
                }
            }
        }
        catch (IOException | RuntimeException e)
        {
            // The entry sequence itself could not be read; report and stop
            // processing this stream without failing the caller.
            e.printStackTrace();
        }
    }

    // Returns true when the entry name ends in ".zip", ignoring case.
    private static boolean isZipFileName(String name)
    {
        // regionMatches returns false for a negative offset, so names shorter
        // than the suffix are handled without an explicit length check.
        return name.regionMatches(true, name.length() - ZIP_SUFFIX.length(),
                                  ZIP_SUFFIX, 0, ZIP_SUFFIX.length());
    }

    // Expands the nested archive stored in the current entry of inStream.
    private void extractNestedZip(ZipFile zipFile, ZipEntry entry, ZipInputStream inStream, OutputStream outStream)
    {
        // Reading from inStream yields exactly the current entry's bytes, i.e.
        // the nested archive. The wrapper suppresses close() so that releasing
        // the nested stream does not also close the enclosing one.
        InputStream entryData = new FilterInputStream(inStream)
        {
            @Override
            public void close()
            {
                // intentionally empty: the enclosing stream stays open
            }
        };

        try (ZipInputStream nestedZipInputStream = new ZipInputStream(entryData))
        {
            recursive_extract(zipFile, nestedZipInputStream, outStream);
        }
        catch (IOException e)
        {
            System.out.println(" ! ! ! Error processing file " + entry.getName() + ": " + e);
        }
    }

    // Copies the current entry of inStream to outStream and updates the file counter.
    private void copyEntry(ZipEntry entry, ZipInputStream inStream, OutputStream outStream, byte[] buffer)
    {
        try
        {
            int len;
            while ((len = inStream.read(buffer)) != -1)
            {
                outStream.write(buffer, 0, len);
            }

            FILES_PROCESSED++;
            if (FILES_PROCESSED % 1000 == 0)
            {
                System.out.println(FILES_PROCESSED + " files processed so far.");
            }
        }
        catch (IOException | RuntimeException e)
        {
            // Best effort: report the failing entry, including the cause, and
            // let the caller move on to the next one.
            System.out.println(" ! ! ! Error processing file " + entry.getName() + ": " + e);
        }
    }
}