Increasing data streaming capacity

If you have a production deployment, you can increase data streaming throughput by increasing the number of Flink task managers.

About this task

You can increase the data throughput capacity of an incoming integration by increasing the base parallelism number that is associated with that integration. For source parallelism, the recommended value is the number of days of historical data that you are pulling. For example, if you are pulling 7 days of historical data for training, set the source parallelism to 7 or higher so that data for multiple days can be pulled simultaneously.

Similarly, set the base parallelism to a value higher than 1 so that data is processed in parallel. In a starter (small) deployment, 16 Flink slots are available. In a production (large) deployment, a maximum of 32 slots are available.

Table 1. Base parallelism for deployment sizes
Deployment size   Number of Flink task managers   Total number of base parallelisms available
Starter           2                               16
Production        4                               32

For more information about IBM Cloud Pak® for AIOps deployment size, see Hardware requirements.
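
As a rough illustration of Table 1, the following shell sketch converts between task managers and slots. The figure of 8 slots per task manager is not stated in this topic. It is inferred from the table (16 slots for 2 task managers, 32 slots for 4), and the function names are hypothetical.

```shell
# Assumption: each Flink task manager provides 8 slots, inferred from Table 1
# (16 slots / 2 task managers, 32 slots / 4 task managers).
SLOTS_PER_TASK_MANAGER=8

# Print the total slot count for a given number of task managers.
total_slots() {
  echo $(( $1 * SLOTS_PER_TASK_MANAGER ))
}

# Print the smallest number of task managers whose slots cover the
# requested total base parallelism (ceiling division).
task_managers_needed() {
  echo $(( ($1 + SLOTS_PER_TASK_MANAGER - 1) / SLOTS_PER_TASK_MANAGER ))
}

total_slots 2            # starter default, prints 16
total_slots 4            # production default, prints 32
task_managers_needed 24  # total base parallelism of 24, prints 3
```

If the combined base parallelism of the integrations that you plan exceeds the slot total for your deployment size, more task managers might be needed.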

When you define incoming log integrations, increasing the base parallelism can improve the throughput of an integration, its processing speed, or both. If you define a live integration for anomaly detection, consider setting a base parallelism of 3 to 4 for every 1,000 log messages per second. For example, for a connection that handles 8,000 log messages per second, a base parallelism between 24 and 32 is recommended. Choosing the smallest base parallelism that meets your needs reduces the likelihood that you must scale your Flink task managers and increase your hardware resources.
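
The guideline above is simple arithmetic, sketched here in shell. The function name recommended_parallelism is hypothetical, and rounding up to a whole number is an assumption for rates that are not a multiple of 1,000.

```shell
# Print the suggested base parallelism range (low and high) for a live log
# integration, using the guideline of 3 to 4 per 1,000 log messages per second.
# $1: expected log messages per second.
# Assumption: fractional results are rounded up to the next whole number.
recommended_parallelism() {
  rate=$1
  low=$(( (rate * 3 + 999) / 1000 ))
  high=$(( (rate * 4 + 999) / 1000 ))
  echo "$low $high"
}

recommended_parallelism 8000   # prints "24 32"
```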

Before you begin

  • Determine whether your deployment is out of capacity. Run the following command from the namespace where IBM Cloud Pak for AIOps is deployed.

    oc exec -it cp4waiops-eventprocessor-eve-29ee-ep-jobmanager-0 -c jobmanager -- curl -k -u `oc get secret $(oc get secrets | grep cp4waiops-eventprocessor-eve-29ee-ep-admin-user | awk '!/-min/' | awk '{print $1;}') -o jsonpath="{.data.username}"|base64 -d`:`oc get secret $(oc get secrets | grep cp4waiops-eventprocessor-eve-29ee-ep-admin-user | awk '!/-min/' | awk '{print $1;}') -o jsonpath="{.data.password}"|base64 -d` https://localhost:8081/overview
    

    Sample output, in which only 1 of the 32 total slots is available:

    {"taskmanagers":4,"slots-total":32,"slots-available":1,"jobs-running":2,"jobs-finished":0,"jobs-cancelled":1,"jobs-failed":0,"flink-version":"1.13.0","flink-commit":"f06faf1"}
    
  • Check the logs for the task manager pods to see whether an integration is working correctly and collecting live data.

    for pod in `oc get pods -n <namespace> | grep 'cp4waiops-eventprocessor-eve-29ee-ep-taskmanager' | awk '{ print $1 }' ` ; do oc logs $pod -n <namespace> | grep 'HttpSource'; done
    

    Where <namespace> is the namespace where IBM Cloud Pak for AIOps is deployed.

    Sample output:

    2021-11-19 20:36:06,397 INFO  zeno.common.sources.HttpSource                               [] - Results from extractor: 79932, total accumulated: 13769635, start time: 2021-11-19T20:35:55.716Z, end time: 2021-11-19T20:36:06.396Z, elapse time in seconds: 10
    2021-11-19 20:36:06,397 INFO  zeno.common.sources.HttpSource                               [] - Querying window: 2021-11-19T20:35:52.740Z - 2021-11-19T20:36:02.740Z, window size: 10, time delay: 0s
    2021-11-19 20:36:15,996 INFO  zeno.common.sources.HttpSource                               [] - Results from extractor: 80247, total accumulated: 13849882, start time: 2021-11-19T20:36:06.397Z, end time: 2021-11-19T20:36:15.996Z, elapse time in seconds: 9
    2021-11-19 20:36:15,996 INFO  zeno.common.sources.HttpSource                               [] - Querying window: 2021-11-19T20:36:02.741Z - 2021-11-19T20:36:12.741Z, window size: 10, time delay: 0s
    2021-11-19 20:36:26,399 INFO  zeno.common.sources.HttpSource                               [] - Results from extractor: 80280, total accumulated: 13930162, start time: 2021-11-19T20:36:15.996Z, end time: 2021-11-19T20:36:26.399Z, elapse time in seconds: 10
    2021-11-19 20:36:26,399 INFO  zeno.common.sources.HttpSource                               [] - Querying window: 2021-11-19T20:36:12.742Z - 2021-11-19T20:36:22.742Z, window size: 10, time delay: 0s
    

    In this example, the integration is behind in collecting live data. The last entry, logged at 2021-11-19 20:36:26,399, queries the window 2021-11-19T20:36:12.742Z - 2021-11-19T20:36:22.742Z. The integration is therefore approximately 4 seconds behind: at 20:36:26, it is querying data only through 20:36:22. Increasing the base parallelism might improve results.
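
Both checks can be scripted. The following sketch assumes the output formats that are shown in the samples above. The function names overview_field and window_lag_seconds are hypothetical, and the lag calculation compares whole seconds and assumes that log timestamps and query windows use the same time zone.

```shell
# Print the value of a numeric field from the Flink /overview JSON on stdin.
# $1: field name, for example slots-available.
overview_field() {
  grep -o "\"$1\":[0-9]*" | cut -d: -f2
}

# For each "Querying window" log line on stdin, print how many whole seconds
# the end of the query window trails the time at which the line was logged.
window_lag_seconds() {
  awk '/Querying window:/ {
    # Field 2 is the log time in the form HH:MM:SS,mmm.
    split($2, t, /[:,]/)
    logged = t[1] * 3600 + t[2] * 60 + t[3]
    # Locate the "window:" token; the window end is three fields later.
    for (i = 1; i <= NF; i++) {
      if ($i == "window:") { break }
    }
    split($(i + 3), e, /[T:.]/)
    ended = e[2] * 3600 + e[3] * 60 + e[4]
    lag = logged - ended
    # Correct for a window that ends just before midnight.
    if (lag < 0) { lag += 86400 }
    print lag
  }'
}

# Values taken from the sample output above.
echo '{"taskmanagers":4,"slots-total":32,"slots-available":1}' \
  | overview_field slots-available    # prints 1
echo '2021-11-19 20:36:26,399 INFO  zeno.common.sources.HttpSource [] - Querying window: 2021-11-19T20:36:12.742Z - 2021-11-19T20:36:22.742Z, window size: 10, time delay: 0s' \
  | window_lag_seconds                # prints 4
```

To run the checks against a live deployment, pipe the output of the commands in this section into the functions instead of the sample text.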

Procedure

Complete the following steps to increase the number of task managers on IBM Automation foundation Flink.

  1. Access the Red Hat® OpenShift® command line and log in to your cluster.

    oc login -u kubeadmin -p <password> <URL for the Kubernetes/OpenShift API> --insecure-skip-tls-verify=true
    
  2. Switch to the namespace where IBM Cloud Pak® for AIOps is installed.

    oc project <cp4waiops_namespace>
    
  3. Before you increase the number of task managers, run the following command to list the current Flink task manager pods:

    oc get pods | grep cp4waiops-eventprocessor-eve-29ee-ep-taskmanager
    

    The command output resembles the following example:

    cp4waiops-eventprocessor-eve-29ee-ep-taskmanager-0                1/1     Running     0          10d
    cp4waiops-eventprocessor-eve-29ee-ep-taskmanager-1                1/1     Running     0          10d
    
  4. Update the number of task managers to the required value.

    By default, the value is 2 in a starter deployment, but you can increase the value to 3. In a production deployment, the default value is 4, but you can increase it to 5 if the node has sufficient resources.

    1. Run the following command to view and edit your subscription.

      oc edit subscription.operators.coreos.com ibm-aiops-orchestrator -n <namespace>
      

      Where <namespace> is the namespace where IBM Cloud Pak for AIOps is deployed.

    2. If the subscription does not already have a FLINK_TASK_MGR_REPLICAS entry, add the following YAML directly beneath spec. Set the value of FLINK_TASK_MGR_REPLICAS to the required number of task managers, and save your changes.

      config:
        env:
        - name: FLINK_TASK_MGR_REPLICAS
          value: "3"
      

      Example:

      apiVersion: operators.coreos.com/v1alpha1
      kind: Subscription
      metadata:
        name: ibm-aiops-orchestrator
        namespace: <namespace>
      spec:
        config:
          env:
          - name: FLINK_TASK_MGR_REPLICAS
            value: "3"
        channel: v4.5
        installPlanApproval: Automatic
        name: ibm-aiops-orchestrator
        source: ibm-operator-catalog
        sourceNamespace: openshift-marketplace
      

      Where <namespace> is the namespace where IBM Cloud Pak for AIOps is deployed.

  5. After you increase the number of replicas, for example to 3, run the following command to confirm that the additional Flink task manager pod is running:

    oc get pods | grep cp4waiops-eventprocessor-eve-29ee-ep-taskmanager
    

    The command output resembles the following example:

    cp4waiops-eventprocessor-eve-29ee-ep-taskmanager-0                1/1     Running     0          10d
    cp4waiops-eventprocessor-eve-29ee-ep-taskmanager-1                1/1     Running     0          10d
    cp4waiops-eventprocessor-eve-29ee-ep-taskmanager-2                1/1     Running     0          2m44s
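
To confirm the result without counting pods by eye, the pod listing can be piped through a small filter. This is a sketch that assumes the oc get pods column layout that is shown above (name, ready, status). The function name running_task_managers is hypothetical.

```shell
# Count task manager pods that are in the Running state.
# Reads "oc get pods" output on stdin; column 1 is the pod name and
# column 3 is the pod status.
running_task_managers() {
  awk '$1 ~ /taskmanager/ && $3 == "Running" { n++ } END { print n + 0 }'
}

# Sample listing from the example output above.
printf '%s\n' \
  'cp4waiops-eventprocessor-eve-29ee-ep-taskmanager-0   1/1   Running   0   10d' \
  'cp4waiops-eventprocessor-eve-29ee-ep-taskmanager-1   1/1   Running   0   10d' \
  'cp4waiops-eventprocessor-eve-29ee-ep-taskmanager-2   1/1   Running   0   2m44s' \
  | running_task_managers    # prints 3
```

On a cluster, the equivalent check would be to pipe oc get pods into running_task_managers and compare the count with the value that you set for FLINK_TASK_MGR_REPLICAS.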