326 lines
9.0 KiB
Python
326 lines
9.0 KiB
Python
import email, email.parser
|
|
import os, json, gzip, re, logging
|
|
import mysql.connector as mariadb
|
|
import archive.sql, archive.util
|
|
from datetime import date, datetime
|
|
import terminal.progress
|
|
|
|
def load_from_file(filename, archive_name, archive_dir):
    """Load archived thread data for *archive_name* from disk.

    Resolution order:
      1. ``archive_dir/<filename>.json.gz`` (the suffix is appended when missing);
      2. the most recent ``archive_dir/<filename>*.json.gz`` match
         (alphabetical order doubles as chronological for timestamped names);
      3. every ``*.json`` file inside the directory ``archive_dir/<filename>``.

    Args:
        filename: base name, full ``*.json.gz`` name, or sub-directory name.
        archive_name: label returned alongside the data so callers can tag it.
        archive_dir: directory that holds the archive files.

    Returns:
        ``(json_data, archive_name)`` on success, ``None`` when nothing
        matching was found.
    """

    def _read_gz(path):
        # Binary-mode gzip is fine here: json.load accepts bytes (Py >= 3.6).
        with gzip.open(path, 'r') as fp:
            return json.load(fp)

    if filename.endswith('.json.gz'):
        file_path = os.path.join(archive_dir, filename)
    else:
        file_path = os.path.join(archive_dir, filename + '.json.gz')

    if os.path.isfile(file_path):
        return (_read_gz(file_path), archive_name)

    # Fallback 1: newest "filename*.json.gz" in archive_dir.
    candidates = sorted(
        f for f in os.listdir(archive_dir)
        if os.path.isfile(os.path.join(archive_dir, f))
        and f.startswith(filename) and f.endswith('.json.gz')
    )
    if candidates:
        return (_read_gz(os.path.join(archive_dir, candidates[-1])), archive_name)

    # Fallback 2: treat archive_dir/filename as a directory of plain .json files.
    dir_path = os.path.join(archive_dir, filename)
    if not os.path.isdir(dir_path):
        return None

    # Sorted for deterministic thread order (os.listdir order is arbitrary).
    json_files = sorted(
        os.path.join(dir_path, f) for f in os.listdir(dir_path)
        if os.path.isfile(os.path.join(dir_path, f)) and f.endswith('.json')
    )
    if not json_files:
        return None

    threads = []
    for json_path in json_files:
        with open(json_path, 'r') as fp:
            threads.append(json.load(fp))

    return (threads, archive_name)
|
|
|
|
def connect_db(database, host, user, password):
    """Open a MySQL/MariaDB connection.

    Args:
        database: schema name to connect to.
        host, user, password: connection credentials.

    Returns:
        a live connection object, or ``None`` when the attempt fails
        (the error is logged; errno 1049 means the database does not exist).
    """
    # BUG FIX: the previous version did `finally: return con`, which
    # (a) raised UnboundLocalError when mariadb.connect() failed, because
    # `con` was never assigned, and (b) swallowed the intended `return None`
    # from the except branch.
    try:
        return mariadb.connect(host=host, user=user,
                               password=password, database=database)
    except mariadb.Error as error:
        logging.error("Error: {}".format(error))
        if error.errno == 1049:
            logging.error("Database " + database + " does not exist.")
        return None
|
|
|
|
def list_tables_db(database, host, user, password):
    """Return the list of table names in *database*.

    Returns:
        a list of table-name strings, or ``None`` when the connection or
        the query fails (errors are logged).
    """
    con = connect_db(database, host, user, password)
    if con is None:
        return None

    cursor = None
    try:
        cursor = con.cursor()
        cursor.execute(archive.sql.SHOW_TABLE)
        # SHOW TABLES yields one-element rows; keep only the name.
        return [row[0] for row in cursor]
    except mariadb.Error as error:
        logging.error("Error: {}".format(error))
        return None
    finally:
        # BUG FIX: `cursor` could be unbound here when con.cursor() itself
        # raised; guard before closing, and always release the connection.
        if cursor is not None:
            cursor.close()
        con.close()
|
|
|
|
def list_tables_db_config(config):
    """Convenience wrapper: list tables using credentials from a config dict.

    `config` must provide the keys 'database', 'host', 'user', 'password'.
    """
    return list_tables_db(
        config['database'],
        config['host'],
        config['user'],
        config['password'],
    )
