Finmars Standard Library

Part of Workflow Engine that exposes to user useful utilities to work with Workflow or Finmars Platform REST API 

 import csv

import datetime

import importlib

import json

import logging

import os

import time

from datetime import timedelta

import jwt

import pandas as pd

import requests

from django.core.files.base import ContentFile

from flatten_json import flatten

from workflow.authentication import FinmarsRefreshToken

from workflow.models import User, Space

from workflow_app import settings

_l = logging.getLogger('workflow')

class DjangoStorageHandler(logging.Handler):

 def __init__(self, log_file, *args, **kwargs):

 super().__init__(*args, **kwargs)

 self.log_file = log_file

 def emit(self, record):

 log_entry = self.format(record)

 storage = Storage()

 storage.append_text(self.log_file, log_entry)

 # with storage.open(self.log_file, 'a') as log_file:

 # log_file.write(log_entry + '\n')

# DEPRECATED, remove in 1.9.0

def get_access_token(ttl_minutes=60 * 8, *args, **kwargs):

 bot = User.objects.get(username="finmars_bot")

 # Define the expiration time +1 hour from now

 expiration_time = datetime.datetime.utcnow() + datetime.timedelta(minutes=ttl_minutes)

 space = Space.objects.all().first()

 # Define the payload with the expiration time and username

 payload = {

 'username': bot.username,

 'realm_code': space.realm_code,

 'space_code': space.realm_code,

 'exp': expiration_time,

 'iat': datetime.datetime.utcnow() # Issued at time

 }

 # Encode the JWT token

 jwt_token = jwt.encode(payload, settings.SECRET_KEY, algorithm='HS256')

 token = FinmarsRefreshToken(jwt_token)

 return token

# This one is good

def get_refresh_token(ttl_minutes=60 * 8, *args, **kwargs):

 bot = User.objects.get(username="finmars_bot")

 # Define the expiration time +1 hour from now

 expiration_time = datetime.datetime.utcnow() + datetime.timedelta(minutes=ttl_minutes)

 space = Space.objects.all().first()

 # Define the payload with the expiration time and username

 payload = {

 'username': bot.username,

 'realm_code': space.realm_code,

 'space_code': space.realm_code,

 'exp': expiration_time,

 'iat': datetime.datetime.utcnow() # Issued at time

 }

 # Encode the JWT token

 jwt_token = jwt.encode(payload, settings.SECRET_KEY, algorithm='HS256')

 token = FinmarsRefreshToken(jwt_token)

 return token

def get_domain():

 return settings.DOMAIN_NAME

def get_space():

 space = Space.objects.all().first()

 return space

def get_space_code():

 space = Space.objects.all().first()

 return space.space_code

def get_base_path():

 # TODO http or https?

 return 'https://' + get_domain() + '/' + get_realm_code() + '/' + get_space_code()

def get_realm_code():

 space = Space.objects.all().first()

 return space.realm_code

def create_logger(name, log_format=None):

 if not log_format:

 log_format = "[%(asctime)s][%(levelname)s][%(name)s][%(filename)s:%(funcName)s:%(lineno)d] - %(message)s"

 formatter = logging.Formatter(log_format)

 log_dir = "/.system/log/"

 log_file = os.path.join(log_dir, str(name) + ".log")

 file_handler = DjangoStorageHandler(log_file)

 file_handler.setFormatter(formatter)

 logger = logging.getLogger(name)

 logger.setLevel(logging.INFO)

 logger.addHandler(file_handler)

 return logger

def execute_expression(expression):

 refresh = get_refresh_token()

 # _l.info('refresh %s' % refresh.access_token)

 headers = {'Content-type': 'application/json', 'Accept': 'application/json',

 'Authorization': f'Bearer {refresh.access_token}'}

 data = {

 'expression': expression,

 'is_eval': True

 }

 space = get_space()

 if space.realm_code and space.realm_code != 'realm00000':

 url = 'https://' + settings.DOMAIN_NAME + '/' + space.realm_code + '/' + space.space_code + '/api/v1/utils/expression/'

 else:

 url = 'https://' + settings.DOMAIN_NAME + '/' + space.space_code + '/api/v1/utils/expression/'

 response = requests.post(url=url, data=json.dumps(data), headers=headers, verify=settings.VERIFY_SSL)

 if response.status_code != 200:

 raise Exception(response.text)

 return response.json()

