Skip to main content

Finmars Standard Library

Part of Workflow Engine that exposes to user useful utilities to work with Workflow or Finmars Platform REST API

import csv
import datetime
import importlib
import json
import logging
import os
import time
from datetime import timedelta

import jwt
import pandas as pd
import requests
from django.core.files.base import ContentFile
from flatten_json import flatten

from workflow.authentication import FinmarsRefreshToken
from workflow.models import User, Space
from workflow_app import settings

_l = logging.getLogger('workflow')


class DjangoStorageHandler(logging.Handler):
    def __init__(self, log_file, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.log_file = log_file

    def emit(self, record):
        log_entry = self.format(record)

        storage = Storage()

        storage.append_text(self.log_file, log_entry)

        # with storage.open(self.log_file, 'a') as log_file:
        #     log_file.write(log_entry + '\n')


# DEPRECATED, remove in 1.9.0
def get_access_token(ttl_minutes=60 * 8, *args, **kwargs):
    bot = User.objects.get(username="finmars_bot")

    # Define the expiration time +1 hour from now
    expiration_time = datetime.datetime.utcnow() + datetime.timedelta(minutes=ttl_minutes)

    space = Space.objects.all().first()

    # Define the payload with the expiration time and username
    payload = {
        'username': bot.username,
        'realm_code': space.realm_code,
        'space_code': space.realm_code,
        'exp': expiration_time,
        'iat': datetime.datetime.utcnow()  # Issued at time
    }

    # Encode the JWT token
    jwt_token = jwt.encode(payload, settings.SECRET_KEY, algorithm='HS256')

    token = FinmarsRefreshToken(jwt_token)

    return token


# This one is good
def get_refresh_token(ttl_minutes=60 * 8, *args, **kwargs):
    bot = User.objects.get(username="finmars_bot")

    # Define the expiration time +1 hour from now
    expiration_time = datetime.datetime.utcnow() + datetime.timedelta(minutes=ttl_minutes)

    space = Space.objects.all().first()

    # Define the payload with the expiration time and username
    payload = {
        'username': bot.username,
        'realm_code': space.realm_code,
        'space_code': space.realm_code,
        'exp': expiration_time,
        'iat': datetime.datetime.utcnow()  # Issued at time
    }

    # Encode the JWT token
    jwt_token = jwt.encode(payload, settings.SECRET_KEY, algorithm='HS256')

    token = FinmarsRefreshToken(jwt_token)

    return token


def get_domain():
    return settings.DOMAIN_NAME


def get_space():
    space = Space.objects.all().first()

    return space


def get_space_code():
    space = Space.objects.all().first()

    return space.space_code


def get_base_path():
    # TODO http or https?
    return 'https://' + get_domain() + '/' + get_realm_code() + '/' + get_space_code()


def get_realm_code():
    space = Space.objects.all().first()

    return space.realm_code


def create_logger(name, log_format=None):
    if not log_format:
        log_format = "[%(asctime)s][%(levelname)s][%(name)s][%(filename)s:%(funcName)s:%(lineno)d] - %(message)s"
    formatter = logging.Formatter(log_format)

    log_dir = "/.system/log/"

    log_file = os.path.join(log_dir, str(name) + ".log")
    file_handler = DjangoStorageHandler(log_file)
    file_handler.setFormatter(formatter)

    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)

    logger.addHandler(file_handler)

    return logger


def execute_expression(expression):
    refresh = get_refresh_token()

    # _l.info('refresh %s' % refresh.access_token)

    headers = {'Content-type': 'application/json', 'Accept': 'application/json',
               'Authorization': f'Bearer {refresh.access_token}'}

    data = {
        'expression': expression,
        'is_eval': True
    }

    space = get_space()

    if space.realm_code and space.realm_code != 'realm00000':
        url = 'https://' + settings.DOMAIN_NAME + '/' + space.realm_code + '/' + space.space_code + '/api/v1/utils/expression/'
    else:
        url = 'https://' + settings.DOMAIN_NAME + '/' + space.space_code + '/api/v1/utils/expression/'

    response = requests.post(url=url, data=json.dumps(data), headers=headers, verify=settings.VERIFY_SSL)

    if response.status_code != 200:
        raise Exception(response.text)

    return response.json()


