Fixes for execution errors under Python 3.9.1

Tiko_T

#1 Created at ...
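  1. Change the QueueManager address in the master process to 127.0.0.1, which refers to the local machine. If the worker runs on a different machine on the LAN, such as a virtual machine or WSL, you can use the LAN IP instead.

  2. Put all logic other than global variables and function definitions inside if __name__ == '__main__'.

  3. Pass a separately defined, named function as callable; a lambda cannot be serialized with pickle.

  4. The exception to catch is queue.Empty, not Queue.Empty (the module is named queue in Python 3).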
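Point 3 can be checked directly with pickle. The following is a minimal sketch, not taken from the original tutorial: the helper name can_pickle and the placeholder return values are made up for illustration. It matters mostly on platforms that start child processes with spawn, such as Windows, where the manager has to pickle the registered callables.

```python
import pickle


def get_task_q():
    """Module-level function: pickle stores it by its importable name."""
    return "task queue placeholder"


def can_pickle(obj):
    """Return True if pickle.dumps(obj) succeeds, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


if __name__ == "__main__":
    # A named function is pickled by reference to its module and name.
    print("named function:", can_pickle(get_task_q))
    # A lambda has no importable name, so pickling it fails.
    print("lambda:", can_pickle(lambda: "task queue placeholder"))
```

Run as a script, the named function should be reported as picklable and the lambda as not picklable, which is why the listing below registers get_task_q and get_result_q instead of lambdas.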

Code attached:

# -*- coding: utf-8 -*-
# TaskMaster.py

# distributed multiprocessing, task manager (master)
import queue
import random
from multiprocessing.managers import BaseManager

# queue that sends tasks
task_queue = queue.Queue()
# queue that receives results
result_queue = queue.Queue()

class QueueManager(BaseManager):
    pass

# named module-level functions are used because lambdas cannot be pickled
def get_task_q():
    return task_queue

def get_result_q():
    return result_queue

if __name__ == '__main__':
    # register the two queues on the network
    QueueManager.register('get_task_queue', callable=get_task_q)
    QueueManager.register('get_result_queue', callable=get_result_q)
    # bind to port 5000, authentication key b'abc'
    manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')

    # start the manager
    manager.start()
    # get the Queue objects through the network
    task = manager.get_task_queue()
    result = manager.get_result_queue()

    # put some tasks into the task queue
    for i in range(5):
        n = random.randint(0, 10000)
        print(f"Put task {n}")
        task.put(n)

    # read results from the result queue
    print("Try get results...")
    for i in range(10):
        try:
            r = result.get(timeout=5)
            print(f"Result : {r}")
        except queue.Empty:
            print("The queue is empty...")

    # shut down the manager
    manager.shutdown()
    print("Master exit.")



# -*- coding: utf-8 -*-
# TaskWorker.py

# distributed multiprocessing, task worker
import queue
import time
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
    pass

if __name__ == '__main__':
    # the worker only registers the names; the master provides the queues
    QueueManager.register('get_task_queue')
    QueueManager.register('get_result_queue')
    server_addr = "127.0.0.1"
    print(f"Connect to server {server_addr}...")
    m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
    # connect to the server
    m.connect()
    # get the Queue objects from the network
    task = m.get_task_queue()
    result = m.get_result_queue()
    # get a task from the task queue, compute, and put the result in the result queue
    for i in range(10):
        try:
            n = task.get(timeout=1)
            print(f"Run task {n} * {n}...")
            r = f"{n} * {n} = {n * n}"
            time.sleep(1)
            result.put(r)
        except queue.Empty:
            print("task queue is empty.")

    # end of worker process
    print("Worker exit.")

Tiko_T

#2 Created at ...

5. The main thread must catch and handle the queue.Empty exception, otherwise it never runs to the end.
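As a small illustration of point 5, here is a sketch that uses a plain queue.Queue rather than a manager proxy. The helper name drain and the timeout value are assumptions chosen for the example, not part of the original code.

```python
import queue


def drain(q, timeout=0.1):
    """Collect items from q until a get() times out with queue.Empty."""
    items = []
    while True:
        try:
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            # Nothing arrived within the timeout: stop instead of crashing.
            break
    return items


if __name__ == "__main__":
    q = queue.Queue()
    for n in (1, 2, 3):
        q.put(n)
    print(drain(q))
```

Without the except branch, the first timed-out get() would raise queue.Empty and end the program before the final print is reached.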

Tiko_T

#3 Created at ...

I meant process, not thread; that was a slip of the pen.

Thank you, this helped me a lot.

yoyo~哟哟

#5 Created at ...

Starting the task process on the local machine raises an error:

    self._send_bytes(_ForkingPickler.dumps(obj))
  File "C:\Users\bliss\Anaconda3\lib\multiprocessing\connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "C:\Users\bliss\Anaconda3\lib\multiprocessing\connection.py", line 368, in _send
    n = write(self._handle, buf)
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
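The thread does not identify the cause of this traceback. One possible explanation, stated here purely as an assumption, is that the master process had already exited, so the worker's next proxy call hit a closed socket. Under that assumption, the worker could wrap its proxy calls so that a lost connection ends the loop cleanly. The helper name try_call and the fake function below are hypothetical.

```python
def try_call(fn, *args, **kwargs):
    """Call fn; return (True, result), or (False, None) if the peer is gone."""
    try:
        return True, fn(*args, **kwargs)
    except (ConnectionError, EOFError) as exc:
        # ConnectionResetError and BrokenPipeError are ConnectionError subclasses.
        print(f"Lost connection to the manager: {exc!r}")
        return False, None


def fake_put(item):
    """Stand-in for result.put() on a proxy whose server has gone away."""
    raise ConnectionResetError(10054, "connection reset by peer")


if __name__ == "__main__":
    ok, _ = try_call(fake_put, "1 * 1 = 1")
    if not ok:
        print("Worker stops early.")
```

In the worker, this would be used as ok, n = try_call(task.get, timeout=1), followed by a break when ok is False. A queue.Empty raised by the call still propagates as before, so the existing except clause keeps working.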