def execute_expression_procedure(payload):

 refresh = get_access_token

 # _l.info('refresh %s' % refresh.access_token)

 headers = {'Content-type': 'application/json', 'Accept': 'application/json',

 'Authorization': f'Bearer {refresh.access_token}'}

 data = payload

 space = get_space()

 if space.realm_code and space.realm_code != 'realm00000':

 url = 'https://' + settings.DOMAIN_NAME + '/' + space.realm_code + '/' + space.space_code + '/api/v1/procedures/expression-procedure/execute/'

 else:

 url = 'https://' + settings.DOMAIN_NAME + '/' + space.space_code + '/api/v1/procedures/expression-procedure/execute/'

 response = requests.post(url=url, data=json.dumps(data), headers=headers, verify=settings.VERIFY_SSL)

 if response.status_code != 200:

 raise Exception(response.text)

 return response.json()

def execute_data_procedure(payload):

 refresh = get_refresh_token()

 # _l.info('refresh %s' % refresh.access_token)

 headers = {'Content-type': 'application/json', 'Accept': 'application/json',

 'Authorization': f'Bearer {refresh.access_token}'}

 data = payload

 space = get_space()

 if space.realm_code and space.realm_code != 'realm00000':

 url = 'https://' + settings.DOMAIN_NAME + '/' + space.realm_code + '/' + space.space_code + '/api/v1/procedures/data-procedure/execute/'

 else:

 url = 'https://' + settings.DOMAIN_NAME + '/' + space.space_code + '/api/v1/procedures/data-procedure/execute/'

 response = requests.post(url=url, data=json.dumps(data), headers=headers, verify=settings.VERIFY_SSL)

 if response.status_code != 200:

 raise Exception(response.text)

 return response.json()

def get_data_procedure_instance(id):

 refresh = get_refresh_token()

 # _l.info('refresh %s' % refresh.access_token)

 headers = {'Content-type': 'application/json', 'Accept': 'application/json',

 'Authorization': f'Bearer {refresh.access_token}'}

 space = get_space()

 if space.realm_code and space.realm_code != 'realm00000':

 url = 'https://' + settings.DOMAIN_NAME + '/' + space.realm_code + '/' + space.space_code + '/api/v1/procedures/data-procedure-instance/%s/' % id

 else:

 url = 'https://' + settings.DOMAIN_NAME + '/' + space.space_code + '/api/v1/procedures/data-procedure-instance/%s/' % id

 response = requests.get(url=url, headers=headers, verify=settings.VERIFY_SSL)

 if response.status_code != 200:

 raise Exception(response.text)

 return response.json()

def execute_pricing_procedure(payload):

 refresh = get_refresh_token()

 # _l.info('refresh %s' % refresh.access_token)

 headers = {'Content-type': 'application/json', 'Accept': 'application/json',

 'Authorization': f'Bearer {refresh.access_token}'}

 data = payload

 space = get_space()

 if space.realm_code and space.realm_code != 'realm00000':

 url = 'https://' + settings.DOMAIN_NAME + '/' + space.realm_code + '/' + space.space_code + '/api/v1/procedures/pricing-procedure/execute/'

 else:

 url = 'https://' + settings.DOMAIN_NAME + '/' + space.space_code + '/api/v1/procedures/pricing-procedure/execute/'

 response = requests.post(url=url, data=json.dumps(data), headers=headers, verify=settings.VERIFY_SSL)

 if response.status_code != 200:

 raise Exception(response.text)

 return response.json()

def execute_task(task_name, payload={}):

 refresh = get_refresh_token()

 # _l.info('refresh %s' % refresh.access_token)

 headers = {'Content-type': 'application/json', 'Accept': 'application/json',

 'Authorization': f'Bearer {refresh.access_token}'}

 data = {

 'task_name': task_name,

 'payload': payload

 }

 space = get_space()

 if space.realm_code and space.realm_code != 'realm00000':

 url = 'https://' + settings.DOMAIN_NAME + '/' + space.realm_code + '/' + space.space_code + '/api/v1/tasks/task/execute/'

 else:

 url = 'https://' + settings.DOMAIN_NAME + '/' + space.space_code + '/api/v1/tasks/task/execute/'

 response = requests.post(url=url, data=json.dumps(data), headers=headers, verify=settings.VERIFY_SSL)

 if response.status_code != 200:

 raise Exception(response.text)

 return response.json()

