Source code for IHEWAcollect.base.user

# -*- coding: utf-8 -*-
"""
**User**

Before use this module, set account information in the ``accounts.yml`` file.

**Examples:**
::

    from IHEWAcollect.base.user import User

    user = User(workspace=path, product='CFSR', is_print=True)

.. note::

    1. Create ``accounts.yml`` under root folder of the project,
       based on the ``config-example.yml``.
    2. Run ``User.credential.encrypt_cfg(path, file, password)``
       to generate ``accounts.yml-encrypted`` file.
    3. Save key to ``credential.yml``.

"""
import base64
import inspect
import os
import sys

import yaml
from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

# import shutil
# import datetime



try:
    # IHEClassInitError, IHEStringError, IHETypeError, IHEKeyError, IHEFileError
    from .base.exception import IHEClassInitError,\
        IHEStringError, IHEKeyError, IHEFileError
except ImportError:
    from IHEWAcollect.base.exception import IHEClassInitError,\
        IHEStringError, IHEKeyError, IHEFileError

try:
    from .base import Base
except ImportError:
    from IHEWAcollect.base.base import Base


[docs]class User(Base): """This User class Description Args: workspace (str): Directory to accounts.yml. product (str): Product name of data products. is_print (bool): Is to print status message. kwargs (dict): Other arguments. """ status = 'Global status.' __status = { 'messages': { 0: 'S: WA.User {f:>20} : status {c}, {m}', 1: 'E: WA.User {f:>20} : status {c}: {m}', 2: 'W: WA.User {f:>20} : status {c}: {m}', }, 'code': 0, 'message': '', 'is_print': True } __conf = { 'credential': { 'file_crd': 'accounts.yml-credential', 'file_enc': 'accounts.yml-encrypted', 'password': b'', 'length': 32, 'iterations': 100000, 'salt': b'WaterAccounting_', 'key': b'' }, 'file': 'accounts.yml', 'path': '', 'data': { 'accounts': {}, }, 'account': { 'name': '', 'data': {} } } def __init__(self, workspace, product, is_print, **kwargs): """Class instantiation """ # super(User, self).__init__(workspace, product, is_print, **kwargs) Base.__init__(self, product, is_print) tmp_product = self._Base__conf['product'] for argkey, argval in kwargs.items(): if argkey == 'others': self.argkey = argval # Class self.__status['is_print'] vname, rtype, vdata = 'is_print', bool, is_print if self.check_input(vname, rtype, vdata): self.__status['is_print'] = vdata else: self.__status['code'] = 1 # Class self.__conf['path'] vname, rtype, vdata = 'workspace', str, workspace if self.check_input(vname, rtype, vdata): self.__conf['path'] = vdata else: self.__status['code'] = 1 # Class self.__conf['account']['name'] self.__conf['account']['name'] = tmp_product['data']['account'] # Class self.__conf['data'] # Class self.__conf['account']['data'] if self.__status['code'] == 0: self._user() self.__status['message'] = 'Key is: "{v}"'.format( v=self.__conf['credential']['key']) else: raise IHEClassInitError('User') from None def _user(self): """Get user information This is the main function to configure user's credentials. **Don't synchronize the details to github.** - File to read: ``accounts.yml-credential`` - File to read: ``accounts.yml-encrypted`` """ conf_enc = None fname_org = self.__conf['file'] fname_enc = self.__conf['credential']['file_enc'] fname_crd = self.__conf['credential']['file_crd'] file_org = os.path.join(self.__conf['path'], fname_org) file_enc = os.path.join(self.__conf['path'], fname_enc) file_crd = os.path.join(self.__conf['path'], fname_crd) if os.path.exists(file_org): conf_enc = yaml.load(open(file_org, 'r', encoding='UTF8'), Loader=yaml.FullLoader) else: print('"{}" not exist'.format(file_org)) conf_enc = None if conf_enc is None: if os.path.exists(file_enc): key = self._user_key(file_crd) if len(key) > 0: # Load encrypted file conf_enc = yaml.load(self._user_decrypt(file_enc), Loader=yaml.FullLoader) else: # Generate encrypted file conf_enc = self._user_encrypt(file_org) else: self.__status['code'] = 1 raise IHEFileError(file_enc) from None if isinstance(conf_enc, dict): for key in conf_enc: try: self.__conf['data'][key] = conf_enc[key] except KeyError: self.__status['code'] = 1 raise IHEKeyError(key, fname_enc) from None else: subkey = self.__conf['account']['name'] if subkey is not None: try: self.__conf['account']['data'] = conf_enc[key][subkey] except KeyError: raise IHEKeyError(subkey, fname_enc) from None else: self.__status['code'] = 0 else: self.__conf['account']['name'] = '' self.__status['code'] = 0 else: self.__status['code'] = 1 self.set_status( inspect.currentframe().f_code.co_name, prt=self.__status['is_print'], ext='') def _user_key(self, file) -> str: """Getting a key This function fun. A URL-safe base64-encoded 32-byte key. This must be kept secret. Anyone with this key is able to create and read messages. Args: file (str): File name. """ pswd = None key = None conf = None if os.path.exists(file): conf = yaml.load(open(file, 'r', encoding='UTF8'), Loader=yaml.FullLoader) else: print('"{}" not exist'.format(file)) # password, Yaml load/Python input if conf is not None and isinstance(conf, dict): if 'password' in conf.keys(): pswd = conf['password'] else: pswd = input('\33[91m' 'IHEWAcollect: Enter your password: ' '\33[0m') else: pswd = input('\33[91m' 'IHEWAcollect: Enter your password: ' '\33[0m') pswd = pswd.strip() key_from_pswd = self._user_key_generator(pswd) # self.__conf['credential']['password'] = str.encode(pswd) # key, Yaml load/Python input/Python generate if conf is not None and isinstance(conf, dict): if 'key' in conf.keys(): key = conf['key'] else: key = '' else: key = '' # Final check keys self.__conf['credential']['password'] = str.encode(pswd) if key_from_pswd == key: self.__conf['credential']['key'] = str.encode(key) return key else: return '' def _user_key_generator(self, pswd) -> str: """Getting a key This function fun. A URL-safe base64-encoded 32-byte key. This must be kept secret. Anyone with this key is able to create and read messages. Returns: str: key """ # from cryptography.fernet import Fernet # key = Fernet.generate_key() length = self.__conf['credential']['length'] iterations = self.__conf['credential']['iterations'] salt = self.__conf['credential']['salt'] # pswd = self.__conf['credential']['password'] # if isinstance(pswd, bytes): # pass if isinstance(pswd, str): pswd = str.encode(pswd) kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), salt=salt, length=length, iterations=iterations, backend=default_backend() ) key = base64.urlsafe_b64encode(kdf.derive(pswd)) return key.decode() def _user_encrypt(self, file) -> dict: """Encrypt file with given key This function encrypt accounts.yml file. Args: file (str): Encrypted file name. Returns: dict: data """ pswd = self.__conf['credential']['password'] # key = self.__conf['credential']['key'] key = str.encode(self._user_key_generator(pswd.decode())) file_org = file file_enc = file_org + '-encrypted' file_crd = file_org + '-credential' if os.path.exists(file_org): with open(file_org, 'rb') as fp_in: data = fp_in.read() with open(file_enc, 'wb', buffering=0) as fp_out: fernet = Fernet(key) encrypted = fernet.encrypt(data) fp_out.write(encrypted) with open(file_crd, 'w', buffering=1) as fp_out: fp_out.write('# password should be deleted!\n') fp_out.write('# password: "{}"\n'.format(pswd.decode())) fp_out.write('key: "{}"\n'.format(key.decode())) return yaml.load(open(file_org, 'r', encoding='UTF8'), Loader=yaml.FullLoader) else: raise IHEFileError(file_org) from None def _user_decrypt(self, file) -> str: """Decrypt file with given key This function decrypt accounts.yml file. Args: file (str): File name. Returns: str: Decrypted Yaml data by utf-8. """ key = self.__conf['credential']['key'] if os.path.exists(file): with open(file, 'rb') as fp_in: data = fp_in.read() decrypted = Fernet(key).decrypt(data).decode('utf8') return decrypted else: raise FileNotFoundError('IHEWAcollect: "{f}"' ' not found.'.format(f=file))
[docs] def set_status(self, fun='', prt=False, ext=''): """Set status Args: fun (str): Function name. prt (bool): Is to print on screen? ext (str): Extra message. """ self.status = self._status(self.__status['messages'], self.__status['code'], fun, prt, ext)
[docs] def generate_encrypt(self): """Generate encrypted files """ fname_org = self.__conf['file'] fname_enc = self.__conf['credential']['file_enc'] fname_crd = self.__conf['credential']['file_crd'] file_org = os.path.join(self.__conf['path'], fname_org) file_enc = os.path.join(self.__conf['path'], fname_enc) file_crd = os.path.join(self.__conf['path'], fname_crd) pswd = input('\33[91m' 'IHEWAcollect: Enter your password: ' '\33[0m') pswd = pswd.strip() if pswd == '' or pswd is None: print(IHEStringError('password')) sys.exit(1) else: self.__conf['credential']['password'] = str.encode(pswd) key = self._user_key_generator() if os.path.exists(file_org): with open(file_org, 'rb') as fp_in: data = fp_in.read() with open(file_enc, 'wb', buffering=0) as fp_out: fernet = Fernet(key) encrypted = fernet.encrypt(data) fp_out.write(encrypted) with open(file_crd, 'w', buffering=1) as fp_out: fp_out.write('# password should be deleted!\n') fp_out.write('# password: "{}"\n'.format(pswd)) fp_out.write('key: "{}"\n'.format(key)) else: raise IHEFileError(file_org) from None
[docs] def get_user(self, key): """Get user information This is the function to get user's configuration data. **Don't synchronize the details to github.** - File to read: ``accounts.yml-credential`` contains key: ``accounts.yml-encrypted``. - File to read: ``accounts.yml-encrypted`` generated from: ``accounts.yml``. Args: key (str): Key name. Returns: dict: User data. :Example: >>> import os >>> from IHEWAcollect.base.user import User >>> user = User(os.getcwd(), 'FTP_WA_GUESS', is_print=False) >>> account = user.get_user('account') >>> account['FTP_WA_GUESS'] {'username': 'wateraccountingguest', 'password': 'W@t3r@ccounting', ... >>> accounts = user.get_user('accounts') Traceback (most recent call last): ... KeyError: """ fun_name = inspect.currentframe().f_code.co_name if key in self.__conf: self.__stcode = 0 else: self.__stcode = 1 raise KeyError('Key "{k}" not found in "{v}".' .format(k=key, v=self.__conf.keys())) self.set_status( fun=fun_name, prt=self.__prtstd, ext='') return self.__conf[key]
# @staticmethod if __name__ == "__main__": from pprint import pprint # @classmethod # print('\nUser.check_conf()\n=====') # pprint(User.check_conf('data', is_print=False)) # User __init__ print('\nUser\n=====') path = os.path.join( os.getcwd(), os.path.dirname( inspect.getfile( inspect.currentframe())), '../', '../', '../', 'tests' ) product = 'ALEXI' # product = 'ECMWF' user = User(path, product, is_print=True) # Base attributes print('\nuser._Base__conf\n=====') # pprint(user._Base__conf) # User attributes print('\nuser._User__conf\n=====') print(product, user._User__conf['account']) # print(user._User__conf['data']['accounts'].keys()) # pprint(user._User__conf) # User methods print('\nuser.get_status()\n=====') pprint(user.get_status())