Hadoop
Components
-
-
implementations
-
TotalOrderPartitioner
-
- :question: sampling via IntervalSampler/RandomSampler/SplitSampler etc. on the client side to get split points
- TotalOrderPartitioner saves the split points into a trie to quickly locate the partition of each given key, producing #reducers partitions
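A minimal sketch of wiring this up with the old mapred API; the input/output paths, sampling parameters, and class name are illustrative assumptions, not values from the source.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.InputSampler;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

public class TotalOrderSortJob {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(TotalOrderSortJob.class);
    conf.setJobName("total-order-sort");
    conf.setInputFormat(SequenceFileInputFormat.class);
    conf.setOutputFormat(SequenceFileOutputFormat.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);
    conf.setNumReduceTasks(4);                          // 4 globally ordered partitions
    conf.setPartitionerClass(TotalOrderPartitioner.class);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    // client-side sampling: pick (numReduceTasks - 1) split points and write
    // them to the partition file that TotalOrderPartitioner loads into its trie
    Path partitionFile = new Path(args[1] + "_partitions");   // hypothetical location
    TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
    InputSampler.Sampler<Text, Text> sampler =
        new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);
    InputSampler.writePartitionFile(conf, sampler);

    JobClient.runJob(conf);
  }
}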
-
-
each map task is assigned a sequence of input key-value pairs (called an input split)
the execution framework strives to align #mappers with #blocks of the input file, so that mappers run with maximum parallelism
-
-
a Mapper object is created for every map task, so it is possible to preserve state across multiple input key-value pairs within the same mapper object (same task)
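A sketch of that pattern, in-mapper combining for word counts with the old mapred API; the class and field names are illustrative.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

// One Mapper object per map task, so a HashMap field can accumulate counts
// across all key-value pairs of the task and emit them once at the end.
public class InMapperCombiningMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, LongWritable> {

  private final Map<String, Long> counts = new HashMap<String, Long>();
  private OutputCollector<Text, LongWritable> out;   // saved so close() can flush

  public void map(LongWritable offset, Text line,
                  OutputCollector<Text, LongWritable> output, Reporter reporter)
      throws IOException {
    this.out = output;
    for (String word : line.toString().split("\\s+")) {
      Long c = counts.get(word);
      counts.put(word, c == null ? 1L : c + 1L);
    }
  }

  @Override
  public void close() throws IOException {
    // emit the aggregated state once, at the end of the map task
    for (Map.Entry<String, Long> e : counts.entrySet()) {
      out.collect(new Text(e.getKey()), new LongWritable(e.getValue()));
    }
  }
}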
-
MapRunnable
a more generic interface that lets you customize how key-value pairs are handled
getSplits
(logically) splits the input file into the requested #splits; each split holds only metadata such as (file, start, length, hosts), nothing is actually split on disk
#splits determines #mappers for the task, :star: better to make split size == block size
:question: host selection should maximize data locality:
- sort the racks containing the InputSplit by size,
- sort the nodes containing the InputSplit within each rack by size,
- select the hosts of the top N nodes (N = #replicas)
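The split-size rule, modeled on FileInputFormat.computeSplitSize in the old mapred API (goalSize = totalSize / requested #splits, minSize from mapred.min.split.size); the wrapper class and sample numbers are illustrative.

public class SplitSizing {
  // Keeping the result equal to the HDFS block size yields one mapper per
  // block, i.e. maximal data locality.
  static long computeSplitSize(long goalSize, long minSize, long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

  public static void main(String[] args) {
    long blockSize = 128L << 20;                 // 128 MB block
    long totalSize = 10L * blockSize;            // 10-block file
    long goalSize = totalSize / 10;              // ask for 10 splits
    System.out.println(computeSplitSize(goalSize, 1L, blockSize));  // == blockSize
  }
}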
-
getRecordWriter
returns a RecordWriter, which is used to write key-value pairs to the output file (invoked from OutputCollector.collect(newK, newV))
checkOutputSpecs
called before the job is submitted to the JobTracker, to validate the output specification (does the output path already exist? etc.)
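A minimal custom OutputFormat sketch against the old mapred API showing both hooks; the format name and tab-separated record layout are assumptions.

import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Progressable;

public class TabSeparatedOutputFormat implements OutputFormat<Text, Text> {

  // reject a missing or already-existing output directory before submission
  public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
    Path out = FileOutputFormat.getOutputPath(job);
    if (out == null) {
      throw new IOException("output path not set");
    }
    if (out.getFileSystem(job).exists(out)) {
      throw new IOException("output path " + out + " already exists");
    }
  }

  // the writer that OutputCollector.collect(newK, newV) ultimately calls
  public RecordWriter<Text, Text> getRecordWriter(FileSystem ignored, JobConf job,
      String name, Progressable progress) throws IOException {
    Path file = FileOutputFormat.getTaskOutputPath(job, name);  // side-effect (temp) path
    final DataOutputStream outStream = file.getFileSystem(job).create(file, progress);
    return new RecordWriter<Text, Text>() {
      public void write(Text key, Text value) throws IOException {
        outStream.writeBytes(key + "\t" + value + "\n");
      }
      public void close(Reporter reporter) throws IOException {
        outStream.close();
      }
    };
  }
}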
side-effect file
to mitigate slow tasks, the same task is run speculatively in several places and the first attempt to complete wins
the competing attempts each write to a temporary (side-effect) file, handled by the OutputCommitter
when the job succeeds, an empty file named _SUCCESS is created in the output dir
-
-
-
:star: Observer Pattern
-
when a job is added/removed/updated, the JobTracker invokes the registered listeners
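A hypothetical sketch of that wiring, modeled on the JobTracker's job-listener mechanism; all class and method names here are illustrative, not the actual Hadoop API.

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Schedulers register listeners; the tracker notifies them whenever a job is
// added, removed, or updated (classic Observer pattern).
interface JobListener {
  void jobAdded(String jobId);
  void jobRemoved(String jobId);
  void jobUpdated(String jobId);
}

class MiniJobTracker {
  private final List<JobListener> listeners = new CopyOnWriteArrayList<JobListener>();

  void addJobListener(JobListener l)    { listeners.add(l); }
  void removeJobListener(JobListener l) { listeners.remove(l); }

  void submitJob(String jobId) {
    // ... enqueue the job ...
    for (JobListener l : listeners) l.jobAdded(jobId);    // notify observers
  }

  void retireJob(String jobId) {
    // ... drop the job ...
    for (JobListener l : listeners) l.jobRemoved(jobId);
  }
}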
-
-
-
- startTracker - creates the JobTracker object and initializes some important variables
-
-
expireTrackersThread
detects and cleans up dead TaskTrackers (heartbeat timeout): removes them from the data structures trackerToJobsToCleanup, trackerToTasksToCleanup, trackerToTaskMap, and trackerToMarkedTasksMap, and marks their running tasks as KILLED_UNCLEAN
-
-
-
-
-
-
-
-
-
Steps
- check whether the JobTracker/TaskTracker versions match
- check whether the local disk is damaged; if so, reinitialize the TaskTracker
- send the heartbeat, receive the reply, and execute the commands the JobTracker sent back
-
-
2PC (pull-mode)
- when a TaskAttempt finishes its computation, its state goes RUNNING -> COMMIT_PENDING and it notifies the TaskTracker via RPC
- once the TT sees a TA in COMMIT_PENDING state, it immediately shortens the heartbeat interval to report to the JT quickly
- the JT checks whether this TA is the first of its TaskInProgress to finish; if so, it replies with a CommitTaskAction approving the commit
- the TT adds the TA to its commitResponses list
- the TA periodically checks via RPC whether it is in the commitResponses list; if so, it moves its results to the final directory and confirms commit completion to the TT
- the TT changes the TA's state to SUCCEEDED and notifies the JT on the next heartbeat
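A hypothetical sketch of the pull-mode protocol from the attempt's side; the class, interface, and method names are illustrative, not the real TaskUmbilicalProtocol.

// Phase 1: the attempt asks permission; phase 2: it polls until the
// TaskTracker has received the JobTracker's CommitTaskAction, then commits.
enum AttemptState { RUNNING, COMMIT_PENDING, SUCCEEDED, KILLED }

interface TaskTrackerStub {
  void reportCommitPending(String attemptId);
  boolean canCommit(String attemptId);   // true once the CommitTaskAction arrived
  void reportDone(String attemptId);
}

class TaskAttemptSketch {
  AttemptState state = AttemptState.RUNNING;

  void finishComputation(TaskTrackerStub tt, String attemptId) throws InterruptedException {
    state = AttemptState.COMMIT_PENDING;          // phase 1: ask permission
    tt.reportCommitPending(attemptId);            // TT shortens heartbeat, JT decides

    while (!tt.canCommit(attemptId)) {            // phase 2: poll the commitResponses list
      Thread.sleep(1000);
    }
    promoteOutputToFinalDirectory();              // move temp output to the final dir
    tt.reportDone(attemptId);                     // TT marks SUCCEEDED, tells JT on next heartbeat
    state = AttemptState.SUCCEEDED;
  }

  private void promoteOutputToFinalDirectory() { /* e.g. OutputCommitter.commitTask(...) */ }
}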
-
- the JobClient sends a KillTask request to the JT, which adds the TA to the tasksToKill list
- on the next heartbeat from the TA's TaskTracker, the JT replies with a KillTaskAction and related information
- the TT removes the TA from runningJobs, changes its state to KILLED_UNCLEAN, asks the directoryCleanupThread to clean up its working directory, releases the occupied slot, and shortens the heartbeat interval
- the JT orders the TT to launch a cleanup task to remove the data the TA has already written to HDFS; the TT starts a JVM to run it and, once it finishes, notifies the JT to update the TA's state to KILLED
-
-
-
-
-
state
-
READY
no pending dependency, ready to run
RUNNING
job executing, may end up in SUCCESS or FAILED
-
-
OutputCollector.collect(k, v) must NOT modify the key or value; that would cause errors when the next Mapper uses them later on
implemented by having the OutputCollector pass the pair to the next Mapper/Reducer in the chain
mapper.map(k,v, chain.getMapperCollector(0, output, reporter), reporter);
ChainOutputCollector
collect() calls map() on the next Mapper, or writes to the output file at the last Mapper in the chain
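A sketch of configuring such a [MAP+ / REDUCE MAP*] chain with the old mapred API; AMap, BMap, CMap, and MyReducer are hypothetical user classes, and byValue=false is what makes the no-modification rule above matter.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;

public class ChainExample {
  static void configureChain(JobConf job) {
    // chain: AMap -> BMap -> MyReducer -> CMap
    // byValue=false passes key/value objects by reference between stages
    ChainMapper.addMapper(job, AMap.class,
        LongWritable.class, Text.class, Text.class, Text.class,
        false, new JobConf(false));
    ChainMapper.addMapper(job, BMap.class,
        Text.class, Text.class, Text.class, Text.class,
        false, new JobConf(false));
    ChainReducer.setReducer(job, MyReducer.class,
        Text.class, Text.class, Text.class, Text.class,
        false, new JobConf(false));
    ChainReducer.addMapper(job, CMap.class,
        Text.class, Text.class, Text.class, Text.class,
        false, new JobConf(false));
  }
}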
- JobClient uploads files (jars, data files, archive files, libs, etc.) to HDFS according to the JobConf
DistributedCache class handles the uploading
- JobClient submits the job to the JobTracker via RPC
- JobTracker asks the TaskScheduler to initialize the job
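A minimal client-side sketch of this submission flow with the old mapred API; TokenizerMapper, SumReducer, and the paths are illustrative assumptions.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;

public class SubmitExample {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(SubmitExample.class);
    conf.setJobName("wordcount");
    conf.setMapperClass(TokenizerMapper.class);   // hypothetical user Mapper
    conf.setCombinerClass(SumReducer.class);      // hypothetical user Reducer
    conf.setReducerClass(SumReducer.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    // runJob = upload resources to HDFS, RPC submitJob to the JobTracker,
    // then poll the job status until it completes
    RunningJob job = JobClient.runJob(conf);
    System.out.println("job succeeded: " + job.isSuccessful());
  }
}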
Task Types
Setup Task (Optional)
updates the job state to SETUP and calls OutputCommitter.setupJob()
when it finishes, the job state changes PREP -> RUNNING and the Map tasks start running
Map Task
executes the Mappers; their number is determined by the number of input splits
Reduce Task
-
Hadoop starts Reduce tasks only after the fraction mapred.reduce.slowstart.completed.maps (default 5%) of Map tasks has completed
Cleanup Task (Optional)
deletes temporary files/dirs; once done, the job state changes RUNNING -> SUCCEEDED
-
Job Execution
Speculative Execution p174
-
-
Longest Approximate Time to End p176
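The job-level knobs for reduce slow start and speculative execution, using the old mapred property names; the chosen values and the wrapper class are illustrative.

import org.apache.hadoop.mapred.JobConf;

public class TuningExample {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    conf.setMapSpeculativeExecution(true);       // allow backup map attempts
    conf.setReduceSpeculativeExecution(false);   // no backup reduce attempts
    // start reducers after 80% of maps have finished instead of the 5% default
    conf.setFloat("mapred.reduce.slowstart.completed.maps", 0.80f);
  }
}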
-
-
-
-
-
- read the InputSplit through the RecordReader and parse it into key-value pairs
- call the user-defined map() to process them and produce new k-v pairs
- call OutputCollector.collect, which partitions the results with the Partitioner and writes them into the circular in-memory buffer MapOutputBuffer
- when the in-memory buffer fills up, sort it (multi-pivot quicksort p229), compress the buffered data, and spill it to a temporary file on disk
- (Optional) call the Combiner to merge results partition by partition
- once all data has been processed, the Merger merges (multi-round merge sort based on a min-heap p231) all spill files into a single file
after map() processes a key-value pair it calls OutputCollector.collect, which calls Partitioner.getPartition to get the pair's partition number and passes the triple <key, value, partition> to MapOutputBuffer.collect
-
-
-
-
-
-
Hadoop 0.21 uses a shared circular buffer, so io.sort.record.percent no longer needs to be set. A pointer named equator marks the common starting point of the index and data regions, which grow in opposite directions
-
-
-
- Shuffle/Copy - remotely fetch one piece of data from each Map task and store it in memory or spill it to disk (done by the ReduceCopier class)
- Merge - during the remote fetch, two background threads merge the in-memory and on-disk data (done by the ReduceCopier class)
- Sort - merge-sort the already-sorted data pieces coming from the Map tasks
- Reduce - call the user-defined reduce method to process the data
- Write - write the results to HDFS
Sort and Reduce run in parallel. In the Sort phase the Reduce task builds a min-heap over the in-memory and on-disk files and maintains an iterator pointing at the top of the heap.
The Reduce task keeps advancing the iterator, handing records with the same key to the reduce method in order
-
- request a new JobId from the JobTracker
- store the job resources (jar files etc.) on HDFS and split the input
- run the job
-
MapReduce Algo Design p42
-
Secondary Sorting (on value) p64
-
value-to-key conversion
move the value you want to sort on into a composite key, and
define a custom partitioner so that composite keys sharing the same natural key go to the same reducer
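A sketch of value-to-key conversion with the old mapred API: a composite key carries the sort value, the partitioner and grouping comparator look only at the natural key, and the key's own ordering adds the secondary sort. The class names and the Text/LongWritable field choice are illustrative.

import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class SecondarySort {

  public static class CompositeKey implements WritableComparable<CompositeKey> {
    Text naturalKey = new Text();
    LongWritable order = new LongWritable();     // the value being sorted on

    public void write(DataOutput out) throws IOException {
      naturalKey.write(out); order.write(out);
    }
    public void readFields(DataInput in) throws IOException {
      naturalKey.readFields(in); order.readFields(in);
    }
    public int compareTo(CompositeKey o) {       // sort: natural key, then value
      int c = naturalKey.compareTo(o.naturalKey);
      return c != 0 ? c : order.compareTo(o.order);
    }
  }

  // Partition on the natural key only, so all composite keys sharing it
  // reach the same reducer.
  public static class NaturalKeyPartitioner implements Partitioner<CompositeKey, Text> {
    public void configure(JobConf job) {}
    public int getPartition(CompositeKey key, Text value, int numPartitions) {
      return (key.naturalKey.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
  }

  // Group values into one reduce() call per natural key, ignoring the order part.
  public static class NaturalKeyGroupingComparator extends WritableComparator {
    public NaturalKeyGroupingComparator() { super(CompositeKey.class, true); }
    public int compare(WritableComparable a, WritableComparable b) {
      return ((CompositeKey) a).naturalKey.compareTo(((CompositeKey) b).naturalKey);
    }
  }

  static void configure(JobConf conf) {
    conf.setMapOutputKeyClass(CompositeKey.class);
    conf.setMapOutputValueClass(Text.class);
    conf.setPartitionerClass(NaturalKeyPartitioner.class);
    conf.setOutputValueGroupingComparator(NaturalKeyGroupingComparator.class);
  }
}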
reduce-side join
-
one-to-many
create a composite key of (join key, row id); define the sort order to sort first by the join key,
then to place all rows from dataset1 before all rows from dataset2
many-to-many
for each join key, the reducer buffers all rows from the smaller dataset,
then takes the cross-product with the rows from the other dataset
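A hypothetical reducer for the many-to-many case, old mapred API. It assumes mappers tag values with their dataset ("A:" for the smaller side, "B:" for the larger) and that the secondary-sort trick above delivers A-rows before B-rows; the tag format and class name are illustrative.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class ManyToManyJoinReducer extends MapReduceBase
    implements Reducer<Text, Text, Text, Text> {

  public void reduce(Text joinKey, Iterator<Text> values,
                     OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
    List<String> smallerSide = new ArrayList<String>();
    while (values.hasNext()) {
      String v = values.next().toString();
      if (v.startsWith("A:")) {
        smallerSide.add(v.substring(2));          // buffer rows from the smaller dataset
      } else {
        String b = v.substring(2);                // stream rows from the larger dataset
        for (String a : smallerSide) {
          output.collect(joinKey, new Text(a + "\t" + b));   // emit cross-product
        }
      }
    }
  }
}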
-
map-side join
partition and sort both datasets by the join key so that each Mapper's input splits line up, and perform the join inside the mapper
far more efficient than a reduce-side join, because the datasets do not need to be shuffled over the network
:!: the reducers generating data for a later map-side join MUST NOT emit any key other than the one they are currently processing
-
memory-backed join
-
load the smaller dataset into memory, and map over the larger dataset, joining each record against the in-memory copy
divide the smaller dataset into shards if it does not fit into memory, or use an external distributed key/value store to hold it
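A sketch of a memory-backed (hash) join with the old mapred API: the smaller dataset is shipped via DistributedCache, loaded into a HashMap in configure(), and each record of the larger dataset is probed against it in map(). The "key\tvalue" file layout and class name are assumptions.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class MemoryBackedJoinMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, Text> {

  private final Map<String, String> smallTable = new HashMap<String, String>();

  @Override
  public void configure(JobConf job) {
    try {
      Path[] cached = DistributedCache.getLocalCacheFiles(job);
      BufferedReader in = new BufferedReader(new FileReader(cached[0].toString()));
      String line;
      while ((line = in.readLine()) != null) {
        String[] parts = line.split("\t", 2);      // key \t value
        smallTable.put(parts[0], parts[1]);
      }
      in.close();
    } catch (IOException e) {
      throw new RuntimeException("failed to load in-memory table", e);
    }
  }

  public void map(LongWritable offset, Text record,
                  OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
    String[] parts = record.toString().split("\t", 2);
    String joined = smallTable.get(parts[0]);      // probe the in-memory table
    if (joined != null) {
      output.collect(new Text(parts[0]), new Text(parts[1] + "\t" + joined));
    }
  }
}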
-
-
The TaskTracker reports its available slots and other information to the JobTracker via heartbeat; the JobTracker calls TaskScheduler.assignTasks to obtain the task assignment and returns it in the heartbeat response
-
TaskScheduler implementations
FIFO (JobQueueTaskScheduler) p186
-
-
-
-
-
LinuxResourceCalculatorPlugin p202
-
-
-
Read
- DistributedFileSystem.open() opens the file system connection
- DistributedFileSystem contacts the NameNode to get the list of DataNode addresses holding the file's blocks
- the data is read through the FSDataInputStream, each block from the nearest DataNode holding it
- close the file system connection
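The read path from the client API's point of view, as a minimal sketch; the file path is an example.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsRead {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);          // DistributedFileSystem for hdfs:// URIs
    FSDataInputStream in = fs.open(new Path("/user/alice/data.txt"));
    try {
      // open() already fetched the block locations from the NameNode;
      // the stream reads each block from the nearest DataNode holding it
      IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
      IOUtils.closeStream(in);
      fs.close();
    }
  }
}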
Write
- DistributedFileSystem.create() opens the file system connection and creates the file
- DistributedFileSystem asks the NameNode to allocate storage and related metadata for the data about to be written
- replica locations are typically one on a different node, one on a different rack, and one in a different DC
- the data is written through the FSDataOutputStream to the addresses the NameNode allocated, and is pipelined to all replica locations
- after all replicas acknowledge the write, the DistributedFileSystem connection is closed
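The write path from the client API's point of view, as a minimal sketch; the file path and payload are examples.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWrite {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // create() asks the NameNode to create the file and allocate blocks;
    // bytes written here are pipelined DataNode -> DataNode to every replica
    FSDataOutputStream out = fs.create(new Path("/user/alice/output.txt"));
    try {
      out.writeUTF("hello hdfs");
    } finally {
      out.close();   // completes once all replicas in the pipeline have acked
      fs.close();
    }
  }
}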
-
-
-
-
-