Check for open firewall rules in Google Cloud using Python

After forgetting about an open firewall rule I had created for testing (an amateur mistake, I know), I wrote this Python script to check for open firewall rules in my Google Cloud projects and notify me if it happens again.

Here is the script if you just want to use it yourself; if you want to know a little more about how it works, please read on.

Use it simply by running:

./firewall_checker.py --project=PROJECT_NAME --check

#!/usr/bin/env python

# This script requires the active session to have the credentials for the target project. 
# The client lib will check the file set on the GOOGLE_APPLICATION_CREDENTIALS env variable.
# This is normally located at ~/.config/gcloud/<some_config_name>.json

# https://github.com/googleapis/google-api-python-client
# pip install --upgrade google-api-python-client
import argparse

import googleapiclient.discovery
import googleapiclient.errors


class FirewallChecker(object):
    def __init__(self, flags):
        self.flags = flags
        self.compute = googleapiclient.discovery.build('compute', 'v1')
        self.firewall_rules = self.get_firewall_rules()


    def get_firewall_rules(self):
        """Return the project's firewall rules (an empty list if there are none)."""
        try:
            result = self.compute.firewalls().list(project=self.flags.project).execute()
        except googleapiclient.errors.HttpError as e:
            # Newer client versions expose the message as `e.reason`;
            # fall back to the full error text otherwise.
            reason = getattr(e, 'reason', None) or str(e)
            print('Error: ' + reason)
            if "permission" in reason.lower():
                print("Looks like there was a permission issue. Please double check GOOGLE_APPLICATION_CREDENTIALS is set in your ENV and that it is pointing to the correct service json")
            raise SystemExit(1)
        # 'items' is absent when the project has no firewall rules.
        return result.get('items', [])


    def print_firewall_rules(self):
        # Reuse the rules fetched in __init__ instead of calling the API again.
        for rule in self.firewall_rules:
            print(' - ' + rule['name'])
            print('    - Created: ' + rule['creationTimestamp'])
            print('    - Direction: ' + rule['direction'])
            if 'sourceRanges' in rule:
                print('    - Source ranges: ')
                for ip in rule['sourceRanges']:
                    print('      - ' + ip)


    def check_firewall_rules(self):
        for rule in self.firewall_rules:
            self.check_rule_for_open_access(rule)


    def check_rule_for_open_access(self, rule):
        if 'sourceRanges' in rule:
            for ip in rule['sourceRanges']:
                if ip == '0.0.0.0/0':
                    message = "*{0}* - Rule {1} has open access with 0.0.0.0/0. Please Investigate".format(
                        self.flags.project, rule['name'])
                    print(message)
                    self.notify(message)


    def notify(self, message):
        """
        Push a notification to a service like Slack for example
        """
        pass


def main():
    parser = argparse.ArgumentParser(
        __file__, __doc__,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter, )
    parser.add_argument('--project', required=True,
                        type=str, help="gcp project to lookup")
    parser.add_argument('--list', action='store_true',
                        help="list firewall rules for a project")
    parser.add_argument('--check', action='store_true',
                        help="check firewall rules for issues")

    args = parser.parse_args()

    firewall_checker = FirewallChecker(args)

    if args.list:
        print('Firewall rules in project %s:' % (args.project))
        firewall_checker.print_firewall_rules()
    if args.check:
        firewall_checker.check_firewall_rules()


if __name__ == '__main__':
    main()

Breakdown

This script uses googleapiclient to authenticate with a project automatically, using the service account JSON file referenced by the GOOGLE_APPLICATION_CREDENTIALS environment variable. Check out the Google guide for authenticating with a project if you have not done so already.

First we need to initialise the library. We do this with the discovery module's build function, which picks up the service account JSON file mentioned above.

self.compute = googleapiclient.discovery.build('compute', 'v1')

Next we use the API to query all of the firewall rules for a project. The list call simply takes the project as a parameter. Check out the official documentation for detailed examples.

result = self.compute.firewalls().list(project=self.flags.project).execute()
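A single list call returns one page of results, so a project with a lot of rules may need more than one request. Here is a minimal sketch of following the pages with the list_next method that the client library generates for list calls. The helper name list_all_firewall_rules is made up for illustration, and it assumes a compute client built with discovery.build as shown earlier.

```python
def list_all_firewall_rules(compute, project):
    """Collect firewall rules from every page of the list response.

    `compute` is assumed to be a client returned by
    googleapiclient.discovery.build('compute', 'v1').
    """
    rules = []
    firewalls = compute.firewalls()
    request = firewalls.list(project=project)
    while request is not None:
        response = request.execute()
        # 'items' is missing when a page (or the whole project) has no rules.
        rules.extend(response.get('items', []))
        # list_next returns None once there are no more pages to fetch.
        request = firewalls.list_next(previous_request=request,
                                      previous_response=response)
    return rules
```

Because the helper always returns a list, callers can iterate over the result without first checking for an empty project.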

I’ve wrapped the list call in a try/except and specifically check for permission issues, because I always forget to set the environment variable.

try:
    result = self.compute.firewalls().list(project=self.flags.project).execute()
except googleapiclient.errors.HttpError as e:
    # Newer client versions expose the message as `e.reason`;
    # fall back to the full error text otherwise.
    reason = getattr(e, 'reason', None) or str(e)
    print('Error: ' + reason)
    if "permission" in reason.lower():
        print("Looks like there was a permission issue. Please double check GOOGLE_APPLICATION_CREDENTIALS is set in your ENV and that it is pointing to the correct service json")
    raise SystemExit(1)
# 'items' is absent when the project has no firewall rules.
return result.get('items', [])
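Since the usual culprit is a forgotten environment variable, it can also be caught before any API call is made. The following is a rough sketch of such a pre-flight check; the function name credentials_problem is hypothetical, and it assumes you always authenticate through GOOGLE_APPLICATION_CREDENTIALS (the client library can find credentials in other ways, in which case this check would be too strict).

```python
import os


def credentials_problem(environ=None):
    """Return a message describing a credentials problem, or None if all looks fine."""
    environ = os.environ if environ is None else environ
    path = environ.get('GOOGLE_APPLICATION_CREDENTIALS')
    if not path:
        return 'GOOGLE_APPLICATION_CREDENTIALS is not set'
    if not os.path.isfile(path):
        return 'GOOGLE_APPLICATION_CREDENTIALS points to a missing file: ' + path
    return None
```

Calling credentials_problem() at the top of main() and exiting when it returns a message would give a clearer error than waiting for the API to reject the request.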

Once we have the firewall rules, it’s a simple case of iterating over them. The object returned from the API is quite lengthy; refer again to the official documentation for the full response body.

Depending on whether you passed the --list or --check flag, the script will either print the firewall rules or check them for open inbound access. Both flags iterate over the same list of rules.

def print_firewall_rules(self):
    # Reuse the rules fetched in __init__ instead of calling the API again.
    for rule in self.firewall_rules:
        print(' - ' + rule['name'])
        print('    - Created: ' + rule['creationTimestamp'])
        print('    - Direction: ' + rule['direction'])
        if 'sourceRanges' in rule:
            print('    - Source ranges: ')
            for ip in rule['sourceRanges']:
                print('      - ' + ip)

def check_rule_for_open_access(self, rule):
    if 'sourceRanges' in rule:
        for ip in rule['sourceRanges']:
            if ip == '0.0.0.0/0':
                message = "*{0}* - Rule {1} has open access with 0.0.0.0/0. Please Investigate".format(
                    self.flags.project, rule['name'])
                print(message)
                self.notify(message)
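The check above only looks at sourceRanges. A slightly stricter variant could also skip disabled rules, egress rules and deny rules, treat the IPv6 range ::/0 as open, and report which ports are exposed. This is a sketch under those assumptions rather than part of the original script; the function names are made up, while the field names (direction, disabled, allowed, IPProtocol, ports) come from the firewall resource returned by the API.

```python
OPEN_RANGES = {'0.0.0.0/0', '::/0'}


def describe_allowed(rule):
    """Summarise the protocols and ports an allow rule opens, e.g. 'tcp:22,80; icmp'."""
    parts = []
    for entry in rule.get('allowed', []):
        ports = ','.join(entry.get('ports', []))
        parts.append(entry['IPProtocol'] + (':' + ports if ports else ''))
    return '; '.join(parts)


def is_open_ingress(rule):
    """True for an enabled ingress allow rule whose sources include the whole internet."""
    if rule.get('disabled', False):
        return False
    if rule.get('direction', 'INGRESS') != 'INGRESS':
        return False
    if not rule.get('allowed'):
        return False  # deny rules do not open anything
    return any(ip in OPEN_RANGES for ip in rule.get('sourceRanges', []))
```

With these helpers, the message sent to notify could include describe_allowed(rule), so the alert says what is exposed as well as which rule exposes it.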

Conclusion

So there it is: a simple way to check for open firewall rules in Google Cloud using Python. Please let me know what you think; there are plenty of improvements that could be made. The script was designed for one specific purpose and to run from cron, but it could be fleshed out into much more than that.
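One obvious gap is the notify placeholder, which does nothing yet. As a starting point, here is a minimal sketch that posts the message to a Slack incoming webhook using only the standard library. The webhook URL is something you would have to create in Slack and pass in yourself, and the function names are illustrative.

```python
import json
import urllib.request


def build_slack_request(webhook_url, message):
    """Build (but do not send) a POST request for a Slack incoming webhook."""
    payload = json.dumps({'text': message}).encode('utf-8')
    return urllib.request.Request(
        webhook_url,
        data=payload,
        headers={'Content-Type': 'application/json'},
        method='POST',
    )


def notify(webhook_url, message, timeout=10):
    """Send the message and return the HTTP status code of the response."""
    request = build_slack_request(webhook_url, message)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status
```

Keeping the request construction separate from the sending makes it possible to test the payload without touching the network.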

Checking if a Kubernetes Deployment is balanced

Every now and then I find myself needing to check if pods are split evenly across the available nodes. This can be for a number of reasons, but the most common for me is when Kubernetes, during pod scheduling, has preferred one or two nodes.

This can cause a node to use more resources than planned and could cause service disruptions, and who wants that?

If you have a small number of pods and nodes you could run the following command to get a quick overview of pod distribution.

kubectl get pods -o wide --sort-by=.spec.nodeName | grep -i website

But when you are dealing with hundreds of pods across many nodes, it's not as easy to scan the results.

So I created this bash script to help me with just that! It takes a deployment name and queries the nodes to check if they are balanced.

#!/usr/bin/env bash

# Name of the target Kubernetes deployment
DEPLOYMENT_NAME="example-deployment"
# Count of all running pods whose name matches the deployment
DEPLOYMENT_COUNT=$(kubectl get pods --field-selector=status.phase=Running | grep -ci "${DEPLOYMENT_NAME}")
# List of all nodes
NODES=($(kubectl get no -o name | sed "s/node\///g"))
# Count of nodes
NODE_COUNT=${#NODES[@]}
# Are the pods balanced?
BALANCED=1

# Get the desired replica count from the deployment
DEPLOYMENT_EXPECTED_COUNT=$(kubectl get deployment "${DEPLOYMENT_NAME}" -o go-template --template="{{.spec.replicas}}")

# Integer division rounds down, so any remainder counts as excess on the nodes that hold it
EXPECTED_PODS_PER_NODE=$(( DEPLOYMENT_EXPECTED_COUNT / NODE_COUNT ))

for node in "${NODES[@]}"; do
    echo -e "------\e[93m${node}\e[39m------\n"
    # Pods on this node (in any namespace) whose name matches the deployment
    PODS=($(kubectl get pods --all-namespaces --field-selector spec.nodeName="${node}" -o name | sed "s/pod\///g" | grep -i "${DEPLOYMENT_NAME}"))
    for pod in "${PODS[@]}"; do
        echo "${pod}"
    done

    echo -e "\n"

    # Reuse the array instead of querying the API a second time
    POD_COUNT=${#PODS[@]}
    if (( POD_COUNT > EXPECTED_PODS_PER_NODE )); then
        EXCESS=$(( POD_COUNT - EXPECTED_PODS_PER_NODE ))
        echo -e "Node (${node}) has ${EXCESS} too many pods"
        BALANCED=0
    fi

    echo -e "\n"
done

if [[ ${BALANCED} == 0 ]]; then
    echo "Pods are not balanced"
else
    echo "Pods are balanced"
fi

This will output something like this:

------node_1------
example-deployment-1
example-deployment-2

------node_2------
example-deployment-3
example-deployment-4

------node_3------
example-deployment-5


Pods are not balanced
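The balance check itself is just arithmetic, so it can be sketched in a few lines of Python working on the parsed output of kubectl get pods -o json. The function names here are made up. One deliberate difference from the script: the fair share is rounded up instead of down, which is an assumption about what "balanced" should mean. With the 2, 2 and 1 split shown above, this version would report no excess.

```python
from collections import Counter


def pods_per_node(pod_list):
    """Count pods per node from parsed `kubectl get pods -o json` output."""
    return Counter(
        item['spec']['nodeName']
        for item in pod_list.get('items', [])
        # Pods that have not been scheduled yet have no nodeName.
        if item.get('spec', {}).get('nodeName')
    )


def excess_per_node(counts, node_names, replicas):
    """Return {node: excess} for nodes holding more than their fair share.

    The fair share is replicas / nodes rounded up, so 5 pods on 3 nodes
    allows up to 2 pods per node.
    """
    if not node_names:
        return {}
    allowed = -(-replicas // len(node_names))  # ceiling division
    return {
        node: counts.get(node, 0) - allowed
        for node in node_names
        if counts.get(node, 0) > allowed
    }
```

An empty result from excess_per_node means no node is over its share; anything else lists the nodes to look at and by how much they are over.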

This script has helped me a few times to identify whether there is a problem with pod balancing in my clusters. Feel free to take it for yourself and modify it; there are plenty of improvements that could be made to it.

If you use this or make any adjustments, please do let me know (just for my own curiosity).