Let’s Gossip with Apache Cassandra


Before I start explaining Apache Cassandra, I will briefly talk about what NoSQL is and why it is needed. As data grew, it also diversified; its types and structures change day by day, and new needs keep appearing. Think back to an e-commerce site in the past: there was a customer, the products they bought, a bit of information about them, and the data stayed within a certain scope. Today a customer has dozens of delivery addresses, scores for the products they bought and, most importantly, comments where even the emojis matter. There was a need to store, process, and analyze data in these new formats and to create new value from it. "Not only SQL" (NoSQL) databases emerged as stores suited to these kinds of data, offering Document, Key-Value, Wide-Column, and Graph data models. You can examine the categories in more detail on the DB-Engines site.

Apache Cassandra falls into the wide-column category: its tables follow the wide-column data model.

What is Apache Cassandra, and what makes it powerful?

As their data grew, big companies developed various big-data tools for their own needs and then released them to the open source world. Cassandra began its life at Facebook, which later handed it over to the Apache Software Foundation as open source. Apache Cassandra positions itself as the combination of two powerful NoSQL designs: Amazon Dynamo and Google Bigtable.

Apache Cassandra’s powerful features 💪

  • It runs on a distributed architecture, which eliminates single points of failure and gives it high fault tolerance.
  • Every node is a peer that does real work, using resources efficiently and increasing data-processing power.
  • It consists of column-oriented (wide-column) tables.
  • It has its own query language, the Cassandra Query Language (CQL).
  • It has a flexible schema structure.

Apache Cassandra Key Concepts

  • Keyspace: the top-level object that stores user-defined types and column families (tables). It plays the role that a database plays in RDBMS systems.
  • Replication Factor: the number of copies of each piece of data stored on the nodes in a cluster.
  • Replication Strategy: determines, for each keyspace, the nodes where replicas are placed (see the sketch after this list).
  • Table: where we specify the columns and their data types.
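
As an illustration that is not from the original post, here is a minimal sketch of how a replication strategy and factor are declared through the Python driver. The keyspace and datacenter names are hypothetical; the hands-on section below uses the simpler SimpleStrategy on a single node.

from cassandra.cluster import Cluster

cluster = Cluster(["127.0.0.1"], port=9042)  # assumes a node reachable on localhost
session = cluster.connect()

# NetworkTopologyStrategy sets a replication factor per datacenter,
# which is what a real multi-region cluster would use
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS fitness_prod
    WITH REPLICATION = {
        'class': 'NetworkTopologyStrategy',
        'dc1': '3',
        'dc2': '2'
    }
""")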

Apache Cassandra’s working structure

Apache Cassandra’s distributed data structure consists of nodes. Each node is assigned a certain range of tokens, and together the nodes form what is called the Cassandra ring.

Why “gossip” in our title? Because Cassandra nodes communicate with each other through a protocol called gossip. Through gossip they exchange state information about one another, such as which nodes are up and which have failed.
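
This is not in the original post, but as a minimal sketch: the Python driver exposes the node state that gossip spreads around the ring, so you can peek at it from code. The contact point below is an assumption for a local setup.

from cassandra.cluster import Cluster

cluster = Cluster(["127.0.0.1"], port=9042)
session = cluster.connect()

# The driver's cluster metadata reflects gossip-disseminated state:
# which hosts exist in the ring and whether they are currently up
for host in cluster.metadata.all_hosts():
    print(host.address, "up:", host.is_up)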

  • In terms of the CAP theorem, it targets availability and partition tolerance (AP).
  • It does not support constructs such as JOIN, GROUP BY, or OR clauses, so table structures must be designed with denormalization logic (see the sketch after this list).
  • A primary key is mandatory, but there is no foreign key structure.
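
To make the denormalization point concrete, here is a hypothetical sketch; the table and column names are mine, not from the project. Instead of joining, you create one table per query pattern and duplicate the data into each.

from cassandra.cluster import Cluster

cluster = Cluster(["127.0.0.1"], port=9042)
session = cluster.connect("running")  # assumes the keyspace created in the hands-on section

# One table per query: "all runs on a given day, newest first".
# 'day' is the partition key; 'ts' is a clustering column that orders
# rows inside the partition. The same run data may also be duplicated
# into other tables serving other queries.
session.execute("""
    CREATE TABLE IF NOT EXISTS runs_by_day (
        day DATE,
        ts TIMESTAMP,
        distance BIGINT,
        PRIMARY KEY (day, ts)
    ) WITH CLUSTERING ORDER BY (ts DESC)
""")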

Now that we have covered the main features you need to know, let’s get hands-on.

About the Project 🏃 Hands-On Time

To keep up my mental and physical fitness, I go jogging. In this project I read the data I exported from the app I use with Python on my local computer, transfer it into Apache Cassandra as the storage layer, and use Apache Airflow to automate the whole flow.

To bring up our Docker infrastructure, we run two separate Compose configuration files, one for Airflow and one for Cassandra.

docker-compose -f docker-compose.airflow.yaml -f docker-compose.cassandra.yaml up -d

(Screenshot: the running containers after startup.)

Here is the read_fitness function I use to read the data from my local computer:

import os
import json

def read_fitness():
    path_to_json_files = '/opt/airflow/dags/Elevation-data'
    # Get all JSON file names in the directory as a list
    json_file_names = [filename for filename in os.listdir(path_to_json_files) if filename.endswith('.json')]

    # Collect the records from every file into one list
    # (each file is assumed to hold a list of measurements)
    records = []
    for json_file_name in json_file_names:
        with open(os.path.join(path_to_json_files, json_file_name)) as json_file:
            records.extend(json.load(json_file))
    return records
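
As a quick sanity check, not in the original post, you can call the function on its own; the field names are the ones used in the INSERT further below:

records = read_fitness()
print(len(records), "records read")
print(records[0]["timestamp"], records[0]["distance"])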

After reading the data, we carry out the following steps by entering the Cassandra container.

docker exec -it fitness-cassandra-1 cqlsh

I then create a keyspace to write into. Since there is no multi-datacenter layout here and I don’t have hardware suited to a multi-node structure, I chose SimpleStrategy and created the keyspace for a single node.

CREATE KEYSPACE IF NOT EXISTS running
WITH REPLICATION =
{'class':'SimpleStrategy','replication_factor':'1'};

After creating the keyspace, we switch to it:

cqlsh> use running ;
cqlsh:running>

It’s time to create the table structure; we create our table as follows.

CREATE TABLE IF NOT EXISTS running.run_table(
timestamp TIMESTAMP,
elevation INT,
source_type INT,
duration BIGINT,
distance BIGINT,
elevation_gain INT,
elevation_loss INT,
PRIMARY KEY (timestamp));

To view the tables in the keyspace:

cqlsh:running> desc tables
run_table

It’s time to write our data into the table in the keyspace we created in Cassandra. We do this with the save_fitness function below.

from cassandra.cluster import Cluster

def save_fitness():
    # Connect to the Cassandra container's IP on the Docker network
    cluster = Cluster(["172.30.0.4"], port=9042)
    session = cluster.connect("running")
    # Read the files once instead of calling read_fitness() on every access
    records = read_fitness()
    for rec in records:
        session.execute(
            """INSERT INTO running.run_table (timestamp, elevation, source_type,
               duration, distance, elevation_gain, elevation_loss)
               VALUES (%s, %s, %s, %s, %s, %s, %s)""",
            (rec["timestamp"], rec["elevation"], rec["source_type"], rec["duration"],
             rec["distance"], rec["elevation_gain"], rec["elevation_loss"]))
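
An optional refinement that is not in the original post: the driver can also prepare the INSERT once, so Cassandra parses it a single time and only the bound values travel per record. A minimal sketch, assuming the same connection details:

from cassandra.cluster import Cluster

def save_fitness_prepared():
    cluster = Cluster(["172.30.0.4"], port=9042)
    session = cluster.connect("running")
    # Prepared statements use '?' placeholders and are parsed only once
    insert = session.prepare(
        "INSERT INTO run_table (timestamp, elevation, source_type, duration, "
        "distance, elevation_gain, elevation_loss) VALUES (?, ?, ?, ?, ?, ?, ?)")
    for rec in read_fitness():
        session.execute(insert, (rec["timestamp"], rec["elevation"], rec["source_type"],
                                 rec["duration"], rec["distance"],
                                 rec["elevation_gain"], rec["elevation_loss"]))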

We use Apache Airflow to automate this process; I created the Airflow task structure as follows.

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="Fitness",
    schedule="@hourly",
    start_date=pendulum.datetime(2023, 6, 25, tz="UTC"),
) as dag:

    read_json = PythonOperator(
        task_id="read_json",
        python_callable=read_fitness,
    )
    save_cassandra = PythonOperator(
        task_id="save_cassandra",
        python_callable=save_fitness,
    )
    # Run read_json first, then save_cassandra
    read_json >> save_cassandra
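
One caveat worth adding as my own note: with a start_date in the past and an @hourly schedule, Airflow will by default backfill every missed interval since that date. Passing catchup=False makes it schedule only from now onward:

with DAG(
    dag_id="Fitness",
    schedule="@hourly",
    start_date=pendulum.datetime(2023, 6, 25, tz="UTC"),
    catchup=False,  # skip backfilling the hourly intervals since start_date
) as dag:
    ...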

(Screenshot: the Fitness DAG and its tasks in the Airflow UI after triggering.)

I can see on the Airflow workflow that the tasks complete successfully. To check the data in the Cassandra container, we can query it with CQL.

For example, my longest running distance:
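
The original screenshot is not reproduced here, but as a minimal sketch of such a query through the Python driver (MAX is a built-in CQL aggregate; connection details as above):

from cassandra.cluster import Cluster

cluster = Cluster(["172.30.0.4"], port=9042)
session = cluster.connect("running")

# Aggregates scan the whole table, which is fine for a small personal dataset
row = session.execute("SELECT MAX(distance) FROM run_table").one()
print("Longest run:", row[0])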

You can access the entire hands-on project in the GitHub repository: https://github.com/HayrullahKaraman/Data_Engineer_Workspace/tree/main/Fitness

Conclusion

We have come to the end of this blog post: we learned about Apache Cassandra and built a hands-on project. Of course, I did everything here on my local computer; I am sure it would be very enjoyable to build this structure as a truly distributed setup across different regions. I hope it was useful. Stay safe, see you later! 👋
