Python Script for Generating and Parsing Histogram Files of Synthetic Populations

This Python script is designed to generate histogram files from synthetic populations data. It includes functions to create histograms based on the data in CSV format, read specific headers and meta information from binary files, and iterate through binary files to extract, decode, and store histogram data efficiently. Primarily, it utilizes libraries such as NumPy for data manipulation and Pandas for reading CSV files. The script demonstrates advanced file handling, usage of regular expressions for data extraction, and object-oriented programming principles for structuring the code logically. It is intended for use in scientific research or data analysis tasks, where processing and visualizing large datasets of synthetic populations is required.

import numpy as np
import os
import pandas as pd
import re
 
from tqdm import tqdm
from typing import Tuple, Union
from typing import BinaryIO
 
from io import BytesIO
 
from itertools import tee, chain
 
class subArrayFixer:
    def start(self, n):
        return None
 
fixer = subArrayFixer()
 
def mkhist(path : str, bins=100) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    df = pd.read_csv(path)
    assert isinstance(df, pd.DataFrame)
    G = df['G']
    BP = df['BP']
    RP = df['RP']
    assert G is not None and BP is not None and RP is not None
    C = BP-RP
 
    hist, x, y = np.histogram2d(C, G, bins=bins)
    return hist, x, y
 
def get_hist_file_meta(path):
    with open(path, 'rb') as f:
        numLengthBytes = f.read(16)
        length = int.from_bytes(numLengthBytes, 'little')
        bins = f.read(16)
        bins = int.from_bytes(bins, 'little')
    return length, bins
 
def get_hist_file_length(path):
    with open(path, 'rb') as f:
        numLengthBytes = f.read(16)
        length = int.from_bytes(numLengthBytes, 'little')
    return length
 
def get_hist_file_bins(path):
    with open(path, 'rb') as f:
        f.seek(16)
        numBinsBytes = f.read(16)
        numBins = int.from_bytes(numBinsBytes, 'little')
    return numBins
 
 
def iter_hist_file(path, bs=1024):
    with open(path, 'rb') as infile:
        headerSizeOffset = 32
        start = headerSizeOffset
        idx = 0
        fSize = os.stat(path).st_size
        while (idx*bs) < fSize:
            # after the first table use a three block wide scanner to avoid straddling Header
            seekPos = headerSizeOffset+(idx-1)*bs if idx >= 1 else headerSizeOffset + idx*bs
            blockSize = bs*3 if idx >= 1 else bs
 
            infile.seek(seekPos,0)
            block = infile.read(blockSize)
 
            pop = re.search(b"HIST AGE:.+ BINS:.+ HEADEREND", block)
            if pop and idx > 4:
                age = float(re.findall(r"HIST AGE:(\d+\.\d+)", pop.group().decode())[0])
                popLocation = headerSizeOffset + (idx-1)*bs + pop.start(0)
                dataSize = popLocation - start
                infile.seek(start)
                byteBlock = infile.read(dataSize)
                with open('byteBlockTest.bin', 'wb') as f:
                    f.write(byteBlock)
                popData = read_population(byteBlock)
                start = popLocation
                idx += 4
                yield age, popData
            idx += 1
 
def read_population(byteBlock):
    subArrays = re.finditer(b"\x93NUMPY\x01\x00.\x00", byteBlock)
    TrailingSubArrays, LeadingSubArrays = tee(subArrays)
    next(LeadingSubArrays)
    LeadingSubArrays = chain(LeadingSubArrays, [fixer])
    output = {"H": list(), "X": list(), "Y": list()}
    translation = {0: "H", 1: "X", 2: "Y"}
    # print("===============")
    for ID, (subArrayLeft, subArrayRight) in enumerate(zip(TrailingSubArrays, LeadingSubArrays)):
        # print(subArrayLeft, subArrayRight)
        arrayContents = byteBlock[subArrayLeft.start(0):subArrayRight.start(0)]
        arrayAsFileObject = BytesIO(arrayContents)
        arr = np.load(arrayAsFileObject)
        output[translation[ID]] = arr
    return output
 
 
 
def decode_hist_file(path : Union[str, BinaryIO]) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    if isinstance(path, str):
        infIO = open(path, 'rb')
        infile = infIO.read()
    else:
        infile = path.read()
 
    populations = re.finditer(b"HIST AGE:.+ BINS:.+ HEADEREND", infile)
    TrailingPopulation, LeadingPopulations = tee(populations)
    next(LeadingPopulations)
    LeadingPopulations = chain(LeadingPopulations, [fixer])
    H = list()
    X = list()
    Y = list()
    ages = list()
    for mIDX, (matchleft, matchright) in tqdm(enumerate(zip(TrailingPopulation, LeadingPopulations))):
        popAge = matchleft.group(0).decode()
        try:
            age = float(re.findall(r"HIST AGE:(\d+\.\d+)", popAge)[0])
        except IndexError:
            print(popAge)
            return 1
        ages.append(age)
        subPop = infile[matchleft.end(0):matchright.start(0)]
        subArrays = re.finditer(b"\x93NUMPY\x01\x00.\x00", subPop)
        TrailingSubArrays, LeadingSubArrays = tee(subArrays)
        next(LeadingSubArrays)
        LeadingSubArrays = chain(LeadingSubArrays, [fixer])
        for ID, (subArrayLeft, subArrayRight) in enumerate(zip(TrailingSubArrays, LeadingSubArrays)):
            arrayContents = subPop[subArrayLeft.start(0):subArrayRight.start(0)]
            arrayAsFileObject = BytesIO(arrayContents)
            arr = np.load(arrayAsFileObject)
            if ID == 0:
                H.append(arr)
            elif ID == 1:
                X.append(arr)
            elif ID == 2:
                Y.append(arr)
    return np.array(ages), np.array(H), np.array(X), np.array(Y)
 
 
 
 
    if isinstance(path, str):
        infIO.close()
 
 
if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description="Generate histogram files from the synthetic populations stored in a directory")
 
    parser.add_argument("dir", help="directory with synthetic populations, each must start with fCMD and end with .out")
    parser.add_argument("bn", help="number of bins", type=int)
    parser.add_argument("-o", "--output", help="path to save all histograms too", default="histograms.npy", type=str)
 
    args = parser.parse_args()
 
    files = os.listdir(args.dir)
    files = list(filter(lambda x: x.startswith("fCMD") and x.endswith(".out"), files))
    metadata = list()
    for file in files:
        cleaned = file[:-4]
        split = cleaned.split('_')
        metadata.append((float(split[1]), float(split[2]), float(split[3])))
    paths = list(map(lambda x: os.path.join(args.dir, x), files))
    with open(args.output, 'wb') as outfile:
        numOut = len(paths)
        outfile.write(numOut.to_bytes(16,'little'))
        outfile.write(args.bn.to_bytes(16,'little'))
        for pIDX, (path, meta) in tqdm(enumerate(zip(paths, metadata)), total=len(paths)):
            Header = f"HIST AGE:{meta[0]} BINS:{args.bn} HEADEREND"
            outfile.write(Header.encode('utf-8')) 
            hist,x,y = mkhist(path, bins=args.bn)
 
            np.save(outfile, hist)
            np.save(outfile, x)
            np.save(outfile, y)
 
    # age, H, X, Y = decode_hist_file(args.output)
    # import matplotlib.pyplot as plt
    # plt.imshow(H[0])
    # plt.show()