Monday, September 21, 2015

Hive

Hadoop Distributed File System(HDFS™) is the foundation of the Hadoop cluster. The HDFS file system manages how the datasets are stored in the Hadoop cluster. It is responsible for distributing the data across the datanodes, managing replication for redundancy and administrative tasks like adding, removing and recovery of datanodes


The Apache Hive project provides a data warehouse view of the data in HDFS. Using a SQL-like language Hive lets you create summarizations of your data, perform ad-hoc queries, and analysis of large datasets in the Hadoop cluster. The overall approach with Hive is to project a table structure on the dataset and then manipulate it with HiveQL. Since you are using data in HDFS your operations can be scaled across all the datanodes and you can manipulate huge datasets.


HIVE installation
Issue 1
hduser@ubuntu:~/apache-hive-1.2.1-bin/bin$ hive

Logging initialized using configuration in jar:file:/home/hduser/apache-hive-1.2.1-bin/lib/hive-common-1.2.1.jar!/hive-log4j.properties
[ERROR] Terminal initialization failed; falling back to unsupported
java.lang.IncompatibleClassChangeError: Found class jline.Terminal, but interface was expected
at jline.TerminalFactory.create(TerminalFactory.java:101)
at jline.TerminalFactory.get(TerminalFactory.java:158)
at jline.console.ConsoleReader.<init>(ConsoleReader.java:229)
at jline.console.ConsoleReader.<init>(ConsoleReader.java:221)
at jline.console.ConsoleReader.<init>(ConsoleReader.java:209)
at org.apache.hadoop.hive.cli.CliDriver.setupConsoleReader(CliDriver.java:787)
at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:721)
at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:681)
at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:621)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)

Exception in thread "main" java.lang.IncompatibleClassChangeError: Found class jline.Terminal, but interface was expected
at jline.console.ConsoleReader.<init>(ConsoleReader.java:230)
at jline.console.ConsoleReader.<init>(ConsoleReader.java:221)
at jline.console.ConsoleReader.<init>(ConsoleReader.java:209)
at org.apache.hadoop.hive.cli.CliDriver.setupConsoleReader(CliDriver.java:787)
at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:721)
at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:681)
at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:621)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)

at org.apache.hadoop.util.RunJar.main(RunJar.java:136)

FIX applied:remove  jline-0.9.94.jar from the hadoop lib directory

hduser@ubuntu:/usr/local/hadoop-2.6.0/share/hadoop/yarn/lib$ mv jline-0.9.94.jar /home/hduser/

hduser@ubuntu:/usr/local/hadoop-2.6.0/share/hadoop/yarn/lib$ hive

Logging initialized using configuration in jar:file:/home/hduser/apache-hive-1.2.1-bin/lib/hive-common-1.2.1.jar!/hive-log4j.properties
hive> 


Installation of derby

wget http://archive.apache.org/dist/db/derby/db-derby-10.4.2.0/db-derby-10.4.2.0-bin.tar.gz

Reference
https://cwiki.apache.org/confluence/display/Hive/GettingStarted
http://www.tutorialspoint.com/hive/hive_installation.htm



https://www.facebook.com/notes/facebook-engineering/join-optimization-in-apache-hive/470667928919


Granularity is the extent to which a material or system is composed of distinguishable pieces or grains It can either refer to the exten to which a larger entity is subdivided
Coarse-Grained material or system have fewer, larger discrete component than fine-grained material or system

Partition &  Bucketing

Partition : PARTITION BY

Bucketting: CLUSTERED BY


Sample use case

Movielens user ratings

1. download data
wget http://files.grouplens.org/datasets/movielens/ml-100k.zip