def update_task_status(platform_task_id, status, result=None, error=None):

 refresh = get_refresh_token()

 headers = {'Content-type': 'application/json', 'Accept': 'application/json',

 'Authorization': f'Bearer {refresh.access_token}'}

 data = {

 'status': status,

 'result': result,

 'error': error,

 }

 url = f'{get_base_path()}/api/v1/tasks/task/{platform_task_id}/update-status/'

 response = requests.post(url=url, data=json.dumps(data), headers=headers, verify=settings.VERIFY_SSL)

 try:

 response.raise_for_status()

 return response.json()

 except Exception as e:

 _l.error("update_task_status error: %s" % e)

def get_task(id):

 refresh = get_refresh_token()

 # _l.info('refresh %s' % refresh.access_token)

 headers = {'Content-type': 'application/json', 'Accept': 'application/json',

 'Authorization': f'Bearer {refresh.access_token}'}

 space = get_space()

 if space.realm_code and space.realm_code != 'realm00000':

 url = 'https://' + settings.DOMAIN_NAME + '/' + space.realm_code + '/' + space.space_code + '/api/v1/tasks/task/%s/' % id

 else:

 url = 'https://' + settings.DOMAIN_NAME + '/' + space.space_code + '/api/v1/tasks/task/%s/' % id

 response = requests.get(url=url, headers=headers, verify=settings.VERIFY_SSL)

 if response.status_code != 200:

 raise Exception(response.text)

 return response.json()

def _wait_task_to_complete_recursive(task_id=None, retries=5, retry_interval=60, counter=None):

 if counter == retries:

 raise Exception("Task exceeded retries %s count" % retries)

 try:

 result = get_task(task_id)

 if result['status'] not in ['progress', 'P', 'I']:

 return result

 except Exception as e:

 _l.error("_wait_task_to_complete_recursive %s" % e)

 counter = counter + 1

 time.sleep(retry_interval)

 return _wait_task_to_complete_recursive(task_id=task_id, retries=retries, retry_interval=retry_interval,

 counter=counter)

def wait_task_to_complete(task_id=None, retries=5, retry_interval=60):

 counter = 0

 result = None

 result = _wait_task_to_complete_recursive(task_id=task_id, retries=retries, retry_interval=retry_interval,

 counter=counter)

 return result

def poll_workflow_status(workflow_id, max_retries=100, wait_time=5):

 url = f'/workflow/api/workflow/{workflow_id}/' # Replace with your actual API endpoint

 for attempt in range(max_retries):

 data = request_api(url)

 if data:

 status = data.get('status')

 _l.info(f'Attempt {attempt + 1}: Workflow status is {status}')

 if status in ['success', 'error']:

 return status # Return the status when it's success or error

 else:

 _l.error(f'Error fetching status')

 time.sleep(wait_time) # Wait before the next attempt

 _l.info('Max retries reached. Workflow status not successful.')

 return None # Indicate that the status was not found

def _wait_procedure_to_complete_recursive(procedure_instance_id=None, retries=5, retry_interval=60, counter=None):

 if counter == retries:

 raise Exception("Task exceeded retries %s count" % retries)

 result = get_data_procedure_instance(procedure_instance_id)

 counter = counter + 1

 if result['status'] not in ['progress', 'P', 'I']:

 return result

 time.sleep(retry_interval)

 return _wait_procedure_to_complete_recursive(procedure_instance_id=procedure_instance_id, retries=retries,

 retry_interval=retry_interval, counter=counter)

def wait_procedure_to_complete(procedure_instance_id=None, retries=5, retry_interval=60):

 counter = 0

 result = None

 result = _wait_procedure_to_complete_recursive(procedure_instance_id=procedure_instance_id, retries=retries,

 retry_interval=retry_interval, counter=counter)

 return result

