多进程学习,介绍了 ProcessPool 类并且在最后进行了比较。

what is such a multiprocessing system? We have the following possibilities:

  • A multiprocessor- a computer with more than one central processor
  • A multi-core processor- a single computing component with more than one independent actual processing units /cores.

In either case, the CPU is able to execute multiple tasks at once assigning a processor to each task

As CPU manufacturers start adding more and more cores to their processors, creating parallel code is a great way to improve performance. Python introduced multiprocessing module to let us write parallel code.

因为 CPU 现在是多核的,所以自然就可以使用多进程去并行处理计算。

1
2
import multiprocessing
print(multiprocessing.cpu_count())

Depending on the application, two common approaches in parallel programming are either to run code via threads or multiple processes, respectively. If we submit “jobs” to different threads, those jobs can be pictured as “sub-tasks” of a single process and those threads will usually have access to the same memory areas (i.e., shared memory). This approach can easily lead to conflicts in case of improper synchronization, for example, if processes are writing to the same memory location at the same time.

多线程可以共享一个进程的资源,与此同时容易发生同时读写同一个 memory area 的情况;相对来说 multi-processing 是可以处理上述的冲突。

Process 类

Process used when function based parallelism is required, where I could define different functionality with parameters that they receive and run those different functions in parallesl which are doing totally vairous kind of computations.

There are two important functions that belongs to the Process class – start() and join() function.

If we create a process object, nothing will happen until we tell it to start processing via start() function. Then, the process will run and return its result. After that we tell the process to complete via join() function.

调用 start() 之后程序开始运行,当调用 join() 之后进程终止。

Without join() function call, process will remain idle and won’t terminate. So if you create many processes and don’t terminate them, you may face scarcity of resources. Then you may need to kill them manually.

(所以调用 join() 是必须要做的一个很好的习惯问题)

  • .start() helps in starting a process and that too asynchronously.
  • .join() method on a Process does block until the process has finished, but because we called .start() on both p1 and p2 before joining, then both processes will run asynchronously. The interpreter will, however, wait until p1 finishes before attempting to wait for p2 to finish.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
from multiprocessing import Process
def print_func(continent='Asia'):
    print('The name of continent is : ', continent)

if __name__ == "__main__":  # confirms that the code is under main function
    names = ['America', 'Europe', 'Africa']
    procs = []
    proc = Process(target=print_func)  # instantiating without any argument
    procs.append(proc)
    proc.start()

    # instantiating process with arguments
    for name in names:
        # print(name)
        proc = Process(target=print_func, args=(name,))
        procs.append(proc)
        proc.start()

    # complete the processes
    for proc in procs:
        proc.join()

如果想要在不同的进程中共享数据,那么可以使用 multiprocessing 中的 Queue

Python Multiprocessing modules provides Queue class that is exactly a First-In-First-Out data structure. They can store any pickle Python object (though simple ones are best) and are extremely useful for sharing data between processes.

Queues are specially useful when passed as a parameter to a Process’ target function to enable the Process to consume data. By using put() function we can insert data to then queue and using get() we can get items from queues. See the following code for a quick example.

python 中的队列 queue, 在这里可以用来 share data。使用 put 放入 queue 中,使用 get 获得相应的data,按照 first-in-first-out 的顺序。

Process 中常见的还有以下属性:pid, is_alive, name

getting Process ID, Process name and checking if alive

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import multiprocessing
from multiprocessing import Process,current_process
import os
def child1():
    print("Child 1", os.getppid())
    print(current_process().name)
def child2():
    print("Child 2", os.getppid())
    print(current_process().name)
if __name__ =="__main__":

    print("Parent ID", os.getppid())
    p1 =Process(target= child1, name ="Child 1")
    p2 =Process(target= child2, name ="Child 2")
    p1.start()
    p2.start()

    p1.join()
    alive = "YES" if p1.is_alive() else 'No'
    print("Is P1 alive? {}".format(alive))
    alive = "YES" if p2.is_alive() else 'No'
    print("Is P2 alive? {}".format(alive))
    p2.join()

锁机制不是很常用,但是写一下吧

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
from multiprocessing import Process, Lock

lock =Lock()

def printer(item):
    lock.acquire()

    try:
        print(item)
    finally:
        lock.release()

