gionuibk's picture
Upload folder using huggingface_hub
61d39e2 verified
import BaseOperation from './BaseOperation.js';
import Delete from './Delete.js';
import Insert from './Insert.js';
import Update from './Update.js';
const { db } = extension.import('data');
const svc_params = extension.import('service:params');
const svc_info = extension.import('service:information');
const { PuterPath } = extension.import('fs');
const { Context } = extension.import('core');
const { id2path } = extension.import('core').util.helpers;
const {
RootNodeSelector,
NodeChildSelector,
NodeUIDSelector,
NodePathSelector,
NodeInternalIDSelector,
} = extension.import('core').fs.selectors;
/**
 * Queued, batched access to the `fsentries` table.
 *
 * Write operations (`insert`, `update`, `delete`) are wrapped in operation
 * objects and queued. A queue is flushed with a single `db.batch_write`
 * call; operations that arrive while a batch is running are collected in
 * `deferredState` and become the next batch once the running one finishes.
 *
 * Read operations query the database directly, except `get`, which first
 * replays operations that are still queued for the requested uuid.
 */
export default class FSEntryController {
    static CONCERN = 'filesystem';

    // Status sentinels; they are compared by identity only.
    static STATUS_READY = {};
    static STATUS_RUNNING_JOB = {};

    constructor () {
        this.status = this.constructor.STATUS_READY;

        // Operations that will be written by the next batch.
        this.currentState = {
            queue: [],
            updating_uuids: {},
        };

        // Operations received while a batch is being written.
        this.deferredState = {
            queue: [],
            updating_uuids: {},
        };

        // Callbacks registered by waitForEntry, keyed by `uid:<uuid>`.
        this.entryListeners_ = {};

        this.mkPromiseForQueueSize_();

        // this list of properties is for read operations
        // (originally in FSEntryFetcher)
        this.defaultProperties = [
            'id',
            'associated_app_id',
            'uuid',
            'public_token',
            'bucket',
            'bucket_region',
            'file_request_token',
            'user_id',
            'parent_uid',
            'is_dir',
            'is_public',
            'is_shortcut',
            'is_symlink',
            'symlink_path',
            'shortcut_to',
            'sort_by',
            'sort_order',
            'immutable',
            'name',
            'metadata',
            'modified',
            'created',
            'accessed',
            'size',
            'layout',
            'path',
        ];
    }

    /**
     * Registers the `max_queue` parameter (default 50) and the
     * uuid -> path information provider.
     */
    init () {
        svc_params.createParameters('fsentry-service', [
            {
                id: 'max_queue',
                description: 'Maximum queue size',
                default: 50,
            },
        ], this);

        // Register information providers

        // uuid -> path via mysql
        svc_info.given('fs.fsentry:uuid').provide('fs.fsentry:path')
            .addStrategy('mysql', async uuid => {
                // TODO: move id2path here
                try {
                    return await id2path(uuid);
                } catch (e) {
                    // Best effort: a failed lookup is reported and mapped
                    // to a placeholder path instead of being rethrown.
                    console.error('DASH VOID ERROR !!', e);
                    return `/-void/${ uuid}`;
                }
            });
    }

    /**
     * Creates a fresh promise that enqueue_ awaits while a queue is over
     * the limit; flipState_ resolves it after each batch.
     */
    mkPromiseForQueueSize_ () {
        this.queueSizePromise = new Promise(resolve => {
            this.queueSizeResolve = resolve;
        });
    }

    // #region write operations

    /**
     * Queues an insert of `entry`.
     * @param {Object} entry - Passed to the Insert operation
     * @returns {Promise<Insert>} The queued operation
     */
    async insert (entry) {
        const op = new Insert(entry);
        await this.enqueue_(op);
        return op;
    }

    /**
     * Queues an update of the entry identified by `uuid`.
     * @param {string} uuid - UUID of the entry to update
     * @param {Object} entry - Passed to the Update operation
     * @returns {Promise<Update>} The queued operation
     */
    async update (uuid, entry) {
        const op = new Update(uuid, entry);
        await this.enqueue_(op);
        return op;
    }