def execute_expression_procedure(payload):
    refresh = get_access_token

    # _l.info('refresh %s' % refresh.access_token)

    headers = {'Content-type': 'application/json', 'Accept': 'application/json',
               'Authorization': f'Bearer {refresh.access_token}'}

    data = payload

    space = get_space()

    if space.realm_code and space.realm_code != 'realm00000':
        url = 'https://' + settings.DOMAIN_NAME + '/' + space.realm_code + '/' + space.space_code + '/api/v1/procedures/expression-procedure/execute/'
    else:
        url = 'https://' + settings.DOMAIN_NAME + '/' + space.space_code + '/api/v1/procedures/expression-procedure/execute/'

    response = requests.post(url=url, data=json.dumps(data), headers=headers, verify=settings.VERIFY_SSL)

    if response.status_code != 200:
        raise Exception(response.text)

    return response.json()


def execute_data_procedure(payload):
    refresh = get_refresh_token()

    # _l.info('refresh %s' % refresh.access_token)

    headers = {'Content-type': 'application/json', 'Accept': 'application/json',
               'Authorization': f'Bearer {refresh.access_token}'}

    data = payload

    space = get_space()

    if space.realm_code and space.realm_code != 'realm00000':
        url = 'https://' + settings.DOMAIN_NAME + '/' + space.realm_code + '/' + space.space_code + '/api/v1/procedures/data-procedure/execute/'
    else:
        url = 'https://' + settings.DOMAIN_NAME + '/' + space.space_code + '/api/v1/procedures/data-procedure/execute/'

    response = requests.post(url=url, data=json.dumps(data), headers=headers, verify=settings.VERIFY_SSL)

    if response.status_code != 200:
        raise Exception(response.text)

    return response.json()


def get_data_procedure_instance(id):
    refresh = get_refresh_token()

    # _l.info('refresh %s' % refresh.access_token)

    headers = {'Content-type': 'application/json', 'Accept': 'application/json',
               'Authorization': f'Bearer {refresh.access_token}'}

    space = get_space()

    if space.realm_code and space.realm_code != 'realm00000':
        url = 'https://' + settings.DOMAIN_NAME + '/' + space.realm_code + '/' + space.space_code + '/api/v1/procedures/data-procedure-instance/%s/' % id
    else:
        url = 'https://' + settings.DOMAIN_NAME + '/' + space.space_code + '/api/v1/procedures/data-procedure-instance/%s/' % id

    response = requests.get(url=url, headers=headers, verify=settings.VERIFY_SSL)

    if response.status_code != 200:
        raise Exception(response.text)

    return response.json()


def execute_pricing_procedure(payload):
    refresh = get_refresh_token()

    # _l.info('refresh %s' % refresh.access_token)

    headers = {'Content-type': 'application/json', 'Accept': 'application/json',
               'Authorization': f'Bearer {refresh.access_token}'}

    data = payload

    space = get_space()

    if space.realm_code and space.realm_code != 'realm00000':
        url = 'https://' + settings.DOMAIN_NAME + '/' + space.realm_code + '/' + space.space_code + '/api/v1/procedures/pricing-procedure/execute/'
    else:
        url = 'https://' + settings.DOMAIN_NAME + '/' + space.space_code + '/api/v1/procedures/pricing-procedure/execute/'

    response = requests.post(url=url, data=json.dumps(data), headers=headers, verify=settings.VERIFY_SSL)

    if response.status_code != 200:
        raise Exception(response.text)

    return response.json()


def execute_task(task_name, payload={}):
    refresh = get_refresh_token()

    # _l.info('refresh %s' % refresh.access_token)

    headers = {'Content-type': 'application/json', 'Accept': 'application/json',
               'Authorization': f'Bearer {refresh.access_token}'}

    data = {
        'task_name': task_name,
        'payload': payload
    }

    space = get_space()

    if space.realm_code and space.realm_code != 'realm00000':
        url = 'https://' + settings.DOMAIN_NAME + '/' + space.realm_code + '/' + space.space_code + '/api/v1/tasks/task/execute/'
    else:
        url = 'https://' + settings.DOMAIN_NAME + '/' + space.space_code + '/api/v1/tasks/task/execute/'

    response = requests.post(url=url, data=json.dumps(data), headers=headers, verify=settings.VERIFY_SSL)

    if response.status_code != 200:
        raise Exception(response.text)

    return response.json()


