SparkSQLで文字列型を日付型に変換する


SparkSQLで日付文字列を日付型に変更する方法のメモです。

Processing JSON data with Spark SQL

こことか参考になりますが、スキーマを事前に与えたく無いケースもあります。

スキーマはJSONを読み込ませた時に自動で構築させて、あとから特定のフィールド(列)だけは、日付型にしたい、そんなときはどうすれば良いか。

答えはwithColumnとcastでした。

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val data = """{"time":"2016-08-29T12:20:34.999Z","id":1}""" :: Nil
data: List[String] = List({"time":"2016-08-29T12:20:34.999Z","id":1})

scala> val events = sc.parallelize(data)
events: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:35

scala> val df = sqlContext.read.json(events)
df: org.apache.spark.sql.DataFrame = [id: bigint, time: string]

scala>

例えばこうするとDataFrameは構築できますが、timeはstringです。

ここでcastを入れると

scala> val df = sqlContext.read.json(events).withColumn("time", col("time").cast(DataTypes.TimestampType))
df: org.apache.spark.sql.DataFrame = [id: bigint, time: timestamp]

scala> df.show()
+---+--------------------+
| id|                time|
+---+--------------------+
|  1|2016-08-29 21:20:...|
+---+--------------------+

ちゃんとtimestampになりました。

TimeZoneが気になる場合もあるでしょう。その場合はto_utc_timestampを使うと良いようです。

scala> val df = sqlContext.read.json(events).withColumn("time", to_utc_timestamp(col("time"), "Asia/Tokyo").cast(DataTypes.TimestampType))
df: org.apache.spark.sql.DataFrame = [id: bigint, time: timestamp]

scala> df.show()
+---+--------------------+
| id|                time|
+---+--------------------+
|  1|2016-08-29 12:20:...|
+---+--------------------+


scala>

SparkSQL便利ですね!