    /**
     * Queues a delete of the entry identified by `uuid`.
     * @param {string} uuid - UUID of the entry to delete
     * @returns {Promise<Delete>} The queued operation
     */
    async delete (uuid) {
        const op = new Delete(uuid);
        await this.enqueue_(op);
        return op;
    }

    // #endregion

    // #region read operations

    /**
     * Lists the uuids of every descendant (children, grandchildren, ...)
     * of an entry using a recursive CTE.
     * @param {string} uuid - UUID of the ancestor entry
     * @returns {Promise<string[]>} Descendant uuids; the entry itself is not included
     */
    async fast_get_descendants (uuid) {
        const rows = await db.read(`
            WITH RECURSIVE descendant_cte AS (
                SELECT uuid, parent_uid
                FROM fsentries
                WHERE parent_uid = ?
                UNION ALL
                SELECT f.uuid, f.parent_uid
                FROM fsentries f
                INNER JOIN descendant_cte d ON f.parent_uid = d.uuid
            )
            SELECT uuid FROM descendant_cte
        `, [uuid]);
        return rows.map(row => row.uuid);
    }

    /**
     * Lists the uuids of the direct children of an entry.
     * `PuterPath.NULL_UUID` selects entries whose `parent_uid` is NULL.
     * @param {string} uuid - UUID of the parent entry
     * @returns {Promise<string[]>} Child uuids
     */
    async fast_get_direct_descendants (uuid) {
        const rows = uuid === PuterPath.NULL_UUID
            ? await db.read('SELECT uuid FROM fsentries WHERE parent_uid IS NULL')
            : await db.read('SELECT uuid FROM fsentries WHERE parent_uid = ?',
                            [uuid]);
        return rows.map(row => row.uuid);
    }

    /**
     * Invokes `callback` once an operation for the node's uuid is queued.
     *
     * If an operation for that uuid is already queued the callback runs
     * immediately. Otherwise it is stored and run by enqueue_ when such an
     * operation arrives.
     *
     * @param {Object} node - Must provide `get_selector_of_type`
     * @param {Function} callback - Called with no arguments
     * @returns {{detach: Function}|undefined} A handle that removes the
     *   pending callback; undefined when the node has no UID selector or
     *   the callback was already invoked
     */
    waitForEntry (node, callback) {
        const selector = node.get_selector_of_type(NodeUIDSelector);
        if ( selector === null ) {
            return;
        }

        const entry_already_enqueued =
            Object.hasOwn(this.currentState.updating_uuids, selector.value) ||
            Object.hasOwn(this.deferredState.updating_uuids, selector.value);

        if ( entry_already_enqueued ) {
            callback();
            return;
        }

        const k = `uid:${selector.value}`;
        if ( ! Object.hasOwn(this.entryListeners_, k) ) {
            this.entryListeners_[k] = [];
        }
        this.entryListeners_[k].push(callback);

        return {
            detach: () => {
                // The list no longer exists once enqueue_ has fired it.
                const listeners = this.entryListeners_[k];
                if ( ! listeners ) return;

                const i = listeners.indexOf(callback);
                if ( i === -1 ) return;

                listeners.splice(i, 1);
                if ( listeners.length === 0 ) {
                    delete this.entryListeners_[k];
                }
            },
        };
    }

    /**
     * Builds an entry from the operations still queued for `uuid`.
     *
     * Each matching operation is applied once, current queue first, then
     * the deferred queue. When the result is flagged `is_diff`, it is
     * merged over the entry stored in the database.
     *
     * @param {string} uuid - UUID of the entry
     * @param {Object} fetch_entry_options - Forwarded to find()
     * @returns {Promise<Object|undefined>} The entry; undefined when no
     *   queued operation produced one
     */
    async get (uuid, fetch_entry_options) {
        const answer = {};

        const pending = [
            ...this.currentState.queue,
            ...this.deferredState.queue,
        ];
        for ( const op of pending ) {
            if ( op.uuid != uuid ) continue;
            op.apply(answer);
        }

        if ( answer.is_diff ) {
            const base_entry = await this.find(new NodeUIDSelector(uuid),
                                               fetch_entry_options);
            answer.entry = { ...base_entry, ...answer.entry };
        }

        return answer.entry;
    }

