[checkmk-commits] agent_azure: change parallelizing mechanism

Moritz Kiemer mo at mathias-kettner.de
Tue Nov 20 10:11:21 CET 2018


Module: check_mk
Branch: master
Commit: f24a61e41d7a0fc58cb451af545cbb5e9072f6a9
URL:    http://git.mathias-kettner.de/git/?p=check_mk.git;a=commit;h=f24a61e41d7a0fc58cb451af545cbb5e9072f6a9

Author: Moritz Kiemer <mo at mathias-kettner.de>
Date:   Fri Nov 16 09:12:52 2018 +0100

agent_azure: change parallelizing mechanism

Change-Id: I969d3c155d451147e06a7919c27a1a128ae62df2

---

 agents/special/agent_azure | 150 +++++++++++++++++++++++++++------------------
 1 file changed, 91 insertions(+), 59 deletions(-)

diff --git a/agents/special/agent_azure b/agents/special/agent_azure
index 59fcae7..b0ea323 100755
--- a/agents/special/agent_azure
+++ b/agents/special/agent_azure
@@ -42,15 +42,16 @@ import sys
 import re
 import argparse
 import logging
-import atexit
 
-from multiprocessing import Process, Lock
+from multiprocessing import Process, Lock, Queue
+from Queue import Empty as QueueEmpty
 
 # We have to set a null handler for logging before importing the azure stuff.
 #   Otherwise a warning will be sent to stderr - and if for some other reason
 #   the agent returns a non-zero exit code this (irrelevant) warning would be
 #   all the user sees.
 logging.getLogger().addHandler(logging.NullHandler())
+# pylint: disable=wrong-import-position
 from azure.mgmt.monitor import MonitorManagementClient
 from azure.mgmt.resource import ResourceManagementClient
 from azure.common.credentials import ServicePrincipalCredentials
@@ -63,6 +64,69 @@ cmk.password_store.replace_passwords()
 LOG = logging.getLogger(__name__)
 
 
