• [ Регистрация ]Открытая и бесплатная
  • Tg admin@ALPHV_Admin (обязательно подтверждение в ЛС форума)

Source Merge File Sort или быстрая сортировка строк и удаление дублей интерпретатором в файле от 100 GB за счет ПЗУ

admin

#root
Администратор
Регистрация
20.01.2011
Сообщения
7,665
Розыгрыши
0
Реакции
135
Всем привет, не так давно у меня появилась идея реализовать на питоне скрипт который делает быструю сортировку и очистку дублей строк в огромном текстовом файле без колоссального потребления ресурсов ОЗУ в операционной системе.
Минимальные системные требования: Python 3.12, 4 ядра CPU, 4 гигабайта ОЗУ, свободное место на накопителе размером в импортируемый файл+10%.

Скрипт работает на встроенных библиотеках питона, кроме одной библиотеки используемой для разметки выдачи цвета лога (в общем все подробно объяснил в комментариях к коду, может быть потом исправлю тут на полноценную статью, если настроение будет):

Bash:
pip install colorlog==6.8.2

Python:
import os
import tempfile
import heapq
import time
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from colorlog import ColoredFormatter  # Для цветного логирования

# Настраиваем цветное логирование
formatter = ColoredFormatter(
    "%(log_color)s%(asctime)s - %(levelname)s - %(message)s",  # Формат вывода логов
    datefmt=None,  # Формат даты, по умолчанию
    reset=True,  # Сброс цветов после каждой строки
    log_colors={  # Задаем цвета для различных уровней логов
        'DEBUG': 'cyan',
        'INFO': 'green',
        'WARNING': 'yellow',
        'ERROR': 'red',
        'CRITICAL': 'bold_red',
    }
)

# Настройка обработчика логов с UTF-8
handler = logging.StreamHandler()  # Логи выводятся в консоль
handler.setFormatter(formatter)  # Применяем цветной формат
logger = logging.getLogger()  # Получаем объект логгера
logger.addHandler(handler)  # Добавляем к нему наш обработчик
logger.setLevel(logging.INFO)  # Устанавливаем уровень логирования INFO (можно изменять на DEBUG для более детальных логов)

# Создаем путь к папке TEMP для временных файлов
temp_dir = os.path.join(os.getcwd(), 'TEMP')  # Директория для хранения временных файлов
if not os.path.exists(temp_dir):  # Если папки нет, создаем её в корне скрипта
    os.makedirs(temp_dir)

# Функция для обработки чанков: удаление дублей и сортировка
def process_chunk(chunk, temp_dir):
    try:
        logger.info(f"Начало обработки чанка размером {len(chunk)} строк")  # Логируем количество строк в чанке
 
        unique_items = set(chunk)  # Превращаем список в множество для удаления дублей
        logger.info(f"Удалено {len(chunk) - len(unique_items)} дублей в чанке")  # Логируем, сколько дублей удалено
 
        sorted_chunk = sorted(unique_items)  # Сортируем уникальные строки

        # Создаем временный файл, который не будет удален автоматически (delete=False)
        with tempfile.NamedTemporaryFile(delete=False, mode='w', encoding='utf-8', errors='ignore', dir=temp_dir) as temp_file:
            temp_file.write('\n'.join(sorted_chunk) + '\n')  # Записываем отсортированный чанк во временный файл
            logger.info(f"Чанк записан во временный файл {temp_file.name}")  # Логируем путь к временному файлу
            return temp_file.name  # Возвращаем имя временного файла

    except Exception as e:
        logger.error(f"Ошибка при обработке чанка: {e}")  # Логируем ошибку, если что-то пошло не так
        return None  # Возвращаем None, если произошла ошибка