def execute_transaction_import(payload):

 refresh = get_refresh_token()

 # _l.info('refresh %s' % refresh.access_token)

 headers = {'Content-type': 'application/json', 'Accept': 'application/json',

 'Authorization': f'Bearer {refresh.access_token}'}

 data = payload

 space = get_space()

 if space.realm_code and space.realm_code != 'realm00000':

 url = 'https://' + settings.DOMAIN_NAME + '/' + space.realm_code + '/' + space.space_code + '/api/v1/import/transaction-import/execute/'

 else:

 url = 'https://' + settings.DOMAIN_NAME + '/' + space.space_code + '/api/v1/import/transaction-import/execute/'

 response = requests.post(url=url, data=json.dumps(data), headers=headers, verify=settings.VERIFY_SSL)

 if response.status_code != 200:

 raise Exception(response.text)

 return response.json()

def execute_simple_import(payload):

 refresh = get_refresh_token()

 # _l.info('refresh %s' % refresh.access_token)

 headers = {'Content-type': 'application/json', 'Accept': 'application/json',

 'Authorization': f'Bearer {refresh.access_token}'}

 data = payload

 space = get_space()

 if space.realm_code and space.realm_code != 'realm00000':

 url = 'https://' + settings.DOMAIN_NAME + '/' + space.realm_code + '/' + space.space_code + '/api/v1/import/simple-import/execute/'

 else:

 url = 'https://' + settings.DOMAIN_NAME + '/' + space.space_code + '/api/v1/import/simple-import/execute/'

 response = requests.post(url=url, data=json.dumps(data), headers=headers, verify=settings.VERIFY_SSL)

 if response.status_code != 200:

 raise Exception(response.text)

 return response.json()

def request_api(path, method='get', data=None):

 refresh = get_refresh_token()

 headers = {'Content-type': 'application/json', 'Accept': 'application/json',

 'Authorization': f'Bearer {refresh.access_token}'}

 space = get_space()

 if space.realm_code and space.realm_code != 'realm00000':

 url = 'https://' + settings.DOMAIN_NAME + '/' + space.realm_code + '/' + space.space_code + path

 else:

 url = 'https://' + settings.DOMAIN_NAME + '/' + space.space_code + path

 response = None

 if method.lower() == 'get':

 response = requests.get(url=url, headers=headers, verify=settings.VERIFY_SSL)

 elif method.lower() == 'post':

 response = requests.post(url=url, data=json.dumps(data), headers=headers, verify=settings.VERIFY_SSL)

 elif method.lower() == 'put':

 response = requests.put(url=url, data=json.dumps(data), headers=headers, verify=settings.VERIFY_SSL)

 elif method.lower() == 'patch':

 response = requests.patch(url=url, data=json.dumps(data), headers=headers, verify=settings.VERIFY_SSL)

 elif method.lower() == 'delete':

 response = requests.delete(url=url, headers=headers, verify=settings.VERIFY_SSL)

 if response.status_code != 200 and response.status_code != 201 and response.status_code != 204:

 raise Exception(response.text)

 if response.status_code != 204:

 return response.json()

 return {"status": "no_content"}

class Storage():

 def __init__(self):

 from workflow.storage import get_storage

 self.storage = get_storage()

 def get_base_path(self):

 space = Space.objects.all().first()

 return space.space_code

 def listdir(self, path):

 return self.storage.listdir('/' + self.get_base_path() + path)

 def open(self, name, mode='rb'):

 # TODO permission check

 if name[0] == '/':

 name = self.get_base_path() + name

 else:

 name = self.get_base_path() + '/' + name

 return self.storage.open(name, mode)

 def read_json(self, filepath, mode='rb'):

 with self.open(filepath, mode) as state:

 state_content = json.loads(state.read())

 return state_content

 def read_csv(self, filepath, mode='rb'):

 with self.open(filepath, mode) as f:

 reader = csv.DictReader(f)

 data = list(reader)

 return data

 def read(self, filepath, mode='rb'):

 # Open the file from your storage backend

 file_obj = self.open(filepath, mode) # 'rb' is to read in binary mode

 try:

 # Read the file's contents

 file_content = file_obj.read()

 return file_content

 finally:

 # Make sure we close the file object

 file_obj.close()

 def delete(self, name):

 # TODO permission check

 if name[0] == '/':

 name = self.get_base_path() + name

 else:

 name = self.get_base_path() + '/' + name

 return self.storage.delete(name)

 def exists(self, name):

 # TODO permission check

 if name[0] == '/':

 name = self.get_base_path() + name

 else:

 name = self.get_base_path() + '/' + name

 return self.storage.exists(name)

 def save(self, name, content):

 if name[0] == '/':

 name = self.get_base_path() + name

 else:

 name = self.get_base_path() + '/' + name

 return self.storage.save(name, content)

 def save_text(self, name, content):

 if name[0] == '/':

 name = self.get_base_path() + name

 else:

 name = self.get_base_path() + '/' + name

 return self.storage.save(name, ContentFile(content.encode('utf-8')))

 def append_text(self, name, content):

 if self.storage.exists(name):

 with self.open(name, 'r') as file:

 content = file.read()

 content = content + content + '\n'

 return self.storage.save(name, ContentFile(content.encode('utf-8')))

