from typing import List, Tuple, Union, Optional
from SSD.core.adaptive_table import AdaptiveTable
from SSD.core.database import Database
[docs]
def merge(database_files: List[str],
          new_database_file: str = 'merged',
          remove_existing: bool = False):
    """
    Merge Databases in a new Database.

    The architecture (Tables and Fields) of every source Database is first replicated in the new Database, then the
    user is asked to confirm it. On confirmation, every line of every Table is copied without its 'id' entry.
    On refusal, the function exits through ``quit()``.

    :param database_files: List of Databases files.
    :param new_database_file: Name of the new Database.
    :param remove_existing: If True, Database file with name 'new_database_name' will be overwritten.
    """

    # Load working Databases
    databases = [Database(database_name=database_name).load() for database_name in database_files]
    merged_database = Database(database_name=new_database_file).new(remove_existing=remove_existing)

    # Reverse lookup (Field class -> type key), built once instead of once per Field
    table_type = AdaptiveTable.table_type
    type_keys = list(table_type.keys())
    type_classes = list(table_type.values())

    # Create Tables with their Fields
    print("\nMerging the following Databases...")
    for db in databases:
        db.print_architecture()
        for table in db.get_tables():
            merged_database.create_table(table_name=table)
            for field_name, field in db.get_fields(table, only_names=False).items():
                # The 'id' Field is never replicated explicitly
                if field_name == 'id':
                    continue
                field_class = field.__class__
                if field_class in type_classes:
                    field_type = type_keys[type_classes.index(field_class)]
                else:
                    # Field class not registered in AdaptiveTable.table_type
                    field_type = type(None)
                merged_database.create_fields(table_name=table,
                                              fields=(field_name, field_type))

    # User confirm
    merged_database.print_architecture()
    valid_answers = ['y', 'yes', 'n', 'no']
    # The assignment expression must be parenthesised: without the parentheses, 'confirm' receives the boolean
    # result of the 'not in' test instead of the user's answer, and a refusal can never be detected.
    while (confirm := input("Confirm new Database architecture ? (y/n): ").lower()) not in valid_answers:
        print("Cannot interpret your entry.")
    if confirm in ['n', 'no']:
        print("Aborting.")
        quit()

    # Adding data
    print("Proceeding...")
    for db in databases:
        for table_name in db.get_tables():
            # Lines are requested with ids going from 1 to nb_lines included
            for line_id in range(1, db.nb_lines(table_name=table_name) + 1):
                data = db.get_line(table_name=table_name, line_id=line_id)
                # Drop the source 'id' entry before inserting the line in the merged Database
                data.pop('id', None)
                merged_database.add_data(table_name=table_name,
                                         data=data)
        db.close()
    merged_database.close()
    print("Merge complete.")
[docs]
def rename_tables(database_file: str,
                  renamed_tables: Union[Tuple[str, str], List[Tuple[str, str]]]):
    """
    Rename Tables of the Database.

    :param database_file: Database filename.
    :param renamed_tables: Tuple or list of tuples defined as ('old_name', 'new_name').
    :raises ValueError: If an 'old_name' is not the name of a Table of the Database.
    """

    # Load the Database
    db = Database(database_name=database_file).load()

    # The Database is closed in every case, including when the checks below fail
    try:

        # Check the table names to change
        renamed_tables = renamed_tables if isinstance(renamed_tables, list) else [renamed_tables]
        current_tables = db.get_tables()
        for (old_table_name, _) in renamed_tables:
            if old_table_name not in current_tables:
                raise ValueError(f"The Database does not contain a Table with name '{old_table_name}'. "
                                 f"Available Tables are {current_tables}.")

        # Renaming (only once every requested name has been validated)
        print("\nRenaming Table(s). \nProceeding...")
        for (old_table_name, new_table_name) in renamed_tables:
            db.rename_table(table_name=old_table_name,
                            new_table_name=new_table_name)
        db.print_architecture()

    finally:
        db.close()
    print("Renaming done.")
