tecnologia | big data | business intelligence | banco de dados

PySpark - Removendo duplicidades

Nesta publicação irei demonstrar como remover duplicidades, com e sem priorização, de um dataframe utilizando a linguagem PySpark. Não abordarei o uso da função Windows do PySpark, pois apresentou ser menos performática nos testes.

# Cria o dataframe
dadosAlunos = [
("A", "A1", 3, "Nota extra"),
("A", "A1", 4, None),
("A", "A3", 5, None),
("B", "B3", 1, None),
("B", "B2", 2, "Nota extra"),
("B", "B1", 3, None),
("C", "C2", 0, None),
("C", "C1", 2, None),
("D", "D1", 5, None),
("D", "D2", 8, None)
]
schema = ["nome", "departamento", "nota", "observacao"]
df = spark.createDataFrame(data=dadosAlunos, schema = schema)


# Exibe a estrutura do dataframe
df.printSchema()


# Exibe o dataframe ordenando as colunas
df.orderBy(
df.nome.asc()
, df.departamento.asc()
, df.nota.asc()
, df.observacao.asc()
).show(truncate=False)


# Remove a duplicidade da combinação nome e departamento priorizando nome em ordem alfabética e nota em ordem decrescente
# Remove a duplicidade da combinação nome e departamento
# priorizando nome em ordem alfabética e nota em ordem decrescente
df2 = df \
.orderBy(df.nome.asc(), df.nota.desc()) \
.dropDuplicates(subset = ['nome','departamento'])


# Exibe o dataframe ordenando as colunas
df2.orderBy(
df2.nome.asc()
, df2.departamento.asc()
, df2.nota.asc()
, df2.observacao.asc()
).show(truncate=False)



# Remove a duplicidade da combinação nome e departamento sem priorização
df3 = df \
.select(
df.nome
, df.departamento
) \
.dropDuplicates()


# Exibe o dataframe ordenando as colunas
df3.orderBy(
df2.nome.asc()
, df2.departamento.asc()
).show(truncate=False)
Data publicação: 21:10 10/08/2022

PySpark - Junções (JOIN)

Nesta publicação irei demonstrar como realizar junções (joins) utilizando a linguagem PySpark e fazer um paralelo com o SQL tradicional.

Cria o dataframe 1:
dadosDataframe1 = [
(1, "A")
, (2, "B")
, (3, "C")
, (4, "D")
, (5, "E")]
schemaDataframe1 = ["id", "coluna"]
df1 = spark.createDataFrame(data=dadosDataframe1, schema=schemaDataframe1)
df1.printSchema()
df1.show(truncate=False)

Cria o dataframe 2:
dadosDataframe2 = [
(1, "A")
, (2, "B")
, (3, "D")
, (4, "F")]
schemaDataframe2 = ["id", "coluna"]
df2 = spark.createDataFrame(data=dadosDataframe2, schema=schemaDataframe2)
df2.printSchema()
df2.show(truncate=False)


Inner Join (registros existentes em ambos dataframes):
df1\
.join(df2
, [df1.coluna == df2.coluna]
, how='inner'
)\
.orderBy(
df1.coluna.asc()
)\
.select(
df1.coluna.alias('INNER_JOIN')
)\
.show(10,False)



Left Join (todos os registros do dataframe 1):
df1\
.join(df2
, [df1.coluna == df2.coluna]
, how='left'
)\
.orderBy(
df1.coluna.asc()
)\
.select(
df1.coluna.alias('LEFT_JOIN')
)\
.show(10,False)



Right Join (todos os registros do dataframe 2):
df1\
.join(df2
, [df1.coluna == df2.coluna]
, how='right'
)\
.orderBy(
df2.coluna.asc()
)\
.select(
df2.coluna.alias('RIGHT_JOIN')
)\
.show(10,False)



Full Join (todos os registros de ambos dataframes):
df1\
.join(df2
, [df1.coluna == df2.coluna]
, how='full'
)\
.orderBy(
F.coalesce(df1.coluna, df2.coluna).asc()
)\
.select(
F.coalesce(df1.coluna, df2.coluna).alias('FULL_JOIN')
)\
.show(10,False)



Left Anti Join (registros exclusivos do dataframe 1):
df1\
.join(df2
, [df1.coluna == df2.coluna]
,how='left_anti'
)\
.orderBy(
df1.coluna.asc()
)\
.select(
df1.coluna.alias('LEFT_ANTI')
)\
.show(10,False)



Right Anti Join (registros exclusivos do dataframe 2):
df2\
.join(df1
, [df1.coluna == df2.coluna]
,how='left_anti'
)\
.orderBy(
df2.coluna.asc()
)\
.select(
df2.coluna.alias('RIGHT_ANTI')
)\
.show(10,False)



Full Join Exclusivo (registros exclusivos de cada dataframe):
df1\
.join(df2
, [df1.coluna == df2.coluna]
, how='full'
)\
.where(
(df1.coluna.isNull()) | (df2.coluna.isNull())
) \
.orderBy(
F.coalesce(df1.coluna, df2.coluna).asc()
)\
.select(
F.coalesce(df1.coluna, df2.coluna).alias('FULL_JOIN')
)\
.show(10,False)



Publicações relacionadas:
- PySpark - Criando dataframes;
- Banco de Dados - Junções (JOIN).
Data publicação: 21:37 18/07/2022

PySpark - Criando dataframes

Nesta publicação irei abordar algumas formas de criar dataframes utilizando PySpark.

Criar dataframe de formar manual especificando os dados e os nomes das colunas:

dadosAlunos = [
("A", "A1", 3, "A"),
("A", "A1", 4, None),
("A", "A3", 5, None),
("B", "B3", 1, None),
("B", "B2", 2, "B"),
("B", "B1", 3, None),
("C", "C2", 0, None),
("C", "C1", 2, None),
("D", "D1", 5, None),
("D", "D2", 8, None)
]
dadosAlunosColunas = ["nome", "departamento", "nota", "coluna"]
df = spark.createDataFrame(data=dadosAlunos, schema=dadosAlunosColunas)
df.printSchema()
df.show(truncate=False)


Criar dataframe a partir de um arquivo csv:

df = spark.read.csv(
path='/caminho/empresas/csv',
sep=';',
inferSchema=True,
header=True
)
df.printSchema()
df.show(truncate=False)


Criar dataframe a partir de arquivo(s) parquet:

df = spark.read.parquet(
path='/caminho/empresas/parquet'
)
df.printSchema()
df.show(truncate=False)
Data publicação: 23:21 05/06/2022
Perfil
Olá jovem Padawan, seja bem vindo! Este site foi criado com o intuito de compartilhar um pouco de conhecimento de Tecnologia da Informação, Big Data, Banco de Dados e Business Intelligence.

GitHub  Linkedin  Youtube

"Meus filhos terão computadores, sim, mas antes terão livros. Sem livros, sem leitura, os nossos filhos serão incapazes de escrever - inclusive a sua própria história." (Bill Gates)


Leandro Sacramento, Todos os direitos reservados - 2012 - 2022