Apache Airflow With GCP

Figure: Apache Airflow with GCP

In this article, I will briefly explain what Apache Airflow is and what it offers. Then we will transfer data obtained from an API to Google Cloud Storage with Apache Airflow, load that data into Google BigQuery, and create tables from the queries we want. Along the way, we will build a cloud data pipeline in the Google Cloud environment with Apache Airflow and manage the ETL process.

Apache Airflow

Apache Airflow is an open-source workflow management platform for data engineering pipelines. It started at Airbnb in October 2014 as a solution to manage the company’s increasingly complex workflows. Creating Airflow allowed Airbnb to programmatically author and schedule their workflows and monitor them via the built-in Airflow user interface.

As the Wikipedia description above suggests, we can think of Airflow as a process management and monitoring tool. Since it is open source and part of the Apache software family, new features are added constantly, bugs are fixed through continuous improvements, and one of its biggest strengths is that you can find a solution to almost any problem because the community is large.

Some of the management and monitoring features it supports:

  • Cron operations
  • ETL processes
  • Pipeline management
  • Machine Learning processes
  • DevOps operations

To use the Apache Airflow infrastructure, you work with the building blocks briefly described below, such as DAGs, tasks, the scheduler, triggers, and operators; together they form Airflow's functional structure.

DAGs: A DAG (directed acyclic graph) is created for each workflow. It is the core concept of Airflow: we create tasks inside a DAG for a project, we can display the DAG as a graph, and we can observe and intervene in error handling and operations.

Task: The name given to each unit of work within a DAG. A task does a single job, and a DAG may contain dozens of separate tasks.

Scheduler: Triggers scheduled workflows. A component that monitors your DAGs and triggers tasks whose dependencies are met. Once the service starts, it continuously monitors your DAG folder and stays in sync with it.

Worker: Listens to and processes the queues containing workflow tasks.

Operators: Each task in the workflow is defined by an operator. There are many operators, each written to perform a specific kind of task; examples include PythonOperator, BashOperator, and OracleOperator.

Sensors: Sensors are a special type of Operator designed to do exactly one thing. They can be time-based or wait for a file or an external event, but all they do is wait until something happens.
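As a hedged illustration (not code from this project), a simple file-based sensor could be defined as follows; the path and timings are placeholders:

from airflow.sensors.filesystem import FileSensor

# Hypothetical sensor: wait until a CSV file appears before downstream tasks run.
wait_for_file = FileSensor(
    task_id="wait_for_file",
    filepath="/opt/airflow/data/incoming.csv",  # placeholder path
    poke_interval=60,                           # check once a minute
    timeout=60 * 60,                            # stop waiting after an hour
)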

Frequently used structures

BashOperator =  Execute commands in a Bash shell.

PythonOperator = Runs your Python code as a task.

TriggerRule = Controls when a task runs based on the state of its upstream tasks; it is used in complex structures where you want the workflow to continue even though an upstream task has failed.

Multiple dependency = Lets you manage dependencies between tasks, so that a task can consume the output of previous tasks or feed the tasks that follow it.

Connection = Provides access to and management of external systems such as PostgreSQL, Google Cloud, and AWS. In this project, for example, we use a Google Cloud connection to manage GCP Storage and BigQuery.

Retry = If a task may fail transiently on its first attempts, retries let it run again instead of the run ending in an error.

Catchup = Used to trigger runs retroactively for past schedule intervals (several of these settings appear in the sketch below).
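To make these settings concrete, here is a minimal, hypothetical sketch (not part of this project) showing retries, a trigger rule, and catchup in a DAG definition; the dag_id and callable are placeholders:

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id="example_settings",
    schedule="@daily",
    start_date=pendulum.datetime(2023, 6, 25, tz="UTC"),
    catchup=False,                          # do not backfill past intervals
    default_args={"retries": 2},            # retry failed tasks twice before giving up
) as dag:
    cleanup = PythonOperator(
        task_id="cleanup",
        python_callable=lambda: print("cleaning up"),
        trigger_rule=TriggerRule.ALL_DONE,  # run even if upstream tasks failed
    )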

Having talked about Apache Airflow this much, it would not make sense to skip using it, so below I describe how this work runs automatically with Airflow.

About the Project

We fetch data through an API, organize the data we get, and send it to GCP Storage; from there we load it into BigQuery, both as a complete table and split into two parts by fuel type. All of this runs under Apache Airflow, where we can monitor it.

As can be seen in the picture, this is a short ETL process; I will go through the tasks in order.

Points to consider while creating the tasks: the dataset needs to be created in Google BigQuery, and a connection between Airflow and Google Cloud must be set up.
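The connection itself is defined in the Airflow UI (shown below). As a rough sketch (placeholder project, dataset, and key path), the BigQuery dataset could be created once with the Python client:

from google.cloud import bigquery

# Placeholder service-account key and names; use your own project and dataset ids.
client = bigquery.Client.from_service_account_json(
    "/opt/airflow/dags/scripts/<service-account-key>.json"
)
client.create_dataset("my_project.my_dataset", exists_ok=True)  # no-op if it already exists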

Google Cloud Connection On Airflow

Extract Data Task: We access the API with Python code and obtain fuel information for a region in Germany defined by specific coordinates.

extract_data = PythonOperator(
    task_id="extract_data",        # task id
    python_callable=getdata,       # Python function that calls the API
)
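The getdata callable itself is not listed in the article; a minimal sketch of what such a function might look like, with a placeholder endpoint, coordinates, and API key, is:

import requests

def getdata():
    # Hypothetical endpoint and parameters; replace them with the real fuel-price API.
    url = "https://example.com/fuel-prices"
    params = {"lat": 52.52, "lng": 13.40, "rad": 5, "apikey": "<API_KEY>"}
    response = requests.get(url, params=params, timeout=30)
    response.raise_for_status()
    return response.json()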

Transform Data Task: The resulting data is organized with the pandas library.

transform_data = PythonOperator(
    task_id="transform_data",
    python_callable=transform,     # Python function that reshapes the data
)
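The transform callable is not listed either; as an assumption-heavy sketch, it might flatten the API response into a single record with pandas:

import pandas as pd

def transform():
    # Hypothetical reshaping; the real keys depend on the API response structure.
    raw = getdata()
    flat = pd.json_normalize(raw).iloc[0].to_dict()  # flatten nested JSON into one record
    # load() wraps this dict in a one-row DataFrame before uploading it to GCS.
    return flat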

Load Data Task: The prepared data is sent to Google Cloud Storage as a file.

load_data = PythonOperator(
    task_id="load_data",
    python_callable=load,          # Python function that uploads the CSV to GCS
)

Load Data from GCS = The files in Google Cloud Storage are sent to Google BigQuery.

load_data_from_gsc = GCSToBigQueryOperator(
    task_id="load_data_from_gsc",
    bucket="tanker_data",                    # bucket name
    source_objects=["*.csv"],                # all files in CSV format
    source_format="CSV",                     # source format
    skip_leading_rows=1,                     # skip the header row
    field_delimiter=",",                     # comma-separated values
    destination_project_dataset_table=f"{PROJECT_NAME}.{DB_NAME}.gas_table",  # target table
    create_disposition="CREATE_IF_NEEDED",   # create the table if it does not exist
    write_disposition="WRITE_TRUNCATE",      # overwrite the table on each run
    gcp_conn_id="Google_Cloud_Con",          # connection defined in Airflow
)
Create Diesel Table = A separate table is created for diesel fuel, and the matching rows are sent to it.

query_diesel = f"SELECT * FROM {PROJECT_NAME}.{DB_NAME}.gas_table WHERE category = 'diesel'"

create_diesel_table = BigQueryExecuteQueryOperator(
    task_id="create_diesel_table",
    sql=query_diesel,
    destination_dataset_table=f"{PROJECT_NAME}.{DB_NAME}.diesel_table",
    create_disposition="CREATE_IF_NEEDED",   # create the table if it does not exist
    write_disposition="WRITE_TRUNCATE",      # overwrite the table on each run
    use_legacy_sql=False,
    gcp_conn_id="Google_Cloud_Con",
)
Create Gasoline Table = A separate table is created for gasoline fuel, and the matching rows are sent to it.

query_gasoline = f"SELECT * FROM {PROJECT_NAME}.{DB_NAME}.gas_table WHERE category = 'gasoline'"

create_gasoline_table = BigQueryExecuteQueryOperator(
    task_id="create_gasoline_table",
    sql=query_gasoline,
    destination_dataset_table=f"{PROJECT_NAME}.{DB_NAME}.gasoline_table",
    create_disposition="CREATE_IF_NEEDED",   # create the table if it does not exist
    write_disposition="WRITE_TRUNCATE",      # overwrite the table on each run
    use_legacy_sql=False,
    gcp_conn_id="Google_Cloud_Con",
)
DAG = To create the DAG, we give it a dag_id, a daily schedule, and a start date.

with DAG(
    dag_id="GCP_ETL",
    schedule="@daily",
    start_date=pendulum.datetime(2023, 6, 25, tz="UTC"),
) as dag:
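The article does not show the dependency wiring explicitly; based on the order of the tasks described above, it presumably looks something like this inside the DAG context:

    # Presumed ordering based on the ETL flow described above.
    extract_data >> transform_data >> load_data >> load_data_from_gsc
    load_data_from_gsc >> [create_diesel_table, create_gasoline_table]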
    
Important functions:
Load Function = Establishes the connection to Google Cloud Storage and uploads the CSV file.
import os
from datetime import datetime

import pandas as pd
from google.cloud import storage

def load():
    df = pd.DataFrame.from_dict([transform()])
    # Service-account key used to authenticate against Google Cloud.
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/opt/airflow/dags/scripts/stoked-cosine-391507-fc18e1ccc108.json'

    # Timestamped file name, e.g. gas-2023_06_25_12_30_00.csv
    filetime = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
    file_name = f'gas-{filetime}.csv'

    client = storage.Client()
    bucket = client.bucket("tanker_data")  # bucket name
    bucket.blob(file_name).upload_from_string(
        df.to_csv(index=False, encoding='utf-8'),
        content_type='application/octet-stream',
    )

Google Cloud Storage: A bucket named tanker_data is created; the generated CSV files get their creation date appended to the file name and are sent to the bucket.

Google Cloud BigQuery: The files sent from GCP Storage are collected in gas_table and then, as described in the tasks above, distributed into separate tables according to fuel type.
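As an optional check (placeholder project and dataset names), you can verify the split from Python with the BigQuery client:

from google.cloud import bigquery

client = bigquery.Client()
sql = """
    SELECT category, COUNT(*) AS n
    FROM `my_project.my_dataset.gas_table`   -- placeholder project and dataset
    GROUP BY category
"""
for row in client.query(sql).result():
    print(row.category, row.n)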

Conclusion: There is no longer any need to run these Python files by hand every day; with the @daily schedule, Apache Airflow does this work for us. We have automated the job and ensured its continuity, and we can now turn to new projects.

If you want to access the source code –>> GitHub Repository
