tecnologia | big data | business intelligence | banco de dados

PySpark - Removendo duplicidades

Nesta publicação irei demonstrar como remover duplicidades, com e sem priorização, de um dataframe utilizando a linguagem PySpark. Não abordarei o uso da função Windows do PySpark, pois apresentou ser menos performática nos testes.

# Cria o dataframe
dadosAlunos = [
("A", "A1", 3, "Nota extra"),
("A", "A1", 4, None),
("A", "A3", 5, None),
("B", "B3", 1, None),
("B", "B2", 2, "Nota extra"),
("B", "B1", 3, None),
("C", "C2", 0, None),
("C", "C1", 2, None),
("D", "D1", 5, None),
("D", "D2", 8, None)
]
schema = ["nome", "departamento", "nota", "observacao"]
df = spark.createDataFrame(data=dadosAlunos, schema = schema)


# Exibe a estrutura do dataframe
df.printSchema()


# Exibe o dataframe ordenando as colunas
df.orderBy(
df.nome.asc()
, df.departamento.asc()
, df.nota.asc()
, df.observacao.asc()
).show(truncate=False)


# Remove a duplicidade da combinação nome e departamento priorizando nome em ordem alfabética e nota em ordem decrescente
# Remove a duplicidade da combinação nome e departamento
# priorizando nome em ordem alfabética e nota em ordem decrescente
df2 = df \
.orderBy(df.nome.asc(), df.nota.desc()) \
.dropDuplicates(subset = ['nome','departamento'])


# Exibe o dataframe ordenando as colunas
df2.orderBy(
df2.nome.asc()
, df2.departamento.asc()
, df2.nota.asc()
, df2.observacao.asc()
).show(truncate=False)



# Remove a duplicidade da combinação nome e departamento sem priorização
df3 = df \
.select(
df.nome
, df.departamento
) \
.dropDuplicates()


# Exibe o dataframe ordenando as colunas
df3.orderBy(
df2.nome.asc()
, df2.departamento.asc()
).show(truncate=False)
Data publicação: 21:10 10/08/2022

PySpark - Junções (JOIN)

Nesta publicação irei demonstrar como realizar junções (joins) utilizando a linguagem PySpark e fazer um paralelo com o SQL tradicional.

Cria o dataframe 1:
dadosDataframe1 = [
(1, "A")
, (2, "B")
, (3, "C")
, (4, "D")
, (5, "E")]
schemaDataframe1 = ["id", "coluna"]
df1 = spark.createDataFrame(data=dadosDataframe1, schema=schemaDataframe1)
df1.printSchema()
df1.show(truncate=False)

Cria o dataframe 2:
dadosDataframe2 = [
(1, "A")
, (2, "B")
, (3, "D")
, (4, "F")]
schemaDataframe2 = ["id", "coluna"]
df2 = spark.createDataFrame(data=dadosDataframe2, schema=schemaDataframe2)
df2.printSchema()
df2.show(truncate=False)


Inner Join (registros existentes em ambos dataframes):
df1\
.join(df2
, [df1.coluna == df2.coluna]
, how='inner'
)\
.orderBy(
df1.coluna.asc()
)\
.select(
df1.coluna.alias('INNER_JOIN')
)\
.show(10,False)



Left Join (todos os registros do dataframe 1):
df1\
.join(df2
, [df1.coluna == df2.coluna]
, how='left'
)\
.orderBy(
df1.coluna.asc()
)\
.select(
df1.coluna.alias('LEFT_JOIN')
)\
.show(10,False)



Right Join (todos os registros do dataframe 2):
df1\
.join(df2
, [df1.coluna == df2.coluna]
, how='right'
)\
.orderBy(
df2.coluna.asc()
)\
.select(
df2.coluna.alias('RIGHT_JOIN')
)\
.show(10,False)



Full Join (todos os registros de ambos dataframes):
df1\
.join(df2
, [df1.coluna == df2.coluna]
, how='full'
)\
.orderBy(
F.coalesce(df1.coluna, df2.coluna).asc()
)\
.select(
F.coalesce(df1.coluna, df2.coluna).alias('FULL_JOIN')
)\
.show(10,False)



Left Anti Join (registros exclusivos do dataframe 1):
df1\
.join(df2
, [df1.coluna == df2.coluna]
,how='left_anti'
)\
.orderBy(
df1.coluna.asc()
)\
.select(
df1.coluna.alias('LEFT_ANTI')
)\
.show(10,False)



Right Anti Join (registros exclusivos do dataframe 2):
df2\
.join(df1
, [df1.coluna == df2.coluna]
,how='left_anti'
)\
.orderBy(
df2.coluna.asc()
)\
.select(
df2.coluna.alias('RIGHT_ANTI')
)\
.show(10,False)



Full Join Exclusivo (registros exclusivos de cada dataframe):
df1\
.join(df2
, [df1.coluna == df2.coluna]
, how='full'
)\
.where(
(df1.coluna.isNull()) | (df2.coluna.isNull())
) \
.orderBy(
F.coalesce(df1.coluna, df2.coluna).asc()
)\
.select(
F.coalesce(df1.coluna, df2.coluna).alias('FULL_JOIN')
)\
.show(10,False)



Publicações relacionadas:
- PySpark - Criando dataframes;
- Banco de Dados - Junções (JOIN).
Data publicação: 21:37 18/07/2022

PySpark - Criando dataframes

Nesta publicação irei abordar algumas formas de criar dataframes utilizando PySpark.

Criar dataframe de formar manual especificando os dados e os nomes das colunas:

