PySpark, библиотека Python для Apache Spark, предоставляет мощный набор инструментов для распределенной обработки и анализа данных. В этой статье мы углубимся в различные методы, доступные в PySpark для реализации условных операций. Мы рассмотрим различные методы, предоставим примеры кода и выделим варианты их использования. К концу этой статьи вы получите четкое представление о том, как использовать эти методы для выполнения сложных манипуляций с данными на основе условной логики.
Метод 1: when()
и otherwise()
Функция when()
в PySpark позволяет определять условные выражения внутри DataFrame. Требуется условие и значение, которое будет оценено, если условие истинно. Вы можете объединить несколько операторов when()
и использовать функцию otherwise()
для указания значения по умолчанию, если ни одно из условий не соответствует. Вот пример:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
spark = SparkSession.builder.getOrCreate()
# Create a DataFrame
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
df = spark.createDataFrame(data, ["Name", "Age"])
# Apply conditional logic using when() and otherwise()
df = df.withColumn("Category", when(df.Age < 30, "Young").otherwise("Old"))
df.show()
Выход:
+-------+---+--------+
| Name|Age|Category|
+-------+---+--------+
| Alice| 25| Young|
| Bob| 30| Old|
|Charlie| 35| Old|
+-------+---+--------+
Method 2: `expr()`
PySpark's `expr()` function enables you to specify conditional operations using SQL-like expressions. It allows you to write complex logical conditions and transform data based on those conditions. Here's an example:
```python
from pyspark.sql.functions import expr
# Add a new column using expr()
df = df.withColumn("Status", expr("CASE WHEN Age < 30 THEN 'Active' ELSE 'Inactive' END"))
df.show()
Выход:
+-------+---+--------+--------+
| Name|Age|Category| Status|
+-------+---+--------+--------+
| Alice| 25| Young| Active|
| Bob| 30| Old|Inactive|
|Charlie| 35| Old|Inactive|
+-------+---+--------+--------+
Method 3: `filter()`
The `filter()` function in PySpark allows you to filter rows from a DataFrame based on a specified condition. It can be used to implement conditional filtering effectively. Here's an example:
```python
# Filter rows based on a condition using filter()
filtered_df = df.filter(df.Age > 30)
filtered_df.show()
Выход: