Saving locally
// case class Record(id: Long, name: String, age: Int, timestamp: Timestamp, date: Date) // disabled: serializing the java.sql.Date field did not work
/** One sample row: numeric id, display name, age, and a `java.sql.Timestamp`. */
case class Record2(
  id: Long,
  name: String,
  age: Int,
  timestamp: Timestamp
)
// Output (first 20 of the 100 printed records):
Record2(1,Stefan1,1,1970-01-01 01:16:40.0)
Record2(2,Stefan2,2,1970-01-01 01:33:20.0)
Record2(3,Stefan3,3,1970-01-01 01:50:00.0)
Record2(4,Stefan4,4,1970-01-01 02:06:40.0)
Record2(5,Stefan5,5,1970-01-01 02:23:20.0)
Record2(6,Stefan6,6,1970-01-01 02:40:00.0)
Record2(7,Stefan7,7,1970-01-01 02:56:40.0)
Record2(8,Stefan8,8,1970-01-01 03:13:20.0)
Record2(9,Stefan9,9,1970-01-01 03:30:00.0)
Record2(10,Stefan10,10,1970-01-01 03:46:40.0)
Record2(11,Stefan11,11,1970-01-01 04:03:20.0)
Record2(12,Stefan12,12,1970-01-01 04:20:00.0)
Record2(13,Stefan13,13,1970-01-01 04:36:40.0)
Record2(14,Stefan14,14,1970-01-01 04:53:20.0)
Record2(15,Stefan15,15,1970-01-01 05:10:00.0)
Record2(16,Stefan16,16,1970-01-01 05:26:40.0)
Record2(17,Stefan17,17,1970-01-01 05:43:20.0)
Record2(18,Stefan18,18,1970-01-01 06:00:00.0)
Record2(19,Stefan19,19,1970-01-01 06:16:40.0)
Record2(20,Stefan20,20,1970-01-01 06:33:20.0)
...
/**
 * Builds 100 sample `Record2` rows, echoes each one to stdout, and saves them
 * as a DataFrame under /tmp/frame2 using Spark's default data source.
 *
 * NOTE(review): no save mode is set, so Spark's default (ErrorIfExists) applies
 * and a second run fails if /tmp/frame2 already exists.
 */
def write: Unit = {
  // Row n gets id n, name "Stefan<n>", age n and a timestamp n * 1,000,000 ms after the epoch.
  def createRecords: IndexedSeq[Record2] = {
    val generated = for (n <- 1 to 100) yield {
      val id = n.toLong
      Record2(id, s"Stefan$n", n, new Timestamp(1000000 * id))
    }
    generated.foreach(println)
    generated
  }

  // The context is created before the records are built, matching the original ordering.
  val sc: SparkContext = createContext
  val sqlContext = new SQLContext(sc)
  val rows = createRecords
  sqlContext.createDataFrame(sc.parallelize(rows)).write.save("/tmp/frame2")
}
Result (contents of /tmp/frame2):
hduser@maszyna:/home/pawel$ ls -l /tmp/frame2/
total 12
-rw-r--r-- 1 pawel pawel 485 lip 20 20:19 _common_metadata
-rw-r--r-- 1 pawel pawel 826 lip 20 20:19 _metadata
-rw-r--r-- 1 pawel pawel 2152 lip 20 20:19 part-r-00001.gz.parquet
-rw-r--r-- 1 pawel pawel 0 lip 20 20:19 _SUCCESS
Loading from the local filesystem
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- timestamp: timestamp (nullable = true)
+--+--------+---+--------------------+
|id| name|age| timestamp|
+--+--------+---+--------------------+
| 1| Stefan1| 1|1970-01-01 01:16:...|
| 2| Stefan2| 2|1970-01-01 01:33:...|
| 3| Stefan3| 3|1970-01-01 01:50:...|
| 4| Stefan4| 4|1970-01-01 02:06:...|
| 5| Stefan5| 5|1970-01-01 02:23:...|
| 6| Stefan6| 6|1970-01-01 02:40:...|
| 7| Stefan7| 7|1970-01-01 02:56:...|
| 8| Stefan8| 8|1970-01-01 03:13:...|
| 9| Stefan9| 9|1970-01-01 03:30:...|
|10|Stefan10| 10|1970-01-01 03:46:...|
|11|Stefan11| 11|1970-01-01 04:03:...|
|12|Stefan12| 12|1970-01-01 04:20:...|
|13|Stefan13| 13|1970-01-01 04:36:...|
|14|Stefan14| 14|1970-01-01 04:53:...|
|15|Stefan15| 15|1970-01-01 05:10:...|
|16|Stefan16| 16|1970-01-01 05:26:...|
|17|Stefan17| 17|1970-01-01 05:43:...|
|18|Stefan18| 18|1970-01-01 06:00:...|
|19|Stefan19| 19|1970-01-01 06:16:...|
|20|Stefan20| 20|1970-01-01 06:33:...|
+--+--------+---+--------------------+
Code:
/**
 * Reads the DataFrame saved under /tmp/frame2 with Spark's default data source,
 * then prints its schema and up to 20 of its rows.
 */
