При записи CSV из CF в корзину: «с открытым (путь к файлу, «w») как MY_CSV:» приводит к «FileNotFoundError: [Errno 2] Нет такого файла или каталога:»
Я получаю эту ошибкуFileNotFoundError: [Errno 2] No such file or directory
когда я пытаюсь записать файл csv в корзину, используя средство записи csv, которое циклически обрабатывает пакеты данных. Полная информация об облачной функции фиксируется вокруг этой ошибки:
File "/workspace/main.py", line 299, in write_to_csv_file with open(filepath, "w") as outcsv: FileNotFoundError: [Errno 2] No such file or directory: 'gs://MY_BUCKET/MY_CSV.csv' Function execution took 52655 ms, finished with status: 'crash' OpenBLAS WARNING - could not determine the L2 cache size on this system, assuming 256k ```
И это несмотря на то, что этот Bucket_filepath определенно существует: я могу загрузить пустой фиктивный файл и получить его «gsutils URI» (щелкните правой кнопкой мыши по трем точкам в правой части файла), и Bucket_filepath будет выглядеть так же:'gs://MY_BUCKET/MY_CSV.csv'
.
Вместо этого я проверил сохранение фиктивного фрейма данных pandas, и он работал с тем же самым Bucket_filepath (!).
Следовательно, должна быть другая причина, вероятно, автора не приняли, илиwith statement
который открывает файл.
Код, который выдает ошибку, выглядит следующим образом. Это тот же код, который работает за пределами функции Google Cloud в обычном задании cron на локальном сервере. Я добавил два отладочных отпечатка вокруг строки, которая выдает ошибку:print("Right after opening the file ...")
больше не появляется. Подфункцияquery_execute_batch()
также отображается вызов для каждого пакета, но, скорее всего, это не проблема, поскольку ошибка возникает уже в самом начале при открытии записи файла csv.
requirements.txt
(которые затем импортируются как модули):
SQLAlchemy>=1.4.2
google-cloud-storage>=1.16.1
mysqlclient==2.1.0
pandas==1.2.3
fsspec==2021.11.1
gcsfs==2021.11.1
unicodecsv==0.14.1
И изmain.py
:
def query_execute_batch(connection):
"""Function for reading data from the query result into batches
:yield: each result in a loop is a batch of the query result
"""
results = execute_select_batch(connection, SQL_QUERY)
print(f"len(results): {len(results)}")
for result in results:
yield result
def write_to_csv_file(connection, filepath):
"""Write the data in a loop over batches into a csv.
This is done in batches since the query from the database is huge.
:param connection: mysqldb connection to DB
:param filepath: path to csv file to write data
returns: metadata on rows and time
"""
countrows = 0
print("Right before opening the file ...")
with open(filepath, "w") as outcsv:
print("Right after opening the file ...")
writer = csv.DictWriter(
outcsv,
fieldnames=FIELDNAMES,
extrasaction="ignore",
delimiter="|",
lineterminator="\n",
)
# write header according to fieldnames
writer.writeheader()
for batch in query_execute_batch(connection):
writer.writerows(batch)
countrows += len(batch)
datetime_now_save = datetime.now()
return countrows, datetime_now_save
Имейте в виду, что для работы приведенного выше сценария я импортируюgcsfs
что делает корзину доступной для чтения и записи. В противном случае мне, скорее всего, понадобится объект облачного хранилища Google, например:
storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)
а затем создайте в этом ведре файл с дополнительными функциями, но это не цель.
В дальнейшемpd.to_csv
код, который работает, использует выходные данные фиктивного SQL-запросаSELECT 1
в качестве входных данных кадра данных. Это можно сохранить в том же Bucket_filepath, конечно, причина может быть не простоpd.to_csv()
как таковой, но также и то, что набор данных представляет собой пустышку, а не сложные строки Юникода из огромногоSELECT query
. Или есть другая причина, я просто предполагаю.
if records is not None:
df = pd.DataFrame(records.fetchall())
df.columns = records.keys()
df.to_csv(filepath,
index=False,
)
datetime_now_save = datetime.now()
countrows = df.shape[0]
Я хотел бы использовать средство записи csv, чтобы иметь возможность писать в Юникоде с помощью модуля unicodecsv и иметь возможность использовать пакеты.
Возможно, я захочу перейти на пакетный режим (loop + append
режим илиchunksize
) в pandas, как в разделе «Запись больших кадров данных Pandas в файл CSV частями» , чтобы избавиться от этой проблемы с путем к файлу корзины, но я бы предпочел использовать готовый код (никогда не трогайте работающую систему).
Как я могу сохранить этот CSV-файл с помощью средства записи CSV, чтобы он мог открыть новый файл в ведре вwrite
режим =with open(filepath, "w") as outcsv:
?
Данная функцияwrite_to_csv_file()
— это всего лишь крошечная часть Cloud Function, которая использует широкий спектр функций и каскадных функций. Я не могу показать здесь весь воспроизводимый случай и надеюсь, что на него можно будет ответить с помощью опыта или более простых примеров.
1 ответ
Решение удивительное. Вы должны импортировать и использовать модуль, если хотите писать в файл с помощьюopen()
.
Если вы используете ,
import gcsfs
не нужен, но
gcsfs
все еще необходим в
requirements.txt
заставить работать , таким образом, панды
to_csv()
кажется, использует его автоматически.
Если оставить в стороне сюрприз, вот код, который отвечает на вопрос (проверено):
def write_to_csv_file(connection, filepath):
"""Write the QUERY result in a loop over batches into a csv.
This is done in batches since the query from the database is huge.
:param connection: mysqldb connection to DB
:param filepath: path to csv file to write data
return: metadata on rows and time
"""
countrows = 0
print("Right before opening the file ...")
# A gcsfs object is needed to open a file.
# https://stackoverflow.com/questions/52805016/how-to-open-a-file-from-google-cloud-storage-into-a-cloud-function
# https://gcsfs.readthedocs.io/en/latest/index.html#examples
# Side-note (Exception):
# pd.to_csv() needs neither the gcsfs object, nor its import.
# It is not used here, but it has been tested with examples.
fs = gcsfs.GCSFileSystem(project=MY_PROJECT)
fs.ls(BUCKET_NAME)
# wb needed, else "builtins.TypeError: must be str, not bytes"
# https://stackoverflow.com/questions/5512811/builtins-typeerror-must-be-str-not-bytes
with fs.open(filepath, 'wb') as outcsv:
print("Right after opening the file ...")
writer = csv.DictWriter(
outcsv,
fieldnames=FIELDNAMES,
extrasaction="ignore",
delimiter="|",
lineterminator="\n",
)
# write header according to fieldnames
print("before writer.writeheader()")
writer.writeheader()
print("after writer.writeheader()")
for batch in query_execute_batch(connection):
writer.writerows(batch)
countrows += len(batch)
datetime_now_save = datetime.now()
return countrows, datetime_now_save
Примечание
Не используйте средство записи csv таким образом.
Это занимает слишком много времени, вместоchunksize
параметр 5000, которому требуется всего 62 секунды для загрузки и сохранения 700 тыс. строк в виде CSV в корзине, CF с устройством записи пакетов занимает более 9 минут, что превышает лимит времени ожидания. Поэтому я вынужден использоватьpd.to_csv()
вместо этого и конвертируйте для этого мои данные в фрейм данных.