Getting Started with grafanalib

Do you like Grafana but wish you could version your dashboard configuration? Do you find yourself repeating common patterns? If so, grafanalib is for you.

grafanalib lets you generate Grafana dashboards from simple Python scripts.

Grafana migrates dashboards to the latest Grafana schema version on import, meaning that dashboards created with grafanalib are supported by all versions of Grafana. You may find that some of the latest features are missing from grafanalib; please refer to the module documentation for information about supported features. If you find a missing feature, please raise an issue or submit a PR to the GitHub repository.

Writing dashboards

The following will configure a dashboard with a couple of example panels that use the random walk and Prometheus datasources.


from grafanalib.core import (
    Dashboard, TimeSeries, GaugePanel,
    Target, GridPos,
    OPS_FORMAT
)

dashboard = Dashboard(
    title="Python generated example dashboard",
    description="Example dashboard using the Random Walk and default Prometheus datasource",
    tags=[
        'example'
    ],
    timezone="browser",
    panels=[
        TimeSeries(
            title="Random Walk",
            dataSource='default',
            targets=[
                Target(
                    datasource='grafana',
                    expr='example',
                ),
            ],
            gridPos=GridPos(h=8, w=16, x=0, y=0),
        ),
        GaugePanel(
            title="Random Walk",
            dataSource='default',
            targets=[
                Target(
                    datasource='grafana',
                    expr='example',
                ),
            ],
            gridPos=GridPos(h=4, w=4, x=17, y=0),
        ),
        TimeSeries(
            title="Prometheus http requests",
            dataSource='prometheus',
            targets=[
                Target(
                    expr='rate(prometheus_http_requests_total[5m])',
                    legendFormat="{{ handler }}",
                    refId='A',
                ),
            ],
            unit=OPS_FORMAT,
            gridPos=GridPos(h=8, w=16, x=0, y=10),
        ),
    ],
).auto_panel_ids()

The auto_panel_ids() call at the end assigns a unique id to each panel that does not already have one. There is a fair bit of repetition here, but once you figure out what works for your needs, you can factor that out, as sketched below. See our Weave-specific customizations for inspiration.
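
For example, a small helper can remove most of the duplication. This is only a sketch: simple_timeseries is not part of grafanalib, just one way you might wrap the objects used above.

from grafanalib.core import TimeSeries, Target, GridPos


def simple_timeseries(title, expr, datasource, x, y):
    """Build a TimeSeries panel with a single target and a standard size."""
    return TimeSeries(
        title=title,
        dataSource=datasource,
        targets=[Target(expr=expr, refId='A')],
        gridPos=GridPos(h=8, w=16, x=x, y=y),
    )

Your panels list then becomes a series of one-line calls such as simple_timeseries("Prometheus http requests", "rate(prometheus_http_requests_total[5m])", "prometheus", 0, 10).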

Generating dashboards

If you save the above as example.dashboard.py (the suffix must be .dashboard.py), you can then generate the JSON dashboard with:

$ generate-dashboard -o frontend.json example.dashboard.py

Uploading dashboards from code

Sometimes you may need to generate and upload a dashboard directly from Python code. The following example provides the minimal boilerplate for it:

from grafanalib.core import Dashboard
from grafanalib._gen import DashboardEncoder
import json
import requests
from os import getenv


def get_dashboard_json(dashboard, overwrite=False, message="Updated by grafanalib"):
    '''
    get_dashboard_json generates JSON from a grafanalib Dashboard object

    :param dashboard - Dashboard() created via grafanalib
    '''

    # grafanalib generates JSON that needs to be wrapped in a "dashboard" root element
    return json.dumps(
        {
            "dashboard": dashboard.to_json_data(),
            "overwrite": overwrite,
            "message": message
        }, sort_keys=True, indent=2, cls=DashboardEncoder)


def upload_to_grafana(dashboard_json, server, api_key, verify=True):
    '''
    upload_to_grafana uploads the dashboard to grafana and prints the response

    :param dashboard_json - dashboard JSON generated by grafanalib
    :param server - grafana server name
    :param api_key - grafana api key with read and write privileges
    '''

    headers = {'Authorization': f"Bearer {api_key}", 'Content-Type': 'application/json'}
    r = requests.post(f"https://{server}/api/dashboards/db", data=dashboard_json, headers=headers, verify=verify)
    # TODO: add error handling
    print(f"{r.status_code} - {r.content}")


grafana_api_key = getenv("GRAFANA_API_KEY")
grafana_server = getenv("GRAFANA_SERVER")

my_dashboard = Dashboard(title="My awesome dashboard", uid='abifsd')
my_dashboard_json = get_dashboard_json(my_dashboard, overwrite=True)
upload_to_grafana(my_dashboard_json, grafana_server, grafana_api_key)
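
Assuming you saved the script as, say, upload_dashboard.py (an illustrative name), you could run it with the credentials supplied through the environment variables it reads:

$ GRAFANA_API_KEY=<your-api-key> GRAFANA_SERVER=grafana.example.com python upload_dashboard.py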

Alternatively, Grafana supports file based provisioning, where dashboard files are periodically loaded into the Grafana database. Tools like Ansible can assist with the deployment.
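
For reference, a minimal dashboard provisioning config might look like the following; the paths are illustrative, and the full format is described in Grafana's provisioning documentation:

# example: /etc/grafana/provisioning/dashboards/grafanalib.yaml
apiVersion: 1
providers:
  - name: grafanalib-dashboards
    type: file
    options:
      # directory the generated JSON files are copied into
      path: /var/lib/grafana/dashboards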

Writing Alerts

Between Grafana versions there have been significant changes in how alerts are managed. Below are some examples of how to configure alerting in Grafana v8 and Grafana v9.

Alerts in Grafana v8

The following will configure a couple of alerts inside a group.

"""Example grafana 8.x+ Alert"""


from grafanalib.core import (
    AlertGroup,
    AlertRulev8,
    Target,
    AlertCondition,
    LowerThan,
    OP_OR,
    OP_AND,
    RTYPE_LAST
)

# An AlertGroup is one group contained in an alert folder.
alertgroup = AlertGroup(
    name="Production Alerts",
    # Each AlertRule forms a separate alert.
    rules=[
        AlertRulev8(
            # Each rule must have a unique title
            title="Database is unresponsive",
            # Several triggers can be used per alert, a trigger is a combination of a Target and its AlertCondition in a tuple.
            triggers=[
                (
                    # A target refId must be assigned, and exist only once per AlertRule.
                    Target(
                        expr='sum(kube_pod_container_status_ready{exported_pod=~"database-/*"})',
                        # Set datasource to name of your datasource
                        datasource="VictoriaMetrics",
                        refId="A",
                    ),
                    AlertCondition(
                        evaluator=LowerThan(1),
                        # To have the alert fire when either of the triggers is met in the rule, set both AlertCondition operators to OP_OR.
                        operator=OP_OR,
                        reducerType=RTYPE_LAST
                    )
                ),
                (
                    Target(
                        expr='sum by (app) (count_over_time({app="database"}[5m]))',
                        # Set datasource to name of your datasource
                        datasource="Loki",
                        refId="B",
                    ),
                    AlertCondition(
                        evaluator=LowerThan(1000),
                        operator=OP_OR,
                        reducerType=RTYPE_LAST
                    )
                )
            ],
            annotations={
                "summary": "The database is down",
                "runbook_url": "runbook-for-this-scenario.com/foo",
            },
            labels={
                "environment": "prod",
                "slack": "prod-alerts",
            },
            evaluateInterval="1m",
            evaluateFor="3m",
        ),

        # Second alert
        AlertRulev8(
            title="Service API blackbox failure",
            triggers=[
                (
                    Target(
                        expr='probe_success{instance="my-service.foo.com/ready"}',
                        # Set datasource to name of your datasource
                        datasource="VictoriaMetrics",
                        refId="A",
                    ),
                    AlertCondition(
                        evaluator=LowerThan(1),
                        operator=OP_AND,
                        reducerType=RTYPE_LAST,
                    )
                )
            ],
            annotations={
                "summary": "Service API has been unavailable for 3 minutes",
                "runbook_url": "runbook-for-this-scenario.com/foo",
            },
            labels={
                "environment": "prod",
                "slack": "prod-alerts",
            },
            evaluateInterval="1m",
            evaluateFor="3m",
        )
    ]
)

Although this example has a fair amount of boilerplate, when creating large numbers of similar alerts it can save lots of time to programmatically fill these fields.

Each AlertGroup represents a folder within Grafana’s alerts tab. It consists of one or more AlertRulev8, each of which contains one or more triggers. Triggers define what will cause the alert to fire.

A trigger is made up of a Target (a Grafana query on a datasource) and an AlertCondition (a condition this query must satisfy in order to alert).

Finally, there are additional settings, shown together in the sketch after this list:

  • How the alert will behave when data sources have problems (noDataAlertState and errorAlertState)

  • How frequently the trigger is evaluated (evaluateInterval)

  • How long the AlertCondition needs to be met before the alert fires (evaluateFor)

  • Annotations and labels, which help provide contextual information and direct where your alerts will go
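
As a sketch combining these settings with programmatic generation, the group below builds one rule per service from a plain Python list. The service names and state values are illustrative; grafanalib also exposes ALERTRULE_STATE_* constants for the state fields, so check the module documentation for the exact names.

from grafanalib.core import (
    AlertGroup,
    AlertRulev8,
    Target,
    AlertCondition,
    LowerThan,
    OP_AND,
    RTYPE_LAST
)

# Hypothetical list of services to generate similar alerts for.
SERVICES = ["auth", "billing", "search"]

alertgroup = AlertGroup(
    name="Service health",
    rules=[
        AlertRulev8(
            title=f"{service} pods not ready",
            triggers=[
                (
                    Target(
                        expr=f'sum(kube_pod_container_status_ready{{exported_pod=~"{service}-.*"}})',
                        datasource="VictoriaMetrics",
                        refId="A",
                    ),
                    AlertCondition(
                        evaluator=LowerThan(1),
                        operator=OP_AND,
                        reducerType=RTYPE_LAST,
                    ),
                ),
            ],
            evaluateInterval="1m",  # how often the trigger is evaluated
            evaluateFor="3m",       # how long the condition must hold before firing
            # The string values below are assumptions; grafanalib defines
            # ALERTRULE_STATE_* constants for these fields.
            noDataAlertState="Alerting",
            errorAlertState="Alerting",
        )
        for service in SERVICES
    ],
)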

Alerts in Grafana v9

The following will configure a couple of alerts inside a group for Grafana v9.x+.

"""Example grafana 9.x+ Alert"""


from grafanalib.core import (
    AlertGroup,
    AlertRulev9,
    Target,
    AlertCondition,
    AlertExpression,
    GreaterThan,
    OP_AND,
    RTYPE_LAST,
    EXP_TYPE_CLASSIC,
    EXP_TYPE_REDUCE,
    EXP_TYPE_MATH
)

# An AlertGroup is one group contained in an alert folder.
alertgroup = AlertGroup(
    name="Production Alerts",
    # Each AlertRule forms a separate alert.
    rules=[
        # Alert rule using classic condition > 3
        AlertRulev9(
            # Each rule must have a unique title
            title="Alert for something 1",
            uid='alert1',
            # Several triggers can be used per alert
            condition='B',
            triggers=[
                # A target refId must be assigned, and exist only once per AlertRule.
                Target(
                    expr="from(bucket: \"sensors\")\n  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n  |> filter(fn: (r) => r[\"_measurement\"] == \"remote_cpu\")\n  |> filter(fn: (r) => r[\"_field\"] == \"usage_system\")\n  |> filter(fn: (r) => r[\"cpu\"] == \"cpu-total\")\n  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)\n  |> yield(name: \"mean\")",
                    # Set datasource to name of your datasource
                    datasource="influxdb",
                    refId="A",
                ),
                AlertExpression(
                    refId="B",
                    expressionType=EXP_TYPE_CLASSIC,
                    expression='A',
                    conditions=[
                        AlertCondition(
                            evaluator=GreaterThan(3),
                            operator=OP_AND,
                            reducerType=RTYPE_LAST
                        )
                    ]
                )
            ],
            annotations={
                "summary": "The database is down",
                "runbook_url": "runbook-for-this-scenario.com/foo",
            },
            labels={
                "environment": "prod",
                "slack": "prod-alerts",
            },
            evaluateFor="3m",
        ),
        # Alert rule using reduce and Math
        AlertRulev9(
            # Each rule must have a unique title
            title="Alert for something 2",
            uid='alert2',
            condition='C',
            # Several triggers can be used per alert
            triggers=[
                # A target refId must be assigned, and exist only once per AlertRule.
                Target(
                    expr="from(bucket: \"sensors\")\n  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n  |> filter(fn: (r) => r[\"_measurement\"] == \"remote_cpu\")\n  |> filter(fn: (r) => r[\"_field\"] == \"usage_system\")\n  |> filter(fn: (r) => r[\"cpu\"] == \"cpu-total\")\n  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)\n  |> yield(name: \"mean\")",
                    # Set datasource to name of your datasource
                    datasource="influxdb",
                    refId="A",
                ),
                AlertExpression(
                    refId="B",
                    expressionType=EXP_TYPE_REDUCE,
                    expression='A',
                    reduceFunction='mean',
                    reduceMode='dropNN'
                ),
                AlertExpression(
                    refId="C",
                    expressionType=EXP_TYPE_MATH,
                    expression='$B < 3'
                )
            ],
            annotations={
                "summary": "The database is down",
                "runbook_url": "runbook-for-this-scenario.com/foo",
            },
            labels={
                "environment": "prod",
                "slack": "prod-alerts",
            },
            evaluateFor="3m",
        )
    ]
)

Although this example has a fair amount of boilerplate, when creating large numbers of similar alerts it can save lots of time to programmatically fill these fields.

Each AlertGroup represents a folder within Grafana’s alerts tab. It consists of one or more AlertRulev9, each of which contains a list of triggers that define what will cause the alert to fire.

A trigger can either be a Target (a Grafana query on a datasource) or an AlertExpression (an expression performed on one of the triggers).

An AlertExpression can be one of 4 types:

  • Classic - Contains a list of AlertConditions that are evaluated

  • Reduce - Reduce the queried data

  • Resample - Resample the queried data (see the sketch after this list)

  • Math - Expression with the condition for the rule
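
The Resample type is the only one not used in the example above. A minimal sketch is shown below; the resample parameter names follow grafanalib’s AlertExpression fields, but treat them as assumptions and check the module documentation.

from grafanalib.core import AlertExpression, EXP_TYPE_RESAMPLE

# Resample the series produced by trigger 'A' into 1m windows.
resample = AlertExpression(
    refId="B",
    expressionType=EXP_TYPE_RESAMPLE,
    expression='A',
    resampleWindow='1m',
    resampleDownsampler='mean',
    resampleUpsampler='fillna',
)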

Finally, there are additional settings like:

  • How the alert will behave when data sources have problems (noDataAlertState and errorAlertState)

  • How frequently each rule in the Alert Group is evaluated (evaluateInterval)

  • How long the AlertCondition needs to be met before the alert fires (evaluateFor)

  • Annotations and labels, which help provide contextual information and direct where your alerts will go

Generating Alerts

If you save either of the above examples for Grafana v8 or v9 as example.alertgroup.py (the suffix must be .alertgroup.py), you can then generate the JSON alert with:

$ generate-alertgroup -o alerts.json example.alertgroup.py

Uploading alerts from code

As Grafana does not currently have a user interface for importing alertgroup JSON, you must either upload the alerts via Grafana’s REST API or use file based provisioning.

Uploading alerts from code using REST API

The following example provides the minimal boilerplate for it:

from grafanalib.core import AlertGroup
from grafanalib._gen import DashboardEncoder, loader
import json
import requests
from os import getenv


def get_alert_json(alert: AlertGroup):
    '''
    get_alert_json generates JSON from grafanalib AlertGroup object

    :param alert - AlertGroup created via grafanalib
    '''

    return json.dumps(alert.to_json_data(), sort_keys=True, indent=4, cls=DashboardEncoder)


def upload_to_grafana(alertjson, folder, server, api_key, session_cookie, verify=True):
    '''
    upload_to_grafana tries to upload the AlertGroup to grafana and prints the response
    WARNING: This will first delete all alerts in the AlertGroup before replacing them with the provided AlertGroup.

    :param alertjson - AlertGroup JSON generated by grafanalib
    :param folder - folder to upload the AlertGroup into
    :param server - grafana server name
    :param api_key - grafana api key with read and write privileges
    :param session_cookie - grafana session cookie, used instead of the api key if set
    '''
    groupName = json.loads(alertjson)['name']

    headers = {}
    if api_key:
        print("using bearer auth")
        headers['Authorization'] = f"Bearer {api_key}"

    if session_cookie:
        print("using session cookie")
        headers['Cookie'] = session_cookie

    print(f"deleting AlertGroup {groupName} in folder {folder}")
    r = requests.delete(f"https://{server}/api/ruler/grafana/api/v1/rules/{folder}/{groupName}", headers=headers, verify=verify)
    print(f"{r.status_code} - {r.content}")

    headers['Content-Type'] = 'application/json'

    print(f"ensuring folder {folder} exists")
    r = requests.post(f"https://{server}/api/folders", data={"title": folder}, headers=headers)
    print(f"{r.status_code} - {r.content}")

    print(f"uploading AlertGroup {groupName} to folder {folder}")
    r = requests.post(f"https://{server}/api/ruler/grafana/api/v1/rules/{folder}", data=alertjson, headers=headers, verify=verify)
    # TODO: add error handling
    print(f"{r.status_code} - {r.content}")


grafana_api_key = getenv("GRAFANA_API_KEY")
grafana_server = getenv("GRAFANA_SERVER")
grafana_cookie = getenv("GRAFANA_COOKIE")

# Generate an alert from the example
my_alertgroup_json = get_alert_json(loader("./grafanalib/tests/examples/example.alertgroup.py"))
upload_to_grafana(my_alertgroup_json, "testfolder", grafana_server, grafana_api_key, grafana_cookie)

Uploading alerts from code using File Based Provisioning

The alternative to using Grafana’s REST API is to use its file based provisioning for alerting.

The following example uses the AlertFileBasedProvisioning class to provision a list of alert groups:

"""Example grafana 9.x+ Alert"""


from grafanalib.core import (
    AlertGroup,
    AlertRulev9,
    Target,
    AlertCondition,
    AlertExpression,
    AlertFileBasedProvisioning,
    GreaterThan,
    OP_AND,
    RTYPE_LAST,
    EXP_TYPE_CLASSIC,
    EXP_TYPE_REDUCE,
    EXP_TYPE_MATH
)

# An AlertGroup is one group contained in an alert folder.
alertgroup = AlertGroup(
    name="Production Alerts",
    # Each AlertRule forms a separate alert.
    rules=[
        # Alert rule using classic condition > 3
        AlertRulev9(
            # Each rule must have a unique title
            title="Alert for something 3",
            uid='alert3',
            # Several triggers can be used per alert
            condition='B',
            triggers=[
                # A target refId must be assigned, and exist only once per AlertRule.
                Target(
                    expr="from(bucket: \"sensors\")\n  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n  |> filter(fn: (r) => r[\"_measurement\"] == \"remote_cpu\")\n  |> filter(fn: (r) => r[\"_field\"] == \"usage_system\")\n  |> filter(fn: (r) => r[\"cpu\"] == \"cpu-total\")\n  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)\n  |> yield(name: \"mean\")",
                    # Set datasource to name of your datasource
                    datasource="influxdb",
                    refId="A",
                ),
                AlertExpression(
                    refId="B",
                    expressionType=EXP_TYPE_CLASSIC,
                    expression='A',
                    conditions=[
                        AlertCondition(
                            evaluator=GreaterThan(3),
                            operator=OP_AND,
                            reducerType=RTYPE_LAST
                        )
                    ]
                )
            ],
            annotations={
                "summary": "The database is down",
                "runbook_url": "runbook-for-this-scenario.com/foo",
            },
            labels={
                "environment": "prod",
                "slack": "prod-alerts",
            },
            evaluateFor="3m",
        ),
        # Alert rule using reduce and Math
        AlertRulev9(
            # Each rule must have a unique title
            title="Alert for something 4",
            uid='alert4',
            condition='C',
            # Several triggers can be used per alert
            triggers=[
                # A target refId must be assigned, and exist only once per AlertRule.
                Target(
                    expr="from(bucket: \"sensors\")\n  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n  |> filter(fn: (r) => r[\"_measurement\"] == \"remote_cpu\")\n  |> filter(fn: (r) => r[\"_field\"] == \"usage_system\")\n  |> filter(fn: (r) => r[\"cpu\"] == \"cpu-total\")\n  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)\n  |> yield(name: \"mean\")",
                    # Set datasource to name of your datasource
                    datasource="influxdb",
                    refId="A",
                ),
                AlertExpression(
                    refId="B",
                    expressionType=EXP_TYPE_REDUCE,
                    expression='A',
                    reduceFunction='mean',
                    reduceMode='dropNN'
                ),
                AlertExpression(
                    refId="C",
                    expressionType=EXP_TYPE_MATH,
                    expression='$B < 3'
                )
            ],
            annotations={
                "summary": "The database is down",
                "runbook_url": "runbook-for-this-scenario.com/foo",
            },
            labels={
                "environment": "prod",
                "slack": "prod-alerts",
            },
            evaluateFor="3m",
        )
    ]
)

alertfilebasedprovisioning = AlertFileBasedProvisioning([alertgroup])

If you save the above example as example.alertfilebasedprovisioning.py (the suffix must be .alertfilebasedprovisioning.py), you can then generate the JSON alert with:

$ generate-alertgroup -o alerts.json example.alertfilebasedprovisioning.py

Then place the file in the provisioning/alerting directory and start Grafana. Tools like Ansible can assist with the deployment of the alert file.
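
For example, on a typical Linux install (the paths and service name are illustrative):

$ cp alerts.json /etc/grafana/provisioning/alerting/
$ systemctl restart grafana-server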

Installation

grafanalib is just a Python package, so:

$ pip install grafanalib

Support

This library is in its very early stages. We’ll probably make changes that break backwards compatibility, although we’ll try hard not to.

grafanalib works with Python 3.7, 3.8, 3.9, 3.10 and 3.11.

Developing

If you’re working on the project, and need to build from source, it’s done as follows:

$ virtualenv .env
$ . ./.env/bin/activate
$ pip install -e .

Configuring Grafana Datasources

This repo used to contain a program gfdatasource for configuring Grafana data sources, but it has been retired since Grafana now has a built-in way to do it. See https://grafana.com/docs/administration/provisioning/#datasources