dadosAlunos = [
("A", "A1", 3, "A"),
("A", "A1", 4, None),
("A", "A3", 5, None),
("B", "B3", 1, None),
("B", "B2", 2, "B"),
("B", "B1", 3, None),
("C", "C2", 0, None),
("C", "C1", 2, None),
("D", "D1", 5, None),
("D", "D2", 8, None)
]
dadosAlunosColunas = ["nome", "departamento", "nota", "coluna"]
df = spark.createDataFrame(data=dadosAlunos, schema=dadosAlunosColunas)
df.printSchema()
df.show(truncate=False)


Criar dataframe a partir de um arquivo csv:

df = spark.read.csv(
path='/caminho/empresas/csv',
sep=';',
inferSchema=True,
header=True
)
df.printSchema()
df.show(truncate=False)


Criar dataframe a partir de arquivo(s) parquet:

df = spark.read.parquet(
path='/caminho/empresas/parquet'
)
df.printSchema()
df.show(truncate=False)
Data publicação: 23:21 05/06/2022

Cursos gratuitos (dados, desenvolvimento, Linux, ...)

Nesta publicação irei concentrar cursos gratuitos dos temas que geralmente são abordados por aqui. Sempre que possível, irei manter esta publicação atualizada.

*** Cursos em português:

- Python (Básico):
https://www.youtube.com/watch?v=S9uPNppGsGo&list=PLHz_AreHm4dlKP6QQCekuIPky1CiwmdI6

- Python Fundamentos para Análise de Dados, Big Data Fundamentos, Microsoft Power BI para Data Science, entre outros:
https://www.datascienceacademy.com.br/cursosgratuitos

- Git e GitHub, Linux, Banco de Dados, SQL, Python, Docker, TypeScript e muitos outros:
https://web.dio.me/

Big Data & Analytics, Business Intelligence, Cybersecurity, DevOps & Agile Culture, Linux Fundamentos, Python, entre outros:
https://on.fiap.com.br/local/programaeucapacito/


*** Cursos em inglês:

- Python:
https://www.youtube.com/watch?v=mRMmlo_Uqcs&list=PLIhvC56v63ILPDA2DQBv0IKzqsWTZxCkp

- Linux:
https://www.youtube.com/watch?v=VbEx7B_PTOE&list=PLIhvC56v63IJIujb5cyE13oLuyORZpdkL

- Bash Scripting:
https://www.youtube.com/watch?v=SPwyp2NG-bE&list=PLIhvC56v63IKioClkSNDjW7iz-6TFvLwS
Data publicação: 21:26 25/05/2022

PySpark - Sintaxes

Nesta publicação irei mencionar brevemente alguns comandos/sintaxes úteis do Spark utilizando a linguagem Python.


Dataframe.show():

Exibe as primeiras n linhas na tela.


Parâmetros:
n: números de linhas a exibir.
truncate: se informado True, irá cortar as strings com mais de 20 caracteres. Se um número maior que 1 for informado, strings maiores que o número informado serão cortadas e alinnhadas a direita.
vertical: se informado True, as linhas serão exibidas na vertical (uma linha por valor de coluna).


Exemplos:
df.show()
df.show(truncate=3)
df.show(vertical=True)
df.show(n=20, truncate=True, vertical=False)


Dataframe.select():

Retorna um novo dataframe de acordo com as expressões que foram determinadas.


Exemplos:
df.select('*').collect()
df.select('name', 'age').collect()
df.select(df.name, (df.age + 10).alias('age')).collect()


Dataframe.Filter():

Filtra linhas usando determinadas condições.
Obs: where() é um apelido do filter().


Exemplos:
df.filter(df.age > 3).collect()
df.where(df.age == 2).collect()
df.filter("age > 3").collect()
df.where("age = 2").collect()


Dataframe.distinct():

Retorna um novo dataframe contendo as linhas distintas no dataframe.


Exemplos:
df.distinct().count()


Dataframe.withColumn():

Retorna um novo dataframe adicionado a coluna ou renomeando a coluna que possui o mesmo nome.


Exemplo:
df.withColumn('age2', df.age + 2).collect()


Dataframe.printSchema():

Exibe a estrutura do dataframe no formato de árvore.


Sintaxe:
df.printSchema()


Dataframe.columns:

Retorna todas as colunas como uma lista.


Sintaxe:
df.columns


Dataframe.count():

Retorna o número de linhas do dataframe.


Sintaxe:
df.count()


Dataframe.describe():

Computa as estatísticas para colunas númericas e string.


Exemplos:
df.describe().show()
df.describe(['age']).show()


Dataframe.summary():

Computa statisticas especificas para colunas númericas e strings.


Exemplos:
df.summary().show()
df.summary("count", "min", "25%", "75%", "max").show()
df.select("age", "name").summary("count").show()


Dataframe.Collect():

Retorna todos os registros como uma lista de linha.


Sintaxe:
df.collect()


Dataframe.selectExpr():

Retorna um novo dataframe de acordo com as expressões SQL que foram determinadas.


Exemplo:
df.selectExpr("age * 2", "abs(age)").collect()


Dataframe.persist():

Determina o nível de armazenamento para persistir o conteúdo do dataframe durante as operações após ser computado pela primeira vez.


Sintaxe:
df.persist()


Dataframe.unpersist():

Marca o dataframe como não persistente, removendo todos os blocos da memória e disco.


Sintaxe:
df.unpersist(blocking=False)


Data publicação: 22:29 07/07/2021
Perfil
Olá jovem Padawan, seja bem vindo! Este site foi criado com o intuito de compartilhar um pouco de conhecimento de Tecnologia da Informação, Big Data, Banco de Dados e Business Intelligence.

GitHub  Linkedin  Youtube

"O sorriso é o símbolo do vencedor." (Masaaki Hatsumi)


Leandro Sacramento, Todos os direitos reservados - 2012 - 2022