Before I start explaining Apache Cassandra, I will briefly talk about what NoSQL is and why it is needed. As data grew, it also diversified: its types and structures change day by day, and new needs arise. Think back to an e-commerce site of the past: a customer bought products, we kept some information about them, and that was roughly all the data there was. Today a single customer has dozens of delivery addresses, scores for the products they bought, and comments, where even the emojis matter as much as the text. There was a need to store, process, and analyze data in these new formats and create new value from it. NoSQL, short for "Not only SQL," emerged as a family of databases that can store these kinds of data, offering document, key-value, wide-column, and graph models. You can examine the types in more detail on the DB-Engines site.
Apache Cassandra falls into the wide-column category: its tables group many rows under a single partition key, and a partition can grow very wide.
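To make that concrete, here is a minimal, hypothetical CQL sketch (this table is only for illustration, not part of the project below): all of one customer's comments live in a single partition, one row per comment.

-- Hypothetical example table, one wide partition per customer
CREATE TABLE comments_by_customer (
    customer_id UUID,
    comment_time TIMESTAMP,
    comment_text TEXT,
    PRIMARY KEY (customer_id, comment_time)
);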
What is Apache Cassandra, and what makes it powerful?
As big companies' data grew, they developed various tools according to their own needs and then released them to the open-source world. Cassandra started as a development effort at Facebook, which later handed it over to the Apache open-source community. Apache Cassandra positions itself as the combination of two powerful NoSQL designs: Amazon Dynamo and Google Bigtable.
Apache Cassandra’s powerful features 💪
Apache Cassandra Key Features
How Apache Cassandra works
Apache Cassandra's distributed structure consists of machines called nodes. Each node is assigned a certain range of tokens, and together the nodes form what is called the Cassandra ring.
Why is gossip in our title? Because Cassandra nodes communicate with each other through a protocol called gossip, exchanging state information such as which nodes are up and which have failed.
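You cannot easily watch gossip itself from application code (on a node, nodetool gossipinfo dumps its raw state), but the Python driver exposes the cluster view that results from it. A minimal sketch, assuming the cassandra-driver package and a node reachable on localhost:

from cassandra.cluster import Cluster

cluster = Cluster(["127.0.0.1"], port=9042)
cluster.connect()
# Each Host entry reflects the up/down state the cluster has learned about its peers
for host in cluster.metadata.all_hosts():
    print(host.address, "up" if host.is_up else "down")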
Now that we have covered the main features you need to know, let's get hands-on.
About the Project 🏃 Hands-On Time
To keep my mind and body fit, I jog regularly. In this project, I read the data I exported from the fitness app I use with Python on my local computer, load it into Apache Cassandra as the storage layer, and use Apache Airflow to automate the whole flow.
To stand up our Docker infrastructure, we run two separate compose files, one for Airflow and one for Cassandra:
docker-compose -f docker-compose.airflow.yaml -f docker-compose.cassandra.yaml up -d
Running container structure
The read_fitness function below reads the data from my local disk:
import json
import os

def read_fitness():
    path_to_json_files = '/opt/airflow/dags/Elevation-data'
    # Get all JSON file names as a list
    json_file_names = [filename for filename in os.listdir(path_to_json_files) if filename.endswith('.json')]
    records = []
    for json_file_name in json_file_names:
        with open(os.path.join(path_to_json_files, json_file_name)) as json_file:
            # Each file is assumed to hold a list of run records, as save_fitness expects
            records.extend(json.load(json_file))
    return records
After reading the data, the next step is to enter the Cassandra container:
docker exec -it fitness-cassandra-1 cqlsh
To write the data into Apache Cassandra, I first create a keyspace. Since this setup has no multi-data-center structure and my machine does not have the hardware for a multi-node ring, I chose SimpleStrategy with a replication factor of 1:
CREATE KEYSPACE IF NOT EXISTS running
WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': '1'};
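The same keyspace can also be created from Python instead of cqlsh, which is handy if you later want Airflow to bootstrap it. A minimal sketch, assuming the cassandra-driver package and the container address used later in save_fitness:

from cassandra.cluster import Cluster

cluster = Cluster(["172.30.0.4"], port=9042)
session = cluster.connect()
# Same statement as in cqlsh; idempotent thanks to IF NOT EXISTS
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS running
    WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': '1'}
""")

In a real ring spread across data centers, you would pick NetworkTopologyStrategy with a replication factor per data center instead.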
After creating the keyspace, we switch to it:
cqlsh> use running ;
cqlsh:running>
It's time to create the table structure; we define our table as follows. For this demo, the run's timestamp serves as the primary key, so each record is identified by when it was recorded.
CREATE TABLE IF NOT EXISTS running.run_table (
    timestamp TIMESTAMP,
    elevation INT,
    source_type INT,
    duration BIGINT,
    distance BIGINT,
    elevation_gain INT,
    elevation_loss INT,
    PRIMARY KEY (timestamp)
);
To view the tables in the keyspace:
cqlsh:running> desc tables
run_table
It's time to write our data into the table in the keyspace we created in Cassandra. We do this with the save_fitness function below:
from cassandra.cluster import Cluster

def save_fitness():
    # Address of the Cassandra container on the Docker network
    cluster = Cluster(["172.30.0.4"], port=9042)
    session = cluster.connect("running")
    records = read_fitness()  # read the files once instead of once per row
    for record in records:
        session.execute(
            """INSERT INTO running.run_table (timestamp, elevation, source_type,
               duration, distance, elevation_gain, elevation_loss)
               VALUES (%s, %s, %s, %s, %s, %s, %s)""",
            (record["timestamp"], record["elevation"], record["source_type"],
             record["duration"], record["distance"],
             record["elevation_gain"], record["elevation_loss"]))
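As a side note, for repeated writes the usual cassandra-driver idiom is to prepare the statement once and bind values per row; a minimal sketch reusing the session and records variables from save_fitness above:

# Prepared statements are parsed once by Cassandra and reused on every execute
insert = session.prepare(
    "INSERT INTO running.run_table (timestamp, elevation, source_type, "
    "duration, distance, elevation_gain, elevation_loss) "
    "VALUES (?, ?, ?, ?, ?, ?, ?)")
for record in records:
    session.execute(insert, (
        record["timestamp"], record["elevation"], record["source_type"],
        record["duration"], record["distance"],
        record["elevation_gain"], record["elevation_loss"]))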
We use Apache Airflow to automate this process; I created the Airflow task structure as follows:
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="Fitness",
    schedule="@hourly",
    start_date=pendulum.datetime(2023, 6, 25, tz="UTC"),
) as dag:
    read_json = PythonOperator(
        task_id='read_json',
        python_callable=read_fitness,
    )
    save_cassandra = PythonOperator(
        task_id='save_cassandra',
        python_callable=save_fitness,
    )
    # read_json must finish before save_cassandra starts
    read_json >> save_cassandra
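Instead of waiting for the hourly schedule, the DAG can also be smoke-tested with Airflow's built-in CLI from inside the Airflow container (the container name here is an assumption based on the compose project):

docker exec -it fitness-airflow-1 airflow dags test Fitness 2023-06-25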
The view of the DAG in Airflow, and the tasks in it when triggered:
I can see on the Airflow side that the tasks complete successfully. To check the data, we can enter the Cassandra container and query it with CQL.
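For example, my longest running distance can be pulled with CQL's built-in max aggregate (a minimal sketch, assuming the run_table schema above):

cqlsh:running> SELECT max(distance) FROM run_table;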
Hands-on: you can access the entire project in the GitHub repository: https://github.com/HayrullahKaraman/Data_Engineer_Workspace/tree/main/Fitness
Conclusion
We have come to the end of the post. We learned about Apache Cassandra and built a hands-on project. Of course, I ran everything on my local computer here; I am sure it would be even more enjoyable to build this structure as a truly distributed cluster across different regions. I hope it was useful. Stay safe, see you later! 👋