def update_task_status(platform_task_id, status, result=None, error=None):
    refresh = get_refresh_token()
    headers = {'Content-type': 'application/json', 'Accept': 'application/json',
               'Authorization': f'Bearer {refresh.access_token}'}

    data = {
        'status': status,
        'result': result,
        'error': error,
    }

    url = f'{get_base_path()}/api/v1/tasks/task/{platform_task_id}/update-status/'
    response = requests.post(url=url, data=json.dumps(data), headers=headers, verify=settings.VERIFY_SSL)
    try:
        response.raise_for_status()
        return response.json()
    except Exception as e:
        _l.error("update_task_status error: %s" % e)


def get_task(id):
    refresh = get_refresh_token()

    # _l.info('refresh %s' % refresh.access_token)

    headers = {'Content-type': 'application/json', 'Accept': 'application/json',
               'Authorization': f'Bearer {refresh.access_token}'}

    space = get_space()

    if space.realm_code and space.realm_code != 'realm00000':
        url = 'https://' + settings.DOMAIN_NAME + '/' + space.realm_code + '/' + space.space_code + '/api/v1/tasks/task/%s/' % id
    else:
        url = 'https://' + settings.DOMAIN_NAME + '/' + space.space_code + '/api/v1/tasks/task/%s/' % id

    response = requests.get(url=url, headers=headers, verify=settings.VERIFY_SSL)

    if response.status_code != 200:
        raise Exception(response.text)

    return response.json()


def _wait_task_to_complete_recursive(task_id=None, retries=5, retry_interval=60, counter=None):
    if counter == retries:
        raise Exception("Task exceeded retries %s count" % retries)

    try:
        result = get_task(task_id)

        if result['status'] not in ['progress', 'P', 'I']:
            return result
    except Exception as e:
        _l.error("_wait_task_to_complete_recursive %s" % e)

    counter = counter + 1

    time.sleep(retry_interval)

    return _wait_task_to_complete_recursive(task_id=task_id, retries=retries, retry_interval=retry_interval,
                                            counter=counter)


def wait_task_to_complete(task_id=None, retries=5, retry_interval=60):
    counter = 0
    result = None

    result = _wait_task_to_complete_recursive(task_id=task_id, retries=retries, retry_interval=retry_interval,
                                              counter=counter)

    return result


def poll_workflow_status(workflow_id, max_retries=100, wait_time=5):
    url = f'/workflow/api/workflow/{workflow_id}/'  # Replace with your actual API endpoint

    for attempt in range(max_retries):
        data = request_api(url)

        if data:
            status = data.get('status')
            _l.info(f'Attempt {attempt + 1}: Workflow status is {status}')

            if status in ['success', 'error']:
                return status  # Return the status when it's success or error

        else:
            _l.error(f'Error fetching status')

        time.sleep(wait_time)  # Wait before the next attempt

    _l.info('Max retries reached. Workflow status not successful.')
    return None  # Indicate that the status was not found


def _wait_procedure_to_complete_recursive(procedure_instance_id=None, retries=5, retry_interval=60, counter=None):
    if counter == retries:
        raise Exception("Task exceeded retries %s count" % retries)

    result = get_data_procedure_instance(procedure_instance_id)

    counter = counter + 1

    if result['status'] not in ['progress', 'P', 'I']:
        return result

    time.sleep(retry_interval)

    return _wait_procedure_to_complete_recursive(procedure_instance_id=procedure_instance_id, retries=retries,
                                                 retry_interval=retry_interval, counter=counter)


def wait_procedure_to_complete(procedure_instance_id=None, retries=5, retry_interval=60):
    counter = 0
    result = None

    result = _wait_procedure_to_complete_recursive(procedure_instance_id=procedure_instance_id, retries=retries,
                                                   retry_interval=retry_interval, counter=counter)

    return result


