SparkSQLで文字列型を日付型に変換する
SparkSQLで日付文字列を日付型に変更する方法のメモです。
Processing JSON data with Spark SQL
こことか参考になりますが、スキーマを事前に与えたく無いケースもあります。
スキーマはJSONを読み込ませた時に自動で構築させて、あとから特定のフィールド(列)だけは、日付型にしたい、そんなときはどうすれば良いか。
答えはwithColumnとcastでした。
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> val data = """{"time":"2016-08-29T12:20:34.999Z","id":1}""" :: Nil
data: List[String] = List({"time":"2016-08-29T12:20:34.999Z","id":1})
scala> val events = sc.parallelize(data)
events: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:35
scala> val df = sqlContext.read.json(events)
df: org.apache.spark.sql.DataFrame = [id: bigint, time: string]
scala>
例えばこうするとDataFrameは構築できますが、timeはstringです。
ここでcastを入れると
scala> val df = sqlContext.read.json(events).withColumn("time", col("time").cast(DataTypes.TimestampType))
df: org.apache.spark.sql.DataFrame = [id: bigint, time: timestamp]
scala> df.show()
+---+--------------------+
| id| time|
+---+--------------------+
| 1|2016-08-29 21:20:...|
+---+--------------------+
ちゃんとtimestampになりました。
TimeZoneが気になる場合もあるでしょう。その場合はto_utc_timestampを使うと良いようです。
scala> val df = sqlContext.read.json(events).withColumn("time", to_utc_timestamp(col("time"), "Asia/Tokyo").cast(DataTypes.TimestampType))
df: org.apache.spark.sql.DataFrame = [id: bigint, time: timestamp]
scala> df.show()
+---+--------------------+
| id| time|
+---+--------------------+
| 1|2016-08-29 12:20:...|
+---+--------------------+
scala>
SparkSQL便利ですね!