# Функция для пакетного слияния временных файлов в один общий файл
def merge_files(temp_files, output_file):
    try:
        logger.info(f"Окончательное слияние {len(temp_files)} временных файлов")  # Логируем количество файлов для слияния

        unique_count = 0  # Счетчик уникальных строк
        duplicate_count = 0  # Счетчик дублей

        # Открываем выходной файл для записи
        with open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:
            # Открываем все временные файлы и создаем итераторы
            file_iters = [open(f, 'r', encoding='utf-8', errors='replace') for f in temp_files]
            merged_iter = heapq.merge(*file_iters)  # Сливаем отсортированные временные файлы в один поток
            prev_line = None  # Переменная для отслеживания предыдущей строки

            # Проходим по слитым строкам
            for line in merged_iter:
                if line != prev_line:  # Если строка не совпадает с предыдущей (уникальная)
                    outfile.write(line)  # Записываем строку в выходной файл
                    prev_line = line  # Обновляем предыдущую строку
                    unique_count += 1  # Увеличиваем счетчик уникальных строк
                else:
                    duplicate_count += 1  # Если строка дублируется, увеличиваем счетчик дублей

            # Закрываем временные файлы
            for f in file_iters:
                f.close()

        logger.info(f"Слияние завершено. Уникальных строк: {unique_count}, дублей удалено: {duplicate_count}")  # Логируем результаты
        return unique_count, duplicate_count  # Возвращаем количество уникальных строк и дублей

    except Exception as e:
        logger.error(f"Ошибка при слиянии файлов: {e}")  # Логируем ошибку, если слияние не удалось
        return 0, 0  # Возвращаем нули в случае ошибки

# Функция для пакетного слияния временных файлов
def batch_merge(temp_files, batch_size, temp_dir):
    try:
        logger.info(f"Начало пакетного слияния с размером пакета {batch_size}")  # Логируем начало пакетного слияния
        merged_files = []  # Список для хранения результатов пакетного слияния
        total_unique_count = 0  # Общий счетчик уникальных строк
        total_duplicate_count = 0  # Общий счетчик дублей

        # Обрабатываем файлы по частям (батчами)
        for i in range(0, len(temp_files), batch_size):
            batch = temp_files[i:i + batch_size]  # Берем очередной пакет файлов
            logger.info(f"Слияние пакета с файлов {i+1} по {min(i + batch_size, len(temp_files))}")  # Логируем диапазон файлов

            # Создаем временный файл для результата слияния пакета
            with tempfile.NamedTemporaryFile(delete=False, mode='w', encoding='utf-8', dir=temp_dir) as temp_merged_file:
                unique_count, duplicate_count = merge_files(batch, temp_merged_file.name)  # Сливаем пакет файлов
                merged_files.append(temp_merged_file.name)  # Добавляем результат слияния в список

                total_unique_count += unique_count  # Обновляем общий счетчик уникальных строк
                total_duplicate_count += duplicate_count  # Обновляем общий счетчик дублей

            # Удаляем временные файлы после слияния
            logger.info(f"Удаление временных файлов пакета с {i+1} по {min(i + batch_size, len(temp_files))}")
            for temp_file in batch:
                if os.path.exists(temp_file):
                    os.remove(temp_file)  # Удаляем временный файл

        return merged_files, total_unique_count, total_duplicate_count  # Возвращаем список объединенных файлов и итоговые счетчики

    except Exception as e:
        logger.error(f"Ошибка при пакетном слиянии: {e}")  # Логируем ошибку при пакетном слиянии
        return temp_files, 0, 0  # Возвращаем исходные файлы и нули в случае ошибки