+class AsyncMapper(object):  # pylint: disable=too-few-public-methods
+    '''Create an async drop-in replacement for builtin 'map'
+
+    which does not require the involved values to be pickle-able,
+    nor third party modules such as 'multiprocess' or 'dill'.
+
+    Usage:
+             map_ = AsyncMapper()
+
+             for results in map_(function, arguments_iter):
+                 do_stuff()
+
+    Note that the order of the results does not correspond
+    to that of the arguments.
+
+    Keywords for initialization:
+
+      * timeout:  number of seconds we will wait for the next result
+                  before terminating all remaining jobs (default: None)
+      * debug:    raise exceptions in jobs (default: False)
+      * fallback: specify a function, called in case an exception occurs in
+                  the mapped function. The fallback function should return
+                  a tuple (err, value). If err is falsey, value will be
+                  yielded (default: a function returning (1, None)).
+    '''
+
+    def __init__(self, timeout=None, debug=False, fallback=lambda x: (1, None)):
+        super(AsyncMapper, self).__init__()
+        self.timeout = timeout
+        self.debug = debug
+        self.fallback = fallback
+
+    def __call__(self, function, args_iter):
+        queue = Queue()
+        jobs = {}
+
+        def produce(id_, args):
+            try:
+                queue.put((id_, 0, function(args)))
+            except Exception as _e:  # pylint: disable=broad-except
+                queue.put((id_,) + self.fallback(args))
+                if self.debug:
+                    raise
+
+        # start
+        for id_, args in enumerate(args_iter):
+            jobs[id_] = Process(target=produce, args=(id_, args))
+            jobs[id_].start()
+
+        # consume
+        while jobs:
+            try:
+                id_, err, result = queue.get(block=True, timeout=self.timeout)
+            except QueueEmpty:
+                break
+            if not err:
+                yield result
+            jobs.pop(id_)
+
+        for job in jobs.values():
+            job.terminate()
+
+
 def parse_arguments(argv):
     parser = argparse.ArgumentParser(description=__doc__)
     parser.add_argument(
@@ -437,7 +501,7 @@ class AzureClient(object):
             try:
                 # azure-api-call
                 md_list = list(self._monitor_client.metric_definitions.list(rid))
-            except ErrorResponseException, exc:
+            except ErrorResponseException as exc:
                 if "is not a supported platform metric namespace" in exc.message:
                     md_list = []
                 else:
@@ -549,50 +613,8 @@ class AzureClient(object):
         return metrics
 
 
-class Threads(object):
-    def __init__(self, args):
-        super(Threads, self).__init__()
-        self.timeout = args.pjoin_timeout
-        self.sequential = args.sequential
-        self.debug = args.debug
-        self.jobs = []
-        if not self.sequential:
-            atexit.register(self.terminate)
-
-    def run(self, resources, client, config):
-        if self.sequential:
-            self._run_sequential(resources, client, config)
-        else:
-            self._run_parallel(resources, client, config)
-
-    def _run_sequential(self, resources, client, config):
-        for resource in resources:
-            process_resource(resource, client, config)
-
-    def _run_parallel(self, resources, client, config):
-        # start
-        for resource in resources:
-            job = Process(target=process_resource, args=(resource, client, config))
-            job.start()
-            self.jobs.append(job)
-        # join:
-        for job in self.jobs:
-            LOG.info("joining: %s", job)
-            job.join(timeout=self.timeout)
-            if job.is_alive():
-                LOG.info("still alive after %ss: %s", self.timeout, job)
-                self.timeout = 0.01
-            else:
-                self.jobs.remove(job)
-
-    def terminate(self):
-        for job in self.jobs:
-            if job.is_alive():
-                LOG.warning("terminating %s", job)
-                job.terminate()
-
-
-def process_resource(resource, client, config):
+def process_resource(args):
+    resource, client, config = args
 
     r_config = config.get_resource_config(resource)
 
@@ -602,18 +624,26 @@ def process_resource(resource, client, config):
     for metric in client.get_metrics(resource, r_config, err):
         resource.metrics.append(metric)
 
-    section = Section('agent_info')
-    section.add(('remaining-reads', client.remaining_reads))
-    section.write()
+    sections = [Section('agent_info')]
+    sections[0].add(('remaining-reads', client.remaining_reads))
 
     for piggytarget in resource.piggytargets:
         section = Section(resource.section, piggytarget)
         section.add(resource.dumpinfo())
-        section.write()
+        sections.append(section)
         if err:
             err_section = Section('agent_info', piggytarget)
             err_section.add(err.dumpinfo())
-            err_section.write()
+            sections.append(err_section)
+
+    return sections
+
+
+def write_groups(resources):
+    groups = sorted(set(r.info['group'] for r in resources))
+    section = Section('agent_info')
+    section.add(('monitored-groups', json.dumps(groups)))
+    section.write()
 
 
 def main(argv=None):
@@ -630,15 +660,17 @@ def main(argv=None):
     try:
         client = AzureClient(args)
         resources = [r for r in client.discover_resources() if config.is_configured(r)]
-        groups = sorted(set(r.info['group'] for r in resources))
+        client.init_specific(resources)
 
-        section = Section('agent_info')
-        section.add(('monitored-groups', json.dumps(groups)))
-        section.write()
+        write_groups(resources)
 
-        client.init_specific(resources)
-        Threads(args).run(resources, client, config)
-    except () if args.debug else Exception, exc:
+        args = ((resource, client, config) for resource in resources)
+        map_ = map if args.sequential else AsyncMapper(args.timeout, args.debug)
+        for sections in map_(process_resource, args):
+            for section in sections:
+                section.write()
+
+    except () if args.debug else Exception as exc:
         sys.stderr.write(_add_hint(str(exc)))
         return 1
     return 0



More information about the checkmk-commits mailing list