Friday, September 18, 2015

Oozie

Apache Oozie Workflow scheduler for Hadoop

- Oozie is a workflow scheduler system to manage Apache Hadoop jobs
- Oozie workflow jobs are Directed Acyclic Graphs (DAGs) of actions
- Oozie coordinator jobs are recurrent Oozie workflow jobs triggered by time and data availability


The programs in a workflow depend on each other to achieve a common goal

Oozie acts as a middleman between the user and Hadoop. The user submits a job to Oozie, which executes it on Hadoop via a launcher job and then returns the results.

The user has to design a workflow.xml with a set of actions arranged in a DAG (Directed Acyclic Graph), as in the sample workflow.xml further down
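To make the DAG idea concrete, here is a minimal Python sketch (not Oozie code) that models a workflow as a map from each node to the nodes it can transition to, and checks that the transitions contain no cycle. The node names mirror the word count example, and the "kill" transition is an assumption for illustration. It uses graphlib from the standard library, which needs Python 3.9 or later.

```python
from graphlib import TopologicalSorter, CycleError

# Hypothetical workflow: each node maps to the nodes it can transition to.
transitions = {
    "start": ["wordcount"],
    "wordcount": ["end", "kill"],  # ok -> end, error -> kill
    "end": [],
    "kill": [],
}

def execution_order(transitions):
    """Return one valid node order, or raise ValueError if there is a cycle."""
    # TopologicalSorter expects node -> predecessors, so invert the edges.
    predecessors = {node: set() for node in transitions}
    for node, targets in transitions.items():
        for target in targets:
            predecessors.setdefault(target, set()).add(node)
    try:
        return list(TopologicalSorter(predecessors).static_order())
    except CycleError as exc:
        # args[1] holds the list of nodes that form the detected cycle.
        raise ValueError(f"workflow is not a DAG: {exc.args[1]}") from exc

order = execution_order(transitions)
print(order)
```

A workflow whose transitions loop back on themselves (for example a -> b -> a) makes execution_order raise, which is the property "acyclic" rules out.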

Three components are required to run a simple MapReduce workflow

1. Properties file: job.properties
Sample job.properties

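# RPC addresses, not web UI ports; 8020 and 8021 are common defaults, adjust to your cluster
nameNode=hdfs://localhost.localdomain:8020
jobTracker=localhost.localdomain:8021
queueName=default
# must match the HDFS directory the workflow is copied to (mapred in the steps below)
examplesRoot=mapred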
 
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}
inputDir=input-data
outputDir=map-reduce
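The ${...} references in job.properties are expanded before the workflow runs. The following Python sketch only illustrates that substitution idea; it is not how Oozie implements it. The host name, directory name, and user name are made-up values, and resolve is a hypothetical helper.

```python
import re

def resolve(properties, builtins):
    """Expand ${name} references using other properties and built-in values."""
    lookup = {**builtins, **properties}
    pattern = re.compile(r"\$\{([^}]+)\}")

    def expand(value, depth=0):
        if depth > 10:  # guard against self-referencing properties
            raise ValueError("too many nested substitutions")

        def replace(match):
            name = match.group(1)
            if name not in lookup:
                return match.group(0)  # leave unknown references untouched
            return expand(lookup[name], depth + 1)

        return pattern.sub(replace, value)

    return {key: expand(value) for key, value in properties.items()}

# Illustrative values only; substitute the ones from your own job.properties.
props = {
    "nameNode": "hdfs://namenode.example.com:8020",
    "examplesRoot": "wordcount-app",
    "oozie.wf.application.path": "${nameNode}/user/${user.name}/${examplesRoot}",
}
resolved = resolve(props, {"user.name": "alice"})
print(resolved["oozie.wf.application.path"])
```

Printing the resolved application path is a quick way to confirm that it points at the HDFS directory the workflow was actually copied to.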

2. workflow.xml: this file defines the workflow for the particular job as a set of actions
Sample workflow.xml
<workflow-app name='wordcount-wf' xmlns="uri:oozie:workflow:0.2">
    <start to='wordcount'/>
    <action name='wordcount'>
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.myorg.WordCount.Map</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.myorg.WordCount.Reduce</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${inputDir}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${outputDir}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to='end'/>
        <error to='kill'/>
    </action>
    <kill name='kill'>
        <message>Word count failed, error code: ${wf:errorCode("wordcount")}</message>
    </kill>
    <end name='end'/>
</workflow-app>
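A workflow.xml that is not well-formed, or that transitions to a node that does not exist, will be rejected when it is submitted, so it can help to check the file locally first. The sketch below is a rough pre-flight check in Python, not an Oozie tool: it parses the XML and reports any "to" target that has no matching node name. The embedded workflow is a shortened, hypothetical version of the word count example, and the check does not cover fork paths, which use a "start" attribute instead of "to".

```python
import xml.etree.ElementTree as ET

# Shortened, hypothetical workflow used only to exercise the check.
WORKFLOW = """\
<workflow-app name="wordcount-wf" xmlns="uri:oozie:workflow:0.2">
    <start to="wordcount"/>
    <action name="wordcount">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
        </map-reduce>
        <ok to="end"/>
        <error to="kill"/>
    </action>
    <kill name="kill">
        <message>wordcount failed</message>
    </kill>
    <end name="end"/>
</workflow-app>
"""

def dangling_transitions(xml_text):
    """Return transition targets that do not match any named node.

    Raises xml.etree.ElementTree.ParseError if the XML is not well-formed.
    """
    root = ET.fromstring(xml_text)
    # Named nodes (action, kill, end, ...) are direct children of workflow-app.
    names = {node.get("name") for node in root if node.get("name")}
    # Transitions (start, ok, error, ...) carry their target in a "to" attribute.
    targets = {el.get("to") for el in root.iter() if el.get("to")}
    return sorted(targets - names)

print(dangling_transitions(WORKFLOW))
```

An empty list means every transition points at a node that exists; a malformed closing tag such as </kill/> is caught earlier, as a parse error.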

3. Libraries
lib/ is a directory in HDFS that contains the libraries used in the workflow, for example .jar and shared object files. At run time, the Oozie server picks up the contents of this directory and deploys them on the actual compute nodes using the Hadoop distributed cache. When submitting a job from the local system, this lib directory has to be copied manually to HDFS before the workflow can be run

Commands to submit and execute a MapReduce program via Oozie

1. Create a directory to store all your workflow components (job.properties, workflow.xml and libraries). Inside this directory, create a lib directory

# mkdir mapred
# mkdir mapred/lib

Please note that this workflow directory and its contents are created on the local file system, and they must be copied to HDFS once they are ready

2. Create the files workflow.xml and job.properties as follows

# cd mapred
# vi workflow.xml
<workflow-app name='wordcount-wf' xmlns="uri:oozie:workflow:0.2">
    <start to='wordcount'/>
    <action name='wordcount'>
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.myorg.WordCount.Map</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.myorg.WordCount.Reduce</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${inputDir}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${outputDir}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to='end'/>
        <error to='kill'/>
    </action>
    <kill name='kill'>
        <message>Word count failed, error code: ${wf:errorCode("wordcount")}</message>
    </kill>
    <end name='end'/>
</workflow-app>

# vi job.properties
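# RPC addresses, not web UI ports; 8020 and 8021 are common defaults, adjust to your cluster
nameNode=hdfs://localhost.localdomain:8020
jobTracker=localhost.localdomain:8021
queueName=default
# must match the HDFS directory the workflow is copied to (mapred in step 6)
examplesRoot=mapred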
 
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}
inputDir=input-data
outputDir=map-reduce

5. Copy the wordcount.jar file into the workflow lib directory. The directory structure should then look like this:
# cd mapred
job.properties
workflow.xml
lib/
lib/wordcount.jar

6. Copy the workflow directory to HDFS
# hadoop fs -copyFromLocal mapred/ mapred
Please note that the job.properties file is always read from the local file system, so it does not need to be copied to HDFS. The HDFS directory must be the one that oozie.wf.application.path in job.properties points to

7. Run the workflow using the command below

# oozie job -oozie http://localhost:5001/oozie -config /home/cloudera/mapred/job.properties -run

Once the workflow is submitted, the Oozie server returns the workflow ID, which can be used for monitoring and debugging

Notes on the parameters:
- The -config option specifies the location of your job.properties file, which is local to your machine
- The -oozie option specifies the URL of the Oozie server (a default installation listens on port 11000; use the URL of your own server)
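If the submission is scripted, the command above can be wrapped in a few lines of Python. This is only a sketch: submit_command, info_command, parse_job_id and submit are hypothetical helper names, and the parsing assumes the CLI prints the ID on a line of the form "job: <id>", which should be verified against the output of your Oozie version.

```python
import re
import subprocess

def submit_command(oozie_url, properties_path):
    """Build the argument list for submitting and starting a workflow."""
    return ["oozie", "job", "-oozie", oozie_url,
            "-config", properties_path, "-run"]

def info_command(oozie_url, job_id):
    """Build the argument list for querying the status of a workflow job."""
    return ["oozie", "job", "-oozie", oozie_url, "-info", job_id]

def parse_job_id(cli_output):
    """Extract the workflow ID, assuming a line of the form 'job: <id>'."""
    match = re.search(r"^job:\s*(\S+)", cli_output, flags=re.MULTILINE)
    return match.group(1) if match else None

def submit(oozie_url, properties_path):
    """Run the oozie CLI (must be on PATH) and return the workflow ID."""
    result = subprocess.run(submit_command(oozie_url, properties_path),
                            capture_output=True, text=True, check=True)
    return parse_job_id(result.stdout)
```

The returned ID can then be passed to info_command to poll the status of the job while it runs.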

Reference: https://cwiki.apache.org/confluence/display/OOZIE/Map+Reduce+Cookbook

Thursday, September 17, 2015

ZooKeeper

ZooKeeper is a distributed, open source coordination service for distributed applications. It exposes a simple set of primitives that distributed applications can build upon to implement higher-level services for synchronization, configuration maintenance, and groups and naming

Design goal: ZooKeeper allows distributed processes to coordinate with each other through a shared hierarchical namespace which is organized similarly to a standard file system.

The ZooKeeper implementation puts a premium on high-performance, highly available, strictly ordered access.

ZooKeeper is replicated: like the distributed processes it coordinates, ZooKeeper itself is intended to be replicated over a set of hosts called an ensemble



The servers that make up the ZooKeeper service must all know about each other. They maintain an in-memory image of state, along with transaction logs and snapshots in a persistent store. As long as a majority of the servers are available, the ZooKeeper service will be available

ZooKeeper is ordered: ZooKeeper stamps each update with a number that reflects the order of all ZooKeeper transactions. Subsequent operations can use the order to implement higher-level abstractions, such as synchronization primitives.

Nodes and ephemeral nodes
Each node in the ZooKeeper namespace can have data associated with it as well as children. It is like having a file system that allows a file to also be a directory. These nodes are called znodes

ZooKeeper also has the notion of ephemeral nodes. These znodes exist as long as the session that created them is active. When the session ends, the ephemeral znodes are deleted
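The behaviour of znodes and ephemeral znodes can be pictured with a small in-memory toy model. This is not the ZooKeeper client API; ToyZooKeeper and its methods are invented for illustration, and sessions are represented by plain integers.

```python
class ToyZooKeeper:
    """In-memory toy model of a znode tree (not the real client API)."""

    def __init__(self):
        self.nodes = {"/": b""}  # path -> data; every znode can hold data
        self.owners = {}         # ephemeral path -> id of the owning session

    def create(self, path, data=b"", session=None, ephemeral=False):
        parent = path.rsplit("/", 1)[0] or "/"
        if parent not in self.nodes:
            raise KeyError(f"parent {parent} does not exist")
        if parent in self.owners:
            raise ValueError("ephemeral znodes cannot have children")
        if path in self.nodes:
            raise ValueError(f"{path} already exists")
        self.nodes[path] = data
        if ephemeral:
            self.owners[path] = session

    def children(self, path):
        """Names of the direct children of a znode."""
        prefix = path.rstrip("/") + "/"
        return sorted(p[len(prefix):] for p in self.nodes
                      if p != "/" and p.startswith(prefix)
                      and "/" not in p[len(prefix):])

    def close_session(self, session):
        """Delete every ephemeral znode that the session created."""
        for path in [p for p, s in self.owners.items() if s == session]:
            del self.owners[path]
            del self.nodes[path]

zk = ToyZooKeeper()
zk.create("/workers", b"registry")  # a znode with data and children
zk.create("/workers/w1", b"host-a", session=1, ephemeral=True)
zk.create("/workers/w2", b"host-b", session=2, ephemeral=True)
zk.close_session(1)                 # w1 disappears with its session
print(zk.children("/workers"))
```

This pattern, where each worker registers an ephemeral znode under a shared parent, is a common way to track which group members are currently alive.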

Conditional updates and watches:
ZooKeeper supports the concept of watches. A client can set a watch on a znode. The watch is triggered and removed when the znode changes. When a watch is triggered, the client receives a packet saying that the znode has changed. If the connection between the client and one of the ZooKeeper servers is broken, the client receives a local notification.
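The key property of a watch is that it fires once and is then removed, so a client that wants further notifications has to set it again. The toy Python model below illustrates only that one-shot behaviour; WatchedStore is a made-up class and not part of any ZooKeeper client.

```python
class WatchedStore:
    """Toy key-value store with one-shot watches (illustration only)."""

    def __init__(self):
        self.data = {}
        self.watches = {}  # path -> callbacks waiting for the next change

    def get(self, path, watch=None):
        """Read a value and optionally register a one-shot watch."""
        if watch is not None:
            self.watches.setdefault(path, []).append(watch)
        return self.data.get(path)

    def set(self, path, value):
        """Write a value, then fire and remove all watches on the path."""
        self.data[path] = value
        for callback in self.watches.pop(path, []):
            callback(path)

events = []
store = WatchedStore()
store.get("/config", watch=events.append)  # set the watch
store.set("/config", "v1")                 # fires the watch and removes it
store.set("/config", "v2")                 # no watch left, nothing fires
store.get("/config", watch=events.append)  # the client re-registers
store.set("/config", "v3")                 # fires again
print(events)
```

Because a change between the notification and the re-registration is not reported separately, clients normally re-read the znode when they set the watch again.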

Guarantees: ZooKeeper is very fast and very simple. Since its goal, though, is to be a basis for the construction of more complicated services, such as synchronization, it provides a set of guarantees. These are:

- Sequential consistency: Updates from a client will be applied in the order that they were sent
- Atomicity: Updates either succeed or fail. There are no partial results
- Single system image: A client will see the same view of the service regardless of the server that it connects to
- Reliability: Once an update has been applied, it will persist from that time forward until a client overwrites the update
- Timeliness: The client's view of the system is guaranteed to be up to date within a certain time bound

Implementation
The ZooKeeper service can run in two modes: standalone and replicated
- Standalone mode is for testing purposes
- Replicated mode is for production systems

ZooKeeper runs in replicated mode on a cluster of machines called an ensemble. ZooKeeper achieves high availability through replication and can provide a service as long as a majority of the machines in the ensemble are up.
For example, in a five-node ensemble, any two machines can fail and the service will still work because a majority of three remains.
Note that a six-node ensemble can also tolerate only two machines failing: if three machines fail, the remaining three do not constitute a majority of the six. For this reason, it is usual to have an odd number of machines in an ensemble.
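The majority arithmetic behind these examples is short enough to write out. In this Python sketch, quorum_size and tolerated_failures are just illustrative names.

```python
def quorum_size(ensemble_size):
    """Smallest number of servers that forms a strict majority."""
    return ensemble_size // 2 + 1

def tolerated_failures(ensemble_size):
    """How many servers can fail while a majority is still up."""
    return ensemble_size - quorum_size(ensemble_size)

for n in range(3, 8):
    print(n, "servers: quorum", quorum_size(n),
          "tolerates", tolerated_failures(n), "failures")
```

Going from five to six servers raises the quorum from three to four but leaves the number of tolerated failures at two, which is why the extra even-numbered machine buys nothing.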


ZooKeeper is very simple: all it has to do is ensure that every modification to the tree of znodes is replicated to a majority of the ensemble.

ZooKeeper uses a protocol called Zab that runs in two phases, which may be repeated indefinitely

Phase 1: Leader election
The machines in an ensemble go through a process of electing a distinguished member called the leader. The other machines are called followers. This phase is finished once a majority of followers have synchronized their state with the leader

Phase 2: Atomic broadcast
All write requests are forwarded to the leader, which broadcasts the update to the followers. When a majority have persisted the change, the leader commits the update and the client gets a response saying the update succeeded. The protocol for achieving consensus is designed to be atomic, so a change either succeeds or fails
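The commit rule in this phase can be illustrated with a toy simulation. The Python sketch below is deliberately simplified and is not the real Zab protocol: there is no networking, no recovery, and no leader election, and Server and broadcast are invented names. It only shows that an update is committed once a majority of the ensemble has persisted it.

```python
from dataclasses import dataclass, field

@dataclass
class Server:
    name: str
    up: bool = True
    log: list = field(default_factory=list)   # stands in for the on-disk log
    tree: dict = field(default_factory=dict)  # in-memory znode tree

def broadcast(leader, followers, zxid, path, value):
    """Propose an update and commit it only if a majority persisted it."""
    ensemble = [leader] + followers
    quorum = len(ensemble) // 2 + 1
    acks = []
    for server in ensemble:
        if server.up:
            server.log.append((zxid, path, value))  # persist before applying
            acks.append(server)
    if len(acks) < quorum:
        # Not committed. A real implementation would deal with the
        # leftover log entries during recovery; this toy model does not.
        return False
    for server in acks:
        server.tree[path] = value  # commit: apply to the in-memory tree
    return True

leader = Server("s1")
followers = [Server("s2"), Server("s3"), Server("s4", up=False), Server("s5", up=False)]
committed = broadcast(leader, followers, zxid=1, path="/config", value="v1")
print(committed, leader.tree)

followers[1].up = False  # a third server fails, so no majority is left
rejected = broadcast(leader, followers, zxid=2, path="/config", value="v2")
print(rejected, leader.tree)
```

With two of five servers down the update is committed; once a third server fails the same call reports failure and the in-memory tree keeps the last committed value.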

If the leader fails, the remaining machines hold another leader election and continue as before with the new leader. If the old leader later recovers, it then starts as a follower.

All machines in the ensemble write updates to disk before updating their in-memory copies of the znode tree. Read requests may be serviced from any machine, and because they involve only a lookup from memory, they are very fast

ZooKeeper client and ZooKeeper server