"""Utility functions for biallelic analysis pipeline.
Provides common utilities for file I/O, string manipulation, module discovery,
dynamic imports, and visualization color schemes.
"""
import os
import sys
import gzip
import argparse
import datetime
import secrets
import string
import types
import importlib.machinery
import importlib.util
from typing import Optional, Set, Dict, Any, List, Callable
from biallelic.bgzf import BgzfWriter, BgzfReader
# Number of random characters appended after the timestamp of a generated UID
# when the caller does not pass an explicit length to generate_uid().
DEFAULT_UID_RANDOM_LENGTH = 4
[docs]
def camel_case_split(input_str: str) -> list:
    """Split a camelCase string into its component words.

    A new word starts at every transition from a lowercase character to an
    uppercase character. Runs of consecutive uppercase characters are never
    split, so a leading acronym stays attached to the word that follows it.

    Args:
        input_str: camelCase or PascalCase string to split.

    Returns:
        List of words in their original order. An empty input yields an
        empty list.

    Example:
        >>> camel_case_split("CamelCaseExample")
        ['Camel', 'Case', 'Example']
        >>> camel_case_split("HTTPSConnection")
        ['HTTPSConnection']
        >>> camel_case_split("")
        []
    """
    # Guard: seeding the first word from input_str[0] would raise IndexError
    # on an empty string.
    if not input_str:
        return []

    words = [[input_str[0]]]
    for c in input_str[1:]:
        # Only a lowercase -> uppercase boundary opens a new word.
        if words[-1][-1].islower() and c.isupper():
            words.append([c])
        else:
            words[-1].append(c)
    return ["".join(word) for word in words]
[docs]
def import_module(module_name: str, module_path: str) -> types.ModuleType:
    """Dynamically load a Python module from a source file.

    The module object is created from an import spec, so it carries the
    usual import metadata (``__file__``, ``__spec__``, ``__loader__``) before
    its code runs. The module is returned to the caller only; it is not
    registered in ``sys.modules``.

    Args:
        module_name: Name to give the imported module.
        module_path: File path of the module source file.

    Returns:
        The executed module object.

    Raises:
        FileNotFoundError: If module_path does not exist.
        ImportError: If no import spec can be built for the file.
    """
    loader = importlib.machinery.SourceFileLoader(module_name, module_path)
    # Passing the loader explicitly keeps source loading for any file
    # extension, while the spec provides __file__/__spec__ for the module.
    spec = importlib.util.spec_from_file_location(
        module_name, module_path, loader=loader
    )
    if spec is None:
        raise ImportError(
            f"Cannot build an import spec for {module_name!r} from {module_path}"
        )
    mod = importlib.util.module_from_spec(spec)
    loader.exec_module(mod)
    return mod
[docs]
def generate_uid(n: int = DEFAULT_UID_RANDOM_LENGTH) -> str:
    """Build an identifier from the current time plus a random suffix.

    The result has the shape ``YYMMDDHHMMSS.ffffff_XXXX``: the local
    timestamp down to microseconds, an underscore, then ``n`` characters
    drawn from uppercase ASCII letters and digits.

    Args:
        n: Number of random characters in the suffix.

    Returns:
        The timestamp and the random suffix joined by an underscore.

    Example:
        >>> uid = generate_uid()     # e.g., "251029094958.153950_A1B2"
        >>> uid = generate_uid(n=8)  # e.g., "251029094958.153950_A1B2C3D4"
    """
    timestamp = datetime.datetime.now().strftime("%y%m%d%H%M%S.%f")
    pool = string.ascii_uppercase + string.digits
    suffix = "".join(secrets.choice(pool) for _ in range(n))
    return f"{timestamp}_{suffix}"
