Spark DataFrameのschemaにまつわる罠とschemaの補正
こんにちは、@s_tsukaです。今日はSpark DataFrameのschemaについてです。
Spark DataFrame
DataFrameは名前付きの列に整理されたDataSetで、DataSetは分散型コレクションです。データをSQLっぽく操作できるので使ってみるとかなり便利です。このあたりは以下のドキュメントが参考になるかと思います。
Spark SQL, DataFrames and Datasets Guide
Spark SQL, データフレーム および データセット ガイド(上記の日本語訳)
操作例とschemaの罠
このDataFrameを利用するときは場合によってはschemaに気をつけなくてはなりません。
一言で言うと、DataFrameReaderでjsonやparquetファイルを読み込んだとき、すべてのfieldは強制的にnullableになってしまいます。
具体的にどういうことなのか見ていきましょう。
まずは適当なjsonファイルを用意しておきます。
% cat a.json
{"a": "aaa", "b": "bbb"}
公式サイトから予めDLしておいたバイナリを使ってspark-shellを実行します。どのversionでも良いのですが今回はSpark 2.1.0 Pre-built for Apache Hadoop 2.6を使っています。
spark-shellを起動するとspark変数(SparkSession型)が使えるようになるので、これでDataFrameの読み込みなど操作を行っていきます。
まずは先程のjsonファイルをDataFrameReaderで読み込んでSchemaを出力してみます。
scala> spark.read.json("/path/to/a.json").printSchema
root
|-- a: string (nullable = true)
|-- b: string (nullable = true)
nullableになりました。しかしjsonファイル内にはschemaが存在しないので、これはある意味当たり前です。
DataFrameReaderには#schemaというメソッドがあり、これを使うことで読み込み時にスキーマを適用できます。早速やってみましょう。
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> val schema = StructType(Array(StructField("a", StringType, true), StructField("b", StringType, false)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(a,StringType,true), StructField(b,StringType,false))
scala> spark.read.schema(schema).json("/path/to/a.json").printSchema
root
|-- a: string (nullable = true)
|-- b: string (nullable = true)
残念ですが意図通りになりませんでした。フィールドaはnullable = true, フィールドbはnullable = falseというschemaをあてたにも関わらず結果は両方nullable = trueです。
以下はどうでしょうか。bというフィールドがなく、代わりにcというフィールドがあります。
scala> val schema = StructType(Array(StructField("a", StringType, true), StructField("c", StringType, true)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(a,StringType,true), StructField(c,StringType,true))
scala> spark.read.schema(schema).json("/path/to/a.json").printSchema
root
|-- a: string (nullable = true)
|-- c: string (nullable = true)
元のデータにはcというフィールドはありませんでしたが、ちゃんとschemaとしてはありますね。
このようにschemaメソッドでschemaを指定でき、それに従って読み込むが、その後すべてのfieldはnullableになるという仕様を持っています。
ではどうすればよいのでしょうか。
schemaの補正
答えは簡単で、プログラム上特に問題がなければそのまま利用し、問題がある場合はschemaの補正をしてあげればOKです。
まず読み込んだデータがnullableだからといって、集計などには問題が無いはずです。どういうときに問題かというと、このデータをparquetファイルなどに保存するときです。実際にやってみましょう。
scala> spark.read.json("/path/to/a.json").write.parquet("/path/to/parquet/")
% parquet-tools schema part-00000-4048efdd-71c6-41fe-a303-c6c1904a1da4.snappy.parquet
message spark_schema {
optional binary a (UTF8);
optional binary b (UTF8);
}
(parquet-toolsは普通の端末には入ってないので、brewとかで入れてください)
残念ながらoptionalになってます。元のDataFrameがnullableなので当然といえば当然ですが・・・、正しいschemaがあるので、その正しいschemaで保存したいというニーズもあると思います。
これを実現するにはschemaの補正を行います。やり方は簡単です。
scala> val schema = StructType(Array(StructField("a", StringType, true), StructField("b", StringType, false)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(a,StringType,true), StructField(b,StringType,false))
scala> spark.createDataFrame(spark.read.json("/path/to/a.json").rdd, schema).printSchema
root
|-- a: string (nullable = true)
|-- b: string (nullable = false)
正しくbがnullable = falseになりました。
DataFrameからはrddが取り出せるので、それを取り出し、RDD+schemaでDataFrameを作り直す、というだけです。これを再度保存してみましょう。
scala> spark.createDataFrame(spark.read.json("/path/to/a.json").rdd, schema).write.parquet("/path/to/parquet2/")
% parquet-tools schema part-00000-ea95122e-e6ab-4c00-bea7-67c907ba621d.snappy.parquet
message spark_schema {
optional binary a (UTF8);
required binary b (UTF8);
}
正しくrequiredになってますね。
最後に一応このrequiredのparquetファイルをDataFrameReaderで読み込んでみましょう。
scala> spark.read.parquet("/path/to/parquet2/").printSchema
root
|-- a: string (nullable = true)
|-- b: string (nullable = true)
残念ながらbがnullable = trueになっています。
さらなる罠、読み込み元がRDDの場合はschemaが維持される
上記の説明で、read.jsonやread.parquetでファイルを読み込んだ場合は強制的にnullable = trueになると言いました。ですが、read.jsonでファイルではなく、RDDを読み込んだ場合はまた挙動が違います。早速実験します。
scala> spark.read.schema(schema).json(sc.parallelize(Seq("""{"a": "aaa", "b": "bbb"}"""))).printSchema
root
|-- a: string (nullable = true)
|-- b: string (nullable = false)
schema(schema)でスキーマを与えており、このnullableが正しく維持されています。
nullable = true が強制されるのはファイルを読み込んだとき、です。
まとめ
- Spark DataFrameのDataFrameReaderでファイルを読み込む場合、強制的にnullableになります
- DataFrameReader#schemaを使ってもnullableの指定は変更されません
- createDataFrameを使ってschemaを補正しましょう
- あくまでファイルを読み込んだ場合の話で、元データがRDDの場合は挙動が異なるので注意しましょう
みなさんもお気をつけください。