def execute_transaction_import(payload):
    refresh = get_refresh_token()

    # _l.info('refresh %s' % refresh.access_token)

    headers = {'Content-type': 'application/json', 'Accept': 'application/json',
               'Authorization': f'Bearer {refresh.access_token}'}
    data = payload

    space = get_space()

    if space.realm_code and space.realm_code != 'realm00000':
        url = 'https://' + settings.DOMAIN_NAME + '/' + space.realm_code + '/' + space.space_code + '/api/v1/import/transaction-import/execute/'
    else:
        url = 'https://' + settings.DOMAIN_NAME + '/' + space.space_code + '/api/v1/import/transaction-import/execute/'

    response = requests.post(url=url, data=json.dumps(data), headers=headers, verify=settings.VERIFY_SSL)

    if response.status_code != 200:
        raise Exception(response.text)

    return response.json()


def execute_simple_import(payload):
    refresh = get_refresh_token()

    # _l.info('refresh %s' % refresh.access_token)

    headers = {'Content-type': 'application/json', 'Accept': 'application/json',
               'Authorization': f'Bearer {refresh.access_token}'}

    data = payload

    space = get_space()

    if space.realm_code and space.realm_code != 'realm00000':
        url = 'https://' + settings.DOMAIN_NAME + '/' + space.realm_code + '/' + space.space_code + '/api/v1/import/simple-import/execute/'
    else:
        url = 'https://' + settings.DOMAIN_NAME + '/' + space.space_code + '/api/v1/import/simple-import/execute/'

    response = requests.post(url=url, data=json.dumps(data), headers=headers, verify=settings.VERIFY_SSL)

    if response.status_code != 200:
        raise Exception(response.text)

    return response.json()


def request_api(path, method='get', data=None):
    refresh = get_refresh_token()

    headers = {'Content-type': 'application/json', 'Accept': 'application/json',
               'Authorization': f'Bearer {refresh.access_token}'}

    space = get_space()

    if space.realm_code and space.realm_code != 'realm00000':
        url = 'https://' + settings.DOMAIN_NAME + '/' + space.realm_code + '/' + space.space_code + path
    else:
        url = 'https://' + settings.DOMAIN_NAME + '/' + space.space_code + path

    response = None

    if method.lower() == 'get':

        response = requests.get(url=url, headers=headers, verify=settings.VERIFY_SSL)

    elif method.lower() == 'post':

        response = requests.post(url=url, data=json.dumps(data), headers=headers, verify=settings.VERIFY_SSL)

    elif method.lower() == 'put':

        response = requests.put(url=url, data=json.dumps(data), headers=headers, verify=settings.VERIFY_SSL)

    elif method.lower() == 'patch':

        response = requests.patch(url=url, data=json.dumps(data), headers=headers, verify=settings.VERIFY_SSL)

    elif method.lower() == 'delete':

        response = requests.delete(url=url, headers=headers, verify=settings.VERIFY_SSL)

    if response.status_code != 200 and response.status_code != 201 and response.status_code != 204:
        raise Exception(response.text)

    if response.status_code != 204:
        return response.json()

    return {"status": "no_content"}


class Storage():

    def __init__(self):

        from workflow.storage import get_storage

        self.storage = get_storage()

    def get_base_path(self):

        space = Space.objects.all().first()

        return space.space_code

    def listdir(self, path):
        return self.storage.listdir('/' + self.get_base_path() + path)

    def open(self, name, mode='rb'):

        # TODO permission check

        if name[0] == '/':
            name = self.get_base_path() + name
        else:
            name = self.get_base_path() + '/' + name

        return self.storage.open(name, mode)

    def read_json(self, filepath, mode='rb'):
        with self.open(filepath, mode) as state:
            state_content = json.loads(state.read())
        return state_content

    def read_csv(self, filepath, mode='rb'):
        with self.open(filepath, mode) as f:
            reader = csv.DictReader(f)
            data = list(reader)
        return data

    def read(self, filepath, mode='rb'):

        # Open the file from your storage backend
        file_obj = self.open(filepath, mode)  # 'rb' is to read in binary mode

        try:
            # Read the file's contents
            file_content = file_obj.read()
            return file_content
        finally:
            # Make sure we close the file object
            file_obj.close()

    def delete(self, name):

        # TODO permission check

        if name[0] == '/':
            name = self.get_base_path() + name
        else:
            name = self.get_base_path() + '/' + name

        return self.storage.delete(name)

    def exists(self, name):

        # TODO permission check

        if name[0] == '/':
            name = self.get_base_path() + name
        else:
            name = self.get_base_path() + '/' + name

        return self.storage.exists(name)

    def save(self, name, content):

        if name[0] == '/':
            name = self.get_base_path() + name
        else:
            name = self.get_base_path() + '/' + name

        return self.storage.save(name, content)

    def save_text(self, name, content):

        if name[0] == '/':
            name = self.get_base_path() + name
        else:
            name = self.get_base_path() + '/' + name

        return self.storage.save(name, ContentFile(content.encode('utf-8')))

    def append_text(self, name, content):

        if self.storage.exists(name):
            with self.open(name, 'r') as file:
                content = file.read()
                content = content + content + '\n'

        return self.storage.save(name, ContentFile(content.encode('utf-8')))