    /**
     * Reads the direct children of an entry. Despite the name this does
     * not recurse.
     * `PuterPath.NULL_UUID` selects entries whose `parent_uid` is NULL.
     * @param {string} uuid - UUID of the parent entry
     * @returns {Promise<Object[]>} Rows with a `uuid` column
     */
    async get_descendants (uuid) {
        if ( uuid === PuterPath.NULL_UUID ) {
            // This statement has no placeholder, so no values are bound.
            return await db.read('SELECT uuid FROM fsentries WHERE parent_uid IS NULL');
        }
        return await db.read('SELECT uuid FROM fsentries WHERE parent_uid = ?',
                             [uuid]);
    }

    /**
     * Sums the `size` column over all descendants of an entry.
     * @param {string} uuid - UUID of the ancestor entry
     * @returns {Promise<*>} The `total_size` column of the first result
     *   row. NOTE(review): SQL SUM over zero rows is NULL, so this is
     *   presumably null for an entry without descendants - confirm callers
     *   handle that.
     */
    async get_recursive_size (uuid) {
        const cte_query = `
            WITH RECURSIVE descendant_cte AS (
                SELECT uuid, parent_uid, size
                FROM fsentries
                WHERE parent_uid = ?
                UNION ALL
                SELECT f.uuid, f.parent_uid, f.size
                FROM fsentries f
                INNER JOIN descendant_cte d
                ON f.parent_uid = d.uuid
            )
            SELECT SUM(size) AS total_size FROM descendant_cte
        `;
        const rows = await db.read(cte_query, [uuid]);
        return rows[0].total_size;
    }

    /**
     * Builds the column list used by the entry lookups.
     * @param {Object} fetch_entry_options - `thumbnail` adds that column
     * @returns {string} Comma-separated column names
     */
    columnsSql_ ({ thumbnail } = {}) {
        const columns = this.defaultProperties.join(', ');
        return thumbnail ? `${columns}, thumbnail` : columns;
    }

    /**
     * Finds a filesystem entry using the provided selector.
     * @param {Object} selector - The selector object specifying how to find the entry
     * @param {Object} fetch_entry_options - Options for fetching the entry
     * @returns {Promise<Object|null|undefined|false>} The entry; a falsy
     *   value when it is not found (which one depends on the selector
     *   type) or when the selector type is not recognised
     * @throws {Error} When a child lookup yields a non-numeric id
     */
    async find (selector, fetch_entry_options) {
        if ( selector instanceof RootNodeSelector ) {
            return selector.entry;
        }
        if ( selector instanceof NodePathSelector ) {
            return await this.findByPath(selector.value, fetch_entry_options);
        }
        if ( selector instanceof NodeUIDSelector ) {
            return await this.findByUID(selector.value, fetch_entry_options);
        }
        if ( selector instanceof NodeInternalIDSelector ) {
            return await this.findByID(selector.id, fetch_entry_options);
        }
        if ( selector instanceof NodeChildSelector ) {
            let id;
            if ( selector.parent instanceof RootNodeSelector ) {
                id = await this.findNameInRoot(selector.name);
            } else {
                // Only the parent's uuid is needed, so no fetch options.
                const parentEntry = await this.find(selector.parent);
                if ( ! parentEntry ) return null;
                id = await this.findNameInParent(parentEntry.uuid, selector.name);
            }
            if ( id === undefined ) return null;
            if ( typeof id !== 'number' ) {
                // Error accepts a single message, so the details go in it.
                throw new Error(`unexpected type for id value: ${typeof id} (${String(id)})`);
            }
            // Forward the options so `thumbnail` is honoured here as well.
            return this.find(new NodeInternalIDSelector('mysql', id),
                             fetch_entry_options);
        }
    }

    /**
     * Finds a filesystem entry by its UUID.
     * @param {string} uuid - The UUID of the entry to find
     * @param {Object} fetch_entry_options - Options including thumbnail flag
     * @returns {Promise<Object|undefined>} The filesystem entry or undefined if not found
     */
    async findByUID (uuid, fetch_entry_options = {}) {
        const rows = await db.tryHardRead(
            `SELECT ${this.columnsSql_(fetch_entry_options)} FROM fsentries WHERE uuid = ? LIMIT 1`,
            [uuid],
        );
        return rows[0];
    }