# Основная функция сортировки и удаления дубликатов с параллельной обработкой чанков
def sort_and_uniq_streaming(input_file, output_file, chunk_size=2000000, batch_size=10):
    temp_files = []  # Список для хранения временных файлов
    chunk = []  # Текущий чанк строк
    original_count = 0  # Счетчик всех строк в исходном файле

    logger.info(f"Чтение файла {input_file}...")  # Логируем начало чтения файла
    try:
        with open(input_file, 'r', encoding='utf-8', errors='ignore') as infile:  # Открываем файл для чтения
            with ThreadPoolExecutor() as executor:  # Создаем пул потоков для параллельной обработки
                futures = []  # Список задач для параллельной обработки

                for line in infile:
                    chunk.append(line.strip())  # Добавляем строку в чанк
                    original_count += 1  # Увеличиваем счетчик строк
                    if len(chunk) >= chunk_size:  # Если размер чанка достиг предела
                        futures.append(executor.submit(process_chunk, chunk, temp_dir))  # Запускаем обработку чанка в отдельном потоке
                        chunk = []  # Очищаем чанк для следующего набора строк

                if chunk:  # Если остались необработанные строки после завершения чтения файла
                    futures.append(executor.submit(process_chunk, chunk, temp_dir))  # Обрабатываем последний чанк

                for future in as_completed(futures):  # Ждем завершения всех задач
                    temp_file = future.result()  # Получаем результат обработки (имя временного файла)
                    if temp_file:
                        temp_files.append(temp_file)  # Добавляем временный файл в список

        logger.info(f"Все чанки обработаны. Начинается пакетное слияние временных файлов...")  # Логируем завершение обработки всех чанков

        total_unique_count = 0  # Общий счетчик уникальных строк
        total_duplicate_count = 0  # Общий счетчик дублей

        # Пока временных файлов больше, чем размер батча, продолжаем пакетное слияние
        while len(temp_files) > batch_size:
            temp_files, unique_count, duplicate_count = batch_merge(temp_files, batch_size, temp_dir)  # Выполняем пакетное слияние
            total_unique_count += unique_count  # Увеличиваем общий счетчик уникальных строк
            total_duplicate_count += duplicate_count  # Увеличиваем общий счетчик дублей

        # Если остался больше одного временного файла, выполняем финальное слияние
        if len(temp_files) > 1:
            logger.info("Завершающий этап слияния крупных оставшихся файлов")  # Логируем финальный этап
            unique_count, duplicate_count = merge_files(temp_files, output_file)  # Финальное слияние временных файлов
            total_unique_count += unique_count  # Добавляем количество уникальных строк
            total_duplicate_count += duplicate_count  # Добавляем количество дублей
        else:
            # Если остался только один временный файл, переименовываем его в выходной файл
            os.rename(temp_files[0], output_file)  # Переименование файла

        # Удаляем все оставшиеся временные файлы
        for temp_file in temp_files:
            if os.path.exists(temp_file):  # Проверяем существование файла перед удалением
                os.remove(temp_file)  # Удаляем временный файл

        logger.info("Все временные файлы удалены")  # Логируем успешное удаление всех временных файлов
 
        unique_count = original_count - total_duplicate_count  # Рассчитываем количество уникальных строк
        logger.info(f"Итоговые результаты: Уникальных строк: {unique_count}, Дублей удалено: {total_duplicate_count}")  # Выводим результаты

        return original_count  # Возвращаем общее количество строк в исходном файле

    except Exception as e:
        logger.error(f"Ошибка при обработке файла: {e}")  # Логируем ошибку при обработке файла
        return 0  # Возвращаем 0 в случае ошибки

# Точка входа
if __name__ == '__main__':
    tic = time.perf_counter()  # Замеряем время начала выполнения программы

    input_file = "large_random_emails.txt"  # Исходный файл с данными
    output_file = "output-sorted-unique.txt"  # Выходной файл для сохранения результатов
    original_count = sort_and_uniq_streaming(input_file, output_file)  # Запускаем процесс сортировки и удаления дублей

    tac = time.perf_counter()  # Замеряем время завершения выполнения программы

    logging.info(f"Всего обработано строк: {original_count}")  # Логируем общее количество обработанных строк
    logging.info(f"Удаление дублей и сортировка заняли {tac - tic:0.4f} секунд")  # Логируем время выполнения программы

by: rand
 
Activity
So far there's no one here