PySpark - Python Spark 3 Hadoop coding framework, logging, error handling & testing for beginners | Engineering Tech | Skillshare

PySpark - Python Spark 3 Hadoop coding framework, logging, error handling & testing for beginners

Engineering Tech, Big Data, Cloud and AI Solution Architect


32 Lessons (1h 56m)
    • 1. Introduction

    • 2. What is Big Data Spark?

    • 3. Environment setup steps

    • 4. Installing Python

    • 5. Installing PyCharm

    • 6. Installing JDK

    • 7. Installing Spark 3 & Hadoop

    • 8. PyCharm PySpark Hello DataFrame

    • 9. PyCharm Hadoop Spark programming

    • 10. Python basics

    • 11. Structuring code with classes and methods

    • 12. How Spark works?

    • 13. Creating and reusing SparkSession

    • 14. Spark dataframe

    • 15. Separating out Ingestion, Transformation and Persistence code

    • 16. Python Logging

    • 17. Managing log level through a configuration file

    • 18. Having custom logger for each Python class

    • 19. Python error handling

    • 20. Ingesting data from Hive

    • 21. Transforming ingested data

    • 22. Installing PostgreSQL

    • 23. Spark PostgreSQL interaction with Psycopg2 adapter

    • 24. Spark PostgreSQL interaction with JDBC driver

    • 25. Persisting transformed data in PostgreSQL

    • 26. Organizing code further

    • 27. Reading configuration from a property file

    • 28. Python unittest framework

    • 29. Unit testing PySpark transformation logic

    • 30. Unit testing an error

    • 31. Python spark submit

    • 32. Conclusion






About This Class

This course will bridge the gap between your academic and real-world knowledge and prepare you for an entry-level Big Data Python Spark developer role. You will learn the following:

  • Python Spark coding best practices

  • Logging

  • Error Handling

  • Reading configuration from properties file

  • Doing development work using PyCharm

  • Using your local environment as a Hadoop Hive environment

  • Reading and writing to a Postgres database using Spark

  • Python unittest framework
  • Building a data pipeline using Hadoop, Spark and Postgres

Prerequisites:

  • Basic programming skills

  • Basic database knowledge

  • Entry-level Hadoop knowledge

Meet Your Teacher


Engineering Tech

Big Data, Cloud and AI Solution Architect







1. Introduction: Welcome to this Python Spark (PySpark) coding framework and best practices course. Step by step, you'll understand how to build a production-ready Python Spark application from scratch. You'll understand how to organize the code, how to do error handling, how to do logging, and how to read a configuration file. You'll also understand how to do unit testing of your Python Spark code. You'll learn what matters in the real world, and you'll be learning Python Spark while building a data pipeline. You'll understand how to read data from different sources using Spark, do the processing, and store it to another data source. This course will take you from an academic background to a real-world developer role. You do not need prior knowledge of Spark or Python. However, if you have some Hadoop background, that will definitely help you succeed in this course. If you are more interested in Scala, we have a similar course on a Spark Scala coding framework; you can check it out. So let's dive in and get started. 2. What is Big Data Spark?: Big data has two key concepts: distributed storage and distributed computing. Instead of storing a huge volume of data in a single database, you split the data and store it on multiple machines in a distributed manner, and you take the processing to where the data is. Before Spark came into the picture, Hadoop MapReduce was the most popular technology in the big data ecosystem. In Hadoop, you stored the data in the distributed file system, and then you ran MapReduce programs to do the processing on the data. MapReduce writes intermediate data to disk to perform various calculations, and Spark improved on that by storing data in memory. The Spark website claims it is up to 100 times faster than MapReduce. Spark on Hadoop is a very popular architecture in the real world: you store the data in the Hadoop HDFS file system in Hive tables, and you do all kinds of processing, like transformations and aggregations, using Spark.
Spark can also access data from relational databases, do the processing, and store the data back into relational databases. Spark can even work directly on file systems. Spark can run on various cloud environments and access files from AWS S3, Azure Blobs, and various other storage systems. Spark has its own cluster manager, and when it is running on Hadoop, it can leverage the YARN cluster manager as well. The Spark DataFrame and Spark SQL libraries are the main way we work with recent versions of Spark. With a DataFrame, you read data from various sources and store it in a tabular manner, and you can then perform transformations and aggregations on the data. Once the data is processed, you store it back into another data store. Spark SQL simplifies Spark programming further by providing a SQL-like interface, using which you can work with different data sources through SQL queries. 3. Environment setup steps: To do Python Spark Hadoop programming using PyCharm, you first install Python, then PyCharm. After that, the JDK is required, because Spark is built in the Scala language and Scala requires the JDK. After installing the JDK, you get Spark, and then, to do Hadoop Hive programming on Windows, you need winutils, which needs to be added to the Spark installation directory. After that, you can get going with Spark and Hadoop programming using PyCharm. Let's see that in action. 4. Installing Python: Go to the official Python site and download Python. We'll download the latest version, which is 3.9. Once downloaded, click on the installer, choose to add Python to the PATH, and install. Once installation is complete, go to the command prompt and verify that Python 3.9 is available on the machine. 5. Installing PyCharm: Let's now install PyCharm, which is one of the most popular IDEs for Python development. Go to the JetBrains website and download the Community Edition. Once downloaded, click on the installer.
You can select the 64-bit launcher so that a shortcut is created on your desktop. Once finished, launch PyCharm. No need to import settings. Let's create a new project. You can specify the directory, but let's just give the project a name. It takes some time to create a virtual environment: you get an isolated environment to manage your project dependencies. By default, you get a main.py with a main method. Click on the green arrow to run it, and you can see the sample code output "Hi, PyCharm" in the console. Let's add another print statement and add a breakpoint: simply click here, and then it will run up to that point. It printed up to that line, and you can click and execute the next line. So this is how you can debug Python programs. 6. Installing JDK: Java is required for Spark. Let's install the JDK. Go to the Oracle website and download Java. If Java is not already installed, typing "java -version" at the command prompt will show that. So Java is not installed; let's download it from the Oracle website. We'll download the 64-bit version for our Windows machine; you can choose the right one for your operating system. It will prompt you to sign in; if you do not have an Oracle ID already, then create one. After that, you can download it and simply click on install. Now you can go and verify that Java is installed. Also make sure Java is added to the path: go to Environment Variables, find the install under C:\Program Files\Java, and add the path up to the bin directory. 7. Installing Spark 3 & Hadoop: Let's go to the Spark website and download Spark. We'll download 3.2.1, built for Hadoop 2.7, and once downloaded, unzip it using 7-Zip. Let's copy this directory to C:; you can copy it anywhere. I renamed it spark3. Now I'll go to the Windows environment variables. Let's make C:\spark3 the HADOOP_HOME, and C:\spark3 also the SPARK_HOME.
To work with Hadoop on Windows, you require winutils.exe, which you can download from our GitHub repository. Copy that to the Spark 3 bin directory. We also need to set the path: add the Spark 3 bin directory to the PATH environment variable. So HADOOP_HOME should be C:\spark3, SPARK_HOME should also be C:\spark3, and PATH should contain C:\spark3\bin. Then you should have the winutils file within the bin directory. After that, you can open a command prompt and type pyspark. You can see that Spark 3 has been installed on this machine. 8. PyCharm PySpark Hello DataFrame: To run PySpark from the command line, go to environment variables and add PYSPARK_DRIVER_PYTHON set to python. Now go to the command prompt. Let's have a simple list; we'll explain Python lists and DataFrame concepts later. For now, let's ensure that our environment is set up and we are ready to start Python Spark programming. We'll import the required type. Now let's create a DataFrame from the list, my_list, where the schema is IntegerType. Now let's do df.show(). We can see the DataFrame values getting printed to the console. 9. PyCharm Hadoop Spark programming: Let's now save this DataFrame to a Hive table. We'll enable Hive support. From the DataFrame, we can create a temp view, table_view. Next we'll create a Hive table from that view. Again, we'll be exploring these concepts later; for now, we are setting up a Hadoop Spark environment and making sure that it's working fine. We run Spark SQL: select star from table one. Let's run it now. We need to add a parenthesis here; run it again. The program completed successfully: exit code zero means success. You can go to the project folder and, under the spark-warehouse directory, see the table that has been created. We did not give any custom location, so it is saved under the table name, and under that you'll see files.
So that is how Hadoop Hive stores files in the HDFS file system, and you are seeing the same thing here in the Windows environment. You can open the files and see the content of the DataFrame, that is 1, 2, 3, with the results split across multiple files: one file has 1 and the other file has 2 and 3. This is how you can do Hadoop and Spark programming on your Windows machine. 10. Python basics: We'll create a new project and learn some Python basics. Let's create a new project; we'll name it learn-python. Okay, let's look at this Python file that PyCharm creates; we'll remove all the comments. In a Python program, the starting point is this if block, which checks __name__ == "__main__". That's where the program execution begins. You could also write a main method, and from here call main. That is the way you would see most Python programs written in the real world, but it's okay to have a different name for the main method. Let's learn some Python basics. We'll do our Python programming within this main method. In Python, spacing and indentation matter. You can't have a print statement like this; it gives "expected an indented block". All the methods and the different blocks, like an if block, start with indentation, and wherever the indentation ends, the method ends or the loop ends. We can have a print statement here also; these two lines are part of this if block. Python supports all kinds of arithmetic operations, like addition and subtraction. Let's declare two variables. In Python, you can declare variables without giving the data type. We got the output of three plus four. And you can then populate the same variable with a string value, and Python will not complain: depending on what value you are populating, Python will take that data type. Lists are the most popular data type in Python: a collection, like an array, where you can have a sequence of elements, and you can grab a particular element by specifying the index number. Index numbers start at 0.
If we change the list to, let's say, [3, 0, 1, 2, 3, 4], you can also grab the last element by specifying -1, and -2 would give us the second-last element. In Python, either single quotes or double quotes work, and you can have single quotes within double quotes or double quotes within single quotes. Let's concatenate string one and string two and print it out. Let's now look at loops. We have already seen an example of an if block. You write an if block like this: if three is less than four, colon, then print "within the if block". And wherever the indentation ends, the block ends. We'll also print, say, "three is less than four". So it printed both. You can also put parentheses here. If we make it, let's say, three greater than four, it will not print this line. There are many ways you can do looping in Python. There is something called range: for i in range(5), print the value of i. Let's do that. We could have concatenated the value into the string, but there is another way to print values in Python, called string interpolation: you write curly braces in the string and then say .format, so that the value gets substituted. Or, if it is already a string, you can just concatenate it and print it with the main string. You can also use a for loop to loop through a list. Let's declare another list, mylist2. To loop through it, you write: for j in mylist2, print the value of j. Let's run it; it prints the value of j within the loop. We've already seen an example of a function in Python, where we declared a function. Let's have another function; we'll call it calculate_sum, and it will take two parameters, a and b, and return a plus b. Now within the main method, we'll call calculate_sum(5, 6) and store the result in another variable. There is an error: calculate_sum is not defined; we'll fix this typo. We got 11.
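The basics covered so far can be collected in one short sketch (variable names such as `my_list` and `calculate_sum` follow the transcript; the literal values are illustrative):

```python
# Lists: zero-based indexing; negative indices count from the end
my_list = [3, 0, 1, 2, 3, 4]
first = my_list[0]          # 3
last = my_list[-1]          # 4
second_last = my_list[-2]   # 3

# Looping with range and string interpolation via str.format
for i in range(5):
    print("value of i is {}".format(i))

# A simple function with two parameters
def calculate_sum(a, b):
    return a + b

result = calculate_sum(5, 6)
print(result)   # 11
```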
We can also return multiple values, let's say sum and multiplication, both. The way to do that is with a comma: you specify all the things you want to return. Now we're returning a plus b and a multiplied by b, and we can store the results in two variables by declaring them with a comma, then printing both. We got 30 and we got 200, so it is possible to return two values from a function in Python. To write data to a file, you simply do "with open", give the filename and mode as w, that is write, and then write. Let's execute this. myfile1 got created; let's check it out. "Sample content 1" has been populated. With append mode, you can append more content, so this content got appended to the original content. With w, write mode, let's write new content; the content got overwritten. So these are some basics of Python. This is not everything, but this much knowledge is sufficient for you to get started with PySpark programming. 11. Structuring code with classes and methods: Let's now create the basic structure of our Spark project. We'll have a data_pipeline.py; that will be the entry point for our application. Then you could have an ingest.py, with which you read data from a data source: it could be Hive or any other data source. Then a transform.py, in which you'll apply the transformations. And finally we'll have a persist.py, which will store the transformed data to another data store. So this data_pipeline.py will invoke these three in sequence to do the data processing. Typically, this is how you structure code in the real world. Let's go to PyCharm and create this project structure. Let's create a new project and call it data-pipeline. This is the default main.py; let's get rid of these comments. We'll declare a class; we'll call it Pipeline. It will simply print "running pipeline". This method is now within the class Pipeline.
To invoke the run_pipeline method, first instantiate the Pipeline class; then you can call run_pipeline using the instance of the class. The run_pipeline method needs to take self as a parameter, because it is now a class-level method, not a separate function. Self is used to represent the instance of the class. Using self, you can also access other class-level variables, which we'll see later. This is the structure of the application that we are trying to get to. We'll rename main.py to data_pipeline.py, and it runs as before. Now let's add three more Python files. We'll call them ingest.py, transform.py, and the third one persist.py. Within ingest.py we'll declare a class called Ingest with a method, ingest_data. In Python, you declare variables in small letters with underscores to separate different words; that's the convention. We'll print "ingesting". In transform.py, let's have a similar declaration; we'll call it Transform with transform_data. Finally, in persist.py we'll declare a class Persist and put a print statement, "persisting". Let's now go back to the data pipeline. Within run_pipeline we'll call ingest, transform, and persist one after the other: first we ingest the data using Ingest, then transform, then persist. First, we need to import the files that we've just created in the main class: import ingest, transform, and persist within data_pipeline.py. That is how you import a Python file in another Python file. Now let's instantiate the classes: you give the file name, dot, class name, and then invoke the method. Similarly, let's instantiate the other two. So let's now run it. We are first ingesting, then transforming, then persisting. Our basic class structure is ready; now we'll gradually add functionality within each of these using Python Spark. 12. How Spark works?: Let's get an understanding of how Spark works.
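The project structure described in lesson 11 can be sketched in one snippet. In the course the classes live in separate files (ingest.py, transform.py, persist.py) and are imported into data_pipeline.py; they are collapsed into a single listing here only so the sketch is self-contained:

```python
# In the course, each class sits in its own file and data_pipeline.py
# does: import ingest, transform, persist

class Ingest:
    def ingest_data(self):
        print("Ingesting")

class Transform:
    def transform_data(self):
        print("Transforming")

class Persist:
    def persist_data(self):
        print("Persisting")

class Pipeline:
    def run_pipeline(self):
        # invoke the three stages in sequence
        print("Running pipeline")
        Ingest().ingest_data()
        Transform().transform_data()
        Persist().persist_data()

if __name__ == "__main__":
    pipeline = Pipeline()
    pipeline.run_pipeline()
```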
All Spark programs are managed by a driver program, which typically runs on the master node. We are running Spark locally; in production, Spark would typically run on a multi-node cluster environment, where the driver program runs on the master node and creates several workers on the worker nodes. The driver program creates a SparkSession, which is the entry point to any Spark application. Once you have the SparkSession, you get access to all Spark functionality. A SparkSession is created using a builder pattern: you specify an app name, and then you can specify various other configuration parameters, like how many cores you need. To create a SparkSession that can interact with a Hadoop Hive data store, Hive support must be enabled. Once the SparkSession is created, you get going with Spark programming by creating a DataFrame and then manipulating that data using the various functions available on DataFrame. Spark SQL is the high-level API available on top of Spark DataFrame. Using Spark SQL, you can query and interact with data the way you do in a relational database. Note that when you are working in a shell, the SparkSession is available as the spark variable; you do not need to create it explicitly. 13. Creating and reusing SparkSession: The SparkSession is the entry point to any Spark application. Once you have the SparkSession, you should reuse it throughout your application. Let's now create a SparkSession in the data pipeline project. We'll first go to Settings, select the Python interpreter under the project, click add, search for pyspark, and install the package. PySpark has been installed; let's close this. We'll go to the Run Configuration and select Environment Variables. We'll add SPARK_HOME, which is C:\spark3, and also the Python path pointing under C:\spark3, and click OK. Now let's import pyspark, run it, and confirm that it recognizes pyspark.
Let's have a method to create a SparkSession. We'll have a class-level variable, self.spark, and using the builder we'll create the SparkSession as we did earlier. You can hit Enter and split the line into multiple lines; PyCharm automatically adds a backslash, and that's how you split a statement across lines. We'll call this create_spark_session method immediately after instantiating the Pipeline class. Let's run it. We have to import SparkSession from pyspark.sql. Let's import SparkSession and run it now. pyspark is the high-level Python library through which you get access to all Python Spark modules. It took some time to create the SparkSession, and after that it ran fine. Let's now check that the SparkSession got created. We'll go to run_pipeline, and within that let's create a dummy DataFrame. It would be similar to what we had done earlier, and this time we'll say self.spark, because that is the class-level variable we are accessing within run_pipeline. Though we created the SparkSession within the create_spark_session method, we assigned it to a class-level variable by saying self.spark, so to access it in any other method you do self.spark, and then you can start working with it. You have to import IntegerType. Now let's do a df.show(). The DataFrame got printed, and all three pipeline classes got executed. One thing you might have noticed: this time we did not mark a content root in the project structure. That is not required, because we specified the environment variables. So to work with PySpark, you simply add pyspark in the Python interpreter and set the environment variables; then you should be good to go. Now we have a SparkSession, and we should be reusing it throughout the data pipeline project.
So let's now see how to pass that SparkSession to the ingest method. We can simply pass it while invoking ingest_data, or, while instantiating the Ingest class, we can set a class-level variable on the Ingest class that stores the SparkSession and can be used within the class. Python has something called the __init__ method, which is like a constructor. Using that, you can initialize variables during object creation. We'll declare an __init__ method for the Ingest class and say self.spark = spark. We need to import the required pyspark library in this class also. Note that Python doesn't know this is a SparkSession variable; only at runtime will it be determined what value this spark variable contains. That is how Python works, and we could pass any value to this parameter, not just a SparkSession. Now let's go back to the data pipeline, and while instantiating the Ingest class we'll pass the SparkSession variable. Now the SparkSession will be available in the Ingest class instance. Let's check it out. We'll move this code to the ingest_data method within the class, and here, when we say self.spark, it gets the class-level spark variable of the Ingest class. Let's check it out. These are Hadoop Spark internal logs; every time you do some operation with the Spark library, you see these messages in a different color. Now the DataFrame values are getting printed within the Ingest class, in the ingest_data method. So this is how you can create a SparkSession at the beginning of your application and use it throughout the application. 14. Spark dataframe: Spark is a popular big data technology. In the real world, you'd see Spark getting used for large-scale data transformation. Spark ingests data from different sources, like Hadoop Hive, relational databases, or files; then it can apply aggregations, do filtering, and do all kinds of data transformations, and then store the data back to another data source.
Let's look at some examples of different operations that can be performed using a Spark DataFrame. Earlier we created a simple DataFrame from a Python list. Now let's create a DataFrame from a CSV file, a file called retailstore.csv. We'll load this CSV file into a Spark DataFrame and perform different operations. So let's see that in action. Back in the ingest_data method, we'll remove this code. Let's change the message to "ingesting from CSV". Spark has a read.csv method using which you can read data from a CSV file. Let's read data from retailstore.csv and store it in a customer DataFrame, then run it. We can see that Spark has read data from retailstore.csv and created a DataFrame. A DataFrame is like an in-memory Excel sheet, where data is stored in row and column format. Here we did not specify that the CSV already has a header, so Spark created generic column names, but that can be taken care of, which you will see shortly. The key thing here is that we now have an in-memory data structure on which we can do different operations like filtering, aggregation, removing null values, etc., which is why Spark is used in many real-world applications. Let's change this to header=True; then Spark reads the CSV file's first row as the header. Using describe, we can get various statistical information about the DataFrame. We'll first write the code, then put a breakpoint to see the different outputs. We can select a particular column, let's say the Country column, from the DataFrame by doing a select with the column name, and then show it. We can also group the data by a certain field. Let's run it and see the output. We'll put a breakpoint here and debug this. It has shown the entire DataFrame and stopped here. Let's step over. We have an error: we forgot to put a parenthesis here. Now let's step over again. We got some statistical info about the DataFrame, like how many rows there are. It excluded null values, which is why you see different counts.
It also gave the mean and standard deviation for numeric fields like age and salary, along with the minimum and maximum values. Let's continue. Next, we do a select on country: it selected only the Country column and displayed it. Now we do a group by on country. This doesn't have any attribute called show: when doing an aggregation operation, we have to get the count and then display that. So let's move the breakpoint there and run it again. We can see the data grouped by country along with the count. We'll now see an example of a filter operation on the DataFrame: filter rows where salary is, let's say, greater than 30000, and show that. We'll also see another group by operation, where we calculate the average salary and maximum age from the given data. And let's have an example of orderBy also: one filter, one group by, and one orderBy. Let's run it up to this point and then step over. We did a filter on salary greater than 30000, and it printed all the rows where salary is greater than 30000. Let's calculate average salary and max age and display that: we group by gender and calculate average salary and max age, which is getting displayed here. Let's do the orderBy operation: it sorted and displayed the data by salary in ascending order. So this is how you can do various filtering and aggregation operations using a Spark DataFrame, which is the main way you work with Hive and Spark. 15. Separating out Ingestion, Transformation and Persistence code: We did all the transformation operations in ingest.py, but that is not where they are intended to go. We've created three different Python files: ingest, transform, and persist. Let's organize the code for this CSV use case: we'll keep the ingestion code in ingest.py, move the transformation code to transform.py, and then see how to store data to another file, capturing that code in persist.py.
So let's see that. In ingest_data, we'll read the customer DataFrame and then return it; let's remove the rest of the code. Back in the data pipeline, we'll create a variable df that holds the returned data, pass that value to transform, get the transformed DataFrame back, and show it. Here we are instantiating the transformation class with the SparkSession, so let's modify the __init__ method; we'll copy that code from ingest. Now the transform_data method will take the DataFrame as a parameter, and we'll do our transformation operations there. We'll do a simple transformation this time: df1 = df.na.drop(). This removes all the rows which have null values, and we return the resulting DataFrame. Every time you do some operation on a DataFrame, you create a new DataFrame variable to store the output; you can't modify an existing DataFrame. We'll first show the ingested DataFrame, and then the transformed DataFrame. We'll have to import pyspark and SparkSession in transform.py as well. So let's run it and see the output. It's running the pipeline, first ingesting data from the CSV and showing it. Now it is doing the transformation. You can see that all the rows with null values have been dropped. For example, row 11 had a null value in salary, so that has been dropped, and rows 24 and 26 have been dropped too. This is a very simple transformation operation, but now we have seen how to organize the code so that ingestion is in one Python file and the transformation in another. We still have the last step: persisting the DataFrame. Once data is transformed, it can be persisted to any data store. Let's first see how to persist this resulting DataFrame to another CSV file. In persist.py we'll have an __init__ method; first, we'll import pyspark and SparkSession.
And we'll take the DataFrame as the input. write.csv is the method using which we can write data to a CSV file. We set the header option to true and specify the CSV file name, and this creates a CSV file with the transformed data. Let's now pass the DataFrame to the persist_data method. We'll also pass the SparkSession, though it's not strictly required here because we are not doing any direct operation on the session. Let's run it now. It reads from the CSV, applies transformations, and then finally persists the data. That is complete. It created a folder; that is how Spark and Hadoop work: Spark creates a directory with the file name specified, and under it, it creates part files with the actual content. If the data volume is huge, it might split the data into multiple CSV files, and you can use coalesce to specify the number of part files. For example, df.coalesce(1) will always write a single file. Let's see that. This time it created one single part file. We'll not use a .csv extension for the output name; we'll specify a plain name, because Spark creates an HDFS-style directory and, under that, a CSV file with the content. So this time we got a directory without any .csv extension, and the content got stored in a single CSV file. So this is how you can build a data pipeline using Python Spark. We've seen a very simple example with a CSV file; in the later part of the course, we'll also see how to read data from Hive, do transformations, and then store the data in a database. 16. Python Logging: We'll now look at how to do logging in Python. Till now we have been adding print statements to see how the code is getting executed. In a real-world application, we use a logging framework so that we can control different log levels. Let's understand that through some examples. Python has a logging library; we simply import it. Let's add some logging statements at the start of the application.
We have added four types of log messages: debug, info, warning and error. We'll put a breakpoint here and see how this works, and we've also imported the logging library. Let's run this code. It runs up to this point and stops. You can go to the Console tab and see the output. We can see that the warning message and error message have been printed. So by default, Python uses the WARNING log level, and anything at warning level or higher gets printed. If we want to print the info and debug messages, we can set the log level at the beginning. Let's set the logger to INFO and run it again. We can see that info is getting printed now. Let's change it to DEBUG. All four are getting printed — anything at debug level or higher, that is info, warning and error, gets printed. And we can change it to ERROR also: only error is getting printed. So you've seen how logging works in Python. Now, in each class and method, you can add different types of log messages. If there is an error block, you log error. If you just want to print something to the console to understand the program flow, you use info. And if you want to print more log messages that would help you in debugging, you put some debug log messages. You then control the log level in each of your programs. You can also move this logger to a class-level variable, so that it is set for the entire class, and use it wherever you need in that class. Let's put some log messages in the pipeline methods also. We'll have info logging in the run_pipeline method — it is a best practice to log the beginning and end of all methods in your class. And we'll set the log level to INFO. Here we'll say 'application started', 'SparkSession created', and finally 'pipeline executed'. Let's run the program. We can see 'application started'. You also see 'Setting default log level to WARN' — that one is from Spark, not our application logging. We can see 'SparkSession created', 'pipeline method started', and we are seeing print statements from the other classes.
That is fine — we have not added log statements in those classes yet. Regarding the warning that this directory already exists: that is fine, we did not delete it. But you've got a sense of how logging works. Now let's delete this directory and run it again, but this time with a different log level. We'll set it to WARNING. Now when we run it, the info statements will not get printed. We can see that the info statements are not getting printed — we did not see the 'application started' and 'SparkSession created' log messages. To suppress logs, you set the log level, and based on the log level your program will output log statements to the console. One challenge here is that you have to modify your code every time you need a different log level, and you'll have multiple classes in your application. So let's see how to manage logging through a properties file, or configuration file, in the next lab. 17. Managing log level through a configuration file: Let's now understand how to control the log level through a properties or configuration file. Create a directory called resources — you can use any name, but that is the standard practice — and we'll have a directory called configs under that, where we'll store the log config. Let's create a new file; we'll call it logging.conf. We'll declare a root logger for the entire application, and subsequently we'll declare a logger for each of our Python files, which we'll see later. You also need to declare a handler. With a handler you can send logs to the console, to a file, or to other destinations. We'll have a handler called consoleHandler because we are planning to print logs to the console. Next we need to declare the handler and log level for the root logger. Then we need a section called handler_consoleHandler, where we need to specify a formatter. And finally, we need a formatter_sampleFormatter section, in which we'll specify in what format the log will get printed.
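A minimal logging.conf in the fileConfig format described above might look like this (the logger, handler and formatter names follow the transcript; the exact format string is an assumption):

```ini
# pipeline/resources/configs/logging.conf -- a sketch
[loggers]
keys=root

[handlers]
keys=consoleHandler

[formatters]
keys=sampleFormatter

[logger_root]
level=DEBUG
handlers=consoleHandler

[handler_consoleHandler]
class=StreamHandler
level=DEBUG
formatter=sampleFormatter
args=(sys.stdout,)

[formatter_sampleFormatter]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
```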
Now after this, you can manage logging for the entire application through this properties file. So let's see that. Go to data_pipeline.py, and we'll have to import the logging.config library. Now instead of having basicConfig, we'll read the config from that configuration file using fileConfig. The rest of the code remains unchanged. Let's now run it and see the output. We got a KeyError — let's check it out. We also need to add a keys entry for the formatter. We are using sampleFormatter, so we need to add a key for the sampleFormatter. We'll run it again. Now we can see that 'root INFO application started' is getting printed — it picked up the root logger. To understand how we can control the log level through the config file, let's add a few more log statements. We'll have one warning statement — 'application started with warning' — one error, and then let's add another one: 'debugging the application'. And let's run it. We'll debug up to this point so that the entire program doesn't get executed. We can see that all four log statements are getting printed. Now go back to logging.conf, and instead of DEBUG let's have a different level — we'll have INFO. As you might have noticed, we have a log level on the root logger, and we also have a log level at the handler level. Using the handler log level, you can control the logging to different destinations. For example, you might want debug statements in the console, but if you have another handler which is printing to a file, or say sending email, you can have a different log level there. Let's see how this would work. We'll change the root log level to INFO, but we'll keep DEBUG on the console handler. Let's run it. We can see that the debug message did not get printed even though we had DEBUG at the handler level. The handler can further restrict what the logger passes through, but it cannot print messages at a more verbose level than the logger allows.
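The fileConfig call replacing basicConfig can be sketched as follows. To keep the example self-contained here, the configuration is written to a temporary file first; in the course the file lives under pipeline/resources/configs/:

```python
import logging
import logging.config
import os
import tempfile

# A minimal fileConfig-style configuration mirroring the logging.conf
# described in the course (an assumption, not the course's exact file).
CONF = """
[loggers]
keys=root

[handlers]
keys=consoleHandler

[formatters]
keys=sampleFormatter

[logger_root]
level=INFO
handlers=consoleHandler

[handler_consoleHandler]
class=StreamHandler
level=INFO
formatter=sampleFormatter
args=(sys.stdout,)

[formatter_sampleFormatter]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
"""

path = os.path.join(tempfile.mkdtemp(), "logging.conf")
with open(path, "w") as f:
    f.write(CONF)

# One call reads the whole logging setup -- this replaces basicConfig.
logging.config.fileConfig(path)
logging.info("application started")          # printed: root level is INFO
logging.debug("not printed at INFO level")   # suppressed
```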
So if we change this to DEBUG on the root and have INFO here on the handler, we will get the same output. Change the handler to WARNING and keep the log level at the root at DEBUG — only warning and error are getting printed. But here, if we have, say, WARNING on the root and INFO on the handler, then also only warning and error will get printed. And now we'll change the handler to ERROR — we can see only 'application started with error'. So the handler can restrict what the root logger passes through, but it cannot log at a more verbose level than what is specified on the logger: a message has to pass both the logger level and the handler level. We'll keep the root at DEBUG and control our logging through the handler log level. We'll change it back to INFO, so that info, warning and error are getting printed. Let's go back to the main program, where we have info statements that will get printed. Now, to use this in Python classes, you simply declare a class-level variable, and you also need to import logging and logging.config. Let's import them — in all four Python files we'll have logging declared. Now let's add log statements. Instead of print, we would say logging.info('Ingesting from CSV'), and let's have another one: 'DataFrame created'. Similarly in persist we'll have a log statement and remove the print. Let's now run the entire application. We can see info statements from the data pipeline Python file; this one is coming from ingest.py, and this is coming from persist.py. The program ended with all the info statements. Let's run it again with a different log level. We'll delete this output directory and change the log level in the handler to WARNING, and let's add one warning statement to make sure it is working correctly. We'll run it now. The warning got printed — that is at the beginning of the application — but you would see none of the other info statements getting printed. Now the program finished, and apart from the warning statement that we had within the transformer, no other log statements got printed.
And now let's have an error statement in persist.py — say 'Dummy error persisting' — and in the handler we'll change the level to ERROR. We'll delete the output directory and run it. 'Application started with error' — that error statement is getting printed. Now we should only see the error statement that is there in persist.py; none of the other warning or info statements will get printed. We saw the error in persist. So this is how you can manage your log level through a configuration file and have logging implemented in your Python Spark application using the Python logging library. 18. Having custom logger for each Python class: Let's now see how we can have different loggers for each of our Python files. We'll go to logging.conf and add a logger for ingest, transform and persist. For each logger we also need to specify the log level and the handler — we can have a common console handler for all — and qualname will be the logger name. And we'll have one for persist. Let's have INFO for all to start with. Now we'll go to ingest.py. To get a particular logger, we say logging.getLogger('ingest') — this is ingest.py, so we'll get the ingest one. And instead of logging.info, we'll have logger.info. Let's add a warning statement also to demo it: 'ingestion created with warning'. And let's make similar changes in persist and transform as well. So now we have four different loggers defined: root, ingest, transform and persist. We have log level INFO for all, at the handler level also we have INFO, and we have defined which logger to use in each of our classes. Let's run it now. We'll delete this output directory. Now the log statements in all classes should get printed, because we have INFO for all the classes. We can see a log statement from ingest — log statements from all classes got printed.
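The per-file logger pattern looks like this. The logger name 'ingest' must match a qualname declared in logging.conf; here the level and handler are set in code only so the sketch stands alone:

```python
import logging

# Each Python file asks for its own named logger; the name ties it to
# the matching [logger_ingest] section (qualname=ingest) in logging.conf.
logger = logging.getLogger("ingest")

# propagate=False stops records from also reaching the root logger,
# which would otherwise print every message twice.
logger.propagate = False

logger.setLevel(logging.INFO)             # the course sets this in logging.conf
logger.addHandler(logging.StreamHandler())

logger.info("Ingesting from CSV")         # printed at INFO
logger.debug("suppressed at INFO level")  # not printed
```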
Let's say we want to suppress the info statements from persist. We'll go to logging.conf, and for the persist logger we'll set the log level to WARNING, and we'll run it. So now we are able to control the log level in a particular Python file. This should help you in debugging in a production environment, where you may want to increase the log level for a particular Python class. The persist info statements got suppressed — this line got suppressed. And wherever log statements are getting printed, you can see which logger is getting used and what the log level is. Let's try one more logging level change: in transform we will suppress info — we'll change it to ERROR. So now the info log statements from transform have been suppressed. One thing you might have noticed is that each log line is getting printed twice. To avoid that, we have to set the propagate property to false; otherwise the root and the custom logger both get invoked. Let's run and see the output. Now we can see that each log line is getting printed only once. We got this error because the folder already exists, which is fine. In this lab, we have understood how to have different loggers for different Python files and how to control the log level in each of those files. 19. Python error handling: Let's understand how to do error handling in Python. We'll implement error handling in the persist_data method, which throws an exception whenever the directory named transformed_retailstore already exists. So let's see how to handle that. In Python, you put the code which is likely to throw an error within a try block — try, colon, and hit Enter. Then, if this block of code throws any error, we'll handle it in an except block. The syntax for that is: except Exception as exp, colon. We'll write a log statement: logger.error('An error occurred while persisting data'), and we can append the exception to this message. We need to convert the exception to a string, and we do that in Python by saying str.
And within parentheses, whatever object we want to convert to a string — in this case it would be exp. So this should convert the exception to a string and append it to the error message. The moment an error occurs, we'll exit. For that, we can do sys.exit(1) — we need to import sys. When an application ends successfully, it exits with code 0, so here we are forcing it to exit with code 1, which indicates a failure of the application. Let's run it and see what happens. We already have this transformed_retailstore directory, so when it tries to persist again to the same directory, it will throw an exception and come out. We already have a log statement at the error level that will get printed. Earlier we were trying out a dummy statement, but this is a genuine case of an error which will get printed to the log. We got that error: 'An error occurred while persisting', and it says the path already exists. So this is the exception that got caught here, and it's getting printed here. You can also handle it by storing it in a database or sending an email notification. In Python, we write comments with a hash. You can do all these operations before exiting out of the application, so that the error is logged somewhere. Typically you catch exceptions at a higher level. We are calling this persist method from the data_pipeline Python program, so in persist.py, whenever an error occurs, we'll not exit; instead we'll raise an exception, which will be caught at the higher level. We would say raise Exception('HDFS directory already exists'). We can pass the original exception, or we can pass a custom message. Now this will be thrown to the method which invokes persist_data, which is in data_pipeline.py. Before that, let's clean up the dummy error messages. Back in the data pipeline, we'll remove the dummy messages, and in run_pipeline we'll handle the error. We'll have another try block, and except Exception as exp.
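The pattern described — raise in the low-level class, catch and exit at the pipeline level — can be sketched like this (the messages follow the course; the logger setup is minimal):

```python
import logging
import sys

logger = logging.getLogger(__name__)


def persist_data(df):
    """Low-level step: log the failure and re-raise for the caller."""
    try:
        df.write.csv("transformed_retailstore")
    except Exception as exp:
        logger.error("An error occurred while persisting data: " + str(exp))
        # Re-raise with a custom message; the pipeline decides what to do.
        raise Exception("HDFS directory already exists")


def run_pipeline(df):
    """Top-level: the single place that logs, notifies and exits."""
    try:
        persist_data(df)
    except Exception as exp:
        logger.error("An error occurred while running the pipeline: " + str(exp))
        # send an email notification / log to a database here, then:
        sys.exit(1)
```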
And then we'll log 'An error occurred while running the pipeline' and append the error to the string. Here in the pipeline we can send an email notification or log to a database, and then finally exit. That way, you don't have to write this code in each of your Python files. We need to import sys here. So let's run it again and see what happens. We got the exception and it's printing here: 'while running the pipeline: HDFS directory already exists'. It also logged an error message in persist.py, but it did not exit there. It came to the main method, logged the custom message — 'HDFS directory already exists', which we wrote here — and printed it. And if any of the other classes throws an error, that will also get caught and logged here. Let's introduce a dummy error in ingest_data. We'll say raise Exception('An error occurred while ingesting'). Now we'll see that it enters this method, and when it raises the exception, it comes out — it stops at the ingestion stage. It exited, and it said 'An error occurred while ingesting'. So this is how you can do error handling: you can have log statements at the info level, the exception blocks will always log at error, and if we change the log level to ERROR in production or other environments, only the errors will get printed. We'll remove the dummy error. 20. Ingesting data from Hive: Earlier we read data from a CSV file and stored it in a Hive table. Let's now build a data pipeline which will read data from Hive/HDFS, do some transformations using Spark, and then finally store the transformed data in a Postgres table. In the real world, the Hive/HDFS data might get populated by another process — Sqoop, Spark Streaming, or some other process would be populating this data.
But the use case that we'll be focusing on is: once the data is available in Hive, how do you clean it, process it, apply all the transformations using Spark, and store it in Postgres. Let's first create a dummy Hive table. Let's create a method which will populate data in the table using spark.sql. We'll first create a futurex_course_db database. Then we'll create the table — we'll say CREATE TABLE IF NOT EXISTS futurex_course_table, with a certain number of fields: course ID, course name, and let's add a few more — author name and number of reviews. So this should be our table. Now, once the table is created, we'll populate it with some dummy records. We'll have one record with course ID 1, course name Java, author FutureXSkill, and number of reviews 45. Similarly, let's add a bunch of records with some course name, author name and number of reviews. There are certain rows with blank values, and we'll force Hive to treat them as null — later on we'll do a transformation to replace those null values. You do an ALTER TABLE, and using the SerDe format property you force Hive to treat empty strings as null values. All we're doing here is creating a dummy Hive table, which we can use to build the data pipeline. Now let's call this method from the main method, and we'll comment it out after calling it once — create_hive_table. We would have a Spark session before calling this. We'll put a breakpoint here. First it creates the SparkSession, then it enters this method and creates the dummy Hive table. We did not specify self.spark here, so let's stop and fix that; otherwise it will give an error that spark is not found. Python is an interpreted language — you only get to know the error at runtime. We'll run it again. Let's now create the schema. It created the database. Now let's create the table. And it complained because the query string was not formatted correctly.
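The table-creation step described above can be sketched with spark.sql. The database, table and column names follow the course's naming; 'serialization.null.format' is the standard Hive table property for mapping empty strings to NULL, used here as an assumption about what the course sets:

```python
def create_hive_table(self):
    # Each statement is plain HiveQL executed through the SparkSession.
    self.spark.sql("create database if not exists futurex_course_db")
    self.spark.sql(
        "create table if not exists futurex_course_db.futurex_course_table "
        "(course_id string, course_name string, author_name string, "
        "no_of_reviews string)"
    )
    self.spark.sql(
        "insert into futurex_course_db.futurex_course_table values "
        "(1, 'Java', 'FutureXSkill', 45)"
    )
    # Force Hive to treat empty strings as NULL so the transformation
    # step can replace them later.
    self.spark.sql(
        "alter table futurex_course_db.futurex_course_table set tblproperties "
        "('serialization.null.format'='')"
    )
```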
Let's put everything in one line, fix it, and run it to populate the table and alter the table. After that, we can ingest data from the table, and we'll comment out the invocation of the create_hive_table method — we don't need to run it again and again. Let's go to ingest.py; instead of reading from CSV, we'll read from the new table. Using Spark SQL, we can read it by specifying a SELECT query. Now this course DataFrame can be returned, and in run_pipeline we can show the ingested DataFrame. Let's put a breakpoint here and run it. So this ingested all the records that we inserted into the table using those queries. Now let's print it and see the output. We can see that the entire table content has been loaded into a Spark DataFrame, and we have it displayed. We have completed the first step of the pipeline — reading from the Hive table. Next, we'll see how to apply transformations on the data that we have just read from the Hive table. 21. Transforming ingested data: We've seen how to read data from a Hive table; next we will be applying some transformations on the ingested data. If we look at this data that we read from Hive, it has null values in the number of reviews column, and it has null values in the author name. Wherever we have null values in the author name, we'll populate it with 'Unknown', and wherever we have null values in the number of reviews column, we'll populate it with 0. Let's go to transform.py and apply that transformation. We'll comment out the earlier transformation — that was dropping all the rows with null values. First, we'll replace the null values in the author name and create DataFrame df1. Then we'll replace the null values in number of reviews with 0 and create DataFrame df2, and we'll return that. Spark does lazy evaluation, so until an action is invoked, Spark will not do the actual processing.
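The replacement logic described above maps naturally onto DataFrame.na.fill. The transform_data method now becomes (column names follow the course's table; a sketch):

```python
# transform.py -- null-replacement version of the transform step.
class Transform:
    def __init__(self, spark):
        self.spark = spark

    def transform_data(self, df):
        # Replace nulls column by column; each na.fill returns a new
        # DataFrame, and nothing executes until an action like show()
        # or a write is invoked (lazy evaluation).
        df1 = df.na.fill("Unknown", ["author_name"])
        df2 = df1.na.fill("0", ["no_of_reviews"])
        return df2
```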
Whenever we call an action method — transformed_df.show() — that is the time both the transformations will be performed. So this is lazy evaluation in Spark: you can do a series of transformations, and finally you either show or persist — that is when the execution happens. Let's print the transformed DataFrame. We can see the transformed DataFrame with the null values populated with 'Unknown' in the author field, and the null values in the number of reviews populated with 0. This was the original DataFrame, and this is the transformed DataFrame. You can apply more transformations here — you can do aggregations, do other filtering. For now, let's move forward with this transformation and then understand how to store the transformed DataFrame in a Postgres table; that would complete our data pipeline. 22. Installing PostgreSQL: Let's download and install PostgreSQL. Go to the PostgreSQL website and go to the download link. I'll be downloading for Windows. Click on 'Download the installer'. Let's pick a version 10 release — I'll download for Windows 64-bit. Once downloaded, click on the installer. Click Next. You can leave the default directory. No need for Stack Builder. We'll give the password admin — you can choose any password. 5432 is the default port. Let's install. PostgreSQL has been installed. To load pgAdmin, simply search for pgAdmin and click on it. pgAdmin is the admin interface to the database; it opens in the browser at a localhost address. If pgAdmin takes time to load, search for psql, which should also be present on your machine — that is the command-line tool to interact with Postgres. When prompted for server, localhost — just hit Enter. The database — the default database postgres — hit Enter. Port is 5432, the default port — hit Enter. Username is postgres — hit Enter. Then the password: that is the password you would have given during installation. I gave admin, so I'll enter that. And now I'm in the Postgres database. Let me create a schema: CREATE SCHEMA futurex_schema.
Make sure you add a semicolon at the end of every SQL statement. I'll create the schema futurex_schema, and then create a table futurex_schema.futurex_course_catalog with the fields described here. The table got created successfully. Let's insert some records into this course catalog table. This table has course ID, course name, author name, course sections and creation date. We're inserting a row with course ID 2, course name 'Hadoop Spark', author 'FutureXSkill', and the course sections in JSON format — we specify the course section content as a JSON string. Course name is varchar — character varying, which is like a string; course ID is integer and not null; creation date is of type date, which is also not null. And we've added a constraint that course ID is the primary key. Let's hit Enter. We did not enter the schema name correctly — let's fix that. Insert another row. Now we have entered two rows. Let's select data from this table. We can see that we have a futurex_schema.futurex_course_catalog table, and within it we have two records. So this is how you can install Postgres, create a schema, create a table, and use this database for your application development. 23. Spark PostgreSQL interaction with Psycopg2 adapter: Let us understand how to interact with the PostgreSQL database from Spark. Psycopg2 is one of the Python adapters to work with a PostgreSQL database. We need to go to the Python interpreter settings and install psycopg2. I have already installed it — that's why it's showing up in the list. You need to go to Install Packages, select psycopg2 and install it. If you're working in some other Python environment, make sure you do pip install psycopg2 to install this package. Once it is installed, let's write a new method to read data from the Postgres table which we just created. First we need to import psycopg2. Then let's create a new method, read_from_pg.
We're creating it with self. First, we need to establish a database connection. For that, you declare a connection variable with psycopg2.connect and pass the user ID, password, host name and database name. In our case, the user ID is postgres, the password is admin — the one I specified during the DB setup — the database is hosted on the local machine, and the database name is postgres. So this is how you establish the connection with psycopg2. Once that is done, you need to create a cursor, and using the cursor you can execute different queries. Let's create a SQL query to read data from the futurex_course_catalog table which we have just created. We fire the SELECT query against the database — we created this table earlier and populated two records. We'll fetch data from this table and populate a DataFrame. To get the data from the table, we use the Python pandas library and create a pandas DataFrame, then convert it to a Spark DataFrame. To work with pandas, we first need to import pandas. You also need to ensure the pandas library is installed — again, go to the Python interpreter settings and add pandas; if you're working in another environment, do pip install pandas. Both pandas and psycopg2 are required for this demo. To read the SQL query output into a pandas DataFrame, there is a library called pandas.io.sql which we need to import. Once that is done, you can create a pandas DataFrame with the sqlio.read_sql_query method: you pass the SQL query, which says select star from the table, and the connection which we established using psycopg2. In Spark, a pandas DataFrame can easily be converted to a Spark DataFrame by using the spark.createDataFrame method. Finally, let's print it out. We'll call this method from the data pipeline to see the output — we'll call it at the beginning: read_from_pg. Let's put a breakpoint here and run it. Now let's step through this method.
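The read path described above can be sketched as follows. The connection details match the local setup used in the course; the third-party imports are kept inside the function so the sketch can be inspected without psycopg2 or pandas installed — treat it as a sketch, not a drop-in module:

```python
SQL_QUERY = "select * from futurex_schema.futurex_course_catalog"


def read_from_pg(spark):
    # Local imports: psycopg2 and pandas are required only when the
    # function actually runs against a live database.
    import psycopg2
    import pandas.io.sql as sqlio

    # Credentials chosen during the PostgreSQL installation.
    conn = psycopg2.connect(
        host="localhost", database="postgres",
        user="postgres", password="admin",
    )
    # pandas runs the query over the psycopg2 connection...
    pdf = sqlio.read_sql_query(SQL_QUERY, conn)
    conn.close()
    # ...and Spark converts the pandas DataFrame to a Spark DataFrame.
    return spark.createDataFrame(pdf)
```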
The connection was created with the Postgres database; we created a cursor and defined the SQL query. Now we've created a pandas DataFrame, pdf. Next we'll convert that to a Spark DataFrame, and finally let's show the output. Here we can see that the data from the table has been printed — these are the two records that we inserted into this table earlier. This is how you can create a Spark DataFrame using psycopg2 and pandas. Let's now see how to insert a record into this table using psycopg2. We'll go to persist.py and create a method. We need to first establish a connection the way we did earlier, then create a cursor, and after that declare an insert query. We need to populate course ID, course name, author name, course sections and creation date, and then the values — we'll pass the values at runtime. So let's change these to %s placeholders — we have five fields in this table. We'll create a Python tuple to populate these values. So I've declared an insert query and declared a tuple. Now let's see how to invoke the insert query. The cursor has a method called execute; using that, you pass the insert query and the insert tuple, and it will execute the query. Then you have to close the cursor and commit the data. Let's run it — we'll go to the data pipeline and invoke this method, insert_into_pg, and we'll put a breakpoint in it. Course ID is an integer field, so we have to avoid quotes there. Let's run it now. We'll step through: the query got executed; close the cursor, commit the connection. Now in the Postgres table, fire the SELECT query again — the new record got inserted through Spark. And you can run the read_from_pg method again to see the new DataFrame with the newly populated record. Let's do that. We can see the DataFrame populated with all three records. So this is how we can work with Spark and PostgreSQL using psycopg2. 24.
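The insert step described above can be sketched like this. The connection is created as in the read example and passed in; the row values are purely illustrative:

```python
INSERT_QUERY = (
    "insert into futurex_schema.futurex_course_catalog "
    "(course_id, course_name, author_name, course_sections, creation_date) "
    "values (%s, %s, %s, %s, %s)"
)


def insert_into_pg(conn):
    # conn is a psycopg2 connection. The %s placeholders are filled from
    # the tuple at execute time, which also avoids quoting issues for
    # the integer course_id column.
    insert_tuple = (4, "Machine Learning", "FutureXSkill",
                    '{"section1": "Introduction"}', "2020-10-01")
    cursor = conn.cursor()
    cursor.execute(INSERT_QUERY, insert_tuple)
    cursor.close()
    conn.commit()   # the row is only durable after commit
```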
Spark PostgreSQL interaction with JDBC driver: Next we'll understand how to connect to PostgreSQL through the JDBC driver. Search for 'JDBC Postgres' and download the PostgreSQL JDBC driver. It's a JAR file — you need to download it and store it somewhere on your machine. Let me keep it in the project directory. You need to create a Spark session with this JAR file, so let's modify the SparkSession code. Here we need to configure spark.jars and give the JAR file name. Let's see if the Spark session is getting created with this PostgreSQL JAR file. We'll put a breakpoint after the SparkSession creation code. 'Unexpected indent' — let's fix that; here, use a backslash for the line continuation. We did not put the .jar extension — let's do that. We'll run it again. Now the Spark session has been created. Let's now understand how to read data from PostgreSQL using the spark.read JDBC method. We'll create a new method, read_from_pg_using_jdbc_driver. Spark has a method spark.read: you specify the format as jdbc, and for the URL you specify the PostgreSQL URL — it is localhost, port 5432, and the postgres database. The dbtable that we would read from is futurex_schema.futurex_course_catalog. Username is postgres, password is admin. After reading this, let's print it out. We'll call this method from the data pipeline — let's comment out the previous one. Now let's put a breakpoint in read_from_pg_using_jdbc_driver and run it. This was an old breakpoint; we'll just step through and step into this method. It gave an error — let's check it out: 'No suitable driver'. To fix this 'no suitable driver' error, we'll go to the SparkSession creation code and add the config parameter spark.driver.extraClassPath, and there specify the JAR file. We'll now go back and put a breakpoint here, remove all the other breakpoints, and run it. Okay, so it has created the DataFrame.
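The two steps described above — registering the driver jar on the SparkSession and reading through spark.read — can be sketched as follows. The jar file name is a hypothetical placeholder (the transcript never names the exact version), and pyspark is imported locally so the sketch stands alone:

```python
PG_URL = "jdbc:postgresql://localhost:5432/postgres"


def create_spark_session():
    # The Postgres JDBC jar must be on both spark.jars and the driver
    # classpath, otherwise Spark raises "No suitable driver".
    from pyspark.sql import SparkSession  # local import: sketch only
    return (SparkSession.builder
            .appName("data_pipeline")
            .config("spark.jars", "postgresql-jdbc.jar")                 # jar name is illustrative
            .config("spark.driver.extraClassPath", "postgresql-jdbc.jar")
            .enableHiveSupport()
            .getOrCreate())


def read_from_pg_using_jdbc_driver(spark):
    return (spark.read
            .format("jdbc")
            .option("url", PG_URL)
            .option("dbtable", "futurex_schema.futurex_course_catalog")
            .option("user", "postgres")
            .option("password", "admin")
            .load())
```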
Let's run the show command to see the output. Yes, we got the output. So this is how you can read data from Postgres with Spark and the JDBC driver. Earlier we had seen how to do the same thing using psycopg2. 25. Persisting transformed data in PostgreSQL: Let's now complete the data pipeline. Earlier we had read data from a Hive table and applied the transformation to replace the null values. Let's now persist that DataFrame to a Postgres table. First, we need to create a Postgres table which will store the transformed DataFrame. It has to have a matching number of columns: we have course ID, course name, author name and number of reviews. Let's create a table with all character data types. The table has been created. Let's validate using a SQL query — we'll try to fetch from this futurex course table. Currently there is nothing. Let's now go back to our application code and persist the DataFrame. We can do that easily using the df.write method. We have to specify the mode as append and the format as jdbc. Then give the URL for your database, which is localhost, 5432, postgres. Then the dbtable name — that's the table where the transformed data will get stored — and then the user ID and password. Let's run it now. We'll change this to mode append and run it. The pipeline got executed. Let's query the course table. We can see that 13 records have been inserted. Let's now run it again — it should again insert 13 records because the mode is append. We can see the transformed DataFrame got inserted; the null values have been replaced with 'Unknown' and 0. The program completed. Let's query the table again. Now we can see 26 rows. So every time you persist, it appends to the records that already exist. So this is how you can build a data pipeline using Spark: you can read from any source and write to another database.
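The write step described above can be sketched like this (the target table name and credentials follow the course's local setup and are assumptions):

```python
def persist_data(df):
    # mode("append") adds the transformed rows to the target table on
    # every run, which is why the row count doubles on a second run.
    (df.write
       .mode("append")
       .format("jdbc")
       .option("url", "jdbc:postgresql://localhost:5432/postgres")
       .option("dbtable", "futurex_schema.futurex_course_table")
       .option("user", "postgres")
       .option("password", "admin")
       .save())
```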
After applying transformations, you have seen how to read from Hive and write to Postgres, and you've also seen how to read from Postgres. Spark's core strength is doing transformation and data processing on very huge volumes. We have seen this transformation on a small dataset, but the same code would work for billions of records in a cluster environment. 26. Organizing code further: We'll organize the code further. We'll create a new directory called pipeline, and under that we'll have all Python files except for the main data_pipeline.py. Let's move these files — ingest, transform and persist.py — to that directory, and we'll refactor. When you refactor, the file paths get adjusted automatically wherever the files are referred. You can go to data_pipeline.py, and you can see now we are importing from the pipeline folder: from pipeline import transform, persist, ingest. PyCharm has automatically adjusted the path. Let's also move the resources folder to the pipeline folder and refactor. You can see the logging conf path has been modified automatically. Let's run it and see if it is working as expected. We got a KeyError: formatter. That means those Python files were not able to find the logging conf file. Python always resolves a file path from the source root directory, so we have to specify the path relative to the main root directory. The path is correct in data_pipeline.py, but in ingest, transform and persist we are getting an error that Python is not able to find the config file. In those files, we have a relative path starting from the directory where the individual file is, but Python always resolves it starting from the main directory. So I have to specify pipeline/resources/configs in the ingest, transform and persist files. Let's run it now. It should run fine this time — 'application started'. The program got executed successfully.
We'll also move the JDBC JAR file and the retailstore CSV to the pipeline folder. Now everything is under the pipeline folder except data_pipeline.py. We need to modify the JAR file path to give the relative path from the project root directory.

27. Reading configuration from a property file: Let's now understand how to read a configuration file. We have seen an example with logging earlier; this time let's try to read some other properties. We have a table name in persist.py, the futurex course table. We can move that table name to a properties file so that we can easily modify it later without having to modify the source code. Let's create a new properties file called pipeline.ini under the resources folder and create different sections. Let's first create a section for db_config, and that will have the target database table name, which is the futurex course table. Now, to read this file, we need a library called configparser. Let's import it. After that, declare a variable called config, which will be an instance of the ConfigParser class from the configparser library. Next, declare a variable for the target table, and we'll read it from the properties file. Instead of hard-coding the table name in the dbtable option, we'll populate it from the target variable. You do a config.read and specify the properties file name, and then from the config you do a get, specifying the section name and then the property name. In this case the section name would be db_config and the property name would be target_table. Let's print it in the log. We also need to make sure the path is relative to the root directory, so we specify pipeline/resources/pipeline.ini. Now let's run the program. We'll step through, and we should see the target table name in the log: the futurex course table.
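The steps above can be sketched as follows. The section name, key name, and table value are assumptions mirroring the lesson; the real pipeline reads the file from disk rather than from a string.

```python
import configparser

# Assumed contents of pipeline/resources/pipeline.ini
ini_text = """
[db_config]
target_table = futurex_course_table
"""

config = configparser.ConfigParser()
config.read_string(ini_text)  # in the pipeline: config.read("pipeline/resources/pipeline.ini")

# Fetch the value by section name, then property name
target_table = config.get("db_config", "target_table")
print(target_table)  # futurex_course_table
```

The `target_table` variable can then be passed to the dbtable option of the JDBC write instead of a hard-coded string.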
This is how you can read application configuration from a property file. You can have multiple sections, and each section can have multiple properties. Instead of hard-coding values in your program, you can read them from the properties file.

28. Python unittest framework: In the real world, you will unit test your code and then hand it over to the testing team for functional and integration testing. So let's understand how we can unit test a PySpark application. As a best practice, you create a separate directory to store all your tests. Let's create a directory called test. We do not need to ship that directory when we package our code for other environments. After that, you create a test Python file for each Python file that you have written. For our example, let's create a test file. You can give it any name, but the convention is to prefix it with test_. First we import unittest, which is the unit testing framework in Python. It's similar to JUnit and other unit testing frameworks. Let's see how it works. Whatever class you are testing, for example Transform, you create a test class with Test appended to the name of the class you are testing, and within parentheses you write unittest.TestCase. After that, you create a function for each of your tests, and the function names should start with test_; beyond that you can give any meaningful name. Let's call our first test test_first, and we'll do a simple comparison using an assert. unittest provides assertEqual, assertNotEqual and others, with which you can perform various comparisons. Let's check whether 3 equals 3 using assertEqual. Now declare the main if block for the program, and to invoke the tests, call unittest.main(). In the test run output, we can see that the test passed. You can also run a test directly from the function name. If we change the value to 5, the test will fail, because 3 is not equal to 5: the expected value is 3 and the actual value is 5.
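A compact sketch of this lesson's tests (the second one uses assertTrue, which the next lesson covers). Class and method names are illustrative, following the Test*/test_* convention described above:

```python
import unittest

class TestBasics(unittest.TestCase):
    def test_first(self):
        self.assertEqual(3, 3)  # passes: expected equals actual

    def test_second(self):
        self.assertTrue("PYTHON".isupper())  # passes: the string is all caps

if __name__ == "__main__":
    # exit=False keeps the interpreter alive when run interactively
    unittest.main(argv=["test_basics"], exit=False)
```

Changing `self.assertEqual(3, 3)` to `self.assertEqual(3, 5)` makes the first test fail with an "expected 3, got 5" style message, exactly as shown in the lesson.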
Let's add another test. We'll call it test_second, and we'll use assertTrue this time, which checks whether something is true. There is also assertFalse, to check whether something is false. Let's check whether "PYTHON" is written in capitals. If it is, the test will pass. The assert failed for the first one; let's fix it, and now both tests pass. You can click on the test summary to see all the results: the first test passed and the second test also passed. Change the string to lowercase, and then the second one will fail while the first one still passes. So these are some examples of unit testing with Python. Next, we'll understand how to unit test our transform method.

29. Unit testing PySpark transformation logic: Let's understand how to unit test the transformation logic in our data pipeline. We built the data pipeline to read data from Hive and store it in Postgres. However, to test the transformation logic, we don't need to connect to Hive or Postgres. The idea behind unit testing is that you should be able to unit test any function without all its dependencies. For example, the transform_data function takes a DataFrame, replaces all the null values with unknown, and returns another DataFrame. No matter where the DataFrame is coming from, whether from Hive, from a file, or from any other source, this method is designed to do only the transformation. So let's see how we can test it. This time we'll create the test file under the project root folder; otherwise PyCharm may have issues locating the dependencies. So let's create a test Python file for the transform class. We'll first import unittest, then from the pipeline directory we'll import the Transform class, and we'll define a class to do unit testing for the Transform class. Let's also add the main if block. Now, to test the transformation logic, we'll have to pass it a DataFrame which contains null values.
We have a mock CSV file which contains two records, and it has a null in one of the author name fields. So let's create a DataFrame from this CSV file and run it through the transformer. To create a DataFrame, we first need to create a Spark session. We'll now write a test function, transform_should_replace_null_value, and within it we create a Spark session. Next, we'll read the mock course data CSV file, create a DataFrame, and display that DataFrame. Then we'll call the transform_data method of the Transform class, passing the DataFrame, and we'll print the returned DataFrame. We have done this earlier; this time we're doing it within the test function. Once we have the transformed DataFrame, we can extract the author name for the CMS course. To do that, you filter on course ID 2, which is this row, then select the author name, then collect the data. We extract the Row object by passing 0 within square brackets, and from that we extract the author name. This is how you can extract specific values from a PySpark DataFrame. We'll print the author name, and finally we'll do an assert: this author name should be unknown, because the transform method should have replaced the null value in the CMS course author name. So let's run it and test it. The test passed successfully. We first printed the original DataFrame from the CSV file, where you can see the null in the author name. Then we printed the transformed DataFrame, where the transform_data method replaced the null value with unknown. Finally, we asserted that the CMS author name is unknown, and the assert passed. Change the expected value to something other than unknown, and the test will fail because the expected value no longer matches the actual value. So this is how you unit test: for each function, understand what its input and output parameters are, and design your test in such a way that you can test the function without calling any other function. 30.
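The real test needs a running SparkSession and the mock CSV. The assertion pattern itself can be shown with a plain-Python stand-in; the rows and the transform_data function below are hypothetical mirrors of the pipeline's data and logic, not the course's actual code.

```python
import unittest

# Two mock records; course 2 (the CMS course) has a null author name,
# mirroring the mock CSV described above.
MOCK_ROWS = [
    {"course_id": 1, "course_name": "Java", "author_name": "FutureX"},
    {"course_id": 2, "course_name": "CMS", "author_name": None},
]

def transform_data(rows):
    """Replace null author names with 'unknown', like the pipeline's
    null-filling transform does on a Spark DataFrame."""
    return [dict(r, author_name=r["author_name"] or "unknown") for r in rows]

class TestTransform(unittest.TestCase):
    def test_transform_should_replace_null_value(self):
        transformed = transform_data(MOCK_ROWS)
        # Filter on course_id == 2, then read the author name: the plain-Python
        # equivalent of filter(...).select(...).collect()[0][0] in PySpark.
        cms_author = [r for r in transformed if r["course_id"] == 2][0]["author_name"]
        self.assertEqual("unknown", cms_author)
```

The shape is identical in the Spark version: build the input with a known null, run the transform, pull out the one value you care about, and assert on it.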
Unit testing an error: Your functions might throw all kinds of exceptions or errors, and you want to check whether an error is raised when it should be. So let's see how that can be done. We'll write a new test that should throw an error. Let's create a Spark session, instantiate the Transform class, and simply pass a null value, which in Python is represented as None, and see what happens. Let's first run this. We're trying to apply the transformation on a DataFrame which is null. Let's check the output. We got an AttributeError: the NoneType object doesn't have the attribute na. This is expected, but then how do we make the unit test pass when the function is doing what it should? There is something called assertRaises, with which you check for the type of error. In this case we are expecting an AttributeError, so you write with self.assertRaises(AttributeError): and put the function call after the colon, inside the with block. Let's now run it and see what happens. This time the test passed, because we were expecting an AttributeError and we got that error.

31. Python spark submit: Once you are done with all the development work, you can package your code and send it to the cluster environment for deployment. Deploying Python code is really easy. Unlike Java or Scala, you don't have to create any JAR files or bundle libraries with your files. Typically, in the cluster environment, the Hadoop and Hive setup will already be available and accessible to the Spark application, and all the required dependencies that you have in Python, like the pandas library, will also be available for you. From a developer's standpoint, you don't need to worry much about packaging and deploying your Python files. In a cluster environment, a Spark job is executed through the spark-submit command. Since we are testing the code locally, we can go to the command prompt and try the same spark-submit command.
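The invocation we are about to run looks roughly like the following. This is a sketch: the project path is a placeholder, and exact flags (master URL, extra JARs, and so on) depend on your environment.

```shell
# Open a prompt in the project root (where the spark-warehouse folder lives),
# then submit the entry-point script:
cd C:\path\to\project
spark-submit data_pipeline.py
```

On a real cluster the same command would typically carry options such as the master/deploy mode, but the entry point stays a plain Python file.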
We have to open the command prompt within this project directory, since it has the spark-warehouse folder where the tables are located, which we are reading in our data pipeline. Type cmd in the navigation bar to open a command prompt in this directory. We also need pandas: earlier we installed pandas in PyCharm, but we also need to ensure pandas is available in the system Python environment. Now we can do a spark-submit, which is how Spark jobs are executed in a cluster environment. You run spark-submit and specify the main Python file, which is the entry point for the application; in our case it is data_pipeline.py. Hit Enter, and the job will get executed.

32. Conclusion: Thank you for enrolling in this course. We'll end with a preview of the Spark Scala coding framework course.