Spark DataFrameのschemaにまつわる罠とschemaの補正


こんにちは、@s_tsukaです。今日はSpark DataFrameのschemaについてです。

Spark DataFrame

DataFrameは名前付きの列に整理されたDataSetで、DataSetは分散型コレクションです。データをSQLっぽく操作できるので使ってみるとかなり便利です。このあたりは以下のドキュメントが参考になるかと思います。

Spark SQL, DataFrames and Datasets Guide

Spark SQL, データフレーム および データセット ガイド(上記の日本語訳)

操作例とschemaの罠

このDataFrameを利用するときは場合によってはschemaに気をつけなくてはなりません。

一言で言うと、DataFrameReaderでjsonやparquetファイルを読み込んだとき、すべてのfieldは強制的にnullableになってしまいます。

参考:https://stackoverflow.com/questions/39697193/nullable-field-is-changed-upon-writing-a-spark-dataframe

具体的にどういうことなのか見ていきましょう。

まずは適当なjsonファイルを用意しておきます。

% cat a.json
{"a": "aaa", "b": "bbb"}

公式サイトから予めDLしておいたバイナリを使ってspark-shellを実行します。どのversionでも良いのですが今回はSpark 2.1.0 Pre-built for Apache Hadoop 2.6を使っています。

spark-shellを起動するとspark変数(SparkSession型)が使えるようになるので、これでDataFrameの読み込みなど操作を行っていきます。

まずは先程のjsonファイルをDataFrameReaderで読み込んでSchemaを出力してみます。

scala> spark.read.json("/path/to/a.json").printSchema
root
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)

nullableになりました。しかしjsonファイル内にはschemaが存在しないので、これはある意味当たり前です。

DataFrameReaderには#schemaというメソッドがあり、これを使うことで読み込み時にスキーマを適用できます。早速やってみましょう。

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val schema = StructType(Array(StructField("a", StringType, true), StructField("b", StringType, false)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(a,StringType,true), StructField(b,StringType,false))

scala> spark.read.schema(schema).json("/path/to/a.json").printSchema
root
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)

残念ですが意図通りになりませんでした。フィールドaはnullable = true, フィールドbはnullable = falseというschemaをあてたにも関わらず結果は両方nullable = trueです。

以下はどうでしょうか。bというフィールドがなく、代わりにcというフィールドがあります。

scala> val schema = StructType(Array(StructField("a", StringType, true), StructField("c", StringType, true)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(a,StringType,true), StructField(c,StringType,true))

scala> spark.read.schema(schema).json("/path/to/a.json").printSchema
root
 |-- a: string (nullable = true)
 |-- c: string (nullable = true)

元のデータにはcというフィールドはありませんでしたが、ちゃんとschemaとしてはありますね。

このようにschemaメソッドでschemaを指定でき、それに従って読み込むが、その後すべてのfieldはnullableになるという仕様を持っています。

ではどうすればよいのでしょうか。

schemaの補正

答えは簡単で、プログラム上特に問題がなければそのまま利用し、問題がある場合はschemaの補正をしてあげればOKです。

まず読み込んだデータがnullableだからといって、集計などには問題が無いはずです。どういうときに問題かというと、このデータをparquetファイルなどに保存するときです。実際にやってみましょう。

scala> spark.read.json("/path/to/a.json").write.parquet("/path/to/parquet/")
% parquet-tools schema part-00000-4048efdd-71c6-41fe-a303-c6c1904a1da4.snappy.parquet
message spark_schema {
  optional binary a (UTF8);
  optional binary b (UTF8);
}

(parquet-toolsは普通の端末には入ってないので、brewとかで入れてください)

残念ながらoptionalになってます。元のDataFrameがnullableなので当然といえば当然ですが・・・、正しいschemaがあるので、その正しいschemaで保存したいというニーズもあると思います。

これを実現するにはschemaの補正を行います。やり方は簡単です。

scala> val schema = StructType(Array(StructField("a", StringType, true), StructField("b", StringType, false)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(a,StringType,true), StructField(b,StringType,false))

scala> spark.createDataFrame(spark.read.json("/path/to/a.json").rdd, schema).printSchema
root
 |-- a: string (nullable = true)
 |-- b: string (nullable = false)

正しくbがnullable = falseになりました。

DataFrameからはrddが取り出せるので、それを取り出し、RDD+schemaでDataFrameを作り直す、というだけです。これを再度保存してみましょう。

scala> spark.createDataFrame(spark.read.json("/path/to/a.json").rdd, schema).write.parquet("/path/to/parquet2/")

% parquet-tools schema part-00000-ea95122e-e6ab-4c00-bea7-67c907ba621d.snappy.parquet
message spark_schema {
 optional binary a (UTF8);
 required binary b (UTF8);
}

正しくrequiredになってますね。

最後に一応このrequiredのparquetファイルをDataFrameReaderで読み込んでみましょう。

scala> spark.read.parquet("/path/to/parquet2/").printSchema
root
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)

残念ながらbがnullable = trueになっています。

さらなる罠、読み込み元がRDDの場合はschemaが維持される

上記の説明で、read.jsonやread.parquetでファイルを読み込んだ場合は強制的にnullable = trueになると言いました。ですが、read.jsonでファイルではなく、RDDを読み込んだ場合はまた挙動が違います。早速実験します。

scala> spark.read.schema(schema).json(sc.parallelize(Seq("""{"a": "aaa", "b": "bbb"}"""))).printSchema
root
 |-- a: string (nullable = true)
 |-- b: string (nullable = false)

schema(schema)でスキーマを与えており、このnullableが正しく維持されています。

nullable = true が強制されるのはファイルを読み込んだとき、です。

まとめ

  • Spark DataFrameのDataFrameReaderでファイルを読み込む場合、強制的にnullableになります
  • DataFrameReader#schemaを使ってもnullableの指定は変更されません
  • createDataFrameを使ってschemaを補正しましょう
  • あくまでファイルを読み込んだ場合の話で、元データがRDDの場合は挙動が異なるので注意しましょう

みなさんもお気をつけください。