- カウントする
df.count()
print df.printSchema()
- DynamicFrame <-> Dataframe 変換
from awsglue.dynamicframe import DynamicFrame # DynamicFrame -> Spark DataFrame df = DynamicFrame.toDF(<DynamicFrame>) # Spark DataFrame -> DynamicFrame result = DynamicFrame.fromDF(<DataFrame>, glueContext, 'result')
- 空の Dataframe を作成する
from pyspark.sql.types import StructType from pyspark.sql.types import StructField from pyspark.sql.types import StringType sc = SparkContext(conf=SparkConf()) spark = SparkSession(sc) schema = StructType([ StructField("column1",StringType(),True), StructField("column2",StringType(),True) ]) df = spark.createDataFrame(sc.emptyRDD(), schema)
Dataframe に カラムを追加
- コード
- col1、col2、col3 を "_" 区切りで連結して、カラム名を col_name としている。
from pyspark.sql.functions import concat, col, lit df = df.select(concat(col("col1"), lit('_'),col("col2"), lit('_'),col("col3") ).alias("col_name"))
- 参考:
- Python: PySpark で DataFrame にカラムを追加する - CUBE SUGAR CONTAINER
- pyspark.sql module — PySpark master documentation
- pyspark.sql module — PySpark master documentation
df.select(concat(col("fname"),lit(','), col("mname"),lit(','),col("lname")).as("FullName")) .show(false)Spark - How to Concatenate DataFrame columns — SparkByExamples
DataFrame の show() メソッドでデータを省略せずに表示する
- コード
df.show(truncate=False)
- 参考
show(n=20, truncate=True, vertical=False)
Prints the first n rows to the console.
- Parameters
- n – Number of rows to show.
- truncate – If set to True, truncate strings longer than 20 chars by default. If set to a number greater than one, truncates long strings to length truncate and align cells right.
- vertical – If set to True, print output rows vertically (one line per column value).
>>> df DataFrame[age: int, name: string] >>> df.show() +---+-----+ |age| name| +---+-----+ | 2|Alice| | 5| Bob| +---+-----+ >>> df.show(truncate=3) +---+----+ |age|name| +---+----+ | 2| Ali| | 5| Bob| +---+----+ >>> df.show(vertical=True) -RECORD 0----- age | 2 name | Alice -RECORD 1----- age | 5 name | Bob New in version 1.3.pyspark.sql module — PySpark master documentation
group byする
- コード
df = df.groupby("col1", "col2").select("col")
- 参考
groupByでまとめてからaggで集計する
# 書き方1 df.groupBy(col('col_name')).agg({'col_name1': 'expr', 'col_name2': 'expr'}) #書き方2 .aliasがかけやすい df.groupBy(col('col_name')).agg(sum(col('col_name')))利用可能な集計方法
Pyspark dataframe操作 - Qiita
項目 expr 件数 count 平均 mean 平均 avg 総和 sum 最大値 max 最小値 min
- とある処理
def get_e_id(df, minimal_diff=100000): window_spec = Window.partitionBy(["C_ID", "T_ID"]).orderBy([ "TS"]) _df_time_diff = ( df .withColumn('time_diff', col("TS") - lag(col("TS"), 1).over(window_spec)) .withColumn('start_flg', when((col('time_diff')!=minimal_diff)|(col('time_diff').isNull()), 1).otherwise(0)) .withColumn('event_id', F.sum('start_flg').over(window_spec)) ) return _df_time_diff
DataFrame で射影する
from pyspark.sql.functions import col df = df.select(col("col1"),col('col2'))
Glue
ファイルをまとめて書く
- コード1(DataFrameで書く)
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3") dropnullfields3 = dropnullfields3.toDF().repartition(4) dropnullfields3.write.mode('overwrite').parquet('s3://test-glue00/se2/out_flightdata1/')
- コード2(DynamicFrameで書く)
from awsglue.dynamicframe import DynamicFrame dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3") df_dropnullfields3 = dropnullfields3.toDF().repartition(4) ds_writer = DynamicFrame.fromDF(df_dropnullfields3, glueContext, "ds_writer") datasink4 = glueContext.write_dynamic_frame.from_options(frame = ds_writer, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out_flightdata1/"}, format = "parquet", transformation_ctx = "datasink4")