# listservs/archive/archive.py
import email, email.parser
import os, json, gzip, re
import mysql.connector as mariadb
import archive.sql, archive.util
from datetime import date, datetime
import terminal.progress
def _read_gzip_json(file_path):
    """Return the JSON document stored in the gzip-compressed file *file_path*."""
    # Binary mode on purpose: json.load() accepts a binary file and detects
    # the UTF-8/16/32 encoding itself.
    with gzip.open(file_path, 'rb') as fp:
        return json.load(fp)


def load_from_file(filename, archive_name, archive_dir):
    """Load the raw JSON data of an archive stored under *archive_dir*.

    Three layouts are tried, in this order:

    1. ``archive_dir/<filename>.json.gz`` (the suffix is appended only when
       *filename* does not already carry it);
    2. any ``archive_dir/<filename>*.json.gz`` file; when several match, the
       last one in alphabetical order is used;
    3. a directory ``archive_dir/<filename>/`` holding plain ``*.json`` files,
       which are all loaded, in alphabetical order.

    Args:
        filename: archive file name, file-name prefix or directory name.
        archive_name: returned unchanged as the second tuple element.
        archive_dir: directory in which the archive is looked up.

    Returns:
        ``(data, archive_name)`` where ``data`` is the decoded JSON document
        (layouts 1 and 2) or a list with one decoded document per file
        (layout 3); ``None`` when nothing matching is found.
    """
    if filename.endswith('.json.gz'):
        file_path = os.path.join(archive_dir, filename)
    else:
        file_path = os.path.join(archive_dir, filename + '.json.gz')

    if os.path.isfile(file_path):
        return (_read_gzip_json(file_path), archive_name)

    # Without this guard os.listdir() below raises instead of reporting
    # "not found" the same way the other failure paths do.
    if not os.path.isdir(archive_dir):
        return None

    # list of all "filename[...].json.gz" in archive_dir
    candidates = sorted(
        f for f in os.listdir(archive_dir)
        if f.startswith(filename)
        and f.endswith('.json.gz')
        and os.path.isfile(os.path.join(archive_dir, f))
    )
    if candidates:
        # take the most recent (listed alpha-chronological)
        newest = os.path.join(archive_dir, candidates[-1])
        return (_read_gzip_json(newest), archive_name)

    # list of all json files in archive_dir/filename
    dir_path = os.path.join(archive_dir, filename)
    if not os.path.isdir(dir_path):
        return None

    # Sorted so the order of the loaded documents does not depend on the
    # arbitrary order os.listdir() returns.
    json_paths = sorted(
        os.path.join(dir_path, f) for f in os.listdir(dir_path)
        if f.endswith('.json') and os.path.isfile(os.path.join(dir_path, f))
    )
    if not json_paths:
        return None

    # load all json files
    threads = []
    for json_path in json_paths:
        # Binary mode: decoding is left to json.load() rather than to the
        # platform's locale encoding.
        with open(json_path, 'rb') as fp:
            threads.append(json.load(fp))
    return (threads, archive_name)
def connect_db(database, host, user, password):
    """Open a connection to *database*, or return None when that fails.

    Args:
        database: name of the database to select.
        host: server host name.
        user: account name.
        password: account password.

    Returns:
        The connection object returned by ``mariadb.connect``, or ``None``
        when the connector raises ``mariadb.Error`` (the error is printed).
    """
    # No ``finally: return con`` here: a return inside ``finally`` overrides
    # the handler's ``return None`` and fails with UnboundLocalError when
    # connect() raised before ``con`` was ever assigned.
    try:
        return mariadb.connect(host=host, user=user, password=password, database=database)
    except mariadb.Error as error:
        print("Error: {}".format(error))
        if error.errno == 1049:
            print("Database " + database + " does not exist.")
        return None
2019-08-24 13:06:59 +02:00
def list_tables_db(database, host, user, password):
    """Return the first column of every row produced by ``archive.sql.SHOW_TABLE``.

    A connection is opened through ``connect_db`` for the duration of the
    call and always closed before returning.

    Args:
        database: name of the database to select.
        host: server host name.
        user: account name.
        password: account password.

    Returns:
        A list with the first column of each result row, or ``None`` when no
        connection could be opened or the query raised ``mariadb.Error``
        (the error is printed).
    """
    con = connect_db(database, host, user, password)
    if con is None:
        return None

    # Bound before the try so the cleanup below stays valid even when
    # con.cursor() itself raises.
    cursor = None
    try:
        cursor = con.cursor()
        cursor.execute(archive.sql.SHOW_TABLE)
        return [row[0] for row in cursor]
    except mariadb.Error as error:
        print("Error: {}".format(error))
        return None
    finally:
        if cursor is not None:
            cursor.close()
        con.close()
