ablog

不器用で落着きのない技術者のメモ

Spark の RDD、DataFrame、DAG と Glue の DynamicFrame などについて

Sparkとは

P.100

Apache Spark」も、MapReduce より効率の良いデータ処理を実現するプロジェクトとして開発が進められています。Hadoop の延長線上にある Tez とは異なり、Spark は Hadoop とは別の独立したプロジェクトです。Spark の特徴は大量のメモリを活用して高速化を実現することです。(中略)コンピュータが異常停止すると途中まで処理した中間データは消えてしまいますが、そのときには処理をやり直して、失われた中間データをま作れば良いというのが Spark の考え方です(図 3.8)。

(中略)

Spark は Hadoop を置き換えるものではなく、MapReduce を置き換える存在です。例えば、分散ファイルシステムである HDFS や、リソースマネージャである YARN などは、Spark からでもそのまま利用できます。Hadoop を利用しない構成も可能であり、分散ストレージとして Amazon S3 を利用したり、あるいは分散データベースである Cassandra からデータを読み込んだりするようなことも可能です。

Spark の歴史

Apache Spark入門 動かして学ぶ最新並列分散処理フレームワーク (NEXT ONE)

Apache Spark入門 動かして学ぶ最新並列分散処理フレームワーク (NEXT ONE)

P.5

Spark プロジェクトはもともとカリフォルニア大学バークレー校 AMPLab の研究プロジェクトとして2009年にスタートしました。BDAS(the Berkeley Data Analytics Stack)と呼ばれるビッグデータ分析のためのソフトウェアスタックがあり、Spark はそのコンポーネントのひとつに位置付けられています。Spark プロジェクトは2010年初頭にオープンソース化され、2013年6月に Apache Incubator Project に採択されて「Apache Spark」となりました。この頃から本格的な開発体制が整い始め、2013年10月には AMPLab から Spark 開発者がスピンアウトして、米 Databrics が設立されました。現在も Spark 開発者の多くは Databrics に所属しています。

AMPLAB is a University of California, Berkeley lab focused on Big data analytics. The name stands for the Algorithms, Machines and People Lab.[1][2] It has been publishing papers since 2008[3] and was officially launched in 2011.[4]
While AMPLab has worked on a wide variety of big data projects, many know it as the lab that invented Apache Spark.[5]

AMPLab - Wikipedia

About | AMPLab – UC Berkeley

DAG(Directed Acyclic Graph)とは

P.202-204

MapReduce に変わる新しいフレームワーク DAGによる内部表現
新しいフレームワークに共通するのがDAG(directed acyclic graph)と呼ばれるデータ構造です(図 5.12)。日本語では「有向非循環グラフ」と呼ばれます。DAG そのものは何かの技術ではなく、数学やコンピュータアルゴリズムで用いられるデータモデルの一つです。DAG は、次のような性質を持ちます。

  • ノードとノードが矢印で結ばれる(有向)
  • 矢印をいくら辿っても同じノードに戻らない(非循環)

データフローでは、実行すべき一連のタスクをDAGによるデータ構造として表現します。図中の矢印はタスクの実行順序を示しており、その依存関係を保ちながらうまく実行順序を決めることで、すべてのタスクを漏れなく完了させることができます。後は、これをどれだけ効率よく実行できるかという問題です。
従来の MapReduce も「Map」と「Reduce」の2種類のノードから成るシンプルなDAGであると考えることができます。ただし、一つのノードで処理が終わらなければ次の処理に進めないという非効率なものでした。
一方、データフローではDAGを構成する各ノードがすべて同時並行で実行されます。処理の終わったデータは、ネットワーク経由で次々と受け渡され、MapReduce にあった待ち時間をなくしています。


SparkにおけるDAG
DAGはシステムの内部的な表現であり、利用者がその存在を意識することはほとんどありません。データフローに限らず、Hive on Tez や Presto のようなクエリエンジンでもDAGは採用されており、SQLからDAGのデータ構造が内部で自動生成されています。一方、Spark のようなデータフローのフレームワークでは、プログラミング言語を用いてより直接的にDAGのデータ構造を組み立てます。
(中略)
DAGによるプログラミングの特徴が遅延評価(lazy evaluation)です。プログラムの各行は、実際にはDAGのデータ構造を組み立てているだけであり、その場では何の処理も行いません。まずはDAGを構築し、その後で明示的に、あるいは暗黙的に実行結果を要求することによって、ようやくデータ処理が開始されます。
MapReduceのようにMapやReduceを一つずつ実行するのではなく、最初にデータパイプライン全体をDAGとして組み立ててから実行に移すことで、内部のスケジューラが分散システムにとって効率の良い実行計画を建ててくれるのがデータフローの優れたところです。

RDDとは

Apache Spark入門 動かして学ぶ最新並列分散処理フレームワーク (NEXT ONE)

Apache Spark入門 動かして学ぶ最新並列分散処理フレームワーク (NEXT ONE)

P.14

Apache Spark のデータ処理には「RDD(Resilient Distributed Dataset)」と呼ばれるデータ構造を利用します。Spark のプログラミングモデルは「RDDを加工して新たなRDDを生成し、これを繰り返すことで目的の結果を得る」というものになっています。
(中略)
RDD は大量のデータを要素として保持する分散コレクションです。巨大な配列やリストのようなデータ構造を想像すると分かりやすいでしょう。RDD は複数のマシンから構成されるクラスタ上での分散処理を前提として設計されており、内部的にはパーティションというかたまりに分割されています。Spark では、このパーティションが分散処理の単位となります。RDDパーティションごとに複数のマシンで処理することによって、単一のマシンでは処理しきれない大量のデータを扱うことができるのです。
ユーザーはたとえばHDFSなどの分散ファイルシステム上のファイルの内容を RDD にロードし、RDD を加工することで大量のデータの分散処理を実現できます。Spark ではこの加工に相当する処理を「変換」と呼びます。そして、RDD の内容を元に「アクション」と呼ばれる処理を適用して目的の結果を得るのです(図 2.2)。
このほかに、RDD はイミュータブル(内部の要素の値を変更できない)ということと、生成や変換が遅延評価されるという性質があります。

DataFrame*1 とは

Apache Spark入門 動かして学ぶ最新並列分散処理フレームワーク (NEXT ONE)

Apache Spark入門 動かして学ぶ最新並列分散処理フレームワーク (NEXT ONE)

P.110

Spark SQL ではドライバプログラムからさまざまな形式のデータセットを統一的に扱うために、DataFrame と呼ばれる抽象的なデータ構造を用います。DataFrame とは RDBMS のテーブルのように行と名前とデータ型が付与された列の概念を持つデータ構造です。Spark SQL ではさまざまnあデータ型をサポートしており、Dataframe の列のデータ型に指定することができます。

DynamicFrame とは

  • AWS Glue でデータの抽出・変換をする際に使う Spark の DataFrame の Wrapper。
  • DynamicFrame は PythonScala 両方の API がある。
  • DataFrame は最初にスキーマ定義(列の型など)が必要で、同一列に型が異なる値があると String としてしか扱えないが、DynamicFrame だとスキーマ定義で読んだ上で型を揃えるなどの前処理ができる?
  • なので、DynamicFrame で前処理、DataFrame でSparkSQLで高度なAPIを使う、DynamicFrame に変換して書出しといった使い方になる?

参考

  • Python の DynamicFrame クラスの説明

DynamicFrame クラス
Apache Spark の主要な抽象化の 1 つは SparkSQL DataFrame で、これは R と Pandas にある DataFrame 構造に似ています。DataFrame はテーブルと似ており、機能スタイル (マップ/リデュース/フィルタ/その他) 操作と SQL 操作 (選択、プロジェクト、集計) をサポートしています。

DataFrames は、強力で広く使用されていますが、抽出、変換、およびロード (ETL) 操作に関しては制限があります。最も重要なのは、データをロードする前にスキーマを指定する必要があることです。SparkSQL は、データに対してパスを 2 つ作ることでこれを解決します。1 つ目はスキーマを推測し、2 つ目はデータをロードします。ただし、この推測は限定されており、実際の煩雑なデータには対応しません。たとえば、同じフィールドが異なるレコードの異なるタイプである可能性があります。Apache Spark は、多くの場合、作業を中断して、元のフィールドテキストを使用してタイプを string として報告します。これは正しくない可能性があり、スキーマの不一致を解決する方法を細かく制御する必要があります。また、大規模なデータセットの場合、ソースデータに対する追加パスが非常に高価になる可能性があります。

