New internal module "unarchive" #1918

Open · wants to merge 21 commits into dev

Conversation

@domwhewell-sage (Contributor) commented Nov 4, 2024

This draft PR adds an internal module "extract", which will contain several functions that extract certain file types into folders, ready for excavate to pull out useful information such as URLs, DNS_NAMEs, etc.

@TheTechromancer (Collaborator) commented Nov 4, 2024

Nice! This will be a fun one to build out, as we add support for every compression type and enable recursive extraction (archives within archives).

I wrote code a while back to do this in credshed, which might be useful.

@TheTechromancer (Collaborator) commented Nov 29, 2024

I like the mapping of compression types to extraction functions. We'll probably need to improve our magic filetype detection, especially get_compression(). That will keep us from relying on extensions, since there are plenty of cases where, for example, a zip file has a non-zip extension.
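
A minimal sketch of what extension-independent detection could look like with python-magic (the file name and helper name here are hypothetical, not BBOT's actual get_compression()):

import magic

def detect_archive_type(path):
    # libmagic inspects the magic bytes, so a zip renamed to .txt is still reported as a zip
    description = magic.from_file(str(path)).lower()
    mime_type = magic.from_file(str(path), mime=True)
    return description, mime_type

# Example with a zip file saved as "report.txt":
# detect_archive_type("report.txt") -> ('zip archive data, ...', 'application/zip')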

Also, we might want to favor shell commands over Python libraries: CPU resources in the main process are really scarce, and offloading to tools like 7z is an effective way to parallelize.
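
As a rough illustration of that offloading idea (an assumption about how it could be wired up, not the module's actual implementation), extraction can be handed to the 7z binary in a subprocess so the main event loop is never blocked:

import asyncio
from pathlib import Path

async def extract_with_7z(archive: Path, dest: Path) -> bool:
    # 7z does the CPU-heavy work in its own process; we only await its exit code.
    dest.mkdir(parents=True, exist_ok=True)
    process = await asyncio.create_subprocess_exec(
        "7z", "x", "-aoa", f"-o{dest}", str(archive),
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.PIPE,
    )
    _, stderr = await process.communicate()
    if process.returncode != 0:
        print(f"7z failed on {archive}: {stderr.decode()}")
        return False
    return True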

I wrote a system just like this in credshed, where each file would get extracted and then its contents recursively searched for more compressed files, each of which would be extracted to an auto-named folder (e.g. <file_name>.extracted):

import os
import magic
import logging
import subprocess as sp
from pathlib import Path

log = logging.getLogger('credshed.filestore.util')

# Map a substring of libmagic's textual file description to the shell command
# used to extract that file type ({filename} and {extract_dir} are filled in later).
supported_compressions = [
    ('microsoft excel', ['ssconvert', '-S', '{filename}', '{extract_dir}/%s.csv']),
    ('rar archive', ['unrar', 'x', '-o+', '-p-', '{filename}', '{extract_dir}/']),
    ('tar archive', ['tar', '--overwrite', '-xvf', '{filename}', '-C', '{extract_dir}/']),
    ('gzip compressed', ['tar', '--overwrite', '-xvzf', '{filename}', '-C', '{extract_dir}/']),
    ('gzip compressed', ['gunzip', '--force', '--keep', '{filename}']),
    ('bzip2 compressed', ['tar', '--overwrite', '-xvjf', '{filename}', '-C', '{extract_dir}/']),
    ('xz compressed', ['tar', '--overwrite', '-xvJf', '{filename}', '-C', '{extract_dir}/']),
    ('lzma compressed', ['tar', '--overwrite', '--lzma', '-xvf', '{filename}', '-C', '{extract_dir}/']),
    ('7-zip archive', ['7z', 'x', '-p""', '-aoa', '{filename}', '-o{extract_dir}/']),
    ('zip archive', ['7z', 'x', '-p""', '-aoa', '{filename}', '-o{extract_dir}/']),
]

def extract_file(file_path, extract_dir=None):
    file_path = Path(file_path).resolve()
    if extract_dir is None:
        extract_dir = file_path.with_suffix('.extracted')
    extract_dir = Path(extract_dir).resolve()

    # Create the extraction directory if it doesn't exist
    if not extract_dir.exists():
        extract_dir.mkdir(parents=True, exist_ok=True)

    # Determine the file type from libmagic's textual description
    # (the entries in supported_compressions match substrings of this description, not MIME types)
    file_type = magic.from_file(str(file_path)).lower()

    # Find the appropriate decompression command
    for magic_type, cmd_list in supported_compressions:
        if magic_type in file_type:
            log.info(f'Compression type "{magic_type}" detected in {file_path}')
            cmd_list = [s.format(filename=file_path, extract_dir=extract_dir) for s in cmd_list]
            log.info(f'>> {" ".join(cmd_list)}')
            try:
                sp.run(cmd_list, check=True)
                log.info(f'Decompression successful for {file_path}')
                # Recursively extract files in the new directory
                for item in extract_dir.iterdir():
                    if item.is_file() and is_compressed(item):
                        extract_file(item, extract_dir / item.stem)
                return True
            except sp.SubprocessError as e:
                log.error(f'Error extracting file {file_path}: {e}')
                return False
    log.warning(f'No supported compression type found for {file_path}')
    return False

def is_compressed(file_path):
    # Match against libmagic's textual description, consistent with extract_file()
    file_type = magic.from_file(str(file_path)).lower()
    return any(magic_type in file_type for magic_type, _ in supported_compressions)
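
For reference, a hypothetical call against a downloaded archive (the paths are made up) would look like:

# Extract /tmp/loot/dump.zip into /tmp/loot/dump.extracted and list what came out.
if extract_file("/tmp/loot/dump.zip"):
    for item in Path("/tmp/loot/dump.extracted").rglob("*"):
        print(item)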

@domwhewell-sage marked this pull request as ready for review December 8, 2024 16:03
@domwhewell-sage (Contributor, Author) commented:

Marked this ready for review now. This should be a good base for extracting the most popular compression types. I have also removed the jadx-compatible compression types from the libmagic matching, so that jadx extracts them instead of this module.

@TheTechromancer (Collaborator) commented Dec 9, 2024

@domwhewell-sage thanks for your work on this. It's looking good!

A few things:

  • For the .jar and .apk exclusions, we should probably hardcode those into the module instead of the helper.
  • The module needs either the safe or aggressive tag to pass the tests (it's safe)
  • We should probably have tests for:
    • archive within archive (e.g. a .tar.gz inside a .7z; see the fixture sketch after this list)
    • archive within .jar/.apk (to test its interaction with the other modules)
  • What are your thoughts on naming the module unarchive or uncompress? I think maybe extract is a little too close to excavate, since it can have a dual meaning.
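
A minimal sketch of how a nested-archive test fixture could be generated (the file names, the temp-dir argument, and the use of the 7z CLI are assumptions, not the project's actual test helpers):

import subprocess
import tarfile
from pathlib import Path

def build_nested_fixture(tmp_dir: Path) -> Path:
    """Create a .tar.gz containing a small text file, then wrap it in a .7z."""
    inner_file = tmp_dir / "secret.txt"
    inner_file.write_text("https://example.com/hidden-endpoint\n")

    # Inner archive: a .tar.gz built with the standard library
    inner_archive = tmp_dir / "inner.tar.gz"
    with tarfile.open(inner_archive, "w:gz") as tar:
        tar.add(inner_file, arcname=inner_file.name)

    # Outer archive: wrap the .tar.gz in a .7z (requires the 7z binary on PATH)
    outer_archive = tmp_dir / "outer.7z"
    subprocess.run(["7z", "a", str(outer_archive), str(inner_archive)], check=True)
    return outer_archive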

@domwhewell-sage changed the title from New internal module "extract" to New internal module "unarchive" Dec 15, 2024
@domwhewell-sage (Contributor, Author) commented:

Hi @TheTechromancer, I have addressed all the comments except the tests for archives inside .jar/.apk files, as the module is currently built to handle specific archive files recursively. I would need to think about how it should handle the folders output by jadx (so it doesn't consume its own events).

Also, the tests keep failing because apt dependencies aren't getting installed for the tests for some reason. Is there an apt_deps that I can define for the tests?

@TheTechromancer (Collaborator) commented:

> Also, the tests keep failing because apt dependencies aren't getting installed for the tests for some reason. Is there an apt_deps that I can define for the tests?

I'll add those to the core deps.

@TheTechromancer (Collaborator) commented:

@domwhewell-sage #2096 has been merged, so you should be okay to remove deps_apt.

@TheTechromancer (Collaborator) commented Dec 20, 2024

The tests are failing because of these commands, which are being executed in the class definition:

[screenshot of the offending commands omitted]

The solution should be to move them into the setup function (and preferably asyncify them):

    async def setup_after_prep(self, module_test):
        # Run the commands asynchronously
        for command in self.commands:
            process = await asyncio.create_subprocess_exec(
                *command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            stdout, stderr = await process.communicate()
            assert process.returncode == 0, f"Command {command} failed with error: {stderr.decode()}"

@domwhewell-sage (Contributor, Author) commented:

Thanks! A classic case of "It worked on my machine" haha