|
|
|
|
class Archive:
    """A mailing-list archive backed by raw JSON files or a database.

    Constructed with either a directory path (loads the JSON archive from
    disk into ``self.data``) or a credentials dict (opens a database
    connection in ``self.db_con``).  Usable as a context manager: the
    connection, if any, is closed on exit.
    """

    # Class-level defaults kept for backward compatibility; instances set
    # both attributes in __init__.
    data = None     # "raw" json data (file-backed mode only)
    db_con = None   # live DB connection (database mode only)

    def __init__(self, archive_name, config):
        """Build an archive in one of two modes, selected by `config`'s type.

        Args:
            archive_name: name of the archive (also used as the table name).
            config: either a str (directory holding the JSON archive files)
                or a dict with keys 'database', 'host', 'user', 'password'.
        """
        self.data = None
        self.db_con = None
        self.archive_name = archive_name

        if isinstance(config, str):
            logging.info("reading archive " + archive_name)
            archive_dir = config
            # NOTE(review): load_from_file may return None when nothing is
            # found, which makes this unpack raise TypeError — same behavior
            # as before; consider an explicit error.
            (self.data, self.archive_name) = load_from_file(
                archive_name, archive_name, archive_dir)
            logging.info(" - done.")
        elif isinstance(config, dict):
            self.db_con = connect_db(config['database'], config['host'],
                                     config['user'], config['password'])

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Release the database connection when leaving the `with` block.
        if self.db_con is not None:
            self.db_con.close()

    def _ensure_connection(self, config):
        """Open self.db_con from `config` when not yet connected.

        Returns True iff a usable connection exists afterwards.
        """
        if self.db_con is None and config is not None:
            self.db_con = connect_db(config['database'], config['host'],
                                     config['user'], config['password'])
        return self.db_con is not None

    def create_db(self, config=None):
        """Create this archive's table (named after self.archive_name).

        Args:
            config: optional credentials dict used when not yet connected.
        """
        logging.info("creating table: " + self.archive_name)
        if not self._ensure_connection(config):
            logging.warning(" - no connection... Aborting.")
            return

        cursor = None
        try:
            cursor = self.db_con.cursor()
            cursor.execute(archive.sql.CREATE.format(self.archive_name))
        except mariadb.Error as error:
            logging.error("Error: {}".format(error))
        finally:
            # BUG FIX: cursor could be unbound here if .cursor() raised.
            if cursor is not None:
                cursor.close()

        logging.info(" - done.")

    def insert_db(self, config=None, file=None):
        """Insert the loaded JSON data (self.data) into the database.

        Args:
            config: optional credentials dict used when not yet connected.
            file: optional list-name filter; underscores stand in for the
                spaces used in the archived list names.  Only entries whose
                'name' matches are inserted.
        """
        if not self._ensure_connection(config):
            return

        # The filter tag uses ' ' where command-line names use '_'.
        tag = file.replace("_", " ") if file is not None else None

        cursor = None
        try:
            cursor = self.db_con.cursor()

            progress = terminal.progress.ProgressBar(
                self.archive_name, len(self.data),
                fmt=terminal.progress.ProgressBar.FULL)

            for t in self.data:
                # (debug print of t['name'] removed — leftover from development)
                if tag is not None and t['name'] != tag:
                    continue

                logging.info("inserting " + t['name'])
                n_inserted = self.recursive_insert_db(cursor, t["threads"])
                logging.info(" - " + str(n_inserted))
                if n_inserted > 0:
                    self.db_con.commit()

                progress.current += 1
                progress()

            progress.done()
            self.db_con.commit()

        except mariadb.Error as error:
            logging.error("Error: {}".format(error))
        finally:
            # BUG FIX: cursor could be unbound here if .cursor() raised.
            if cursor is not None:
                cursor.close()

    def recursive_insert_db(self, cursor, thread):
        """Insert every message in `thread`, recursing into follow-ups.

        Args:
            cursor: open database cursor.
            thread: list of message dicts (each may carry a "follow-up" list).

        Returns:
            the number of rows actually inserted; duplicates
            (errno 1062, ER_DUP_ENTRY) are skipped and logged at info level.
        """
        n_inserted = 0
        for m in thread:
            str_insert = None  # BUG FIX: ensure bound for the except path
            try:
                logging.info(" - in - " + m['date'] + " " + m['subject'])

                from_ = archive.util.format_from(m)
                author_name_ = archive.util.format_author(m)
                to_ = archive.util.format_to(m)
                date_ = archive.util.format_date(m, self.archive_name)

                # Skip messages whose date or sender cannot be normalized.
                if date_ is None or from_ is None:
                    logging.warning("\nDATE ERROR: " + m['from'] + " - " + m['date'])
                    continue

                str_insert = archive.sql.INSERT.format(self.archive_name)
                # Values are bound by the driver (parameterized execute),
                # not interpolated into the SQL text.
                cursor.execute(str_insert, (from_, author_name_, to_,
                                            m["subject"], date_,
                                            m["content-type"], m["content"],
                                            m["url"]))
                n_inserted += 1

                if "follow-up" in m:
                    n_inserted += self.recursive_insert_db(cursor, m["follow-up"])

            except mariadb.Error as error:
                if error.errno == 1062:
                    # ER_DUP_ENTRY: message already archived, skip it.
                    logging.info("+++db+++ duplicate")
                else:
                    logging.error("\nError: {}".format(error))
                    logging.error(str(str_insert))
                continue

        return n_inserted

    def content_search(self, term, bool=True):
        """Full-text search over message content.

        Args:
            term: the search expression.
            bool: use the BOOLEAN MODE query when True.  (The name shadows
                the builtin; kept for backward compatibility with callers
                passing it by keyword.)

        Returns:
            a list of (from, author_name, subject, date, url) tuples, or
            None when not connected or on error.
        """
        if self.db_con is None:
            logging.warning("No connection to database...")
            return

        # NOTE(review): `term` is interpolated into the SQL text via
        # str.format() — injection risk if `term` is untrusted; prefer a
        # parameterized query.
        query_tmpl = (archive.sql.CONTENT_QUERY_BOOLEAN if bool
                      else archive.sql.CONTENT_QUERY)

        cursor = None
        try:
            cursor = self.db_con.cursor(buffered=True)
            cursor.execute(query_tmpl.format(self.archive_name, term))
            return [row for row in cursor]
        except mariadb.Error as error:
            logging.error("Error: {}".format(error))
        finally:
            # BUG FIX: cursor could be unbound here if .cursor() raised.
            if cursor is not None:
                cursor.close()

    def from_search(self, term, bool=True):
        """Search messages by sender.

        Args:
            term: the search expression.
            bool: use the BOOLEAN MODE query when True (shadows the builtin;
                kept for backward compatibility).

        Returns:
            a list of (from, author_name, subject, date, url) tuples, or
            None when not connected or on error.
        """
        if self.db_con is None:
            logging.warning("No connection to database...")
            return

        # NOTE(review): same str.format() injection caveat as content_search.
        query_tmpl = (archive.sql.FROM_QUERY_BOOLEAN if bool
                      else archive.sql.FROM_QUERY)

        cursor = None
        try:
            cursor = self.db_con.cursor(buffered=True)
            cursor.execute(query_tmpl.format(self.archive_name, term))
            return [row for row in cursor]
        except mariadb.Error as error:
            logging.error("Error: {}".format(error))
        finally:
            # BUG FIX: cursor could be unbound here if .cursor() raised.
            if cursor is not None:
                cursor.close()

    def select_all(self):
        """Fetch every archived message.

        Returns:
            a list of (from, author_name, to, subject, date, content_type,
            content, url) tuples, or None when not connected or on error.
        """
        if self.db_con is None:
            logging.warning("No connection to database...")
            return

        cursor = None
        try:
            cursor = self.db_con.cursor(buffered=True)
            cursor.execute(archive.sql.SELECT_ALL.format(self.archive_name))
            return [row for row in cursor]
        except mariadb.Error as error:
            logging.error("Error: {}".format(error))
        finally:
            # BUG FIX: cursor could be unbound here if .cursor() raised.
            if cursor is not None:
                cursor.close()

    # analysis
    def longest_field(self, field, thread, max_length=0):
        """Recursively find the longest formatted value of `field`.

        Walks `thread` (and nested "threads" / "follow-up" lists),
        normalizing from/author_name/to/date fields in place via
        archive.util before measuring.  Prints each new maximum found.

        Args:
            field: message key to measure.
            thread: list of message dicts.
            max_length: running maximum (used by the recursion).

        Returns:
            the length of the longest value seen.
        """
        # (redundant local `import archive.util` removed — already imported
        # at module level)
        for m in thread:
            if field not in m:
                # Top-level entries carry a nested "threads" list instead.
                if "threads" in m:
                    max_length = self.longest_field(field, m["threads"], max_length)
                continue
            if m[field] is None:
                continue

            # Normalize the field the same way insertion does.
            if field == "from":
                m[field] = archive.util.format_from(m)
            elif field == "author_name":
                m[field] = archive.util.format_author(m)
            elif field == "to":
                m[field] = archive.util.format_to(m)
            elif field == "date":
                m[field] = str(archive.util.format_date(m, self.archive_name))

            if m[field] is None:
                continue

            length = len(m[field])
            if length > max_length:
                max_length = length
                print(">> " + m[field])

            if "follow-up" in m:
                max_length = self.longest_field(field, m["follow-up"], max_length)

        return max_length
|
|
|
|
|
|
|