Saving locally


//  case class Record(id:Long,name:String,age:Int,timestamp:Timestamp, date:Date) //Date serialization not working

/** Sample row used for the DataFrame round-trip examples below. */
case class Record2(id: Long, name: String, age: Int, timestamp: Timestamp)


//output

Record2(1,Stefan1,1,1970-01-01 01:16:40.0)
Record2(2,Stefan2,2,1970-01-01 01:33:20.0)
Record2(3,Stefan3,3,1970-01-01 01:50:00.0)
Record2(4,Stefan4,4,1970-01-01 02:06:40.0)
Record2(5,Stefan5,5,1970-01-01 02:23:20.0)
Record2(6,Stefan6,6,1970-01-01 02:40:00.0)
Record2(7,Stefan7,7,1970-01-01 02:56:40.0)
Record2(8,Stefan8,8,1970-01-01 03:13:20.0)
Record2(9,Stefan9,9,1970-01-01 03:30:00.0)
Record2(10,Stefan10,10,1970-01-01 03:46:40.0)
Record2(11,Stefan11,11,1970-01-01 04:03:20.0)
Record2(12,Stefan12,12,1970-01-01 04:20:00.0)
Record2(13,Stefan13,13,1970-01-01 04:36:40.0)
Record2(14,Stefan14,14,1970-01-01 04:53:20.0)
Record2(15,Stefan15,15,1970-01-01 05:10:00.0)
Record2(16,Stefan16,16,1970-01-01 05:26:40.0)
Record2(17,Stefan17,17,1970-01-01 05:43:20.0)
Record2(18,Stefan18,18,1970-01-01 06:00:00.0)
Record2(19,Stefan19,19,1970-01-01 06:16:40.0)
Record2(20,Stefan20,20,1970-01-01 06:33:20.0)
...
/** Builds 100 sample records, prints each one, and persists them as a
  * DataFrame (default Parquet format) on the local filesystem at /tmp/frame2.
  */
def write: Unit = {

    // 100 sample rows; timestamps advance by 1,000,000 ms per row.
    // Each record is printed as it is produced (same output as before).
    def createRecords: IndexedSeq[Record2] =
      (1 to 100).map { i =>
        val record = Record2(i.toLong, s"Stefan$i", i, new Timestamp(1000000 * i.toLong))
        println(record)
        record
      }

    val sparkContext: SparkContext = createContext
    val sqlCtx = new SQLContext(sparkContext)
    val sampleRecords = createRecords

    val recordsRdd = sparkContext.parallelize(sampleRecords)
    val df = sqlCtx.createDataFrame(recordsRdd)
    df.write.save("/tmp/frame2")
  }

Result:

hduser@maszyna:/home/pawel$ ls -l /tmp/frame2/
total 12
-rw-r--r-- 1 pawel pawel  485 lip 20 20:19 _common_metadata
-rw-r--r-- 1 pawel pawel  826 lip 20 20:19 _metadata
-rw-r--r-- 1 pawel pawel 2152 lip 20 20:19 part-r-00001.gz.parquet
-rw-r--r-- 1 pawel pawel    0 lip 20 20:19 _SUCCESS

Loading locally

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)


+--+--------+---+--------------------+
|id|    name|age|           timestamp|
+--+--------+---+--------------------+
| 1| Stefan1|  1|1970-01-01 01:16:...|
| 2| Stefan2|  2|1970-01-01 01:33:...|
| 3| Stefan3|  3|1970-01-01 01:50:...|
| 4| Stefan4|  4|1970-01-01 02:06:...|
| 5| Stefan5|  5|1970-01-01 02:23:...|
| 6| Stefan6|  6|1970-01-01 02:40:...|
| 7| Stefan7|  7|1970-01-01 02:56:...|
| 8| Stefan8|  8|1970-01-01 03:13:...|
| 9| Stefan9|  9|1970-01-01 03:30:...|
|10|Stefan10| 10|1970-01-01 03:46:...|
|11|Stefan11| 11|1970-01-01 04:03:...|
|12|Stefan12| 12|1970-01-01 04:20:...|
|13|Stefan13| 13|1970-01-01 04:36:...|
|14|Stefan14| 14|1970-01-01 04:53:...|
|15|Stefan15| 15|1970-01-01 05:10:...|
|16|Stefan16| 16|1970-01-01 05:26:...|
|17|Stefan17| 17|1970-01-01 05:43:...|
|18|Stefan18| 18|1970-01-01 06:00:...|
|19|Stefan19| 19|1970-01-01 06:16:...|
|20|Stefan20| 20|1970-01-01 06:33:...|
+--+--------+---+--------------------+

Code :

  /** Reads the Parquet data written by `write` back from /tmp/frame2,
    * then prints its schema and the first 20 rows.
    */
  def load = {
    val sparkContext: SparkContext = createContext
    val sqlCtx = new SQLContext(sparkContext)

    val loaded = sqlCtx.read.load("/tmp/frame2")
    loaded.printSchema()
    loaded.show(20)
  }

HDFS

Configuration:

cat /usr/local/hadoop/etc/hadoop/core-site.xml


<configuration>
<property>
<name>fs.defaultFS</name> <!-- fs.default.name is deprecated; fs.defaultFS is the current key -->
<value>hdfs://localhost:9000</value>
</property>
</configuration>
private val path = "hdfs://localhost:9000/tmp/frame1"

hduser@maszyna:/home/pawel$ hadoop fs -ls /tmp
Found 2 items
drwxr-xr-x   - pawel  supergroup          0 2015-07-20 22:09 /tmp/frame1
drwx------   - hduser supergroup          0 2015-03-01 15:46 /tmp/hadoop-yarn

hduser@maszyna:/home/pawel$ hadoop fs -ls /tmp/frame1
Found 4 items
-rw-r--r--   3 pawel supergroup          0 2015-07-20 22:09 /tmp/frame1/_SUCCESS
-rw-r--r--   3 pawel supergroup        485 2015-07-20 22:09 /tmp/frame1/_common_metadata
-rw-r--r--   3 pawel supergroup        826 2015-07-20 22:09 /tmp/frame1/_metadata
-rw-r--r--   3 pawel supergroup       2152 2015-07-20 22:09 /tmp/frame1/part-r-00001.gz.parquet

Loading from HDFS

private val path = "hdfs://localhost:9000/tmp/frame1"

/** Reads the Parquet data back from HDFS (see `path`) and prints its
  * schema and the first 20 rows.
  */
def load = {
    val sc: SparkContext = createContext
    val sqlContext = new SQLContext(sc)

    // SQLContext.load(path) is deprecated since Spark 1.4; use the
    // DataFrameReader API instead (consistent with the local-load example).
    val frame = sqlContext.read.load(path)
    frame.printSchema()
    frame.show(20)
  }
+--+--------+---+--------------------+
|id|    name|age|           timestamp|
+--+--------+---+--------------------+
| 1| Stefan1|  1|1970-01-01 01:16:...|
| 2| Stefan2|  2|1970-01-01 01:33:...|
| 3| Stefan3|  3|1970-01-01 01:50:...|
| 4| Stefan4|  4|1970-01-01 02:06:...|
| 5| Stefan5|  5|1970-01-01 02:23:...|
| 6| Stefan6|  6|1970-01-01 02:40:...|
| 7| Stefan7|  7|1970-01-01 02:56:...|
| 8| Stefan8|  8|1970-01-01 03:13:...|

results matching ""

    No results matching ""