class Utils():

 def get_current_space_code(self):

 space = Space.objects.all().first()

 return space.space_code

 def get_list_of_dates_between_two_dates(self, date_from, date_to, to_string=False):

 result = []

 format = '%Y-%m-%d'

 if not isinstance(date_from, datetime.date):

 date_from = datetime.datetime.strptime(date_from, format).date()

 if not isinstance(date_to, datetime.date):

 date_to = datetime.datetime.strptime(date_to, format).date()

 diff = date_to - date_from

 for i in range(diff.days + 1):

 day = date_from + timedelta(days=i)

 if to_string:

 result.append(str(day))

 else:

 result.append(day)

 return result

 def is_business_day(self, date):

 return bool(len(pd.bdate_range(date, date)))

 def get_yesterday(self, ):

 today = datetime.now()

 yesterday = today - timedelta(days=1)

 return yesterday

 def get_list_of_business_days_between_two_dates(self, date_from, date_to, to_string=False):

 result = []

 format = '%Y-%m-%d'

 if not isinstance(date_from, datetime.date):

 date_from = datetime.datetime.strptime(date_from, format).date()

 if not isinstance(date_to, datetime.date):

 date_to = datetime.datetime.strptime(date_to, format).date()

 diff = date_to - date_from

 for i in range(diff.days + 1):

 day = date_from + timedelta(days=i)

 if self.is_business_day(day):

 if to_string:

 result.append(str(day))

 else:

 result.append(day)

 return result

 def import_from_storage(self, file_path):

 # get the directory and the filename without extension

 space = get_space()

 if file_path[0] == '/':

 file_path = os.path.join(settings.WORKFLOW_STORAGE_ROOT + '/tasks/' + space.space_code + file_path)

 else:

 file_path = os.path.join(settings.WORKFLOW_STORAGE_ROOT + '/tasks/' + space.space_code + '/' + file_path)

 _l.info('import_from_storage.file_path %s' % file_path)

 directory, filename = os.path.split(file_path)

 module_name, _ = os.path.splitext(filename)

 _l.info('import_from_storage.module_name %s' % module_name)

 _l.info('import_from_storage.file_path %s' % file_path)

 loader = importlib.machinery.SourceFileLoader(module_name, file_path)

 module = loader.load_module()

 # add the directory to sys.path

 # spec = importlib.util.spec_from_file_location(module_name, file_path)

 #

 # if spec is None:

 # raise ImportError(f"Cannot import file {filename}")

 #

 # module = importlib.util.module_from_spec(spec)

 #

 # # execute the module

 # spec.loader.exec_module(module)

 #

 # # return the module

 return module

 def relative_import_from_storage(self, file_path, base_path):

 """

 Imports a module from a given file path, resolving the path from a specified base path.

 :param file_path: Relative or absolute path to the Python file to import.

 :param base_path: Base directory against which relative paths should be resolved.

 :return: The imported module.

 """

 # Resolve the relative file_path against the provided base directory

 absolute_file_path = os.path.normpath(os.path.join(base_path, file_path))

 # _l.info(f'Normalized file path: {absolute_file_path}')

 # Continue with your existing logic, but use absolute_file_path instead of file_path

 directory, filename = os.path.split(absolute_file_path)

 module_name, _ = os.path.splitext(filename)

 # _l.info(f'import_from_storage.module_name {module_name}')

 # _l.info(f'import_from_storage.file_path {absolute_file_path}')

 loader = importlib.machinery.SourceFileLoader(module_name, absolute_file_path)

 module = loader.load_module()

 # add the directory to sys.path

 # spec = importlib.util.spec_from_file_location(module_name, file_path)

 #

 # if spec is None:

 # raise ImportError(f"Cannot import file {filename}")

 #

 # module = importlib.util.module_from_spec(spec)

 #

 # # execute the module

 # spec.loader.exec_module(module)

 #

 # # return the module

 return module

 def tree_to_flat(self, data, **kwargs):

 return flatten(data, **kwargs)

 # Example conversions:

 # "Héllo World!" -> "hello_world!"

 # "Café.com" -> "cafe_com"

 # "Jürgen.Smith" -> "jurgen_smith"

 # "Mañana es jueves." -> "manana_es_jueves_"

 # "Gérard Dépardieu" -> "gerard_depardieu"

 # "naïve artist" -> "naive_artist"

 # Problem here Example conversions with different accents on 'e':

 # "é" -> "e"

 # "è" -> "e"

 # "ê" -> "e"

 # "ë" -> "e"

 # Example conversions:

 # "école" -> "U233cole"

 # "café.com" -> "cafeU233_com"

 # "Jürgen.Smith" -> "jU252rgen_smith"

 # "élève" -> "U233lU232ve"

 # "Mañana" -> "manU241ana"

 # "Gödel" -> "gU246del"

 def convert_to_ascii(self, input_string):

 # Convert the input string to lowercase

 input_string = input_string.lower()

 # Convert spaces and dots to underscores

 modified_string = input_string.replace(' ', '_').replace('.', '_')

 # Function to convert each character

 def to_ascii_or_unicode(char):

 try:

 # Try to encode the character in ASCII

 ascii_char = char.encode('ascii')

 return ascii_char.decode() # Return as string if it's a valid ASCII character

 except UnicodeEncodeError:

 # If it's not an ASCII character, return its Unicode code point

 return f"U{ord(char)}"

 # Apply the conversion to each character in the string

 ascii_string = ''.join(to_ascii_or_unicode(c) for c in modified_string)

 return ascii_string