    /**
     * Finds a filesystem entry by its internal database ID.
     * @param {number} id - The internal ID of the entry to find
     * @param {Object} fetch_entry_options - Options including thumbnail flag
     * @returns {Promise<Object|undefined>} The filesystem entry or undefined if not found
     */
    async findByID (id, fetch_entry_options = {}) {
        const rows = await db.tryHardRead(
            `SELECT ${this.columnsSql_(fetch_entry_options)} FROM fsentries WHERE id = ? LIMIT 1`,
            [id],
        );
        return rows[0];
    }

    /**
     * Finds a filesystem entry by its full path.
     *
     * The stored `path` column is tried first. If that finds nothing, the
     * path is resolved one component at a time from the root.
     *
     * @param {string} path - The full path of the entry to find
     * @param {Object} fetch_entry_options - Options including thumbnail flag and tracer
     * @returns {Promise<Object|undefined|false>} The filesystem entry,
     *   undefined if not found, or false when the path has no components
     */
    async findByPath (path, fetch_entry_options = {}) {
        if ( path === '/' ) {
            return this.find(new RootNodeSelector());
        }

        const parts = path.split('/').filter(part => part !== '');
        if ( parts.length === 0 ) {
            // TODO: invalid path; this should be an error
            return false;
        }

        const resultColsSql = this.columnsSql_(fetch_entry_options);

        let result = await db.read(
            `SELECT ${resultColsSql} FROM fsentries WHERE path=? LIMIT 1`,
            [path],
        );
        if ( result[0] ) return result[0];

        // TODO: use a closure table for more efficient path resolving
        let parent_uid = null;
        const loop = async () => {
            for ( let i = 0 ; i < parts.length ; i++ ) {
                const part = parts[i];
                const isLast = i === parts.length - 1;
                // Intermediate components only need the uuid to descend.
                const colsSql = isLast ? resultColsSql : 'uuid';
                if ( parent_uid === null ) {
                    result = await db.read(
                        `SELECT ${colsSql} FROM fsentries WHERE parent_uid IS NULL AND name=? LIMIT 1`,
                        [part],
                    );
                } else {
                    result = await db.read(
                        `SELECT ${colsSql} FROM fsentries WHERE parent_uid=? AND name=? LIMIT 1`,
                        [parent_uid, part],
                    );
                }
                // A missing component leaves `result` empty, so the
                // caller ends up returning undefined.
                if ( ! result[0] ) return false;
                parent_uid = result[0].uuid;
            }
        };

        const { tracer, trace_options } = fetch_entry_options;
        if ( tracer ) {
            await tracer.startActiveSpan('fs:sql:findByPath',
                                         ...(trace_options ? [trace_options] : []),
                                         async span => {
                                             try {
                                                 await loop();
                                             } finally {
                                                 // End the span even when a query fails.
                                                 span.end();
                                             }
                                         });
        } else {
            await loop();
        }

        return result[0];
    }

    /**
     * Finds the ID of a child entry with the given name in the root directory.
     * @param {string} name - The name of the child entry to find
     * @returns {Promise<number|undefined>} The ID of the child entry or undefined if not found
     */
    async findNameInRoot (name) {
        const rows = await db.read('SELECT `id` FROM `fsentries` WHERE `parent_uid` IS NULL AND name = ? LIMIT 1',
                                   [name]);
        return rows[0]?.id;
    }

    /**
     * Finds the ID of a child entry with the given name under a specific parent.
     * @param {string} parent_uid - The UUID of the parent directory
     * @param {string} name - The name of the child entry to find
     * @returns {Promise<number|undefined>} The ID of the child entry or undefined if not found
     */
    async findNameInParent (parent_uid, name) {
        const rows = await db.read('SELECT `id` FROM `fsentries` WHERE `parent_uid` = ? AND name = ? LIMIT 1',
                                   [parent_uid, name]);
        return rows[0]?.id;
    }

    /**
     * Checks if an entry with the given name exists under a specific parent.
     * @param {string} parent_uid - The UUID of the parent directory
     * @param {string} name - The name to check for
     * @returns {Promise<boolean>} True if the name exists under the parent, false otherwise
     */
    async nameExistsUnderParent (parent_uid, name) {
        const rows = await db.read('SELECT `id` FROM `fsentries` WHERE `parent_uid` = ? AND name = ? LIMIT 1',
                                   [parent_uid, name]);
        return !!rows[0];
    }

