Spark: merge all rows into one row


Question:

I have the following Dataframe:


#+--------------+--------------+--------+---------+---------+
#|PROVINCE_STATE|COUNTRY_REGION|2/1/2020|2/10/2020|2/11/2020|
#+--------------+--------------+--------+---------+---------+
#|             -|     Australia|      12|       15|       15|
#+--------------+--------------+--------+---------+---------+

I need to merge all rows into one, summing the date columns grouped by COUNTRY_REGION. The thing is that I have many more columns and no idea how to do it dynamically. I tried groupBy, but it still doesn't work. Thanks.


Answer 1:

If your first two columns are always the province/state and country/region, and the remaining n columns are dates, you can try the following (Scala):

import org.apache.spark.sql.functions._
val dateCols = df.columns.drop(2).map(c => sum(c).as(c)) // select all columns except first 2 and perform sum on each of them
df.groupBy('country_region).agg(dateCols.head,dateCols.tail:_*).show()

Python version:

import pyspark.sql.functions as f
dateCols = [f.sum(c).alias(c) for c in df.columns[2:]]  # skip the first two columns and sum each remaining date column
df.groupBy('country_region').agg(*dateCols).show()

output:

+--------------+--------+---------+---------+
|country_region|2/1/2020|2/10/2020|2/11/2020|
+--------------+--------+---------+---------+
|           aus|      12|       15|       15|
+--------------+--------+---------+---------+
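
If you also need the constant "-" PROVINCE_STATE column from the expected output, a minimal sketch (my addition, not part of the original answer; it assumes the same df and PySpark API) is to prepend it with lit() after the aggregation:

import pyspark.sql.functions as f
# aggregate, then add the placeholder province_state column back in front
result = df.groupBy('country_region') \
           .agg(*[f.sum(c).alias(c) for c in df.columns[2:]]) \
           .select(f.lit('-').alias('province_state'), '*')
result.show()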

Answer 2:

Use aggregation:

select '-' as province_state, country_region,
       sum(`2/1/2020`), sum(`2/10/2020`), sum(`2/11/2020`)
from t
group by country_region;

I'm not sure what you mean by "dynamically". As a SQL query, you need to list each expression independently.
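
If "dynamically" means the date columns should not be typed out by hand, one option is to build the column list from the DataFrame schema before submitting the query. A minimal PySpark sketch, assuming df is registered as a temp view named t and the first two columns are the non-date ones (these assumptions are mine, not part of the original answer):

df.createOrReplaceTempView("t")
date_cols = df.columns[2:]  # everything after PROVINCE_STATE and COUNTRY_REGION
sums = ", ".join(f"sum(`{c}`) as `{c}`" for c in date_cols)
query = f"select '-' as province_state, country_region, {sums} from t group by country_region"
spark.sql(query).show()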


Answer 3:

Try this.

from pyspark.sql import functions as F
from dateutil.parser import parse

def is_date(string, fuzzy=False):
    # returns True if the string can be parsed as a date
    try:
        parse(string, fuzzy=fuzzy)
        return True
    except ValueError:
        return False

df.groupBy(F.lit('-').alias("PROVINCE_STATE"), 'COUNTRY_REGION') \
  .agg(*(F.sum(x).cast('int').alias(x) for x in df.columns if is_date(x))).show()


#+--------------+--------------+--------+---------+---------+
#|PROVINCE_STATE|COUNTRY_REGION|2/1/2020|2/10/2020|2/11/2020|
#+--------------+--------------+--------+---------+---------+
#|             -|     Australia|      12|       15|       15|
#+--------------+--------------+--------+---------+---------+
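
If dateutil is not available, a stricter check against the M/D/YYYY format used in the column names could replace is_date (this variant is an assumption, not part of the original answer):

from datetime import datetime

def is_date(string):
    # strict check for the M/D/YYYY column-name format, e.g. "2/10/2020"
    try:
        datetime.strptime(string, "%m/%d/%Y")
        return True
    except ValueError:
        return False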

Answer 4:

Try this in PySpark. One way of doing this is with window functions:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window


    spark = SparkSession.builder \
        .appName('SO') \
        .getOrCreate()


    sc= spark.sparkContext

    df = sc.parallelize([
        ("new south wales", "aus", 4, 4, 4),("victoria",  "aus", 4, 4, 4), ("queensland",  "aus", 3, 5, 5), ("south australia","aus", 1, 2, 2)
    ]).toDF(["province_state", "country_region", "2/1/2020", "2/10/2020", "2/11/2020"])

    df.show()
    #
    # +---------------+--------------+--------+---------+---------+
    # | province_state|country_region|2/1/2020|2/10/2020|2/11/2020|
    # +---------------+--------------+--------+---------+---------+
    # |new south wales|           aus|       4|        4|        4|
    # |       victoria|           aus|       4|        4|        4|
    # |     queensland|           aus|       3|        5|        5|
    # |south australia|           aus|       1|        2|        2|
    # +---------------+--------------+--------+---------+---------+

    w = Window().partitionBy('country_region')

    w1 = Window().partitionBy('country_region').orderBy('country_region')

    for column in df.columns:
        if column not in ['country_region','province_state']:
            df = df.withColumn(column, F.sum(column).over(w) )

    df1 = df.withColumn("r_no", F.row_number().over(w1)).where(F.col('r_no')==1)

    df1.select(F.lit('_').alias('province_state'), *[ column for column in df1.columns if column not in ['province_state']]).drop(F.col('r_no')).show()

    # +--------------+--------------+--------+---------+---------+
    # |province_state|country_region|2/1/2020|2/10/2020|2/11/2020|
    # +--------------+--------------+--------+---------+---------+
    # |             _|           aus|      12|       15|       15|
    # +--------------+--------------+--------+---------+---------+
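
As a possible simplification (my assumption, not part of the original answer), the row_number/filter step can be replaced by dropping duplicates on the grouping key once every row carries the windowed sums:

    df.dropDuplicates(['country_region']) \
      .select(F.lit('_').alias('province_state'),
              *[c for c in df.columns if c != 'province_state']) \
      .show()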