2019-07-11 13:21:42 +02:00
class Archive:
    """A named mailing-list archive, loaded from JSON files and/or stored in a database table.

    The archive name doubles as the table name in every SQL statement issued
    by this class. Instances can be used as context managers; leaving the
    ``with`` block closes the database connection, if any.
    """

    data = None  # "raw" json data
    db_con = None

    def __init__(self, archive_name, config):
        """Create the archive from files or attach it to a database.

        Args:
            archive_name: name of the archive (also used as table name).
            config: either a ``str`` naming the directory that holds the
                archive files (the data is then loaded with
                ``load_from_file``), or a ``dict`` with the keys
                ``'database'``, ``'host'``, ``'user'`` and ``'password'``
                (a connection is then opened with ``connect_db``). Any other
                type leaves the archive without data and without connection.

        Raises:
            FileNotFoundError: *config* is a directory name and
                ``load_from_file`` finds no archive called *archive_name*.
        """
        # Set unconditionally so the other methods can rely on it whatever
        # the type of ``config``.
        self.archive_name = archive_name

        if isinstance(config, str):
            # need a filename or a dir name....
            print("reading archive " + archive_name, end='')
            loaded = load_from_file(archive_name, archive_name, config)
            if loaded is None:
                # Unpacking None would fail with an opaque TypeError.
                print(" - not found.")
                raise FileNotFoundError(
                    "no archive named '{}' found in '{}'".format(archive_name, config))
            (self.data, self.archive_name) = loaded
            print(" - done.")
        elif isinstance(config, dict):
            self.db_con = connect_db(config['database'], config['host'], config['user'], config['password'])

    def __enter__(self):
        """Return the archive itself."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the database connection, if one is open."""
        if self.db_con is not None:
            self.db_con.close()

    def _ensure_connection(self, config=None):
        """Connect using *config* when no connection exists; return True when connected."""
        if self.db_con is None and config is not None:
            self.db_con = connect_db(config['database'], config['host'], config['user'], config['password'])
        return self.db_con is not None

    def create_db(self, config=None):
        """Execute ``archive.sql.CREATE`` for this archive's table.

        Args:
            config: optional connection settings (same keys as in
                ``__init__``), used only when no connection is open yet.

        Progress and errors are printed; nothing is returned.
        """
        print("creating table: " + self.archive_name, end='')

        if not self._ensure_connection(config):
            print(" - no connection... Aborting.")
            return

        # Bound before the try so the cleanup stays valid when cursor() raises.
        cursor = None
        try:
            cursor = self.db_con.cursor()
            cursor.execute(archive.sql.CREATE.format(self.archive_name))
        except mariadb.Error as error:
            print("Error: {}".format(error))
        else:
            # Reported only on success, so a failure is not followed by "done".
            print(" - done.")
        finally:
            if cursor is not None:
                cursor.close()

    def insert_db(self, config=None):
        """Insert every message of the loaded data into this archive's table.

        Each element of ``self.data`` must provide a ``"threads"`` entry,
        which is handed to ``recursive_insert_db``. The transaction is
        committed after each element that inserted at least one row, and
        once more at the end.

        Args:
            config: optional connection settings (same keys as in
                ``__init__``), used only when no connection is open yet.
        """
        if not self._ensure_connection(config):
            return

        if self.data is None:
            # e.g. an archive created from a database config only
            print("No archive data loaded... nothing to insert.")
            return

        cursor = None
        try:
            cursor = self.db_con.cursor()

            progress = terminal.progress.ProgressBar(self.archive_name, len(self.data), fmt=terminal.progress.ProgressBar.FULL)

            for t in self.data:
                n_inserted = self.recursive_insert_db(cursor, t["threads"])
                if n_inserted > 0:
                    self.db_con.commit()

                progress.current += 1
                progress()

            progress.done()
            self.db_con.commit()
        except mariadb.Error as error:
            print("Error: {}".format(error))
        finally:
            if cursor is not None:
                cursor.close()

    def recursive_insert_db(self, cursor, thread):
        """Insert the messages of *thread*, then their ``"follow-up"`` messages.

        Args:
            cursor: open database cursor used for the INSERT statements.
            thread: iterable of message dicts providing ``"subject"``,
                ``"content-type"``, ``"content"`` and ``"url"``, and
                optionally a nested ``"follow-up"`` list of messages.

        Returns:
            The number of INSERT statements that executed without error,
            nested follow-ups included.
        """
        # Loop invariant; also guarantees the name is bound in the handler.
        str_insert = archive.sql.INSERT.format(self.archive_name)

        n_inserted = 0
        for m in thread:
            try:
                from_ = archive.util.format_from(m)
                author_name_ = archive.util.format_author(m)
                to_ = archive.util.format_to(m)
                date_ = archive.util.format_date(m, self.archive_name)

                if date_ is None or from_ is None:
                    print("\nerrorororororo")
                    # .get() + format(): reporting the skipped message must
                    # not fail itself when a field is absent or not a string.
                    print("{} -- {}".format(m.get('from'), m.get('date')))
                    continue

                cursor.execute(str_insert, (from_, author_name_, to_, m["subject"], date_, m["content-type"], m["content"], m["url"]))
                n_inserted += 1

                if "follow-up" in m:
                    n_inserted += self.recursive_insert_db(cursor, m["follow-up"])

            except mariadb.Error as error:
                if error.errno != 1062:
                    print("\nError: {}".format(error))
                    print(str_insert)
                # errno 1062 (duplicate entry) is skipped silently.
                # NOTE(review): either way the follow-ups of this message are
                # not visited, so new replies to an already stored message
                # are never inserted - confirm this is intended.
                continue

        return n_inserted

    def _run_search(self, query, term):
        """Run *query* formatted with the table name and *term*; return the rows.

        Returns a list of ``(from, author_name, subject, date, url)`` tuples,
        or ``None`` when the query raised ``mariadb.Error`` (the error is
        printed). The caller must have checked that a connection is open.
        """
        cursor = None
        try:
            cursor = self.db_con.cursor(buffered=True)
            # NOTE(review): ``term`` is interpolated into the SQL text. If it
            # can come from untrusted input this is injectable; passing it as
            # a bound parameter requires changing the templates in
            # archive.sql, which are not visible from here.
            cursor.execute(query.format(self.archive_name, term))
            return [
                (from_, author_name_, subject_, date_, url_)
                for (from_, author_name_, subject_, date_, url_) in cursor
            ]
        except mariadb.Error as error:
            print("Error: {}".format(error))
            return None
        finally:
            if cursor is not None:
                cursor.close()

    def content_search(self, term, bool=True):
        """Search with the ``CONTENT_QUERY`` statements of ``archive.sql``.

        Args:
            term: search term inserted into the query.
            bool: use ``CONTENT_QUERY_BOOLEAN`` when true, ``CONTENT_QUERY``
                otherwise.

        Returns:
            A list of ``(from, author_name, subject, date, url)`` tuples, or
            ``None`` when there is no connection or the query failed.
        """
        if self.db_con is None:
            print("Not connection to database...")
            return None
        query = archive.sql.CONTENT_QUERY_BOOLEAN if bool else archive.sql.CONTENT_QUERY
        return self._run_search(query, term)

    def from_search(self, term, bool=True):
        """Search with the ``FROM_QUERY`` statements of ``archive.sql``.

        Args:
            term: search term inserted into the query.
            bool: use ``FROM_QUERY_BOOLEAN`` when true, ``FROM_QUERY``
                otherwise.

        Returns:
            A list of ``(from, author_name, subject, date, url)`` tuples, or
            ``None`` when there is no connection or the query failed.
        """
        if self.db_con is None:
            print("Not connection to database...")
            return None
        query = archive.sql.FROM_QUERY_BOOLEAN if bool else archive.sql.FROM_QUERY
        return self._run_search(query, term)

    # analysis

    def longest_field(self, field, thread, max_length=0):
        """Return the length of the longest value of *field* within *thread*.

        The fields ``"from"``, ``"author_name"``, ``"to"`` and ``"date"`` are
        measured after formatting with the matching ``archive.util`` helper;
        any other field is measured as stored. Every value that sets a new
        maximum is printed. The messages themselves are left unmodified.

        Args:
            field: key of the message field to measure.
            thread: iterable of message dicts. An entry without *field* is
                skipped, after descending into its ``"threads"`` list if it
                has one; ``"follow-up"`` lists of measured messages are
                descended into as well.
            max_length: length to beat (carried through the recursion).

        Returns:
            The largest length found, never less than *max_length*.
        """
        for m in thread:
            if field not in m:
                if "threads" in m:
                    max_length = self.longest_field(field, m["threads"], max_length)
                continue

            if m[field] is None:
                continue

            # The formatted value is kept in a local: writing it back into
            # ``m`` would corrupt the archive data for later use.
            if field == "from":
                value = archive.util.format_from(m)
            elif field == "author_name":
                value = archive.util.format_author(m)
            elif field == "to":
                value = archive.util.format_to(m)
            elif field == "date":
                value = archive.util.format_date(m, self.archive_name)
                # str(None) would otherwise be measured as the text "None".
                if value is not None:
                    value = str(value)
            else:
                value = m[field]

            if value is None:
                continue

            length = len(value)
            if length > max_length:
                max_length = length
                print(">> " + value)

            if "follow-up" in m:
                max_length = self.longest_field(field, m["follow-up"], max_length)

        return max_length