    /**
     * Checks if an entry with the given name exists under a parent specified by ID.
     * @param {number} parent_id - The internal ID of the parent directory
     * @param {string} name - The name to check for
     * @returns {Promise<boolean>} True if the name exists under the parent;
     *   false otherwise, including when the parent itself is not found
     */
    async nameExistsUnderParentID (parent_id, name) {
        const parent = await this.findByID(parent_id);
        if ( ! parent ) {
            return false;
        }
        return this.nameExistsUnderParent(parent.uuid, name);
    }

    // #endregion

    // #region queue logic

    /**
     * Adds an operation to the active queue, or to the deferred queue when
     * a batch is currently being written, then notifies waitForEntry
     * listeners for the operation's uuid and triggers execution.
     *
     * Waits while either queue holds more than `max_queue` operations.
     *
     * @param {BaseOperation} op - The operation to queue
     * @throws {Error} When `op` is not a BaseOperation
     */
    async enqueue_ (op) {
        // Reject invalid input before waiting for queue space.
        if ( ! (op instanceof BaseOperation) ) {
            throw new Error('Invalid operation');
        }

        while (
            this.currentState.queue.length > this.max_queue ||
            this.deferredState.queue.length > this.max_queue
        ) {
            await this.queueSizePromise;
        }

        const state = this.status === this.constructor.STATUS_READY
            ? this.currentState
            : this.deferredState;

        if ( ! Object.hasOwn(state.updating_uuids, op.uuid) ) {
            state.updating_uuids[op.uuid] = [];
        }
        // Remember the queue index of every operation for this uuid.
        state.updating_uuids[op.uuid].push(state.queue.length);
        state.queue.push(op);

        // DRY: same pattern as FSOperationContext:provideValue
        // DRY: same pattern as FSOperationContext:rejectValue
        // Must be the same key format waitForEntry registers under.
        const listenerKey = `uid:${op.uuid}`;
        if ( Object.hasOwn(this.entryListeners_, listenerKey) ) {
            const listeners = this.entryListeners_[listenerKey];
            delete this.entryListeners_[listenerKey];
            for ( const lis of listeners ) lis();
        }

        this.checkShouldExec_();
    }

    /**
     * Starts a batch if none is running and there is something to write.
     */
    checkShouldExec_ () {
        if ( this.status !== this.constructor.STATUS_READY ) return;
        if ( this.currentState.queue.length === 0 ) return;
        // Not awaited: the batch runs in the background.
        this.exec_();
    }

    /**
     * Writes the current queue as one batch, marks its operations done,
     * promotes the deferred queue and starts the next batch if needed.
     *
     * NOTE(review): if `db.batch_write` rejects, the status stays
     * STATUS_RUNNING_JOB and nothing here resets it - confirm how failures
     * are meant to be handled.
     *
     * @throws {Error} When called while a batch is already running
     */
    async exec_ () {
        if ( this.status !== this.constructor.STATUS_READY ) {
            throw new Error('Duplicate exec_ call');
        }

        const queue = this.currentState.queue;
        this.status = this.constructor.STATUS_RUNNING_JOB;

        for ( const op of queue ) {
            op.status = op.constructor.STATUS_RUNNING;
        }

        await db.batch_write(queue.map(op => op.getStatement()));

        for ( const op of queue ) {
            op.status = op.constructor.STATUS_DONE;
        }

        this.flipState_();
        this.status = this.constructor.STATUS_READY;

        this.checkShouldExec_();
    }

    /**
     * Makes the deferred state current, starts a new empty deferred state
     * and wakes enqueue_ calls that are waiting for queue space.
     */
    flipState_ () {
        this.currentState = this.deferredState;
        this.deferredState = {
            queue: [],
            updating_uuids: {},
        };

        // Replace the promise before resolving the old one so that woken
        // waiters which still find a full queue await the new promise.
        const queueSizeResolve = this.queueSizeResolve;
        this.mkPromiseForQueueSize_();
        queueSizeResolve();
    }

    // #endregion
}