scripts/debops-padlock

188 lines
6.7 KiB
Python
Executable File

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
debops-padlock: encrypt secret directory with EncFS and GPG
"""
# Copyright (C) 2014-2015 Hartmut Goebel <h.goebel@crazy-compilers.com>
# Part of the DebOps - https://debops.org/
# This program is free software; you can redistribute
# it and/or modify it under the terms of the
# GNU General Public License as published by the Free
# Software Foundation; either version 3 of the License,
# or (at your option) any later version.
#
# This program is distributed in the hope that it will
# be useful, but WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU General Public
# License for more details.
#
# You should have received a copy of the GNU General
# Public License along with this program; if not,
# write to the Free Software Foundation, Inc., 59
# Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# An on-line copy of the GNU General Public License can
# be downloaded from the FSF web page at:
# https://www.gnu.org/copyleft/gpl.html
from __future__ import print_function
import os
import shutil
import argparse
import itertools
import stat
import sys
import time
from pkg_resources import resource_filename
from debops import *
from debops.cmds import *
__author__ = "Hartmut Goebel <h.goebel@crazy-compilers.com>"
__copyright__ = "Copyright 2014-2015 by Hartmut Goebel <h.goebel@crazy-compilers.com>"
__licence__ = "GNU General Public License version 3 (GPL v3) or later"
def gen_pwd():
from string import ascii_letters, digits, punctuation
import random
ALLCHARS = digits + ascii_letters + punctuation
ALLCHARS = digits + ascii_letters + '-_!@#$%^&*()_+{}|:<>?='
pwd = ''.join(random.choice(ALLCHARS) for i in range(ENCFS_KEYFILE_LENGTH))
return pwd
# Randomness source for EncFS keyfile generation
devrandom = os.environ.get('DEVRANDOM', "/dev/urandom")
SCRIPT_FILENAME = 'padlock-script'
# ---- DebOps environment setup ----
def main(subcommand_func, **kwargs):
project_root = find_debops_project(required=True)
# :todo: Source DebOps configuration file
#[ -r ${debops_config} ] && source ${debops_config}
# ---- Main script ----
# Make sure required commands are present
require_commands('encfs', 'find', 'fusermount', 'gpg')
inventory_path = find_inventorypath(project_root, required=False)
# If inventory hasn't been found automatically, assume it's the default
if not inventory_path:
inventory_path = os.path.join(project_root, 'ansible', INVENTORY)
# Create names of EncFS encrypted and decrypted directories, based on
# inventory name (absolute paths are specified)
encfs_encrypted = os.path.join(os.path.dirname(inventory_path),
ENCFS_PREFIX + SECRET_NAME)
encfs_decrypted = os.path.join(os.path.dirname(inventory_path),
SECRET_NAME)
subcommand_func(encfs_decrypted, encfs_encrypted, **kwargs)
def init(encfs_decrypted, encfs_encrypted, recipients):
# EncFS cannot create encrypted directory if directory with
# decrypted data is not empty
if not os.path.exists(encfs_decrypted):
os.makedirs(encfs_decrypted)
elif os.listdir(encfs_decrypted):
error_msg("secret directory not empty")
# Quit if encrypted directory already exists.
if os.path.isdir(encfs_encrypted):
error_msg("EncFS directory already exists")
os.makedirs(encfs_encrypted)
encfs_keyfile = os.path.join(encfs_encrypted, ENCFS_KEYFILE)
encfs_configfile = os.path.join(encfs_encrypted, ENCFS_CONFIGFILE)
# put a `-r` in front of each recipient for passing as args to gpg
recipients = list(itertools.chain.from_iterable(['-r', r]
for r in recipients))
# Generate a random password and encrypt it with GPG keys of recipients.
print("Generating a random", ENCFS_KEYFILE_LENGTH, "char password")
pwd = gen_pwd()
gpg = subprocess.Popen(['gpg', '--encrypt', '--armor',
'--output', encfs_keyfile] + recipients,
stdin=subprocess.PIPE)
gpg.communicate(pwd)
# Mount the encfs to the config file will be written. Tell encfs
# it to ask gpg for the password.
# NB1: Alternativly we could use --stdinpass, but using --extpass makes
# the user check if she has the correct passphrase early.
# NB2: We can not use padlock_unlock here, because the config file
# does not yet exist.
encfs = subprocess.Popen([
'encfs', encfs_encrypted, encfs_decrypted,
'--extpass', 'gpg --no-mdc-warning --output - '+shquote(encfs_keyfile)],
stdin=subprocess.PIPE)
encfs.communicate('p\n'+pwd)
# Create padlock-script
padlock_script = os.path.join(encfs_encrypted, PADLOCK_CMD)
# :todo: use resource_stream
shutil.copy(resource_filename('debops', SCRIPT_FILENAME), padlock_script)
os.chmod(padlock_script,
os.stat(padlock_script).st_mode|stat.S_IXUSR|stat.S_IXGRP|stat.S_IXOTH)
# Lock the EncFS directory after creation
time.sleep(0.5) # :fixme: why sleeping here?
padlock_lock(encfs_encrypted)
# Protect the EncFS configuration file by also encrypting it with
# the GPG keys of recipients.
subprocess.call(['gpg', '--encrypt', '--armor',
'--output', encfs_configfile+'.asc']
+ recipients + [encfs_configfile])
os.remove(encfs_configfile)
def lock(encfs_decrypted, encfs_encrypted, verbose):
# Unmount the directory if it is mounted
if padlock_lock(encfs_encrypted):
if verbose: print("Locked!")
else:
if verbose: print("Is already locked.")
def unlock(encfs_decrypted, encfs_encrypted, verbose):
# Mount the directory it if it is unmounted
if padlock_unlock(encfs_encrypted):
if verbose: print("Unlocked!")
else:
if verbose: print("Is already unlocked.")
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(
help='action to perform. Use `%(prog)s --help <action>` for further help.')
p = subparsers.add_parser('init')
p.add_argument('recipients', nargs='*',
help=("GPG recipients for which the secret key should be "
"encrypted for (name, e-mail or key-id)"))
p.set_defaults(subcommand_func=init)
p = subparsers.add_parser('unlock')
p.add_argument('-v', '--verbose', action='store_true', help="be verbose")
p.set_defaults(subcommand_func=unlock)
p = subparsers.add_parser('lock')
p.add_argument('-v', '--verbose', action='store_true', help="be verbose")
p.set_defaults(subcommand_func=lock)
args = parser.parse_args()
try:
main(**vars(args))
except KeyboardInterrupt:
raise SystemExit('... aborted')