class Vault():

 # hashicorp

 # finmars

 def get_secret(self, path, provider="finmars"):

 refresh = get_refresh_token() # TODO refactor, should be permission check

 # _l.info('refresh %s' % refresh.access_token)

 if provider == 'finmars':

 # pieces = path.split('/')

 # engine_name = pieces[0]

 # secret_path = pieces[1]

 headers = {'Content-type': 'application/json', 'Accept': 'application/json',

 'Authorization': f'Bearer {refresh.access_token}'}

 space = get_space()

 url = 'https://' + settings.DOMAIN_NAME + '/' + space.realm_code + '/' + space.space_code + f'/api/v1/vault/vault-record/?user_code=' + path

 response = requests.get(url=url, headers=headers, verify=settings.VERIFY_SSL)

 if response.status_code != 200:

 raise Exception(response.text)

 data = response.json()

 secret_data = None

 for item in data['results']:

 if path == item['user_code']:

 secret_data = item['data']

 if not secret_data:

 raise Exception(f"Secret is {path} not found")

 return secret_data

 elif provider == 'hashicorp':

 pieces = path.split('/')

 engine_name = pieces[0]

 secret_path = pieces[1]

 headers = {'Content-type': 'application/json', 'Accept': 'application/json',

 'Authorization': f'Bearer {refresh.access_token}'}

 space = get_space()

 if space.realm_code and space.realm_code != 'realm00000':

 url = 'https://' + settings.DOMAIN_NAME + '/' + space.realm_code + '/' + space.space_code + f'/api/v1/vault/vault-secret/get/?engine_name={engine_name}&path={secret_path}'

 else:

 url = 'https://' + settings.DOMAIN_NAME + '/' + space.space_code + f'/api/v1/vault/vault-secret/get/?engine_name={engine_name}&path={secret_path}'

 response = requests.get(url=url, headers=headers, verify=settings.VERIFY_SSL)

 if response.status_code != 200:

 raise Exception(response.text)

 return response.json()['data']['data']

 else:

 raise Exception("Unknown provider %s" % provider)

storage = Storage()

utils = Utils()

vault = Vault()