Clustering in Python Part 2 - Parallel Python and Celery


Created by Abdul on May 10, 2020


  Hi everyone! This is Abdul from Pythonist. I hope you are doing great! In the previous tutorial we talked about clustering in Python, and I discussed some important things to consider when switching from a multiprocessing solution to a cluster. If you are not familiar with the basic concepts of clustering, I recommend following this link. In this tutorial we are going to explore two different ways to implement clustering in your Python projects, so let's get started!

Introduction!

   The first option is Parallel Python. Parallel Python has an interface very similar to multiprocessing, so it is fairly easy to move a multiprocessing solution from a single multicore machine to a multi-machine setup. Parallel Python has few dependencies and is easy to configure for research work on a local cluster. It is not very powerful and lacks advanced communication mechanisms, but for sending out embarrassingly parallel jobs to a small local cluster it is very easy to use. The library ships as a module named "pp", short for Parallel Python, and you can install it with pip install pp.

There are three main pieces in PP:

  1.  A server that executes tasks in parallel
  2.  A client that submits tasks to the PP server for execution
  3.  And, of course, a task to execute.

Let’s try to calculate Pi using the Monte Carlo method:            

     This method consists of drawing a square with an inscribed quarter circle on a canvas, generating a large number of random points inside the square, and counting how many fall inside the circle. Because the ratio of the circle's area to the square's area is pi/4, multiplying the fraction of points that land inside the circle by 4 gives an estimate of pi. The sampling splits naturally across workers, so this calculation can be done in parallel with PP.
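To make the idea concrete, here is a minimal serial sketch of the estimator before we parallelize it. The function name and sample count here are just illustrative; they are not part of the PP example that follows.

import random

def estimate_pi_serial(nbr_samples):
  # Count random points from the unit square that land inside the quarter circle.
  inside = 0
  for _ in range(nbr_samples):
    x = random.uniform(0, 1)
    y = random.uniform(0, 1)
    if x * x + y * y <= 1.0:
      inside += 1
  # The fraction of points inside approximates pi/4.
  return 4 * inside / nbr_samples

print(estimate_pi_serial(1_000_000))  # prints roughly 3.14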

Now let's implement the parallel version instead of stopping at the theoretical explanation.
       The very first thing is to install the module. If you are working with Python 2.7 you can install it directly with pip install pp, but Python 2 is retired now, so you should of course upgrade to Python 3. For Python 3 we have to download a specific version of the module and build it on our system. You will find the link in the description below: open it, download the zip file for version 1.6.4.4, unzip it into your project root, then cd into that folder from the terminal and run "python setup.py install". That installs the module and we are good to go!
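Roughly, the steps look like this. The archive and folder names here are my assumption based on the 1.6.4.4 download; adjust them to whatever the downloaded file is actually called.

unzip pp-1.6.4.4.zip
cd pp-1.6.4.4
python setup.py install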

ok, that’s great!

Let’s try to calculate pi using that! 

So, I will import the random, time, and pp modules and set the number of estimates to 100 million.


import random
import time
import pp

NBR_ESTIMATES = 1e8

Then I will define a function named "pi_calculation" that takes the number of estimates, generates random coordinates, checks whether each point falls inside the unit circle, increments a counter when it does, and finally returns the number of trials that landed inside the circle for the pi calculation.



def pi_calculation(nbr_estimates):
  steps = range(int(nbr_estimates))
  nbr_trials_in_unit_circle = 0
  for step in steps:
    x = random.uniform(0, 1)
    y = random.uniform(0, 1)
    is_in_unit_circle = x * x + y * y <= 1.0
    nbr_trials_in_unit_circle += is_in_unit_circle
  return nbr_trials_in_unit_circle

    Then, inside the main block, I will set the number of processes to 4 (you can adjust this for your system) and create the Parallel Python server, passing it the number of CPUs.


if __name__ == "__main__":
  NBR_PROCESSES = 4
  job_server = pp.Server(ncpus=NBR_PROCESSES)
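A side note: this example only uses local CPUs, but pp can also fan work out to other machines running pp's ppserver.py script. A minimal sketch, where the hostnames and port are placeholders you would replace with your own nodes:

  # Hypothetical remote nodes; each one must be running ppserver.py from the pp package.
  ppservers = ("node1:60000", "node2:60000")
  job_server = pp.Server(ncpus=NBR_PROCESSES, ppservers=ppservers)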

Now, let’s print a simple statement to ensure that the server has been started.


 print("Starting pp with", job_server.get_ncpus(), "workers")

Then I will build a list that contains the number of estimates once per process, so each worker gets the same number of trials, and record the start time.


  nbr_trials_per_process = [NBR_ESTIMATES] * NBR_PROCESSES
  t1 = time.time()

To keep track of the jobs, I will create a list for them:


  jobs = []

Then we loop over the trials list and submit one job per entry to the pp server, passing the pi_calculation function and its input argument, and append each job to the jobs list. The second argument to submit is the tuple of arguments for the function, the empty tuple is for dependent functions, and ("random",) tells pp which modules the worker needs to import.

 for input_args in nbr_trials_per_process:
    job = job_server.submit(pi_calculation, (input_args,), (), ("random",))
    jobs.append(job)

Then, outside of the loop, we call each job; each call blocks until that job's result is ready. We can also report the total amount of work by summing the trials per process.

 nbr_in_unit_circles = [job() for job in jobs]
  print(nbr_in_unit_circles)
  print("Amount of work:", sum(nbr_trials_per_process))

Now, to calculate the value of pi, we multiply the total number of points that landed in the unit circle by 4 and divide by the total number of samples, which is the number of estimates times the number of processes.

 print(sum(nbr_in_unit_circles) * 4 / NBR_ESTIMATES / NBR_PROCESSES)
  print(pi_calculation.__name__)
  print("Delta:", time.time() - t1)

And finally I will get the ending time and calculate the delta.

Here's the complete code:


import random
import time
import pp

NBR_ESTIMATES = 1e8


def pi_calculation(nbr_estimates):
  steps = range(int(nbr_estimates))
  nbr_trials_in_unit_circle = 0
  for step in steps:
    x = random.uniform(0, 1)
    y = random.uniform(0, 1)
    is_in_unit_circle = x * x + y * y <= 1.0
    nbr_trials_in_unit_circle += is_in_unit_circle
  return nbr_trials_in_unit_circle


if __name__ == "__main__":
  NBR_PROCESSES = 4
  job_server = pp.Server(ncpus=NBR_PROCESSES)
  print("Starting pp with", job_server.get_ncpus(), "workers")
  nbr_trials_per_process = [NBR_ESTIMATES] * NBR_PROCESSES
  t1 = time.time()
  jobs = []
  for input_args in nbr_trials_per_process:
    job = job_server.submit(pi_calculation, (input_args,), (), ("random",))
    jobs.append(job)
  nbr_in_unit_circles = [job() for job in jobs]
  print(nbr_in_unit_circles)
  print("Amount of work:", sum(nbr_trials_per_process))
  print(sum(nbr_in_unit_circles) * 4 / NBR_ESTIMATES / NBR_PROCESSES)
  print(pi_calculation.__name__)
  print("Delta:", time.time() - t1)


You can see the result here!

Starting pp with 4 workers
[78544430, 78534524, 78538570, 78545620]
Amount of work: 400000000.0
3.14163144
pi_calculation
Delta: 131.19789695739746 
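As a sanity check on the output above, (78544430 + 78534524 + 78538570 + 78545620) * 4 / 400000000 = 3.14163144, which matches the printed estimate. For comparison with the approach from the previous tutorial, here is a rough sketch of the same estimator on a single machine using multiprocessing.Pool. It reuses pi_calculation, NBR_ESTIMATES, and NBR_PROCESSES from the listing above and was not part of the original timing run.

from multiprocessing import Pool

if __name__ == "__main__":
  with Pool(processes=NBR_PROCESSES) as pool:
    # Each worker runs the same number of trials as in the pp version.
    counts = pool.map(pi_calculation, [NBR_ESTIMATES] * NBR_PROCESSES)
  print(sum(counts) * 4 / NBR_ESTIMATES / NBR_PROCESSES)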

 Great,

     Now let's explore the second option for implementing clustering in Python: Celery. Celery is one of the most popular background job managers in the Python world. It is compatible with several message brokers, such as RabbitMQ and Redis, and can act as both producer and consumer. In this tutorial we are going to use RabbitMQ as the message broker for Celery.

    So, first of all we have to install RabbitMQ on our system. You can install it using Homebrew on a Mac, apt-get on Linux, or choco on Windows. I will install it with Homebrew on my Mac by running brew install rabbitmq; if you are running this command for the first time it will take a while, but if you have run it before it will be quick.
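For reference, the install commands on the three platforms look roughly like this. The apt-get and choco package names are my assumption; check your package manager if they differ.

brew install rabbitmq
sudo apt-get install rabbitmq-server
choco install rabbitmq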

That’s great!

     After its installation we have to export its path like this:

export PATH=$PATH:/usr/local/Cellar/rabbitmq/3.8.2/sbin

     Usually you only need to change the version number in that path. Now we are good to go! Just restart the terminal and run the command rabbitmq-server. It starts the RabbitMQ server, and the management interface can be accessed at localhost:15672; the default credentials are guest/guest for username and password.

  Now let's install Celery. It installs like any other Python package, with pip install celery.

Great!

    There are three parties involved with Celery. The user's process hands the task to the Celery client API, the client passes it to a third-party broker (RabbitMQ in our case), and the broker delivers it to a Celery worker node, which executes the task and produces the result.

   I'm going to calculate pi once again, this time with a different algorithm (the Bailey-Borwein-Plouffe formula), and use Celery to execute it.

   So, first we have to import Celery:

from celery import Celery

Then I will create an instance of it, giving it a name and the broker URL:

app = Celery('tasks', broker='amqp://localhost//')

After that I will create a function named pi_calculation containing the code for the pi calculation:


def pi_calculation():
  getcontext().prec = 5
  pi_val = sum(1 / Decimal(16) ** k *
               (Decimal(4) / (8 * k + 1) -
                Decimal(2) / (8 * k + 4) -
                Decimal(1) / (8 * k + 5) -
                Decimal(1) / (8 * k + 6)) for k in range(100))

  return pi_val

    At this stage we also have to import Decimal and getcontext from the decimal module:

from decimal import Decimal, getcontext

OK, so we are done with our function, but it is still a regular Python function. To make it a Celery task we have to apply the task decorator, so I will add @app.task above it. That's great!


@app.task
def pi_calculation():
  getcontext().prec = 5
  pi_val = sum(1 / Decimal(16) ** k *
               (Decimal(4) / (8 * k + 1) -
                Decimal(2) / (8 * k + 4) -
                Decimal(1) / (8 * k + 5) -
                Decimal(1) / (8 * k + 6)) for k in range(100))

  return pi_val

So, time to see it in action. First of all, I start the RabbitMQ server in my terminal with the rabbitmq-server command, and then we run the Celery worker with the following command:

celery -A piCelery worker -l info -Q celery

Here -A points at the module containing the app (piCelery), -l sets the log level, and -Q selects the queue, so make sure you run it from the directory that contains piCelery.py. Great! You can see our task is running, but where is the output? Open a new terminal, cd into that directory, start a Python shell, import everything from piCelery, and call pi_calculation in a slightly different way:


>>> from piCelery import *
>>> pi_calculation.delay()

So, whenever we want to execute a task function we call its delay method, and if the function takes any arguments we pass them to delay. Awesome! If we switch back to the terminal running the Celery worker, the result shows up in its log.

 That's how Celery works! I know there is a lot more to discuss, but this is not a Celery-specific tutorial.
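One detail worth noting: delay() returns an AsyncResult, but to fetch the value back in the calling process (instead of reading it from the worker's log) Celery needs a result backend configured. A minimal sketch, assuming the RPC backend is acceptable for your setup:

# Variation of the app above with a result backend, so the caller can fetch the value.
app = Celery('tasks', broker='amqp://localhost//', backend='rpc://')

>>> result = pi_calculation.delay()
>>> result.get(timeout=10)  # depending on the result serializer, the Decimal may come back as a string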

Here's the complete code:


from celery import Celery
from decimal import Decimal, getcontext
app = Celery('tasks', broker='amqp://localhost//')


@app.task
def pi_calculation():
  getcontext().prec = 5
  pi_val = sum(1 / Decimal(16) ** k *
               (Decimal(4) / (8 * k + 1) -
                Decimal(2) / (8 * k + 4) -
                Decimal(1) / (8 * k + 5) -
                Decimal(1) / (8 * k + 6)) for k in range(100))

  return pi_val

I think that's enough for now; I hope you really enjoyed this interesting discussion. Before closing, I want to give you a piece of advice.

I strongly suggest that you pick a mature library with an active community behind it, one that supports the feature set you need without too many additional features.

The more features a library has, the more ways you’ll find to misconfigure it and waste your time on debugging. Simplicity is generally the right aim when dealing with clustered solutions. 

If you liked this tutorial, leave your feedback in the comments below, and be sure to subscribe to our YouTube channel so you never miss any fantastic content in the future. Thanks for joining us!
