ablog

不器用で落着きのない技術者のメモ

PySparkメモ

  • カウントする
df.count()
print df.printSchema()
  • DynamicFrame <-> Dataframe 変換
from awsglue.dynamicframe import DynamicFrame

# DynamicFrame -> Spark DataFrame
df = DynamicFrame.toDF(<DynamicFrame>)

# Spark DataFrame -> DynamicFrame
result = DynamicFrame.fromDF(<DataFrame>, glueContext, 'result')
from pyspark.sql.types import StructType 
from pyspark.sql.types import StructField 
from pyspark.sql.types import StringType 

sc = SparkContext(conf=SparkConf()) 
spark = SparkSession(sc) 

schema = StructType([ 
    StructField("column1",StringType(),True), 
    StructField("column2",StringType(),True) 
]) 
df = spark.createDataFrame(sc.emptyRDD(), schema) 
Dataframe に カラムを追加
  • コード
    • col1、col2、col3 を "_" 区切りで連結して、カラム名を col_name としている。
from pyspark.sql.functions import concat, col, lit
df = df.select(concat(col("col1"), lit('_'),col("col2"), lit('_'),col("col3") ).alias("col_name"))
  df.select(concat(col("fname"),lit(','),
    col("mname"),lit(','),col("lname")).as("FullName"))
      .show(false)
Spark - How to Concatenate DataFrame columns — SparkByExamples
DataFrame の show() メソッドでデータを省略せずに表示する
  • コード
df.show(truncate=False)
  • 参考

show(n=20, truncate=True, vertical=False)
Prints the first n rows to the console.

  • Parameters
    • n – Number of rows to show.
    • truncate – If set to True, truncate strings longer than 20 chars by default. If set to a number greater than one, truncates long strings to length truncate and align cells right.
    • vertical – If set to True, print output rows vertically (one line per column value).
>>> df
DataFrame[age: int, name: string]
>>> df.show()
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  5|  Bob|
+---+-----+
>>> df.show(truncate=3)
+---+----+
|age|name|
+---+----+
|  2| Ali|
|  5| Bob|
+---+----+
>>> df.show(vertical=True)
-RECORD 0-----
 age  | 2
 name | Alice
-RECORD 1-----
 age  | 5
 name | Bob
New in version 1.3.
pyspark.sql module — PySpark master documentation
group byする
  • コード
df = df.groupby("col1", "col2").select("col")
  • 参考

groupByでまとめてからaggで集計する

# 書き方1
df.groupBy(col('col_name')).agg({'col_name1': 'expr', 'col_name2': 'expr'})

#書き方2 .aliasがかけやすい
df.groupBy(col('col_name')).agg(sum(col('col_name')))

利用可能な集計方法

項目 expr
件数 count
平均 mean
平均 avg
総和 sum
最大値 max
最小値 min
Pyspark dataframe操作 - Qiita
  • とある処理
def get_e_id(df, minimal_diff=100000):
    window_spec = Window.partitionBy(["C_ID", "T_ID"]).orderBy([ "TS"])
    _df_time_diff = (
        df
        .withColumn('time_diff', col("TS") - lag(col("TS"), 1).over(window_spec))
        .withColumn('start_flg', when((col('time_diff')!=minimal_diff)|(col('time_diff').isNull()), 1).otherwise(0))
        .withColumn('event_id', F.sum('start_flg').over(window_spec))
    )
    return _df_time_diff
DataFrame で射影する
from pyspark.sql.functions import col
df = df.select(col("col1"),col('col2'))
DataFrame をソートする
  • コード
df=df.orderBy(col('col1').asc())
DataFrame の上位N件を取得する
  • コード
df=df.limit(10).show()

Glue

ファイルをまとめて書く
  • コード1(DataFrameで書く)
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
dropnullfields3 = dropnullfields3.toDF().repartition(4)
dropnullfields3.write.mode('overwrite').parquet('s3://test-glue00/se2/out_flightdata1/')
  • コード2(DynamicFrameで書く)
from awsglue.dynamicframe import DynamicFrame

dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
df_dropnullfields3 = dropnullfields3.toDF().repartition(4)
ds_writer = DynamicFrame.fromDF(df_dropnullfields3, glueContext, "ds_writer")
datasink4 = glueContext.write_dynamic_frame.from_options(frame = ds_writer, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out_flightdata1/"}, format = "parquet", transformation_ctx = "datasink4")