"""Mailing-list archive handling: load gzipped-JSON dumps from disk and
mirror / search them in a MariaDB table (one table per archive)."""

import gzip
import json
import logging
import os
import re

import email
import email.parser
from datetime import date, datetime

import mysql.connector as mariadb

import archive.sql
import archive.util
import terminal.progress


def load_from_file(filename, archive_name, archive_dir):
    """Load archived thread data for `archive_name` from disk.

    Resolution order:
      1. ``archive_dir/filename[.json.gz]`` as a single gzipped JSON file,
      2. the lexicographically last ``archive_dir/filename*.json.gz`` match
         (dated filenames sort chronologically, so this is the most recent),
      3. every ``*.json`` file inside the directory ``archive_dir/filename``.

    Returns a ``(data, archive_name)`` tuple, or None when nothing matches.
    """
    if filename.endswith('.json.gz'):
        file_path = os.path.join(archive_dir, filename)
    else:
        file_path = os.path.join(archive_dir, filename + '.json.gz')
    if os.path.isfile(file_path):
        with gzip.open(file_path, 'r') as fp:
            return (json.load(fp), archive_name)

    # Fall back to the newest "filename*.json.gz" dump in archive_dir.
    candidates = sorted(
        f for f in os.listdir(archive_dir)
        if os.path.isfile(os.path.join(archive_dir, f))
        and f.startswith(filename) and f.endswith('.json.gz'))
    if candidates:
        file_path = os.path.join(archive_dir, candidates[-1])
        with gzip.open(file_path, 'r') as fp:
            return (json.load(fp), archive_name)

    # Finally, treat archive_dir/filename as a directory of plain .json files.
    dir_path = os.path.join(archive_dir, filename)
    if not os.path.isdir(dir_path):
        return None
    json_files = [os.path.join(dir_path, f) for f in os.listdir(dir_path)
                  if os.path.isfile(os.path.join(dir_path, f))
                  and f.endswith('.json')]
    if not json_files:
        return None
    threads = []
    for file_path in json_files:
        with open(file_path, 'r') as fp:
            threads.append(json.load(fp))
    return (threads, archive_name)


def connect_db(database, host, user, password):
    """Open a connection to the MariaDB server; return None on failure.

    Fix: the original did ``finally: return con`` — on a connection error
    `con` was never bound, so the error path raised UnboundLocalError
    instead of returning None (and the bare return in `finally` would have
    swallowed any other exception).
    """
    try:
        return mariadb.connect(host=host, user=user, password=password,
                               database=database)
    except mariadb.Error as error:
        logging.error("Error: {}".format(error))
        if error.errno == 1049:  # ER_BAD_DB_ERROR: schema does not exist
            logging.error("Database " + database + " does not exist.")
        return None


def list_tables_db(database, host, user, password):
    """Return the list of table names in `database`, or None on error.

    The cursor is created outside the `try` so the `finally` never touches
    an unbound name, and the connection is always closed.
    """
    con = connect_db(database, host, user, password)
    if con is None:
        return None
    try:
        cursor = con.cursor()
        try:
            cursor.execute(archive.sql.SHOW_TABLE)
            return [t[0] for t in cursor]
        except mariadb.Error as error:
            logging.error("Error: {}".format(error))
            return None
        finally:
            cursor.close()
    finally:
        con.close()


def list_tables_db_config(config):
    """Convenience wrapper: list tables using a settings dict
    (keys: database, host, user, password)."""
    return list_tables_db(config['database'], config['host'],
                          config['user'], config['password'])


