I have an agent-based model in which several agents are launched by a central process and exchange data through another central process. Each agent interacts with the communication process through ZeroMQ (zmq). However, when I run more than 100 agents, the following error report is printed to the console:
Invalid argument (src/stream_engine.cpp:143) Too many open files (src/ipc_listener.cpp:292)
and macOS reports a problem:
Python quit unexpectedly while using the libzmq.5.dylib plug-in.
It seems to me that too many contexts are open. But how can I avoid this with multiprocessing?
I am attaching a piece of code below:
class Agent(Database, Logger, Trade, Messaging, multiprocessing.Process):
    """Agent that runs as its own process and communicates over ZeroMQ.

    ``run`` creates one ``zmq.Context`` for the agent, connects six sockets
    to the endpoints listed in ``self._addresses`` and then executes the
    commands it receives until it is told to die or is interrupted.
    """

    # Upper bound, in milliseconds, that teardown waits for queued outgoing
    # messages to be delivered before the sockets are closed.  Bounded so
    # that an agent cannot hang forever when a peer has already gone away.
    _TEARDOWN_LINGER_MS = 1000

    def __init__(self, idn, group, _addresses, trade_logging):
        multiprocessing.Process.__init__(self)
        # .... (the remaining initialisation is elided in the original
        # excerpt; presumably it sets self.group, self._addresses and
        # self._methods, which ``run`` reads -- TODO confirm)

    def run(self):
        """Connect to the central processes and serve commands.

        The context, and every socket opened on it, is destroyed when this
        method exits -- normally, on ``KeyboardInterrupt``, or through an
        exception such as the ``SystemExit`` raised for an unknown command.
        Previously nothing was ever closed (the only teardown call was
        commented out), so the sockets and their file descriptors were
        released only when the process itself was torn down.
        """
        self.context = zmq.Context()
        try:
            self.__connect_sockets()
            self.out.send_multipart(['!', '!', 'register_agent', self.name])
            self.__serve_commands()
        finally:
            # Closes all sockets of this context, then terminates it.
            self.context.destroy(linger=self._TEARDOWN_LINGER_MS)

    def __subscribe_to_own_topics(self, sub_socket):
        """Subscribe a SUB socket to "all", the agent's name and its group."""
        sub_socket.setsockopt(zmq.SUBSCRIBE, "all")
        sub_socket.setsockopt(zmq.SUBSCRIBE, self.name)
        sub_socket.setsockopt(zmq.SUBSCRIBE, group_address(self.group))

    def __connect_sockets(self):
        """Create the agent's sockets on ``self.context`` and connect them."""
        self.commands = self.context.socket(zmq.SUB)
        self.commands.connect(self._addresses['command_addresse'])
        self.__subscribe_to_own_topics(self.commands)

        self.out = self.context.socket(zmq.PUSH)
        self.out.connect(self._addresses['frontend'])
        time.sleep(0.1)

        self.database_connection = self.context.socket(zmq.PUSH)
        self.database_connection.connect(self._addresses['database'])
        time.sleep(0.1)

        self.logger_connection = self.context.socket(zmq.PUSH)
        self.logger_connection.connect(self._addresses['logger'])

        self.messages_in = self.context.socket(zmq.DEALER)
        self.messages_in.setsockopt(zmq.IDENTITY, self.name)
        self.messages_in.connect(self._addresses['backend'])

        self.shout = self.context.socket(zmq.SUB)
        self.shout.connect(self._addresses['group_backend'])
        self.__subscribe_to_own_topics(self.shout)

    def __serve_commands(self):
        """Receive and execute commands until 'die' or a KeyboardInterrupt.

        Raises:
            SystemExit: if a received command has no entry in
                ``self._methods``.
            KeyError: re-raised when it originates inside the executed
                method rather than from the command lookup.
        """
        # NOTE(review): the block structure below was reconstructed from a
        # whitespace-collapsed excerpt; in particular, confirm that the final
        # __signal_finished() belongs inside the ``command[0] != '_'`` branch.
        while True:
            try:
                self.commands.recv()  # catches the group adress.
            except KeyboardInterrupt:
                print('KeyboardInterrupt: %s,self.commands.recv() to catch own adress ~1888' % (self.name))
                break
            command = self.commands.recv()
            if command == "!":
                subcommand = self.commands.recv()
                if subcommand == 'die':
                    self.__signal_finished()
                    break
            try:
                self._methods[command]()
            except KeyError:
                if command not in self._methods:
                    raise SystemExit('The method - ' + command + ' - called in the agent_list is not declared (' + self.name)
                else:
                    raise
            except KeyboardInterrupt:
                print('KeyboardInterrupt: %s, Current command: %s ~1984' % (self.name, command))
                break
            if command[0] != '_':
                self.__reject_polled_but_not_accepted_offers()
                self.__signal_finished()
All of the code is available at http://www.github.com/DavoudTaghawiNejad/abce
python-multiprocessing zeromq
Davoud taghawi-nejad
source share