2. create table
CREATE TABLE u_data (
  userid INT,
  movieid INT,
  rating INT,
  unixtime STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;

3. Unzip
unzip ml-100k.zip

4. Load data into table
hive> load data local inpath '/home/cloudera/ml-100k/u.data' overwrite into table u_data;
Copying data from file:/home/cloudera/ml-100k/u.data
Copying file: file:/home/cloudera/ml-100k/u.data
Loading data to table default.u_data
rmr: DEPRECATED: Please use 'rm -r' instead.
Moved: 'hdfs://localhost.localdomain:8020/user/hive/warehouse/u_data' to trash at: hdfs://localhost.localdomain:8020/user/cloudera/.Trash/Current
Table default.u_data stats: [num_partitions: 0, num_files: 1, num_rows: 0, total_size: 1979173, raw_data_size: 0]
OK
Time taken: 4.508 seconds
5. Find the no of recoards inside the table
hive> select count(*) from u_data; Total MapReduce jobs = 1 Launching Job 1 out of 1 Number of reduce tasks determined at compile time: 1 In order to change the average load for a reducer (in bytes): set hive.exec.reducers.bytes.per.reducer=<number> In order to limit the maximum number of reducers: set hive.exec.reducers.max=<number> In order to set a constant number of reducers: set mapred.reduce.tasks=<number> Starting Job = job_201509182138_0001, Tracking URL = http://0.0.0.0:50030/jobdetails.jsp?jobid=job_201509182138_0001 Kill Command = /usr/lib/hadoop/bin/hadoop job -kill job_201509182138_0001 Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1 2015-09-19 07:06:42,926 Stage-1 map = 0%, reduce = 0% 2015-09-19 07:06:58,579 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 3.87 sec 2015-09-19 07:06:59,711 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 3.87 sec 2015-09-19 07:07:00,784 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 3.87 sec 2015-09-19 07:07:01,843 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 3.87 sec 2015-09-19 07:07:02,869 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 3.87 sec 2015-09-19 07:07:03,876 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 3.87 sec 2015-09-19 07:07:04,884 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 3.87 sec 2015-09-19 07:07:05,894 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 3.87 sec 2015-09-19 07:07:06,903 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 5.04 sec 2015-09-19 07:07:07,913 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 5.04 sec 2015-09-19 07:07:08,948 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 5.04 sec 2015-09-19 07:07:09,979 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 5.04 sec 2015-09-19 07:07:10,999 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 5.04 sec 2015-09-19 07:07:12,021 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 5.04 sec MapReduce Total cumulative CPU time: 5 seconds 40 msec Ended Job = job_201509182138_0001 MapReduce Jobs Launched: Job 0: Map: 1 Reduce: 1 Cumulative CPU: 5.04 sec HDFS Read: 1979394 HDFS Write: 7 SUCCESS Total MapReduce CPU Time Spent: 5 seconds 40 msec OK 100000 Time taken: 45.919 seconds
Lets start with little complex query
create a python script as follows:
vi weekday_mapper.py
import sys
import datetime

for line in sys.stdin:
  line = line.strip()
  userid, movieid, rating, unixtime = line.split('\t')
  weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
  print '\t'.join([userid, movieid, rating, str(weekday)])


create hive sql script
vi mapper.sql

CREATE TABLE u_data_new (
  userid INT,
  movieid INT,
  rating INT,
  weekday INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';

add FILE weekday_mapper.py;

INSERT OVERWRITE TABLE u_data_new
SELECT
  TRANSFORM (userid, movieid, rating, unixtime)
  USING 'python weekday_mapper.py'
  AS (userid, movieid, rating, weekday)
FROM u_data;

SELECT weekday, COUNT(*)
FROM u_data_new
GROUP BY weekday;

run the hive scrip as follows:
[cloudera@localhost ml-100k]$ hive -f ./mapper.sql
************************************************************************************
Working on movie lense dataset
Download data
Unzip ml-1m.zip
data preparetion:
Change column seperation
[cloudera@localhost ml-1m]$ sed 's/::/#/g' ratings.dat > ratings.t [cloudera@localhost ml-1m]$ sed 's/::/#/g' users.dat > users.t [cloudera@localhost ml-1m]$ sed 's/::/#/g' movies.dat > movies.t
#Create a file as follows
vi occupations.t
0#other/not specified 1#academic/educator 2#artist 3#clerical/admin 4#college/grad student 5#customer service 6#doctor/health care 7#executive/managerial 8#farmer 9#homemaker 10#K-12 student 11#lawyer 12#programmer 13#retired 14#sales/marketing 15#scientist 16#self-employed 17#technician/engineer 18#tradesman/craftsman 19#unemployed 20#writer
#Create table as follows
#vi createtab.sql
create database movielens;
use movielens; CREATE EXTERNAL TABLE ratings ( userid INT, movieid INT, rating INT, tstamp STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '#' STORED AS TEXTFILE LOCATION '/dataset/movielens/ratings'; CREATE EXTERNAL TABLE movies ( movieid INT, title STRING, genres ARRAY<STRING> ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '#' COLLECTION ITEMS TERMINATED BY "|" STORED AS TEXTFILE LOCATION '/dataset/movielens/movies'; CREATE EXTERNAL TABLE users ( userid INT, gender STRING, age INT, occupation INT, zipcode STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '#' STORED AS TEXTFILE LOCATION '/dataset/movielens/users'; CREATE EXTERNAL TABLE occupations ( id INT, occupation STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '#' STORED AS TEXTFILE LOCATION '/dataset/movielens/occupations';
$ hive -f ./createtab.sql
$load data into tables
hadoop fs -put ratings.t /dataset/movielens/ratings hadoop fs -put movies.t /dataset/movielens/movies hadoop fs -put users.t /dataset/movielens/users hadoop fs -put occupations.t /dataset/movielens/occupations
$vi concate.sql
CREATE TABLE rating_full as select r.*, m.title as m_title, concat_ws('|',sort_array(m.genres)) as m_genres, u.gender as u_gender, u.age as u_age, u.occupation as u_occupation, u.zipcode as u_zipcode from ratings r JOIN movies m ON (r.movieid = m.movieid) JOIN users u ON (r.userid = u.userid);
#hive -f ./concate.sql
hive> desc rating_full;
#creating a training/testing table such that each has 80/20 of the original data
vi traintest.sql
SET hivevar:seed=31; 
CREATE TABLE ratings2 as select rand(${seed}) as rnd, userid, movieid, rating from ratings; 
CREATE TABLE training as select * from ratings2 order by rnd DESC limit 800000; CREATE TABLE testing as select * from ratings2 order by rnd ASC limit 200209;

#hive -f  ./traintest.sql