Esse é o meu primeiro repo tratando de fim a fim, uma pipeline de dados abertos do governo brasileiro relacionado a compras de contrato e cronogramas anuais com spark, em pyspark e SQL!

Overview

Olá!

Esse é o meu primeiro repo tratando de fim a fim, uma pipeline de dados abertos do governo brasileiro relacionado a compras de contrato e cronogramas anuais com spark, em pyspark e SQL!

O código se encontra aqui e o dado pode ser obtido por meio desse link

from pyspark.sql import SparkSession

##################################################### VARIABLES #####################################################

PATH_LANDING_ZONE_CSV = '../datalake/landing/comprasnet-contratos-anual-cronogramas-latest.csv'
PATH_PROCESSING_ZONE = '../datalake/processing'
PATH_CURATED_ZONE = '../datalake/curated'

##################################################### QUERY #########################################################

QUERY = """ 

WITH tmp as (
  SELECT 
    cast(id as integer) as id,
    cast(contrato_id as integer) as contrato_id,
    tipo,
    numero,
    receita_despesa,
    observacao,
    mesref,
    anoref,
    cast(vencimento as date) as vencimento,
    retroativo,
    cast(valor as decimal (10,2)) as valor,
    year(vencimento) as year,
    month(vencimento) as month,
    dayofmonth(vencimento) as day
  FROM 
    df
)
SELECT
  *
FROM 
  tmp
WHERE   
  year = 2021 OR 
  year = 2022
ORDER BY
  year desc

"""

##################################################### SCRIPT #########################################################

def csv_to_parquet(spark, path_csv, path_parquet):
  df = spark.read.option('header', True).csv(path_csv)
  return df.write.mode('overwrite').format('parquet').save(path_parquet)

def create_view(spark, path_parquet):
  df = spark.read.parquet(path_parquet) 
  df.createOrReplaceTempView('df')

def write_curated(spark, path_curated):
 
  df2 = spark.sql(QUERY)
    
  (
      df2
      .orderBy('year', ascending=False)
      .orderBy('month', ascending=False)
      .orderBy('day', ascending=False)
      .write.partitionBy('year','month','day')
      .mode('overwrite')
      .format('parquet')
      .save(path_curated)
  )


if __name__ == "__main__":
  
  spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
  )

  spark.sparkContext.setLogLevel("ERROR")
  
  csv_to_parquet(spark, PATH_LANDING_ZONE_CSV, PATH_PROCESSING_ZONE)

  create_view(spark, PATH_PROCESSING_ZONE)
  
  write_curated(spark, PATH_CURATED_ZONE )
  • Basicamente, extraimos os dados para a zona landing, depois, escrevemos o mesmo dado em diferente formato na zona processing, no caso parquet, por se tratar de um formato otimizado e mais leve.
  • Após, criamos uma view do dado recém salvo na zona processing, já em parquet, que otimiza a leitura do spark, aplicamos uma query de transformação que enriquece o schema do dado e seleciona apenas os dados de 2021 e 2022, já pronto para ser consumido.
  • E por fim, escrevemos na zona curated o dado já tratado, enriquecido, particionado por ano, mês e dia e pronto para consumo.

Para rodar o script, basicamente você pode fazer no terminal:

spark-submit etl.py

Você também encontrará o mesmo código e ideia de ETL em notebooks, em versão pyspark ou spark-sql.

Espero que gostem!

Qualquer dúvida, entrar em contato pelo LinkedIn.

:)

Owner
Henrique de Paula
Games e tech!
Henrique de Paula
MaD GUI is a basis for graphical annotation and computational analysis of time series data.

MaD GUI Machine Learning and Data Analytics Graphical User Interface MaD GUI is a basis for graphical annotation and computational analysis of time se

Machine Learning and Data Analytics Lab FAU 10 Dec 19, 2022
A collection of Scikit-Learn compatible time series transformers and tools.

tsfeast A collection of Scikit-Learn compatible time series transformers and tools. Installation Create a virtual environment and install: From PyPi p

Chris Santiago 0 Mar 30, 2022
Projeto: Machine Learning: Linguagens de Programacao 2004-2001