[docs]
def color_palettes(x: str = "default") -> Dict[str, str]:
    """Return the hex colors used to draw biallelic hit types.

    Each key is a hit-type label and each value is the hex color assigned
    to it. A fresh dictionary is built on every call.

    Args:
        x: Palette name. The argument is accepted but never read, so every
            value returns the same single palette.

    Returns:
        Dictionary mapping hit-type strings to hex color codes.

    Example:
        >>> colors = color_palettes()
        >>> colors["som_loss/som_snv"]
        '#c6c0ac'
    """
    hit_colors = (
        ('germ_snp/som_loss', '#768b02'),
        ('som_cn_loh/som_snv', '#8b6a54'),
        ('som_gain_loh/som_snv', '#aa96b1'),
        ('som_loss/methyl', '#d5b28a'),
        ('som_loss/som_indel', '#80997f'),
        ('som_loss/som_loss', '#d56e67'),
        ('som_loss/som_snv', '#c6c0ac'),
        ('som_loss/som_sv', '#5a4b67'),
        ('som_snv/som_snv', '#2c4c68'),
        ('som_loss/subclonal_snv', '#a05a6e'),
    )
    return dict(hit_colors)
[docs]
def package_modules(package) -> Set[str]:
    """List the modules found in a package directory.

    Looks at the first entry of ``package.__path__`` and collects every
    file ending in ``.py`` whose name does not start with ``__init__``.
    Only the directory listing is inspected; nothing is imported.

    Args:
        package: Package object (pass your_package, not "your_package").

    Returns:
        Set of fully-qualified module names (e.g., "biallelic.drivers.maf").

    Example:
        >>> import biallelic.drivers
        >>> modules = package_modules(biallelic.drivers)
        >>> "biallelic.drivers.maf" in modules
        True
    """
    pkg_dir = package.__path__[0]
    found = set()
    for entry in os.listdir(pkg_dir):
        if not entry.endswith(".py"):
            continue
        if entry.startswith("__init__"):
            continue
        stem, _ext = os.path.splitext(entry)
        found.add(".".join((package.__name__, stem)))
    return found
