Recherche d’une chaîne de caractères dans une archive sous Python
Script de recherche et d’extraction depuis une ou plusieurs archives. Ce code va lire chaque fichier dans une archive et extraire le fichier si la chaîne de caractères est trouvée.
Prérequis:
- Python 2.5
- La librairie Python de configuration ConfigObj (voir : Librairie Python config)
Voici un exemple de script qui, depuis un répertoire donné, va scanner les fichiers archives (.zip). Ce code va ouvrir chaque fichier présent dans l’archive et lire son contenu.
Si une ligne du fichier contient la chaîne de caractères recherchée, alors on va extraire le fichier dans un répertoire de sortie, puis le copier vers un répertoire réseau (dans ce cas, il s’agit de fichiers XML).
Tout d’abord il faut préparer les librairies à importer:
... # Import lib import os, sys, zipfile, shutil, traceback # Lib for helpers try: from lib.helper.helper_misc import ToolsBox, LogStats except ImportError: ToolsBox = None LogStats = None sys.stderr.write('ERROR: %s\n' % traceback.format_exc()) sys.exit() # Lib for config variable try: from lib.config.configobj import ConfigObj obj_config = ConfigObj('config/config.cfg') except ImportError: ConfigObj = None sys.stderr.write('ERROR: %s' % traceback.format_exc()) sys.exit() ...
Contenu de la classe « LogStats » : cette classe permet d’écrire les logs à la fois sur la console et dans un fichier plat :
import os
import sys
import traceback


# Begin LogStats class
class LogStats(object):
    """
    Description: File-like logger duplicating every write to the console
    and to a flat log file. Meant to be assigned to sys.stdout.
    """

    def __init__(self, f=None):
        """
        Description: Constructor, init stats log file
        @param f: Log filename, defaults to log/stats.log under the
                  current working directory (resolved at call time)
        @return void
        """
        if f is None:
            # Resolve the default here rather than in the signature, so it
            # follows the working directory in effect when the logger is
            # created, and build it with the platform separator.
            f = os.path.join(os.getcwd(), 'log', 'stats.log')
        folder = os.path.dirname(f)
        if folder and not os.path.isdir(folder):
            os.makedirs(folder)  # open() below would fail without it
        self.console = sys.stdout
        self.file = open(f, 'w')

    def write(self, flow):
        """
        Description: Write flow to specific handle
        @param flow: String to write (empty strings are ignored)
        @return int (0 on success; exits with status 1 on failure)
        """
        try:
            if flow != '':
                self.console.write(flow)  # Write to console
                self.file.write(flow)  # Write to stats log file
            return 0
        except Exception:
            sys.stderr.write('ERROR: %s\n' % traceback.format_exc())
            sys.exit(1)

    def flush(self):
        """
        Description: Flush both handles (expected from a sys.stdout
        replacement, notably at interpreter shutdown)
        @return void
        """
        self.console.flush()
        self.file.flush()

    def close(self):
        """
        Description: Close the stats log file, the console is left open
        @return void
        """
        self.file.close()
# End LogStats class
Voici le contenu de la classe ToolsBox utilisée dans ce script:
# Begin ToolsBox class
class ToolsBox(object):
    """
    Description: Collection of static helpers (error log setup, file cleanup)
    """

    @staticmethod
    def init_error_log():
        """
        Description: Init error log file. Creates the log folder under the
        current working directory if needed and redirects sys.stderr to
        log/error.log (the file is truncated).
        @return int (0 on success; exits with status 1 on failure)
        """
        try:
            # Use the same absolute folder for the existence check and for
            # the file itself, built with the platform separator.
            log_dir = os.path.join(os.getcwd(), 'log')
            if not os.path.isdir(log_dir):
                os.makedirs(log_dir)
            # The handle stays open on purpose: it becomes sys.stderr
            sys.stderr = open(os.path.join(log_dir, 'error.log'), 'w')
            return 0
        except Exception:
            sys.stderr.write('ERROR: %s\n' % traceback.format_exc())
            sys.exit(1)

    @staticmethod
    def rm_files(folder = '', ext = '.xml'):
        """
        Description: Delete files by extension by folder
        @param folder: Folder input (exits with status 1 when empty)
        @param ext: Default extension to delete
        @return void
        """
        if not folder:
            print('No input folder has been provided => KO')
            sys.exit(1)
        try:
            for name in os.listdir(folder):
                if name.endswith(ext):
                    os.remove(os.path.join(folder, name))
        except Exception:
            sys.stderr.write('ERROR: %s\n' % traceback.format_exc())
            sys.exit(1)
# End ToolsBox class
Enfin, voici le script principal qui va lire le fichier entrant (qui contient les chaînes de caractères à trouver dans les fichiers à scanner) et extraire le résultat dans un répertoire donné.
def _fail(*messages):
    """
    Description: Report error lines on stdout and stderr, then stop
    @param messages: Lines to print and to log (prefixed with ERROR)
    @return void (never returns, raises SystemExit with status 1)
    """
    for message in messages:
        print(message)
    for message in messages:
        # Format the complete message in one go: '%' binds tighter than
        # '+', so "'ERROR: %s\n' % 'text' + more" would put the newline in
        # the middle of the logged line.
        sys.stderr.write('ERROR: %s\n' % message)
    sys.exit(1)


