Skip to content
Snippets Groups Projects
Commit d4d69130 authored by Amir Golparvar's avatar Amir Golparvar Committed by Theodore Chang
Browse files

Resolve "conversion of ArchiveQuery to dataframe works only for the last call...

Resolve "conversion of ArchiveQuery to dataframe works only for the last call of the downloaded entries"
parent 6dc78157
Branches
Tags
2 merge requests!1695Resolve "conversion of ArchiveQuery to dataframe works only for the last call of the downloaded entries",!1682Resolve "Memory leak in the archive packer"
......@@ -20,7 +20,7 @@ from __future__ import annotations
import asyncio
from asyncio import Semaphore
from itertools import islice
from typing import Any
from typing import Any, Union
import threading
from click import progressbar
......@@ -246,6 +246,7 @@ class ArchiveQuery:
"""
self._entries = []
self._entries_dict = []
self._current_after = self._after
self._current_results = 0
self._results_actual = 0
......@@ -486,9 +487,7 @@ class ArchiveQuery:
self.fetch(number - pending_size)
async_query = run_async(self._download_async, number)
self._entries_dict = [
aq.m_to_dict(resolve_references=True) for aq in async_query
]
self._entries_dict.append(async_query)
return async_query
async def async_fetch(self, number: int = 0) -> int:
......@@ -523,7 +522,41 @@ class ArchiveQuery:
def entry_list(self) -> list[tuple[str, str]]:
return self._entries
def entries_to_dataframe(self, keys_to_filter=None):
def entries_to_dataframe(
self,
keys_to_filter: list[str] = None,
resolve_references: bool = False,
query_selection: Union[str, list[str]] = 'last',
):
"""
Interface to convert the archives to pandas dataframe.
Params:
keys_to_filter (int): number of **entries** to download at a single time
resolve_references (bool): boolean if the references are to be resolved
query_selection (str or list[int]): selection of which archives to be used for conversion. Available options are either 'last', 'all' or a list of indices that each denoting the index of download call (e.g. [0,2,1])
Returns:
pandas dataframe of the downloaded (and selected) archives
"""
t_list: Union[list[Any], dict] = []
if query_selection == 'all':
t_list = [item for sublist in self._entries_dict for item in sublist]
elif query_selection == 'last':
t_list = self._entries_dict[-1]
elif isinstance(query_selection, list):
if not all(isinstance(i, int) for i in query_selection):
raise TypeError("All elements in 'query_selection' must be integers.")
t_list = [
item
for i, sublist in enumerate(self._entries_dict)
if i in query_selection
for item in sublist
]
else:
return
list_of_entries_dict = [
aq.m_to_dict(resolve_references=resolve_references) for aq in t_list
]
if not keys_to_filter:
keys_to_filter = []
return dict_to_dataframe(self._entries_dict, keys_to_filter=keys_to_filter)
return dict_to_dataframe(list_of_entries_dict, keys_to_filter=keys_to_filter)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment