Skip to content
juin 30 / David Regnier

Recherche d’une chaîne de caractères dans une archive sous Python

Script de recherche et d’extraction depuis une ou plusieurs archives. Ce code va lire chaque fichier dans une archive et extraire le fichier si la chaîne de caractères est trouvée.

Prérequis:

Voici un exemple de script qui, depuis un répertoire donné, scanne les fichiers archives (.zip) : il ouvre chaque fichier présent dans l’archive et lit son contenu.
Si le contenu d’un fichier contient l’une des chaînes de caractères recherchées, alors ce fichier est extrait dans un répertoire de sortie, puis copié vers un répertoire réseau (dans cet exemple, il s’agit de fichiers XML).
Tout d’abord il faut préparer les librairies à importer:

...
# Import lib
import os, sys, zipfile, shutil, traceback

# Lib for helpers
# A missing dependency is fatal: report the traceback and stop with a
# non-zero status so the caller can tell the start-up failed.
try:
    from lib.helper.helper_misc import ToolsBox, LogStats
except ImportError:
    # Placeholders, presumably kept so the names stay defined for static
    # checkers; the process exits right after.
    ToolsBox = None
    LogStats = None
    sys.stderr.write('ERROR: %s\n' % traceback.format_exc())
    sys.exit(1)

# Lib for config variable
try:
    from lib.config.configobj import ConfigObj
except ImportError:
    ConfigObj = None
    sys.stderr.write('ERROR: %s\n' % traceback.format_exc())
    sys.exit(1)
else:
    # Loaded outside the guarded import so that only a missing library is
    # reported as an import problem.
    obj_config = ConfigObj('config/config.cfg')
...

Contenu de la classe « LogStats » : cette classe permet d’écrire les logs à la fois sur la console et dans un fichier plat :

...
import os, sys, traceback

"""Begin LogStats class"""
class LogStats(object):
    """
    Description: File-like writer that duplicates every non-empty string
    to the console stream and to a stats log file. Intended to be assigned
    to sys.stdout.
    """

    def __init__(self, f=None):
        """
        Description: Constructor, init stats log file

        @param f: Log filename; when omitted, log/stats.log under the
                  current working directory is used
        @return void
        """
        if f is None:
            # Resolved at call time (not at import time) and built with
            # os.path.join so the separator fits the host platform.
            f = os.path.join(os.getcwd(), 'log', 'stats.log')
        log_dir = os.path.dirname(f)
        if log_dir and not os.path.isdir(log_dir):
            os.makedirs(log_dir)
        # Keep the stream that was active when the instance was created, so
        # output still reaches it once sys.stdout is replaced by this object.
        self.console = sys.stdout
        self.file = open(f, 'w')

    def write(self, flow):
        """
        Description: Write flow to specific handle

        @param flow: String to write
        @return int (always 0; the process exits if a write fails)
        """
        try:
            if flow != '':
                self.console.write(flow)  # Write to console
                self.file.write(flow)  # Write to stats log file
            return 0
        except Exception:
            sys.stderr.write('ERROR: %s\n' % traceback.format_exc())
            sys.exit(1)

    def flush(self):
        """
        Description: Flush both handles; needed because code that treats
        this object as sys.stdout may call flush() on it

        @return void
        """
        self.console.flush()
        self.file.flush()

    def close(self):
        """
        Description: Close the stats log file (the console is left open)

        @return void
        """
        self.file.close()
"""End LogStats class"""
...

Voici le contenu de la classe ToolsBox utilisée dans ce script:

...
"""Begin ToolsBox class"""
class ToolsBox(object):
    """
    Description: Static helpers for error-log setup and file cleanup.
    """

    @staticmethod
    def init_error_log():
        """
        Description: Init error log file. Creates the log folder under the
        current working directory if needed and redirects sys.stderr to
        log/error.log (the file is truncated on each run).

        @return int (always 0; the process exits with status 1 on failure)
        """
        try:
            log_dir = os.path.join(os.getcwd(), 'log')
            if not os.path.isdir(log_dir):
                os.makedirs(log_dir)
            # The handle is deliberately left open: it becomes sys.stderr
            # for the rest of the process.
            sys.stderr = open(os.path.join(log_dir, 'error.log'), 'w')
            return 0
        except Exception:
            sys.stderr.write('ERROR: %s\n' % traceback.format_exc())
            sys.exit(1)

    @staticmethod
    def rm_files(folder = '', ext = '.xml'):
        """
        Description: Delete files by extension by folder (not recursive)

        @param folder: Folder input
        @param ext: Default extension to delete
        @return void (the process exits with status 1 when no folder is
                given or when a deletion fails)
        """
        if not folder:
            print('No input folder has been provided => KO')
            sys.exit(1)
        try:
            for name in os.listdir(folder):
                if name.endswith(ext):
                    # os.path.join keeps the path valid on every platform.
                    os.remove(os.path.join(folder, name))
        except Exception:
            sys.stderr.write('ERROR: %s\n' % traceback.format_exc())
            sys.exit(1)
"""End ToolsBox class"""
...

Enfin, voici le script principal qui va lire le fichier entrant (qui contient les chaînes de caractères à trouver dans les fichiers à scanner) et extraire le résultat dans un répertoire donné.

...
# First, init log files
ToolsBox.init_error_log() # Error log
sys.stdout = LogStats() # Stats log

dir_input = obj_config['file_section']['input_folder'] # Working folder
dir_output = obj_config['file_section']['output_folder'] # Result folder
# NOTE(review): .encode('latin-1') presumably turns a unicode config value
# into a byte-string path for Python 2 -- confirm what ConfigObj returns.
dir_remote = obj_config['file_section']['remote_folder'].encode('latin-1') # Network drive
csv = obj_config['file_section']['input_cfr_file'] # Get input CFR
dir_output_path = os.path.join(os.getcwd(), dir_output, '') # Full output path
cfr = [] # CFR list from csv input
files = [] # Output files to send (local file names, without duplicates)