class Archive:
    """A mailing-list archive, backed by on-disk JSON dumps and/or a DB table.

    Constructed either from a directory path (loads the JSON data) or from
    a database settings dict (opens a connection). Usable as a context
    manager, which closes the connection on exit.
    """

    data = None    # "raw" json data loaded from disk (list of thread dicts)
    db_con = None  # open database connection, if any

    def __init__(self, archive_name, config):
        """Build from `config`: a str is the archive directory, a dict is
        database settings (database/host/user/password)."""
        self.archive_name = archive_name
        if isinstance(config, str):
            logging.info("reading archive " + archive_name)
            # load_from_file echoes the archive name back; may return None
            # when nothing is found — callers then see the TypeError here.
            (self.data, self.archive_name) = load_from_file(
                archive_name, archive_name, config)
            logging.info(" - done.")
        elif isinstance(config, dict):
            self.db_con = connect_db(config['database'], config['host'],
                                     config['user'], config['password'])

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if self.db_con is not None:
            self.db_con.close()

    def _ensure_connection(self, config):
        """Open self.db_con from `config` if not already connected.
        Returns True when a usable connection exists."""
        if self.db_con is None and config is not None:
            self.db_con = connect_db(config['database'], config['host'],
                                     config['user'], config['password'])
        return self.db_con is not None

    def create_db(self, config=None):
        """Create this archive's table (connecting first via `config` if
        needed); logs and returns on failure."""
        logging.info("creating table: " + self.archive_name)
        if not self._ensure_connection(config):
            logging.warning(" - no connection... Aborting.")
            return
        # Cursor created outside the try: the original's `finally` crashed
        # with UnboundLocalError if cursor creation itself failed.
        cursor = self.db_con.cursor()
        try:
            cursor.execute(archive.sql.CREATE.format(self.archive_name))
        except mariadb.Error as error:
            logging.error("Error: {}".format(error))
        finally:
            cursor.close()
        logging.info(" - done.")

    def insert_db(self, config=None):
        """Insert all loaded threads into the table, committing after each
        thread that produced at least one row, with a progress bar."""
        if not self._ensure_connection(config):
            return
        cursor = self.db_con.cursor()
        try:
            progress = terminal.progress.ProgressBar(
                self.archive_name, len(self.data),
                fmt=terminal.progress.ProgressBar.FULL)
            for t in self.data:
                n_inserted = self.recursive_insert_db(cursor, t["threads"])
                logging.info(" - " + str(n_inserted))
                if n_inserted > 0:
                    self.db_con.commit()  # commit each thread as it lands
                progress.current += 1
                progress()
            progress.done()
            self.db_con.commit()
        except mariadb.Error as error:
            logging.error("Error: {}".format(error))
        finally:
            cursor.close()

    def recursive_insert_db(self, cursor, thread):
        """Insert every message of `thread` (recursing into "follow-up"
        children) into this archive's table.

        Returns the number of rows inserted. Duplicate rows (errno 1062,
        ER_DUP_ENTRY) are skipped quietly; other DB errors are logged and
        the loop moves on to the next message.
        """
        n_inserted = 0
        for m in thread:
            try:
                logging.info(" - in - " + m['date'] + " " + m['subject'])
                from_ = archive.util.format_from(m)
                author_name_ = archive.util.format_author(m)
                to_ = archive.util.format_to(m)
                date_ = archive.util.format_date(m, self.archive_name)
                if date_ is None or from_ is None:
                    logging.warning("\nDATE ERROR: " + m['from'] + " - "
                                    + m['date'])
                    continue
                str_insert = archive.sql.INSERT.format(self.archive_name)
                cursor.execute(str_insert,
                               (from_, author_name_, to_, m["subject"],
                                date_, m["content-type"], m["content"],
                                m["url"]))
                n_inserted += 1
                if "follow-up" in m:
                    n_inserted += self.recursive_insert_db(cursor,
                                                           m["follow-up"])
            except mariadb.Error as error:
                if error.errno == 1062:  # ER_DUP_ENTRY: already archived
                    logging.info("+++db+++ duplicate")
                else:
                    logging.error("\nError: {}".format(error))
                    logging.error(str_insert)
                continue
        return n_inserted

    def content_search(self, term, bool=True):
        """Full-text search of message bodies for `term`.

        `bool` (kept as-is for caller compatibility, despite shadowing the
        builtin) selects the BOOLEAN-mode query. Returns a list of
        (from, author_name, subject, date, url) tuples, or None on error.
        """
        if self.db_con is None:
            logging.warning("No connection to database...")
            return
        cursor = self.db_con.cursor(buffered=True)
        try:
            if bool:
                cursor.execute(archive.sql.CONTENT_QUERY_BOOLEAN.format(
                    self.archive_name, term))
            else:
                cursor.execute(archive.sql.CONTENT_QUERY.format(
                    self.archive_name, term))
            return [(from_, author_name_, subject_, date_, url_)
                    for (from_, author_name_, subject_, date_, url_)
                    in cursor]
        except mariadb.Error as error:
            logging.error("Error: {}".format(error))
        finally:
            cursor.close()

    def from_search(self, term, bool=True):
        """Full-text search of the "from" field for `term`.

        Same contract as content_search. Fix: the original error path
        called ``logging.erro`` (typo), raising AttributeError instead of
        logging.
        """
        if self.db_con is None:
            logging.warning("No connection to database...")
            return
        cursor = self.db_con.cursor(buffered=True)
        try:
            if bool:
                cursor.execute(archive.sql.FROM_QUERY_BOOLEAN.format(
                    self.archive_name, term))
            else:
                cursor.execute(archive.sql.FROM_QUERY.format(
                    self.archive_name, term))
            return [(from_, author_name_, subject_, date_, url_)
                    for (from_, author_name_, subject_, date_, url_)
                    in cursor]
        except mariadb.Error as error:
            logging.error("Error: {}".format(error))
        finally:
            cursor.close()

    # analysis
    def longest_field(self, field, thread, max_length=0):
        """Return the length of the longest value of `field` over the whole
        thread tree, recursing into "threads" and "follow-up" children.

        Normalizes from/author_name/to/date values in place via
        archive.util before measuring, and prints each new maximum.
        """
        for m in thread:
            if field not in m:
                # Container node: only recurse into its child threads.
                if "threads" in m:
                    max_length = self.longest_field(field, m["threads"],
                                                    max_length)
                continue
            if m[field] is None:
                continue
            if field == "from":
                m[field] = archive.util.format_from(m)
            elif field == "author_name":
                m[field] = archive.util.format_author(m)
            elif field == "to":
                m[field] = archive.util.format_to(m)
            elif field == "date":
                m[field] = str(archive.util.format_date(m,
                                                        self.archive_name))
            if m[field] is None:
                continue
            l = len(m[field])
            if l > max_length:
                max_length = l
                print(">> " + m[field])
            if "follow-up" in m:
                max_length = self.longest_field(field, m["follow-up"],
                                                max_length)
        return max_length