博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spark集成hbase与hive数据转换与代码练习
阅读量:6227 次
发布时间:2019-06-21

本文共 3984 字,大约阅读时间需要 13 分钟。

  帮一个朋友写个样例,顺便练手啦~一直在做平台的各种事,但是代码后续还要精进啊。。。

import java.util.Date

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Sample job: reads rows from the HBase table EVENT_LOG_LBS, registers the
 * non-null rows as Hive temp table "pkq", then writes them back to HBase.
 *
 * Created by ysy on 2/10/17.
 */
object test {

  /** Row schema for the "pkq" temp table (one HBase row per instance). */
  case class ysyTest(LS_certifier_no: String, loc: String, LS_phone_no: String)

  def main(args: Array[String]) {
    val sparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("ysy")
      .set("spark.executor.memory", "1g")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new HiveContext(sc)
    sqlContext.sql("drop table pkq")

    val columns = "LS_certifier_no,LS_location,LS_phone_no"
    val hbaseRDD = dataInit(sc, "EVENT_LOG_LBS", columns).map { data =>
      // getValue returns null for an absent cell and Bytes.toString(null) is
      // null, hence the null filter below before building the DataFrame.
      val id    = Bytes.toString(data._2.getValue("f1".getBytes, "LS_certifier_no".getBytes))
      val loc   = Bytes.toString(data._2.getValue("f1".getBytes, "LS_location".getBytes))
      val phone = Bytes.toString(data._2.getValue("f1".getBytes, "LS_phone_no".getBytes))
      (id, loc, phone)
    }
    hbaseRDD.foreach(println) // debug dump of every fetched row (triggers a full scan)

    // Keep only rows where all three columns were present.
    val datas = hbaseRDD.filter(d => d._1 != null && d._2 != null && d._3 != null)
    val hiveDF = initHiveTableFromHbase(sc, sqlContext, datas)
    writeHiveTableToHbase(sc, hiveDF)
  }

  /**
   * Converts the tuple RDD to a DataFrame with the case-class column names,
   * registers it as temp table "pkq", prints a 10-row sample and returns it.
   */
  def initHiveTableFromHbase(sc: SparkContext,
                             sqlContext: HiveContext,
                             hiveRDD: RDD[(String, String, String)]): DataFrame = {
    // FIX: the original mapped to ysyTest but then built the DataFrame from
    // the raw tuple RDD, leaving hRDD unused — so the temp table columns came
    // out as _1/_2/_3 instead of the intended case-class field names.
    val hRDD = hiveRDD.map(p => ysyTest(p._1, p._2, p._3))
    val hiveRDDSchema = sqlContext.createDataFrame(hRDD)
    hiveRDDSchema.registerTempTable("pkq")
    hiveRDDSchema.show(10)
    hiveRDDSchema
  }

  /**
   * Builds an RDD of (rowkey, Result) over the given HBase table.
   *
   * @param tableName HBase table to scan
   * @param columns   comma-separated qualifier names under family "f1"
   */
  def dataInit(sc: SparkContext,
               tableName: String,
               columns: String): RDD[(ImmutableBytesWritable, Result)] = {
    val configuration = HBaseConfiguration.create()
    configuration.addResource("hbase-site.xml")
    configuration.set(TableInputFormat.INPUT_TABLE, tableName)

    // NOTE(review): this Scan is built but never attached to the job config
    // (TableInputFormat.SCAN is never set), so the column restriction has no
    // effect and the scan fetches every column. Serializing the Scan via
    // TableMapReduceUtil and setting TableInputFormat.SCAN would fix it —
    // confirm the right helper for the HBase version in use.
    val scan = new Scan
    for (columnName <- columns.split(",")) {
      scan.addColumn("f1".getBytes(), columnName.getBytes())
    }

    val hbaseRDD = sc.newAPIHadoopRDD(
      configuration,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])
    System.out.println(hbaseRDD.count()) // debug: row count (forces a scan)
    hbaseRDD
  }

  /**
   * Writes the DataFrame's three columns back to HBase table EVENT_LOG_LBS,
   * family "f1", using the old-API TableOutputFormat.
   */
  def writeHiveTableToHbase(sc: SparkContext, hiveDF: DataFrame) = {
    val configuration = HBaseConfiguration.create()
    // FIX: original passed "hbase-site.xml " with a trailing space, so the
    // resource name would not match and the site config was never loaded.
    configuration.addResource("hbase-site.xml")
    configuration.set(TableOutputFormat.OUTPUT_TABLE, "EVENT_LOG_LBS")
    val jobConf = new JobConf(configuration)
    jobConf.setOutputFormat(classOf[TableOutputFormat])

    val putData = hiveDF.map { data =>
      (data(0), data(1), data(2)) // (LS_certifier_no, LS_location, LS_phone_no)
    }

    val rdd = putData.map { datas =>
      // NOTE(review): a random double as rowkey is non-deterministic and can
      // collide, silently overwriting rows. A key derived from the record
      // (e.g. certifier_no + phone_no) would be safer — kept as-is here
      // because changing the key layout changes the stored data.
      val put = new Put(Bytes.toBytes(Math.random()))
      put.addColumn("f1".getBytes(), "LS_certifier_no".getBytes(), Bytes.toBytes(datas._1.toString))
      put.addColumn("f1".getBytes(), "LS_location".getBytes(), Bytes.toBytes(datas._2.toString))
      put.addColumn("f1".getBytes(), "LS_phone_no".getBytes(), Bytes.toBytes(datas._3.toString))
      (new ImmutableBytesWritable, put)
    }
    rdd.foreach(println) // debug dump of the puts before writing
    rdd.saveAsHadoopDataset(jobConf)
  }

}

转载地址:http://ahnna.baihongyu.com/

你可能感兴趣的文章
实施微服务架构的关键技术
查看>>
“流”的思维—Workflowy
查看>>
Day19 网络编程
查看>>
.NET平台MongoDB下使用JobStore存储Quartz.Net的Job,Trigger数据
查看>>
Java多线程编程—锁优化
查看>>
python文本 字符与字符值转换
查看>>
Linux虚拟化技术KVM、QEMU与libvirt的关系(转)
查看>>
Ceph分布式存储-原理介绍及简单部署
查看>>
MYSQL数据库设计规范与原则
查看>>
UWP: 实现 UWP 应用自启动
查看>>
Windows内核之进程的终止和子进程
查看>>
Vivado+FPGA:如何使用Debug Cores(ILA)在线调试(烧录到flash里可以直接启动)
查看>>
[Preference] How to avoid Forced Synchronous Layout or FSL to improve site preference
查看>>
【laravel5.4】php artisan migrate报错:Specified key was too long; max key length is 767 bytes
查看>>
[转]外贸出口流程图
查看>>
微信小程序onLaunch修改globalData的值
查看>>
php实现简单算法3
查看>>
打陀螺
查看>>
phpStudy中升级MySQL版本到5.7.17的方法步骤
查看>>
SQLServer BI 学习笔记
查看>>