def load: Unit = {
  val frame = new SQLContext(createContext).read.load("/tmp/frame2")
  frame.printSchema()
  frame.show(20)
}
HDFS
Config:
cat /usr/local/hadoop/etc/hadoop/core-site.xml
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
// HDFS location of the saved frame; the namenode address matches fs.default.name in core-site.xml.
private val path: String = "hdfs://localhost:9000/tmp/frame1"
hduser@maszyna:/home/pawel$ hadoop fs -ls /tmp
Found 2 items
drwxr-xr-x - pawel supergroup 0 2015-07-20 22:09 /tmp/frame1
drwx------ - hduser supergroup 0 2015-03-01 15:46 /tmp/hadoop-yarn
hduser@maszyna:/home/pawel$ hadoop fs -ls /tmp/frame1
Found 4 items
-rw-r--r-- 3 pawel supergroup 0 2015-07-20 22:09 /tmp/frame1/_SUCCESS
-rw-r--r-- 3 pawel supergroup 485 2015-07-20 22:09 /tmp/frame1/_common_metadata
-rw-r--r-- 3 pawel supergroup 826 2015-07-20 22:09 /tmp/frame1/_metadata
-rw-r--r-- 3 pawel supergroup 2152 2015-07-20 22:09 /tmp/frame1/part-r-00001.gz.parquet
Loading from HDFS
// HDFS location of the saved frame; the namenode address matches fs.default.name in core-site.xml.
private val path: String = "hdfs://localhost:9000/tmp/frame1"
/**
 * Reads the DataFrame stored at `path` (an HDFS URI) with Spark's default data
 * source, then prints its schema and up to 20 of its rows.
 */
def load: Unit = {
  val sc: SparkContext = createContext
  val sqlContext = new SQLContext(sc)
  // SQLContext.load(path) is deprecated since Spark 1.4; `read.load` is the supported
  // equivalent and is what the local-filesystem loader already uses.
  val frame = sqlContext.read.load(path)
  frame.printSchema()
  frame.show(20)
}
+--+--------+---+--------------------+
|id| name|age| timestamp|
+--+--------+---+--------------------+
| 1| Stefan1| 1|1970-01-01 01:16:...|
| 2| Stefan2| 2|1970-01-01 01:33:...|
| 3| Stefan3| 3|1970-01-01 01:50:...|
| 4| Stefan4| 4|1970-01-01 02:06:...|
| 5| Stefan5| 5|1970-01-01 02:23:...|
| 6| Stefan6| 6|1970-01-01 02:40:...|
| 7| Stefan7| 7|1970-01-01 02:56:...|
| 8| Stefan8| 8|1970-01-01 03:13:...|