[docs]
def rename_fields(database_file: str,
                  table_name: str,
                  renamed_fields: Union[Tuple[str, str], List[Tuple[str, str]]]):
    """
    Rename Fields of a Table of the Database.

    :param database_file: Database filename.
    :param table_name: Name of the Table.
    :param renamed_fields: Tuple or list of tuples defined as ('old_name', 'new_name').
    :raises ValueError: If an 'old_name' is 'id' or '_dt_', or is not a Field of the Table.
    """

    # Load the Database
    db = Database(database_name=database_file).load()

    # The Database is closed in every case, including when the checks below fail
    try:

        # Check the fields to change
        renamed_fields = renamed_fields if isinstance(renamed_fields, list) else [renamed_fields]
        current_fields = db.get_fields(table_name=table_name)
        for (old_field_name, _) in renamed_fields:
            if old_field_name in ['id', '_dt_']:
                raise ValueError("The following fields cannot be renamed: 'id', '_dt_'")
            if old_field_name not in current_fields:
                raise ValueError(f"The field '{old_field_name}' is not in the list of available fields: "
                                 f"{current_fields}")

        # Renaming (only once every requested name has been validated)
        print("\nRenaming Field(s). \nProceeding...")
        for (old_field_name, new_field_name) in renamed_fields:
            db.rename_field(table_name=table_name,
                            field_name=old_field_name,
                            new_field_name=new_field_name)
        db.print_architecture()

    finally:
        db.close()
    print("Renaming done.")
[docs]
def remove_table(database_file: str,
                 table_names: Union[str, List[str]]):
    """
    Delete one or several Tables from a Database file.

    :param database_file: Database filename.
    :param table_names: Name of the Table to delete, or list of names.
    """

    database = Database(database_name=database_file).load()

    # A single name is handled as a one-element list
    if type(table_names) == list:
        targets = table_names
    else:
        targets = [table_names]

    print("\nRemoving Table(s). \nProceeding...")
    for target in targets:
        database.remove_table(table_name=target)

    # Show the resulting architecture before releasing the Database
    database.print_architecture()
    database.close()
    print("Removing done.")
[docs]
def remove_field(database_file: str,
                 table_name: str,
                 fields: Union[str, List[str]]):
    """
    Remove Fields of a Table of the Database.

    :param database_file: Database filename.
    :param table_name: Name of the Table.
    :param fields: Field(s) to remove from the Table.
    :raises ValueError: If a Field is 'id' or '_dt_', or is not a Field of the Table.
    """

    # Load the Database
    db = Database(database_name=database_file).load()

    # The Database is closed in every case (the other editing functions of this module close it as well)
    try:

        # Check the fields to remove
        fields = fields if isinstance(fields, list) else [fields]
        current_fields = db.get_fields(table_name=table_name)
        for field in fields:
            if field in ['id', '_dt_']:
                raise ValueError("The following fields cannot be removed: 'id', '_dt_'")
            if field not in current_fields:
                raise ValueError(f"The field '{field}' is not in the list of available fields: {current_fields}")

        # Removing (only once every requested name has been validated)
        print("\nRemoving Field(s). \nProceeding...")
        for field in fields:
            db.remove_field(table_name=table_name,
                            field_name=field)
        db.print_architecture()

    finally:
        db.close()
    print("Removing done.")
[docs]
def export(database_file: str,
           exporter: str,
           filename: Optional[str] = None) -> None:
    """
    Write the content of a Database file to another format.

    :param database_file: Database filename.
    :param exporter: Exporter type (either 'csv' or 'json').
    :param filename: Exported filename; 'export' is used when it is None.
    """

    database = Database(database_name=database_file).load()

    # Fall back on the default output name when none is given
    if filename is None:
        output_name = 'export'
    else:
        output_name = filename

    database.export(exporter=exporter, filename=output_name)