Transcripts
1. Concurrent Programming Intro: Hey there and welcome. My name is Max, founder of Coding with Max and Data Checkpoint. In this course, we're going to go over concurrent and parallel programming in Python. We're going to learn how we can write threaded programs, how we can write multiprocessing programs, and how we can also write asynchronous applications. We're actually going to build out a very large threaded program that's going to read data from Yahoo Finance, process it, and then upload it into databases. Then we're also going to write multiprocessing programs so that we can make the most of all the CPU cores available on your machine. And then we're also going to learn about how we can write asynchronous applications, how they work, and what you can use them for. So with that, I hope to see you inside.
2. Threading, Multiprocessing, Async Intro: Hey there, it's Max, and welcome. In this module we're going to learn about concurrent programming. Now before we jump in and write some cool programs, let's quickly get an outline and an overview of what we're going to cover here, so that we're all prepared and understand generally what we're going to be doing. So the outline for this lesson is: first we're going to have a short introduction to concurrency, just looking at some examples of why it's cool and why we may want to use it. And then we're going to go into some considerations that we have to keep in mind when we think about writing concurrent programs. All right? So usually when we're writing programs, it's very likely that we're going to be writing sequential programs, where one thing happens after the other. And almost all the time so far, they've been written to use just one single CPU core. That means we're not really making full use of all of the resources that may be available to us. So let's take a look at this one example here. Let's say that our machine has four cores and we're running a Python program. Basically what we're doing is calculating several metrics. We have a data source file from somewhere, and we just want to calculate some different metrics. The way that we can have this program execute is first we calculate the first metric, then the second one, then the third one, and then we save it all to a file. And if we have four cores, that means one of our cores is going to be running this program and the other ones are going to be sitting idle. Now of course, there are other considerations if we're running this on our personal machine, because we may have other applications open, et cetera.
But if we were in a cloud environment or something and we actually have more CPU resources available, then it could be that we're not making full use of all of these resources. So what could be done instead is we could have a program that calculates metric A. And I've changed the sizes here to show that the different metrics may take a different amount of time to calculate. So you've got metric A, which is going to be running on the first core. We've got metric B, which is going to be calculated at the same time on the second core. And then we've got metric C, which is going to be calculated at the same time on the third core. Because these metrics are independent of each other, we can calculate them all at the same time. There's no reason we need to wait for one to finish, then do the second, and then do the third. And once each one of those is finished, we could also just write it to a file. Then if we wanted to, we could merge all the files together, or something that we can also do is replace saving to a file with writing to a database. So we calculate a metric, then we write it to the database, and then that part is done. So in this case, what we have here is a more CPU-intensive program. There's a lot of calculation going on, and the bottleneck is really the calculations. And so in this case, it's good to make use of our different cores or different computing resources so that we can do more calculations at the same time. But there are also other programs that we may write where it's not so much calculation, but actually a lot of the time spent by the program is just waiting for network responses. So for example, something that we could do is request data from somewhere. This could be an API. This could be a web scraper that's, you know, hitting a website. But anyway, we're just requesting data from somewhere.
And then we're waiting for the response to come back to us. Now we have the data available, and we can either write it to a file, for example, or we can write it to a database. And writing to a database actually means sending off the request to say, okay, please write this, and then waiting for the network response: yep, okay, it's written. And if we are doing this for multiple pieces of data, we request the first piece of data, wait for the response, then write it, and then we can request the second piece of data, wait for the response, and then write it. And the writing itself is, again, sending off the write and then waiting for it to be acknowledged. So in this case, we're just spending a lot of time waiting for things to happen. We're spending a lot of time waiting for acknowledgement signals. Yep, here's your data; it's finally available to us, it just had to be transferred over the network. Or, here we go, yes, the data has now been saved into the database. So what we can have in this case is multiple threads. We still have everything running on one single CPU, on one single core, but we have multiple threads running concurrently on the same core. So we can have a main thread, which basically doesn't need to do anything else. There can be some stuff happening beforehand, but really it can just start all of our child threads. And so what the first child thread will do is request data. And while it's waiting, the second thread can then request its data. And then when the data comes in, the first thread can say, okay, now I've got my data and I'm going to write this to a database. And then it spends time waiting again. And while one thread is waiting for network communication, for network acknowledgements, the other thread can be doing its work. And so you can see here we have two threads that are running concurrently, kind of switching off.
One requests the data, and when it's idle, when it's just waiting for the network to give it a response, the other thread can be running. Now another example of things that can happen is we can have a lot of writing to databases. One thing we can do here, of course, is use threads again, to have multiple threads where each one of them writes to a database. And while that thread is waiting for the database to give an acknowledgement, the other thread can initiate its write, et cetera. Now we can also use an asynchronous program, where you basically send the write and then continue on with the program. And whenever you get the acknowledgement, you come back to that part, mark it completed, and move on with the program. So your program isn't blocked by these network I/O communication times; instead you just write to the database, as you can see on the right-hand side here. So we write to the database, and while we're waiting for the acknowledgement, we just initiate the second write, maybe to a different database, and then we can even write to a third database. All of this is going on, and in the background we're waiting until we hear an acknowledgement from any of the databases that we've written to. And whenever we get that acknowledgement, we can then continue on with that part of the program. And so really this helps us save a lot of time that we would otherwise spend just sitting idle, waiting for network communication to happen. Now this may not seem like a big deal because, you know, internet speeds can be very fast. But if you're doing a lot of network communication, say if you're writing a bunch of values or sending a bunch of requests, your program can be spending a lot of time just sitting around idle, waiting.
And so having this option of using threads to do stuff while we're waiting, or being able to make use of multiple CPU cores if we have compute-intensive operations, can really be a great benefit to us and it can speed up our programs dramatically. Now, there are of course some drawbacks and considerations that we need to keep in mind here. The first thing that we need to be aware of is that Python has something known as the global interpreter lock, which basically means that only one thread can be running Python code at any one time. So you can see here, if we go back to the example and look at child thread 1 and child thread 2: while child thread 1 is initiating the request for the data, child thread 2 is actually idle. It's not initiating its own request. It's only once child thread 1 is inactive, where it's waiting, that child thread 2, for example, can start working. So they're not exactly running at the same time; the threads are kind of taking turns. And whenever one is idle, another one can jump in and take that CPU time. So it's not true parallelism, but of course we still get dramatic speedups, because we don't have to spend all this time waiting while not doing anything else. Now there's also an important issue we have to think about, which is that not everything may be thread-safe. Basically what thread-safe means is that it's okay for multiple threads to be dealing with a certain entity, or that our threads are dealing with data in a way that doesn't interfere with other threads. Now, one way we can go about thread safety is by using locking. Another way is just writing thread-safe programs that don't lead to weird things happening when multiple threads are trying to access, for example, a shared piece of data. And one of the things that we can get if we don't have thread safety is something called a race condition.
Now this is something that you'll see here in threads, but it may also come up when you're learning about databases. Basically a race condition is when two separate programs or entities, or threads in this case, try to access and modify the same piece of data, or even just use the same data. So for example, let's say we have two threads that both make use of a variable a. Thread 1 reads the variable and then modifies it, and at the same time, or shortly before or shortly after, thread 2 also reads the variable and then writes to it. There are very many different things that can happen. So for example, let's say each one of these threads is just incrementing the variable. If our two threads read it almost simultaneously, then they're both going to see the same value. Then thread 1 puts its result in and just increments it by one. Thread 2 will not have seen that new value, but will have seen the previous value. And so when it increments it by one, in total the variable is only going to be incremented by one rather than by two, because they both read it at a similar time, before the other one finished its operation. And then of course you can have other variations of basically the same problem, where you either overwrite the variable or perform some downstream operation on it, for example adding to it or multiplying it, and then use the result later on. But because threads are running concurrently, it's basically impossible for us to know which thread is going to be accessing it first, especially across different runs of the same program. So if we rerun our program, this can lead to unpredictable results, because sometimes one thread may get there first and sometimes another thread may get there first. And sometimes we may have a race condition and sometimes we may not, because there are also external factors at play.
Race conditions can lead to very unpredictable results. And so keeping this in mind is very important: if you have shared data or anything like that, you always need to think about thread safety. Can the program that I'm writing interfere with itself? And if you're using other modules or libraries in Python, some of them will specifically say that this entity, for example this class that you're using, is thread-safe, or that this class is not thread-safe. And so you need to be aware and careful about that, because this can lead to very weird, tricky, and confusing bugs that are hard to reproduce and very difficult to track down. So the other thing to consider, first of all talking about threads specifically: threads run on a single CPU core and they all share the same memory, which is why we have this issue of thread safety and race conditions. And there is also a small overhead associated with switching between the different threads, as well as with management of the threads themselves, the creation and tearing down of threads. Whereas for multiprocessing, what we have is the same program running on different CPU cores, or the same part of this program running on different CPU cores. And so in this case, we don't have shared memory. Every single process has its own Python interpreter and its own global interpreter lock. So of course, that's great, because we don't have some of these issues that we have with threads, for example. But there's also more overhead associated with this. One part of that is duplicating a lot of the memory usage; there's just a lot more memory that needs to be used to create and replicate all of these different processes across the different interpreters. And just about concurrency in general: it's very cool and it can be extremely useful and fast.
But we also want to make sure that the programs that you're writing are actually going to be worth writing in a concurrent manner. Because writing and debugging concurrent programs can get very complex, and it sometimes becomes difficult to find issues in your logic or to debug certain issues because of all this concurrency happening. So yeah, with concurrency we can definitely get amazing speedups. But also remember that not every program has to be concurrent. It's totally fine to just continue writing a lot of sequential programs. But then you'll notice, like, oh my God, I'm spending so much time waiting for all this network I/O, I can really make use of multithreading here. Or, there's no reason for me to calculate these 80 metrics sequentially; I'm just going to use multiprocessing and make use of two or four or eight or however many cores I have available, either on my machine or in the cloud environment, and just speed the process up. So there are definitely right situations to use it. But we don't have to write every single program concurrently. So just keep that in mind, because concurrency does add complexity to the writing and the logic and also to the debugging.
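The lost-update race described in this lesson, and the locking fix, can be sketched roughly as follows. This is a minimal illustration and not code from the course: the names SafeCounter, increment, and run_counter_demo are made up for this example, and it relies on threading.Thread with start() and join(), which the next lesson introduces. Without the lock, two threads can read the same old value so that one increment gets lost; with the lock, each read-modify-write happens as a single step.

```python
import threading


class SafeCounter:
    """Hypothetical shared counter; the lock guards every update."""

    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def increment(self):
        # Only one thread at a time may run the read-modify-write below.
        with self._lock:
            self.value += 1


def run_counter_demo(n_threads=4, increments_per_thread=10_000):
    """Increment one shared counter from several threads and return the total."""
    counter = SafeCounter()

    def worker():
        for _ in range(increments_per_thread):
            counter.increment()

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.value


if __name__ == "__main__":
    print(run_counter_demo())
```

Because every increment is protected, the final total should always equal n_threads times increments_per_thread, no matter how the threads interleave.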
3. Threading In Python: All right, so let's go ahead and actually write our first concurrent program. We're going to start off with threading. Now I'm going to be using PyCharm for this, but you can of course feel free to use whatever your favorite coding environment is. I quite like using PyCharm when we have multiple files, which is where we'll actually end up in the later parts as we progress through the threading. So yeah, I quite like working with PyCharm for this. Otherwise, if I'm doing quick testing and prototyping, and I want to look back at a lot of data that was created throughout the course of a program or just play around with things, I usually like to use Spyder or Jupyter Notebooks. But aside from that, I quite enjoy using PyCharm. Again, this is completely up to you. So I've just created a folder on my desktop here where we're going to be saving all of our work. I'm going to create a new file, and I'm just going to call this main, but you can call it whatever you want as long as it has the .py extension. And I'm going to be doing something here that you'll likely see a lot in Python programs. We're going to start off by defining a main function. And we're going to put pass here, which just lets us basically say we'll get back to this later. So right now it won't do anything. Then we're going to have the syntax here: if __name__ == "__main__". And then we're going to run the main function. Now, basically, what this syntax means is that when this program is run directly, not when it's imported or anything, that's when this will get executed. Otherwise, if we import this, then it won't get executed. So this is just kind of nice syntax. I quite like it.
And you'll likely see this in a lot of Python programs. Of course, it's not completely necessary, but it's nice if you want to test the code that you're writing here by running the file itself, while making sure it doesn't run if you import it. All right, so we have our main function here. And basically what we're going to be doing is writing two functions. The first function is going to calculate the sum of squares. So we're going to call it def calculate_sum_squares, and it's going to have an input, which we can just call n for now. We'll define that in a second. And our second function is just going to sleep a little. Here we're going to provide the input in seconds for how long we would like it to sleep. We'll define that in a second too, because first we need to import the time module, which will allow us to do this sleeping. Our main function we'll modify in a second. So for sleep_a_little, all that we're going to do is call the sleep function, and we're just going to sleep for that many seconds. For calculate_sum_squares, we're going to have a sum_squares variable that we initialize to 0. We're going to have a for loop, for i in range(n), and in each iteration we're going to add this number squared onto our sum of squares. And then at the end, let's just print out the sum of the squares. Okay? So now in our main function, let's go ahead and have two for loops. I'll say for i in range(5), we're going to run calculate_sum_squares, and i is going to be our input here. And we're going to have a second loop that says for i in range, starting from one and going to six, so that we actually sleep in the first iteration, and we're going to call sleep_a_little with our value i here. And we're also going to have timers that we want to set up.
So we're going to measure the calculation times of our program, and we can also use the time module for this. Our calculation start time is going to be defined as the current timestamp at this moment. And then once this is done, we're going to print that calculating sum of squares took the time at the current moment minus the timestamp that we first saved in our variable here. Now, this is going to give us an output with a lot of decimal places. So what we're going to do instead is use the round function, and we're just going to round it to one decimal place to make this a little bit cleaner. And then for sleep_a_little, we're going to do a similar thing. We're going to have a sleep start time, which is also just tracking the time at this moment and saving it to a variable. And then again here we're going to print that sleeping took the current time minus the start time. That's how long it took. And to actually have the calculations take some time, we're not just going to put in i here; let's go ahead and make this i plus 1, first of all, and then we're also going to multiply it by 1 million, just so that it takes a little bit of time. Now we can run this, for example by clicking Run here, and it's going to start running the sequential program for us. And so we can see here that the sum of squares part took about seven seconds, rounded to one decimal place as we can see. And now our program is just going to be sleeping iteratively: one second, two seconds, three seconds, four seconds, five seconds. And at the end of that, we see that the whole sleeping process took 15 seconds. So in total, our program took about 21 or 22 seconds. You've got 15 here plus seven: seven of those were for the calculation and 15 of those were for the sleeping.
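Put together, the sequential program described so far might look roughly like the sketch below. The function names follow the ones spoken in the lesson; the n_scale and sleep_scale parameters and the return values are assumptions added here so the demo can be shortened and checked, and are not part of the original walkthrough.

```python
import time


def calculate_sum_squares(n):
    sum_squares = 0
    for i in range(n):
        sum_squares += i ** 2
    print(sum_squares)
    return sum_squares  # returned as well as printed (an addition for checking)


def sleep_a_little(seconds):
    time.sleep(seconds)


def main(n_scale=1_000_000, sleep_scale=1.0):
    # n_scale and sleep_scale are hypothetical knobs; the defaults match the lesson.
    calc_start_time = time.time()
    for i in range(5):
        calculate_sum_squares((i + 1) * n_scale)
    calc_elapsed = time.time() - calc_start_time
    print("Calculating sum of squares took:", round(calc_elapsed, 1))

    sleep_start_time = time.time()
    for seconds in range(1, 6):
        sleep_a_little(seconds * sleep_scale)
    sleep_elapsed = time.time() - sleep_start_time
    print("Sleeping took:", round(sleep_elapsed, 1))
    return calc_elapsed, sleep_elapsed


if __name__ == "__main__":
    # Scaled down so it finishes in a couple of seconds;
    # calling main() with no arguments uses the lesson's values.
    main(n_scale=10_000, sleep_scale=0.1)
```

With the default values, the sleeping part alone adds up to 1 + 2 + 3 + 4 + 5 = 15 seconds, since each sleep only starts after the previous one has finished.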
Alright, so this is a sequential program. Let's go ahead and try to add some concurrency here. We're going to use threading for this, so we're going to import threading. Now, the nice thing is that threading is part of the standard Python installation, so you don't have to pip install anything else. You can just import it directly. Now for each of these loops here, we're going to replace the function call with a thread. So what we're going to do in each of these cases is create a thread, and we'll call it t, which is just going to be our short variable name for thread: threading.Thread. There are several parameters that we need to define here. The first is the target, which is the function that needs to be run, and in this case it's going to be calculate_sum_squares. Note I'm not putting the parentheses at the end. It's just the function name, because we don't want to call it yet. This is a reference to the function. Then we can also provide the arguments to our function. We have a separate keyword here, args, and this expects a tuple. Since we only have one input argument, we're going to have our input value here. And let's define our maximum value to be this expression here, just to make the line a little bit shorter. So our input tuple is going to be this, but since it has to be a tuple, we have to put an extra comma here to make sure that it's a tuple of size one. Then we're going to comment out the original call. And then we're going to do the same thing here. We're going to create a thread, and we're going to have it target the sleep_a_little function. The input argument we're going to provide is this i here. And let's just rename this to seconds. We don't need to do this, but with the names the same it's a little bit nicer to read. So I'm going to rename this to seconds and again make this a one-element tuple.
And we're going to comment this out. Now, if we run this, you'll be a little surprised, because actually nothing is going to happen quite yet. The reason for this is that we actually need to start the threads. So the next thing that we have to do is, for each thread, call t.start(), like this. Now we can run this, and we're still going to get a bit of a weird result. So let's go ahead and see this. Notice how our main program just continued on. We have the timing output printed here, and only then did we have the sums printed out. So this obviously isn't how long our program took, because there's still other stuff going on. This is just because the threads are running concurrently. So what we want is for our program to be blocked, to wait until all the threads finish at this point. This is where all the threads need to have finished, and then we want to print out how long this took. To do that, we're going to set up another variable and have it be a list. We're going to keep track of each of these threads and then just make sure our program blocks until they're all finished. We'll call this variable current_threads, and again, you can call it whatever you like. Every time we create a thread, we're going to append the thread here so that we can reference it later. Now we're going to loop over our list, and for each element, which is the thread itself, we're going to call .join(). What .join() does is block execution until this thread is done. So we're iterating over this and calling .join(), which means that the main thread can't go any further than this until that thread finishes. So we've done this for here. Now let's go ahead and do the same thing here. Let's just reuse this variable and reuse this loop. Let me put this here. And then again, we're going to call .join() to make sure that our program is blocked.
And then we wait for these to finish before we execute this. So if we go ahead and run this now, we can see that our execution is actually going as expected. And here now we have our sleeping. Okay, so there's something interesting to notice. First of all, here this one was a little bit quicker, but really nothing to be excited about. This is because it's a computation-intensive function, and so most of the time you're not going to be getting any performance improvements for pure computation here. So really nothing all too fascinating. If we wanted speed improvements in this section, we'd need to go to multiprocessing, since there we can use different cores. But here, because only one thread can be running at one time and this is CPU intensive, with really all calculations and not a lot of I/O going on, it basically happens sequentially anyway. But the cool thing here is in the sleeping, and this is a very, very simple example. The sleeping happens on a thread level, so a different thread can execute while each of these threads is sitting idle. So rather than taking a total time of 15 seconds, which is what our sequential program took, it only takes five seconds, which is basically the longest sleep that we had. The maximum here is five. Of course, this is a very, very basic example. This is going to be a lot more useful not for sleeping programs, but instead if this is some sort of network connection, like making an API request, writing to a database, reading from a database, or whatever else: anything heavy in network I/O, where our program spends a lot of time being idle, just waiting for network communication and for maybe the server on the other side to do its thing and then send back the response.
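The threaded version described above, where all threads are started first and then joined, could be sketched like this. As before, the n_scale and sleep_scale parameters and the return values are assumptions added for this example so that it can be run quickly; the rest follows the names used in the lesson.

```python
import threading
import time


def calculate_sum_squares(n):
    sum_squares = 0
    for i in range(n):
        sum_squares += i ** 2
    print(sum_squares)


def sleep_a_little(seconds):
    time.sleep(seconds)


def main(n_scale=1_000_000, sleep_scale=1.0):
    # n_scale and sleep_scale are hypothetical knobs; the defaults match the lesson.
    calc_start_time = time.time()
    current_threads = []
    for i in range(5):
        maximum_value = (i + 1) * n_scale
        # args must be a tuple, hence the trailing comma.
        t = threading.Thread(target=calculate_sum_squares, args=(maximum_value,))
        t.start()
        current_threads.append(t)
    # Block the main thread until every calculation thread is done.
    for t in current_threads:
        t.join()
    calc_elapsed = time.time() - calc_start_time
    print("Calculating sum of squares took:", round(calc_elapsed, 1))

    sleep_start_time = time.time()
    current_threads = []
    for seconds in range(1, 6):
        t = threading.Thread(target=sleep_a_little, args=(seconds * sleep_scale,))
        t.start()
        current_threads.append(t)
    for t in current_threads:
        t.join()
    sleep_elapsed = time.time() - sleep_start_time
    print("Sleeping took:", round(sleep_elapsed, 1))
    return calc_elapsed, sleep_elapsed


if __name__ == "__main__":
    main(n_scale=10_000, sleep_scale=0.1)
```

Since the sleeping threads all wait at the same time, the sleeping section should take about as long as the longest single sleep rather than the sum of all of them.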
Now we've seen the basics of creating threads here for different functions, but I wanted to show you some other things I think are interesting. The first is what would have happened if, instead of calling join at the end here, we called it here, inside the loop. So let's go ahead and call t.join() here, and let's rerun this. So what happens in this case? Now this first section is still calling join at the end here. But again, this is a CPU-intensive function, intensive in quotation marks of course, but CPU focused. So you can see it's basically seven seconds, which is exactly what we had before. Again, we don't expect any change here. But in this second case, what happens is that the join call here means the execution is blocked. This loop does not continue; the execution is blocked until this thread finishes. So now we're getting back to the 15 seconds that we had before. Because we're not starting all of these threads, letting them run, and then blocking the program at the end until all of them are done. Instead, we start a thread and then we block the whole execution until that thread is done. Whereas before, we started all the threads and then blocked execution until all the threads were done. So you can see that even just a slight change in syntax, having the join here instead of here, has in this case a very large impact for us. Now, there's another thing that I wanted to show you. If we take out the join statements, which is actually what we had in the beginning, and rerun this, remember we get this weird thing where the program, the main thread here, just does its thing. And now we actually get all the printing, and the sleeping is also happening in these different threads. So we've got 10 different threads running. Well, actually 11, because this is going to be our main one here, the main program, and we create five extra threads here.
And five extra threads here. And the main one finishes, basically just doing its thing. It's not blocked by this, it does these printings, and then this one is running and printing out the sums of squares. In the meantime, these five threads are just sleeping. We're not printing anything, so we don't see it. Something that you can also do is add a daemon flag here and set it to True. And we can do the same thing here. Now this becomes interesting when we run it, because what happens is we run through the main thread, and once the main thread finishes, the whole program stops. So what the daemon flag means here is that once the main thread is done, all daemon threads are stopped too, and it doesn't matter whether they finished or not. If a thread is not a daemon thread, then it needs to finish: every non-daemon thread needs to finish before the whole program can finish. But daemon threads do not need to finish for the program to finish. Once the main thread is done, if any daemon threads are left, it doesn't really matter. That doesn't stop the program from finishing. So this is an interesting thing; depending on the situation, you may want it or you may not. But just be aware that having this daemon flag here means that if the main program finishes while a daemon thread is still running, that thread won't get to finish; it will just stop, and the program is done. Whereas if we have the daemon flag set to False, then these threads also need to finish before the whole program can finish, which is exactly what we had before. So while this is running, let me just put this back. Now there is of course an interesting thing that we can do with join. Again, the join statement means that each of the threads that we're calling join on needs to finish before we can continue on at this point, because it's basically blocking until all those threads are done.
So if we rerun this, despite these being daemon threads, because we're calling these join statements, which block further execution, we basically ensure that the threads finish. So we can see we get back to the previous results that we had. And we can take this even further and say, rather than going over all of our threads, maybe we only block until the first three are done. So for our sleep here, we would expect this to be three seconds, since the other two are daemon threads and so the program can continue on. But the first three are actually going to be sleeping and blocking execution. And you can see that's exactly what we get. The first three are blocking execution. Once the third thread of this section finishes, which is a sleep for three seconds, the program continues on, it finishes, and the only remaining threads are daemon threads. So the whole program is done.
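The last experiment, daemon threads where only the first three are joined, might be sketched as follows. The function name join_first_three, the sleep_scale parameter, and the still_running count are made up for this illustration; only threading.Thread with daemon=True, start(), join(), and is_alive() come from the standard library.

```python
import threading
import time


def sleep_a_little(seconds):
    time.sleep(seconds)


def join_first_three(sleep_scale=1.0):
    # sleep_scale is a hypothetical knob; 1.0 gives the lesson's 1 to 5 second sleeps.
    start_time = time.time()
    current_threads = []
    for seconds in range(1, 6):
        t = threading.Thread(
            target=sleep_a_little, args=(seconds * sleep_scale,), daemon=True
        )
        t.start()
        current_threads.append(t)

    # Block only on the first three threads; the last two keep sleeping.
    for t in current_threads[:3]:
        t.join()

    elapsed = time.time() - start_time
    still_running = sum(t.is_alive() for t in current_threads)
    print("Sleeping took:", round(elapsed, 1), "- threads still running:", still_running)
    return elapsed, still_running


if __name__ == "__main__":
    join_first_three(sleep_scale=0.2)
    # The script can exit here even though two daemon threads have not finished.
```

Setting daemon=False in this sketch should make the interpreter wait for the two remaining threads before exiting, which matches the behavior described earlier in the lesson.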
4. Creating a Threading Class: All right, so we've just finished writing our first kind of threaded program. Now what we're gonna do is we're going to clean up a little bit, and we're actually going to turn these into classes. And I'll show you how we can use classes for threading too. So to do this, I'm first going to create a new directory called workers. And here we're just going to have our different workers, our different threads, just to, you know, have some structure. And then our calculate sum of squares function is basically going to be put in a class so that we can reuse it. It doesn't make that much sense for this use case, but it does for other, more general purposes. So this is going to be our SquaredSumWorkers.py, just in case we want to have several. And we're going to have another one, which is going to be for our sleep function. So we're going to have it be our SleepyWorkers.py. And I'm also going to add an __init__.py file, which is going to turn this directory here into a Python module that I can import from. So I don't even need to put anything in there, it's just going to allow me to import from this directory. All right, so let's go ahead and start with our squared sum worker. First we're going to import threading. And then we're going to create a class, which is going to be our SquaredSumWorker. And this worker is going to inherit from threading.Thread. So it's going to inherit from the Thread class here. And then we're going to define the initialization of the class. And we're going to use this keyword super. PyCharm usually just auto-completes it. So there we go. And for the parent class initialization, we're just going to use the super method. Now, with the super method here, we name the class whose parents we want to initialize, then we pass self, and then we call dot __init__.
And basically the parent classes that this class inherits from will then be initialized, as long as those parent classes cooperate by calling super themselves. So we don't need to initialize every single parent class separately. We can just call the initialization of the parent classes like this, which is quite convenient. In this case, of course, we only have one parent class, but, you know, if we had multiple here and just wanted to initialize them without any further trouble, then we could do it like this. All right? And then what we have in here is a calculate sum of squares method. So we're going to take this over. And because it's a method of a class, we have to put self in here. And so all this method is going to do is calculate the sum of the squares exactly the way that we had it before. Now there are actually two different ways that we can go about doing this. One way is we can leave the input here so that we provide it through this method. Or the other way is we can have it be part of the initialization of the class. Then we set an attribute of the class, we don't have an input here, and we just reference the attribute. Now in this case, it doesn't really matter that much. It may actually be cleaner to pass it in here. But setting it in the initialization is a syntax that I quite like, because, you know, if you're writing more methods, they can end up having a lot of input parameters. And so in this way, you can just initialize your class with all the values and then reference internal attributes, rather than having to pass them to all of these different methods, if it gets to that point. So we're actually going to use this approach. And now there's something else that we can do, which is star star kwargs (**kwargs). And so what this means is this is going to be a mapping of key-value pairs for additional keyword arguments that we may want to pass.
This could be, for example, name equals, and then whatever the input name is going to be. And then we can have, you know, like persona. I don't know where that came from, but, you know, you can just have all these other input parameters. And rather than defining each of them, we can kind of leave it open and just have this be this **kwargs, the keyword arguments. And maybe there are parameters that we actually want to pass into the Thread class, that we want to set. And so we can just pass these keyword arguments further down here. And so in this way, it's really nice, because we can provide all these initialization parameters that can just be passed on to parent methods, or we can also use them ourselves. Now, something else, of course, that we have to do is start our thread. So we have to do that to make sure that it actually starts running. Now the interesting thing is, when we start the thread, if we want it to do the calculation, we have to put that into the run method. So if we have a run method and we call self.start, then, because we're inheriting from the Thread class, it will start the thread and the thread will start running the run method. And so we can override the run method that we inherit from the Thread class to do the running for us. So in this case, what we're going to do in run is just calculate the sum of squares. So we're not actually even going to call this method from outside. We're just going to create the class, and then it's going to perform the running for us. Alright, so one more small syntax thing here. I'm just going to add an underscore to indicate that it is kind of an internal method that we're going to be using, you know, internally in the class. All right, so this is going to be our threading class for the SquaredSumWorker. Now let's go ahead and do the same thing for our SleepyWorker. So again here we're going to import threading.
We're going to create the class SleepyWorker, which is going to inherit from threading.Thread. We're going to initialize the class and initialize the parent class. And we're going to allow keyword arguments that we're going to pass on to the initialization of the parent classes. And then what we can also put in here is seconds, for how many seconds we want this to sleep, which, of course, we'll be using in a second. So here we're going to have our sleep a little function, which is just going to call time.sleep on the seconds variable. Now you'll notice a nice, cool thing PyCharm did here for me. I didn't import the time module, but because, you know, it's readily available, PyCharm kind of recognized that we were using it and actually did the import for me. But otherwise, of course, you know, make sure to import the time module. All right, so we're gonna sleep for the number of seconds that we passed in the initialization here. And then we're just going to override the run method to call the sleep a little method that we've just defined here. All right, so going back to our main function, we no longer need the threading import here, though I think we'll see it again. From workers.SleepyWorkers we're going to import our SleepyWorker. And from workers.SquaredSumWorkers we're going to import our SquaredSumWorker. Alright, so now we can replace the creation of the threads in this loop here with just initializing our class. So we're going to have our sleepy worker be an instance of SleepyWorker, and we'll get to that one below. Here we have our squared sum worker, which is going to be an instance of the SquaredSumWorker. And we're going to have the maximum value here as the input parameter. And what we're going to do now for the join is, for current threads, let's rename this to current workers. It's technically the same thing that we have here.
We're going to keep track of the worker that we've initialized here. And again, we're just going to call the join method on it to kind of block until its execution is done. And then we're gonna do the same thing for our sleepy worker. And here our seconds are going to be the seconds I provide here. And this we can comment out, so that we can still reference it in a second. And again here we can just change this to be our current workers, which is going to include each of the instances that we have here. And then we're just going to call dot join again on them. All right, so let's go ahead and run this. And we should be doing the exact same thing. And it looks like we encountered a problem here. So let's see where the problem is. It's on our join methods here, saying we can't join a thread before it's started. So I probably forgot to start the thread internally here, because we no longer call start here in the main function. So let's go ahead and try that one more time. All right, there we go. So we've got our execution output here, and we've got our sleeping, which currently only takes three seconds, since we still have this syntax here, the minus two. So we can see here, you know, in this case it was pretty straightforward. We just took our two functions here and put each of them into a separate class, which of course for this case doesn't make that much sense. But say you're doing more complex things, like, you know, you want to write specific values to a database, and part of that process is structuring all the values and formatting them so that they're in the right structure, and then uploading to the database. Then if you have a specific database worker, you can just put in the raw values, and it will structure the data for you and then perform the uploading. And all of that is kind of hidden from our main function and is taken care of within each class itself.
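The two worker classes described so far can be sketched roughly as follows. This is a reconstruction from the narration rather than the exact code on screen: the attribute names, the result attribute (added here so the value can be inspected instead of printed), and the use of range(n) in the calculation are assumptions.

```python
import threading
import time


class SquaredSumWorker(threading.Thread):
    def __init__(self, n, **kwargs):
        # Pass extra keyword arguments (name, daemon, ...) on to Thread
        super().__init__(**kwargs)
        self._n = n
        self.result = None  # assumption: stored for inspection
        # Start last, once all attributes exist. Forgetting this line gives
        # "cannot join thread before it is started" when join() is called.
        self.start()

    def _calculate_sum_squares(self):
        self.result = sum(i ** 2 for i in range(self._n))

    def run(self):
        # run() is what the new thread executes after start()
        self._calculate_sum_squares()


class SleepyWorker(threading.Thread):
    def __init__(self, seconds, **kwargs):
        super().__init__(**kwargs)
        self._seconds = seconds
        self.start()

    def _sleep_a_little(self):
        time.sleep(self._seconds)

    def run(self):
        self._sleep_a_little()


current_workers = [
    SquaredSumWorker(n=1000),
    SleepyWorker(seconds=0.1, name="sleepy"),
]
for worker in current_workers:
    worker.join()  # block until each worker is done

print(current_workers[0].result)
```

Starting the thread at the end of the initializer follows what the lesson does. A common alternative is to leave start() to the caller, which keeps construction and execution separate.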
And of course here we're still calling dot join on each of these, just to kind of block the execution. Now, as we'll see in a second, remember we had these daemon parameters. We can still do the same thing, because we're passing on the keyword arguments. We can set daemon here to true. And then, because our sleepy worker accepts all these keyword arguments and passes them on, it's going to take this, since it's a keyword argument with a specific value, and pass it on to the parent classes, which in this case is just the Thread class. So it's going to set the daemon parameter to true, and of course we can see the effect of that if we remove the join call here. So let's go ahead and just run this one more time, just to kind of see the effect of adding that keyword arguments parameter at the top here. So you can see here the daemon parameter was then passed on to the parent, and because we're no longer calling the join method, we get the exact same thing that we saw in the previous lesson. Now another thing that we could also do, if we wanted to have all of these classes always be daemons, is set the daemon parameter in here, inside the class, which will have the same effect. Except of course in this case the logic is hidden from us, which may not be the best thing, and we don't really have much control over it unless we specifically know what we're looking for and we update the attribute of the class itself. But actually, something important to know is that the daemon parameter needs to be set before you call the start method. So because we're also calling start in here, if we take our worker and set daemon to be false afterwards, it's not going to work, because the daemon parameter needs to be set before we start the thread, which we'll see in a second here. So, yeah, there we go. We get the error for this.
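Here is a small sketch of the point about setting daemon before the thread starts. It assumes a SleepyWorker like the one from this lesson that starts itself in the initializer. The short sleep duration is just for illustration.

```python
import threading
import time


class SleepyWorker(threading.Thread):
    def __init__(self, seconds, **kwargs):
        # daemon=True arrives here through kwargs and reaches Thread
        super().__init__(**kwargs)
        self._seconds = seconds
        self.start()

    def run(self):
        time.sleep(self._seconds)


worker = SleepyWorker(seconds=0.2, daemon=True)
print(worker.daemon)

message = None
try:
    # Too late: the thread was already started inside __init__
    worker.daemon = False
except RuntimeError as error:
    message = str(error)
    print(message)

worker.join()
```

Passing daemon as a keyword argument works because it is applied while the parent class is initialized, which happens before start() is called.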
So of course, you know, hiding this is not the best thing, and if we want to do this, it's better to pass it on with the keyword argument option that we have here. And in this case, we're just making things a little bit cleaner, because these functions, first of all, are no longer here in the main file, but are methods of the class itself. And, you know, all of that logic is kind of passed on to each of the different workers, in quotation marks, that we have here. They all kind of do their own separate work and are responsible for one specific thing. And so we just say, all right, this worker is going to do this thing, and then we call join at the end here. We're going to run the worker, and it's going to do a specific thing. And really the main program is just for the execution order and general logic, whereas for all of the big processing stuff, we don't need to clutter this file and make it very large. We can kind of put each of these into separate files or into separate classes here. And then, you know, if we need to look at the specific logic, it will all be contained within that one file, with anything relevant to the execution of each of these classes.
5. Creating a Wikipedia Reader: All right, so now that we've kind of created the basics of having our threading classes, let's go ahead and jump into the actual project that we want to be doing, to kind of implement this threading in a more useful way. So what we're gonna do is we're gonna take the list of S&P 500 companies on this page here. And we're just going to scroll through all of these symbols. And then for each symbol that we have here, we're going to visit the corresponding Yahoo Finance page. And we're just going to extract this current price right here. And then we're going to save those things into a database. And so in the process that we have here, we basically have three things going on that can kind of work independently. And there's gonna be a lot of network stuff going on that we really don't want to be blocked by. So the first thing that we can do is just grab the list of companies from here. And really what that entails is just one request to this page, and then scrolling through this table and always taking out the first entry. Once we have each of these entries, we can then make a request to the Yahoo Finance site and get out this value. So that's just a network request, waiting for the response, and extracting the value. And once we have that value, we can insert it into the database. So we can see that this would probably be a really nice application for threading if we want to make this as fast as possible, because there's a bunch of network stuff going on, you know, loading pages, waiting for them, inserting into the database, waiting for the confirmation. And there aren't really a lot of CPU tasks going on. So, yeah, to do this, we're just going to start off by writing the class that allows us to extract the entries from here. So in our workers, I'm going to create a new worker here. And we'll call it a WikiWorker. And for this, we're going to be using the requests library to actually do the requests.
And we're also going to be using Beautiful Soup. If you're not familiar with Beautiful Soup, or if you've kinda forgotten about it, make sure you just check out the web scraping class again. But otherwise, the library is pretty straightforward. So just going through it here, it should be relatively straightforward to follow along. We're going to have our WikiWorker class here. And we're going to have our initialization method. And really what we can do here, for now, is just hard-code the URL that we want to scrape from. I'll just copy that over. And really, when we create our class, all we're going to be doing is setting our URL as, you know, a class attribute. So the next thing that we're gonna do is create a class method, which is going to be get S&P 500 companies. And what this is going to do is, well, like the method name implies, it's going to get the companies for us. So we're going to make a request. We're going to use the requests library to make a GET request to our URL here, like this. And we're just going to assume everything goes fine, but we can have a check here, like, if our status code is not 200, then I'm going to return an empty list, and maybe we can print out something like "couldn't get entries", you know, whatever. This is just kind of basic. Of course you can extend this to all the different status codes that you want to care about, or even do some logging here, but we'll just have this as the bare minimum. And then, rather than having everything in the same method, let's just go ahead and create a second method that is going to actually extract the company symbols. And this is going to take in the page HTML. And this method here is going to be the thing that is actually responsible for getting out each of these symbols.
So let's go ahead and look at the page source, which you can do either by right-clicking here and choosing View Page Source, or with Command Alt J. And let's just go ahead and get an understanding of the structure. So let's look for the first symbol here, which is just going to be MMM. And if we kind of collapse this down, this is the table that we're looking for. And this table is actually quite nice, as we can see. So there's an ID associated with this table. And if we just look this up, then we can see that this is the only element with this ID. So there are no duplicates or anything like that either, which is very nice. So yeah, let's go ahead and create our Beautiful Soup instance, which is going to take the page HTML. And we're going to use the lxml parser. We're just going to put this here because if we don't, we're gonna get a warning, or not even a warning, more like a notification. We can leave it out for a second just to see. But yeah, we're just specifying this so that we don't get that logging, which can be a little bit annoying. All right, so the next thing that we have to do is find this content table first, and then we're going to loop over this table. So our table is going to be soup.find. And we're going to find by the ID, which is going to be constituents. And I'm just going to go over this a little bit faster, since this is more of a web scraping thing, not necessarily focused on the threading. It's just so that we have a worker available that can actually get these contents, so that we can continue using them. So this is our table. And now that we have our table, we want to go over all of the rows. So for our table rows, we go into our table and find all of the table row tags in it. So we can see there's one in the header, and then for each row, we have one in here in the body. And then we can loop over our table rows.
So we can say for table row in table rows, and we're going to skip the first one. Because you'll notice, if you're testing this, that the first one is actually the header here. And so of course, you know, there's no symbol to extract there, so we're going to skip the first one. And then for each table row, we're going to extract the symbol. We're going to find the first td tag, which we can see here. And from there we're going to take out the text. And we're also going to remove the newline character at the end. So these are going to be our symbols. And now we can either collect these into a list and then return the whole list, or we can also use a generator and just yield our symbol. And then from here we can say yield from self dot extract company symbols. And I'll just need to pass the page HTML, which is going to be just the text part of our response here. All right, so one thing to notice, actually, is that we're not using any class properties in this method. And so what we can do is make this a static method, which basically belongs to the class and is shared across all instances, and then we don't need self, because we're not using any properties of the instance itself. Okay. So this is going to be our WikiWorker. But of course, let's go ahead and test this. Now, because we're using Beautiful Soup, at this point it'll be a little bit nicer, if we want to reuse this later on, to make sure that all of our environment, all of our libraries and stuff, are kind of contained. We're going to create a virtual environment. So I'm going to go python -m venv venv to create a virtual environment in the current directory that's going to be called venv. Hit Enter, which is going to create our virtual environment for us.
And this just basically gives us a kind of fresh Python install that we can use. If we activate it, it becomes a little bit nicer, because then we're aware of what libraries we are using, and we can make a specific requirements file so that everything is kind of self-contained and not dependent on our current system setup. So we're going to go ahead and activate our virtual environment. And if we just go into Python, for example, now you can see we're in our virtual environment. And if I try to import this class, we're actually getting an issue, because right now we don't have Beautiful Soup installed. So we're going to say from workers.WikiWorker import WikiWorker. Oh, and even requests isn't installed. So we're gonna go ahead and do pip install requests. And because we're in our virtual environment, it's actually going to use the pip version from our virtual environment. So you can see here it's the pip for the virtual environment. And then the next thing that we wanna do is install Beautiful Soup. So to do this, we're going to type in pip install beautifulsoup4. Hopefully there's no typo in here. Okay, great. And yeah, these are the two libraries that we should need. So we can go ahead and try that one more time, and great. There we go. It works. We can test this in a second. But let's just also write our current packages into a requirements file so that, if need be, we can kinda recreate this later on. Alright, so let's go ahead and test out this WikiWorker. We're going to make an instance of it. And then we can say for symbol in wiki worker dot get S&P 500 companies, just calling our class method here. Let's just go ahead and print out a symbol, and then let's interrupt our generation. I've got a typo here. Alright. We're going to print out the symbol and then we're going to break. And yeah, here we go. So this is the warning that we were getting, about choosing a specific parser.
That's why you can just put lxml in there, just so it doesn't do that. All right? Okay, so looking at our symbol, we've got the first one. And then let's just have a symbol list. And let's do the same thing, but let's append every symbol to our symbol list to make sure that we're not getting anything extra, you know, at the end or something like that. And then we're going to look at the length of our symbol list, which is 505, which is actually the number of companies in this index. I know it's kinda confusing, because it says 500 and it's actually 505. But that's great, because that's exactly what we were expecting. And to, you know, extra confirm this, we can look at the last one. Scroll all the way down. Went too far. There we go. Okay, perfect. So we now have our class to get the symbols. Now again, we kinda went through this a little bit faster, just because we want to make sure we focus on threading here and not specifically on building out web scrapers. So, yeah, if you're familiar with Beautiful Soup and web scraping, great. And if you're not, you probably would have been able to follow along anyway, since our syntax is quite straightforward here and we're just going through different HTML elements. But if you want a refresher on that, or if you want to learn this, make sure you go ahead and just check out the web scraping class again. But yeah, so now we have our WikiWorker available to us, which is going to be the first thing, of course, that we need. It's going to basically seed us with all of the values that we can then use to, you know, later on, look up the individual companies by kind of manipulating this URL here. And then, once we extract this value here, we can go ahead and send that into a database. So let's stop here for now and then continue with that in the next lesson.
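The structure of the WikiWorker from this lesson (a static method that yields symbols, and a public method that uses yield from) can be sketched as below. To keep the sketch runnable without a network connection or extra installs, it swaps two things: a small hard-coded HTML sample replaces the requests call, and the standard library's xml.etree.ElementTree replaces Beautiful Soup. The sample table, the symbols AAA and BBB, and the page_html parameter are all made up for illustration.

```python
import xml.etree.ElementTree as ET

# Made-up, well-formed sample standing in for the downloaded page
SAMPLE_PAGE = """<html><body>
<table id="constituents">
  <tbody>
    <tr><th>Symbol</th><th>Security</th></tr>
    <tr><td><a href="#">AAA</a>
</td><td>First Company</td></tr>
    <tr><td><a href="#">BBB</a>
</td><td>Second Company</td></tr>
  </tbody>
</table>
</body></html>"""


class WikiWorker:
    def __init__(self, page_html=SAMPLE_PAGE):
        # Assumption: the lesson stores a URL and downloads the page instead
        self._page_html = page_html

    @staticmethod
    def _extract_company_symbols(page_html):
        root = ET.fromstring(page_html)
        table = root.find(".//table[@id='constituents']")
        table_rows = table.findall(".//tr")
        # Skip the first row, which is the header
        for table_row in table_rows[1:]:
            first_cell = table_row.find("td")
            # Take the cell text and drop the trailing newline
            symbol = "".join(first_cell.itertext()).strip("\n")
            yield symbol

    def get_sp_500_companies(self):
        yield from self._extract_company_symbols(self._page_html)


symbols = list(WikiWorker().get_sp_500_companies())
print(symbols)
```

Real web pages are rarely well-formed XML, so ElementTree only works here because the sample is hand-written. With real HTML, a forgiving parser such as Beautiful Soup, as used in the lesson, is the practical choice.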
6. Creating a Yahoo Finance Reader: All right, so now that we have our WikiWorker set up, let's go ahead and write the class, which is going to be a threaded class, that extracts the price using this Yahoo Finance URL here. All right, so I'm also going to just delete these two workers here because, well, we no longer need them. So let's go ahead and delete these real quick. We're going to create a new worker file here, which is going to be for the Yahoo Finance price. We'll call this YahooFinanceWorkers. And then in here we're going to first import threading, because we're using it. And then we'll have a YahooFinancePriceWorker, which is going to inherit from threading.Thread, like we did before. And then in our initialization method, we're just going to make sure to initialize the parent too. And we're also going to accept keyword arguments that we're going to pass on to our parent classes here. Alright, so this is what we had before. Now let's also just make sure to start the thread, so, you know, that we get to that point. And then we're also going to override the run method, but we'll define it in a little bit. Okay? So what do we need? Well, we also need to accept the symbol that we want to look up. So we want to save that symbol as a class attribute. And then we also need the base URL that we want to use, which is basically going to be everything except for this last part. So this is going to be our base URL. And then, well, actually, we can have our base URL just defined here. And then we're going to set the actual URL to be our base URL, and let's use formatted strings here, and then we're just going to add the symbol at the end. So yeah, we're going to have our base URL be the first part right here, and then over here we're going to add on our symbol at the end. And so really this here is just going to give us the URL for whatever the symbol is.
It's just going to basically give us the same result as this. It's just a neater way, in my opinion, of expressing it. Alright, so now we have our worker here, and now we want to override the run method. So what we have to do here is, OK, now we have this URL, and once again, we need to extract this price here. So let's go into the HTML and let's try to find it. So we've got it here, and it's actually in the title. That's very convenient. So let's see if it's anywhere else too, but we could already get it from there. It is also here. And let's see. Okay, so it's also returned in a script somewhere. So there are different ways that we can go about doing this. One of them is, of course, we can extract it from where it appears here. But if we right-click and we copy the XPath, and let's just go ahead and take a look at this, yeah, it doesn't look particularly clean. There's no specific ID or anything that we can use here. And because it's also included in the title, we can also just extract it from there. This is, of course, a little bit dependent on the page. It's assuming that Yahoo keeps this format. But, you know, if they change it up, like if they also change this format up here, we're of course going to have to adapt our scraper anyway. So let's just go with the simpler option for now. We're just going to be extracting it from the title itself here. Alright. So again, from bs4 we're going to import BeautifulSoup. And let's go ahead and just test this out first, though. We also need to import the requests module. And let's just go ahead and use this for testing. So we're going to set our URL here. We're going to import requests and say r equals requests.get of our URL. And then we're going to have our soup be an instance of BeautifulSoup, taking in the text of our response. So I have a typo here. I didn't import it properly.
Try that again. We're gonna go soup.title, which lets us access that directly. So unfortunately, it looks like this content is dynamically generated, but that's okay. We'll have to go about this a different way. We're going to use the lxml library instead. And really we're just going to use it for a very simple thing, which is making sure we can use XPath, since that's currently not supported with Beautiful Soup. So we're gonna say from lxml import html. Of course, we don't have it installed. So let's go ahead and quickly pip install lxml. There we go. I'm going to write this to our requirements. Let's open up our Python again and try to import that. There we go. And we don't need Beautiful Soup here anymore. Let's go ahead and have our URL. We're going to import requests and send a request. All right. And then we're going to say our page contents, as we can call it, is going to be html.fromstring. So we're just going to import html here from lxml. This is just going to help us parse using XPath. And again, you'll notice the syntax is very straightforward, so there's no need to be familiar with this library. We're just going to use basically these two methods from it. First, we're going to put in here the HTML string, which is going to be the text of our response. And then this has an xpath method, which we're going to use. And so what we have to do now is get the XPath for this, which we can do very easily by just right-clicking and choosing Copy XPath. So once we find this element here, which we can do by searching like this, we can right-click, Copy XPath, and then go ahead and put that in here. And there we go. So we can see, if we access the first element and its text, then we're going to be able to extract the value here like this. All right, so let's go ahead and implement that. So we've tested this out to make sure that we have something that's working.
So let's go ahead and do that again. Well, actually we can just copy that, since we already did it. So we're first going to issue our request, which is going to be through this. And then we have our page contents like this. And then we want to have a price like this, converted to a float. And we're not doing a lot of error handling here. For example, what happens if we don't get a response here, or this value cannot be converted to a float, or whatever? So of course, you know, if you want to make this more robust, you can add status code checking here, like we did in our WikiWorker. You can add try-except statements to handle different cases where you may not always be getting out a float value. But just to keep this simple, we're going to keep it like this without doing too much error handling. So now we have our price, and then we can just go ahead and print out our price for now. Alright, so let's go back into our main function here. And now we're going to change this up a little bit and import our two workers and just have them working together a little bit. So here we want to import the YahooFinancePriceWorker. So the first thing we're gonna do is create an instance of our WikiWorker. And we're going to say for symbol in wiki worker dot get S&P 500 companies. This is just going to give us one symbol at a time. And now we're going to create an instance of the YahooFinancePriceWorker. And the symbol that we're going to pass here is just directly going to be the symbol that we have here. And then we want to keep a list of our current workers, like we did before, so we append our YahooFinancePriceWorker, and then just use the join method here. And take this out. We can still do this tracking of time, and let's rename our variable here to scraper start time. All right, so we need to make sure we use the instance of our WikiWorker here, not just the class definition itself. So let's go over what we did.
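The lesson leaves error handling out on purpose. As a hedged illustration of the try-except idea mentioned above, here is a small helper that converts the scraped text to a float and returns None when that is not possible. The helper name parse_price and the comma handling are assumptions added for this sketch. They are not part of the worker built in the lesson.

```python
def parse_price(raw_text):
    """Return the price as a float, or None if the text is not a number."""
    if raw_text is None:
        return None
    # Assumption: prices may carry thousands separators, e.g. "1,234.56"
    cleaned = raw_text.strip().replace(",", "")
    try:
        return float(cleaned)
    except ValueError:
        return None


print(parse_price("1,234.56"))
print(parse_price("N/A"))
```

Inside the worker's run method, a None result could then be logged or skipped instead of letting the thread crash with an unhandled exception.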
Basically, we're applying what we learned about threading, now using our two workers. The first one is our wiki worker, which, if we go over it again, has the main method we're using here to get the S&P 500 companies. It sends this request, and from the table rows in the response it extracts the first element, which we assume is going to be the symbol. That's what we use as input for our Yahoo Finance price worker. For this one, we send a request to this URL and parse the HTML using the lxml html module. We're only using it so that we have an easy way to apply the XPath, extract the price, and convert it to a float. So we're looping over this company list, and for each symbol we get, we create a thread, which sends this request and then just prints the result to the output for now. We're tracking the execution time just like we did before, and we're calling join at the end so that everything is blocked until every single thread finishes. We don't need to do anything else here because we're overriding the default run method, which is executed right after we start the worker. Before we run that, I want to implement one more thing, which is slowing down our program a little bit. We're going to import time and import random, just so that we don't spam. We're going to say time.sleep of 20 times random.random(). That means every thread is going to sleep for between 0 and 20 seconds, since random.random() generates a random number between 0 and 1. So that's going to slow things down a little bit, and it's going to make our program run for longer.
Well, I guess at this point it's a maximum of about 20 seconds, if something does sleep for the full 20 seconds. We can even push this up to 30, just so we don't spam Yahoo while we're doing this, because later we're going to change things up so that we don't spawn an individual worker for every single symbol. And actually, I do want to implement an extra check here, which is checking whether our status code is 200. If it isn't, then for now let's just stop execution so that we don't get spammed with error messages. So let's go ahead and run our main function. I'm going to do it from the terminal this time, because I have my virtual environment set up here with everything installed that I need. Let's run this. And after waiting a little bit, there we go: we have the prices starting to come in. I'm going to interrupt this. But yeah, everything is working as intended. Now, I can guarantee you there are probably some errors here that we're not catching, which may be due to a variety of reasons, and which we should be dealing with better. We should at least be logging them or something like that. But for now, let's leave this as is, since the main purpose was setting up the separate threads to do all these separate requests, so that we're not blocked by all of this network time. Going forward, we're going to see how we can limit and scale this and define how many workers we want to have for each step, so that all of this is a little more structured, rather than spawning an individual thread for every single iteration that we have here.
And this is actually also going to teach us how we can pass information down between different threads so that all this stuff can kind of be running concurrently. We'll get to that in the next video.
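As a rough sketch of the pattern from this lesson (one thread per symbol, a random sleep so requests are spread out, and join at the end), something like the following could work. The class name PriceWorker, the max_sleep parameter, and the placeholder price are assumptions for illustration; the real worker would send the request and apply the XPath instead.

```python
import random
import threading
import time


class PriceWorker(threading.Thread):
    """Hypothetical stand-in for the Yahoo Finance price worker."""

    def __init__(self, symbol, max_sleep=0.05, **kwargs):
        super().__init__(**kwargs)
        self.symbol = symbol
        self.max_sleep = max_sleep  # the lesson uses 20 (later 30) seconds
        self.price = None
        self.start()  # start the thread as soon as the instance is created

    def run(self):
        # random.random() returns a float in [0, 1), so this sleeps for
        # somewhere between 0 and max_sleep seconds before doing any work.
        time.sleep(self.max_sleep * random.random())
        # Placeholder for the real HTTP request and XPath extraction.
        self.price = float(len(self.symbol))


def scrape_all(symbols):
    """Create one worker thread per symbol and wait for all of them."""
    workers = [PriceWorker(symbol) for symbol in symbols]
    for worker in workers:
        worker.join()  # block until this thread has finished
    return {worker.symbol: worker.price for worker in workers}


if __name__ == "__main__":
    scraper_start_time = time.time()
    print(scrape_all(["AAPL", "MSFT", "GOOG"]))
    print("Took", round(time.time() - scraper_start_time, 2), "seconds")
```

Because every thread sleeps independently, the total run time is roughly the longest single sleep rather than the sum of all of them.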
7. Queues and Master Scheduler: All right, so in the last lesson, we set up our Yahoo Finance worker as well as our wiki worker, which we had before. We just looped through all the symbols, sent requests, and got whichever prices we could. But now we're going to systematize this and make it a little bit nicer. What we're going to do is have these be separate steps. So rather than looping through the symbols and directly creating a thread for each symbol, we're going to separate these entities. We're going to loop through all the symbols first, and whenever we get a symbol, we're just going to put it into something called a queue. Then we'll have different threads for the Yahoo Finance workers, which are going to be reading from that queue. So we have this intermediate thing in between, and again, that thing is called a queue. The idea here is that we can take elements and put them in the queue, and in that way we're decoupling how fast or how slow the different threads are running. The queue can build up and we can put a lot of elements in, and then our downstream workers, the ones that are taking from that queue, can just read from it whenever they're ready. In that way, and we'll see this later on, we also have more control over how many input workers we want to have, if we can scale that, but also over how many readers, how many consumers of the queue, we want to have. If we notice our queue is filling up too quickly and it's just ever growing, then we can add more consumers. And if we notice that the queue is basically always empty, then we can scale down the number of consumers, because having so many workers isn't really providing us any benefit.
All right, so to implement a queue, we're going to use the multiprocessing library, and from it we're going to import Queue. Then we're going to create a queue, which is going to be our symbol queue, an instance of Queue like this. Let's comment this out for now, and comment this out here. As we loop through the symbols, we're just going to use our symbol queue and put the symbol into it, so we're inserting it into the queue. Now, queues are thread-safe, which is great because that means we can use them with threading. And yeah, that's all we have to do: in this way, we put elements into our queue. If we run this, we can print out the queue at the end. There's also a get method that we can use, which gets the next element from the queue. So we can run our main function to see the results. We see that we have an instance of the Queue object here, and using the get method, we get the first value from the queue, which in this case is the first symbol, as we expected. All right, so now that we're putting these elements into the queue, we can have a separate stream of workers consuming from it, and that's essentially what our Yahoo Finance price workers are going to be. But we're also going to change things up a little bit here. Instead of having the Yahoo Finance price workers do this directly, we're going to have a scheduler which takes care of the threading, and our Yahoo Finance price worker itself is just going to be the class that extracts the price information. So we'll have a different class responsible for the threading part. We'll call this one the Yahoo Finance price scheduler, and it's going to be a subclass of threading.Thread.
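A minimal sketch of the put and get calls described here. The lesson imports Queue from multiprocessing; this sketch uses queue.Queue from the standard library instead, which offers the same put/get interface for threads. The symbols are example values only.

```python
from queue import Queue

symbol_queue = Queue()

# Producer side: insert symbols in the order we loop over them.
for symbol in ["MMM", "ABT", "ABBV"]:  # example symbols, for illustration only
    symbol_queue.put(symbol)

# Consumer side: get() returns the oldest element (first in, first out).
first = symbol_queue.get()
print(first)                 # → MMM
print(symbol_queue.qsize())  # → 2
```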
And the price worker itself is no longer going to be a subclass of the Thread class. So for the scheduler, again, we're going to initialize this and initialize the parent class. We're also going to allow keyword arguments and pass those on to the parent class. All right, and then we're going to override the run method. Here we're just going to have an infinite loop, and what we do in this infinite loop is read from the queue. So what we also need is an input queue, because we need to read from a queue. We're going to provide that as an initialization argument, so we have our input queue here. Then our next value is just going to be self.input_queue.get(). Now, this is a blocking operation, which means it's going to block until we actually get a value returned. We'll get to this later, but basically, while this loop is running, it just keeps trying to read from the queue, and every time it gets a value, it continues execution. Because we have an infinite loop here, at some point we want to break out of it. An easy way to do this is to say: if we're sent the value DONE, then we're done and we just break out. Aside from that, if we do get a value and this value is not DONE, we're going to assume that it's a symbol. So then we want to create an instance of our Yahoo Finance price worker, and the symbol that we pass in is going to be our value, because we're going to make sure our upstream queue actually puts symbols in here. Then we're also going to change the worker's run method. We're going to rename it to extract price or get price or something; we'll just call it get_price. All right, so we're going to call this, and we're going to say price.
And here, instead of printing the price out, let's go ahead and return the price. So we're going to say our price is equal to the result of calling get price on our instance of the Yahoo Finance price worker. We also need to add this up here: start the thread. Then, if we want, we can print out the price here for now. Later on, we're going to put this into a different queue, and a downstream worker is going to be responsible for inserting it into the database. But for now, we're just going to print it out. And yeah, we don't need the sleep here anymore. What we can do, if we want, is add a little sleep at the end here. This isn't an API request, so we're not getting information in the headers about rate limits or how many requests we're allowed to make. So to be respectful, after every single request we're going to sleep anywhere between 0 and 1 seconds, just to slow the process down a little bit. So again, what we have here is our scheduler class, which we've now made our master threading class. It takes an input queue as an argument, and then we start the thread. It basically goes into an infinite loop and continues reading from the queue until it reads the DONE value, at which point it breaks out. Every time it gets a value and this value is not DONE, it creates an instance of the Yahoo Finance price worker class and provides this value as the symbol, since we're assuming that's what we're getting here. Then our Yahoo Finance price worker is just responsible for extracting the price. And yeah, that's it for now. So let's go ahead and take this over here.
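The scheduler described above could look roughly like this. It is a sketch under assumptions: the names PriceScheduler and get_price, the string used for DONE, and the placeholder price are illustrative, and queue.Queue stands in for the queue class used in the lesson.

```python
import threading
from queue import Queue

DONE = "DONE"  # sentinel value that tells the scheduler to stop


def get_price(symbol):
    """Hypothetical stand-in for the price worker's get_price method."""
    return float(len(symbol))


class PriceScheduler(threading.Thread):
    def __init__(self, input_queue, **kwargs):
        super().__init__(**kwargs)  # pass remaining keyword arguments to Thread
        self._input_queue = input_queue
        self.prices = []
        self.start()  # begin running as soon as the instance exists

    def run(self):
        while True:
            value = self._input_queue.get()  # blocks until a value is available
            if value == DONE:
                break
            # Anything that is not DONE is assumed to be a symbol.
            self.prices.append((value, get_price(value)))


symbol_queue = Queue()
scheduler = PriceScheduler(input_queue=symbol_queue)  # already waiting on the queue

for symbol in ["AAPL", "MSFT"]:
    symbol_queue.put(symbol)
symbol_queue.put(DONE)

scheduler.join()
print(scheduler.prices)  # → [('AAPL', 4.0), ('MSFT', 4.0)]
```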
And we're going to create an instance beforehand so that as soon as we start putting values into the queue, we can start consuming them. All right, so we create an instance of our Yahoo Finance price scheduler here. Now, we have to make sure we give it the input queue, and the input queue we're going to use is the symbol queue, since this is where it should be reading from. Right now we just have one thread that we're creating here. Let's create a list for this in case we want to have more, which we will later on: Yahoo Finance price scheduler threads, like this. Then we can keep this loop here, and we no longer need this. All right, so one thing that we have to do at the end, because we are waiting for the DONE value, is to make sure that once we're done, we put DONE in here, so that our class, or rather that thread, is actually going to exit. One thing we can do, rather than just putting in one value, in case we want to change this later, which we will very soon, is to use a for loop: for however many threads we have, we put in one DONE value, to make sure that every instance of our thread is actually going to break out. Because we're consuming from this queue, every value is read by only one thread, and once a thread reads DONE, it's basically done. So if we have multiple threads and only one of them sees the DONE value, let's say we have four threads and one of them reads DONE, then the other three threads are not going to break out, because the queue is empty and they're waiting for something to read. That blocks any further execution, not for the whole program, just for that thread. So that's why we want to make sure we provide enough DONE values here for every single instance of the thread that we have to actually break out.
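To illustrate why one DONE value is needed per thread, here is a small sketch with four consumer threads. The function name consume, the lock-protected list, and the symbols are assumptions for the example and are not taken from the lesson's code.

```python
import threading
from queue import Queue

DONE = "DONE"  # sentinel; the exact value is an assumption
NUM_WORKERS = 4

seen = []
seen_lock = threading.Lock()


def consume(input_queue):
    while True:
        value = input_queue.get()  # blocks while the queue is empty
        if value == DONE:
            break  # this thread has used up its one DONE value
        with seen_lock:
            seen.append(value)


symbol_queue = Queue()
threads = [
    threading.Thread(target=consume, args=(symbol_queue,))
    for _ in range(NUM_WORKERS)
]
for thread in threads:
    thread.start()

symbols = ["AAPL", "MSFT", "GOOG", "AMZN", "META"]
for symbol in symbols:
    symbol_queue.put(symbol)

# One DONE per thread; with fewer, the remaining threads would block forever.
for _ in range(len(threads)):
    symbol_queue.put(DONE)

for thread in threads:
    thread.join()

print(sorted(seen))  # → ['AAPL', 'AMZN', 'GOOG', 'META', 'MSFT']
```

Because the DONE values are put in after all the symbols and the queue is first in, first out, every symbol is taken out before any thread sees DONE.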
And yeah, let's go ahead and run this. This one should be a bit slower, since right now we only have one instance of our thread, and that's exactly what we see here. We're extracting our symbols and putting them into the queue, and once a value is in this queue, that's when our worker here starts working. It's already initialized beforehand and it's already running: as soon as we initialize it, it enters this loop, and then it's just waiting until there's something in the queue. Once there's something in the queue, it reads from it, so as soon as the first symbol is put in, it starts reading. If we want to see that, we can add a sleep, let's say for ten seconds, and print 'inserting first symbol'. Let's take a look at that. So we wait, and then we insert the first symbol. As soon as it's inserted, there's a value in the queue, and our Yahoo Finance price scheduler, which is already running, can get it out of the queue and start doing its work. That's really nice, because it makes all of these steps independent of each other. All right, so of course we can take this away. Now, if we want to make this a little bit faster, all we have to do is add a variable, let's say num Yahoo Finance price workers, and let's say we want to have four workers. Then for however many workers we want to have, we just create one of these. So in this case we have four workers running, each of them waiting to read from the queue. Whenever something is put into the queue, they start consuming from it and doing their work, as we can see here.
And our queues are thread-safe, which is really nice because we don't have to deal with any of the issues that we talked about beforehand. Okay, so now we're getting some values here that we have to deal with: the prices that are in the thousands. One thing that we can do here is some formatting. We'll call this the raw price, and it looks like there's a comma for the thousands, so we're just going to replace that with an empty string, like this, to fix these formatting things. There we go, so we can see the thousands are also working. So yeah, we've now reached a really nice point, because we have this independence between our different threads. This part here would be part of our main thread, and it's just putting values into the queue. Whenever something is put in, we have our different workers here that are ready and available to start consuming from it.
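The comma fix, together with the try-except handling that was mentioned earlier as a possible improvement, might be sketched like this. The function name parse_price and the choice to return None on failure are assumptions, not part of the lesson's code.

```python
def parse_price(raw_price):
    """Convert scraped text such as '1,234.56' to a float, or None on failure."""
    if raw_price is None:
        return None
    try:
        # Remove the thousands separator before converting.
        return float(raw_price.replace(",", ""))
    except ValueError:
        # The text was not a number (for example an empty string or 'N/A').
        return None


print(parse_price("1,234.56"))  # → 1234.56
print(parse_price("98.7"))      # → 98.7
print(parse_price("N/A"))       # → None
```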
8. Creating a Postgres Worker: All right, so now let's go ahead and work on our database insertion worker. I'm just going to use my local Postgres database. I'm using DataGrip here, but of course you can use whatever coding environment or IDE you enjoy the most. If you're not super fresh on how to create your Postgres database, just make sure you go ahead and check out the SQL course. Otherwise, just spin up a local Postgres server. I'm going to use the default postgres database here too, and we're going to create a table in it that we'll insert these values into. So I'm going to say create table, and we'll call this prices. We're going to have an id column that's going to be serial, and that's going to be our primary key. Then we're also going to have the symbol, which we'll just have be text. Then we're going to have the price, which is going to be a floating point number. And then we're also going to have the time, and we'll call this insert time, which we'll just have be a timestamp. So it's a pretty simple table: a single table with the prices, where we have our id column, which is a serial and our primary key, and then just the symbol, the price, and the time. Or maybe instead of insert time, we'll call it extracted time, so the time that we got the price from the site. All right, so let's go ahead and run that. There we go. Going back into our threading directory, let's create a new worker, which is going to be our Postgres worker. Here we're going to have two things. The first is going to be our Postgres master scheduler, which is going to be a subclass of threading.Thread like we had before, just like we did for the Yahoo Finance price scheduler.
And we're going to accept keyword arguments as well as an input queue. Then let's initialize the parent, passing on the keyword arguments, and set the input queue to be the input queue that we take from the initialization. Then we override the run method, just like we did for the Yahoo Finance scheduler. Here, again, we're going to be looking at our input queue and waiting until we get the next value. If that value is DONE, we break out of our loop. All right, so that's basically going to be the essence of our Postgres master scheduler, exactly what we did for the Yahoo Finance scheduler. And of course, not to forget, we want to make sure we start the thread too once we create an instance of our class. All right, so now we need to do the other thing, which is creating our actual Postgres worker, the one responsible for inserting the data into our database. In its initialization method, this one is going to take the symbol as well as the price as input, and also the extracted time. We could clean this up a little bit and have the input be a dictionary or something else that you can get the values out of, but for now let's just leave this as is. So our symbol is the input symbol, our price is the input price, and our extracted time is the input extracted time. Okay. Now we need the method that does our insertion, which is going to be insert into database. For our insertion, first of all we need to create the insert query, and then we need to perform the insertion. We're going to have create insert query be a separate method, which just creates the query for us and gives it back.
And then we're going to perform the insertion. To create the query, we're just going to use raw SQL here. We're going to say insert into prices, values. Well, let's also define the column order here just to be safe: symbol, price, and extracted time. Let's make sure that fits with our syntax, and put this on a new line just to make it clearer. Now, there are different ways that we can do this. The first way is using f-strings, like this, formatting the values directly into the query. This is okay, because we're doing this internally and there's no outside input, but it's not the best. There's a better way, which is having the formatting done for us. We just provide placeholders: here is going to be the symbol, which we'll provide later, here is going to be the price, and here is going to be the extracted time. Then we also don't need the f-string here. And yeah, this is basically the SQL query that we need to have. It's not much, but that's okay. Then we're just going to return this query. So again, the first thing that we do here is have the insert query created for us. Now we actually need to perform the insertion, and to perform the insertion, we need to set up a database connection. To do that, we're going to use a library called SQLAlchemy, which is going to help us a lot with setting up the database connection and executing these SQL commands. It's really a general library that we can use to connect to databases, and I'll show you the essentials in just a second. But the first thing that we need to do is install it, so we're going to pip install sqlalchemy, like this.
And then we're also going to pip install psycopg2-binary. I really hope I'm spelling this correctly; if not, I'll look it up in a second. I've got a typo, so let's try that again. There it is. So this is pip install psycopg2-binary. I always have trouble remembering the pronunciation of it, which is why I can never write it out. This is going to allow us to set up the Postgres connections. So we install SQLAlchemy, and we don't even need to import psycopg2 ourselves. We install the binary version because sometimes, if you don't install the binary version, there are issues with the installation. So just always install the binary version. It basically provides us with the driver that SQLAlchemy can use to actually connect to the database. So how can we go about setting up this database connection? We can see here that I've already imported it: we're going to use this create_engine function to create an engine, and the engine is going to allow us to interact with our database. First, we need to get all of our database parameters. The first is going to be our Postgres username, and a nice way to do this is to set it as an environment variable, Postgres user. Then we're going to get the Postgres password, which we also need. Then we're also going to need the host, or the URL, or wherever the server is located, so that's going to be Postgres host. And then we also need the database that we want to connect to on the Postgres server. These are the connection details that we need, and from them we can create a connection string which will allow us to connect to the database. The other thing that I've done here is use the os package, which was actually imported for me automatically, which is really nice.
But this just allows me to access things from our operating system, specifically here going into os.environ, which lets me access environment variables. Here I'm going to use the .get method. If you haven't seen the .get method before: if we create a dictionary, for example, let's call it x, and we put a key a in it, then x.get('a') is going to get the value from this dictionary. If you do x.get('b'), which isn't included in the dictionary, it just gives us None. So rather than indexing directly, which can fail if we're accessing keys that aren't included, the .get method either gives us the value or returns None, so that it doesn't fail. And then what we can do is use an or here. For example, if you do x.get('b') or, then we can provide a default value that we want to use, which could be a string, an integer, or whatever. So we can see that if the value does not exist, we get the default value, and if it does exist, then we get the value from the .get method. So we're going to say: get our Postgres user, or it's just going to be empty. The password, or it's just going to be empty. The host, or it's just going to point to localhost. And the database, or it's going to be postgres. Now, if you have Postgres running on a Mac and you installed it yourself, then it's very likely that your local Postgres does not have a username or password. But if you're running it on Windows, it's very likely that you have a username and password. The username is likely going to be postgres, and the password is going to be whatever you set during installation. So remember, if you're on Windows, this very likely needs to be set, or else it's going to complain that it can't connect because the password is None, or that it failed to authenticate, or whatever.
But on a Mac, you're likely not going to need to set a username and password, unless you explicitly set them for your database. So again, what we're doing here is trying to get these values from our environment variables, and if they're not available, we fall back to default values. This is really nice because we can then set up different environment variables for different environments. For example, you can have a testing environment, a production environment, or a local testing environment. If you don't set anything, then we just take our local testing environment; otherwise we take whatever variables are set. So now that we have our environment variables here, we can create our engine, which is going to allow us to connect to the database. We're going to use the create_engine function we got from SQLAlchemy, and we're just going to use an f-string for this. The first part just indicates that this is a Postgres connection, so we write postgresql, colon, slash, slash. Then we have our user, a colon, the password, an at sign, the host address or the IP address or whatever, a slash, and the database. So this is our connection string here: username, password, host address, and the database that we're connecting to. All right, so now we have our engine set up, and we have the SQL query that we've created. The nice thing about the engine is that it provides us a means for creating database connections, but until we actually open a connection, no connection is created. It's basically just an object that allows us to create a connection once we explicitly tell it to do so. So now let's go ahead and execute this query. We're going to say with, and then using our engine, we're going to connect.
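The environment-variable defaults and the connection string described above can be sketched as follows. The variable names POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST and POSTGRES_DATABASE, and the helper build_connection_string, are assumptions; the lesson does not spell out the exact names. The env parameter exists only so the example can be run without touching real environment variables.

```python
import os


def build_connection_string(env=None):
    """Build a Postgres connection string, falling back to local defaults."""
    env = os.environ if env is None else env
    # .get returns None for a missing key; `or` then supplies the default.
    user = env.get("POSTGRES_USER") or ""
    password = env.get("POSTGRES_PASSWORD") or ""
    host = env.get("POSTGRES_HOST") or "localhost"
    database = env.get("POSTGRES_DATABASE") or "postgres"
    return f"postgresql://{user}:{password}@{host}/{database}"


# Nothing set: every value falls back to its default.
print(build_connection_string({}))
# → postgresql://:@localhost/postgres

# Some values set: those are used, the rest still fall back.
print(build_connection_string({"POSTGRES_USER": "max", "POSTGRES_PASSWORD": "secret"}))
# → postgresql://max:secret@localhost/postgres
```

One thing to keep in mind with the `or` pattern is that it also replaces values that exist but are empty, such as an empty string, with the default.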
Now, there are different methods that we can use here. One of them is .connect, which just creates a connection. We can also use .begin, which creates a connection and begins a transaction. But we don't really need to use transactions here, so we're just going to create a connection, and we're going to have it in our conn variable here. You'll notice, or remember, this syntax from opening files, where we say with open, the file name, as f, and then read from it. This is the exact same syntax: there we open a file, then we can do something with the file, and once we exit out of the with statement, the file is closed for us. So we have this context manager here, which is really nice. We create the connection with it, we can access the connection using this conn variable, as I've called it, and once we exit out of the with block, the connection is automatically closed for us, so we don't even need to worry about that. So now we can just use our connection. Our connection instance has this .execute method, which is going to allow us to execute the SQL statement here. Now there's one more thing that we need to do, which is provide the input values, since we didn't put them in with f-strings. So we need to provide them when we're doing the insertion. To provide the values for this syntax, we need to import a special helper which will allow us to do that: from sqlalchemy.sql we're going to import text. Then we wrap the query in text here, and we provide our input. So we have our symbol, which is this colon symbol, and that's going to be the symbol here. Price is going to be the price, and the extracted time is going to be the extracted time here. All right, so let's go over this one more time. We've got our Postgres worker, which takes in the symbol, the price, and the extracted time as input variables.
And then it also creates an engine for a database connection using the Postgres connection address. What we're actually going to call is insert into database. It's going to create the SQL query for us, and then we connect to the database, so we create a connection to the database. With that connection, we execute the SQL statement that we've created. We use this text construct here because we have this special placeholder format, which basically gives us some protection against things like injection attacks and makes sure that the formatting is proper. Then we can provide the values as a second input argument, as a very simple dictionary, and it's going to fill in the values for us. Once we exit out of the with statement, the connection is also automatically closed for us, which is very convenient. All right, so let's go back to our master scheduler. What we have to do here is, once we get a value from our input queue, create an instance of our Postgres worker, making sure to provide the symbol, the price, and the extracted time. And where do we get those from? Well, let's just assume that's what we get with our input value. So our input value is either going to be a list that looks like this, or a tuple that looks like this. Either way, because we have full control over this, we're going to assume this is how we get our data. Of course, we could make this more general and have it be a dictionary. Then we could say symbol is equal to val.get('symbol'), and do the same thing for price with val.get('price'), and so on. That makes it a little bit less error-prone, but that's not the purpose of this right now. We just want to get this running.
So we're just going to make sure for now that this is how it's going to work. And once we have our Postgres worker instance here, we call insert into database. All right, so that's creating our Postgres worker. We're going to continue this in the next lesson, because we also need to provide the input queue here and send these values into it, and we need to include this in our main function. Because this is already getting pretty long, let's stop here for now and continue in the next lesson.
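The worker pattern from this lesson (placeholders in the query, a dictionary of values, a connection that closes when the with block exits) can be sketched without a running Postgres server. The sketch below swaps in the standard-library sqlite3 module for SQLAlchemy, so it is a stand-in, not the course's code: the class name, the prices table, and the db_path argument are all assumptions made for illustration. SQLAlchemy's text() uses the same :name placeholder style.

```python
import sqlite3
from contextlib import closing


class PriceInsertSketch:
    """Stand-in for the Postgres worker, using stdlib sqlite3 instead of SQLAlchemy."""

    def __init__(self, symbol, price, extracted_time, db_path):
        self._symbol = symbol
        self._price = price
        self._extracted_time = extracted_time
        self._db_path = db_path  # assumption: a SQLite file path, not a Postgres address

    def _create_insert_query(self):
        # Named placeholders are filled in by the driver, not by f-strings.
        return (
            "INSERT INTO prices (symbol, price, extracted_time) "
            "VALUES (:symbol, :price, :extracted_time)"
        )

    def insert_into_db(self):
        params = {
            "symbol": self._symbol,
            "price": self._price,
            "extracted_time": self._extracted_time,
        }
        # closing() makes sure the connection is closed when the block exits.
        with closing(sqlite3.connect(self._db_path)) as conn:
            # Table creation is only here so the sketch is self-contained.
            conn.execute(
                "CREATE TABLE IF NOT EXISTS prices "
                "(symbol TEXT, price REAL, extracted_time TEXT)"
            )
            conn.execute(self._create_insert_query(), params)
            conn.commit()
```

Because the values travel separately from the SQL text, a symbol containing quotes or SQL keywords is stored as plain data instead of being executed.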
9. Integrating the Postgres Worker: All right, so now that we've set up our Postgres worker, let's go ahead and continue with this. We'll integrate it into our main function and make some other changes down the line to make sure everything is working properly. So first of all, in our main function, we're going to import the Postgres master scheduler. And we're going to do a similar thing like we did for the Yahoo Finance worker here, except instead of Yahoo Finance it's going to be our Postgres. We'll change the input queue in a second; I'm just updating the names here. There we go. So we have the number of Postgres workers and the input queue. Now I need a new queue, and that's going to be the Postgres queue, like this. All right, so we've got the Postgres scheduler threads here, and the number of Postgres workers we'll set to two for now; let's just see how things go. The Postgres scheduler is going to be an instance of the Postgres master scheduler, which takes this input queue here, and we're keeping track of those threads. All right, so now that we have this Postgres queue, we actually need to get values into it. So it's going to be an output queue that we provide to the Yahoo Finance price scheduler. Now, there are two ways we can go about this. The first is that we can explicitly make it an input argument. Or we can just use our keyword arguments here. So we say we've got an output queue, and from our keyword arguments we get the output queue. If it doesn't exist, this is going to be None; otherwise we get the value from here. All right, so then in the run method we have the price here, and we can say: if the output queue is not None, then we put this price into our output queue. Similarly, once we finish here, let's make sure we also send DONE to our downstream workers.
All right, so what did we just do? Well, we added an output queue. And the reason we need an output queue is that we now have a three-step process. The first step is getting all the symbols, which we do with our wiki worker. That isn't a thread; it's part of the main thread, just a worker that gets all of the symbols and puts them into the symbol queue. The symbol queue is then read by the Yahoo Finance price scheduler, the master scheduler here. Every time it gets a value, it processes it and gets the price. Now, instead of printing the price, it sends it on to the output queue. And the output queue is what our Postgres master scheduler consumes from as its input queue, as we can see here. And once we finish, once we break out, we want to make sure that we also put DONE into the output queue, so that the workers downstream also get this done message passed on to them. Now, an important thing to be aware of is the format that we're expecting here. We expect the symbol, the price, and the extracted time. So let's set those to be the output values. We provide the symbol, which is the value that we get from here, the price, and the extracted time, which we'll just convert to an integer. That's what we put into our output queue, since again our Postgres worker expects the symbol, then the price, then the extracted time. The symbol comes from the input queue, which is the symbol queue, then the price, and then the extracted time right here. These values go into our output queue, and we read them as the input queue in our Postgres worker. And then we send on the DONE value, which we get here. All right, so let's go ahead and test this out.
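As a rough illustration of the three-step flow just described, here is a minimal sketch of a scheduler thread that reads symbols, puts (symbol, price, extracted time) tuples on an output queue, and forwards the DONE sentinel. The class name and the fetch_price callable are assumptions: the real course code looks the price up on Yahoo Finance, which is replaced here by a function you pass in.

```python
import queue
import threading
import time

DONE = "DONE"  # sentinel telling a consumer that no more values are coming


class PriceSchedulerSketch(threading.Thread):
    def __init__(self, input_queue, output_queue=None, fetch_price=None):
        super().__init__()
        self._input_queue = input_queue
        self._output_queue = output_queue
        # Assumption: stand-in for the real Yahoo Finance lookup.
        self._fetch_price = fetch_price or (lambda symbol: 0.0)

    def run(self):
        while True:
            symbol = self._input_queue.get()
            if symbol == DONE:
                # Pass the stop signal on so downstream workers can exit too.
                if self._output_queue is not None:
                    self._output_queue.put(DONE)
                break
            price = self._fetch_price(symbol)
            if self._output_queue is not None:
                # Same order the Postgres worker expects: symbol, price, extracted time.
                self._output_queue.put((symbol, price, int(time.time())))
```

With several downstream workers reading the same queue, each of them needs to see its own DONE value, so in practice one sentinel per consumer would have to be sent.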
Let's see how things are running and whether there's anything we missed. We got an unexpected keyword argument, output queue. Okay, so in this case we should actually define it explicitly, because we are passing our keyword arguments on. It's complaining at the point where we pass them along: the threading class, which receives these keyword arguments, doesn't know how to handle an output queue. So we're going to define it explicitly so that we don't pass it on, because it's causing errors downstream. Like this. Let's run that one more time and see how things go now. All right, so it seems like things are running and the values are being passed on. But we're having some issues inserting the data into our database. The main issue seems to be with our timestamp. It's complaining that we're providing an integer here, but it's expecting a timestamp. So let's include a cast here. Let's put this on a new line just to make the formatting easier. We're going to cast this value as a timestamp. Let's try that one more time. I'm still getting issues, so let's see what value we're passing in here. Of course, we're sending the integer time value. What we want to send is basically a datetime value that we can insert. So like this. I don't expect us to need this anymore. And let's make sure to cast this as a string so that we get the string format. So the value we get here is going to look something like this; I'm just making up a time, but something in this format. All right, so we get that using the datetime class here. We're going to pass on a datetime value, which is going to be in UTC.
We use utcnow instead of now just so that everything is standardized, and we do still need the time library to do the sleeping down here at the end. All right, let's try that one more time. It looks to be running, and we don't actually have any print output, but let's check our database. Pulling up the table here. Oh, there we go. Our values are being inserted, as we can see, which is really great. This is exactly what we're expecting. So let's quickly recap what's going on right now, because we have many different moving pieces. First we've got our main thread. In our main thread we create a symbol queue as well as a Postgres queue. These are the queues through which we pass on the symbols that we get from our wiki worker. It goes through all of the symbols on the wiki page here and puts them into this symbol queue. Then our Yahoo Finance price scheduler, which we currently have four instances of, reads from this queue. It gets the price, or tries to, and passes it on to the downstream queue, which is our Postgres queue here. Our Postgres worker reads from the Postgres queue, which is the output queue here, and just keeps reading from it. Every time it gets a value, it inserts it into the database, and when it gets the DONE value, it breaks out of the loop. And if we get the DONE value in our input here, we make sure to pass it on to the output queue, which Postgres is reading from. So yeah, that's what we have going on. Let's see how many records we have in our database now. 180. And we're expecting about 550.
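The two fixes from this run can be sketched side by side. The first class reproduces the error: the output queue is read out of the keyword arguments but left inside them, so threading.Thread rejects it. The second defines the argument explicitly. The class names are made up for illustration, and the timestamp helper uses datetime.now(timezone.utc) as a swapped-in equivalent of the utcnow call mentioned in the lesson, since utcnow is deprecated in recent Python versions.

```python
import threading
from datetime import datetime, timezone


class KwargsSchedulerSketch(threading.Thread):
    def __init__(self, input_queue, **kwargs):
        self._input_queue = input_queue
        self._output_queue = kwargs.get("output_queue")
        # output_queue is still inside kwargs, so Thread.__init__ raises TypeError.
        super().__init__(**kwargs)


class ExplicitSchedulerSketch(threading.Thread):
    def __init__(self, input_queue, output_queue=None, **kwargs):
        # output_queue is captured here and never reaches Thread.__init__.
        super().__init__(**kwargs)
        self._input_queue = input_queue
        self._output_queue = output_queue


def extracted_time_string():
    # A UTC timestamp as a string, suitable for casting to a timestamp in SQL.
    return str(datetime.now(timezone.utc))
```

An alternative that keeps the keyword-argument style is to remove the key with kwargs.pop("output_queue", None) before calling the parent constructor.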
And again, this is running rather slowly, mainly because we have this sleep in here to limit how fast we're going. Of course this could be running a lot faster. So to make sure that this is ending properly, rather than getting all of the symbols, we'll have a symbol counter. Every time we insert a symbol, we increase it by one. And then, just to make sure this is working properly, if we have at least five symbols, we break out, so that we don't have to wait all the way until we reach something like 550 just to see if it's working correctly. So we're going to quit out of this and rerun it. Now it should go much faster. We should get five symbols inserted into the database, and then everything should exit, which it does. Great. That's important for us, because if it weren't exiting, that would mean our threads are hanging somewhere, that there's still an active thread somewhere. And we can see that's what we would get if we didn't do this: our Postgres workers would never be told DONE, and they would just continue on. It looks like we're not joining yet, which is what we see here. But we're not exiting because our Postgres workers are stuck in this loop, being blocked by this get call. They're waiting for the next value, but the queue is empty and nothing is being put on it. So they're just stuck here forever, and it's not going to stop. That's why it's important that we pass these values on, to make sure the downstream queues are also aware that it's time to stop. So, exiting out of this, there is one more thing we can do to help prevent this. Let's leave this commented out for now and go into our Postgres worker. Here we're going to add a timeout, and let's set it to 10 seconds.
And we're going to say, all right, if we don't have a value within 10 seconds, then we want to exit. So we add a try and except statement here and catch the special Empty exception, which we get with from queue import Empty. queue is again a standard Python library, so we can just import from it. And the multiprocessing Queue, which is what we've imported here, is modeled closely on the Queue class from the standard queue library and raises the same exception. So from here we can get the Empty exception. If we specifically catch the Empty exception, then we just want to print out that the timeout was reached and the Postgres scheduler is stopping. And then here too, we want to break out. So in this case, we're not sending the DONE signal to the Postgres worker, and it's going to hang for a little bit. But it does have a timeout. And we're going to see in just a moment that the Postgres workers stop anyway, because there's nothing in the queue and they've reached the timeout. We can see here the first one stopped, then the second one stopped. There we go. So that's why this is kind of nice: you protect yourself against waiting forever. But of course, you need to be careful, because if the timeout is not large enough for values to arrive from upstream, your downstream workers are going to quit before they ever had a chance to work. Or if there's some interruption in between, your downstream workers are going to quit while stuff is still going on. So you need to be a little careful that you don't set your timeout too short, because it can cause problems. Ideally, you always want to end by sending some sort of exit command that tells your queue readers, okay, it's time to stop here.
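The timeout guard described above can be sketched like this. It is a minimal illustration with assumed names (the class, the processed list standing in for the database insert, and the stop_reason attribute are not from the course code); the important parts are get(timeout=...) and catching queue.Empty.

```python
import queue
import threading

DONE = "DONE"


class TimeoutSchedulerSketch(threading.Thread):
    def __init__(self, input_queue, timeout=10):
        super().__init__()
        self._input_queue = input_queue
        self._timeout = timeout
        self.processed = []      # stands in for the database insert
        self.stop_reason = None  # illustrative only, to show why the loop ended

    def run(self):
        while True:
            try:
                # Block for at most `timeout` seconds waiting for the next value.
                val = self._input_queue.get(timeout=self._timeout)
            except queue.Empty:
                print("Timeout reached, scheduler stopping")
                self.stop_reason = "timeout"
                break
            if val == DONE:
                self.stop_reason = "done"
                break
            self.processed.append(val)
```

The explicit DONE signal stays the primary way to stop; the timeout is only a safety net, and a value that is too short makes workers quit while upstream is still producing.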
So we have the timeout here, and we can do the same thing for our Yahoo Finance worker. We're going to say: try getting a value, and if we get the Empty exception, print that the Yahoo scheduler queue is empty, say that we're stopping, and break out. And we need from queue import Empty. I'm just going to reorganize the imports here a little bit, so that all standard libraries are at the top and all installed libraries are at the bottom. There we go. That gives us a slightly clearer structure: we know these are all defaults, and these ones we had to install. Oh, and I didn't actually set a timeout here, so we're also setting a timeout here just in case. But again, be careful about this, because your workers can exit while they should still be working if there's some sort of delay going on upstream. And we actually don't need any keyword arguments here, so we can take these out, and this print statement too. All right, so now we have our workers set up. One of them gets the symbols from here. For every symbol that we get, we pass it on to get the data from Yahoo Finance. And for every price that we get, we put it into a downstream queue, which our Postgres workers then consume from. Now there are just some small things left to do, which is doing the join at the end here. So we go over the Postgres scheduler threads and call join at the end. And I'm actually going to change the logic of our output queues a little bit, because right now we're assuming we have one output queue, but maybe we want to have more than one. To accommodate that, I'm going to read this into a temporary variable. I'm going to say, if our temporary queue isn't a list, then I want to turn it into a list.
So now our output queues are going to be a list, which can hold one queue or several. And then of course we have to change this a little bit, because we're no longer going to get None here. What we can do is say: for output queue in the output queues, and then for each output queue we send on DONE. If there are no output queues, the list is empty and nothing is sent. If we have one, we send to one. And if we have more than one, we send to each of them. All right. So now we've made this a little bit nicer, because we can provide this as an input list. And let's actually do that too, even though we still accept a single queue. So here we provide a list with our one downstream queue. Let's run this one more time to make sure it's working. Oh, I forgot to change it here. There we go: for output queue in output queues. All right, so now we have a little more flexibility, because maybe we don't just want to save this to one database, but to multiple databases. So rather than our main Yahoo Finance worker having just one output queue, it can have multiple output queues and different downstream workers. We can have a Postgres worker. We can also have a Redis worker or a MySQL worker or whatever other databases you have. Each of them can consume from the different queues that we provide here, and our Yahoo Finance worker just writes to each output queue. So for every single output queue that we have, we write these values to it.
The downstream workers can then use all of these values. We basically get the price once, but we can reuse it by putting it into the different queues so that the different workers can consume from them. This gives us more flexibility, because it's not always going to be one in, one out. Sometimes there's going to be no output, and sometimes there are going to be multiple outputs.
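The list handling for output queues can be sketched as two small helpers. The function names are assumptions, and treating None as "no output queues" is one reasonable reading of the lesson, not necessarily what the course code does.

```python
import queue

DONE = "DONE"


def normalize_output_queues(output_queue):
    """Accept None, a single queue, or a list of queues; always return a list."""
    if output_queue is None:
        return []  # assumption: no output queue means nothing to send to
    if not isinstance(output_queue, list):
        return [output_queue]
    return output_queue


def send_to_all(output_queues, value):
    """Put the same value on every downstream queue (fan-out)."""
    for output_queue in output_queues:
        output_queue.put(value)
```

With this in place the run loop no longer needs a None check: send_to_all(output_queues, (symbol, price, extracted_time)) for each price, and send_to_all(output_queues, DONE) once at the end.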
10. Yaml File Intro: All right, so now that we have this working program implemented, let's go ahead and clean it up a little bit. We want to make it easier to get an overview of, and easier to expand later, without always having to add new snippets of code, copy and paste, or change the logic in there. So instead, we're going to define this in a file where we can specifically say: these are the queues that we have, and these are the workers that we have. And that will define our program. So we're going to write a YAML file. That's what we cover in this lesson: writing the file, how we can read it, and what it means to us. To do this, we start by creating a new directory. I'm going to call it pipelines; you can call it whatever you want. I call it pipelines because what we have here is a pipeline: we start by scraping the raw data from the Wikipedia site, pass it on to have prices extracted from the Yahoo site, and then pass it on to be uploaded into Postgres. So that's going to be a pipeline for us. In here I'm going to create a new file, which is going to be our wiki Yahoo scraper pipeline, with the extension .yaml. Now, to be able to read this file in Python, and we'll do this one bit at a time, I'm going to activate my virtual environment and run pip install pyyaml. Hit enter and let that install. This is the library that we need to be able to read the file. In a second, when we write this YAML file, we can use the PyYAML library, which converts it into a nested list and dictionary structure for us, as we'll see shortly. So the first thing that we want, if we look at our main program here, is queues.
So we somehow need to define the queues that we want. And then we also have workers. These are the essentials of what we want. So I'm going to define them over here, and I'm just going to say, all right, these are the queues that we want. I'm going to write this out a little bit, and then in a second we'll read it and explore how this YAML file actually transforms into a Python data structure. In here, I'm going to list out the queues. The first one I give a name. If we go back to our main function, that's the symbol queue. So I'm going to call this the symbol queue; call it whatever you like. And I'm also going to give it a description. Again, don't worry about this right now. I'll cover it in a second. I just want to have something in here so that we can look at it. The description is going to be: contains symbols to be scraped from Yahoo Finance. Then we want another queue, which is going to be our Postgres uploading queue. The description here is: contains data that needs to be uploaded to Postgres. And then we also have workers. The first worker, if we look at the main function, is going to be our wiki worker. Here we can give a description: this scrapes the raw Wikipedia page and pulls out symbols. Again, we'll go over this iteratively, but I want to have the base location where we can find this worker. So it's going to be in workers, dot, the wiki worker module. And the class that we want to use is this WikiWorker class, which I'm going to put in here. We also want to define input queues and so on, and we'll get to that in a bit. But let's leave it at this, so we have something to look at. Now let's load this in and see how this YAML file translates into something usable in Python.
So I'm going to launch Python in my terminal here, and I'm going to import yaml, which we can now do because we installed PyYAML previously. Now I'm going to say with open, pipelines, and I'm going to copy the file name here because I'll probably misspell it, and then we'd have to hunt for the typo. I open it in reading mode and call it our infile. Again, let's just go through this and then recap everything we've done. We'll do a bit of exploration, but we need some sort of basis first. So our YAML data is going to come from our yaml library: we call safe_load and pass in the infile that we've opened. Hit enter. All right, so we've loaded it. Let's take a look at this. If we look at the type of our YAML data, it's a dictionary. If we look at the keys, we have two: queues and workers. And that's exactly what we have in the file. The first thing is queues, and the second thing is workers. And now that you see what we get in Python, you can almost recognize the structure here, right? We have this familiar dictionary format where we have the name, then the colon, and then the values contained inside. Now what we can see here is that inside of queues we actually have a list. So let's take our YAML data and look into queues. Here we have a list. The reason we get a list is that we used these dashes, which indicate list elements. In this case we have two list elements in here. And if we look inside, say, the first list element, we again see a dictionary. So we have a repeating format: first we define the dictionary keys, then inside we define the contents.
And within that, in this case, we're using list elements indicated by a dash. Inside each one we again have a dictionary, as we can see here. We've got the name, which is the name here, and we've got the description. Now, while I was writing this, you may have thought that I'm using specific keywords to indicate all these things. I'm not. I just decided to use name, description, location, and class because they're convenient and clear to the programmer reading this. YAML really doesn't care what we use. We could use anything here, because again, we're just getting dictionaries and lists out of it. But to make our lives easier, both when we write it now and when we come back to it or show it to other people, we want to use descriptive names so that it's clear what's going on. Something else we can do, which can help clear the syntax up a little bit: instead of using the syntax down here, we can write each entry inline inside curly braces, like this, and the result we get is going to be the same. So let's go back up and reload our data, and take a look at our YAML data again. I notice I have a typo in description here. But we can see that the values we get are still the same, right? Nothing has changed; it's the exact same format. This is just a different way of representing it. It almost brings it back to the Python structure we're used to, except that in Python we use quotation marks when defining a dictionary, since otherwise we'd be referencing variable names. But once you get used to it, the other format may actually be a little cleaner to read than this one. So that's why I'm going to undo these changes.
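To make the mapping from YAML to Python concrete, here is a small sketch with the two queue definitions written both ways, with dashes and indentation and with inline braces, next to the Python structure they load into. The queue names SymbolQueue and PostgresUploading are assumed spellings of the names used in the lesson. The check at the end only runs if PyYAML is installed.

```python
BLOCK_STYLE = """\
queues:
  - name: SymbolQueue
    description: contains symbols to be scraped from Yahoo Finance
  - name: PostgresUploading
    description: contains data that needs to be uploaded to Postgres
"""

FLOW_STYLE = """\
queues:
  - {name: SymbolQueue, description: contains symbols to be scraped from Yahoo Finance}
  - {name: PostgresUploading, description: contains data that needs to be uploaded to Postgres}
"""

# The nested dictionary/list structure both documents correspond to.
EXPECTED = {
    "queues": [
        {
            "name": "SymbolQueue",
            "description": "contains symbols to be scraped from Yahoo Finance",
        },
        {
            "name": "PostgresUploading",
            "description": "contains data that needs to be uploaded to Postgres",
        },
    ]
}


def check_with_pyyaml():
    """Return True/False if PyYAML is available, or None if it is not installed."""
    try:
        import yaml
    except ImportError:
        return None
    return (
        yaml.safe_load(BLOCK_STYLE) == EXPECTED
        and yaml.safe_load(FLOW_STYLE) == EXPECTED
    )
```

Top-level keys become dictionary keys, each dash starts a list element, and the key-value pairs under a dash become a dictionary inside that list.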
And I'm going to put it back the way it was before, because that is a little cleaner to read. The other way was just to show you how it translates back to things you know in Python, so that you can better understand what exactly is going on. So we can see here that we've defined the queues, and again, we call them queues because it's convenient for us, not because it's required in any way. And we have these workers here, again because it's convenient for us. So we have our wiki worker. And again, these names do not need to match; it's just convenient and descriptive, and we could call this something else entirely. This part here, the location and class, is important, because we're going to use it later, not now, to dynamically load and initialize this class from this location. So we want to make sure that this location, and you could also call it a path or base path, is correct. And in here we want to make sure that we're referencing the correct worker class so that we can actually initialize it. So we have the one worker here. I also want to put input queues. Now, in this case we don't actually have an input queue, so I'm not going to define one. But maybe we want to pass input values, and these can just be a list of URLs to scrape, for example. So what we could do is take the URL that we want to scrape and, rather than hard-coding it into our wiki worker, define it in here. Then if we have other wiki sites that follow the same structure and let us reuse our worker, or if we expand our worker to accept these different sites, we can put more URLs here when we get to that point. Right now we're just going to stick with this one, but we make it a little more general so that we're defining everything over here.
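The dynamic loading mentioned above, turning a location and a class name from the config into an actual class, is typically done with importlib. The course's own implementation comes in a later lesson, so this is only a minimal sketch under assumptions: load_class is a made-up helper name, and the example loads queue.Queue from the standard library so that it runs without the course's workers package.

```python
import importlib


def load_class(location, class_name):
    """Import the module at `location` and return the attribute named `class_name`."""
    module = importlib.import_module(location)
    return getattr(module, class_name)


# Assumption: a config entry shaped like the worker entries in the YAML file,
# but pointing at a standard-library class so the sketch is runnable anywhere.
worker_spec = {"location": "queue", "class": "Queue"}

QueueClass = load_class(worker_spec["location"], worker_spec["class"])
demo_queue = QueueClass()
demo_queue.put("AAPL")
```

If the location or class name in the config is misspelled, import_module raises ModuleNotFoundError and getattr raises AttributeError, which is why copying the names exactly matters.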
So we've got our first worker. Then we create our second worker, which, if we look back at our main function, is our Yahoo Finance worker. The description here is going to be: pulls price data for a specific stock symbol from Yahoo Finance. The location is going to be workers, dot, and I'm just going to copy the module name here so that there are no typos, because that would be frustrating. And the class that we want to load is our Yahoo Finance price scheduler, not the price worker; it's the same one that we're using in the main function. So this is what we want to use. Here we are going to have an input queue. We'll have this be a singular input queue, and we can keep it as a plain value instead of a list. Our input queue here is going to be the output of the worker above. So let's go back to the wiki worker and define its output queues. This one we make plural in case we want to send to multiple queues. But for now we're just going to send to the symbol queue that we defined up here. All right? And again, we're just referencing the names here, because these are all general queues, so we can reference them by name. We'll see later on how we use these. But having these naming conventions, especially for the queues, lets us properly name and reference everything. So the input queue of the Yahoo Finance worker is our symbol queue, and its output queues are just our Postgres uploading queue. Now, the reason I want to allow multiple output queues is that maybe I don't want to send this information to just one location and one type of worker downstream, but to several.
So maybe I want my wiki worker to send symbols to the Yahoo Finance worker to get the price data out, but also to a different worker that just directly stores all this raw data somewhere and doesn't do anything else with it. Or maybe it does something completely different with it. That way I can send to multiple locations, and the different downstream workers reading from those queues can use this information in different ways. Right, let's define our last worker. If we go back to our main function, that's going to be our Postgres worker. So here we call this our Postgres worker. Description: takes stock data and saves it in Postgres. The location is going to be workers, dot, the Postgres worker module. And the class that we use is again going to be the master scheduler, since that's the one that reads from the queue and everything, just like in our main function here. Okay? The input queue is going to be Postgres uploading. And we're not even going to define an output queue, because we don't want to send this data on any further. All right, so these are our base definitions. We may update this YAML file later or change it a little as we go, if we notice that we want to describe things differently. But for now, this is the base description of our program. Of course, right now it doesn't do anything, because we still have to make use of it, which we will do in the coming lessons. But let's read this in so that we can see what we have and, of course, make sure it's working. So taking a look at it, we have the queues and the workers. In the queues we have two: the symbol queue and Postgres uploading. And now let's take a look at our workers.
In our workers we have several different entries. We have the wiki worker, which also has input values here, and we can see that they were transferred properly. We have our Yahoo Finance worker, which references the Yahoo Finance price scheduler. And we have our Postgres worker, which references the Postgres master scheduler. So now we have this full pipeline defined in this YAML file. Again, of course, we haven't made use of it yet. That's what we'll be doing in the next sections. But hopefully you can see the benefit of moving the general description of how the program works out of the code and into this much more configurable YAML file. That provides us a lot of flexibility, because once we dynamically create our workers and have them send to each of these queues, adding another destination later is easy. Say we also want to save somewhere else. First we have to write the class that takes care of it, like we did here. But then, let's say we want to save this to, I don't know, something like a Redis database. We just add another queue, and then we also send to it here. Oh, I just noticed there's a typo here too, so let's fix that. We haven't written this worker, of course, but this is the idea: we add a worker entry whose input queue is our new queue. So once we have the class written, rather than having to change our main function and adapt everything to it, we can just change the configuration file, which is much easier. And we can actually create several different configuration files for several different pipelines that we want to run, while the code stays the same. It's just the pipeline definitions that differ. Or maybe you want to provide extra input values in this sort of format.
I hope that you can see it becomes a lot easier to change, manipulate, add, remove, or even have different flows, while still being able to reuse the same code. All that we're really changing is the general flow and the configuration. So, yeah, let's go ahead and take this back out because, of course, this doesn't exist and we haven't implemented this type of class. It was just an example so that you can see how we can extend this and how it becomes so versatile and useful. Because again, we're only changing this YAML file here, and we can actually create several different YAML files for several different pipelines that we want to run. Our code always remains the same.
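To make the extension idea concrete, here is a minimal sketch of what the loaded pipeline might look like as a Python dictionary, and how adding another sink is only a configuration change. The key names (queues, workers, input_queue, output_queues), the placeholder URL, and the RedisUploading queue and RedisWorker are assumptions for illustration; no Redis worker class exists in the course code, and the location and class keys are left out for brevity.

```python
import copy

# Assumed shape of the parsed pipeline file; the key names are illustrative.
pipeline = {
    "queues": [
        {"name": "SymbolQueue", "description": "symbols to be scraped"},
        {"name": "PostgresUploading", "description": "data to be uploaded to Postgres"},
    ],
    "workers": [
        {
            "name": "WikiWorker",
            "input_values": ["https://example.com/company-list"],  # placeholder URL
            "output_queues": ["SymbolQueue"],
        },
        {
            "name": "YahooFinanceWorker",
            "input_queue": "SymbolQueue",
            "output_queues": ["PostgresUploading"],
        },
        {
            "name": "PostgresWorker",
            "input_queue": "PostgresUploading",
        },
    ],
}


def describe_flow(pipeline):
    """Return (worker name, input queue, output queues) for every worker."""
    return [
        (w["name"], w.get("input_queue"), w.get("output_queues", []))
        for w in pipeline["workers"]
    ]


def add_sink(pipeline, queue_name, worker_name, upstream_worker):
    """Return a copy of the pipeline with one extra queue and downstream worker."""
    extended = copy.deepcopy(pipeline)
    extended["queues"].append({"name": queue_name})
    extended["workers"].append({"name": worker_name, "input_queue": queue_name})
    for worker in extended["workers"]:
        if worker["name"] == upstream_worker:
            # The upstream worker now also sends to the new queue.
            worker.setdefault("output_queues", []).append(queue_name)
    return extended


# Hypothetical Redis sink: only the configuration changes, not the code.
extended = add_sink(pipeline, "RedisUploading", "RedisWorker", "YahooFinanceWorker")
for row in describe_flow(extended):
    print(row)
```

The original dictionary is left untouched, which mirrors the idea of keeping several pipeline files side by side while the code stays the same.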
11. Creating a Yaml Reader: All right, so now that we have this YAML file defined, which is going to be our pipeline, let's actually go ahead and implement something that can read this file, or just take in a general file that we assume has this sort of queue and worker structure. And then it's just going to create the pipeline for us. Because our wiki worker right now is also not a thread itself, but just kind of a class that does the work and returns the values for us, for now I am going to just comment this one out here. We can just leave that in our main function for now, and later on, if we want to, we can refactor the wiki worker to instead work like our Yahoo Finance worker or our Postgres worker, where we basically have this master scheduler thread as well as the worker itself that can take care of the work, so that we can actually pass in these values or read from another upstream queue that we can then feed these values into. But for now, let's just leave this as is and focus on implementing basically everything else. So I'm going to create a new file here, and I'll just call this one YAML reader. We could call it something else like YAML pipeline executor or whatever. All right, and then here, first of all, I want to import yaml. That's something we're going to need for sure. And then let's go ahead and just define a class and import the rest of the stuff as we need it. So this is going to be our YAML pipeline. I guess we can call it YAML pipeline executor. And then we'll go ahead and write the initialization method, which for now let's just leave empty. We'll get back to it. So what are the other methods that we need? Well, first of all, we need something that actually processes some sort of pipeline, right? We need to be able to process the pipeline that we want to pass here. So there are two options for us to pass in this pipeline.
One of them is we pass it in here. So we know the filename, or the directory and filename, and we can read from it. Or we can actually initialize the YAML pipeline executor with the pipeline that we specifically want it to execute. I'm going to choose that, because I think it's a little bit nicer to say that every instance of this class is responsible for executing one pipeline, rather than one pipeline executor being able to execute any pipeline. It just makes it a little bit easier to reason about. If you ever have multiple pipelines to run, you can just initialize different pipeline executors and name them accordingly, one for each pipeline, rather than reusing the same one but passing in different pipelines later. That way it can get, not confusing, but maybe a little bit more tricky to follow, and it's not as obvious as it would be if you did it with the initialization. So here I'll just say pipeline location, which is going to be a string. I'm not calling it filename because it's going to be a path, which is going to be something like pipelines slash the file in here. So I just want this to be a general location. So we'll set this attribute: the pipeline location is going to be equal to the value that we passed in here. All right, so what are the different parts of our pipeline? If we also look at our main function: okay, so what did we do there? We need to initialize the queues. We need to initialize the workers. And then we need to send done to the queues too. And the first thing, of course, that we missed here is that we also need to read and process this YAML file. Or maybe the processing happens later, but first we need to read it in. So the first method that we want to have is load pipeline.
And I'm putting these underscores here just because a leading underscore generally means it's an internal attribute or method of the class, whereas non-underscore methods are kind of meant to be used from the outside. Of course, you can still access both in Python, but this indicates that this is a method I want to use internally. I don't really want someone else who's using the code to call load pipeline. Similarly, the pipeline location here is being used internally and shouldn't really be messed with. Because, you know, we have the pipeline location defined from the initialization, so of course we should load the pipeline from that same file, and that should really only happen once. It should be kind of predefined and set like that. So, yep. What we have to do here is open our file. Ooh, so many suggestions. We're going to say with open, with our file in whatever variable you want. And we're going to have our YAML data be equal to yaml dot safe load of the input file here. There we go. All right, so this is kind of what we did in our last lesson, just loading in the YAML file, and here we're putting it in this YAML data attribute. All right, so that's actually the first step when we want to process our pipeline: we want to make sure we actually load this pipeline in. Okay? So now that we have this YAML file loaded into a Python object that we can read, the next thing that we want to do, also if we look at our main function, is initialize our queues. Okay? So we are going to have a new method, which is going to be initialize queues. And there's a typo here: queues. And yeah, from this we just want to initialize the queues. Okay, so now we need to track our queues somehow. So in our initialization I'm going to create another attribute, which is going to be our queues.
And then we're going to say for queue name, or actually we'll say for queue in our YAML data queues. So what we're doing here is, this comes out as kind of another dictionary, and we have one key here, which is queues. If you don't remember, let's quickly take a look at that again. So we're going to import yaml. And then let's just get this name properly. We're going to say with open, pipelines slash this file here, and then say YAML data is yaml dot safe load. So this is what we're going to have in here. And in our queues we can see we've got this list here, and we've got the different queues with their names as well as descriptions. So what we're going to do is iterate over these different queues, and for each of them we're going to actually create a queue. So going back to our main function, basically we're going to do this. And to get this queue, we need to import Queue from multiprocessing. So we're going to do it like this. And then for each queue that we have, we can get the queue name if we want, which is going to be queue, name. And we're going to say our specific queue name in our queues dictionary here is just going to be an initialized version of the Queue class from multiprocessing. So we're doing the same thing as here. But rather than defining these queues like this, basically we have a dictionary that will look like this. Of course, the names that we're using are a bit different, but this is the idea of what we're doing. And of course, we have the freedom to change this, because if we add more queues in here, then this is just going to take care of that. But really this is the format that we're getting here. And, as you'll see in a second, rather than referencing the variable which holds the queue, we can then just later on reference that specific queue by accessing the dictionary entry.
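The queue initialization described here can be sketched as follows. This is a minimal standalone version, assuming the parsed YAML has a queues list whose entries carry a name key; the function name initialize_queues and the sample descriptions are illustrative rather than taken from the course code.

```python
from multiprocessing import Queue

# Assumed shape of the "queues" section after yaml.safe_load.
yaml_data = {
    "queues": [
        {"name": "SymbolQueue", "description": "symbols to be scraped"},
        {"name": "PostgresUploading", "description": "data to be uploaded to Postgres"},
    ]
}


def initialize_queues(yaml_data):
    """Create one Queue per configured name and keep them in a dictionary."""
    queues = {}
    for queue in yaml_data["queues"]:
        queue_name = queue["name"]
        queues[queue_name] = Queue()
    return queues


queues = initialize_queues(yaml_data)

# Instead of a dedicated variable per queue, look the queue up by its name.
symbol_queue = queues["SymbolQueue"]
print(sorted(queues))
```

Adding a third entry to the queues list would produce a third Queue without touching the function.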
So we're still kind of doing the same thing, but this way it allows us, first of all, to distinguish our different queues, because we have the different names associated with them. And it also allows us to scale them a little bit more easily and dynamically, because we have an object that can hold a number of queues that's undefined while we're writing this code and that may go up or down as needed. But yeah, basically, it's not really anything different from setting each of them in a distinct variable. We just don't access them directly through the variable, but rather through a key in a dictionary. So that's what we're doing here. And the next thing that we want to do in our processing is, of course, call initialize queues. All right, so we've loaded in our YAML file and we've taken care of this part here. So now let's go ahead and take care of initializing our workers. That is going to be our next method: we're going to initialize the workers. So we're going to say for worker in self dot YAML data, and this time we're going to loop over these workers like this. And now, first of all, we want to keep track of each of these workers. We want to know which workers we actually have available, or which workers are currently working. So we're going to keep track of our different workers here, like this. And then, okay, so now we've come to a little bit of an interesting problem, because we are passing the class as well as the location where we can find this class. But actually, this is just a name. We can't just initialize this, whereas in our main function we specifically imported it. So how can we go about doing that? Well, there's a cool library that we can use, which is called importlib, which lets us take care of this. Now, another thing that you'll also notice here, quickly pointing this out, is that any libraries that are installed by default go at the very top.
And then once we get to libraries which are pip installed, or installed from an external source, those are separated by a new line and grouped together. And we can see in the main function we did the same thing. So this is internal, it comes with Python. Then we have a new line, then this library, and anything else that would be pip installed would go here. And then we have another new line, and these are all our custom pieces of code that are contained within our repository itself. So this way we're able to separate the different locations a little more easily, and it's a little bit clearer what belongs to what. Again, this is more of a visual thing. It's not, of course, anything that's necessary. It just makes it a bit cleaner. I just wanted to point that out in case you're wondering why I'm doing this. So we have this importlib thing. What we need, of course, is the worker class that we can actually initialize. We want to be able to read this general worker class so that we can do something like this. But of course we don't know which workers we're going to get, because this pipeline can really be defined however, right? That's the great thing. Oh, I noticed we left a dot in from last time. So that's the great thing, but of course we also have to be able to properly deal with that. To do that, we're going to use importlib, or actually we're going to use getattr first. Let's just write the whole thing out and then I'll explain it all in a second. We're going to do importlib dot import module, and the module that we're going to import is going to be the worker location. So we're going to access the specific worker and its specific location. And then there's the attribute that we want to access. And again, let's just write this out and then go over it one more time.
That's going to be the worker class. Okay. So what did we just do? Well, let's take a look at our workers, starting with our first worker here. Of course, if we're looping over these workers, then we're going to get each of these elements one by one. So we've got the name, et cetera, et cetera. Right now we're looking at the location, this one here. So this gives us this path, workers dot Yahoo Finance workers, the same thing that we have here. Okay? So importlib is going to import this module like this. Basically it's going to do something like import and then this path. But we don't just want all of this module; rather, we want to get a specific class from it, so we need to get a specific attribute out of this module. Now, getattr is a built-in Python function that basically lets us access attributes. It actually works the same way if we had a specific class defined. Let's say we have our Yahoo Finance scheduler, taking a look in this. Here we have the input queue. So something that we could also do is call getattr on it with input queue. So basically, this is a way for us to take attributes out of objects, or in this case also out of modules. Because accessing it with square brackets like this doesn't actually work for classes. It works for dictionaries, but for classes we would need to access it with a dot, like this, right? But what if we want to dynamically access an attribute whose name is in a variable? Then there's no good way to use this dot syntax if it's supposed to be dynamic. So the way that we can do that is by using this getattr function.
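As a small sketch of this dynamic import: importlib.import_module takes the module path as a string, and getattr pulls the named class out of the returned module. So that the example runs on its own, the worker entry below points at the standard library (threading and Thread) as a stand-in; in the pipeline the location would be a module in the workers package. The helper name load_worker_class and the Scheduler class are illustrative assumptions.

```python
import importlib


def load_worker_class(worker):
    """Import the module named in 'location' and return the class named in 'class'."""
    module = importlib.import_module(worker["location"])
    return getattr(module, worker["class"])


# Stand-in entry; a real pipeline entry would name a module in the workers package.
worker = {"name": "DemoWorker", "location": "threading", "class": "Thread"}
WorkerClass = load_worker_class(worker)


class Scheduler:
    """Tiny illustrative class with one attribute."""

    def __init__(self):
        self.input_queue = "SymbolQueue"


# getattr also works on ordinary objects when the attribute name is only
# known at runtime; this is equivalent to scheduler.input_queue.
scheduler = Scheduler()
attribute_name = "input_queue"
value = getattr(scheduler, attribute_name)
```

Here load_worker_class(worker) has the same effect as writing from threading import Thread, except that both names come from data.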
And this lets us dynamically define the attribute that we want to take from this class, like this. Similarly, we can use the same thing on this imported module here. So we're importing the module, and from it we're extracting this worker class. So basically this thing here is almost the same as doing this import here, except that we have the class that we're taking in this worker class variable, so that we can use it and initialize it. And of course we need to do this because, again, this can all be defined at will. We need to be able to import the class, initialize it and use it without knowing what class that's going to be. As long as this format of location and class is obeyed, we need to be able to import that and initialize it. All right, so we have our worker class, but we also need some other properties associated with it. The important things, if we look at our two workers here, are the input queue and the output queues. If we look at our Postgres worker here, we only need the input queue. Okay? So we're going to read the input queue first. That is this attribute here that we've defined with the workers. So we're going to say our worker, and rather than just trying to access the key directly like this, we're going to be a bit more careful. Because in some cases, for example if we have a worker defined like this one, this wouldn't work, because the input queue is actually not defined here. So to be safe against something like that, we're going to use the method dot get. Now, I want to show you this real quick. Say we have a very simple dictionary like this. If you do x dot get a, we're going to get the value of the key a, which maps to b here. But say you do x dot get one, or the string one. So this is looking for the key one.
That key is not in here, so we're going to get None. So in this way, rather than failing, it's either going to get the value if it exists or, if it doesn't exist, it's going to get None. Okay? So that way we're going to be sure that we don't fail here. We also want the output queues, and we're going to use the same format for those. It's going to be our output queues here. Okay? And let's see, do we need anything else? So we need our input queue and our output queues. All right, so we have that. But currently our input queue here is actually just a name. It's not the queue itself. So we need to get these queues, and then we can pass them on to our workers. So this time we're going to create a new variable that's going to be our initialization parameters. Here we're going to pass the input queue, which we will define in a second, and for now we're also going to have just the output queues, which we'll also define in just a second. All right, so here we actually need to pass an instance of the queue, right? We can't just pass the name, because just a name isn't helpful. For this, we have our queues here. So what we want to do is basically get the actual queue instance for this input queue. So we're going to say self dot queues, input queue, if input queue is not None, else None. So why is this complaining? Oh, yeah, of course, we need the comma afterwards. So this is just a shorthand if-else statement that we can write on one line so that we don't have to write it over multiple lines. It makes it a little bit easier, because we can have all that logic in here directly. So if we do have an input queue, then we're going to take the queue instance that is associated with that name. Otherwise, this is just going to be None. And for the output queues we want to do a similar thing.
Now, here we have to be careful because we can have several output queues, right? So we want to make this a list. So we're going to say here self dot queues, output queue, for output queue in output queues, if output queues is not None, else None. Of course, this is a little bit long. Am I going over the recommended line length? We can split this over two lines by just putting a backslash here. Okay, so now we have our initialization parameters. Again, what the backslash does is allow us to break a statement over multiple lines without breaking the logic. Because if we just did this, our logic would be broken, right? We'd just have this list here, and then the next line would be executed as a separate statement. This backslash actually joins the lines together and basically says the logic continues on the next line. And in this way we just don't have to deal with this overflowing text. All right, so we have the initialization parameters for our worker instances here. So let's actually go ahead and initialize our workers. First of all, I also want to get the worker name, so we're going to extract the name. For each worker name, for now I'm just going to create an empty list, and then we're going to append the different worker instances to it. So I'm going to say self dot workers, worker name, dot append, and here we have our worker class, and we're going to pass to it the initialization parameters with a double star. What this does is take this dictionary and basically turn every entry in it into a keyword argument that gets passed in. So doing this is the same as writing out something like input queue equals, and then whatever the value is. Let's take the example of the Yahoo Finance worker. I'm just going to write it out here, and then let's go over what's going on.
So this star star basically unpacks this dictionary, and for each key in here it passes the key as the argument name and maps it to the corresponding value in the dictionary, like this. And it does this dynamically. So if you add more parameters here, then each of these will be defined as a key-value pair here. This is really nice, because it allows us to unpack our initialization parameters and pass them as individual keyword arguments, so that we can specifically say this is the input queue and these are the output queues. Of course, for this to work, we need to make sure that the names match the parameter names here, input queue and output queue. So this is already going to be a problem, because I've called this output queues, but this is called output queue here, so let me correct it like this. Now, another problem, of course, is if we provide parameters here that don't exist. For example, in the Postgres case, we actually only have an input queue here. We don't have an output queue. So really, if we just had this here, it would complain, because we're passing output queue as an initialization parameter but it does not exist in the Postgres worker. But that's where this star star kwargs, or keyword arguments, comes in. It basically allows us to pass in any set of keyword arguments, so anything that is defined with a keyword and set equal to something, without really having to worry about the repercussions. It just takes them, and then we can do with them what we want. In this case, we just pass them on to the Thread parent class, but we could also just completely ignore them and not do anything with them. So this is a really nice way to do it, because we can just unpack this dictionary and immediately pass it in here. And yeah, so we almost have the logic that we have here. There are still some things missing. For example, here we're defining the number of workers, which we currently don't have.
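The double-star behavior on both sides of the call can be seen in a small standalone sketch. The two classes below are hypothetical stand-ins, not the course's workers: one names only the parameters it accepts, the other also collects extras through kwargs.

```python
class StrictWorker:
    """Accepts only the parameter it names."""

    def __init__(self, input_queue):
        self.input_queue = input_queue


class FlexibleWorker:
    """Accepts extra keyword arguments and collects them in kwargs."""

    def __init__(self, input_queue, **kwargs):
        self.input_queue = input_queue
        self.extra = kwargs  # anything not named explicitly ends up here


# Each key becomes a keyword argument when the dictionary is unpacked.
init_params = {"input_queue": "PostgresUploading", "output_queue": None}

# Same as FlexibleWorker(input_queue="PostgresUploading", output_queue=None).
flexible = FlexibleWorker(**init_params)

# The strict class has no output_queue parameter, so the same call fails.
try:
    StrictWorker(**init_params)
    strict_error = None
except TypeError as error:
    strict_error = error
```

Collecting extras in kwargs is what lets one set of initialization parameters be passed to worker classes with different signatures.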
That's an easy fix for us, because we can just go in here and say, for example, instances. Let's say we want four. And I'll take the same thing and put that over here. This doesn't have to be in the same order. Again, we're getting a dictionary from this. It's just convenient to keep it in the same order. And here we can say, let's say we want a lot of Postgres workers, but we don't want that many scrapers. So now we can say the number of instances that we want is just the instances value from here, like this. And then, rather than just initializing it once, we're going to say for i in range of the number of instances, and we create that many workers. Here, actually, it would probably be better for us to use dot get. And what we can also do is pass a default, or fallback, value. So if we go back to this, x dot get a gives us the value b that's associated with the key a here. If we do x dot get one, it doesn't exist, so we get None. But if we pass a second argument, that means if the key doesn't exist, then we fall back to that value. So if for some reason instances is not defined, we know we want to have a worker for this, because it's in the pipeline, so we're just going to use one. Otherwise, if instances is defined, we're going to use that. And in this way we can now dynamically increase or decrease the number of workers in here whenever we want, and we will just create that many workers. And then the last thing that we want to do is, of course, call this method, self dot initialize workers. And then we also want to be able to join the workers like we have here. So, join workers. We already used for worker in workers, so we'll say for worker name in workers, and then for worker thread. So remember, this is a dictionary, and each key here is the name of the worker. Each value is going to be a list, and each element of these lists is a worker instance. So each of these will be a running thread.
So we want to loop over each thread, and then here we call dot join like this. All right, so this is going to be the class. And we're going to stop here for now, because obviously this class isn't used anywhere yet, but this forms the basis of our class that allows us to do all of this stuff. In the following lessons we're going to bring this class into our main function and move the functionality over there. We're going to do some debugging, of course, because we haven't been able to test this class at all. Right now it's just been written like that, so it's likely that there are some bugs in here that we're going to need to fix. So we'll also do that and then take that over. And maybe then we'll also change the functionality a little bit so that our wiki worker here is also something that we can define, and maybe so that we can pass in multiple URLs. But yeah, I guess we'll do those in the following lessons, since I don't want to drag this out for too long here.
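Putting the steps of this lesson together, the class might look roughly like the sketch below. It is a reconstruction from the spoken description, not the course's actual file: the attribute and method names, the YAML key names (queues, workers, location, class, input_queue, output_queues, instances) and the keyword names passed to the worker classes are assumptions. PyYAML is a third-party package, so it is imported inside the loading method here only so that the rest of the class can be tried with already-parsed data.

```python
import importlib
from multiprocessing import Queue


class YamlPipelineExecutor:
    def __init__(self, pipeline_location):
        self._pipeline_location = pipeline_location
        self._yaml_data = None
        self._queues = {}
        self._workers = {}

    def _load_pipeline(self):
        import yaml  # third-party: pip install pyyaml

        with open(self._pipeline_location, "r") as in_file:
            self._yaml_data = yaml.safe_load(in_file)

    def _initialize_queues(self):
        for queue in self._yaml_data["queues"]:
            self._queues[queue["name"]] = Queue()

    def _initialize_workers(self):
        for worker in self._yaml_data["workers"]:
            # Resolve the class from its module path and class name.
            module = importlib.import_module(worker["location"])
            worker_class = getattr(module, worker["class"])

            # .get returns None (or the given default) for missing keys.
            input_queue = worker.get("input_queue")
            output_queues = worker.get("output_queues")
            num_instances = worker.get("instances", 1)

            # Swap the configured queue names for the actual Queue objects.
            init_params = {
                "input_queue": self._queues[input_queue]
                if input_queue is not None else None,
                "output_queues": [self._queues[name] for name in output_queues]
                if output_queues is not None else None,
            }

            self._workers[worker["name"]] = []
            for _ in range(num_instances):
                self._workers[worker["name"]].append(worker_class(**init_params))

    def _join_workers(self):
        for worker_name in self._workers:
            for worker_thread in self._workers[worker_name]:
                worker_thread.join()

    def process_pipeline(self):
        self._load_pipeline()
        self._initialize_queues()
        self._initialize_workers()
        self._join_workers()
```

Each worker class is expected to accept input_queue and output_queues as keyword arguments, or to absorb the ones it does not use through kwargs.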
12. Improving Our Wiki Worker: All right, so now that we have this YAML reader defined, let's actually go ahead and bring it into our main function, because right now we just have this class defined, but we're not really doing much with it. And it's not perfect yet, as we'll see in a second. So we're going to say from YAML reader, we're going to import the YAML pipeline executor. All right, so we're going to have our YAML pipeline executor be equal to, and now we need to define the pipeline location. Our pipeline location for now is going to be pipelines slash wiki Yahoo scraper pipeline dot YAML. Now this is hard-coded in here. Ideally we'd move this to environment variables, but we'll do that later on. Let's just get this stuff working for now. All right, so this we don't need anymore, because we have our YAML pipeline executor. Our wiki worker is interesting, because we still do need that. This we don't need anymore. And this we also don't need anymore. We need to change our process pipeline method a little bit, though, because, as we'll see in a second, we can't actually join the workers yet. Right now it's, not haphazard, but still a little bit imperfect, because we have some stuff within the YAML executor and some stuff, as we'll see in a second, still outside of it. So if we just call process pipeline here, we're going to load the pipeline, and we're going to initialize the queues and workers. We don't want to join yet, because if we do that, then we're waiting for all the workers to finish before we've put anything in, and we can't put anything in earlier, because the queues are only initialized in the process pipeline step here. So unfortunately, and we'll fix this later, for now we're going to have to turn off this join over here and we're going to have to manually call it later on.
Well, not manually, we're just going to have to call it later on. So we have our symbol queue. Now, for the queue that we want, for now I'm just going to use a hard-coded value, which is going to be this symbol queue here. Of course, eventually we want all of this to be part of the YAML pipeline, but for now let's just test the functionality. So we're going to go into our YAML pipeline executor, we're going to access the queues attribute, and from it we're going to access the symbol queue, because that's the queue we want to write into. Of course, this is non-ideal syntax, but because our wiki worker is actually not defined in the pipeline and within the YAML reader, we kind of have to do it like this for now. All right. Then we send done afterwards. Currently we also still have to do this. And because we can now define the number of instances here in the YAML file ourselves, we're just going to take some large upper cap. Instead of specifically matching some number, we're just going to say 20 times. Of course this is not ideal either, but let's just have it be like that for now. So we're going to put done into the symbol queue. And down here is where the threads are being joined. Okay, so now we have to join our threads with some other syntax: join workers. Let's make sure it's join workers, okay. All right, so let's go ahead and run this and see if it works. There are probably bugs that we have to fix. All right, so our Postgres worker got an unexpected keyword argument, output queue. So let's take a look at that. Postgres worker here, got an unexpected keyword argument. Where is our issue? Initialize workers, worker class. Okay, so I think the problem is that we're using the Postgres master scheduler. No, that's not the problem. Postgres master scheduler, and it got an unexpected keyword argument output queue. So looking at our traceback here, it looks like this is complaining as soon as we do the super initialization.
So maybe our threading class is actually complaining about this. Let's test that. We're going to go into Python, we're going to import threading, we're going to say threading dot Thread, and let's just try output queue equals a. So that's where the problem is. So we're going to just put this output queue in here as an explicit parameter, so that we don't pass it down with the keyword arguments. Alternatively, what you can also do is remove it from this dictionary by using this pop call, which removes it from the keyword arguments and also gives us the value. And here we probably want to have some safety. We want to say if output queue is in the keyword arguments, then we remove it. So let's try running that one more time. All right. So far no errors, probably a good sign. All right. So we reached some sort of timeout, which is okay, but it looks like it finished. So we had some sort of timeout reached, for whatever reason. Of course, this timeout comes from here. So, yeah, aside from that, it looks like it's working. So that's actually pretty great. That means we can take this code and see what's left over and what else needs to be implemented. All right, so of course, one of the things that we want to move over is all of this, which is not very nice, right? Ideally, we would have the wiki worker and this whole process of feeding in the wiki worker's values be done in this process pipeline step. And that way we don't have to call join workers down here either. So how are we going to go about doing this? The way that I would like the most is for us to extend this wiki worker. Basically, we want to have a similar kind of setup, where we have this master class that we can provide values to.
And from these values we're then going to have wiki worker instances, or maybe just a single wiki worker instance, that's going to take care of the work. So what we can do is have a wiki worker master scheduler. And we're also going to have this be a threading dot Thread subclass. It's already autocompleted for us. Nice. The reason for this is that in our YAML reader we're actually going to call the join method. This way we get a working join method for free, because it's inherited from the Thread class. Otherwise we'd actually need to define it ourselves, so that when we call join it doesn't break. And you could even do that by just defining an empty join method like this, which just makes it available if it didn't come inherited from the parent class, so that it doesn't break in this step and we can follow that same logical order. But let's actually make it a thread, and let's go ahead and initialize the parent. Here we're going to take keyword arguments, which again we can pass on to the threading parent. Yep. And what else do we want in our wiki worker scheduler here? Of course, we can't forget that we want to start it. And then we also have to have the run method here. And we're going to do something, you know, while whatever this is is true. Okay? So what do we want in our wiki master scheduler? If we take a look back at our main function here, we want our scheduler to basically be able to take different input URLs, create a wiki worker with each URL, and then have that wiki worker go through its process by getting the symbols and putting them into an output queue. So what we need for this is the output queue. And here we want to make sure that we don't pass on the input queue, because that would give us the same complaint.
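The complaint in question comes from threading.Thread itself, which rejects keyword arguments it does not define. A minimal reproduction, together with the pop-based guard, might look like this; DemoScheduler is a hypothetical stand-in for the schedulers in the course code.

```python
import threading

# threading.Thread only accepts its own parameters (target, name, args, ...),
# so a pipeline-specific keyword raises a TypeError.
try:
    threading.Thread(output_queue="a")
    thread_error = None
except TypeError as error:
    thread_error = error


class DemoScheduler(threading.Thread):
    """Hypothetical scheduler that strips pipeline-only keys before calling the parent."""

    def __init__(self, input_queue, **kwargs):
        # Remove the key the Thread parent does not know about, if present.
        if "output_queue" in kwargs:
            kwargs.pop("output_queue")
        super().__init__(**kwargs)
        self.input_queue = input_queue


# output_queue is dropped; name is a real Thread parameter and passes through.
scheduler = DemoScheduler(
    input_queue="PostgresUploading", output_queue=None, name="demo"
)
```

Writing kwargs.pop("output_queue", None) would do the same in one line, since pop with a default does not fail on a missing key.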
So we're going to say, if our input queue is in our keyword arguments, we just remove it and pop it out of that. Another thing that we want, though, is some sort of entries that we can pass in, right? We don't have an input queue because we're not going to be reading from an input queue, but we do want entries that we can iterate over. So we're going to assume that we're going to be passing in entries. And of course we don't want to pass entries on either, so we're going to take entries from here and pop them out again like this. All right, so this is kind of the format that we want. Of course, we also want to set the output queue to the queues, and let's use the same syntax that we're using here so that we can set multiple output queues if appropriate. That way, if we go back to here, whenever we get values, we can put them to different output queues so that different downstream workers can use them for whatever purposes they need. Okay, so now we need to define these entries, and we're also going to change our run method here. So rather than saying while true, we're going to say for entry in entries, because now we have a limited number of entries that we want to go over. Let's update our YAML file. So we want our wiki worker, and rather than initializing our wiki worker, we want to initialize the wiki worker master scheduler. We've called these input values, so if you want, we can rename entries to input values here. That's totally fine. The point is that we have to make sure we also do this here. Okay? So we want to take input values in here, and then we want to iterate over these, and for each of these input values, we want to put the values into the symbol output queue here.
All right, so we've got our entries here. So for each entry, we go to our wiki worker and we want to initialize it. We actually want to initialize it with a specific URL. So what we're going to do is move the URL into the initialization parameters here. That way we can initialize it directly for the different entries that we provide through here. And now, if we go back to our main function, we can iterate over the different symbols, and then we want to put those into our output queue. So for now we can just take over this logic using the symbol counter. The only reason that we're using this is, again, so that we don't put all the symbols into the output queue, scraping all of these different values and uploading them into Postgres, but rather just do the first five to make sure that it's working. We don't need to do all 500 right now. So we're going to take over this logic, and I'm going to say for output queue in our output queues. So for each output queue, we want to make sure we put that symbol in there. Right? So let's go over this logic here. We have our wiki worker master scheduler, which is going to be an instance of the Thread class. For this, we're going to take keyword arguments and the output queue. And if we have an input queue, we want to make sure that gets removed so that we don't pass it on to the initialization of the threading parent class. We also want input values, and these input values are going to be different URLs that we can define here like this. And of course, for our output queues, we want to make sure that we have the option of having different output queues, so that we can have in here, for example, symbol queue and a second symbol queue.
And if we have this, we also of course need to define it up here. But if we want to put this to more than one queue, so that more than one type of worker can consume this downstream, we have that option. All right. We want to make sure we initialize the parent thread, and then we want to start the process, just like we do here. Now, for this to work, we also need the run method, which is going to be called automatically for us because we're inheriting from the threading parent. But different from our downstream workers, which are going to be waiting for values to come out of the queues and reading them, we are not reading from a queue here. We're reading from a predefined set of entries. So we just want to make sure that we go through the predefined set of entries that we have in here, because we already know what these values are, and we don't need to try to get them from some sort of queue. So we're going to go over the different input URLs. For each input URL, we're going to create a wiki worker. This wiki worker is going to take care of the scraping, so finding each of the symbols. So we have that same logic here. And for each symbol that we get, we want to put it to the output queues. Since we can have multiple output queues, we want to make sure that we loop over each output queue. And then this stuff here is really just so that we don't scrape 500 symbols, but only do the first five, to make it go a little bit quicker. All right, so let's take this and put it at the end. So this is also logic that we want to move into our wiki worker. And at the end here, I want to say for output queue in the output queues, and then here again, we're just going to use a ridiculous number right now, 20. We want to make sure we send done, and we only want this to happen after all of the symbols have been sent.
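The scheduler walked through above could be sketched roughly as follows. This is an illustration under stated assumptions, not the course's code: `FakeWikiWorker` is a hypothetical stand-in that returns made-up symbols instead of scraping, the `"DONE"` sentinel value and the `worker_class` parameter are assumptions, and the example URL is a placeholder.

```python
import queue
import threading

DONE = "DONE"  # assumed sentinel; the lesson only says "send done"


class FakeWikiWorker:
    """Hypothetical stand-in for the scraping wiki worker."""

    def __init__(self, url):
        self._url = url

    def get_symbols(self):
        # Made-up symbols so the sketch runs without network access.
        for i in range(10):
            yield f"SYM{i}"


class WikiWorkerMasterScheduler(threading.Thread):
    def __init__(self, output_queue, input_values, worker_class=FakeWikiWorker, **kwargs):
        # input_queue must not reach Thread.__init__, so drop it if present.
        kwargs.pop("input_queue", None)
        super().__init__(**kwargs)
        # Accept a single queue or a list of queues.
        self._output_queues = output_queue if isinstance(output_queue, list) else [output_queue]
        self._input_values = input_values
        self._worker_class = worker_class
        self.start()

    def run(self):
        # Iterate over a predefined set of entries instead of reading a queue.
        for url in self._input_values:
            worker = self._worker_class(url)
            for count, symbol in enumerate(worker.get_symbols()):
                if count >= 5:  # only the first five, to keep test runs short
                    break
                for output_queue in self._output_queues:
                    output_queue.put(symbol)
        # Only after all symbols are sent: a deliberately oversized number of DONEs.
        for output_queue in self._output_queues:
            for _ in range(20):
                output_queue.put(DONE)


symbol_queue = queue.Queue()
scheduler = WikiWorkerMasterScheduler(
    output_queue=symbol_queue,
    input_values=["https://example.com/placeholder"],
)
scheduler.join()
received = [symbol_queue.get() for _ in range(symbol_queue.qsize())]
```

Sending a fixed 20 sentinels is the temporary workaround from the lesson; it breaks as soon as more than 20 consumers read from the queue.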
And then once all that is over, we can actually send done. So we don't need this anymore here. We don't need the join workers anymore because we can do that now in here. All right, let's take a look at that. So let's go ahead and try running it and see what happens, see if we missed anything. Right. So it's looking for input values. Oh, of course: in our YAML reader, we also need to make sure that we pass those in. So input values equals worker.get of input values. I'm actually going to do this down here, and I'm going to say, if input values is not None, then I want to put them in the initialization parameters. The reason I'm doing this, rather than always providing them in the initialization parameters like we do up here, is that our input values are kind of a special case, specific to our wiki worker, right? All the other workers are going to have input and output queues, or maybe not output queues, but they're genuinely going to be about input and output queues. Our input values are really only going to be specific to our one worker here. So rather than putting it in here and then going into each other worker and making sure that these input values are not passed as keyword arguments down to the threading initialization, I'm only going to pass it in if it's actually in there, and that way I only need to take care of it over here. All right, so let's try that again. No errors so far, which is generally a good sign. But let's wait and see what happens. All right, cool. So it looks like things are working. We've got our wiki worker now that's taking all these values. But to make sure things are working, let's just go ahead and print out what our Postgres worker received, just to see if it arrives all the way downstream. All right, so it's receiving values. So everything is working, which is awesome.
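As a rough sketch of the conditional described above: the function name `build_init_params` and the configuration keys (`input_queue`, `output_queues`, `input_values`) are assumptions about how the YAML entries might look once loaded into a dictionary. The point is only that `input_values` is added when present, so the other workers never receive a keyword they would have to strip out.

```python
import queue


def build_init_params(worker_config, queues):
    """Build keyword arguments for one worker from its (already parsed) config."""
    input_queue_name = worker_config.get("input_queue")
    output_queue_names = worker_config.get("output_queues")

    init_params = {
        "input_queue": queues[input_queue_name] if input_queue_name is not None else None,
        "output_queue": (
            [queues[name] for name in output_queue_names]
            if output_queue_names is not None
            else None
        ),
    }

    # Special case: only the seeding worker defines input values,
    # so only pass them along when they are actually configured.
    input_values = worker_config.get("input_values")
    if input_values is not None:
        init_params["input_values"] = input_values
    return init_params


queues = {"SymbolQueue": queue.Queue(), "PostgresUploading": queue.Queue()}
wiki_params = build_init_params(
    {"output_queues": ["SymbolQueue"], "input_values": ["https://example.com/placeholder"]},
    queues,
)
postgres_params = build_init_params({"input_queue": "PostgresUploading"}, queues)
```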
There's, of course, still one thing that's not working, which is that it's not receiving done properly all the time and it's not properly breaking out. And the reason for this, if we go into the Yahoo Finance worker, is that it's only putting done once here. So the problem in this case is, if we go here, we have two instances, but here we have six instances. You'll notice that the difference, six minus two, is four. We have four instances that are not receiving that done signal right now. So what we can do for now is just take this out, because they're breaking out here. And we're going to say here, for output queue in output queues, we'll just say for i in range 20. For each output queue, we're just going to send a bunch of them. Of course, this is not ideal, because if we have 21 workers we run into the same problem, but for now it should be fine. So we can just quickly rerun that. The other thing you'll notice is that I didn't define instances here, and you can see that everything still worked well. This took care of that. Now this is another little nuance that we have, because here we don't really want several instances. If we have several instances, they're not going to be consuming from the same queue and splitting the work. Instead, they're going to be repeating the work, because the way this works is that each one iterates over this set of input values. So here we need to be careful that we only have one instance. So we can leave a note, for example: only have one instance here, otherwise we scrape the symbols more than once. Of course, there are different ways of going about this.
One thing that we could do is separate this out again and have our wiki worker itself consume from a queue, and we feed these input values into that queue. But at some point, we need to get to the point where we have a worker that only has one instance and just feeds all of these input values into a queue that the wiki worker can then consume from. So for example, we could have another worker here, which is going to be, you know, a URL seeder. This one here could take these input values, and then the output queues could, for example, be URL outputs. And then in here, as the input queue, we can read from that. In this case, this worker is going to be the one that can only have one instance, so this problem is just pushed one step further upstream. But in this case, we can have multiple threads here, because they're no longer going to be reading from input values; they're going to be reading from the input queue. So if you want, you can play around with that and implement it so that we can scrape different URLs at the same time if we have more than one instance here. But of course, at some point we're going to run into this issue where we need to provide some sort of seed, and if we have more than one instance providing the seed, we're going to be getting duplicates. So just be aware that this problem does exist. And so we can leave a note, for example, to say only have one instance here. And we can even make this clear again: we can say instances one, and we can say, please don't change this, otherwise we do duplicate work, something like that, see the note above. And we run this one more time just to make sure it's working. Well, input queue, this one doesn't exist.
And everything is working properly. But just be aware of that: we don't want to have more than one instance here, because we're iterating over these manually defined input values. Looking at our output and looking at our main function, things are actually looking pretty great. The only other thing that we can do is, if we want to get a proper execution time, put this at the top here. Otherwise we're starting the scraper time after this, but we're calling join workers here, so we'd only be starting the timer once all the work is done. So if we want to get a proper time, we need to put this above, like this. And we can also remove all of these unused imports here, because all of these imports have now been moved to our YAML reader. We can save and run it one more time to make sure this is working properly. But now this is really nice, because look at how small our main function has become. It's become so simple. We just have the pipeline that we want to run, and then we just have our pipeline executor, which takes care of everything. And now if we want to do more work, we can extend this. If we have more URLs which follow the same structure, which we believe our wiki worker can take care of, then we can put in more input values here. We can extend this to have different output queues; we previously talked about maybe saving to different databases or something like that. And of course, if we create an extra worker, we need to make sure that we define that worker too. But really, all of the hard work has been transferred to the YAML reader. This thing dynamically takes care of whatever is in this YAML configuration file here. So really, we can extend this as much as we like.
Of course, the only thing that we need to do if we add a different type of worker is make sure that we define the worker class and that it's working properly. But once we have that worker class available, we can reuse it. We can extend our pipelines and write different pipelines. There's a lot of cool stuff that we can do, and everything we can define over this pretty readable and easily maintainable YAML file. So it's so easy for us to scale up and down the number of different workers that we want to have. If we see, for example, that our Postgres worker is a bottleneck, because really everything is just waiting to be saved into Postgres, we can increase the number of instances. Or if it's doing fine, we can leave it as is, or even decrease the number of threads, and maybe we want to scale up the number of scrapers that we have instead. So there's so much freedom for us to tune our pipelines here. And all of this has been moved over to this YAML file, and our code will just dynamically take care of all of this.
13. Improving All Workers and adding Monitoring: All right, so in this step, we're going to clean up our YAML pipeline executor a little bit more. Just looking at our main function here, really what we're doing is calling this process pipeline step, where we do everything from loading the pipeline to initializing the queues and workers, and then of course calling this join here so that we wait for all of these workers to finish. And if we look at, let's see, our wiki worker for example: the way that we're handling sending done right now is not ideal, because really we're just sending it, say, 20 times. Which is a little bit weird, because imagine we have several of these workers running at the same time. In that case, if we're sending done but one of the other workers is actually not done, and we're sending done more times than there are workers downstream, then they're all going to read done and stop, despite some other worker maybe still putting values into that queue later on. Now of course, we can have that for the Yahoo Finance worker, for example. For the wiki worker, currently not so much, because we're limiting the instances here to one, since we need to make sure that we provide this non-duplicated seed. But this can happen with the Yahoo Finance worker. If one of them finishes right before the other and puts all of its done messages in there while the other one is still running and putting values in, then all of the downstream workers are going to read done, and they're not going to get the final values that are in this queue. So in this step, let's optimize this a little bit. Let's take sending the signal out of our workers and move it into our main worker.
Essentially, this is currently not a worker; it's just a class that helps us execute the pipeline. But we're going to import threading and make this a thread too. So we're going to make it basically our main worker. And here, of course, we need to add the run step. Here we're going to be taking care of monitoring progress and sending done commands once everything is done. So we're also going to be taking away that join workers, because the first thing we do once this thread starts is run the process pipeline step, and if we have join workers here, then everything is going to be blocked until we reach this point. So we can't call join workers, and actually we don't even need it, because here we're going to have an infinite loop that takes care of everything, all the monitoring, et cetera, while everything else is active. And for this, of course, we also have to start our thread so that it starts running. Okay, so what are we going to do here? Really what we want to know for each worker is, first of all, which queues it's sending to, and also how many of its threads are still alive. And then once all of a worker's threads are no longer alive, we want to send done to all of the output queues that it sends to. So for example, if we have three output queues here and this one is done, then we want to send done to all of these output queues. And ideally, the number of times we send done depends on how many instances are actually consuming from that queue. So we're going to have a separate thing that helps us track how many workers are consuming from each queue. So let's go ahead and set that up. And we're going to make this part of our initialize workers.
But first we're going to put this in our initialization method here. I'm going to write out some of the stuff and explain what's going on, and then we can recap again once it's out, because once we have it written out, it'll be a little bit easier to understand the logic than explaining it in a more abstract sense. So we're going to have here queue consumers, and we're also going to note down the downstream queues. The queue consumers variable is basically going to say, for this symbol queue, this is how many workers are consuming from it. And the downstream queues is going to be the worker and which queues it sends to. So these are the two extra attributes that we're adding here so that we can keep track of this information, because we're going to need it later. So now when we're initializing our workers, we have to keep track of, first of all, which queues each one writes to. So we have the output queues here. So we're going to add in here, in our downstream queues, that this worker is going to be writing to these output queues. Now notice we're taking the names of the output queues here so that we can reference them later on. We don't need to take the instance of the queue; we can just track the name. And then we also want to know which queue is being consumed. So if we do have an input queue here, then we can say, if input queue is not None, we want to note this queue as being consumed by this many workers, depending on the number of instances that we have here. That's how many workers are going to be consuming from the queue. Now, with this, we have an implicit assumption that each queue is going to be for one worker.
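A minimal sketch of the two tracking dictionaries described above, assuming each worker entry from the YAML file has already been loaded into a dictionary. The function name `build_tracking`, the keys (`name`, `input_queue`, `output_queues`, `instances`), the default of one instance, and the worker names in the example are assumptions; the instance counts 2 and 6 are the ones mentioned in the lesson.

```python
def build_tracking(workers_config):
    """Return (queue_consumers, downstream_queues) for a list of worker configs."""
    queue_consumers = {}    # queue name -> number of threads reading from it
    downstream_queues = {}  # worker name -> names of the queues it writes to

    for worker in workers_config:
        worker_name = worker["name"]
        # Track queue names only; the queue instances are looked up later.
        downstream_queues[worker_name] = worker.get("output_queues")

        input_queue = worker.get("input_queue")
        if input_queue is not None:
            # Implicit assumption: each queue is read by one worker type only,
            # so a plain assignment (not a sum) is enough.
            queue_consumers[input_queue] = worker.get("instances", 1)

    return queue_consumers, downstream_queues


example_config = [
    {"name": "WikiWorker", "output_queues": ["SymbolQueue"]},
    {"name": "YahooFinanceWorker", "input_queue": "SymbolQueue",
     "output_queues": ["PostgresUploading"], "instances": 2},
    {"name": "PostgresWorker", "input_queue": "PostgresUploading", "instances": 6},
]
queue_consumers, downstream_queues = build_tracking(example_config)
```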
So for example, if we have a second worker that wants to make use of the results from here, it's not going to be reading from this same queue, because that would spread our values over two separate workers that are doing two separate things. If we want to store this raw data somewhere, then we have to have a separate output queue so that we don't miss any values. The two workers, one of them doing the processing and the other one just storing the raw values, shouldn't read from the same queue, because then they're sharing the data: some parts of it are being saved and other parts are being processed. But really we want every step in this pipeline to process the whole set of data. So if we need this dataset too, then we need to add another output queue here to make sure that we can use the full dataset. And so with what we're writing here, again, we're assuming that every queue is specific to one worker. If we're consuming from the symbol queue here, then we can't consume from the symbol queue with a separate worker elsewhere, because otherwise they're going to be splitting the data, since they're all just taking items out. So that's an implicit assumption that we're making here. Okay? But now we know which queues each worker is writing to, and also how many workers are consuming from each queue. All right, so we have the run method here. What we want to do is run this at an interval, so I'm also going to import time, because this lets us have a sleep call at the end. So maybe we sleep for five seconds at the end, so that we don't do this continuously but at intervals. So here we're going to be doing our monitoring. What we want to do is, for every worker and for every thread in every worker, check if it's still running.
So what does that mean? If we look at our Yahoo Finance worker, for example, once we break out of this loop here and this method comes to an end, then the worker is no longer running. At that point, the worker is no longer alive. And so we want to be looking for the point when this worker breaks out of its run loop or finishes its run method. At that point we know, okay, this worker has finished, so we can start sending out done to the downstream consumers. Of course, if we have more instances, then we need to make sure that it's not just one instance that finished, but every instance. So what we're going to do is say for worker name in, and now we just want to take our workers, so looping over our workers. And here we're going to keep track of the total worker threads alive. Because each of our workers can have multiple threads, we want to keep track of the total number of threads that are currently running. So we're going to say for worker thread, looping over each thread that we keep track of here. So how can we check if it's alive? It's actually relatively easy. All we have to do is take our thread and use this method, is alive, and this will tell us if the thread is still alive or not. So we can say, if worker thread is alive, then we increment this by one. And at the end here, if we have none of these threads alive, so if all of the threads have finished, we can start taking the downstream queues from here and sending done to them. So we need to get the downstream queues. And here we have to check first that they're not None, because we can have cases where there are none.
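The liveness check itself is small enough to try in isolation. This sketch uses plain `threading.Thread` objects with placeholder target functions (`wait_for_stop` and `finish_immediately` are names invented here) rather than the course's worker classes. A thread reports `is_alive()` as true from the moment it is started until its `run` method returns.

```python
import threading

stop_event = threading.Event()


def wait_for_stop():
    # Stays "alive" until the event is set, like a worker still in its run loop.
    stop_event.wait()


def finish_immediately():
    # Returns right away, like a worker that has broken out of its loop.
    pass


worker_threads = [
    threading.Thread(target=wait_for_stop),
    threading.Thread(target=wait_for_stop),
    threading.Thread(target=finish_immediately),
]
for thread in worker_threads:
    thread.start()
worker_threads[2].join()  # make sure the short-lived thread has really ended

# Count threads that are still inside their run method.
alive_while_running = sum(1 for thread in worker_threads if thread.is_alive())

stop_event.set()
for thread in worker_threads:
    thread.join()
alive_after_stop = sum(1 for thread in worker_threads if thread.is_alive())
```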
For example, our Postgres worker doesn't have any output queues. So we want to make sure that the downstream queues exist. So if the downstream queues are not None, then for downstream queue in the downstream queues that we have for each worker. What we're doing is, here's the YAML file, we're looping over each of these output queues here, since what we're keeping track of in this downstream queues variable is all the output queues. So instead of downstream queue, we can call this output queue, to be more consistent with the syntax that we have here. Then, for each output queue, we want to have the number of consumers. So we want to know, for each output queue, how many threads are actually reading from it. Well, we track that right here, so we know for each queue how many threads are reading from it. The number of consumers we can just read from this dictionary that we set up earlier. And then we can say for i in range number of consumers, and for each of these, we can send done to this output queue. So we have to get this queue. And I'll recap in a second, but first, let's just write this out so that we have the full thing here. So we're going to take our output queue, and just like we have here, we're going to put done. All right. And then the final thing that we can maybe do: if we have no more of these threads alive, then we can remove that worker from our tracking so that we don't need to track it anymore. And now, how are we going to exit out of this infinite loop? Well, we also want to keep track of the total workers alive. And we can probably do this up here, because we want to keep track of this at every iteration. So every time we're going through, for each of the workers, we want to know how many workers in total are alive.
We're going to be going over all of the workers. And then here we can either do while true, and say, if total workers alive is equal to 0, then we break out of it. Or we can say, while total workers alive is not equal to 0, but then of course we have to define it up here as well. In this case, either of these options is okay, and it really depends on what you want to go with. Since we have this now, we'll just go with this. It's a little awkward to have this duplication here. I could also put it towards the end so that we reset the counter at the very end. But we don't actually want to reset the counter at the very end, because then we would be breaking out of this loop. So yeah, we do want to have it at the top here. Okay, so let's go over what we have. And the first thing that I'm going to do is comment all of this stuff out, because we no longer want our threads to be sending these done signals. Instead, we want our main pipeline to be responsible for this. So what did we just do? First of all, we took the join workers out of our process pipeline step, because if we leave it in, this step is going to be blocking, and we don't want that. And in our initialize workers, we've added two extra attributes that we're tracking. Each of them is a dictionary. For each worker, we're tracking which queues they send to, so their output queues. And for each worker, we're also tracking which queue they're reading from and how many instances are reading from that queue. Again, making the assumption that every queue is specific to one worker. So we can't reuse symbol queue, for example.
Here, like, this doesn't work, because then they're going to be splitting the data that comes from here. This is going to be reading some of it, and this is going to be reading some of it, but neither one is going to get the full, complete dataset. So we can't do that. With this assumption, we're basically saying: every type of worker, and we can still scale the number of instances here, no problem, but every type of worker that we have has one specific queue associated with it, so that for each queue we know how many instances are consuming from it, which we can see here. Okay, so now we're keeping track of these queue statistics. So that later on, when we call this run method, and we're using run because we've turned this into a threading class, we start a loop that's going to take care of monitoring for us on the one hand. We'll get to implementing the monitoring in a second. But it's also going to take care of sending done to the output queues. Which again is important, because if we do it in here, then one thread could finish and start sending done while another thread is still running, and it would be adding values after the done has been sent. And since we're sending like 20 done messages, the downstream consumers are all going to stop, because they're all going to get the done value, and then there are going to be some messages in the queue that are left unread. So we want to avoid that, and we only want to send done once every single worker has finished. The way we're doing that is we're looping over each worker that we have, and for each worker we're going over every single thread and checking if that thread is alive or not. And what this means is that a thread is no longer alive once it finishes its run method here.
So if it breaks out of this run loop, at that point the thread is no longer alive, and this wouldn't trigger. But if it is still alive, that means it's still in this run loop here. And so we're keeping track of the total number of worker threads that are alive. If none of the threads are alive, that means every single worker has finished; it's broken out of its loop. So then we know, okay, now we want to send done to all of the queues that it writes to. So we need to loop over every single output queue, and for every single output queue, we need to send done a number of times based on the worker that reads from this queue, which is this one, and how many instances it has. So for example, in this case, we have six instances reading from Postgres uploading, and so we want to make sure that we send done at least six times so that every single instance reads the done message. And that's the logic that we have implemented here. And then at the very end, we're taking this worker out of the tracking, because once we've sent all of these done messages, that worker has fully finished and we don't need to track it anymore. There's one more thing that I want to add here, which is going to be worker stats. What I want to do is add in the worker name as well as how many workers are alive, and then at the very end, I want to print out our worker stats. This is just monitoring for us so that we can see what's going on. And here in our main function, I've replaced this process pipeline with just the start step, so that we start this run method here. All right, so I'm just going to activate my virtual environment. And then let's go ahead and run this. I actually need to turn on Postgres too.
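Putting the recap above together, the monitoring loop could look roughly like this. It is a sketch under assumptions rather than the course's exact method: it is written as a standalone function named `monitor` instead of a `run` method, the `"DONE"` sentinel and the `producer`/`consumer` helpers are invented for the demonstration, and finished workers are removed only after the iteration over the dictionary has ended.

```python
import queue
import threading
import time

DONE = "DONE"  # assumed sentinel value


def monitor(workers, downstream_queues, queue_consumers, queues, poll_seconds=0.05):
    """Send DONE to a worker's output queues once all of its threads have finished."""
    workers = dict(workers)  # local copy so the caller's dict is left untouched
    while True:
        total_threads_alive = 0
        worker_stats = []
        to_delete = []
        for worker_name, worker_threads in workers.items():
            threads_alive = sum(1 for t in worker_threads if t.is_alive())
            total_threads_alive += threads_alive
            if threads_alive == 0:
                output_queue_names = downstream_queues.get(worker_name)
                if output_queue_names is not None:
                    for queue_name in output_queue_names:
                        # One DONE per thread consuming from this queue.
                        for _ in range(queue_consumers.get(queue_name, 0)):
                            queues[queue_name].put(DONE)
                to_delete.append(worker_name)
            worker_stats.append([worker_name, threads_alive])
        print(worker_stats)
        for worker_name in to_delete:  # delete only after the loop over workers
            del workers[worker_name]
        if total_threads_alive == 0:
            break
        time.sleep(poll_seconds)


def producer(output_queue):
    for value in range(5):
        output_queue.put(value)


def consumer(input_queue, results):
    while True:
        value = input_queue.get()
        if value == DONE:
            break
        results.append(value)


items = queue.Queue()
results = []
workers = {
    "producer": [threading.Thread(target=producer, args=(items,))],
    "consumer": [threading.Thread(target=consumer, args=(items, results)) for _ in range(2)],
}
for worker_threads in workers.values():
    for thread in worker_threads:
        thread.start()

monitor(
    workers,
    downstream_queues={"producer": ["items"], "consumer": None},
    queue_consumers={"items": 2},
    queues={"items": items},
)
```

Because DONE is only enqueued after the producer's thread has ended, every real value sits ahead of the sentinels in the queue, so the consumers read all of them before stopping.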
Because right now my Postgres server is not running locally, so I just need to make sure that starts up. So let's see. There we go. All right, so let's go ahead and run this. Okay, we've got an error here: the initialization method is not called. So we want to initialize the thread too, of course. All right, let's try that one more time. Okay, so it's running again. First of all, we have this extraction time again, which is just because we're not calling the join methods. So this is the same thing that we had before: because we're not calling join here, it jumps to here almost immediately. But the other thing is, it seems like we're not printing out the statistics yet. Oh, of course, that's a mistake here. That was my bad. If we set this to 0 up here and then say, while this is not 0, well, this is already 0, so we're not even going to enter this loop. So let me take this out. But there's one more thing that I wanted to do, which is to have a list of workers that we should delete. Rather than deleting immediately here, I want to append the worker name and then delete it later. So I want to say here, for worker in to delete, delete it. The reason being that if we delete up here, we're still looping over this, and if we're deleting while we're looping, then Python is going to be upset with us. So we only want to remove once we've finished the loop, which is why I'm keeping it like this. So let's try that one more time. Okay, there we go. So now we're seeing our actual worker stats. We can see our wiki worker here finished. And we also notice something, going up here.
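The reason for collecting names first and deleting afterwards can be seen with a plain dictionary. In this small sketch the dictionary contents are placeholders: removing a key while iterating over the same dictionary raises a `RuntimeError`, while deferring the deletion until the loop has ended works.

```python
# Deleting during iteration: Python raises RuntimeError on the next loop step.
tracked = {"wiki": [], "yahoo": [], "postgres": []}
try:
    for worker_name in tracked:
        del tracked[worker_name]
    deletion_error = None
except RuntimeError as exc:
    deletion_error = exc

# Deferred deletion: remember what to remove, then remove after the loop.
workers = {"wiki": [], "yahoo": [], "postgres": []}
to_delete = []
for worker_name, worker_threads in workers.items():
    if not worker_threads:  # placeholder for "no threads alive"
        to_delete.append(worker_name)
for worker_name in to_delete:
    del workers[worker_name]
```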
We're actually seeing nine workers here, which is of course not true. This is a mistake in our monitoring, because here we want to show the total worker threads alive per worker, not the total number of replicas alive, which is kind of aggregated. Now, there's something that should be happening but isn't actually happening. And I'm just going to take down this sleep time here just to make sure that this isn't an issue. But we can see going through our logs that our workers are actually reaching timeouts and that they're not being stopped by our stop signals. So of course we don't want that. We want to make sure that our workers are actually stopping with the method that we have here. So taking a look again at our loop, well, this should be "is not None", because we want to go in if we actually have downstream workers. So let's go ahead and try that one more time. And there we go. So now we've fixed the problem which we had earlier, which was that we weren't sending done from here, but were rather reaching timeouts. And of course, we can see that it was great to have these timeouts because they helped combat the actual bug that we had here; we were not stuck in an infinite loop. But yeah, so we've finished this part, which is actually extremely nice because now we are no longer sending done from here. So let's go ahead and remove this too. And that way, when we have multiple instances, it's no longer the case that if the first one finishes and sends done, everything else downstream is going to stop. And even if we only sent done once, we'd run into problems, like what happens if the number of downstream workers is not equal to the number of workers that we currently have. If we have more downstream workers, then they're not all going to get a done message and they will reach timeouts.
And even if they do, they will be stopping before all the upstream workers have stopped, which means our throughput is going to go down. So having it this way is much nicer, because now we're sending done once we know all of these workers have actually finished. Now there's one more short thing that I wanted to show you, and I know we're getting kind of long here, but actually we can't run this because unfortunately it doesn't work on Mac, but this does work on Linux. So we're keeping track of our worker statistics here. And we probably would like to do a similar thing for queue sizes, right, just to understand how much data is actually in our queues. Now, there's this method if we look at the queue. So again, I'm not going to be writing the full thing out because it's not going to run anyway. But we can say, for queue in our queues, and then just having this queue, here we can look at the queue size. So this is again something that unfortunately isn't available on Mac, but on Linux this is something that you can actually do: you can get the queue sizes. And this is just really nice because you're able to monitor how much is in each queue. And with this, you're able to say, oh, okay, I have so many workers, but I can actually see that this queue is super full. So I'm actually going to add more instances there, and maybe I take some workers out elsewhere where the queue size is always 0. So having this queue monitoring tool is also really nice. And of course we can also put it into something like queue stats, like this. And then here, just like we did before, rather than just printing this out, you can add it here, and then here we can print out the queue stats. Now unfortunately, and you'll see if I run this, this is going to cause an error because basically this is not implemented on Mac currently. So this would run on a Linux system, but unfortunately it does not run on Mac.
So yeah, we can't use it here. But most of the time, if you're actually using stuff in production, it's probably going to be running on a Linux system because it will be deployed in a Docker image or something like that. So in those cases, you are actually able to leave this piece of code in, and you'll also be able to monitor the queue statistics. And with that, you'll be able to get more information by going through the logs: maybe scale up the number of Postgres instances, or maybe scale down the number of instances because the queue is always 0, and there's no point in having that many instances if our queue is always 0. And so yeah, with that, hopefully you're able to take this information that you're getting and make smarter decisions about how many instances you're actually allocating to each of the different workers that you have here. But yeah, that brings us to the end of this optimization here. And we're actually getting very, very close to finishing this. So the only thing left, which is what we'll cover in the next video, is that we want to take some of the definitions that we have, specifically the pipeline location, and put them into an environment variable. But again, we'll talk about that in the next lesson.
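The monitoring logic described in this lesson can be sketched roughly as follows. This is a minimal sketch, not the course's actual code: the names `worker`, `monitor`, `safe_qsize`, the `DONE` marker, and the shape of the `workers` dictionary are all assumptions made for illustration. It shows the three ideas from above: send done once per downstream instance only after every thread of a worker has finished, collect the names to delete and remove them after the loop, and guard the queue size call because it can raise `NotImplementedError` on some platforms (the `multiprocessing.Queue` documentation mentions macOS).

```python
import threading
import time
from queue import Queue

DONE = "DONE"  # assumed sentinel value, standing in for the "done" message


def worker(in_queue, out_queue):
    # Hypothetical worker: forwards items downstream until it reads DONE.
    while True:
        item = in_queue.get()
        if item == DONE:
            break
        if out_queue is not None:
            out_queue.put(item)


def safe_qsize(q):
    # qsize() is not implemented on every platform for every queue type.
    try:
        return q.qsize()
    except NotImplementedError:
        return None


def monitor(workers, downstream_counts, poll_seconds=0.01):
    """workers: name -> (list of threads, output queue or None).
    downstream_counts: name -> number of instances reading that output queue."""
    while workers:
        to_delete = []
        worker_stats = []
        for name, (threads, out_queue) in workers.items():
            alive = sum(t.is_alive() for t in threads)
            size = safe_qsize(out_queue) if out_queue is not None else None
            worker_stats.append((name, alive, size))
            if alive == 0:
                # Every instance has finished, so tell each downstream reader.
                if out_queue is not None:
                    for _ in range(downstream_counts[name]):
                        out_queue.put(DONE)
                to_delete.append(name)
        print(worker_stats)
        # Delete only after the loop; mutating a dict while iterating raises.
        for name in to_delete:
            del workers[name]
        time.sleep(poll_seconds)


if __name__ == "__main__":
    q1, q2 = Queue(), Queue()
    stage_a = [threading.Thread(target=worker, args=(q1, q2)) for _ in range(2)]
    stage_b = [threading.Thread(target=worker, args=(q2, None)) for _ in range(3)]
    for t in stage_a + stage_b:
        t.start()
    for item in range(5):
        q1.put(item)
    for _ in stage_a:
        q1.put(DONE)
    monitor({"a": (stage_a, q2), "b": (stage_b, None)}, {"a": 3})
```

Here stage "a" has two instances and stage "b" has three, so the monitor puts three done messages on the shared queue once both "a" threads have exited, which is the same reasoning as the six Postgres instances above.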
14. Final Program Cleanup: All right, so in this lesson we're just going to be doing some quick clean-up, basically moving this to an environment variable, but then also having a local environment variable file that we can use for actual testing and stuff like that, so that all of this becomes a little bit more flexible: whenever we want to start the program, we can define it at that point. So, of course, the first thing that we're going to do is say os.environ.get, and I'm just going to call this our pipeline location. And here we also need to import os. Now what I'm going to do is create a .env file, which is going to hold our environment variables. And actually, let me rename this, because I'm going to call this the .env local file. So this is going to be our local environment variables. That means if we later want to have staging and production environment variables, then we can switch between these depending on what we want to test on. Mainly you'll probably want to be testing on local and staging. So we're going to define our environment variables here, and we're just going to take this over here. So what this export allows us to do in a second, when we use this source command, is we're able to source the environment file. And what this export means is that it will set these environment variables in our terminal here. So that way we can have them be defined in our running terminal instance. And then when we run the program, we're going to be using those environment variables. So we add the pipeline location, but actually, let's go through the different workers, because in our Postgres worker we're also using environment variables. So we have Postgres user, which is just going to be an empty string.
We have the Postgres password, which is also going to be an empty string in the Mac case; in the Windows case you're going to have values set here. The Postgres host is going to be our localhost. And the Postgres database is going to be what we have here. Now in this case, I'm actually going to be removing this from here, so that we only have it defined here. As far as I know, this sourcing of the environment file and this syntax is specific to Linux and Mac systems. But if you're on Windows, you can still use a similar approach. And again, like we talked about last lesson, if you're deploying it somewhere, or someone else is helping you do that, this is almost always going to be run on a Linux-based system. So you'll be able to use this environment file. And environment variables would work on Windows too. But specifically this exporting syntax in this way, I'm pretty sure, is just Linux and Mac specific. But don't worry about it, because this is just the final stage of clean-up. And again, if you're running it somewhere else, you can actually pass those environment variables in. And so we have our environment variables here. And here we need to say, if we don't have a pipeline location, then we're basically going to exit out with a failure exit code, and we're going to say pipeline location not defined. And then if we do want to have the extraction time here too, because our main executor, our YAML executor, is actually also a thread now, well, what we could do is call something like joining workers. But really we know, looking at this, that our main thread here, our YAML worker thread, is only going to be exiting once all of the workers inside are done. So only once all the workers inside are done are we actually going to break out of here anyway.
So really we can also just call join on here if we want to get this extraction time. So let's go ahead and just test how this is working. We're going to activate our local environment variables, so we can see if I print out this variable. Okay, so now it's defined. And this echo is just a terminal command that we can use. And this dollar symbol here basically means we want to access the value of a variable, which we've defined here. So this is very similar to defining variables in Python; the syntax is just a little bit different for the terminal, but it's basically printing out the value of pipeline location. And here we're defining the pipeline location variable to be this value. All right, so let's try running our main function again, just making sure that everything is working properly after we've made these changes, which it looks like it is. Alright, and so we also have the proper extraction time. And then one more thing I want to do is, I mean, we probably don't want to be printing out all of this stuff all the time. This is more for testing purposes while we were working on it. So we can just remove that printing and run it one more time just to make sure that the output is clean too. So everything looks really clean. And yeah, there we go. We could also remove basically any unnecessary commented-out stuff. And yeah, this brings us to the end of this whole threading tutorial, where we've built out this whole threading program to do scraping and database uploading and all that stuff, just to get around the time lost to the waiting periods that are associated with network communication. So I hope that you've enjoyed this, you've learned a lot, and you're able to transfer some of this to either your own projects or your own work.
And again, make use of this specifically in cases where the limiting factors are network communication times. Again, threading is not a good way to speed up computation-heavy workloads, because we're not making use of any more CPU. It's just that when we're waiting for the network to respond, other threads can continue running while the waiting threads are just sitting there expecting some response from somewhere else over the network. So again, great job on completing this section of the course. I hope you've enjoyed it. I hope you've learned a lot.
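To make the environment variable check from this lesson concrete, here is a minimal sketch. The variable name `PIPELINE_LOCATION`, the helper `get_pipeline_location`, and the exit code 1 are assumptions for illustration; the lesson only says that the location is read with the environment get call and that the program exits with a failure code when it is missing. The env file itself would contain lines of the form `export NAME=value` and be loaded with `source` before starting the program.

```python
import os
import sys


def get_pipeline_location(environ=os.environ):
    # Assumed variable name; the real program may use a different one.
    location = environ.get("PIPELINE_LOCATION")
    if location is None:
        # Fail fast with a non-zero exit code instead of crashing later.
        print("PIPELINE_LOCATION not defined", file=sys.stderr)
        sys.exit(1)
    return location


# Typical use at the top of the main function:
#     pipeline_location = get_pipeline_location()
```

Passing the mapping in as a parameter is only there to make the function easy to try out with a plain dictionary; calling it with no arguments reads the real process environment.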
15. Locking: All right, so now that we've built this pretty cool threading application, something that we've used a lot inside was all of these queues, which, if you remember, we talked about at the very beginning. Let's see, where do we initialize them? Over here. So we talked about these being thread-safe. So what exactly does that mean? In this lesson, I quickly want to go over locking and something known as a race condition, so that you can understand some things that you may need to be wary of when doing threading. So I'm going to create a new file here. I'm going to call it locking. And what we're going to do here is we're just going to import threading. We're going to have a global counter that we'll set to 0. And we're going to have a small function called increment. And this is going to take our counter, and we're just going to have a loop here. We'll say for i in range, and then start off with 10. I'm using powers of 10 here so that I don't have to write out ten and then 100 and 1000. So I'm going to use powers of 10 just to make this a little bit easier, and you'll see why in a second. So we're going to have a loop here that currently just does ten iterations. And during each iteration it's going to increment our counter by one. Now, here I'm declaring the variable as global. In case you haven't seen this before: since we are modifying this value, we actually need to make sure that we declare the global variable at the top here, since we're not just accessing it, but we're actually modifying it. So now what I'm going to do is create four threads. I'm going to say for i in range four, we're going to create a thread. And our target is going to be this increment function. And then we're just going to append these, and then we're going to start all of them. So we're going to say t.start. And then we're also going to call the join method to make sure that we wait until they're all done.
And then at the very end, we're going to print out our counter value. So, a pretty straightforward program. Really what we're doing is we have a global variable here, counter, initialized to 0. And then we just have a function which is going to take this global variable and increment it ten times. And we're going to run this over four threads. So each thread is going to call this function. So in total, we should have a value here of 40. Since we have four threads, each thread calls the function, and each function call will increment this value by 10, so 4 times 10, that's going to be 40. So let's go ahead and run this. All right, 40, looks pretty good. So let's try to increase this number here. Instead of doing ten, let's do 1000. So it's going to be 10 to the 3. All right? And let's go ahead and run this. Okay, four thousand, that's exactly what we expect. Let's try to increase this one more time. Instead of doing 1000 operations, we're now going to do a million. So what we expect to see here is going to be 4 million, since in each thread we're just going to be incrementing a million times, and we're going to do that over four threads. So let's go ahead and run this. Oh, well, that's weird. Why are we getting this and not 4 million? So what we've encountered here is something called a race condition, where basically we have a shared variable, which is what we have here, and several different entities are trying to access it at the same time. So what's happening at some point is we have our counter, which is equal to whatever value x. And then we have one thread. Thread one reads it and sees, oh, counter is x. And then it's going to try to increment it afterwards. But at the same time, we have another thread, say thread T2, that also sees counter is equal to x. So then what happens is thread one is saying counter is equal to counter plus one. So that means in essence counter is equal to x plus 1. And then we also have thread two doing the same thing.
So in this case, the result that we get is actually counter is equal to x plus 1, even though we've had two threads access it. And the reason that this happens is because sometimes these threads basically see the same value and then they perform their operations, but they didn't read it sequentially. So they didn't read it one after another. It didn't get executed like this, in which case we would have had the proper value here, but they read it at the same time, or close enough together, before the increment operation could happen. So after this increment operation happens across the two threads, in this example, we're still only left with x plus 1 because they saw the same value initially. And so this is the problem that we're seeing here. And obviously we're seeing it happen much more than once. But we can see that it shows up as the number of operations in these threads increases. And in this case we just had to bump this number up to make sure that we increase our chances of something like this happening. We're entering this race condition where the value is being accessed by multiple threads at the same time, each is performing the operation, and then they're overwriting the value. But sometimes the changes that we're getting are not the ones that we expect, since two threads both increment x by one. And so we're not getting a plus two here, we're getting a plus one. Now obviously this is not always the case, because we're almost halfway there. If I look at these numbers, yeah, so we're about halfway there. But obviously this is happening quite a lot. So what can we do about this? Well, there's this cool thing called a lock, which is also why we've called our file here locking. So we're going to go into the threading module and we're going to initialize a lock. Now, the two methods that are available in this Lock class that are really of interest to us are going to be lock.acquire and lock.release. Let me comment those out.
So these are the ones that are going to be interesting to us. And what these do is basically initiate a lock. So at this point this is a blocking operation, and no other thread that wants this lock can get past it while the lock is acquired. So this thread basically has claimed the lock. And it's saying, I'm running right now, and until it releases it, or actually until any other thread releases that lock, we can no longer encounter these race conditions. So what happens in this case is the first thread is going to start, and before it reads the counter variable, it's going to say, oh, this thread is locking. And then it's going to read the counter variable. It's going to increment it by one. And then it's going to say, releasing lock. And then we're going to have a different thread, which is also going to lock. And then it's going to see the result up here, which is x plus 1. And then we're going to have x plus 1 plus 1, which is going to be x plus 2. And then we are again releasing the lock. Now obviously in this case, we are breaking some of the parallelism that was available to us and forcing sequential operations. And obviously, we don't want to lock everything all the time, because otherwise we're not going to be gaining a lot of benefit. But really again, if you think back to the very, very beginning, we're using threading mainly when we have blocking network IO operations. So when there's a lot of time spent on network round trips, and we don't want to just be sitting idle waiting for a response, that's when threading is really going to thrive for us. So in this case, when we're doing operations locally, we can use locking to make sure that our operations are going to be consistent and that we don't encounter any race conditions. And then really again, the point of threading is going to be more towards network IO.
So obviously just be aware of the effects that you will have when you use locking. But let's go ahead and see this in action. So what we're going to do every time before we increment the value is say lock.acquire. And afterwards we're going to release this lock. So let's go ahead and run this one more time. And there we go, we get the expected result now. There's a method that I would recommend using here, though, that I think is a little bit cleaner than lock.acquire and lock.release. Because when you call acquire, you just have to remember to also call release. Otherwise you're going to start running into problems. So let's see this on a small scale. And let's run this. All right. So it's not finishing, because we're not releasing the lock. So we basically acquired a lock, and no other thread can get past that acquire until this lock is released by something. So I'm going to interrupt this. So obviously, we don't want to forget this. So the easier way that we can do this within Python is using the context manager. So what we can say instead is: with lock, we're going to increment the counter by one. So what this does is, everything within this indent here is going to be between the lock.acquire and the lock.release. So as soon as we go out of this indent, we're essentially calling lock.release here. And at the very start of this indent, we're going to call lock.acquire. But this way is obviously a little bit cleaner, because we don't have to call the actual release. We don't have to make sure that we don't forget it, because that can be problematic, especially if you have programs that are going to run for a while anyway, and it takes you quite a while to figure out that you actually forgot to release the lock. So instead, we can just use the context manager and say with lock.
And then everything in here is going to be between this lock.acquire at the very beginning and lock.release at the end. And we don't have to worry about potentially forgetting to release the lock. And so we can see, if we also go back to the ten to the six which we were at before, here we should also get the 4 million that we were expecting. So this is just an important thing to keep in mind, because previously we were using queues for everything, which are thread-safe. But sometimes if you do other things, especially if you're accessing shared variables, then there may be problems there. So just think about how your program is interacting and whether you need to use locks. If you've written a program where you may actually encounter a race condition, just keep in mind that locking is going to be an important tool to use there to make sure that you actually get the correct results that you're expecting.
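As a recap of this lesson, here is a minimal sketch of the counter example with the context manager form of the lock. The helper `run` and its parameters are assumptions added so the example can be called with different loop sizes; the counter, the `increment` function, and the four threads follow the description above. One hedge: whether the unlocked version actually loses updates depends on the Python version and timing, so the sketch only shows the locked version, whose result does not depend on that.

```python
import threading

counter = 0
lock = threading.Lock()


def increment(n):
    global counter  # needed because we assign to the module-level name
    for _ in range(n):
        # "with lock" acquires on entry and always releases on exit,
        # so a forgotten release() cannot leave other threads stuck.
        with lock:
            counter += 1


def run(n, num_threads=4):
    # Hypothetical helper: reset the counter, run the threads, return the total.
    global counter
    counter = 0
    threads = [threading.Thread(target=increment, args=(n,)) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter


if __name__ == "__main__":
    print(run(10**5))
```

Removing the `with lock:` line (and unindenting the increment) gives the unprotected version from the start of the lesson, which is the one that can come up short of four times the loop size.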
16. Multiprocessing Intro: All right, so now that we've seen a good amount of stuff about threading, let's go ahead and take a look at multiprocessing. Now, first I just want to write out a simple threading script again. And I actually already have my activity monitor open so we can monitor our CPU usage a little bit. And then we're going to see how that changes when we go to multiprocessing. So I'm going to say from threading, I'm going to import Thread. We're going to have a function here that's just going to be called check values in list. And it's going to take in some list, we'll just call that x. And what it's going to do in here is it's going to have a loop. We'll say for i in range, and let's do ten to the power of eight, so 100 million. And all we're going to do is we're just going to check if i is in x. And then we're going to create some threads. So we're going to say number of threads is going to be four. And we'll have our threads list here where we will keep track of them. And I'll say for i in range number of threads, we're going to create the thread. We're going to have the target function be this check values in list. And the argument that we're going to provide to it is going to be the list that we want to use. So we'll say comparison list is going to be equal to 1, 2, 3, and provide that as input. So what we're going to do is we're just going to check, over four different threads, looping over 100 million values, to see if any of these values are within this list. So basically just doing some CPU operations without any network components. And of course we want to append those threads to our list where we keep track of them. And I'll say for t in threads, we're going to start our threads, and we'll say for t in threads, then we're also going to join our threads. And let's also just import time here. And right before we start, we'll say start time is time.time. And then we'll say everything took, and we'll do time.time minus start time, seconds.
Okay, so what we've done here is just a basic threading program. We start and join our threads, of course, like we've covered a lot recently. And we just have the simple list which contains three values, and in each of the four different threads we're just going to loop over the range. So we're actually going to be doing a lot of repeated operations: for each one of these values, we're just going to check if the value is within the list. And then we're just going to time this whole operation to see how long all of this takes. So I'm going to just start our main function here. And if we go to our activity monitor, then we can see there is going to be, of course, a little spike, which is what we would expect, because obviously something's going to be working. We can actually see our Python process here now just doing work. And it's going to be taking a little bit of time to finish all of this. But hopefully it should finish soon. All right, so it took about 33 seconds, and we can see our CPU usage here: not super significant, of course a little bit above the standard values that we have, but we can see generally how long it took. So of course, because we don't really have any network waiting components, a lot of this stuff is happening one after the other. So now, instead of using threading, let's go ahead and use multiprocessing. So we're going to say from multiprocessing, we're going to import the Process class. Now the nice thing about the multiprocessing library is that it's actually very, very similar, in terms of how it's written, to the threading library. So all we have to do really is just change out these names, and that's really for readability. The only thing that we had to change was this thing here. And now we're going to be using multiprocessing instead.
So what we're going to do is, and let's update this too, it's no longer number of threads, it's number of processes. We're going to create four separate processes. Then we're going to start them just like we did before. We're going to wait for them to finish just like we did before. We're providing the arguments here just like we did before. And the function that we're calling has also not changed. So we're basically doing the exact same thing, but now we're using multiprocessing. So let's go ahead and run this. And I'm going to open back up here our activity monitor. And now all of a sudden you can see it spike. And then in a second here, we're going to have four processes. And so that's the number of processes that we've actually created, running. So what happens in this case is we're actually making use of more than one core. I have more than one core available on my machine, and so because of that, we can make more use of it. Now obviously, as we can see here, our speedup is not exactly proportional. There are some other factors going on. So one of the things that you would have seen is that the CPU utilization for each of the four processes was not the same as the CPU utilization that the one threaded process had. So I think, while I was monitoring this, it said about 80 percent for each of the processes, whereas the threaded one was actually at almost 99-something, so much higher utilization in that sense. Obviously we need to leave room for other stuff going on too, and our operating system will usually take care of that, but we can see that we still got a speed improvement. And the amount of speed improvement that we can get from here depends a lot on the number of cores available on our machine. We can create a lot of processes, but the more cores we have, the more processes we can execute simultaneously, because we can execute each of them on a different core.
And so that's really what's happening in this case: every single time we create and start a process, we're taking this Python interpreter and we're basically starting a semi-fresh version in another process, so that they're not conflicting and there's no shared global interpreter lock between them. So these are no longer hindered by this global interpreter lock. And so the benefit that we get from this is the speedup. Now, like we talked about in the very, very beginning, threading is what we're going to be using for workloads that are heavily limited by network IO, whereas multiprocessing is what we want to use for workloads limited by CPU utilization, or CPU-intensive processes. And in this case there's no network waiting or anything. It's just CPU utilization, checking if one value is within a list. And so from this we can see, obviously, we're going to be getting better results from our multiprocessing.
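The comparison in this lesson can be sketched like this. The helper `timed_run` and the extra loop-size parameter `n` are assumptions added so the same code can time both `Thread` and `Process` with a smaller loop than the 100 million used in the lesson; the checking function and the start, join, and timing steps follow the description above. The timings printed will vary by machine, so no expected numbers are given.

```python
import time
from multiprocessing import Process
from threading import Thread


def check_values_in_list(x, n):
    # Pure CPU work with no network waiting: test membership n times.
    for i in range(n):
        i in x  # the result is deliberately discarded


def timed_run(worker_cls, n, num_workers=4):
    # Thread and Process share the same constructor and start/join interface,
    # so only the class needs to change.
    comparison_list = [1, 2, 3]
    workers = [
        worker_cls(target=check_values_in_list, args=(comparison_list, n))
        for _ in range(num_workers)
    ]
    start_time = time.time()
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return time.time() - start_time


if __name__ == "__main__":
    n = 10**6  # smaller than the lesson's 10**8 so the sketch finishes quickly
    print("threads took", round(timed_run(Thread, n), 2), "seconds")
    print("processes took", round(timed_run(Process, n), 2), "seconds")
```

The `if __name__ == "__main__":` guard matters for the process version: on platforms that start child processes by importing the main module again, it keeps the children from re-running the launch code.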
17. Multiprocessing Queues: So obviously, we can improve the way that we're writing our program here. Rather than redoing the same workload, something that generally would be more fitting for multiprocessing would be to split our workload over different segments. So let's say that instead of redoing the same thing four times, we just want to distribute our workload. So we're going to have some extra parameters here. The first is going to be, we'll just call it i right now, and then we're going to have a total number of processes. So what we're going to do here is we're basically going to make use of our i and our total number of processes to split this into equally sized buckets. So we're going to start off with basically having a lower bound, which is going to be i times 10 to the 8. And then we also want to have an upper bound, which is going to be i plus one times 10 to the 8. And we can even take this value and abstract it away into another variable. So we're going to say max number to check to is going to be ten to the eighth. And so then we're going to start from our lower bound, and we're going to go to our upper bound. So we're going to say for i in range lower to upper. And so for example, if we have 0 here, we're going to go from 0 to basically, well, one times this. So we do need the number of processes; that's where it comes in. We're just going to divide this here by the number of processes and convert all of this to an integer, since our range function here requires that we use an integer. So the first step is going to be from 0 to one times this number divided by the number of processes. So it's going to be 1 over 4 of 100 million, which is going to be 25 million. And then at the step when i is one, we're going to go from 25 million up to 50 million.
So it's going to be 2 times 25 million. And so then we're going to go in these steps. Alright, so the i we already have here, from our i here; the number of processes we also already have, because that's what we're defining in this range loop here. So what we're doing in this case is, well, right now we've just rewritten it to make it more of an at least semi-realistic program, to see how we can split up the workload. And if we run this again, then obviously in our activity monitor in a second we're going to see our CPU load here spike. But obviously it's going to be much shorter, because we're not doing the same thing four times; instead, we've split up this one operation across four separate processes. So yeah, generally though, we don't just want to compute this information, we also want to use it. And we've seen previously a lot that we can actually use a queue to pass on information. And we can do a similar thing here. We can still use our queue: we can create an instance of the class outside, and we can also provide that as input, like we have here. And so maybe the information that we want to pass on is, in each bucket, how many values are in our list and how many values are not in our list. So we'll have a variable, number of hits, that's just going to be 0. And then we're just going to say if i in x, we're going to increment our number of hits by one. And then at the very end, we're going to put this value into the queue, so lower, upper, and number of hits. So that way, we can do some processing. We can actually make use of the information, of the results that we're processing. So we're just going to check, in each bucket range, how many of our values are actually within this list. We're going to put this into a queue. And then here towards the end, we also want to just put done into the queue. And then we're going to have just a simple while loop that's going to say while True.
And inside it we're going to say v is equal to queue.get(). If v is equal to DONE, we're going to break out. Otherwise we unpack our lower, upper, and number of hits from v. And we're going to print that between lower and upper, we have number of hits values in the list. Alright, so let's run this one more time. And again in our CPU graph, we can see the small spike here. So now we've got some good information. Between, in this case, 0 and 25 million, we have three values in the list, obviously, because we have 1, 2, 3 in the list. And then in the other buckets, we don't have any hits. So the point of this lesson is that we can continue to use a queue in the same way that we used it before to pass information on between different threads. But now it's going to be between different processes. And hopefully, you can see that we can use processes to spread out the workload. Obviously, the example that we have here is a very simplified example. But we're still able to save a decent amount of time by doing everything at the same time instead of using serial execution. The same goes compared to threading, which here would basically behave like serial execution because we're so CPU-intensive. With processes we're able to make use of more of the cores that we have available on our machine, and actually use more of our CPU to complete these computations faster. Now, this is important for one if you're running stuff on your own machine and just want it to finish faster while a lot of resources are sitting idle. This can obviously help you speed up programs. But also, if you're deploying applications and you have extra CPU resources available, then if you're not making use of those resources, they're just going to sit idle and you're going to pay for them without actually using them.
So obviously it depends. If you have a single core on whatever you're deploying, then multiprocessing isn't going to help much, because you still only have the one core available. But if you do end up with a machine or a deployment where you have multiple CPU cores available, you want to make sure that you can make the best use of them.
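To make the bucket-and-queue idea from this lesson concrete, here is a minimal sketch of how it might look in code. The function and variable names (check_value_in_list, comparison_list, max_number) are assumptions for illustration, since the lecture's actual file is not shown, and the default upper limit is 10**6 rather than the lecture's 10**8 so the sketch finishes quickly.

```python
from multiprocessing import Process, Queue


def check_value_in_list(comparison_list, i, number_of_processes, queue,
                        max_number=10**6):
    """Count how many numbers in bucket i are contained in comparison_list."""
    # Split [0, max_number) into equally sized buckets, one per process.
    lower = int(i * max_number / number_of_processes)
    upper = int((i + 1) * max_number / number_of_processes)
    number_of_hits = 0
    for x in range(lower, upper):
        if x in comparison_list:
            number_of_hits += 1
    # Pass the result back to the parent process through the queue.
    queue.put((lower, upper, number_of_hits))


def main():
    comparison_list = [1, 2, 3]
    number_of_processes = 4
    queue = Queue()

    processes = []
    for i in range(number_of_processes):
        p = Process(target=check_value_in_list,
                    args=(comparison_list, i, number_of_processes, queue))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

    # Sentinel value so the reading loop below knows when to stop.
    queue.put("DONE")

    while True:
        v = queue.get()
        if v == "DONE":
            break
        lower, upper, number_of_hits = v
        print(f"Between {lower} and {upper} we have {number_of_hits} values in the list")


if __name__ == "__main__":
    main()
```

The results can arrive in any order, since the processes finish independently. Joining before reading works here because each result is a tiny tuple; with large results it is generally safer to read from the queue before joining.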
18. Multiprocessing Pool: All right, so far we've seen how we can use multiprocessing to help us distribute some of our work across different CPUs. And obviously the pattern that we're seeing here is very similar to the threading pattern that we went into in detail previously. And you can probably already guess: the way that we did class inheritance with threading, and everything that we did when we created our threading workers, we can do the same thing with the Process class. So we can inherit from the Process class, and we have the start and join methods as well as the run method available. And we can approach creating a multiprocessing program around multiprocessing classes that all inherit from this Process class, just like we did when we went through all the threading stuff. We're not going to go down that route, because we already did that for threading, and there'd be a lot of repetition. So just be aware that what we did for threading, the approaches that we used and the syntax, is going to be almost identical for multiprocessing, except that rather than using the Thread class, you're going to be using the Process class. What I want to show you now, though, is this. When we wrote our threading program previously, there was a lot of network IO, and there it can make sense to have these intricately developed programs where we use queues to pass all the state on internally, and where different threads are responsible for different services that all deal with the network load. But sometimes when we're doing multiprocessing, we already have our workflow cut out, and rather than rewriting everything, we just know, oh, here's something that I want to optimize.
And obviously we can do the same thing with threading, if we have a small component where we think, oh, here's a lot of network stuff that I can optimize, and rather than waiting serially, I can hand all of these things to threads so that the waiting happens almost simultaneously. But for multiprocessing, I now just want to focus on that one component. And rather than spawning the individual processes like this and using queues to pass on information, I want to simplify this a lot. What I ultimately want is to get to the point where I can just pass our lower and upper bounds, have this work distributed for us, and then just get a return value. So let's take the queue out here. I don't want to use queues or anything. I just want to have a result variable that's going to say something like 3, 0, 0, 0, so that I can just continue using that. Now before we do this, let's start with something simpler. We're going to start with the basic multiprocessing pool, and we're going to have a much simpler function where we just square everything. So we're going very much back to the basics, to much simpler examples. Because ultimately, once we can do that, we can then do more complicated stuff, and the syntax differs a little bit in how we use the pool, which is why I want to start with this very simple example first, so that we can see how this works and how we get that result. Then we can go about updating our syntax so that we can do the still not super complicated thing we had before, where we pass more variables. So instead of using the Process class, we're now going to use the Pool class.
Now with the Pool class, we can initialize it using a context manager. So with Pool as pool, or as p, or whatever you want. pool is pretty nice because it's pretty specific. We can be more specific and say something like multiprocessing pool. Or if we don't want to write everything out like that, we can just write something like mp pool to be a little bit more specific. But basically we're just going to initialize this Pool class and then use it within the context. So now we have available to us a pool of processes that we can use. And, we'll get into this in a second, we can actually define here the number of processes that we want to have available in our pool. For example, we could put the number two in here. That means that we have the potential of two processes available to us, though we don't necessarily need to use them all, and we can keep performing operations by taking those processes. But once all of those processes are taken from the pool, they're no longer available to us, and we have to wait until a process gets returned to the pool so that the next operation can use it. So with this, you can imagine that we're creating a small bucket. In that bucket we have a number of items available, and we can take items out of that bucket. Once that bucket is empty, we have to wait until someone puts the item that they took out back in. And once something is in the bucket again, we can take it back out. So right now we have a pool of size two, which means we have two processes available to us, which essentially means we can use two cores, since each process can run on its own core. So what we're going to do here is use a new method. We're going to take our multiprocessing pool, and on it we're going to use this map method.
And here we're going to put the function that we want to use, so that's going to be square. And then we're also going to provide our input values here. And we're going to save all of this in a result variable. We can take out the time component here, since we're no longer using that. And let's just go ahead and print out our result, and then run our main function. Okay, so we got exactly what we expected: we took our input list here and we got the square of each value. But the interesting thing is that we're able to use this pool and apply this map method on it. It's going to apply the function to every single element, iterating through them one by one. And we get the same structure back, but with our values updated. Even though the function just operates on one value, we get back our new result, which is our original list, or our comparison list at this point, but squared. So every single value here is going to be squared. Now, this pool size of two obviously may depend on the machine that you're running on. Sometimes you can only use one CPU because you only have one available. Sometimes you can use four or six or eight or 16, or however many you have available on your machine. So how can we make this a little bit better, so that it actually adapts to the machine that it's running on? Well, there's this cool function that we can import called cpu_count. And cpu_count, as you may have guessed, is just going to give us the number of CPUs that we have available. So what you can do is say number of CPUs available is just going to be the result of calling cpu_count. So if we just print this out, you can see that on my machine it's going to show four, since I have four cores available.
But this might differ depending on the machine that you're running on. And maybe the number of CPUs that you want to use is going to be one less than the number of CPUs available. Because if you're using all of your cores, then everything is going to be in use, and there may not actually be enough CPU left for other things to go on, other operating system processes, for example. So generally what you can do is say the number of CPUs to use is going to be our CPU count minus one. But in the case where we only have one core, we still want at least one. So it's going to be the maximum between one and the number of CPUs available minus one. And then we can use this here. So in this case, for example, I can print out the number of CPUs used. And now it's going to show three, since it's the maximum between one and four minus one, which is three. But in the case where we only have one CPU, so where the available count is one, we're instead going to be using the default of one, since obviously we still want to do something. So in this way, depending on the machine that we're running on, we can see the number of CPUs available. And then we can have our pool be of that size, with that many processes available for us to pull from. And then we can map these input objects into the function that we want to use and get this really nice return value that we can just continue using. So think of the serial alternative: we iterate with for i in range of however long our list is, square each value, put it into a new list or update the value in place, and then continue from there.
With the pool, we can just map these values into our function and get the updated return values, so that we can continue using them. These function calls can happen in parallel, because each one doesn't depend on anything else around it. And so with this, we're able to very quickly speed up certain parts of our code that don't necessarily need a full refactoring, like the way that we saw with threading. Just a small line here can actually help us distribute a lot of the work. And then we can just continue on in our regular manner.
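Putting this lesson together, a minimal sketch of the pool example might look like the following. The helper name cpus_to_use and the variable names are assumptions for illustration; Pool, Pool.map, and cpu_count are the standard multiprocessing calls discussed above.

```python
from multiprocessing import Pool, cpu_count


def square(x):
    """The simple function we want to apply to every input value."""
    return x ** 2


def cpus_to_use(available):
    """Leave one core free for the rest of the system, but always use at least one."""
    return max(1, available - 1)


def main():
    comparison_list = [1, 2, 3]

    num_cpu_available = cpu_count()
    num_cpu_to_use = cpus_to_use(num_cpu_available)
    print("Number of CPUs available:", num_cpu_available)
    print("Number of CPUs being used:", num_cpu_to_use)

    # The pool holds num_cpu_to_use worker processes; map applies square to
    # each element and returns the results in the same order as the inputs.
    with Pool(num_cpu_to_use) as mp_pool:
        result = mp_pool.map(square, comparison_list)

    print(result)


if __name__ == "__main__":
    main()
```

The function passed to map has to be defined at module level so the worker processes can find it, which is why square is not defined inside main.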
19. Multiprocessing Pool Map Multiple Arguments: All right, so now you're probably wondering, okay, well how can I pass multiple arguments? How can I do something like this, where I don't just want x squared, but x to the power of y? And I want to say, you know, our power value is going to be three. You may have thought, I'm just going to put that extra argument in here and that's going to take care of it. Unfortunately not. And I'll show you what's going to happen, but I have to undo this first, otherwise we're going to run into a different issue here. If we run this and look at the output, we're no longer passing the individual values within the list, but we're passing the list itself. So map is iterating over every element in here and passing that as the input. So if we put y here, we're not going to get the results that we want regardless. And see here, we're not even printing anything out, because our function is already failing: we're missing an additional argument for y. And even if we do something like this, neither of these is going to work for us. So how can we provide this extra parameter here? Well, we're going to use a Python package called functools, which already comes with Python. And from it we're going to import a function called partial. Now partial is going to allow us to create partially defined functions. So we can partially call them, but they won't be fully called. We can already provide some of the parameters beforehand, and then we can use the function later on. Now to do this, we have to keep in mind that the way we're going to be providing parameters is in a specific order. So we want to make sure that we leave the parameter that's going to change until the very end. So I'm going to create a partial function using partial. I'm going to have this square function here.
And the value that I want for y is going to be our power here. Now I can use this partial function in the map method, and I can take all of this stuff away. Now our map is going to iterate over this list one by one, and it's going to call this function here. This function is already partially defined by taking the square function and providing this first parameter, power, like this. So if we run this now, we're going to get everything cubed, since that's what we have here. Obviously, we can change this to everything to the power of four, like we have here. And we can also extend this with extra parameters, like an addition component. Let's say we want to have everything to the power of three, and then we want to have some addition component, and that's going to be 2. We're going to provide that here, and we're just going to add it onto the result at the end. So if I run this, we're going to get everything cubed, and then we're going to have a plus two that comes from this addition component here. So we can see that if we want to use the map method and have extra variables in here, we unfortunately can't just turn this into a tuple or a list and think that everything is going to loop over that. That's not how the map method works with the multiprocessing pool. But if we have fixed variables that we already have defined beforehand and just want to provide here, and our last parameter is the thing that's going to change, then we can use partial from functools. With it we can partially define our function, and then map fills in the remaining argument to call the function in its complete form and get the results that we want, while already having some of the arguments passed.
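As a rough sketch of the partial approach described in this lesson: the function name power_plus and the exact parameter names are assumptions for illustration, while functools.partial and Pool.map are the standard library calls. Note how the fixed parameters come first and the varying one, x, comes last.

```python
from functools import partial
from multiprocessing import Pool


def power_plus(addition_component, y, x):
    """Raise x to the power y and add a fixed component.

    The fixed parameters come first; x, the value that changes per call,
    comes last so that partial can pre-fill everything before it.
    """
    return x ** y + addition_component


def main():
    comparison_list = [1, 2, 3]
    power = 3
    addition_component = 2

    # Pre-fill the first two positional arguments; only x remains open.
    partial_function = partial(power_plus, addition_component, power)

    with Pool(2) as mp_pool:
        # map now supplies each list element as the remaining argument x.
        result = mp_pool.map(partial_function, comparison_list)

    print(result)  # each value cubed, plus 2: [3, 10, 29]


if __name__ == "__main__":
    main()
```

An alternative design is to keep x first and fix the others by keyword, for example partial(f, y=3), which avoids having to reorder the parameters.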
20. Multiprocessing Multiple Varying Arguments: All right, so previously we've seen that we can provide additional arguments by using partial from the functools library that we've imported here. We were able to partially define functions, or partially provide the arguments of functions, and then use our map method to provide the remaining one. But what if we don't have fixed components, and instead want to have a different value for each element, for example some power list? What we want here is the first element to the power of four, the second one to the power of five, and the third one to the power of six. So we already know what we want to provide. How can we do this in a way that's not like this, but ideally just provide that directly and perform those calculations? So I'm going to do some quick cleanup here, remove this stuff, and also do some reorganization of our function parameters, just to make it a little bit easier to see. And we also don't need this partial function definition here. So what we can do is use a method called starmap. Now to starmap we can provide an input that looks like this: a list, and inside it we can provide additional elements that we can iterate over if we want to. And all of these elements we can then pass as individual parameters to our function. So for example, in our case, the first element that we want to pass would be 1 and 4, since that's going to be our x and y. The second one is going to be 2 and 5, and then the third one here is going to be 3 and 6, since we want to provide these two together as our x and our y parameters. And so if we provide this as input, then basically what we're going to get is the execution like this. And obviously we're still using the multiprocessing pool.
So we're going to be able to use different cores to help us do this. So really what we need to do right now is join these two lists together in a way that gets us this format here, and then we can provide that as input. So we're going to have our prepared list, which is just going to be an empty list that we start out with. And we're going to iterate over the length of this list. For the elements that we want to add, the first value is always going to come from our comparison list. And the second value that we want is going to come from our power list, where instead of putting 0, we want the ith component. And then we can print out the list that we're going to use as input, which is just going to be this. And then we can run this, and each of the elements inside will be provided as one of the parameters in here. So let's go ahead and run this. Oops, I have a typo here, and I also forgot to update this list in here. So let's try that one more time. There we go. So this is the list that we're using as input. And we can see our x and our y, the function parameters that we're passing: this is going to be our x and our y, our x and our y, and our x and our y. So we're doing one to the power of four, which is 1, two to the power of five, which should be 32, and three to the power of six, which should be 729. So we can see that using starmap, we're actually able to provide multiple input parameters, especially if these are going to be changing. It's just important to make sure that we have this sort of structure so that we can pass them in. And the values that we're passing into our function are going to be in the order in which we have the parameters in our function here. And so in this way, we're actually able to pass in multiple parameters.
So that way, we don't need to use these partial function definitions if more than one of our variables is varying.
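The starmap example from this lesson can be sketched roughly as follows. The helper name build_inputs is an assumption for illustration; the lecture builds the prepared list inline with the same index loop.

```python
from multiprocessing import Pool


def power(x, y):
    """Raise x to the power y; both arguments vary per call."""
    return x ** y


def build_inputs(comparison_list, power_list):
    """Pair up the ith value with the ith power: [(x0, y0), (x1, y1), ...]."""
    prepared_list = []
    for i in range(len(comparison_list)):
        prepared_list.append((comparison_list[i], power_list[i]))
    return prepared_list


def main():
    comparison_list = [1, 2, 3]
    power_list = [4, 5, 6]

    prepared_list = build_inputs(comparison_list, power_list)
    print("List to use as input:", prepared_list)

    with Pool(2) as mp_pool:
        # starmap unpacks each tuple into the function's parameters,
        # so (2, 5) becomes the call power(2, 5).
        result = mp_pool.starmap(power, prepared_list)

    print(result)  # [1, 32, 729]


if __name__ == "__main__":
    main()
```

The built-in zip would produce the same pairs in one line, list(zip(comparison_list, power_list)); the explicit loop is kept here because it mirrors what the lecture does.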
21. Multiprocessing Checking Elements In List In Certain Ranges: All right, so let's finally come back to our initial, slightly more CPU-intensive function that we were using, which is just going to be checking how many values in our comparison list are actually within a specific range. So we can call this, for example, check number of values in range. And what we're going to have here is our comparison list, and then our lower and upper bounds. And then we want to track the number of hits. And then we want to say for i in range from our lower to our upper bound: if i is in our list here, we just want to increment this by one. So yeah, we're missing an i here. And at the end we want to just return the number of hits. So again, the point of this function is not really to do anything more than be some way for us to simulate a slightly more computationally intensive task, because it is going to take a little bit of time to calculate this. So now for the different lower and upper ranges that we're going to have. If we want, we can just specify these ranges directly. Let's call these lower and upper bounds. And of course we can generate these with a little for loop. But actually, since we're just splitting into four, you can see what we're doing: we're going to go from 0 to 25 million, so that's going to be 25 times ten to the power of six. And then we're going to go from 25 to 50 times ten to the power of six. And then I'm just going to copy this, and we're going to go from 50 million to 75 million. And finally, we want to go from 75 million to 100 million. So these are going to be our lower and upper bounds. Now of course, we need to get it into this sort of format. But in this case we want our comparison list as the first input value, and then our lower and our upper bound here. And then we want to do the same for the other ones too.
So we still need to do a little bit of joining like we did here. Since we don't have many values here, it might just be easier to put these four values in by hand. But obviously we can write a little for loop to populate this list for us too, especially if you wanted 50 or 100, or even 10 or 20, different bounds. Anyway, we again just need to prepare our list. So we're going to use our comparison list here. But instead of iterating over its length, we're going to add it as one of the elements. The number of different input values that we're going to have is actually going to be the length of this bounds list, so we want to iterate over the length of this list. Here we want to add the full comparison list, and we also want to add the individual components, so our first and our second value in here. And then we're going to provide this as input to do our calculations. So again, what we're doing is taking this list and building our new prepared list, which is going to look like this and like this. That's just what we're doing here. This component here is going to be our first input, which is going to be a list. And then the other ones are going to be our lower and upper bounds. I'm going to undo this, obviously, because that's what this little loop here is for. And then let's go ahead and run this. And so we can see these are going to be our input parameters here. For each one of these, like we have in the function arguments, this is going to be our comparison list, and then our lower and upper bounds. And we can see that in the first range we have three elements contained inside, because our numbers are 1, 2, 3. And for the other ones we're going to have 0. So this is kind of going full circle, back to what we started with for our multiprocessing example.
And now we can use our starmap method in this case to provide these multiple input arguments like we have here. Now, one thing that we can do to simplify the syntax a little bit is this: rather than listing out the elements like this, we can just use this little asterisk. What that's going to do is unpack all of the values in here. So you can see if I rerun this, we're going to get the exact same results. The only difference is that rather than me having to reference each of these individual components like we did here, this star is going to unpack the tuple and provide its elements as additional input values one at a time, following exactly the order that we have them in here. So this is just a little shorthand that I can use when I want to get all the elements. If I have 10 or 15 or 20 of them, writing them out is going to get longer and it's going to be tedious. So really what I want is to have all of these values here, but not in a tuple format like this. I just want all of the individual elements one after the other. And the way that I can do that is by using the star, which is going to provide each of the elements inside as individual values here. So this is just a shorthand to keep things cleaner and also make it a little bit faster to write.
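Here is a minimal sketch of this lesson's range check with starmap and the asterisk shorthand. The helper name build_inputs and the scaled-down bounds are assumptions for illustration: the lecture uses steps of 25 times 10**6, while this sketch uses 25 times 10**4 so it runs quickly.

```python
from multiprocessing import Pool


def check_number_of_values_in_range(comparison_list, lower, upper):
    """Count how many numbers in [lower, upper) appear in comparison_list."""
    number_of_hits = 0
    for i in range(lower, upper):
        if i in comparison_list:
            number_of_hits += 1
    return number_of_hits


def build_inputs(comparison_list, lower_and_upper_bounds):
    """Build one (comparison_list, lower, upper) tuple per bounds pair."""
    prepared_list = []
    for i in range(len(lower_and_upper_bounds)):
        # The asterisk unpacks the (lower, upper) tuple into two separate
        # values, same as writing bounds[i][0], bounds[i][1] by hand.
        prepared_list.append((comparison_list, *lower_and_upper_bounds[i]))
    return prepared_list


def main():
    comparison_list = [1, 2, 3]
    step = 25 * 10**4  # assumption: smaller than the lecture's 25 * 10**6
    lower_and_upper_bounds = [(0, step), (step, 2 * step),
                              (2 * step, 3 * step), (3 * step, 4 * step)]

    prepared_list = build_inputs(comparison_list, lower_and_upper_bounds)

    with Pool(4) as mp_pool:
        result = mp_pool.starmap(check_number_of_values_in_range, prepared_list)

    print(result)  # [3, 0, 0, 0]


if __name__ == "__main__":
    main()
```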
22. Intro to Writing Asynchronous Programs: Alright, so let's now start taking a look at how we can write an asynchronous program. This is going to be the third option that we have for concurrency. And you'll notice at some points that it's probably going to look similar to threading, although it's again different, because with asynchronous programs we only have one thread that is running, and we also only have one process. So let's go ahead and get right into it and see what happens. So the first thing that we're going to do is import asyncio, which is going to help us with all of this. Now I'm on Python version 3.7, and I recommend that you're also working on at least Python version 3.7. If you're on a later version of Python, that will of course be better, just because up to this point the asynchronous library has been, not quite experimental, but changing a lot. And at this point it seems to have reached more of a settled state in how it works. So I would generally recommend that you make sure you're using a later version of Python if you can, or at least Python 3.7, which is again what I'm on right now. So we can import asyncio, which is part of the standard library. And then we can start defining our first asynchronous function. So we can start with a regular function. For example, say we want to have some sort of sleep function where all that we do is sleep for, let's say, five seconds. If instead we want to do this asynchronously, then we put the keyword async in front of the def. And then here, instead of using time.sleep, we actually use asyncio's sleep function. We'll talk a little bit more about the reasoning in just a second. But basically, this is how we're going to define an asynchronous function.
Now we need to do one more thing here, which is we actually need to await this function, or this coroutine, which is actually what it's called. So what's really happening in the whole process of using asynchronous execution in Python? We'll see how to do this in a second, but we start something called an event loop. And then we can start adding tasks, or scheduling tasks, with this event loop. And inside that event loop we'll have things called futures. These futures basically represent something that has not yet come back, but where a result is going to come at some point. This may sound a little bit confusing, but generally, what happens if we schedule several asynchronous functions is that each of them provides a future, and at some point they will finish or reach some sort of checkpoint. And from that point on, we can then do the next thing with that coroutine. The whole thing that we've defined here is generally referred to as a coroutine, and coroutines are the things that we add and schedule in our event loop. Now, as we continue coding, hopefully this will become a little bit more intuitive and we won't get so lost in all the syntax. But because we are going through asynchronous stuff, it's important to mention and talk through some of these keywords at the beginning, and also as we continue. So let's say we wanted to run this, and actually let's rename this to async sleep. If we just called async sleep like this, unfortunately it's not actually going to run. We're going to get an error here. For one, it says the coroutine was never awaited, which is what we have here. But even if we add this await keyword, it's still not going to work, because we don't have an event loop running right now that is consuming all of this.
And aside from that, we actually can't use the await keyword outside of an async def function. So how can we get this thing to run? How can we get these bare bones going? We're going to create a new function, which is going to be our main function, and which we're also going to define with async def. And what we're going to do here is await the response of our async sleep. Now, about the await keyword: we have this event loop, and we can have several different coroutines scheduled in there to run. What this await keyword does is give control back to our event loop so that it can continue with other things. So it tells the program that at this point we're waiting for a response, and until we have that response, or whatever we need here to continue on, there's really no point in giving me all the resources, because there's nothing I can do until I get this. So that's generally the point of await. But even if we have this, we still can't just call main directly. So how can we get main to run? What we can do is use this asyncio.run function. And here we're going to put the main coroutine that we want to run. This will start the event loop for us, start the consumption of the event loop, and basically take care of the whole process for us. There are other approaches. For example, you can get an event loop and run it until completion, or use other things that are more specific to the event loop. But really, just using asyncio.run will take care of all this management for you, and you don't need to worry about any of the lower level things. So this here is going to be our entry point. Now an important thing to note is that we can only have one running event loop per thread. And by default we're only going to be running this on one core and one single thread.
So basically what that means is we're only going to have one asyncio.run call, which is going to be the entry point to our program. And of course, we can wrap this in the if name equals main check, so that if we're running this file as the main file, then we're going to start and go into this event loop. All right, so let's go ahead and try this now. So we're running the program, and all that we're doing is asynchronously sleeping for five seconds. But obviously not much else is going on at this point, because we're only doing one thing. And even though we're giving back control to the event loop, which is what we're doing here, there is nothing else running, so there's nothing else to continue working on during this time. So let's add a second function now, which is going to be an asynchronous function to print out hello. And all that this is going to do is print out, like the name suggests, hello. All right, so what happens now if we schedule this here? And let's put some extra printing in here just to see where we are in our program. So we're going to print before sleep. And then we can put another print after, here, which is just going to be after sleep. All right, so what's going on? Well, we have our entry point. So we're starting the event loop, and we're starting the execution, or the scheduling, of this main coroutine. Then we go into this main coroutine here. And now we go to our first line, which is awaiting the response of this async sleep coroutine that we have in here. Once we go in here, we print out before sleep, and then we have this await call. So what happens with the await call is that the await call is not blocking. It allows execution of other coroutines. However, it does stop execution from going further down in this function. So if we run this, you'll notice we go into this function, and then we print before sleep. And then we sleep for five seconds. And then we go on to the after sleep.
And only then do we go on to the second statement here. Unfortunately, with this we can't skip over and go to the next thing. This await is basically a breakpoint in our program execution, specifically in this function, or in this coroutine. This is where we momentarily halt execution. The coroutine at this point basically says: I don't need control right now, I'm waiting for a response, feel free to continue with other things. And then our event loop can go on and do other things. And whenever this coroutine is ready, it can give a signal saying, hey, I'm ready to continue, and then control can be given back to it. But because we only have one main coroutine running, and inside of it we call two more coroutines, these are all still being executed sequentially. There are no multiple coroutines scheduled at the same time, so when we reach this await call here, there is no other coroutine whose execution we can continue with. We'll look at how to do that in the next section. There is one more thing that I wanted to show you beforehand, though, which is how we can return values from a coroutine. It's going to be just like with regular functions. So let's rename this from print_hello to return_hello, and this is going to return hello instead. And to get the result out, just like with a regular function, we call our return_hello. But now, because it's an asynchronous function, we have to await it. Once this function is done, we get the return value here, and then we can print it out. So it's going to do the exact same thing, it's just that here we're not printing internally but instead returning a value from the function. And you'll notice that really the main difference to a regular function is the async and the await. So we could do the same thing here without them, and this would just be a regular function call.
And we can get the value returned from here like we normally would. But this we can't do, we can only wait co routines. So this is going to cause an error for us, as we'll see in a second. And something else that we also can't do is we can't. If we remove this and remove this a weight, we can use an await keyword inside of a non and not inside of a co-routine. So we have to have this async definition in front to be able to wait internally. So we can't use await without having an async def outside of it. And if we have an async async def definition, we have to await the response because otherwise, basically you're kind of creating a co-routine object here, but it's never been scheduled, it's never been away to it, so it's never actually been executed, never been added to the event loop. So just be aware of these things that if you're using async, you always have to await this response. And the await call here is not blocking. However, it will stop further execution of the function and give control back to the event loop so that other teams can run in the meanwhile. So with that, I know it's a little bit confusing. The first time we're talking through all of this and there are some weird terms popping up. But as we'll go through the next couple of lectures, hopefully you'll get a more clear understanding of how all of this works. And probably also with time, this will become a little bit more comfortable for you.
23. Asynchronous Tasks: So the next thing we're going to look at now is how we can create tasks, and what those are, well, we'll see. So the first thing that we noticed last time is that although we're using asynchronous functions here (and we can also switch this back to async def and return this to a print_hello, like this, so we just print out hello), even though we're using this asynchronous syntax, the execution that we're getting is still essentially synchronous, because there's nothing else that control can go over to. So although we're using this asynchronous syntax and control is being given back to the event loop, there's nothing else scheduled that can go on, and so there's nothing else that can be worked on while we're awaiting a response. So similarly, if we had another sleep call here, we'll notice that we still get this synchronous execution. We're going to start sleeping, we'll sleep for five seconds, and then we're going to go into the next one. And here we are before the sleep again. We're going to await the response here. And at this point, although execution is given back to the event loop, again there's nothing that can be continued, because the only thing that could continue is further execution of this function block, and we're still awaiting this response here. There's nothing else scheduled that can be worked on. So what we can do, though, is start creating tasks. That way we can have several tasks in our event loop that can be worked on while other things are being awaited. So let's see an example of this. Let's create a task. We're going to do this by going into asyncio and calling create_task. I'm going to take this second async_sleep call and put it into the task here. And then I'm going to await the task later.
So what happens when we create a task is we basically schedule this coroutine to be worked on whenever it is convenient. But until we await it, nothing stops the flow of our coroutine at that point. So remember, before we had our two sleeps happening one after the other. In this case, we've added another task. And so if we rerun this, we'll see that now we have two "before sleep" lines. And then, oops, I have an error here. I have to take the function call away, because it's an object that we're awaiting, not a function or a coroutine function. We've already called the function, so we just have a task object that was returned to us. But anyway, we're still awaiting this. And we'll notice here that we are now starting to sleep once and then starting to sleep again. So basically we're starting this twice, then we're in the waiting period, and then we're after our sleeps. Before, we were still executing this sequentially, so let's import the time module to see the difference. We can have a start time here, which is going to be time.time(). And we can print here that the total time is time.time() minus start. So we'll notice that our execution now is not going to be around the 10 second mark, which is what we had when we were sleeping twice in a row, but rather around the 5 second mark. So we've created this extra task. Then, when we go into the await call, we now have another coroutine, or another task, that control can go over to and that can also be executed. And so in this case we actually have two things running at the same time, where control can be given back to the event loop and things are scheduled to run asynchronously, but essentially simultaneously. So let's add in just some extra number here that we can print out to understand where we are in each one. We print out n here, and we print out n here, and then here we want to pass in one.
This is going to be the task that we create and schedule. And then here we can pass in two. So if we run this, we can see that, well, first we're scheduling this task, but then we're awaiting this call here, which then comes to this point. And this is where our execution here stops, because we're just awaiting the response. So we've given control back to the event loop. And now we have another task that's been scheduled, and now is a convenient time to work on it because nothing else is being worked on. And so this task, as we can see here, starts to be executed once we reach this await point. And then our first coroutine here finishes. We can see execution comes back to that coroutine after we go into this step here. And then we await the result of this task. So now we're no longer executing this coroutine block until we get the response here, which is why we have this output here. And then afterwards we await the response of this, which is why we have the hello here. If we flip the order here, you'll notice that the order of the printing also changes: we'll have the hello second to last, and then we'll have the "after sleep" after that, because of the order of the await statements. So we can see now we're getting into this more concurrent nature, because now we actually have multiple different coroutines that are running at the same time. In this case, it's just two. But as one coroutine is in the state of just waiting for a result and gives back control to the event loop, the event loop can go on and start working on other tasks until they reach their own await point. And then, once the thing a coroutine was waiting on has finished, control of the execution will be given back to that coroutine.
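The task example from this lesson can be sketched roughly as follows. This is a reconstruction rather than the exact code from the video: the sleeps are shortened from five seconds to 0.2 so it runs quickly, and main returns the elapsed time purely so it can be checked.

```python
import asyncio
import time


async def async_sleep(n: int, seconds: float) -> None:
    print(f"before sleep {n}")
    await asyncio.sleep(seconds)
    print(f"after sleep {n}")


async def main() -> float:
    start = time.time()
    # Schedule the first sleep as a task. It starts running the next time
    # main() gives control back to the event loop.
    task = asyncio.create_task(async_sleep(1, 0.2))
    await async_sleep(2, 0.2)   # while this one waits, the task gets to run
    await task                  # await the Task object itself, not a call
    elapsed = time.time() - start
    print(f"total time: {elapsed:.2f}")
    return elapsed


if __name__ == "__main__":
    asyncio.run(main())
```

Because the two sleeps overlap, the total time is close to one sleep (about 0.2 seconds here) rather than the sum of both.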
24. Async Gather Method: All right, so we've now seen our first example of how we get this concurrent execution by having tasks scheduled. While one coroutine is awaiting a response, the execution of another can continue, which is what we had when we scheduled a task here. And its execution happened when it was convenient, when nothing else was being worked on. But we still have this problem here where, for example, we have this print_hello statement, and it is still only being awaited after this sleep here finishes. And ideally we wouldn't want that. Ideally these two could actually run concurrently too. So for example, let's say we had two API calls here that did not depend on each other at all. So this one hits API one and this one hits API two. Either way, whether we have it like this or the way we had it before, it's still not working the way that we want, because we're awaiting the response of one API call, and only then are we initiating the call to the second one, which is not what we want. We want them both to start almost immediately, one after the other, and then we want the waiting periods to overlap, so that all of that network waiting time is not happening sequentially but almost at the same time. So regardless of how we flip this order, we're not really going to get to that. So how can we do something like that? Well, we can use something else from asyncio, which is called gather. So we can go into asyncio and use this gather function. And here we can now put in a series of coroutines, and they will all be scheduled and executed concurrently. So we don't have this one-after-the-other behavior, but they're all scheduled at the same time. And whichever one finishes, in whatever order, we can basically skip around between them.
There is still some order that we have in here, which is the order that we put them in. But we're now able to run all of this concurrently, the way that we wanted. So let's see this in action. How would we do this? Well, we can have, for example, our first coroutine here, which is going to be our async_sleep 1. We're going to have our second coroutine, which is going to be our async_sleep 2. And then we're going to have our third one, which is going to be the printing out of hello. We can now remove the scheduling of this task. And if we run this now, we'll notice that we go into the "before sleep" one, which is here. Then we go into the "before sleep" two, which is here. Then we have our print hello, because both of these coroutines are not blocking; they're just awaiting a response, and so they can give control back to our event loop. So we go in here, we execute this, and we give control back to the event loop. We go in here, we execute until we get to here, and give back control. We go in here, and this coroutine finishes because there's no await in it at all. And then we go back to the event loop and the event loop waits. And then, because this one was scheduled first and they both sleep for the same amount of time, we come back to here first. And so we can continue with the execution of this until our coroutine finishes here. Then control is given back to the event loop and we see what else is going on. So we have this one here, which then finishes. So let's update this a little bit and sleep for the amount of time that we pass in. We'll see that the scheduling order is the order we have here, but the finishing order may not actually be the same. So in this case, we're going to expect this one to be scheduled first. Then we get back control once we hit this point, and we can start running this one here. It gives back control when we hit this point, and we can run this one.
We finish that whole coroutine, and now we go back to our event loop. And at this point, this one will actually finish first, because it's sleeping for a shorter period of time. So we get back to this one and finish this coroutine, because there's nothing else going on after the sleep except this print statement. And then afterwards we have the one coroutine left that we're waiting on, which is going to be this one here. So if we run this, we can see that it's in the order in which they finish that we continue on with the execution of each of them. And so with this we now have a much better concurrent nature, because rather than having all of these statements be sequential, we have them all running concurrently. We start them, and whenever we hit an await point in one coroutine, we give back control and another coroutine can continue running. And once we've given control back to the event loop and everything is running, as soon as one thing finishes, we can go there and continue with the execution of that coroutine, getting this much nicer concurrent nature that we're actually after. So at this point you may be wondering, well, what's the difference between async and threading? So generally, if you remember what we talked about in the very beginning, our async here is running on one core and also in a single thread, and everything is done through this event loop, where we have awaitables and futures that represent that a response is going to come but is not there yet. Whereas with threading, we're actually creating multiple different threads, and there's also overhead associated with that. Here we're just in a single thread, and we actually have less overhead. Now, generally, when would you use either of these two?
Well, I would generally use threading when we're building worker-like programs like we've done in our threading module, where we had a lot of different workers performing different jobs. Whereas if you have just individual tasks, like we have in these cases, then you can go with asynchronous. Obviously, if you feel much more comfortable with one or the other, you will probably tend to go towards that one, and you can probably achieve similar concurrency either way. Although just remember that with async you're using a single core and a single thread, whereas with threading you're using a single core and multiple threads. So there is that difference, and there's also the overhead associated with threading. But obviously it also comes down to comfort a little bit. The other thing is that in web applications, a lot of the time you're going to be using asynchronous. It's very usual to have asynchronous endpoint definitions, where a request hits an endpoint and that gets handed to the event loop, so that your different endpoints, and your server in general, are not blocked while one request is waiting and can keep handling other requests concurrently. So to sum it up: asynchronous, from my perspective, is used a lot for individual tasks as well as for web development. Whereas threading I would use much more for building worker-like programs like we've done back in our larger threading module.
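A minimal sketch of the gather example, under a few assumptions: the sleep durations are scaled down, and the finished list is a hypothetical helper added here only to record the order in which the coroutines complete.

```python
import asyncio

finished = []  # hypothetical helper: records the order in which coroutines finish


async def async_sleep(n: int, seconds: float) -> int:
    print(f"before sleep {n}")
    await asyncio.sleep(seconds)
    print(f"after sleep {n}")
    finished.append(n)
    return n


async def print_hello() -> str:
    print("hello")
    finished.append("hello")
    return "hello"


async def main() -> list:
    finished.clear()
    # gather schedules all three and waits for all of them to complete.
    # Results come back in the order the awaitables were passed in,
    # regardless of which one finished first.
    return await asyncio.gather(
        async_sleep(1, 0.3),
        async_sleep(2, 0.1),
        print_hello(),
    )


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The returned list follows the argument order, while finished shows the completion order: hello first (it never awaits), then the shorter sleep, then the longer one.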
25. Using Async Timeouts: Now, generally when we're dealing with network communication or some other sort of I/O, there may be problems elsewhere that can cause some sort of interruption or make some things take a lot longer. For example, a server may be overloaded, currently unresponsive, or very slow, and if we're trying to ping that server, it takes us forever to get a response. Then even if we're using some sort of concurrent execution like we have here, there can be one straggler that slows the whole thing down, because we're waiting for that one thing to finish. So there's a nice method that we can use to put some sort of limit on how long we want to wait for responses. If we generally know how fast our endpoints respond, as well as the range of variability that we have, and we know with very high certainty that we get responses in less than some time period, then that time period is a good value to use for something called a timeout. So what we can do, for example, is say asyncio.wait_for. And this basically means that we're going to wait for this many seconds, and if our coroutine has not finished at that point, then we're going to raise a timeout exception for it. So let's say we want to give it five seconds. And we're also going to wrap this in a try and except statement, where we specifically catch a timeout error. And here we're going to print out "encountered timeout error". So in this case, if we run this, everything is going to be fine, because it takes two seconds and our time limit is five seconds. But let's switch this to, say, thirty seconds, as if something is running really slowly, or there's a timeout window of 30 seconds and the server is not responsive. We're keeping this connection open and waiting for a response, but it's never going to respond, and the connection will just be interrupted after, say, 30 seconds.
If we put in here a wait_for of five seconds, then after we reach that 5 second mark, we're basically just going to kill this. And we're going to hit an error instead, which we can see here in a second. So we can see we've encountered a timeout error, because we have this timeout component here. Now, be aware of what happens because we're encountering this exception case. We can see we went into the "before sleep" lines here, and then we entered the exception case. So the other coroutines that we also have, as we can see here, did not actually have the opportunity to finish, because we encountered this exception with the wait_for method. So obviously this can be a really nice thing to use to make sure that parts of your execution are not blocked for a very long time because of outside reasons. But obviously you also want to be careful with this, since it can also interrupt other things that are running if they're being awaited at the same time, like in the case that we have here.
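A minimal sketch of the timeout behaviour described above, assuming the wait_for wraps a gather of two sleeps. Durations are scaled down from the lecture's seconds, and main returns a string only so the outcome can be checked.

```python
import asyncio


async def async_sleep(n: int, seconds: float) -> int:
    print(f"before sleep {n}")
    await asyncio.sleep(seconds)
    print(f"after sleep {n}")
    return n


async def main() -> str:
    try:
        # Give the whole group at most 0.2 seconds in total.
        await asyncio.wait_for(
            asyncio.gather(async_sleep(1, 0.05), async_sleep(2, 1.0)),
            timeout=0.2,
        )
        return "finished"
    except asyncio.TimeoutError:
        # On timeout, wait_for cancels the gather, which in turn cancels
        # any coroutine inside it that is still running.
        print("encountered timeout error")
        return "timed out"


if __name__ == "__main__":
    asyncio.run(main())
```

Here the slow second sleep is cancelled at the 0.2 second mark, so its "after sleep" line is never printed.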
26. Creating Asynchronous For Loops: Now another case I wanted to go over was asynchronous for loops, since you may sometimes see these. I want to go over it and look at the behavior, to make sure that we understand what exactly an asynchronous for loop is and what it does and does not do. So let's go ahead and change our async_sleep here to be a generator instead, which is just going to yield values and then afterwards sleep. So we're going to say for i in range, and we're going to yield the result i. And then we're going to sleep for i seconds. And let's say for i between one and n. And we'll say that n is equal to the maximum of two and n, just so that we always do a little something, right? And so now what we can do is use an async for loop, and we'll say async for k in async_sleep(5), and then we can print out k. All right, so let's go ahead and run this, and let's also talk through what is happening. So we have our five as input, which is also what we're still printing out here. And then we're taking n and making sure it's at least two. And we're going into our for loop here, starting from one and going up to n. So it's going to be from one up to but not including five. So from one to four, like we see here. And then we yield this value, and then we await the sleep that we have here. Now, going through this, we can see that our total execution time is about ten seconds. One plus two is three, plus three is six, plus four is ten. So we're still executing this sequentially. We're not doing this concurrently; otherwise our execution time would be roughly four seconds, since that's the longest sleep that we have here. So the important thing about this async for is that we have another stage where we're able to give control back to the event loop.
So rather than having a regular for loop, where the iteration all happens in one go, the async for gives us another stage where, during the iteration, if we're waiting for a response, we can again give control back to the event loop and then come back to our coroutine once we're ready to continue with our loop. So that's just an important thing to note: if you see an async for, or if you think about writing an async for, it doesn't run the iterations concurrently. It still runs sequentially, but it adds another stage where we can give control back to the event loop so that other things can continue on with their execution.
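The async for example can be sketched like this. It is a reconstruction under assumptions: each sleep is scaled down from i seconds to i * 0.05 seconds, and main collects the yielded values into a list so the result can be inspected.

```python
import asyncio
import time


async def async_sleep(n: int):
    n = max(2, n)                        # make sure the loop runs at least once
    for i in range(1, n):
        yield i
        await asyncio.sleep(i * 0.05)    # scaled down from i seconds


async def main() -> list:
    seen = []
    start = time.time()
    # Each step of the loop may hand control back to the event loop,
    # but the iterations themselves still happen one after the other.
    async for k in async_sleep(5):
        print(k)
        seen.append(k)
    print(f"total time: {time.time() - start:.2f}")
    return seen


if __name__ == "__main__":
    asyncio.run(main())
```

The total time is roughly the sum of all the sleeps (1 + 2 + 3 + 4 units), not the longest single sleep, which is what shows that the loop is sequential.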
27. Using Asynchronous Libraries: So the next thing I wanted to take a look at is using other libraries with asynchronous processing. Some libraries support async and others do not. And I just want to go ahead and look through these and show you a quick comparison of how that looks. So I already have a little boilerplate here, basically with some URLs that I've picked out. What we're going to do is run a very simple program which just pings each of these URLs and gets the text response from it. It doesn't really even do anything with it, it's just getting it. So we're going to write this in two ways. One of them is going to be the synchronous version, which is just going to use the requests library. If you don't have that, you can go ahead and write pip install requests, like this. And then we're also going to use aiohttp, so pip install aiohttp, like this. And as you can probably guess, aiohttp is going to allow us to do this asynchronously, while the other one is a synchronous library. So we're going to start off with the synchronous one. We're going to import requests. And what we're going to do is just run a little for loop. We're going to say for url in urls, and we're going to call requests.get(url). And then we can time our process here: we can say our start time is time.time() and our end time is going to be time.time(). And then we can print "requests took" and then our end time minus start. So we can just run this, and this is just going to be the standard, typical synchronous way. And we can see this is going to take a couple of seconds, about 3.5 seconds right now. So let's go ahead and write the asynchronous version of it, and let's also look at the individual components involved. So we're going to write our async def, and we're going to call it get_url_response. We're going to pass in a URL here.
And now we're going to use the aiohttp library that we just installed. So we're going to import aiohttp like this. And we're going to say with aiohttp.ClientSession() as session, and we're going to make this an async with, which I'll talk about in a second. And then we're going to do async with session.get(url) as response. And then finally, we're going to return await response.text(). We'll go over each of these lines in a second. And here, of course, in the synchronous version we also want to make sure we get the text response. So we can say our sync_text_response is going to be this, and here we can just append it onto our list. And then we can do something similar for the async version, but here we have our tasks. For each of these URLs, we go into asyncio and create a new task, which is going to call our get_url_response, and we provide our URL as input here. And then we're going to await asyncio.gather. We're going to put in all of our tasks and unpack the whole list to provide them as individual arguments. And then our async_text_response is going to be the output that we get from this. And then here we can print "async requests took" and the time. Now let's just go ahead and run this, and then we'll come back to this coroutine to understand what exactly is going on. So let's just run this so that we can actually compare the performance. So we have our synchronous process going on. Oh, I forgot to initialize our list here. So let's go ahead and run that one more time. There we go. So with our synchronous process, we took a little over three seconds, and with our async process, we took about one second. So let's go ahead and take a look at this here.
So what's going on is we have several different async with statements. If you've programmed in Python for a little bit, you're probably familiar with the with statement, where we're creating something as a context manager, which handles the creation as well as the teardown of whatever we have, so we don't have to worry about closing something. We're able to use it within the context of this with statement. Now, the async with is what we see here twice. Each of these times we're giving control back to the event loop and giving it an opportunity to start or continue with other tasks. So basically here we have three instances, one with the await and the other two with our asynchronous context managers, where we're giving control back to the event loop so that it can potentially continue on with something else. So there's a lot of handing control back to the event loop going on here, which makes this nicely asynchronous, especially across these different parts. One of them would be getting a session from our session pool. The next would be actually performing the request, which involves a lot of network waiting. And then the last component would be getting out the full HTML response, because we may not have all of it right away. We have to gather all of that data and parse it into a variable, and there's also some waiting time involved there. So for each of these components where we may have some sort of waiting, we give control back to the event loop until we are ready to continue. So across these different calls that we're making, we're giving control back to the event loop three times. Whereas for our requests version, obviously, we have the requests library itself.
Where are we? Here we go. So this itself is just a blocking call. There's no asynchronous getting of a session from a pool; we're also currently not actually using sessions in this case. We're not asynchronously performing the request, as we would be in the other case. And we're also not asynchronously getting that HTML data, having it gather in the background, and then continuing on once we're ready for it. Now of course, we can try to optimize this a little bit. We can put this into an async def, get_urls_with_requests, and we'll pass the URL in here. And then here we can just return this. But as you'll probably also notice in a second, if we have our tasks here and we schedule these tasks, so we go into asyncio.create_task and do tasks.append, and we create this as a task, passing in the URL parameter, there's still no await call anywhere. So even if we try to use asyncio.gather and try to execute all of these tasks asynchronously, and assign the result here to our sync_text_response, I think it was called, even if we use this approach, we're not actually going to get any performance benefit from it, because there's no await call anywhere. There's no point where we return control back to the event loop. So even if we wrap this in an async function, as we can see here, all of this is a synchronous process and it's completely blocking. Whereas in our asynchronous process, we have these different opportunities: getting the session from the pool, actually performing the request, and parsing the response into text. Each of these three components allows us to give control back to the event loop so that other things can happen concurrently while this one is waiting.
Whereas in our synchronous case, we have no opportunity to do that, and we're basically blocked while this sends off the request, gets the response, and parses the text response. All of that is a blocking process. So as we can see, there are some great benefits that we can get from using asynchronous requests. But we have to be careful that we're also using an appropriate asynchronous library. So in this case, it's going to be aiohttp. For file input and output, there's also something called aiofiles, which you can use to make file reading and writing, as well as general file operations, asynchronous. There are also other things like database connections; there are a lot of asynchronous libraries that you can use for that. But if you try to make something asynchronous, you have to make sure that you use libraries that actually support working asynchronously. Otherwise you're just going to have blocking components, because it's a synchronous process and no control is being given back to the event loop.
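The point about blocking calls can be illustrated without any network access. In the sketch below, time.sleep is only a stand-in for a blocking call such as requests.get, and asyncio.sleep is a stand-in for an awaited aiohttp request; the function names blocking_fetch, non_blocking_fetch and timed, and the example.com URLs, are hypothetical and not from the lecture.

```python
import asyncio
import time


async def blocking_fetch(url: str) -> str:
    # Stand-in for requests.get(url): there is no await, so the event loop
    # cannot run anything else while this sleeps.
    time.sleep(0.1)
    return url


async def non_blocking_fetch(url: str) -> str:
    # Stand-in for an awaited aiohttp request: control goes back to the
    # event loop while we wait.
    await asyncio.sleep(0.1)
    return url


async def timed(fetch, urls) -> float:
    start = time.time()
    await asyncio.gather(*(fetch(u) for u in urls))
    return time.time() - start


async def main():
    urls = [f"https://example.com/{i}" for i in range(5)]  # placeholder URLs
    blocking = await timed(blocking_fetch, urls)
    non_blocking = await timed(non_blocking_fetch, urls)
    print(f"blocking: {blocking:.2f}s, non-blocking: {non_blocking:.2f}s")
    return blocking, non_blocking


if __name__ == "__main__":
    asyncio.run(main())
```

Even though both versions go through gather, the blocking one takes about as long as running the five calls one after another, while the non-blocking one takes roughly as long as a single call.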
28. The Async Wait Statement: All right, so we've seen some pretty cool stuff so far with async processes. But let's go ahead and look at one more cool thing, which is going to be waiting until some coroutines or some tasks complete, then dealing with those, and going about that iteratively. Because currently, what we've seen with gather is that we're awaiting everything to complete. Which means that if we have a bunch of things that only take a short amount of time and one of them is a straggler, this can obviously also slow down our program. So I'm going to again import asyncio. I'm going to have my if __name__ == "__main__". And here I'm just going to asyncio.run our main coroutine, and let's go ahead and define that. All right. And I'm also just going to use the sleep again. So we're going to write an async def, and we'll call this our async_sleep again. And here we're going to have a parameter for just how long we're going to sleep. Or, if we want, we can be a little bit more explicit about it and call it something like duration. And then we'll say we're going to await asyncio.sleep for the duration. And then we're just going to return the duration, just to also have something returned from there. All right, so in our main coroutine here, what I want to do is schedule 10 tasks. And then, whenever one of them completes, I want to be able to deal with it. So what I'm going to do is have a variable here called pending, and you'll see why in a second, which I'm going to initialize to just be an empty set. Then I'm going to iterate for i in range from one to 11. And for each of these, I'm going to add an element, and we're going to say asyncio.create_task. So we're going to create our async_sleep with the specific duration. We've seen create_task before. We're just creating a bunch of tasks that we're adding into this set here that we've called pending. Okay?
So now we can use something called asyncio.wait. Here we can provide an iterable of tasks, for example a set or a list of tasks. And we can have different behavior for that, which is what we'll look at in a second, so let's just keep the default behavior for now. So our first argument is just the iterable of tasks, which is going to be the pending set that we have here. And the return values that we get from it are called done and pending, which is why we have this variable name here. The response that we get is actually split into two things: the first are the tasks that have completed, and the others are the tasks that are still pending. So we'll leave this like this for now, and then we're just going to print out our done tasks, and we'll also print out our pending tasks. We'll make some more changes to it in a second, but let's just go ahead and run that first. It should take about 10 seconds. I don't really have any extra printing here right now, unfortunately, except for our done and pending at the end. But there we go. So we have our first result set, which is this one here, and we can see that we have a set which has each task as well as the result that came with it. And then finally we also have this empty set here, which is the remaining pending tasks. So we have something cool, but essentially what we have right now is the same thing that we already had with gather, because we're still waiting for everything to finish. There are different ways that we can go about changing this. One of the things that we can do is add something like a timeout.
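The default-behavior version described so far might look roughly like this. It is a sketch under one assumption that is not in the lesson: a SCALE constant shrinks each "second" to 0.05 s so the example finishes quickly. Set SCALE to 1 to get the roughly 10 second run described above.

```python
import asyncio

# Assumption for this sketch: shrink each "second" so the demo is fast.
SCALE = 0.05


async def async_sleep(duration):
    await asyncio.sleep(duration * SCALE)
    return duration


async def main():
    pending = set()
    for i in range(1, 11):
        pending.add(asyncio.create_task(async_sleep(i)))

    # Default behavior: return only once every task has completed.
    done, pending = await asyncio.wait(pending)
    print(done)
    print(pending)
    return done, pending


if __name__ == "__main__":
    asyncio.run(main())
```

With the defaults, done holds all ten finished tasks and pending comes back as an empty set, which matches what gather would give in terms of waiting time.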
Now this timeout is different from what we had for wait_for. In this case, we're going to get our result here for done and pending once we reach this timeout limit, but that doesn't mean our currently scheduled tasks are going to be canceled. It just means we're going to get our response at that time. So what we can do is say: while there are still pending tasks, so while the length of our pending set is greater than 0, we're going to go ahead and print this out. If we run this now, we should see it print about every two seconds, which is what we have here. Obviously there's a lot of spam in our output, because it's very verbose, and each of these elements has a lot of detail to it. But we can see that the set that we have for pending results is continuously decreasing in size. So every two seconds we come back and we see what results have already finished, so what's in our done set, and then we just continue with the current pending tasks. Now, obviously we probably also want to get the results out. So we can iterate over our done tasks: we can say for done_task in done, and if we want to get the result, we can just await that task, and from there we get out the result. So let's go ahead and run that one more time. We're going to see these tasks finish in basically the order that we would expect. We can see here that every two seconds we're reaching a timeout for our coroutine and we're getting the responses. But these tasks have still been scheduled and they are still in progress, as we can see, because every two seconds we get more responses. We can then loop over them, and if we use this await, we can get the actual result like this.
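A sketch of the timeout loop just described, again with an assumed SCALE constant (not from the lesson) so that the "two second" timeout becomes 0.1 s. The results list is also an addition, used here only to collect what each done task returned.

```python
import asyncio

# Assumption for this sketch: shrink each "second" so the demo is fast.
SCALE = 0.05


async def async_sleep(duration):
    await asyncio.sleep(duration * SCALE)
    return duration


async def main():
    pending = set()
    for i in range(1, 11):
        pending.add(asyncio.create_task(async_sleep(i)))

    results = []
    while len(pending) > 0:
        # Returns after the timeout with whatever has finished so far.
        # Tasks that are still pending keep running; they are not canceled.
        done, pending = await asyncio.wait(pending, timeout=2 * SCALE)
        for done_task in done:
            # Awaiting a finished task returns its result immediately.
            results.append(await done_task)
        print(f"finished so far: {sorted(results)}, still pending: {len(pending)}")
    return results


if __name__ == "__main__":
    asyncio.run(main())
```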
But obviously this is pretty nice, because now we don't have to wait for everything to finish, and we can process things in chunks as they finish. And of course, something that we can also do is add more things to the pending set. So what we could do, for example, is, at the bottom here, say we have more tasks to add. I'm just going to do something very simple and add a task that sleeps for one second, and we'll do this once. So we'll say add_task equals True, and then we'll say, if add_task, then we want to add the task, and we'll set this variable to False, so that we don't keep adding tasks and get stuck in this loop indefinitely. But the point that I'm trying to make with this is that we can just continue to add more tasks to our pending set. And this gives us a lot more flexibility, because as things come in, we can continue to create and schedule these tasks, and we can get the responses whenever they're ready using this asyncio.wait. In this case, we're using a specific timeout, but there's also a different option available to us, and that parameter is called return_when. Here we have different options. The default is ALL_COMPLETED, written in all caps. So if we don't set a timeout, then we're only going to get our response once all of the tasks have completed. That's what we saw in the very beginning when we didn't provide this parameter: we're waiting for everything to complete. So we'll see 1 through 10 printed out, and then, since we add one more task, we're going to see another 1 at the very end here. And we can see that we're waiting for everything to finish. This is not in order, because sets are not ordered, so we're just getting all of the completed tasks like we have here.
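The two ideas in this part, adding a task to the pending set once and using return_when with ALL_COMPLETED, can be sketched together as follows. The SCALE constant and the results list are assumptions for illustration. The add_task flag starts as True so that exactly one extra task is added.

```python
import asyncio

# Assumption for this sketch: shrink each "second" so the demo is fast.
SCALE = 0.05


async def async_sleep(duration):
    await asyncio.sleep(duration * SCALE)
    return duration


async def main():
    pending = set()
    for i in range(1, 11):
        pending.add(asyncio.create_task(async_sleep(i)))

    add_task = True
    results = []
    while len(pending) > 0:
        # ALL_COMPLETED is the default; it is spelled out here for clarity.
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.ALL_COMPLETED
        )
        for done_task in done:
            results.append(await done_task)
        if add_task:
            # Add one more task, once, so the loop goes around again.
            pending.add(asyncio.create_task(async_sleep(1)))
            add_task = False
    print(results)
    return results


if __name__ == "__main__":
    asyncio.run(main())
```

The first ten results come out of a set, so their order is not guaranteed; the extra 1 always arrives last because its task is only created after the first wait returns.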
But we can also change this. If we don't really want to use a timeout, either because our tasks are going to finish extremely fast and we're not really sure what value to use there, or because we want to handle tasks as they finish, we can also set the return_when parameter to FIRST_COMPLETED. That means whenever something completes, we exit out of this wait coroutine and can continue on with the rest. So in this case, we're going to see these numbers printed out one by one, in order, as we can see here. That's because every second one of our tasks is completing. So this is also a really, really cool thing to do. Because with the other approaches we can run into issues where, if one thing takes much, much longer to run than everything else, we're still going to be spending a lot of time waiting. And so this wait coroutine that we have available to us allows us a lot more flexibility in that sense: we can get to things either when they're ready or within a specific time interval, so that we can continue processing the things that are done. And we can also add more tasks so that they start working too, to really get a much more dynamic and even more concurrent program.
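A sketch of the FIRST_COMPLETED variant, with the same assumed SCALE constant and results list as illustration aids that are not part of the lesson's code.

```python
import asyncio

# Assumption for this sketch: shrink each "second" so the demo is fast.
SCALE = 0.05


async def async_sleep(duration):
    await asyncio.sleep(duration * SCALE)
    return duration


async def main():
    pending = set()
    for i in range(1, 11):
        pending.add(asyncio.create_task(async_sleep(i)))

    results = []
    while len(pending) > 0:
        # Return as soon as at least one task has finished.
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for done_task in done:
            result = await done_task
            print(result)
            results.append(result)
    return results


if __name__ == "__main__":
    asyncio.run(main())
```

Because each task finishes at a different time, done usually holds a single task per loop iteration, so the numbers tend to print in completion order. It can hold more than one task if several happen to finish together.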
29. Combining Async and Multiprocessing: All right, so now I want to take a look at how we can combine multiprocessing and asynchronous programming. Previously we talked about how we can only have one event loop per thread. But something that we have to remember in Python is that we also have this thing called the global interpreter lock, which limits the actual concurrency that we can get. Fortunately, when we do multiprocessing, we have one GIL per process. So we can make use of the full hardware available to us, and get around some of the limitations of Python's concurrency, by using multiprocessing in combination with async. So we're going to go ahead and import asyncio, and we're also going to import multiprocessing. And we're going to have our if __name__ == "__main__" block, but for now we're just going to pass on this. Right now what I'm going to do is define a class called MultiprocessingAsync, and this is going to inherit from multiprocessing.Process. What this class is going to do is spawn a process for us, and in this process we're then going to start our asynchronous event loop. So we're going to go ahead and define the initialization and initialize the parent class. We're also going to provide some durations as parameters, because what we're going to do is just emulate some sort of long-running network IO time, or other IO time, by using the sleep coroutine again. So we're going to have our async def, and we'll call this async_sleep. Here we can pass in the duration, and all that we're going to do is await asyncio.sleep for this duration. This we can make a static method, since we're not using any of our class attributes here. Then we also need our run method, which we're going to define the normal way.
And in our run method here, we want to start our event loop. So we're going to call asyncio.run, and now we need to provide the main entry point for our event loop, what we want it to do. So we also need to define that coroutine within our class. We're going to write async def, and we'll just call this consecutive_sleeps. What we're going to do here is have a set of tasks that we first schedule, and then we're just going to wait for them. So we have our pending, which is going to be our set again. And then we'll say, for duration in durations, to our pending set we're going to add a task that we create using self.async_sleep for this duration. So it's basically what we also did in the last lesson: we have our pending set, we schedule a bunch of tasks based on the input durations, and we keep track of them all in the pending set. Then we're going to say, while the length of pending is greater than 0, done and pending come from asyncio.wait, and we're going to wait on our scheduled tasks. Let's use a timeout of just 1 second here. And we'll say, for done_task in done, we're going to print out the response that we get from awaiting the task, which currently is nothing. So let's go up here and return the duration, just so that we're returning something. All right, and then we're going to call this in our entry method for the multiprocessing class: here we're going to asyncio.run self.consecutive_sleeps. Okay, so let's go over this one more time in terms of what we're actually doing here. We're inheriting from the multiprocessing.Process class.
And that means that when we instantiate this class, we're essentially creating a child that inherits from this process class. So we can create a new process that can run on a separate core or a separate CPU. We're also going to provide it with a set of durations. Again, this is just to have something here so that some task can be performed for us. Now our entry point, and you'll remember this from threading (we didn't go into too much detail on this for multiprocessing, but you'll remember it from threading), is that whenever we call .start, that's where this run method is going to run. And in our run method, all that we're doing is providing the entry point for our asynchronous process. So we call asyncio.run, and here we just have the main coroutine, which is this consecutive_sleeps that we defined here. This you'll probably recognize from the previous lesson, where we're just scheduling a set of tasks, and each of these tasks is just going to sleep for a certain duration. Then, while we still have pending tasks, we use this asyncio.wait method on our pending tasks with a timeout of 1 second, and every second we loop over our done tasks and print out the results using this await call here. Now the cool thing with this is that we can create multiple processes, and we can then split up the work between them. So we can actually have multiple event loops running, one on each process. So let's create the initial durations, which we can initialize to be empty. Then we'll say for i in range from 1 to 11, just like we did before, and to durations we're going to append i. And now we can also create two processes if we want. So let's have our list here of processes, and we're going to say for i in range 2, and I'm not going to do all too much here.
I'm going to append to our processes an initialized version of our MultiprocessingAsync class up here, and we're going to initialize it with some durations. Those are going to be a slice of durations: we go from i times 5 up to i plus 1 times 5. That way we go in increments of five. So when i is 0, we go from 0 to 5, so it's going to be the first five elements. And when i is 1, we go from 5 up to 10, so we're taking the second five elements. Now we have our two processes, but we still need to start them. So we're going to say for p in processes, we call p.start, and at the very end, for p in processes, we call p.join, so we're waiting for those to finish. All right, so let's go ahead and run this. And it looks like I have an error up here, so let me go ahead and take a look at what's going on and scroll through our logs. So our coroutine was never awaited. Let's fix that and try one more time. There we go. So now we're running on two separate processes, and we're able to do this nicely. One of them is scheduling one set of tasks, and the other one is scheduling the other set of tasks. Obviously this is not super optimal, because, for one, we are only sleeping for ten seconds, and we have a lot of the shorter tasks on one process and a lot of the longer tasks on the other. So one process is going to finish before the other. We can also print that out here: process finished. One of our processes is going to finish before the other, which means there's some time where one of the two CPUs, or the two cores, that we're using is sitting idle while the other one is still running. So obviously we can distribute that workload better to hopefully get a better effect from it. But still, it's working nicely.
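Putting the pieces of this lesson together, the program might look roughly like the sketch below. It is a reconstruction from the narration, not the exact code on screen. The SCALE constant, the _durations attribute name, the printed messages, and the list returned from consecutive_sleeps are assumptions added for illustration.

```python
import asyncio
import multiprocessing

# Assumption for this sketch: shrink each "second" so the demo is fast.
SCALE = 0.05


class MultiprocessingAsync(multiprocessing.Process):
    def __init__(self, durations):
        super().__init__()
        self._durations = durations

    @staticmethod
    async def async_sleep(duration):
        await asyncio.sleep(duration * SCALE)
        return duration

    async def consecutive_sleeps(self):
        pending = set()
        for duration in self._durations:
            pending.add(asyncio.create_task(self.async_sleep(duration)))

        results = []
        while len(pending) > 0:
            # One scaled "second" per wait, as in the lesson.
            done, pending = await asyncio.wait(pending, timeout=SCALE)
            for done_task in done:
                result = await done_task
                print(f"{self.name}: slept {result}")
                results.append(result)
        return results

    def run(self):
        # Called in the child process after .start(); each process
        # gets its own event loop here.
        asyncio.run(self.consecutive_sleeps())
        print(f"{self.name}: process finished")


if __name__ == "__main__":
    durations = []
    for i in range(1, 11):
        durations.append(i)

    processes = []
    for i in range(2):
        # First process gets durations[0:5], second gets durations[5:10].
        processes.append(MultiprocessingAsync(durations[i * 5:(i + 1) * 5]))

    for p in processes:
        p.start()
    for p in processes:
        p.join()
```

Swapping the contiguous slice for an interleaved one such as durations[i::2] is one way to spread the short and long sleeps across both processes, so that neither sits idle for long.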
So now we're able to combine multiprocessing and async together, and we're able to get around some of the limitations that we would have with a single thread that can only run a single event loop. We're making the most use of our hardware, if we have more than one core, by making use of multiprocessing, and through that we have an individual event loop running on each individual process, which can hopefully help us speed things up even more. Because we can run into these limitations where we still spend a lot of time waiting, just because, for whatever reason, things are slow. Even if we have coroutines, if we suddenly have thousands of coroutines with tasks scheduled, we can still only run one of them at a time. So if we have more than one event loop, spread over more than one process, we can get through more work quicker. When tasks are done, we can get to them quicker, or when they've finished their waiting, we can get back to them quicker, so that they can continue their work or pass things on downstream. And this can get really complicated, like it did in our threading example. But we're really taking it to the next level, because now we're combining the asynchronous behavior, which gives us concurrency, with multiprocessing, which gives us extra concurrency with extra compute. So we're able to deal with the network waiting times using the asynchronous process, or just general IO waiting times, not even timeouts. And we can make the most use of our hardware by using multiprocessing, so that we use up all of the resources available to us.
Now one last thing I do want to mention, which is something that we haven't encountered here and may not even be something that you ever have to deal with: just be aware that tasks can sometimes also time out. If you're scheduling too many things and you have some timeout parameter in there, it could be that your tasks are actually timing out before you can even get to them. So be wary of that if you start running into some extremely weird issues because you've built this extremely complex program that's doing a lot of work. That may be something that you want to look out for: am I scheduling too many tasks at the same time, and are some of them actually timing out before I can even get to dealing with them? In which case, you may want to reduce your batch sizes so that you can get through those in a better manner. So that's just one last tip there: if you're going to go really deep with this and try to build something very large and complex that's going to take care of a lot of things, just be aware of your batch sizes. It may never happen, but it's something that you could run into, and it can cause a lot of headache if you're not even sure what you're looking for. But yeah, I hope you've enjoyed that, I hope you've learned a lot, and I hope you can build some really cool concurrent and parallel programs now.