[docs]
def try_import(path: str, module_name: str) -> types.ModuleType:
    """Import a package directory, creating it first when necessary.

    Ensures ``<path>/<module_name>`` exists as a directory, makes sure it
    contains an ``__init__.py`` (creating an empty one if missing), and then
    executes that ``__init__.py`` as a module named ``module_name``.

    Args:
        path: Base directory path.
        module_name: Name of the module to create/import.

    Returns:
        Imported module object.

    Raises:
        OSError: If the directory cannot be created or ``__init__.py``
            cannot be written (IOError is an alias of OSError).
        ImportError: If no import spec can be built for ``__init__.py``.
    """
    module_path = os.path.join(path, module_name)
    # exist_ok avoids the check-then-create race when another process
    # creates the same directory between a test and the makedirs call.
    os.makedirs(module_path, exist_ok=True)

    init_path = os.path.join(module_path, "__init__.py")
    # Create __init__.py if it doesn't exist
    if not os.path.exists(init_path):
        try:
            with open(init_path, "a"):
                os.utime(init_path, None)
        except OSError as err:
            # Keep the original failure attached as the cause.
            raise OSError(
                f"Cannot create {init_path}. "
                f"Check your configuration or create the directory at {module_path}"
            ) from err

    # Use importlib to load the module
    spec = importlib.util.spec_from_file_location(module_name, init_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load module {module_name!r} from {init_path}")
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    return mod
[docs]
def get_modules_names(parent) -> List[str]:
    """Get the simple names of the importable modules in a package.

    Every module reported by package_modules() is imported. A module whose
    import raises AttributeError is skipped; any other import error
    propagates to the caller.

    Args:
        parent: Package object.

    Returns:
        Alphabetically sorted list of module names without the package
        prefix (e.g., ["bed", "maf", "vcf_vep_ppcg"]).
    """
    modules = []
    # Sorted so the result does not depend on set iteration order, which
    # varies between interpreter runs under string hash randomization.
    for mod in sorted(package_modules(parent)):
        try:
            importlib.import_module(mod)
        except AttributeError:
            continue
        modules.append(mod.split(".")[-1])
    return modules
[docs]
def get_module_method(parent, module: str, method: str) -> Optional[Callable]:
    """Get a specific method from a module in a package.

    Checks that ``module`` is one of the modules package_modules() reports
    for ``parent``, imports only that module, and looks up ``method`` on it.
    Sibling modules are not imported, so a problem in an unrelated module
    cannot affect the lookup.

    Args:
        parent: Parent package object.
        module: Name of module (e.g., "maf", "bed").
        method: Name of method/function in module (e.g., "snv", "genes").

    Returns:
        The attribute named ``method`` from the module, or None if the
        module is not part of the package or the lookup raises
        AttributeError.

    Example:
        >>> import biallelic.drivers
        >>> snv_loader = get_module_method(biallelic.drivers, "maf", "snv")
        >>> snv_loader is not None
        True
    """
    qualified = ".".join([parent.__name__, module])
    if qualified not in package_modules(parent):
        return None
    try:
        # import_module returns the submodule itself, so the lookup does not
        # depend on the attribute of the same name on the parent package.
        loaded = importlib.import_module(qualified)
        return getattr(loaded, method)
    except AttributeError:
        # Missing method (or an AttributeError raised while importing):
        # report "not found" rather than failing.
        return None
[docs]
def xopen(filename: str, mode: str = "r", bgzip: bool = False):
    """Open plain, gzip or BGZF files through one entry point.

    The handle type is chosen as follows:

    * ``filename == "-"``: ``sys.stdin`` when ``mode`` contains ``"r"``,
      otherwise ``sys.stdout``.
    * ``bgzip=True``: ``BgzfWriter`` for modes starting with ``"w"`` and
      ``BgzfReader`` for modes starting with ``"r"``.
    * filename ending in ``.gz``: ``gzip.open``.
    * anything else: the built-in ``open``.

    ``mode`` is passed through unchanged. Note that ``gzip.open`` treats a
    mode without ``"t"`` (such as ``"r"``) as binary, unlike the built-in
    ``open``; pass ``"rt"``/``"wt"`` to get text from a ``.gz`` file.

    Args:
        filename: Path to file, or '-' for stdin (read) or stdout (write).
        mode: File open mode ('r', 'w', 'a', 'rt', 'wb', etc.).
        bgzip: If True, use BGZF for read/write modes; otherwise the format
            is chosen from the ``.gz`` extension.

    Returns:
        File object (text or binary mode as specified).

    Raises:
        TypeError: If filename is not a str.
        FileNotFoundError: If file doesn't exist (read mode).
        IOError: If file cannot be opened.

    Example:
        >>> with xopen("data.txt", "r") as f:
        ...     data = f.read()
        >>> with xopen("data.txt.gz", "rt") as f:  # Auto-decompresses
        ...     data = f.read()
        >>> with xopen("-", "r") as f:  # Read from stdin
        ...     line = f.readline()
    """
    # Explicit check instead of assert: asserts are stripped under -O.
    if not isinstance(filename, str):
        raise TypeError(
            f"filename must be a str, got {type(filename).__name__}"
        )

    if filename == "-":
        return sys.stdin if "r" in mode else sys.stdout

    if bgzip:
        if mode.startswith("w"):
            return BgzfWriter(filename, mode)
        if mode.startswith("r"):
            return BgzfReader(filename, mode)
        # NOTE(review): with bgzip=True and any other mode (e.g. "a") no BGZF
        # handle is created and the call falls through to the gzip/plain
        # path below -- confirm this is intended.

    if filename.endswith(".gz"):
        return gzip.open(filename, mode)
    return open(filename, mode)
[docs]
class DefaultHelpParser(argparse.ArgumentParser):
    """Argument parser that prints the full help text on a parsing error.

    Overrides ArgumentParser.error so that a failed parse shows the error
    message followed by the complete help, then exits with status 2.
    """

    def error(self, message: str) -> None:
        """Report a parsing error, print the help text, and exit.

        Both the error line and the help text are written to stderr, so
        nothing is emitted on stdout when parsing fails.

        Args:
            message: Error message to display.

        Raises:
            SystemExit: Always, with exit code 2.
        """
        sys.stderr.write("error: %s\n" % message)
        # Send the help to stderr too: diagnostics must not end up in data
        # that a caller is piping from stdout.
        self.print_help(sys.stderr)
        sys.exit(2)