class Utils():

    def get_current_space_code(self):
        space = Space.objects.all().first()
        return space.space_code

    def get_list_of_dates_between_two_dates(self, date_from, date_to, to_string=False):
        result = []
        format = '%Y-%m-%d'

        if not isinstance(date_from, datetime.date):
            date_from = datetime.datetime.strptime(date_from, format).date()

        if not isinstance(date_to, datetime.date):
            date_to = datetime.datetime.strptime(date_to, format).date()

        diff = date_to - date_from

        for i in range(diff.days + 1):
            day = date_from + timedelta(days=i)
            if to_string:
                result.append(str(day))
            else:
                result.append(day)

        return result

    def is_business_day(self, date):
        return bool(len(pd.bdate_range(date, date)))

    def get_yesterday(self, ):
        today = datetime.now()
        yesterday = today - timedelta(days=1)
        return yesterday

    def get_list_of_business_days_between_two_dates(self, date_from, date_to, to_string=False):
        result = []
        format = '%Y-%m-%d'

        if not isinstance(date_from, datetime.date):
            date_from = datetime.datetime.strptime(date_from, format).date()

        if not isinstance(date_to, datetime.date):
            date_to = datetime.datetime.strptime(date_to, format).date()

        diff = date_to - date_from

        for i in range(diff.days + 1):
            day = date_from + timedelta(days=i)

            if self.is_business_day(day):

                if to_string:
                    result.append(str(day))
                else:
                    result.append(day)

        return result

    def import_from_storage(self, file_path):
        # get the directory and the filename without extension

        space = get_space()

        if file_path[0] == '/':
            file_path = os.path.join(settings.WORKFLOW_STORAGE_ROOT + '/tasks/' + space.space_code + file_path)
        else:
            file_path = os.path.join(settings.WORKFLOW_STORAGE_ROOT + '/tasks/' + space.space_code + '/' + file_path)

        _l.info('import_from_storage.file_path %s' % file_path)

        directory, filename = os.path.split(file_path)
        module_name, _ = os.path.splitext(filename)

        _l.info('import_from_storage.module_name %s' % module_name)
        _l.info('import_from_storage.file_path %s' % file_path)

        loader = importlib.machinery.SourceFileLoader(module_name, file_path)
        module = loader.load_module()

        # add the directory to sys.path
        # spec = importlib.util.spec_from_file_location(module_name, file_path)
        #
        # if spec is None:
        #     raise ImportError(f"Cannot import file {filename}")
        #
        # module = importlib.util.module_from_spec(spec)
        #
        # # execute the module
        # spec.loader.exec_module(module)
        #
        # # return the module
        return module

    def relative_import_from_storage(self, file_path, base_path):

        """
        Imports a module from a given file path, resolving the path from a specified base path.

        :param file_path: Relative or absolute path to the Python file to import.
        :param base_path: Base directory against which relative paths should be resolved.
        :return: The imported module.
        """

        # Resolve the relative file_path against the provided base directory
        absolute_file_path = os.path.normpath(os.path.join(base_path, file_path))

        # _l.info(f'Normalized file path: {absolute_file_path}')

        # Continue with your existing logic, but use absolute_file_path instead of file_path
        directory, filename = os.path.split(absolute_file_path)
        module_name, _ = os.path.splitext(filename)

        # _l.info(f'import_from_storage.module_name {module_name}')
        # _l.info(f'import_from_storage.file_path {absolute_file_path}')

        loader = importlib.machinery.SourceFileLoader(module_name, absolute_file_path)
        module = loader.load_module()

        # add the directory to sys.path
        # spec = importlib.util.spec_from_file_location(module_name, file_path)
        #
        # if spec is None:
        #     raise ImportError(f"Cannot import file {filename}")
        #
        # module = importlib.util.module_from_spec(spec)
        #
        # # execute the module
        # spec.loader.exec_module(module)
        #
        # # return the module
        return module

    def tree_to_flat(self, data, **kwargs):

        return flatten(data, **kwargs)

    # Example conversions:
    # "Héllo World!"        -> "hello_world!"
    # "Café.com"            -> "cafe_com"
    # "Jürgen.Smith"        -> "jurgen_smith"
    # "Mañana es jueves."   -> "manana_es_jueves_"
    # "Gérard Dépardieu"    -> "gerard_depardieu"
    # "naïve artist"        -> "naive_artist"
    # Problem here Example conversions with different accents on 'e':
    # "é" -> "e"
    # "è" -> "e"
    # "ê" -> "e"
    # "ë" -> "e"
    # Example conversions:
    # "école"        -> "U233cole"
    # "café.com"     -> "cafeU233_com"
    # "Jürgen.Smith" -> "jU252rgen_smith"
    # "élève"        -> "U233lU232ve"
    # "Mañana"       -> "manU241ana"
    # "Gödel"        -> "gU246del"
    def convert_to_ascii(self, input_string):
        # Convert the input string to lowercase
        input_string = input_string.lower()

        # Convert spaces and dots to underscores
        modified_string = input_string.replace(' ', '_').replace('.', '_')

        # Function to convert each character
        def to_ascii_or_unicode(char):
            try:
                # Try to encode the character in ASCII
                ascii_char = char.encode('ascii')
                return ascii_char.decode()  # Return as string if it's a valid ASCII character
            except UnicodeEncodeError:
                # If it's not an ASCII character, return its Unicode code point
                return f"U{ord(char)}"

        # Apply the conversion to each character in the string
        ascii_string = ''.join(to_ascii_or_unicode(c) for c in modified_string)

        return ascii_string