これらの制限に対応するために、AWS Glue により DynamicFrame が導入されました。DynamicFrame は、DataFrame と似ていますが、各レコードが自己記述できるため、最初はスキーマは必要ありません。代わりに、AWS Glue は必要に応じてオンザフライでスキーマを計算し、選択 (または共用) タイプを使用してスキーマの不一致を明示的にエンコードします。これらの不整合を解決して、固定スキーマを必要とするデータストアとデータセットを互換性のあるものにできます。

同様に、DynamicRecord は DynamicFrame 内の論理レコードを表します。これは、Spark DataFrame の行と似ていますが、自己記述型であり、固定スキーマに適合しないデータに使用できます。

スキーマの不一致を解決したら、DynamicFrames を DataFrames との間で変換することができます。

DynamicFrame クラス - AWS Glue
  • Scala の DynamicFrame の説明

DynamicFrame は、自己記述型の DynamicRecord オブジェクトの分散コレクションです。

DynamicFrame は、ETL (抽出、変換、ロード) オペレーションの柔軟なデータモデルを提供するように設計されています。これらのオブジェクトを作成するのにスキーマは必要なく、乱雑または不整合な値や型を持つデータの読み取りと変換に使用できます。スキーマは、スキーマを必要とするオペレーションでオンデマンドで計算できます。

DynamicFrame は、データクリーニングと ETL 用の広範な変換を提供します。また、既存のコードと統合するための SparkSQL DataFrames との相互変換や、DataFrames が提供する多くの分析オペレーションをサポートしています。

AWS Glue Scala DynamicFrame クラス - AWS Glue

GlueContext
The file context.py contains the GlueContext class. GlueContext extends PySpark's SQLContext class to provide Glue-specific operations. Most Glue programs will start by instantiating a GlueContext and using it to construct a DynamicFrame.

DynamicFrame
The DynamicFrame, defined in dynamicframe.py, is the core data structure used in Glue scripts. DynamicFrames are similar to Spark SQL's DataFrames in that they represent distributed collections of data records, but DynamicFrames provide more flexible handling of data sets with inconsistent schemas. By representing records in a self-describing way, they can be used without specifying a schema up front or requiring a costly schema inference step.

DynamicFrames support many operations, but it is also possible to convert them to DataFrames using the toDF method to make use of existing Spark SQL operations.

https://github.com/awslabs/aws-glue-libs/tree/master/awsglue
  • DynamicFrame で使えるメソッド

— Construction —

  • __init__
  • fromDF
  • toDF

(中略)

— Transforms —

  • apply_mapping
  • drop_fields
  • filter
  • join
  • map
  • relationalize
  • rename_field
  • resolveChoice
  • select_fields
  • spigot
  • split_fields
  • split_rows
  • unbox
  • unnest
  • write

(中略)

— Errors —

  • assertErrorThreshold
  • errorsAsDynamicFrame
  • errorsCount
  • stageErrorsCount
DynamicFrame Class - AWS Glue
# Copyright 2016-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# Licensed under the Amazon Software License (the "License"). You may not use
# this file except in compliance with the License. A copy of the License is
# located at
#
#  http://aws.amazon.com/asl/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express
# or implied. See the License for the specific language governing
# permissions and limitations under the License.

import json
from awsglue.utils import makeOptions, callsite
from itertools import imap, ifilter
from awsglue.gluetypes import _deserialize_json_string, _create_dynamic_record, _revert_to_dict, _serialize_schema
from awsglue.utils import _call_site, _as_java_list, _as_scala_option, _as_resolve_choiceOption
from pyspark.rdd import RDD, PipelinedRDD
from pyspark.sql.dataframe import DataFrame
from pyspark.serializers import PickleSerializer, BatchedSerializer


class ResolveOption(object):
    """
    ResolveOption is used for resolve ChoiceType while converting DynamicRecord to DataFrame
    option.action includes "Project", "KeepAsStruct" and "Cast".
    """
    def __init__(self, path, action, target=None):
        """
        :param path: string, path name to ChoiceType
        :param action: string,
        :param target: spark sql Datatype
        """
        self.path = path
        self.action = action
        self.target = target

*1:SparkSQLの