Kategorier: Alle - 文件 - 任务

av Haifan Zhu 12 år siden

530

Bulkload

Bulkload

Bulkload

ImportTSV

createSubmittableJob()
HFileOutputFormat.configureIncrementalLoad(job, table)

根据region数量setNumReduceTasks

setReducerClass 根据配置选择KeyValueSortReducer或PutSortReducer

setOutputFormatClass(HFileOutputFormat.class)

wl.writer.append(kv)

如果一行写完且HFile达到阈值rollWriters()

返回RecordWriter

setOutputValueClass(KeyValue.class)

setOutputKeyClass(ImmutableBytesWritable.class)

setPartitionerClass(getTotalOrderPartitionerClass())

设置Put为Value setMapOutputValueClass(Put.class)
设置字节数组为Key setMapOutputKeyClass(ImmutableBytesWritable.class)
对Put结果排序输出 setReducerClass(PutSortReducer.class)
建立HTable连接
setMapperClass
setInputFormatClass(TextInputFormat.class)
创建Job
读取配置并检查

LoadIncrementalHFiles doBulkLoad()

关闭线程池
bulkLoadPhase()
收集结果,把重试列表加到队列中
load过程交给线程池处理
tryAtomicRegionLoad()

返回重试列表

把没有成功的文件移回原位

bulkLoadHFiles()

region.bulkLoadHFiles()

关闭锁closeBulkRegionOperation()

store.bulkLoadHFile(finalPath)

notifyChangedReadersObservers()

sortAndClone()

将新StoreFile加入列表中

建立新的StoreFile

验证HFile的正确性assertBulkLoadHFileOk()

getStore(familyName)

获得锁startBulkRegionOperation()

getRegion(regionName)

groupOrSplitPhase()
收集返回切分结果
切分工作交给线程池处理
将Hfile分配到所在的region里,若Hfile的范围超过了region,进行切分 groupOrSplit()
获取每个region的Key范围getStartEndKeys()
遍历HFile文件discoverLoadQueue()
初始化线程池