class Vault():

    # hashicorp
    # finmars
    def get_secret(self, path, provider="finmars"):
        refresh = get_refresh_token()  # TODO refactor, should be permission check

        # _l.info('refresh %s' % refresh.access_token)

        if provider == 'finmars':

            # pieces = path.split('/')
            # engine_name = pieces[0]
            # secret_path = pieces[1]

            headers = {'Content-type': 'application/json', 'Accept': 'application/json',
                       'Authorization': f'Bearer {refresh.access_token}'}

            space = get_space()

            url = 'https://' + settings.DOMAIN_NAME + '/' + space.realm_code + '/' + space.space_code + f'/api/v1/vault/vault-record/?user_code=' + path

            response = requests.get(url=url, headers=headers, verify=settings.VERIFY_SSL)

            if response.status_code != 200:
                raise Exception(response.text)

            data = response.json()

            secret_data = None

            for item in data['results']:

                if path == item['user_code']:
                    secret_data = item['data']

            if not secret_data:
                raise Exception(f"Secret is {path} not found")

            return secret_data

        elif provider == 'hashicorp':

            pieces = path.split('/')
            engine_name = pieces[0]
            secret_path = pieces[1]

            headers = {'Content-type': 'application/json', 'Accept': 'application/json',
                       'Authorization': f'Bearer {refresh.access_token}'}

            space = get_space()

            if space.realm_code and space.realm_code != 'realm00000':
                url = 'https://' + settings.DOMAIN_NAME + '/' + space.realm_code + '/' + space.space_code + f'/api/v1/vault/vault-secret/get/?engine_name={engine_name}&path={secret_path}'
            else:
                url = 'https://' + settings.DOMAIN_NAME + '/' + space.space_code + f'/api/v1/vault/vault-secret/get/?engine_name={engine_name}&path={secret_path}'

            response = requests.get(url=url, headers=headers, verify=settings.VERIFY_SSL)

            if response.status_code != 200:
                raise Exception(response.text)

            return response.json()['data']['data']

        else:
            raise Exception("Unknown provider %s" % provider)


storage = Storage()

utils = Utils()

vault = Vault()