if __name__ =="__main__":
    processes =[]
    items =["nacho", "salsa", 7]
    for item in items:
        p =Process(target= printer, args=(item))
        p.start()
        processes.append(p)
    
    for p in processes:
        p.join()

How to retrieve results in a particular order?

(可以将文本中的index 信息放入到最后的结果中,所以这样就能保持原来的顺序)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# Define an output queue
output = mp.Queue()

# define a example function
def rand_string(length, pos, output):
    """ Generates a random string of numbers, lower- and uppercase chars. """
    rand_str = ''.join(random.choice(
                        string.ascii_lowercase
                        + string.ascii_uppercase
                        + string.digits)
                   for i in range(length))
    output.put((pos, rand_str))

# Setup a list of processes that we want to run
processes = [mp.Process(target=rand_string, args=(5, x, output)) for x in range(4)]

# Run processes
for p in processes:
    p.start()

# Exit the completed processes
for p in processes:
    p.join()

# Get process results from the output queue
results = [output.get() for p in processes]

print(results)


Pool 类

The pool class schedules execution using FIFO policy. It workings like a map reduce design. The input are mapped from different processors and bring together the output from all the processors. After the running the code, it restores the output in the form of a list or array. It waits for all the jobs to finish and then returns the output. The processes in execution are put in memory and other non-executing processes are puts away out of memory.

Pool 可以理解为 map-reduce,那么得到的结果是和原来数据的顺序是不吻合的。

The pool distributes the tasks to the available processors using a FIFO scheduling. It works like a map-reduce architecture. It maps the input to the different processors and collects the output from all the processors. After the execution of code, it returns the output in form of a list or array. It waits for all the tasks to finish and then returns the output. The processes in execution are stored in memory and other non-executing processes are stored out of memory.

image-20210207140952235

在初始化Pool() 的时候,默认 processescpu_count

processes is the number of worker threads to use. If processes is None then the number returned by os.cpu_count() is used.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import multiprocessing
from multiprocessing import Pool 
def double(x):
    return x*2

if __name__ =="__main__":
    nums= [2, 4, 6]
    cpu_count = multiprocessing.cpu_count()
    pool =Pool(processes= cpu_count)
    results =pool.apply_async(double, (7, ))
    print(results.get(timeout= 1))

using pool: data based parallelism, it offers a convenient means of parallelizing the execution of function across multiple input values, distributing the input data across processes.

  • apply Call func with arguments args. It blocks until the result is ready.
  • apply_async It is better suited for performing work in parallel.
  • map A parallel equivalent of the map() built-in function (it supports only one iterable argument though, for multiple iterables). But it blocks.
  • map_async It is better suited for performing work as map in parallel.

The Pool.map and Pool.apply will lock the main program until all processes are finished, which is quite useful if we want to obtain results in a particular order for certain applications. In contrast, the async variants will submit all processes at once and retrieve the results as soon as they are finished. One more difference is that we need to use the get method after the apply_async() call in order to obtain the return values of the finished processes.

比较

Pool 和 Process 的比较

  1. task number 方面

As we have seen, the Pool allocates only executing processes in memory and the process allocates all the tasks in memory, when the task number is small, we can use proces class and when the task number is large, we can use the Pool. In the case of Pool, there is overhead in creating it, Hence with small task numbers, the performance is impacted when Pool is used

  1. IO operations

The Pool distributes the processes among the available cores in FIFO manner. On each core, the allocated process executes serially. So if there is a long IO operation, it waits till the IO operation is completed and does not schedule another process. The Process class suspends the process executing IO operations and schedules another process. So, in the case of long IO operation, it is advisable to user Process class.

image-20210207141920384

Pool:
When you have junk of data, you can use Pool class. Only the process under execution are kept in the memory. I/O operation: It waits till the I/O operation is completed & does not schedule another process. This might increase the execution time. Uses FIFO scheduler.Process:
When you have a small data or functions and less repetitive tasks to do. It puts all the process in the memory. Hence in the larger task, it might cause to loss of memory. I/O operation: The process class suspends the process executing I/O operations and schedule another process parallel. Uses FIFO scheduler.

参考文献

Python Multiprocessing Example

https://sebastianraschka.com/Articles/2014_multiprocessing.html