def _report_error(*lines):
    """
    Description: Print each message on stdout and mirror it on stderr
    prefixed with 'ERROR: '

    @param lines: Messages to report, one per line
    @return void
    """
    for text in lines:
        print(text)
        # The tuple makes the whole message the format argument, so the
        # newline ends the line instead of landing in the middle of it.
        sys.stderr.write('ERROR: %s\n' % (text,))


def _load_cfr(path):
    """
    Description: Read the CFR codes (first column) from the csv file.
    Blank lines are skipped; any other line must have exactly 2 columns,
    otherwise the process exits with status 1.

    @param path: csv file path (cfr;vessel_name)
    @return list of non-empty CFR codes
    """
    codes = []
    with open(path, 'rb') as handle:
        for row in handle:
            if not row.strip():
                continue
            columns = row.split(';')
            # Must have 2 columns
            if len(columns) != 2:
                print('You must have 2 columns in your .csv file (cfr;vessel_name): ' + path + ' => KO')
                sys.exit(1)
            code = columns[0].strip()
            if code:
                codes.append(code)
    return codes


def _extract_matches(archive_path, codes, out_dir, extracted):
    """
    Description: Write into out_dir every .xml member of the archive whose
    content contains at least one of the codes

    @param archive_path: ZIP archive to scan
    @param codes: Strings to look for
    @param out_dir: Folder receiving the extracted files
    @param extracted: List updated in place with the extracted file names
    @return void
    """
    archive = zipfile.ZipFile(archive_path, 'r')
    try:
        for member in archive.namelist():
            if not member.endswith('.xml'):
                continue
            # Decompress once per member, whatever the number of codes.
            payload = archive.read(member)
            if not any(code in payload for code in codes):
                continue
            # Keep only the file name: works for members at the archive
            # root or in nested folders, and never writes outside out_dir.
            target = os.path.basename(member)
            with open(os.path.join(out_dir, target), 'wb') as out:
                out.write(payload)
            if target not in extracted:
                extracted.append(target)
    finally:
        archive.close() # Close ZIP handle


# Begin main process
# sys.exit() raises SystemExit, which "except Exception" does not catch, so
# the explicit exits below leave the process directly. Failures exit with
# status 1, success with status 0.
try:
    # Read input CFR/csv file and build CFR/list
    if not os.path.exists(csv):
        _report_error('Reading csv file: ' + csv + ' => KO',
                      'Did you provide 2 columns .csv file (cfr;vessel_name)?')
        sys.exit(1)
    cfr.extend(_load_cfr(csv))
    print('Reading ' + str(len(cfr)) + ' CFR => OK')

    # Check input CFR: nothing to search for, stop without touching any folder
    if not cfr:
        sys.exit()

    # If input dir exist
    if not os.path.isdir(dir_input):
        _report_error('Input folder does not exist [' + dir_input + '], you must create it first => KO')
        sys.exit(1)
    print('Input folder exist [' + dir_input + '] => OK')

    # If output dir exist
    if not os.path.isdir(dir_output):
        _report_error('Output folder does not exist [' + dir_output + '], you must create it first => KO')
        sys.exit(1)
    print('Output folder exist [' + dir_output + '] => OK')

    # First, cleanup dir_output files
    ToolsBox.rm_files(dir_output)
    print('Delete files from: [' + dir_output + '] => OK')

    # Starting loop over zip archive
    print('Scanning in progress please wait ... ')
    for f in os.listdir(dir_input):
        # Only .zip files are archives; anything else would make ZipFile raise
        if not f.endswith('.zip'):
            continue
        print('Scanning : ' + f)
        _extract_matches(os.path.join(dir_input, f), cfr, dir_output_path, files)

    # If we have files created
    if not files:
        _report_error('Nothing to copy, input list(files) is empty => KO')
        sys.exit(1)
    print(str(len(files)) + ' files were extracted locally => OK')

    # Copy over network drive if accessible
    if not os.path.isdir(dir_remote):
        _report_error('Network drive is not accessible: ' + dir_remote + ' => KO')
        sys.exit(1)
    j = 0
    for name in files:
        shutil.copy(os.path.join(dir_output_path, name), dir_remote)
        # Count only the files actually present remotely, so the check
        # below compares two independent numbers.
        if os.path.isfile(os.path.join(dir_remote, name)):
            j += 1

    # Post-process, we check the copy number (locally/remotely)
    if len(files) != j:
        _report_error('Difference has been detected between the number of local files and the number of remote files copied => KO',
                      'Local => ' + str(len(files)),
                      'Remote => ' + str(j))
        sys.exit(1)
    print('No difference has been detected between the number of local files and the number of remote files copied => OK')
    print('Local => ' + str(len(files)))
    print('Remote => ' + str(j))

    # Then, cleanup dir_input files
    ToolsBox.rm_files(dir_input, '.zip')
    print('Delete files from: [' + dir_input + '] => OK')
    print('Process success => OK')
    sys.exit()
except Exception:
    sys.stderr.write('ERROR: %s\n' % traceback.format_exc())
    sys.exit(1)
# End main process
...

Voici également mon fichier de configuration, à titre d’exemple (j’utilise la librairie ConfigObj) :


# Config file for Python

[file_section]
input_folder = INPUT
output_folder = OUTPUT
remote_folder = J:\chemin_réseau\fichiers
input_cfr_file = config\cfr.csv

Contenu du fichier config\cfr.csv (exemple):
FRA000684904;TEST1
FRA000752752;TEST2
FRA000766803;TEST3
Laisser un commentaire


− 5 = 3