2021 Edition - Spark Scala Structured Streaming | Engineering Tech | Skillshare


2021 Edition - Spark Scala Structured Streaming

Engineering Tech, Big Data, Cloud and AI Solution Architect


Lessons in This Class

10 Lessons (47m)
    • 1. Introduction

    • 2. Structured Streaming Explained

    • 3. Batch vs Streaming code

    • 4. Writing streaming data to a Hive table

    • 5. Streaming aggregation

    • 6. Filtering Stream

    • 7. Adding timestamp to streaming data

    • 8. Aggregation in a time window

    • 9. Tumbling window and Sliding window

    • 10. Appendix - IntelliJ Spark Scala Hive environment setup






About This Class

This course will give you an understanding of how Spark Structured Streaming works. You will learn the following:

1. How to set up a Spark Scala local development environment.

2. How to stream data from files.

3. How to write streaming data to a Hive table.

Meet Your Teacher


Engineering Tech

Big Data, Cloud and AI Solution Architect


Hello, I'm Engineering.

See full profile




1. Introduction: Imagine you have real-time access to all the phone sales happening globally. Can you determine how many phones are getting sold every hour? Or can you determine the hourly sales volume at a 15-minute slide interval? This Spark Scala Structured Streaming course will enable you to solve these kinds of real-life use cases. The best way to understand Structured Streaming is by looking at an example. We'll try to read data from files using the Spark Structured Streaming library, store that in a Hive table, and also print it to the console. Let's see that in action. We have three sample files. Each file contains two records: an employee ID and an employee name. The first file has IDs 1 and 2, the second 3 and 4, and the third 5 and 6. We'll read these files using the Structured Streaming library one by one. Let's see the code first. In the main method, we have first declared a Spark session variable and populated it. After that, we have declared a schema to read the file, defining employee ID and employee name. We'll be applying that schema while reading content from the files. Then we read data using the readStream method of the Spark session. This readStream is pointing to an input directory from where data will be picked up; initially this directory is blank. Finally, whatever data we are reading, we write to the console, and the mode is update, so the data gets written incrementally. This is the input directory we have set up, which is initially blank. We have three files in the download folder which we're going to copy one by one into the input directory. Let's start this program. Spark session created. Then the application waits for the streaming to start. Okay, now it's ready to stream. Let's copy the files to the input directory one by one. First we'll copy file one.
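A minimal sketch of the kind of program described above (the paths, column names, and object name are illustrative, not taken from the class):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

object FileStreamDemo {
  def main(args: Array[String]): Unit = {
    // Local Spark session for the demo
    val spark = SparkSession.builder()
      .appName("FileStreamDemo")
      .master("local[*]")
      .getOrCreate()

    // Streaming file sources require an explicit schema
    val schema = new StructType()
      .add("empId", IntegerType)
      .add("empName", StringType)

    // Pick up any CSV file dropped into the input directory
    val streamDf = spark.readStream
      .schema(schema)
      .csv("C:/input") // hypothetical landing directory

    // Print each micro-batch to the console; "update" writes rows incrementally
    val query = streamDf.writeStream
      .outputMode("update")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```

Each file copied into `C:/input` then appears as a new micro-batch on the console.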
We can see that it has been picked up by the Spark streaming application, which printed records 1 and 2. Next, let's copy file two; records 3 and 4 got picked up. And then finally we'll move in the third file. So this is how you can implement Spark Streaming to read data from files. Spark Streaming can read from various data sources; this was one example of how you can read data using Spark Streaming and store it in a DataFrame. Next, we'll look at the streaming code in detail and also understand various other aspects of streaming. If you are completely new to Spark Scala, you can check out our other course on Spark Scala coding practices. 2. Structured Streaming Explained: Structured Streaming was introduced in Spark 2. Before Spark 2, in Spark 1, Spark used to do streaming in micro-batches. In Spark 2, stream processing was introduced and the APIs for batch processing and stream processing were unified: there is no difference between how you work with batch data versus how you work with streaming data in Spark 2. As new data streams in, records get added to an unbounded table, and we can create a DataFrame out of it. The way you work with batch data, you can work pretty much the same way with this unbounded table of streaming data. In Structured Streaming, this unbounded table is called the input table, which holds the data. A DataFrame is created from this input table, and all operations that can be performed on batch data can also be performed on streaming data. Structured Streaming treats the streaming data as a table that is continuously getting updated. In the previous version of Spark, which did stream processing in micro-batches, you had to specify the batch interval. Now that overhead is gone: stream processing is taken care of by the system, and you do not have to worry about the batch interval. Also, streaming in the previous version worked with RDDs, while Structured Streaming works with DataFrames.
DataFrames are the newer API for Spark; because you are using DataFrames, you can leverage the optimizations that are available for DataFrames, which were not available on RDDs when using micro-batching in the older versions of Spark. 3. Batch vs Streaming code: Let's look at this Spark streaming program closely. It has three main parts. Initially we create a Spark session, then we read the stream, and then we write the stream to the console; you can also write it to an HDFS directory or any other database, which we'll see in future labs. The key difference between batch and streaming is that instead of read we have readStream, and instead of write we have writeStream. That is the key difference. Otherwise, once the DataFrame is created, you can do pretty much any operation that you would otherwise do in batch programming. Apart from that, there are a few other details for streaming: for example, the window in which we want to read the data, and from what point onwards we want to read the data, as the data is continuously flowing. These are some of the concepts you need to understand; otherwise, in Spark the batch and streaming APIs have been unified. Structured Streaming is based on DataFrames, so it is optimized, and you can leverage all the APIs and optimizations available on DataFrames for batch processing in streaming also. Thank you. 4. Writing streaming data to a Hive table: Next, instead of writing the output to the console, we'll write data to a Hive table. There is a separate Scala file for this in the project, for saving the structured stream to Hive. The only thing that you need to do is point it to a Hive table location. You'll be reading data in a similar manner from the input directory, and while writing we'll specify the location where the data will be written. We also need to specify the checkpoint location, as the Spark Streaming API requires. So let's start this program.
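The read/readStream symmetry described above can be sketched side by side; this assumes a `spark` session and a `schema` are already defined, and the paths are illustrative:

```scala
// Batch: read once, write once
val batchDf = spark.read.schema(schema).csv("C:/input")
batchDf.write.csv("C:/output")

// Streaming: same API shape -- read becomes readStream, write becomes writeStream
val streamDf = spark.readStream.schema(schema).csv("C:/input")
streamDf.writeStream
  .outputMode("append")
  .format("console")
  .start()
  .awaitTermination()
```

Everything between the read and the write (filters, aggregations, column operations) looks the same in both cases.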
This time we'll be starting the save-to-Hive program. Let's look at the pom.xml while it is loading. For the demo in the previous lab, we wrote to the console; for this particular one, where we are writing to Hive, we need Spark Core, Spark SQL, and Spark Hive, these three libraries. We do not need any additional dependency for Structured Streaming. The program is ready. Now let's start moving files. The input directory is empty, and we'll start copying files. You can see the output location is the Hive location; it will create the Hive location directory under the main project folder, which we'll see shortly. Let's copy file one. The file has been picked up. Let's go to the project directory. You can see that the file has been written to the directory, from where we can read it as a Hive table. Let's copy the second file; you'll see more part files getting generated, and another CSV file has been created. Finally, we'll copy file three. Let's look at the content of each of the files: this is the second file, this was the first file, and another file will get created for the third file. Now you can create a table pointing to this location, and it will have the data. So we are incrementally loading to a Hive table using Spark Streaming. The third file has been copied. Next, there's an assignment: try to create the Hive table which points to this directory. Thank you. 5. Streaming aggregation: In this lab we'll understand how aggregation works on streaming data. First, let's create three sample files containing phone type and sale volume, with rows for Android and iPhone. We'll just copy this and create three files. We want to know how many phones are getting sold for Android and iPhone incrementally. In the second and third files, let's put different sale volumes. So we have three files.
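Writing the stream to a table location instead of the console can be sketched like this; it assumes a `streamDf` streaming DataFrame is already defined, and both paths are illustrative:

```scala
// File sink: each micro-batch is appended as part files under the table path.
// A checkpoint location is mandatory for file sinks so Spark can recover state.
val query = streamDf.writeStream
  .format("csv")
  .outputMode("append")
  .option("path", "spark-warehouse/employees")          // hypothetical Hive table location
  .option("checkpointLocation", "checkpoint/employees") // required by the API
  .start()

query.awaitTermination()
```

A Hive table created with its location pointing at `spark-warehouse/employees` would then see the incrementally loaded data.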
We'll see how streaming aggregation works: as sales happen, files arrive in the landing zone and then get processed by our Spark application. Let's copy this and create a new file; call it structured streaming aggregation. Let's ensure the input directory is blank; we'll delete these files. So we've created three files, and to understand how streaming aggregation works there are two key changes: one is applying an aggregation on the DataFrame, and the second is changing the output mode to complete, because aggregation expects complete mode; it will do the aggregation by combining the data received in each stream. We also need to change the sale volume to integer type. Let's run it again. We can't do aggregation on a string, so you need to make sure the schema is modified; because you are doing aggregation with a sum of sales, ensure the sale volume is of type integer. And we are also changing the output mode to complete, so it's going to combine the input datasets received in different streams and then do the aggregation. It started running. Let's copy the files; first, copy sales file one. We can see that in the first batch it printed the expected counts. Next, copy file two; the counts shown combine file one and file two. Finally, let's copy the last file, and we should see the final combined sum of sales for Android and iPhone. This is how you can do streaming aggregation: the application is waiting to stream, the sales data keeps coming in, and it reads the files and does the aggregation in complete mode, giving you the count of total sales at any point in time. Here we can see the expected output. So this is how you can do streaming aggregation. Thank you. 6. Filtering Stream: Welcome back. Now we'll see how to apply a filter on streaming data.
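The two key changes described above (an aggregation plus complete output mode) can be sketched as follows; the session setup and paths are assumed, and the column names are illustrative:

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Sale volume must be a numeric type for sum() to work
val salesSchema = new StructType()
  .add("phone", StringType)
  .add("saleVolume", IntegerType)

val salesDf = spark.readStream.schema(salesSchema).csv("C:/input")

// Running total per phone, combined across all micro-batches received so far
val totals = salesDf.groupBy("phone").sum("saleVolume")

totals.writeStream
  .outputMode("complete") // streaming aggregations require complete (or update) mode
  .format("console")
  .start()
  .awaitTermination()
```

With each file copied into the input directory, the console shows the cumulative sum per phone type rather than per-file values.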
There is no difference between the way you treat batch data and the way you treat streaming data in the latest version of Spark; batch is essentially a special case of stream processing. Any kind of filtering, aggregation, or persistence operation that you can apply to a batch can also be applied to a stream, and the other way around. Let us now understand how we can apply a filter on the file streaming application that we've built. We'll copy the aggregation Scala object; let's call it Spark streaming filter. Earlier we saw how to read the data from files, do the aggregation, and display the sum of sale volume for each phone. This time, let's apply a filter on phone type and display the sale volume for only one of the phones. We'll declare a new DataFrame, say an iPhone DataFrame, and filter the stream DataFrame. We can use Spark SQL, or directly apply a filter on the stream DataFrame. Let's try Spark SQL. We'll create a view, then use spark.sql: select star from the view where the phone is iPhone. Finally, we'll print this DataFrame through the write stream, which goes to the console. We have also included the logger this time and suppressed the log level, so that while it is running it will not print a lot of information to the log and we can see only the final DataFrame output. Let's start the class. While it is starting up, let's look at the files. It's similar to what we had earlier: three files with sale data for iPhone and Android. Adding up the iPhone numbers, we should see a total of 60 iPhones getting sold. We will not see any Android data because we've applied a filter on phone. The program is starting up. We would not know when the streaming is going to start because we've suppressed the log. So let's copy the first file. The first file is copied. It will take some time to do the processing, and because the log is suppressed, we're not going to see anything until the final output.
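The temp-view approach described above can be sketched like this; it assumes a `salesDf` streaming DataFrame (phone, saleVolume) already exists, and the view name is illustrative:

```scala
// Register the streaming DataFrame as a temp view, then filter with SQL.
// Note: the query must reference the view name, not the DataFrame variable.
salesDf.createOrReplaceTempView("sales")
val iphoneDf = spark.sql("select * from sales where phone = 'iPhone'")

// Equivalent DataFrame-API filter (requires import spark.implicits._):
// val iphoneDf = salesDf.filter($"phone" === "iPhone")

iphoneDf.groupBy("phone").sum("saleVolume")
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()
  .awaitTermination()
```

Both forms produce the same filtered stream; the SQL form is convenient when the filter logic is easier to express as a query.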
So it will print the count of iPhone only, starting from the first file. It gave some error; let's check it out: streamDf not found. We ran the query against the wrong name; that was the error. It's starting up again. When using Spark SQL, we should apply the filter on the temp view, not on the stream DataFrame, so the query should have referenced the temp view. And it gave the error only when the query got executed, and the query got executed only after the file was moved while it was waiting to stream. That's why you did not see the error earlier; it showed up only after the file was copied. It's starting up. Let's copy the file again. The first file is moved, so it should print the iPhone count from the first file. The first file got processed, and we can see that only the iPhone sales volume is getting printed. Let's now move the second file and give it some time. Now we can see the output of the second batch: the first and second batches combined. Let's now copy the last file. Similarly, you can do all kinds of DataFrame operations on streaming DataFrames: you can use Spark SQL, or apply a filter directly on the DataFrames, and store the final data to the console, to another table, or to another database, depending on your use case. Now the third batch has been processed and we can see the final count here, and only iPhone data is displayed, as per our filter criteria. So this is how you can do filtering on streaming data. Thank you. 7. Adding timestamp to streaming data: Let's understand how we can add a timestamp to the incoming data. Sometimes when you're reading data from, say, Twitter, you'll receive a timestamp, and if you're reading some log files, you'll also have a timestamp. But for many data sources, you may not have the actual time when the data arrived. You can add a custom timestamp column to the DataFrame so that you can use that time information for future processing, for window operations or other operations where you need the time.
So let's see how that can be done. We'll take the very first example, where we read the file and simply print to the console. Let's take that example and add a timestamp to it; we'll call it the timestamp demo. We'll make a simple change here: after doing readStream we have the input DataFrame, and we'll add a timestamp to it, so this will be the stream DataFrame with timestamp. On the stream DataFrame we'll simply call withColumn with a timestamp. Spark SQL has something called unix_timestamp; using unix_timestamp you can get the current timestamp and then convert it to a particular format. For that, let's import the Spark SQL unix_timestamp, and we'll also import the Spark SQL function current_timestamp. Now, while reading the data, the current timestamp will get appended to the DataFrame, and we'll finally write this to the output stream. As you can see, current_timestamp and unix_timestamp have been imported, and now we should see timestamp as a column. Let's see that in action; we'll run this program and start copying files. We've created multiple copies of the sales file; let's copy them one by one and see the output. We copied the wrong file — the sales file instead of the employee file — but it still did not give any error. This might happen: we expect data in a certain schema, but it arrives with some other schema. Let's continue anyway, because the key thing we're trying to understand is the timestamp, which you can see happening in this particular example. We can see that there's a different timestamp in each batch. We'll copy more files, and we can see new batches with different timestamps. This is how you can append a timestamp to your incoming streaming data. Thank you. 8. Aggregation in a time window: Welcome back. In the previous lab, we saw how to add a timestamp to the streaming data. Now, how can we use this timestamp to get information for a particular period of time?
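The change described above amounts to one extra line on the streaming DataFrame; this sketch assumes a `streamDf` from readStream already exists:

```scala
import org.apache.spark.sql.functions.{current_timestamp, unix_timestamp}

// Tag every incoming row with its arrival time; the new "timestamp" column
// can later feed window operations
val withTs = streamDf.withColumn("timestamp", current_timestamp())

// unix_timestamp() would instead give the arrival time as seconds since 1970,
// e.g. streamDf.withColumn("epoch", unix_timestamp(current_timestamp()))
```

Each micro-batch printed to the console then carries the time at which its rows were picked up.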
For example, how do we know how many phones are getting sold in the last one minute, one hour, or ten seconds? Let's see that in a demo. There might be a business requirement to know the sum of sales or the average sale in a particular time period. Let's see an example of how we can leverage that timestamp to get aggregated data over a period of time, using something called a window. We'll take the aggregation example from earlier, where we calculated the sum of phones getting sold, copy it, and create a new file; we'll call it window aggregation. First, we'll create a new DataFrame with a timestamp. It will be similar to what we did earlier; the only additional function is from_unixtime. unix_timestamp gives us the number of seconds since 1970, as a big integer, and we need from_unixtime to convert it to a date format, which is required for the window operation. So here we are getting the current timestamp, converting it into a Unix timestamp, and from the Unix timestamp we are getting the time in date format. Once we have the DataFrame with a timestamp column, we'll name it datetime. Next, we'll do aggregation on this datetime column using a window function. We'll use the Spark SQL window function, give it the column name, and specify the window duration here — ten seconds, or whatever time you want — and then we'll do a sum of sale volume. Finally, we'll add the implicits import to fix the error with the dollar symbol. And we don't need to do aggregation while reading, so we'll read the DataFrame and add the datetime column: first getting the timestamp using the unix_timestamp function, then converting it to date format using the from_unixtime function, and then using the window function to do aggregation for a certain duration.
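The steps above — timestamp column, then a windowed sum — can be sketched as follows; this assumes a `salesDf` streaming DataFrame (phone, saleVolume) exists, and the cast is added because from_unixtime returns a string:

```scala
import org.apache.spark.sql.functions.{current_timestamp, from_unixtime, unix_timestamp, window}
import spark.implicits._ // enables the $"col" syntax

// Arrival time as a timestamp column named "datetime"
val withTime = salesDf.withColumn(
  "datetime",
  from_unixtime(unix_timestamp(current_timestamp())).cast("timestamp"))

// Tumbling window: sum of sales per fixed 10-second bucket
val windowed = withTime
  .groupBy(window($"datetime", "10 seconds"))
  .sum("saleVolume")

windowed.writeStream
  .outputMode("complete")
  .format("console")
  .start()
  .awaitTermination()
```

The console output then shows one row per 10-second window, with the window's start and end times and the summed sale volume.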
current_timestamp, unix_timestamp, from_unixtime, and window are all available as part of the Spark SQL library. And finally, we're doing the aggregation on the windowed data. This should give us sale volume for every ten seconds, twenty seconds, or whatever duration we choose. Let's try it out. Before you run it, make sure you create many files with similar content; for example, we have created many copies of similar data so that we can process them in batches and see the output aggregated by time window. Initially the input directory is blank. We'll run the new window aggregation program and start copying files. It gives us the output for the first window, then the second window. On a local machine it takes a lot of time to do the processing, so it's not really easy to visualize how this 10-second or 20-second interval is working; but when you run the same code on a cluster, where huge numbers of files arrive every few seconds, you'll get a better sense of how the aggregation is happening. For now, just try it out and, depending on your machine speed, see how quickly the data is getting processed and whether you are able to see the different windows and match the counts. 9. Tumbling window and Sliding window: In the previous example, we fetched data for a particular time interval. That fixed interval is called a tumbling window; for example, give me data for every ten-minute window. That's a fixed interval we're specifying. We can also specify a slide interval: for example, give me data for a ten-minute interval with a slide interval of two minutes. That way there will be overlapping data in each interval; the overlapping period is determined by your slide interval. For example, we may be interested in knowing what sales happened in the last one hour, delivered at a slide interval of 15 minutes: every 15 minutes, give me data for the last one hour. You can easily specify a slide interval while generating your final dataset.
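The sliding variant described above only adds one argument to the window call; this sketch assumes a streaming DataFrame `withTime` that already has a timestamp column named `datetime`:

```scala
import org.apache.spark.sql.functions.window
import spark.implicits._

// Sliding window: 5-minute windows evaluated every 2 minutes, so consecutive
// windows overlap; omitting the slide duration gives a tumbling window instead
val sliding = withTime
  .groupBy(window($"datetime", "5 minutes", "2 minutes"))
  .sum("saleVolume")
```

A row arriving at a given time then contributes to every overlapping window that contains it, not just one fixed bucket.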
The window function takes another parameter, called the slide duration. In this particular example, we specified five minutes as the window duration, and the slide duration is two minutes. It means: give me data for five-minute windows, with a slide duration of two minutes. We'll then run the program with the same datasets we created earlier, and we can see this output. So try it out: try a different window duration and a different slide duration, and try to match the counts. 10. Appendix - IntelliJ Spark Scala Hive environment setup: Let's search for the Java JDK download; that is the Java Development Kit. We'll be taken to the Oracle site. Before installing, let's ensure Java is not already installed. Go to the command prompt in Windows, or the terminal in Mac or Linux, and type java -version. As you can see, Java is not recognized, so Java is not installed on this machine. Let's search for the Java SE Development Kit, accept the license agreement, and download the version for your operating system; we'll download the Windows 64-bit version. Oracle will prompt you to sign up; if you do not have an Oracle ID, then create one. You need to provide basic information like email ID, name, et cetera. Once your email is verified, you can log in using your email ID and password, and once signed in you should be able to download the installer. Go to your download folder and execute it. Click Next. You can leave the default directory or change it to some other directory. Once the installation is complete, go to the command prompt again and run java -version. As you can see, Java has now been installed on this machine. Let's now install IntelliJ IDEA; it's a very popular IDE for Spark Scala development in the real world. Search for the IntelliJ download, go to the JetBrains site, and download the Community Edition. Once downloaded, click on the installer.
You can keep the default folder and install it. Once installation is complete, go to the desktop, double-click the IntelliJ IDEA icon, and launch the tool; you can also find it from the Windows search bar. We'll now add the Scala plugin to IntelliJ IDEA. Go to Configure and click on Plugins, then go to the Marketplace tab and search for Scala. Select the Scala plugin and click Install. We'll also be using Maven, which we'll see shortly. Now you'll have to restart the IDE, or simply close it and open it again. Welcome back. Now we will write the Scala HelloWorld program. Open IntelliJ; after installing the plugin, make sure it is restarted. Now let's create a new project. Select the project type as Maven, not Scala. The default JDK will be selected. Give it a project name, and choose the location where you want to store the project — any location on your machine. You can also change the package name; org.example is the default, and you can change it to whatever you want. It will take some time to import the required libraries; you can import them manually, or you can enable auto-import. Now press Alt+1 to go to the project view and expand the project folder. You will see the main folder, and under that a java folder; we'll rename the java folder to scala. Here we could go and create a new Scala class, but we don't see any option, because we need to add the framework support for Scala. Go to the project, Add Framework Support, and then select Scala. Then we'll have to download the right Scala library; if you don't see anything, click on Download and you will see all available Scala libraries. Select 2.11.8; that is the one that works well with the Spark 2.4.3 dependency we'll add later. So I've selected 2.11.8. After that, go to src/main and try to add a new file; you'll now see the Scala class option.
So select Scala class, and select Object. For Scala programs, the starting point is a singleton object, not a class. Let's create an object with any name; ScalaDemo is the object I've created. Go to that object, type main and hit Tab, and the default main method will get created. Let's add a simple println statement and run it. You can run it with the green arrow icon next to the main method or the object, and you can see that "Hello World, my Scala program" is getting printed. So this is how we can write Scala programs using IntelliJ. You can also go to a Scala worksheet and write code there; you do not have to create objects. It's a REPL-style interface: simply type code and hit the green icon, and you'll see the output. You can declare variables, say a and b, then add a plus b and run it, and you'll see the output, which is the result of a plus b. Let's add a few more lines of code in that ScalaDemo object. It's a good place for you to practice: you can practice in the Scala worksheet, or, if you're looking at more real-world programming, create objects and methods and do your programming there. If you type something incorrect, IntelliJ will prompt you with the error, and you can click on it and go to the line which has the error. There, you can see the output. So this is how we can write Scala programs using IntelliJ IDEA. You have to make sure the plugin is added and the project framework support includes Scala, and then you can get going with your Scala programming. Let's now understand how to do Spark Scala programming using IntelliJ. Before we start, let's increase the font: go to File, Settings, and edit the font. Create a new project: File, New Project, select the type as Maven with the default JDK we selected, click Next, and give the project a name, say SparkHelloWorld. You can also change the package name. Go to the project folder.
Press Alt+1 to see the project view. We'll rename the java folder to scala as we have done earlier: Refactor, Rename. Let's add the Scala framework support: go to Add Framework Support, select Scala, select 2.11.8, and click OK. Let's enable auto-import. Now you can create a Scala object, which will be the starting point of our application. Create a main method: type main and Tab. Let's print a simple line; it's running fine. Next, we'll create a Spark session. Before we can create a Spark session, we'll have to go to the Maven repository and add the required dependencies. Search for the Spark dependencies in the Maven repository; we'll have to add the Spark Core library. Select 2.4.3, the stable version that works with this JDK. Let's add the dependency section in the pom.xml; we manage the dependencies of Maven projects through the pom.xml. We added the spark-core dependency, version 2.4.3 for Scala 2.11. Let's also add the Spark SQL dependency; again select 2.4.3 and 2.11, copy the Spark SQL dependency, and paste it into the pom.xml file. These two dependencies are what we need to get started with Spark Scala programming. Let's now create a Spark session. We have the required dependencies in the pom.xml file, so IntelliJ will automatically pick up the SparkSession class and ask to import the library. Let's give our app a name, and we'll do a getOrCreate. Now run it; the first Spark session got created. Let's try to create a DataFrame. We'll create a sequence, and from the sequence we'll create a DataFrame. We'll add just two entries, one for Spark and two for Big Data, and convert it to a DataFrame: spark.createDataFrame, using the sample sequence. Let's add some print statements and run it. The Spark session got created, the DataFrame is created, and we can see the output also. We can change the heading by specifying the column names, say course ID and course name, and now it's getting printed with the heading.
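The two dependencies described above would look roughly like this in the pom.xml (version and Scala suffix taken from the 2.4.3 / 2.11 combination mentioned in the class):

```xml
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.3</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.3</version>
  </dependency>
</dependencies>
```

The `_2.11` suffix on the artifact IDs must match the Scala version selected in the project's framework support.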
So this is how you can do Spark Scala programming using IntelliJ: add the required dependencies in the pom.xml file (through the pom.xml, Maven projects manage their dependencies, and you can add external libraries or jar files), then create a Spark session and get going with the Spark programming. Download the winutils file from our GitHub repository. We need this file to do the Hive setup on a Windows machine. Simply extract it and copy it to a folder. I will create a folder under the C: drive called winutils, and under that we'll have to create a bin folder, and under that we'll store the winutils.exe. Now go to your Windows environment variables and add a system environment variable to set HADOOP_HOME, which would be the C: winutils folder, not the bin folder under it. Now you can do Hadoop programming on your Windows machine from IntelliJ. You need to make sure HADOOP_HOME is set, and that the winutils file is there under the bin folder. This file is also available in other repositories online; you can search for a winutils download, or go to our GitHub repository and download it from there. Welcome back. Let's now understand how to do Spark Scala Hive programming using IntelliJ. Create a new project: go to File, New Project, select the project type as Maven, give your project a name, choose the location where you want to store the project, and change the package name. Before this, you should have set your HADOOP_HOME, which I explained in the previous video. Now press Alt+1 and go to the project view. Change the java folder name to scala. Next, we need to add framework support for Scala; we have explained these steps in our other videos. Select Scala, select 2.11.8. Create a new Scala class; the type should be Object, and give it any name. Type main and hit Tab, and you'll get the default main method.
Let's copy the Spark session creation code from the older project. We'll just run it to make sure everything is okay. Let's enable Hive support: to do Spark Hive programming, we have to create a Spark session and enable Hive support. We also need to reference the Hadoop home directory, C: winutils, if required, and we can specify the warehouse directory explicitly; otherwise Spark will pick it up from the spark-warehouse location. When you move from your local environment to another environment, you'll have to make sure you have the correct data warehouse directory path. Now let's run it. It gave an error because it was not able to instantiate the SparkSession. Let's see: the Hive classes are not found, because we have not added any dependencies for Spark Hive support. So let's go to the Maven repository and search for Spark Hive. Select the same version we have been using, copy and paste the dependency into your pom.xml file, and make sure the scope is changed to compile. Now run it: the Spark session got created, and we can see that the warehouse directory is getting set to the spark-warehouse directory. But we get an error here. We'll uncomment the line which explicitly sets the Hive warehouse directory and move it to the SparkSession creation section. Now the Spark session got created and we're able to print the DataFrame values. Let's now write it to a Hive table. You can simply do df.write in CSV format, and specify the folder where the file will be stored. Now run it. We can go to the project folder and see that the output folder got created, and under that we can see the part file. If we get an error while running this, you might have to set permissions for your folder; to do that, run the winutils chmod command on your Windows prompt so that you have the required permissions to execute the Hive programs using winutils. Thank you.
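The Hive-enabled session and the write step described above can be sketched as follows; it assumes the spark-hive dependency is on the classpath and HADOOP_HOME/winutils is set up on Windows, and the app name, column names, and output folder are illustrative:

```scala
import org.apache.spark.sql.SparkSession

object SparkHiveDemo {
  def main(args: Array[String]): Unit = {
    // Set the warehouse directory before getOrCreate(); enableHiveSupport()
    // fails at runtime if the spark-hive dependency is missing
    val spark = SparkSession.builder()
      .appName("SparkHiveDemo")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "spark-warehouse")
      .enableHiveSupport()
      .getOrCreate()

    val df = spark.createDataFrame(Seq((1, "Spark"), (2, "Big Data")))
      .toDF("courseId", "courseName")

    df.show()

    // Write as CSV under the project folder; a Hive table can then be created
    // with its location pointing at this directory
    df.write.mode("overwrite").csv("output/courses")
  }
}
```

On Windows, if the write fails with a permissions error, granting access with the winutils chmod utility on the output folder is the usual fix.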