Skip to content
Snippets Groups Projects
Commit 8cf004fd authored by Charles Paperman's avatar Charles Paperman
Browse files

fix ingestion

parent 1ed4b1cb
No related branches found
No related tags found
No related merge requests found
...@@ -75,7 +75,7 @@ def bulk_insert_containers(cursor, notices, notice_cls): ...@@ -75,7 +75,7 @@ def bulk_insert_containers(cursor, notices, notice_cls):
cursor.execute(insert.from_temp(container_cls, *fields)) # Update non-existing containers cursor.execute(insert.from_temp(container_cls, *fields)) # Update non-existing containers
d = dict(cursor.execute(others.uuid_raw(container_cls))) d = dict(cursor.execute(others.uuid_raw(container_cls)))
cursor.execute(f"DROP TABLE temp_{cont_name}") cursor.execute(f"DROP TABLE temp_{cont_name.value}")
# update duplicates to point toward the appropriate uuid # update duplicates to point toward the appropriate uuid
return d return d
...@@ -84,7 +84,7 @@ def bulk_insert_notices(cursor, notices, containers_map, notice_cls): ...@@ -84,7 +84,7 @@ def bulk_insert_notices(cursor, notices, containers_map, notice_cls):
fields.difference_update(("authors", "editors", "identifiers", "id", "advisors", "jury", "institutions")) fields.difference_update(("authors", "editors", "identifiers", "id", "advisors", "jury", "institutions"))
cont_id = False cont_id = False
if notice_cls.container_cls: if notice_cls.container_cls:
cont_id = notice_cls.container_cls.get_document_type+"_id" cont_id = notice_cls.container_cls.get_document_type.value+"_id"
fields.remove("container") fields.remove("container")
fields.add(cont_id) fields.add(cont_id)
def genargs(raw_uid, notice): def genargs(raw_uid, notice):
...@@ -132,7 +132,7 @@ def bulk_insert_notices(cursor, notices, containers_map, notice_cls): ...@@ -132,7 +132,7 @@ def bulk_insert_notices(cursor, notices, containers_map, notice_cls):
# update identifiers for olds notices # update identifiers for olds notices
d = dict(cursor.execute(others.uuid_raw(notice_cls))) d = dict(cursor.execute(others.uuid_raw(notice_cls)))
cursor.execute(f"DROP TABLE temp_{notice_cls.get_document_type}") cursor.execute(f"DROP TABLE temp_{notice_cls.get_document_type.value}")
return d return d
......
...@@ -4,10 +4,11 @@ from bibendum.ingestions.utils import get_tables_name ...@@ -4,10 +4,11 @@ from bibendum.ingestions.utils import get_tables_name
raw_notice = "COPY temp_raw_notice (notice, source, source_identifier) FROM STDIN" raw_notice = "COPY temp_raw_notice (notice, source, source_identifier) FROM STDIN"
person = "COPY temp_person (first_name, middle, last_name, targets) FROM STDIN" person = "COPY temp_person (first_name, middle, last_name, targets) FROM STDIN"
def container_template(cont_name, *fields): def container_template(cont_name, *fields):
cont_name = cont_name.value
fields = ", ".join(fields) + "," fields = ", ".join(fields) + ","
return f"COPY temp_{cont_name} ({fields} raws_id, new) FROM STDIN" return f"COPY temp_{cont_name} ({fields} raws_id, new) FROM STDIN"
def notice_template(cls, *fields): def notice_template(cls, *fields):
name = cls.get_document_type name = cls.get_document_type.value
fields = ", ".join(fields) + "," fields = ", ".join(fields) + ","
return f"COPY temp_{name} ({fields} raws_id, identifiers, new) FROM STDIN" return f"COPY temp_{name} ({fields} raws_id, identifiers, new) FROM STDIN"
...@@ -8,7 +8,7 @@ INSERT INTO raw_notice(notice, source, source_identifier, processed) ...@@ -8,7 +8,7 @@ INSERT INTO raw_notice(notice, source, source_identifier, processed)
""" """
def from_temp(cls, *fields): def from_temp(cls, *fields):
name = cls.get_document_type name = cls.get_document_type.value
fields = ",".join(fields) fields = ",".join(fields)
return f""" return f"""
INSERT INTO {name} (id, {fields}) INSERT INTO {name} (id, {fields})
...@@ -19,7 +19,7 @@ INSERT INTO {name} (id, {fields}) ...@@ -19,7 +19,7 @@ INSERT INTO {name} (id, {fields})
""" """
def notice_identifier_new(cls): def notice_identifier_new(cls):
name = cls.get_document_type name = cls.get_document_type.value
return f""" return f"""
INSERT INTO notice_identifier(notice_id, identifiers) INSERT INTO notice_identifier(notice_id, identifiers)
SELECT id, identifiers FROM SELECT id, identifiers FROM
...@@ -29,7 +29,7 @@ INSERT INTO notice_identifier(notice_id, identifiers) ...@@ -29,7 +29,7 @@ INSERT INTO notice_identifier(notice_id, identifiers)
""" """
def notice_raws_new(cls): def notice_raws_new(cls):
name = cls.get_document_type name = cls.get_document_type.value
return f""" return f"""
INSERT INTO notice_to_raw (notice_id, raws_id) INSERT INTO notice_to_raw (notice_id, raws_id)
SELECT id, raws_id FROM SELECT id, raws_id FROM
......
...@@ -3,6 +3,6 @@ ingested_raw_unsplit = "SELECT id, notice FROM raw_notice WHERE processed='Inges ...@@ -3,6 +3,6 @@ ingested_raw_unsplit = "SELECT id, notice FROM raw_notice WHERE processed='Inges
def uuid_raw(cls): def uuid_raw(cls):
name = cls.get_document_type name = cls.get_document_type.value
return f""" return f"""
SELECT raw_id, id FROM temp_{name}, UNNEST(raws_id) as raw_id""" SELECT raw_id, id FROM temp_{name}, UNNEST(raws_id) as raw_id"""
...@@ -6,7 +6,7 @@ raw_notice = "CREATE TEMPORARY TABLE temp_raw_notice AS SELECT * FROM raw_notice ...@@ -6,7 +6,7 @@ raw_notice = "CREATE TEMPORARY TABLE temp_raw_notice AS SELECT * FROM raw_notice
doctype_to_class = notice.Notice.doctype_to_class doctype_to_class = notice.Notice.doctype_to_class
def create_cont_temp(cls, *fields): def create_cont_temp(cls, *fields):
name = cls.get_document_type name = cls.get_document_type.value
fields = "\n " + ",\n ".join(fields)+"," fields = "\n " + ",\n ".join(fields)+","
return f""" return f"""
CREATE TEMPORARY TABLE temp_{name} AS CREATE TEMPORARY TABLE temp_{name} AS
...@@ -19,7 +19,7 @@ CREATE TEMPORARY TABLE temp_{name} AS ...@@ -19,7 +19,7 @@ CREATE TEMPORARY TABLE temp_{name} AS
""" """
def create_not_temp(cls, *fields): def create_not_temp(cls, *fields):
name = cls.get_document_type name = cls.get_document_type.value
fields = "\n " + ",\n ".join(fields)+"," fields = "\n " + ",\n ".join(fields)+","
return f""" return f"""
CREATE TEMPORARY TABLE temp_{name} AS CREATE TEMPORARY TABLE temp_{name} AS
......
...@@ -3,7 +3,7 @@ flag_notices_success = "UPDATE raw_notice SET processed='Processed'::RAW_STATUS ...@@ -3,7 +3,7 @@ flag_notices_success = "UPDATE raw_notice SET processed='Processed'::RAW_STATUS
def from_temporary(cls): def from_temporary(cls):
name = cls.get_document_type name = cls.get_document_type.value
ifields = ", ".join(f"i.{e}" for e in cls.unique_keys) ifields = ", ".join(f"i.{e}" for e in cls.unique_keys)
tfields = ", ".join(f"t.{e}" for e in cls.unique_keys) tfields = ", ".join(f"t.{e}" for e in cls.unique_keys)
return f""" return f"""
...@@ -16,7 +16,7 @@ UPDATE temp_{name} as t ...@@ -16,7 +16,7 @@ UPDATE temp_{name} as t
""" """
def temporary_uuid(cls, cont_id=False): def temporary_uuid(cls, cont_id=False):
name = cls.get_document_type name = cls.get_document_type.value
fields = list(cls.unique_keys) fields = list(cls.unique_keys)
if cont_id: if cont_id:
idx = fields.index("container") idx = fields.index("container")
...@@ -51,7 +51,7 @@ UPDATE temp_person ...@@ -51,7 +51,7 @@ UPDATE temp_person
""" """
def notice_identifier_old(cls): def notice_identifier_old(cls):
name = cls.get_document_type name = cls.get_document_type.value
return f""" return f"""
UPDATE notice_identifier as i UPDATE notice_identifier as i
SET SET
...@@ -71,7 +71,7 @@ UPDATE notice_identifier as i ...@@ -71,7 +71,7 @@ UPDATE notice_identifier as i
""" """
def notice_raw_old(cls): def notice_raw_old(cls):
name = cls.get_document_type name = cls.get_document_type.value
return f""" return f"""
UPDATE notice_to_raw as i UPDATE notice_to_raw as i
SET SET
......
...@@ -37,7 +37,7 @@ SELECT base.*, raws_id FROM ...@@ -37,7 +37,7 @@ SELECT base.*, raws_id FROM
tbl_name = document_type.value tbl_name = document_type.value
join = f""" join = f"""
{tbl_name} INNER JOIN notice_identifier ON {tbl_name}.id = notice_identifier.notice_id {tbl_name} INNER JOIN notice_identifier ON {tbl_name}.id = notice_identifier.notice_id
INNER JOIN notice_to_raw ON {tbl_name}.id = notice_to_raw.notice_id INNER JOIN notice_to_raw ON {tbl_name}.id = notice_to_raw.notice_id INNER JOIN raw_notice
""" """
cont_data = "" cont_data = ""
if notice.container_cls: if notice.container_cls:
...@@ -45,10 +45,10 @@ SELECT base.*, raws_id FROM ...@@ -45,10 +45,10 @@ SELECT base.*, raws_id FROM
join = f"{join} LEFT JOIN {cnt_name} ON {tbl_name}.{cnt_name}_id = {cnt_name}.id" join = f"{join} LEFT JOIN {cnt_name} ON {tbl_name}.{cnt_name}_id = {cnt_name}.id"
cont_data = f", row_to_json({cnt_name}.*) as container" cont_data = f", row_to_json({cnt_name}.*) as container"
return f""" return f"""
SELECT {tbl_name}.*,p.authors{cont_data}, to_json(notice_identifier.identifiers) as identifiers, notice_to_raw.raws_id SELECT {tbl_name}.*,p.authors{cont_data}, to_json(notice_identifier.identifiers) as identifiers, ROW(raw_notice.*)
FROM {join} LEFT JOIN FROM {join} LEFT JOIN
( (
SELECT target_id, json_agg(ROW(T.id, T.first_name, T.middle, T.last_name)::person) as authors SELECT target_id, json_agg(ROW(T.id, T.first_name, T.middle, T.last_name)::person) as authors,
FROM FROM
(SELECT target_id, person.* (SELECT target_id, person.*
FROM FROM
......
...@@ -2,9 +2,9 @@ import sys ...@@ -2,9 +2,9 @@ import sys
from bibendum.models import NoticeSource from bibendum.models import NoticeSource
import bibendum.ingestions as ing import bibendum.ingestions as ing
from multiprocessing import Process from multiprocessing import Process
size = 50000 size = 200000
procnb_crossref = 5 procnb_crossref = 1
procnb_hal = 2 procnb_hal = 1
def ingest(source): def ingest(source):
ing.bulk_process.process_raw_notices_unsplit(dict(dbname="bibendum"), source, size=size) ing.bulk_process.process_raw_notices_unsplit(dict(dbname="bibendum"), source, size=size)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment