将 Pandas DataFrame 转换为 Spark DataFrame

Pulamolu Sai Mohan 2023年1月30日
  1. 使用 createDataFrame() 函数将 Pandas DataFrame 转换为 Spark DataFrame
  2. 使用 createDataFrame()schema 函数将 Pandas DataFrame 转换为 Spark DataFrame
  3. 使用启用 apache arrowcreateDataFrame() 函数将 Pandas DataFrame 转换为 Spark DataFrame
将 Pandas DataFrame 转换为 Spark DataFrame

DataFrame 是二维可变数据结构。DataFrame 中的数据存储在称为行和列的标记轴中。

Pandas 和 Spark 都有 DataFrame。本教程将讨论将 Pandas DataFrame 转换为 Spark DataFrame 的不同方法。

使用 createDataFrame() 函数将 Pandas DataFrame 转换为 Spark DataFrame

createDataFrame() 函数用于从 RDD 或 pandas.DataFrame 创建 Spark DataFrame。createDataFrame() 将数据和方案作为参数。

我们将很快讨论方案。

createDataFrame() 的语法:

createDataFrame(data, schema=None)

参数:

  1. data = 要传递的 DataFrame
  2. schema = str 或列表,可选

返回:DataFrame

方法:

  1. 导入 pandas 库并使用 DataFrame() 方法创建一个 Pandas DataFrame。
  2. 通过从 pyspark 库中导入 SparkSession 创建 spark 会话。
  3. 将 Pandas DataFrame 传递给 SparkSession 对象的 createDataFrame() 方法。
  4. 打印 DataFrame。

以下代码使用 createDataFrame() 函数将 Pandas DataFrame 转换为 Spark DataFrame。

# import the pandas
import pandas as pd

# from pyspark library import sql
from pyspark import sql

# Creating a SparkSession
spark_session = sql.SparkSession.builder.appName("pdf to sdf").getOrCreate()

# Creating the pandas DataFrame using pandas.DataFrame()

data = pd.DataFrame(
    {
        "Course": ["Python", "Spark", "Java", "JavaScript", "C#"],
        "Mentor": ["Robert", "Elizibeth", "Nolan", "Chris", "johnson"],
        "price$": [199, 299, 99, 250, 399],
    }
)

# Converting the pandas dataframe in to spark dataframe
spark_DataFrame = spark_session.createDataFrame(data)

# printing the dataframe
spark_DataFrame.show()

输出:

+----------+---------+------+
|    Course|   Mentor|price$|
+----------+---------+------+
|    Python|   Robert|   199|
|     Spark|Elizibeth|   299|
|      Java|    Nolan|    99|
|JavaScript|    Chris|   250|
|        C#|  johnson|   399|
+----------+---------+------+

使用 createDataFrame()schema 函数将 Pandas DataFrame 转换为 Spark DataFrame

我们在前面的示例中讨论了 createDataFrame() 方法。现在我们将看到如何在转换 DataFrame 时更改 schema。

此示例将使用模式更改列名,将 Course 更改为 Technology,将 Mentor 更改为 developer,将 price 更改为 Salary

schema:

schema 定义字段名称及其数据类型。在 Spark 中,schema 是 DataFrame 的结构,DataFrame 的 schema 可以使用 StructType 类来定义,它是 StructField 的集合。

StructField 采用字段或列的名称、数据类型和可为空的。可空参数定义该字段是否可以为空。

方法:

  1. 导入 pandas 库并使用 DataFrame() 方法创建一个 Pandas DataFrame。
  2. 通过从 pyspark 库中导入 SparkSession 创建 Spark 会话。
  3. 通过将 StructField 的集合传递给 StructType 类来创建模式; StructField 对象是通过传递字段的名称、数据类型和可为空来创建的。
  4. 将 Pandas DataFrame 和模式传递给 SparkSession 对象的 createDataFrame() 方法。
  5. 打印 DataFrame。

以下代码使用 createDataFrame()schema 将 Pandas DataFrame 转换为 Spark DataFrame。

# import the pandas
import pandas as pd

# from pyspark library import SparkSession
from pyspark.sql import SparkSession

from pyspark.sql.types import *

# Creating a SparkSession
spark_session = sql.SparkSession.builder.appName("pdf to sdf").getOrCreate()


# Creating the pandas DataFrame using pandas.DataFrame()
data = pd.DataFrame(
    {
        "Course": ["Python", "Spark", "Java", "JavaScript", "C#"],
        "Mentor": ["Robert", "Elizibeth", "Nolan", "Chris", "johnson"],
        "price$": [199, 299, 99, 250, 399],
    }
)

# Creating/Changing schema
dfSchema = StructType(
    [
        StructField("Technology", StringType(), True),
        StructField("developer", StringType(), True),
        StructField("Salary", IntegerType(), True),
    ]
)

# Converting the pandas dataframe in to spark dataframe
spark_DataFrame = spark_session.createDataFrame(data, schema=dfSchema)


# printing the dataframe
spark_DataFrame.show()

输出:

+----------+---------+------+
|Technology|developer|Salary|
+----------+---------+------+
|    Python|   Robert|   199|
|     Spark|Elizibeth|   299|
|      Java|    Nolan|    99|
|JavaScript|    Chris|   250|
|        C#|  johnson|   399|
+----------+---------+------+

使用启用 apache arrowcreateDataFrame() 函数将 Pandas DataFrame 转换为 Spark DataFrame

Apache Arrow 是一种独立于语言的列式内存格式,用于平面和分层数据或任何结构化数据格式。Apache Arrow 通过创建标准的列式内存格式提高了数据分析的效率。

默认情况下禁用 apache 箭头;我们可以使用以下代码显式启用它。

SparkSession.conf.set("spark.sql.execution.arrow.enabled", "true")

方法:

  1. 导入 pandas 库并使用 DataFrame() 方法创建一个 Pandas DataFrame。
  2. 通过从 pyspark 库中导入 SparkSession 创建 spark 会话。
  3. 使用 conf 属性启用 apache 箭头。
  4. 将 Pandas DataFrame 传递给 SparkSession 对象的 createDataFrame() 方法。
  5. 打印 DataFrame。

以下代码通过启用 apache 箭头将 Pandas DataFrame 转换为 Spark DataFrame 来使用 createDataFrame() 函数。

# import the pandas
import pandas as pd

# from pyspark library import sql
from pyspark import sql

# Creating a SparkSession
spark_session = sql.SparkSession.builder.appName("pdf to sdf").getOrCreate()

# Creating the pandas DataFrame using pandas.DataFrame()

data = pd.DataFrame(
    {
        "Course": ["Python", "Spark", "Java", "JavaScript", "C#"],
        "Mentor": ["Robert", "Elizibeth", "Nolan", "Chris", "johnson"],
        "price$": [199, 299, 99, 250, 399],
    }
)


spark_session.conf.set("spark.sql.execution.arrow.enabled", "true")

# Converting the pandas dataframe in to spark dataframe
sprak_arrow = spark_session.createDataFrame(data)

# printing the dataframe
sprak_arrow.show()

输出:

+----------+---------+------+
|    Course|   Mentor|price$|
+----------+---------+------+
|    Python|   Robert|   199|
|     Spark|Elizibeth|   299|
|      Java|    Nolan|    99|
|JavaScript|    Chris|   250|
|        C#|  johnson|   399|
+----------+---------+------+

相关文章 - Pandas DataFrame