Projeto: Machine Learning: Linguagens de Programacao 2004-2001 Projeto de Data Science e Machine Learning de análise de linguagens de programação de 2

Victor Hugo Negrisoli 0 Jun 29, 2021
Sleep stages are classified with the help of ML. We have used 4 different ML algorithms (SVM, KNN, RF, NN) to demonstrate them

Sleep stages are classified with the help of ML. We have used 4 different ML algorithms (SVM, KNN, RF, NN) to demonstrate them.

Anirudh Edpuganti 3 Apr 03, 2022
vortex particles for simulating smoke in 2d

vortex-particles-method-2d vortex particles for simulating smoke in 2d -vortexparticles_s

12 Aug 23, 2022
Decision Weights in Prospect Theory

Decision Weights in Prospect Theory It's clear that humans are irrational, but how irrational are they? After some research into behavourial economics

Cameron Davidson-Pilon 32 Nov 08, 2021
A scikit-learn based module for multi-label et. al. classification

scikit-multilearn scikit-multilearn is a Python module capable of performing multi-label learning tasks. It is built on-top of various scientific Pyth

802 Jan 01, 2023
Lightning ⚡️ fast forecasting with statistical and econometric models.

Nixtla Statistical ⚡️ Forecast Lightning fast forecasting with statistical and econometric models StatsForecast offers a collection of widely used uni

Nixtla 2.1k Dec 29, 2022
A repository for collating all the resources such as articles, blogs, papers, and books related to Bayesian Statistics.

A repository for collating all the resources such as articles, blogs, papers, and books related to Bayesian Statistics.

Aayush Malik 80 Dec 12, 2022
MooGBT is a library for Multi-objective optimization in Gradient Boosted Trees.

MooGBT is a library for Multi-objective optimization in Gradient Boosted Trees. MooGBT optimizes for multiple objectives by defining constraints on sub-objective(s) along with a primary objective. Th

Swiggy 66 Dec 06, 2022
CD) in machine learning projectsImplementing continuous integration & delivery (CI/CD) in machine learning projects

CML with cloud compute This repository contains a sample project using CML with Terraform (via the cml-runner function) to launch an AWS EC2 instance

Iterative 19 Oct 03, 2022
Simple and flexible ML workflow engine.

This is a simple and flexible ML workflow engine. It helps to orchestrate events across a set of microservices and create executable flow to handle requests. Engine is designed to be configurable wit

Katana ML 295 Jan 06, 2023
Convoys is a simple library that fits a few statistical model useful for modeling time-lagged conversions.

Convoys is a simple library that fits a few statistical model useful for modeling time-lagged conversions. There is a lot more info if you head over to the documentation. You can also take a look at

Better 240 Dec 26, 2022
Magenta: Music and Art Generation with Machine Intelligence

Magenta is a research project exploring the role of machine learning in the process of creating art and music. Primarily this involves developing new

Magenta 18.1k Dec 30, 2022
Python module for data science and machine learning users.

dsnk-distributions package dsnk distribution is a Python module for data science and machine learning that was created with the goal of reducing calcu

Emmanuel ASIFIWE 1 Nov 23, 2021
Bayesian optimization in JAX

Bayesian optimization in JAX

Predictive Intelligence Lab 26 May 11, 2022
Pandas DataFrames and Series as Interactive Tables in Jupyter

Pandas DataFrames and Series as Interactive Tables in Jupyter Star Turn pandas DataFrames and Series into interactive datatables in both your notebook

Marc Wouts 364 Jan 04, 2023
Ml based project which uses regression technique to predict the price.

Price-Predictor Ml based project which uses regression technique to predict the price. I have used various regression models and finds the model with

Garvit Verma 1 Jul 09, 2022
🌲 Implementation of the Robust Random Cut Forest algorithm for anomaly detection on streams

🌲 Implementation of the Robust Random Cut Forest algorithm for anomaly detection on streams

Real-time water systems lab 416 Jan 06, 2023
Gaussian Process Optimization using GPy

End of maintenance for GPyOpt Dear GPyOpt community! We would like to acknowledge the obvious. The core team of GPyOpt has moved on, and over the past

Sheffield Machine Learning Software 847 Dec 19, 2022