def _read_cfr(csv_path):
    """
    Description: Read the CFR codes (first column) of the input csv file
    @param csv_path: Path of the 2 columns csv file (cfr;vessel_name)
    @return list of non-empty CFR strings, in file order
    """
    codes = []
    handle = open(csv_path, 'rb')
    try:
        for row in handle:
            row = row.strip()
            if not row:
                continue  # A blank (e.g. trailing) line is not an error
            columns = row.split(';')
            # Must have 2 columns
            if len(columns) != 2:
                _fail('You must have 2 columns in your .csv file (cfr;vessel_name): ' + csv_path + ' => KO')
            if columns[0] != '':
                codes.append(columns[0])
    finally:
        handle.close()
    return codes


def _extract_matches(archive_path, codes, output_dir):
    """
    Description: Extract the .xml members of a ZIP archive that contain
    at least one of the given codes
    @param archive_path: Path of the ZIP archive to scan
    @param codes: Strings to look for in each member
    @param output_dir: Folder receiving the extracted files
    @return list of extracted file names (base names, no duplicates)
    """
    extracted = []
    archive = zipfile.ZipFile(archive_path, 'r')
    try:
        for member in archive.namelist():
            if not member.endswith('.xml'):
                continue
            # Decompress once per member instead of once per code
            payload = archive.read(member)
            if not any(code in payload for code in codes):
                continue
            # ZIP member names always use '/'; keep only the file name so
            # root-level and deeply nested members are both handled
            name = member.split('/')[-1]
            target = open(os.path.join(output_dir, name), 'wb')
            try:
                target.write(payload)
            finally:
                target.close()
            if name not in extracted:
                extracted.append(name)
    finally:
        archive.close()  # Close ZIP handle, even on error
    return extracted


# First, init log files
ToolsBox.init_error_log()  # Error log
sys.stdout = LogStats()  # Stats log

dir_input = obj_config['file_section']['input_folder']  # Working folder
dir_output = obj_config['file_section']['output_folder']  # Result folder
dir_remote = obj_config['file_section']['remote_folder'].encode('latin-1')  # Network drive
csv = obj_config['file_section']['input_cfr_file']  # Get input CFR
dir_output_path = os.path.join(os.getcwd(), dir_output, '')  # Full output path (trailing separator kept)
cfr = []  # CFR list from csv input
files = []  # Output files to send (base names)

# Begin main process
try:
    # Read input CFR/csv file and build CFR/list
    if not os.path.exists(csv):
        _fail('Reading csv file: ' + csv + ' => KO',
              'Did you provide 2 columns .csv file (cfr;vessel_name)?')
    cfr.extend(_read_cfr(csv))
    print('Reading ' + str(len(cfr)) + ' CFR => OK')

    # Check input CFR: nothing to search for is an error, not a silent end
    if not cfr:
        _fail('No CFR found in csv file: ' + csv + ' => KO')

    # If input dir exist
    if not os.path.isdir(dir_input):
        _fail('Input folder does not exist [' + dir_input + '], you must create it first => KO')
    print('Input folder exist [' + dir_input + '] => OK')

    # If output dir exist
    if not os.path.isdir(dir_output):
        _fail('Output folder does not exist [' + dir_output + '], you must create it first => KO')
    print('Output folder exist [' + dir_output + '] => OK')

    # First, cleanup dir_output files
    ToolsBox.rm_files(dir_output)
    print('Delete files from: [' + dir_output + '] => OK')

    # Starting loop over zip archive
    print('Scanning in progress please wait ... ')
    for entry in os.listdir(dir_input):
        archive_path = os.path.join(dir_input, entry)
        # A stray non-ZIP file or sub-folder must not abort the whole run
        if not zipfile.is_zipfile(archive_path):
            print('Skipping : ' + entry + ' (not a ZIP archive)')
            continue
        print('Scanning : ' + entry)
        for name in _extract_matches(archive_path, cfr, dir_output_path):
            if name not in files:
                files.append(name)

    # If we have files created
    if not files:
        _fail('Nothing to copy, input list(files) is empty => KO')
    print(str(len(files)) + ' files were extracted locally => OK')

    # Copy over network drive if accessible
    if not os.path.isdir(dir_remote):
        _fail('Network drive is not accessible: ' + dir_remote + ' => KO')
    j = 0
    for name in files:
        shutil.copy(os.path.join(dir_output_path, name), dir_remote)
        j += 1

    # Post-process, we check the copy number (locally/remotely)
    if len(files) != j:
        _fail('Difference has been detected between the number of local files and the number of remote files copied => KO',
              'Local => ' + str(len(files)),
              'Remote => ' + str(j))
    print('No difference has been detected between the number of local files and the number of remote files copied => OK')
    print('Local => ' + str(len(files)))
    print('Remote => ' + str(j))

    # Then, cleanup dir_input files
    ToolsBox.rm_files(dir_input, '.zip')
    print('Delete files from: [' + dir_input + '] => OK')
    print('Process success => OK')
    sys.exit()  # End if no error
except Exception:
    # SystemExit is not an Exception subclass, so the exits above pass through
    sys.stderr.write('ERROR: %s\n' % traceback.format_exc())
    sys.exit(1)
# End main process
Voici également mon fichier de configuration pour exemple (j’utilise la librairie: ConfigObj):
# Config file for Python
[file_section]
input_folder = INPUT
output_folder = OUTPUT
remote_folder = J:\chemin_réseau\fichiers
input_cfr_file = config\cfr.csv
Et voici un exemple de contenu du fichier config\cfr.csv (deux colonnes : cfr;vessel_name) :
FRA000684904;TEST1
FRA000752752;TEST2
FRA000766803;TEST3