Check for open firewall rules in Google Cloud using Python

After forgetting about an open firewall rule I had created for testing (an amateur mistake, I know), I wrote this Python script to check for open firewall rules in my Google Cloud projects and notify me if it happens again.

Here is the script if you just want to use it yourself. If you want to know a little more about how it works, please read on.

Use it simply by running:

./firewall_checker.py --project=PROJECT_NAME --check

#!/usr/bin/env python

# This script requires the active session to have the credentials for the target project. 
# The client lib will check the file set on the GOOGLE_APPLICATION_CREDENTIALS env variable.
# This is normally located at ~/.config/gcloud/<some_config_name>.json

# https://github.com/googleapis/google-api-python-client
# pip install --upgrade google-api-python-client
import googleapiclient.discovery
import argparse
import os
import json
import requests


class FirewallChecker(object):
    def __init__(self, flags):
        self.flags = flags
        self.compute = googleapiclient.discovery.build('compute', 'v1')
        self.firewall_rules = self.get_firewall_rules()


    def get_firewall_rules(self):
        try:
            result = self.compute.firewalls().list(project=self.flags.project).execute()
        except googleapiclient.errors.HttpError as e:
            # _get_reason is a method, so call it instead of parsing its repr.
            reason = e._get_reason()
            print('Error: ' + reason)
            if "permission" in reason:
                print("Looks like there was a permission issue. Please double check GOOGLE_APPLICATION_CREDENTIALS is set in your ENV and that it is pointing to the correct service json")
            exit(-1)
        # Return an empty list (not None) so callers can always iterate.
        return result.get('items', [])


    def print_firewall_rules(self):
        firewall_rules = self.get_firewall_rules()
        for rule in firewall_rules:
            print(' - ' + rule['name'])
            print('    - Created: ' + rule['creationTimestamp'])
            print('    - Direction: ' + rule['direction'])
            if 'sourceRanges' in rule:
                print('    - Source ranges: ')
                for ip in rule['sourceRanges']:
                    print('      - ' + ip)


    def check_firewall_rules(self):
        for rule in self.firewall_rules:
            self.check_rule_for_open_access(rule)


    def check_rule_for_open_access(self, rule):
        if 'sourceRanges' in rule:
            for ip in rule['sourceRanges']:
                if ip == '0.0.0.0/0':
                    message = "*{0}* - Rule {1} has open access with 0.0.0.0/0. Please Investigate".format(
                        self.flags.project, rule['name'])
                    print(message)
                    self.notify(message)


    def notify(self, message):
        """
        Push a notification to a service like Slack for example
        """
        pass


def main():
    parser = argparse.ArgumentParser(
        __file__, __doc__,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter, )
    parser.add_argument('--project', required=True,
                        type=str, help="gcp project to lookup")
    parser.add_argument('--list', action='store_true',
                        help="list firewall rules for a project")
    parser.add_argument('--check', action='store_true',
                        help="check firewall rules for issues")

    args = parser.parse_args()

    firewall_checker = FirewallChecker(args)

    if args.list:
        print('Firewall rules in project %s:' % (args.project))
        firewall_checker.print_firewall_rules()
    if args.check:
        firewall_checker.check_firewall_rules()


if __name__ == '__main__':
    main()

Breakdown

This script uses googleapiclient to authenticate with a project automatically, using the service account JSON file that the GOOGLE_APPLICATION_CREDENTIALS environment variable points to. Check out the Google guide for authenticating with a project if you have not done so already.
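Since a missing environment variable is the usual culprit, it can help to check it before calling the API at all. Here is a minimal sketch of such a check. The function name credentials_problem is made up for this example and is not part of the script, and it only covers the environment-variable route described above (the client library can also find credentials in other ways).

```python
import os


def credentials_problem(environ=None):
    """Return a short description of what is wrong, or None if all looks fine.

    Hypothetical helper: it only checks the GOOGLE_APPLICATION_CREDENTIALS
    route and does not validate the contents of the JSON file.
    """
    environ = os.environ if environ is None else environ
    path = environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not path:
        return "GOOGLE_APPLICATION_CREDENTIALS is not set"
    if not os.path.isfile(path):
        return "GOOGLE_APPLICATION_CREDENTIALS points to a missing file: " + path
    return None
```

Calling credentials_problem() at the top of main() and printing the result when it is not None would give a clearer message than waiting for the API call to fail.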

First we need to initialise the library. We do this with the build function from the discovery module, which picks up the service account JSON file mentioned above.

self.compute = googleapiclient.discovery.build('compute', 'v1')

Next we use the API to query all of the firewall rules for a project. This function simply takes the name of the project as a parameter. Check out the official documentation for detailed examples.

result = self.compute.firewalls().list(project=self.flags.project).execute()

I’ve wrapped this in a try/except and specifically check for permission issues, because I always forget to set the environment variable.

try:
    result = self.compute.firewalls().list(project=self.flags.project).execute()
except googleapiclient.errors.HttpError as e:
    # _get_reason is a method, so call it instead of parsing its repr.
    reason = e._get_reason()
    print('Error: ' + reason)
    if "permission" in reason:
        print("Looks like there was a permission issue. Please double check GOOGLE_APPLICATION_CREDENTIALS is set in your ENV and that it is pointing to the correct service json")
    exit(-1)
# Return an empty list (not None) so callers can always iterate.
return result.get('items', [])
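One thing the call above does not handle is paging: the list call returns one page of results, so a project with a lot of rules could have some left unchecked. A rough sketch of collecting every page with the client library's list_next helper follows. The function name list_all_firewall_rules is invented for this example, and compute is assumed to be the object returned by googleapiclient.discovery.build('compute', 'v1').

```python
def list_all_firewall_rules(compute, project):
    """Collect firewall rules from every page of the list response.

    `compute` is assumed to be the client built with
    googleapiclient.discovery.build('compute', 'v1').
    """
    rules = []
    request = compute.firewalls().list(project=project)
    while request is not None:
        response = request.execute()
        # A page with no rules has no 'items' key at all.
        rules.extend(response.get("items", []))
        # list_next returns None once there are no further pages.
        request = compute.firewalls().list_next(
            previous_request=request, previous_response=response)
    return rules
```

Error handling is left out here to keep the loop readable. In the script it would sit inside the same try/except as before.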

Once we have the firewall rules, it’s a simple case of iterating over them. The object returned from the API is quite lengthy, so refer again to the official documentation for the full response body.

Depending on whether you passed the --list or --check flag, the script will either print the firewall rules or check them for open inbound rules (passing both flags does both). Both flags iterate over the same list of rules.

def print_firewall_rules(self):
    firewall_rules = self.get_firewall_rules()
    for rule in firewall_rules:
        print(' - ' + rule['name'])
        print('    - Created: ' + rule['creationTimestamp'])
        print('    - Direction: ' + rule['direction'])
        if 'sourceRanges' in rule:
            print('    - Source ranges: ')
            for ip in rule['sourceRanges']:
                print('      - ' + ip)

def check_rule_for_open_access(self, rule):
    if 'sourceRanges' in rule:
        for ip in rule['sourceRanges']:
            if ip == '0.0.0.0/0':
                message = "*{0}* - Rule {1} has open access with 0.0.0.0/0. Please Investigate".format(
                    self.flags.project, rule['name'])
                print(message)
                self.notify(message)
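The check above only matches the exact string 0.0.0.0/0. A slightly stricter variant might also catch the IPv6 equivalent (::/0) and skip rules that are disabled, egress-only, or deny-only. The sketch below is one way to do that, not what the script does. The function name is_open_ingress is made up for this example, and it assumes the rule dictionaries carry the disabled, direction, allowed and sourceRanges fields of the firewall resource.

```python
import ipaddress


def is_open_ingress(rule):
    """Return True if an enabled ingress allow rule accepts any source address."""
    if rule.get("disabled", False):
        return False
    # Assumption: a rule without a direction is treated as ingress.
    if rule.get("direction", "INGRESS") != "INGRESS":
        return False
    if not rule.get("allowed"):
        return False  # a rule with nothing allowed does not expose anything
    for cidr in rule.get("sourceRanges", []):
        try:
            network = ipaddress.ip_network(cidr, strict=False)
        except ValueError:
            continue  # skip anything that is not a valid CIDR range
        # A prefix length of 0 covers every address (0.0.0.0/0 or ::/0).
        if network.prefixlen == 0:
            return True
    return False
```

In the script, check_rule_for_open_access could call this and build its message only when it returns True.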

Conclusion

So there it is, a simple way to check for open firewall rules in Google Cloud using Python. Please let me know what you think; there are plenty of improvements that could be made. The script was designed for one specific purpose and to run from cron, but it could be fleshed out into much more than that.
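As one example of such an improvement, the notify method in the script is left as an empty stub. Below is a minimal sketch of what it could do, assuming a Slack incoming webhook that accepts a JSON body with a text field. The function names and the webhook_url parameter are placeholders for this example, and it uses urllib from the standard library rather than requests so that it needs no extra install.

```python
import json
import urllib.request


def build_slack_request(webhook_url, message):
    """Build (but do not send) a POST request for a Slack incoming webhook."""
    body = json.dumps({"text": message}).encode("utf-8")
    return urllib.request.Request(
        webhook_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def notify(webhook_url, message, timeout=10):
    """Send the message and return the HTTP status code of the response."""
    request = build_slack_request(webhook_url, message)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status
```

The webhook URL is a secret, so it is better read from an environment variable or a config file than hard-coded in the script.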
